5"""Encodes binary data using one or more PDF stream filters.
7This tool helps with the common task of converting binary data into ASCII PDF
8streams. In test PDFs (and the corresponding .in files), we often want the
9contents to be plain (or mostly plain) ASCII.
11Requires Python 3 (mainly for Ascii85 support). This should be fine for a
16from abc
import ABCMeta, abstractmethod
26 _unique_filter_classes = []
44 key_name = name.lower()
45 if key_name
and key_name[0] ==
'/':
46 key_name = key_name[:1]
48 filter_class = _PdfStream._filter_classes.get(key_name)
56 assert cls
not in _PdfStream._unique_filter_classes
57 _PdfStream._unique_filter_classes.append(cls)
65 _PdfStream._filter_classes[lower_name] = cls
66 _PdfStream._filter_classes[lower_name[1:]] = cls
71 _PdfStream._filter_classes[alias.lower()] = cls
75 text =
'Available filters:\n'
76 for filter_class
in _PdfStream._unique_filter_classes:
77 text +=
' {} (aliases: {})\n'.format(filter_class.name,
78 ', '.join(filter_class.aliases))
83 _PdfStream.AddListEntry(entries,
'Filter', cls.
namename)
87 old_value = entries.get(key)
91 if not isinstance(old_value, collections.abc.MutableSequence):
92 old_value = [old_value]
93 entries[key] = old_value
94 old_value.append(value)
141 def __init__(self, out_buffer, wrapcol=0, **kwargs):
142 super().
__init__(out_buffer, **kwargs)
165 if tail >= len(data):
173 tail = len(data) - tail
178 _name =
'/ASCII85Decode'
179 _aliases = (
'ascii85',
'base85')
192 super().
__init__(out_buffer, **kwargs)
198 trailer_length = len(data) % 4
199 super().
write(base64.a85encode(data[:-trailer_length]))
200 self.
trailer = data[-trailer_length:]
212 _name =
'/ASCIIHexDecode'
213 _aliases = (
'base16',
'hex',
'hexadecimal')
226 super().
__init__(out_buffer, **kwargs)
229 super().
write(base64.b16encode(data))
233 _name =
'/FlateDecode'
234 _aliases = (
'deflate',
'flate',
'zlib')
237 super().
__init__(out_buffer, **kwargs)
238 self.
deflate = zlib.compressobj(level=9, memLevel=9)
284class _PassthroughPdfStream(_VirtualPdfStream):
285 _name =
'(virtual) passthrough'
286 _aliases = (
'noop',
'passthrough')
300 _name =
'(virtual) PNG IDAT'
305 _EXPECT_CHUNK_TYPE = -3
308 _PNG_HEADER = 0x89504E470D0A1A0A
309 _PNG_CHUNK_IDAT = 0x49444154
325 _PdfStream.AddListEntry(entries,
'Filter',
'/FlateDecode')
328 super().
__init__(out_buffer, **kwargs)
329 self.
chunk = _PngIdatPdfStream._EXPECT_HEADER
336 while position < len(data):
339 read_size = min(self.
remaining, len(data) - position)
340 if self.
chunk == _PngIdatPdfStream._PNG_CHUNK_IDAT:
341 self.
buffer.
write(data[position:position + read_size])
345 position += read_size
350 if self.
chunk == _PngIdatPdfStream._EXPECT_HEADER:
351 if self.
accumulator != _PngIdatPdfStream._PNG_HEADER:
352 raise ValueError(
'Invalid PNG header', self.
accumulator)
354 elif self.
chunk == _PngIdatPdfStream._EXPECT_LENGTH:
357 elif self.
chunk == _PngIdatPdfStream._EXPECT_CHUNK_TYPE:
359 elif self.
chunk == _PngIdatPdfStream._EXPECT_CRC:
# Register every concrete filter class with the shared _PdfStream registry.
# The tuple order is the registration order, which is also the order in which
# the filters are listed by _PdfStream.GetHelp().
for _filter_class in (
    _Ascii85DecodePdfStream,
    _AsciiHexDecodePdfStream,
    _FlateDecodePdfStream,
    _PassthroughPdfStream,
    _PngIdatPdfStream,
):
    _filter_class.Register()
# Drop the loop variable so the module namespace is left unchanged.
del _filter_class

# Filters applied when none are given on the command line (decoding order).
_DEFAULT_FILTERS = (
    _Ascii85DecodePdfStream,
    _FlateDecodePdfStream,
)
386 arg_parser = argparse.ArgumentParser(
387 description=
'Encodes binary data using one or more PDF stream filters.',
388 epilog=_PdfStream.GetHelp(),
389 formatter_class=argparse.RawDescriptionHelpFormatter)
390 arg_parser.add_argument(
394 help=
'output raw bytes (no PDF stream header or trailer)')
395 arg_parser.add_argument(
399 help=
'output actual /Length, instead of {{streamlen}}')
400 arg_parser.add_argument(
405 help=
'wrap ASCII lines at COLUMN; defaults to 80 (0 = off)',
407 arg_parser.add_argument(
411 type=_PdfStream.GetFilterByName,
412 help=(
'one or more filters, in decoding order; defaults to ' +
' '.join(
413 [f.name
for f
in _DEFAULT_FILTERS])),
415 arg_parser.add_argument(
419 type=argparse.FileType(
'r'),
420 help=
'input file; use - for standard input (default)')
421 arg_parser.add_argument(
425 type=argparse.FileType(
'w'),
426 help=
'output file; use - for standard output (default)')
427 args = arg_parser.parse_intermixed_args(argv)
428 args.filter = args.filter
or _DEFAULT_FILTERS
429 assert args.wrap >= 0,
'--wrap COLUMN must be non-negative'
434 for filter_class
in filter_classes:
435 out_buffer = filter_class(out_buffer, **kwargs)
442 data_length = in_buffer.readinto(data)
445 out_buffer.write(data[:data_length])
452 use_streamlen=False):
454 out_buffer.write(b
'<<\n')
455 entries[
'Length'] = len(data)
456 for k, v
in entries.items():
458 if k ==
'Length' and use_streamlen:
459 out_buffer.write(b
' {{streamlen}}\n')
461 out_buffer.write(
' /{} {}\n'.format(k, v).
encode(
'ascii'))
462 out_buffer.write(b
'>>\nstream\n')
464 out_buffer.write(data)
467 if data
and data[-1] !=
'\n':
468 out_buffer.write(b
'\n')
469 out_buffer.write(b
'endstream\n')
473 if isinstance(value, collections.abc.MutableSequence):
474 value =
'[' +
' '.join(value) +
']'
487 entries = collections.OrderedDict()
488 for f
in args.filter:
489 f.AddEntries(entries)
492 data=encoded_sink.getbuffer(),
495 use_streamlen=
not args.length)
496 return args.outfile.close()
if __name__ == '__main__':
    # Script entry point: pass along the arguments that follow the program
    # name and hand whatever main() returns to sys.exit().
    exit_status = main(sys.argv[1:])
    sys.exit(exit_status)