Qt
Internal/Contributor docs for the Qt SDK. Note: These are NOT official API docs; those are found at https://doc.qt.io/
encode_pdf_filter.py
#!/usr/bin/env python3
# Copyright 2019 The PDFium Authors
# Use of this source code is governed by a BSD-style license that can be
# found in the LICENSE file.
"""Encodes binary data using one or more PDF stream filters.

This tool helps with the common task of converting binary data into ASCII PDF
streams. In test PDFs (and the corresponding .in files), we often want the
contents to be plain (or mostly plain) ASCII.

Requires Python 3 (mainly for Ascii85 support). This should be fine for a
manually-run script.
"""

import argparse
from abc import ABCMeta, abstractmethod
import base64
import collections
import collections.abc
import io
import sys
import zlib


class _PdfStream(metaclass=ABCMeta):
  _unique_filter_classes = []
  _filter_classes = {}

  @classmethod
  @property
  @abstractmethod
  def name(cls):
    pass

  @classmethod
  @property
  @abstractmethod
  def aliases(cls):
    pass

  @staticmethod
  def GetFilterByName(name):
    # Tolerate any case-insensitive match for "/Name" or "Name", or an alias.
    key_name = name.lower()
    if key_name and key_name[0] == '/':
      # Strip the leading '/'.
      key_name = key_name[1:]

    filter_class = _PdfStream._filter_classes.get(key_name)
    if not filter_class:
      raise KeyError(name)

    return filter_class
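
  # Illustrative lookups (these rely on the Register() calls at the bottom of
  # this file):
  #
  #   _PdfStream.GetFilterByName('/FlateDecode')  # -> _FlateDecodePdfStream
  #   _PdfStream.GetFilterByName('ZLIB')          # -> _FlateDecodePdfStream
  #   _PdfStream.GetFilterByName('png')           # -> _PngIdatPdfStream
  #   _PdfStream.GetFilterByName('bogus')         # raises KeyError('bogus')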

  @classmethod
  def Register(cls):
    assert cls not in _PdfStream._unique_filter_classes
    _PdfStream._unique_filter_classes.append(cls)
    cls.RegisterByName()
    cls.RegisterByAliases()

  @classmethod
  def RegisterByName(cls):
    assert cls.name[0] == '/'
    lower_name = cls.name.lower()
    _PdfStream._filter_classes[lower_name] = cls
    _PdfStream._filter_classes[lower_name[1:]] = cls

  @classmethod
  def RegisterByAliases(cls):
    for alias in cls.aliases:
      _PdfStream._filter_classes[alias.lower()] = cls

  @staticmethod
  def GetHelp():
    text = 'Available filters:\n'
    for filter_class in _PdfStream._unique_filter_classes:
      text += '  {} (aliases: {})\n'.format(filter_class.name,
                                            ', '.join(filter_class.aliases))
    return text

  @classmethod
  def AddEntries(cls, entries):
    _PdfStream.AddListEntry(entries, 'Filter', cls.name)

  @staticmethod
  def AddListEntry(entries, key, value):
    old_value = entries.get(key)
    if old_value is None:
      entries[key] = value
    else:
      if not isinstance(old_value, collections.abc.MutableSequence):
        old_value = [old_value]
        entries[key] = old_value
      old_value.append(value)
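
  # Illustrative behavior: repeated values for the same key accumulate into a
  # list, so with the default filter chain
  #
  #   entries = {}
  #   _PdfStream.AddListEntry(entries, 'Filter', '/ASCII85Decode')
  #   _PdfStream.AddListEntry(entries, 'Filter', '/FlateDecode')
  #
  # leaves entries == {'Filter': ['/ASCII85Decode', '/FlateDecode']}, which
  # _EncodePdfValue() later renders as "[/ASCII85Decode /FlateDecode]".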

  def __init__(self, out_buffer, **kwargs):
    del kwargs
    self.buffer = out_buffer

  def write(self, data):
    self.buffer.write(data)

  def flush(self):
    self.buffer.flush()

  def close(self):
    self.buffer.close()

class _SinkPdfStream(_PdfStream):

  def __init__(self):
    super().__init__(io.BytesIO())

  @classmethod
  @property
  def name(cls):
    # Return an invalid name, so as to ensure _SinkPdfStream.Register()
    # cannot be called. This method has to be implemented, because this
    # script creates `_SinkPdfStream` instances.
    return ''

  @classmethod
  @property
  def aliases(cls):
    # Return invalid aliases, so as to ensure _SinkPdfStream.Register()
    # cannot be called. This method has to be implemented, because this
    # script creates `_SinkPdfStream` instances.
    return ()

  def close(self):
    # Don't call io.BytesIO.close(); this deallocates the written data.
    self.flush()

  def getbuffer(self):
    return self.buffer.getbuffer()


class _AsciiPdfStream(_PdfStream):

  def __init__(self, out_buffer, wrapcol=0, **kwargs):
    super().__init__(out_buffer, **kwargs)
    self.wrapcol = wrapcol
    self.column = 0

  @classmethod
  @property
  @abstractmethod
  def name(cls):
    pass

  @classmethod
  @property
  @abstractmethod
  def aliases(cls):
    pass

  def write(self, data):
    if not self.wrapcol:
      self.buffer.write(data)
      return

    tail = self.wrapcol - self.column
    self.buffer.write(data[:tail])
    if tail >= len(data):
      self.column += len(data)
      return

    for start in range(tail, len(data), self.wrapcol):
      self.buffer.write(b'\n')
      self.buffer.write(data[start:start + self.wrapcol])

    tail = len(data) - tail
    self.column = self.wrapcol - -tail % self.wrapcol
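    # Worked example (illustrative): with wrapcol=10 and column=0, a 25-byte
    # write emits data[0:10], then b'\n' + data[10:20], then b'\n' +
    # data[20:25]. Afterwards tail = 25 - 10 = 15, and the new column is
    # 10 - (-15 % 10) = 10 - 5 = 5, the length of the last partial line.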


class _Ascii85DecodePdfStream(_AsciiPdfStream):
  _name = '/ASCII85Decode'
  _aliases = ('ascii85', 'base85')

  @classmethod
  @property
  def name(cls):
    return cls._name

  @classmethod
  @property
  def aliases(cls):
    return cls._aliases

  def __init__(self, out_buffer, **kwargs):
    super().__init__(out_buffer, **kwargs)
    self.trailer = b''

  def write(self, data):
    # Need to write ASCII85 in units of 4. Slice with an explicit length so
    # that a trailer_length of 0 encodes all of `data` instead of none of it.
    data = self.trailer + data
    trailer_length = len(data) % 4
    super().write(base64.a85encode(data[:len(data) - trailer_length]))
    self.trailer = data[len(data) - trailer_length:]

  def close(self):
    super().write(base64.a85encode(self.trailer))
    # Avoid breaking the end-of-data marker (but still try to wrap).
    if self.wrapcol and self.column > self.wrapcol - 2:
      self.buffer.write(b'\n')
    self.buffer.write(b'~>')
    self.buffer.close()
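
  # A property of the buffering in write() above (stated informally): because
  # only whole 4-byte groups are ever passed to base64.a85encode() before
  # close(), splitting the input across any number of write() calls produces
  # the same ASCII85 output as encoding everything at once, followed by the
  # '~>' end-of-data marker.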


class _AsciiHexDecodePdfStream(_AsciiPdfStream):
  _name = '/ASCIIHexDecode'
  _aliases = ('base16', 'hex', 'hexadecimal')

  @classmethod
  @property
  def name(cls):
    return cls._name

  @classmethod
  @property
  def aliases(cls):
    return cls._aliases

  def __init__(self, out_buffer, **kwargs):
    super().__init__(out_buffer, **kwargs)

  def write(self, data):
    super().write(base64.b16encode(data))


class _FlateDecodePdfStream(_PdfStream):
  _name = '/FlateDecode'
  _aliases = ('deflate', 'flate', 'zlib')

  def __init__(self, out_buffer, **kwargs):
    super().__init__(out_buffer, **kwargs)
    self.deflate = zlib.compressobj(level=9, memLevel=9)

  @classmethod
  @property
  def name(cls):
    return cls._name

  @classmethod
  @property
  def aliases(cls):
    return cls._aliases

  def write(self, data):
    self.buffer.write(self.deflate.compress(data))

  def flush(self):
    self.buffer.write(self.deflate.flush(zlib.Z_NO_FLUSH))

  def close(self):
    self.buffer.write(self.deflate.flush())
    self.buffer.close()


class _VirtualPdfStream(_PdfStream):

  @classmethod
  @property
  @abstractmethod
  def name(cls):
    pass

  @classmethod
  @property
  @abstractmethod
  def aliases(cls):
    pass

  @classmethod
  def RegisterByName(cls):
    pass

  @classmethod
  def AddEntries(cls, entries):
    pass


class _PassthroughPdfStream(_VirtualPdfStream):
  _name = '(virtual) passthrough'
  _aliases = ('noop', 'passthrough')

  @classmethod
  @property
  def name(cls):
    return cls._name

  @classmethod
  @property
  def aliases(cls):
    return cls._aliases


class _PngIdatPdfStream(_VirtualPdfStream):
  _name = '(virtual) PNG IDAT'
  _aliases = ('png',)

  _EXPECT_HEADER = -1
  _EXPECT_LENGTH = -2
  _EXPECT_CHUNK_TYPE = -3
  _EXPECT_CRC = -4

  _PNG_HEADER = 0x89504E470D0A1A0A
  _PNG_CHUNK_IDAT = 0x49444154

  @classmethod
  @property
  def name(cls):
    return cls._name

  @classmethod
  @property
  def aliases(cls):
    return cls._aliases

  @classmethod
  def AddEntries(cls, entries):
    # Technically only true for compression method 0 (zlib), but no other
    # methods have been standardized.
    _PdfStream.AddListEntry(entries, 'Filter', '/FlateDecode')

  def __init__(self, out_buffer, **kwargs):
    super().__init__(out_buffer, **kwargs)
    self.chunk = _PngIdatPdfStream._EXPECT_HEADER
    self.remaining = 8
    self.accumulator = 0
    self.length = 0

  def write(self, data):
    position = 0
    while position < len(data):
      if self.chunk >= 0:
        # Only pass through IDAT chunk data.
        read_size = min(self.remaining, len(data) - position)
        if self.chunk == _PngIdatPdfStream._PNG_CHUNK_IDAT:
          self.buffer.write(data[position:position + read_size])
        self.remaining -= read_size
        if self.remaining == 0:
          self.ResetAccumulator(_PngIdatPdfStream._EXPECT_CRC, 4)
        position += read_size
      else:
        # As far as we're concerned, PNG files are just a header followed by a
        # series of (length, chunk type, data[length], CRC) chunks.
        if self.AccumulateByte(data[position]):
          if self.chunk == _PngIdatPdfStream._EXPECT_HEADER:
            if self.accumulator != _PngIdatPdfStream._PNG_HEADER:
              raise ValueError('Invalid PNG header', self.accumulator)
            self.ResetAccumulator(_PngIdatPdfStream._EXPECT_LENGTH, 4)
          elif self.chunk == _PngIdatPdfStream._EXPECT_LENGTH:
            self.length = self.accumulator
            self.ResetAccumulator(_PngIdatPdfStream._EXPECT_CHUNK_TYPE, 4)
          elif self.chunk == _PngIdatPdfStream._EXPECT_CHUNK_TYPE:
            self.ResetAccumulator(self.accumulator, self.length)
          elif self.chunk == _PngIdatPdfStream._EXPECT_CRC:
            # Don't care if the CRC is correct.
            self.ResetAccumulator(_PngIdatPdfStream._EXPECT_LENGTH, 4)
        position += 1

  def ResetAccumulator(self, chunk, remaining):
    self.chunk = chunk
    self.remaining = remaining
    self.accumulator = 0

  def AccumulateByte(self, byte):
    assert self.remaining > 0
    self.accumulator = self.accumulator << 8 | byte
    self.remaining -= 1
    return self.remaining == 0
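
  # Byte layout consumed by write() above, for a typical PNG (illustrative;
  # all fields big-endian):
  #
  #   89 50 4E 47 0D 0A 1A 0A    8-byte signature (_PNG_HEADER)
  #   ll ll ll ll 49 48 44 52    IHDR chunk: length, type, then data + CRC
  #   ...
  #   ll ll ll ll 49 44 41 54    IDAT chunk: only its data bytes are copied
  #   ...                        to self.buffer; lengths, types, and CRCs
  #   ll ll ll ll 49 45 4E 44    (including IEND) are parsed and discarded.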


_Ascii85DecodePdfStream.Register()
_AsciiHexDecodePdfStream.Register()
_FlateDecodePdfStream.Register()
_PassthroughPdfStream.Register()
_PngIdatPdfStream.Register()

_DEFAULT_FILTERS = (_Ascii85DecodePdfStream, _FlateDecodePdfStream)


def _ParseCommandLine(argv):
  arg_parser = argparse.ArgumentParser(
      description='Encodes binary data using one or more PDF stream filters.',
      epilog=_PdfStream.GetHelp(),
      formatter_class=argparse.RawDescriptionHelpFormatter)
  arg_parser.add_argument(
      '-r',
      '--raw',
      action='store_true',
      help='output raw bytes (no PDF stream header or trailer)')
  arg_parser.add_argument(
      '-l',
      '--length',
      action='store_true',
      help='output actual /Length, instead of {{streamlen}}')
  arg_parser.add_argument(
      '-w',
      '--wrap',
      default=80,
      type=int,
      help='wrap ASCII lines at COLUMN; defaults to 80 (0 = off)',
      metavar='COLUMN')
  arg_parser.add_argument(
      '-f',
      '--filter',
      action='append',
      type=_PdfStream.GetFilterByName,
      help=('one or more filters, in decoding order; defaults to ' + ' '.join(
          [f.name for f in _DEFAULT_FILTERS])),
      metavar='NAME')
  arg_parser.add_argument(
      'infile',
      nargs='?',
      default=sys.stdin,
      type=argparse.FileType('r'),
      help='input file; use - for standard input (default)')
  arg_parser.add_argument(
      'outfile',
      nargs='?',
      default=sys.stdout,
      type=argparse.FileType('w'),
      help='output file; use - for standard output (default)')
  args = arg_parser.parse_intermixed_args(argv)
  args.filter = args.filter or _DEFAULT_FILTERS
  assert args.wrap >= 0, '--wrap COLUMN must be non-negative'
  return args


def _WrapWithFilters(out_buffer, filter_classes, **kwargs):
  for filter_class in filter_classes:
    out_buffer = filter_class(out_buffer, **kwargs)
  return out_buffer
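
# Note on ordering (illustrative): filter_classes is given in decoding order,
# so the loop wraps the sink from the outside in. With the defaults,
#
#   _WrapWithFilters(sink, (_Ascii85DecodePdfStream, _FlateDecodePdfStream))
#
# returns a _FlateDecodePdfStream that feeds an _Ascii85DecodePdfStream that
# feeds the sink: data is deflated first and ASCII85-encoded second, which is
# what a reader applying /Filter [/ASCII85Decode /FlateDecode] expects.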


def _CopyBytes(in_buffer, out_buffer):
  data = bytearray(io.DEFAULT_BUFFER_SIZE)
  while True:
    data_length = in_buffer.readinto(data)
    if not data_length:
      return
    out_buffer.write(data[:data_length])


def _WritePdfStreamObject(out_buffer,
                          data,
                          entries,
                          raw=False,
                          use_streamlen=False):
  if not raw:
    out_buffer.write(b'<<\n')
    entries['Length'] = len(data)
    for k, v in entries.items():
      v = _EncodePdfValue(v)
      if k == 'Length' and use_streamlen:
        out_buffer.write(b'  {{streamlen}}\n')
      else:
        out_buffer.write('  /{} {}\n'.format(k, v).encode('ascii'))
    out_buffer.write(b'>>\nstream\n')

  out_buffer.write(data)

  if not raw:
    # `data` is a bytes-like object, so compare the final byte as an integer.
    if data and data[-1] != ord('\n'):
      out_buffer.write(b'\n')
    out_buffer.write(b'endstream\n')
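
# Illustrative output for the non-raw case with the default filters and
# use_streamlen=True (the encoded bytes are placeholders):
#
#   <<
#     /Filter [/ASCII85Decode /FlateDecode]
#     {{streamlen}}
#   >>
#   stream
#   GhQ...ASCII85 data...~>
#   endstream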


def _EncodePdfValue(value):
  if isinstance(value, collections.abc.MutableSequence):
    value = '[' + ' '.join(value) + ']'
  return value


def main(argv):
  args = _ParseCommandLine(argv)

  encoded_sink = _SinkPdfStream()
  with args.infile:
    out_buffer = _WrapWithFilters(encoded_sink, args.filter, wrapcol=args.wrap)
    _CopyBytes(args.infile.buffer, out_buffer)
    out_buffer.close()

  entries = collections.OrderedDict()
  for f in args.filter:
    f.AddEntries(entries)
  _WritePdfStreamObject(
      args.outfile.buffer,
      data=encoded_sink.getbuffer(),
      entries=entries,
      raw=args.raw,
      use_streamlen=not args.length)
  return args.outfile.close()


if __name__ == '__main__':
  sys.exit(main(sys.argv[1:]))