| Home | Trees | Indices | Help |
|
|---|
|
|
1 """pcap Next Generation file format"""
2 # Spec: https://pcapng.github.io/pcapng/
3
4 # pylint: disable=no-member
5 # pylint: disable=attribute-defined-outside-init
6 from __future__ import print_function
7 from __future__ import absolute_import
8
9 from struct import pack as struct_pack, unpack as struct_unpack
10 from time import time
11 import sys
12
13 from . import dpkt
14 from .compat import BytesIO
15
# Byte-order magic stored in the Section Header Block, and the same value
# with its four bytes reversed (what a reader sees when it parsed the field
# with the opposite byte order).
BYTE_ORDER_MAGIC = 0x1A2B3C4D
BYTE_ORDER_MAGIC_LE = 0x4D3C2B1A

PCAPNG_VERSION_MAJOR = 1
PCAPNG_VERSION_MINOR = 0

# Block types
PCAPNG_BT_IDB = 0x00000001  # Interface Description Block
PCAPNG_BT_PB = 0x00000002  # Packet Block (deprecated)
PCAPNG_BT_SPB = 0x00000003  # Simple Packet Block
PCAPNG_BT_EPB = 0x00000006  # Enhanced Packet Block
PCAPNG_BT_SHB = 0x0A0D0D0A  # Section Header Block

# Options
PCAPNG_OPT_ENDOFOPT = 0  # end of options
PCAPNG_OPT_COMMENT = 1  # comment

# SHB options
PCAPNG_OPT_SHB_HARDWARE = 2  # description of the hardware
PCAPNG_OPT_SHB_OS = 3  # name of the operating system
PCAPNG_OPT_SHB_USERAPPL = 4  # name of the application

# IDB options
PCAPNG_OPT_IF_NAME = 2  # interface name
PCAPNG_OPT_IF_DESCRIPTION = 3  # interface description
PCAPNG_OPT_IF_IPV4ADDR = 4  # IPv4 network address and netmask for the interface
PCAPNG_OPT_IF_IPV6ADDR = 5  # IPv6 network address and prefix length for the interface
PCAPNG_OPT_IF_MACADDR = 6  # interface hardware MAC address
PCAPNG_OPT_IF_EUIADDR = 7  # interface hardware EUI address
PCAPNG_OPT_IF_SPEED = 8  # interface speed in bits/s
PCAPNG_OPT_IF_TSRESOL = 9  # timestamp resolution
PCAPNG_OPT_IF_TZONE = 10  # time zone
PCAPNG_OPT_IF_FILTER = 11  # capture filter
PCAPNG_OPT_IF_OS = 12  # operating system
PCAPNG_OPT_IF_FCSLEN = 13  # length of the Frame Check Sequence in bits
PCAPNG_OPT_IF_TSOFFSET = 14  # offset (in seconds) that must be added to packet timestamp

# <copied from pcap.py>
DLT_NULL = 0
DLT_EN10MB = 1
DLT_EN3MB = 2
DLT_AX25 = 3
DLT_PRONET = 4
DLT_CHAOS = 5
DLT_IEEE802 = 6
DLT_ARCNET = 7
DLT_SLIP = 8
DLT_PPP = 9
DLT_FDDI = 10
DLT_PFSYNC = 18
DLT_IEEE802_11 = 105
DLT_LINUX_SLL = 113
DLT_PFLOG = 117
DLT_IEEE802_11_RADIO = 127

# DLT_LOOP and DLT_RAW are numbered differently on OpenBSD.
if 'openbsd' in sys.platform:
    DLT_LOOP = 12
    DLT_RAW = 14
else:
    DLT_LOOP = 108
    DLT_RAW = 12

# Per-linktype offset table; the reader exposes the entry for the capture's
# linktype as `dloff` (0 when the linktype is not listed).
# NOTE(review): presumably the link-layer header length in bytes -- confirm.
dltoff = {DLT_NULL: 4, DLT_EN10MB: 14, DLT_IEEE802: 22, DLT_ARCNET: 6,
          DLT_SLIP: 16, DLT_PPP: 4, DLT_FDDI: 21, DLT_PFLOG: 48, DLT_PFSYNC: 4,
          DLT_LOOP: 4, DLT_LINUX_SLL: 16}
87
def _align32b(i):
    """Return int `i` aligned to the 32-bit boundary

    Values that are already a multiple of 4 are returned unchanged; any
    other value is rounded up to the next multiple of 4.
    """
    r = i % 4
    return i if not r else i + 4 - r
93
def _padded(s):
    """Return bytes `s` padded with zeroes to align to the 32-bit boundary

    The padded length is len(s) rounded up to the next multiple of 4; the
    struct 's' format fills the extra bytes with NULs.
    """
    padded_len = len(s) + (-len(s) % 4)
    return struct_pack('%ds' % padded_len, s)
98
101 """Return size of padding required to align str `s` to the 32-bit boundary"""
102 return _align32b(len(s)) - len(s)
103
106
107 """Base class for a pcapng block with Options"""
108
109 __hdr__ = (
110 ('type', 'I', 0), # block type
111 ('len', 'I', 12), # block total length: total size of this block, in octets
112 #( body, variable size )
113 ('_len', 'I', 12), # dup of len
114 )
115
118
120 dpkt.Packet.unpack(self, buf)
121 if self.len > len(buf):
122 raise dpkt.NeedData
123 self._do_unpack_options(buf)
124
126 self.opts = []
127 self.data = ''
128 oo = oo or self.__hdr_len__ - 4 # options offset
129 ol = self.len - oo - 4 # length
130
131 opts_buf = buf[oo:oo + ol]
132 while opts_buf:
133 opt = (PcapngOptionLE(opts_buf) if self.__hdr_fmt__[0] == '<'
134 else PcapngOption(opts_buf))
135 self.opts.append(opt)
136
137 opts_buf = opts_buf[len(opt):]
138 if opt.code == PCAPNG_OPT_ENDOFOPT:
139 break
140
141 # duplicate total length field
142 self._len = struct_unpack(self.__hdr_fmt__[0] + 'I', buf[-4:])[0]
143 if self._len != self.len:
144 raise dpkt.UnpackError('length fields do not match')
145
147 if not getattr(self, 'opts', None):
148 return b''
149 if self.opts[-1].code != PCAPNG_OPT_ENDOFOPT:
150 raise dpkt.PackError('options must end with opt_endofopt')
151 return b''.join(bytes(o) for o in self.opts)
152
154 opts_buf = self._do_pack_options()
155 self.len = self._len = self.__hdr_len__ + len(opts_buf)
156
157 hdr_buf = dpkt.Packet.pack_hdr(self)
158 return hdr_buf[:-4] + opts_buf + hdr_buf[-4:]
159
161 if not getattr(self, 'opts', None):
162 return self.__hdr_len__
163
164 opts_len = sum(len(o) for o in self.opts)
165 return self.__hdr_len__ + opts_len
166
169 __byte_order__ = '<'
170
173
174 """A single Option"""
175
176 __hdr__ = (
177 ('code', 'H', PCAPNG_OPT_ENDOFOPT),
178 ('len', 'H', 0),
179 )
180
182 dpkt.Packet.unpack(self, buf)
183 self.data = buf[self.__hdr_len__:self.__hdr_len__ + self.len]
184
185 # decode comment
186 if self.code == PCAPNG_OPT_COMMENT:
187 self.text = self.data.decode('utf-8')
188
189
191 #return dpkt.Packet.__bytes__(self)
192 # encode comment
193 if self.code == PCAPNG_OPT_COMMENT:
194 text = getattr(self, 'text', self.data)
195
196 self.data = text.encode('utf-8') if not isinstance(text, bytes) else text
197
198 self.len = len(self.data)
199 return dpkt.Packet.pack_hdr(self) + _padded(self.data)
200
203
205 if self.code == PCAPNG_OPT_ENDOFOPT:
206 return '{0}(opt_endofopt)'.format(self.__class__.__name__)
207 else:
208 return dpkt.Packet.__repr__(self)
209
212 __byte_order__ = '<'
213
216
217 """Section Header block"""
218
219 __hdr__ = (
220 ('type', 'I', PCAPNG_BT_SHB),
221 ('len', 'I', 28),
222 ('bom', 'I', BYTE_ORDER_MAGIC),
223 ('v_major', 'H', PCAPNG_VERSION_MAJOR),
224 ('v_minor', 'H', PCAPNG_VERSION_MINOR),
225 ('sec_len', 'q', -1), # section length, -1 = auto
226 #( options, variable size )
227 ('_len', 'I', 28)
228 )
229
232 __byte_order__ = '<'
233
236
237 """Interface Description block"""
238
239 __hdr__ = (
240 ('type', 'I', PCAPNG_BT_IDB),
241 ('len', 'I', 20),
242 ('linktype', 'H', DLT_EN10MB),
243 ('_reserved', 'H', 0),
244 ('snaplen', 'I', 1500),
245 #( options, variable size )
246 ('_len', 'I', 20)
247 )
248
251 __byte_order__ = '<'
252
255
256 """Enhanced Packet block"""
257
258 __hdr__ = (
259 ('type', 'I', PCAPNG_BT_EPB),
260 ('len', 'I', 64),
261 ('iface_id', 'I', 0),
262 ('ts_high', 'I', 0), # timestamp high
263 ('ts_low', 'I', 0), # timestamp low
264 ('caplen', 'I', 0), # captured len, size of pkt_data
265 ('pkt_len', 'I', 0), # actual packet len
266 #( pkt_data, variable size )
267 #( options, variable size )
268 ('_len', 'I', 64)
269 )
270
272 dpkt.Packet.unpack(self, buf)
273 if self.len > len(buf):
274 raise dpkt.NeedData
275
276 # packet data
277 po = self.__hdr_len__ - 4 # offset of pkt_data
278 self.pkt_data = buf[po:po + self.caplen]
279
280 # skip padding between pkt_data and options
281 opts_offset = po + _align32b(self.caplen)
282 self._do_unpack_options(buf, opts_offset)
283
285 pkt_buf = self.pkt_data
286 self.caplen = self.pkt_len = len(pkt_buf)
287
288 opts_buf = self._do_pack_options()
289 self.len = self._len = self.__hdr_len__ + _align32b(self.caplen) + len(opts_buf)
290
291 hdr_buf = dpkt.Packet.pack_hdr(self)
292 return hdr_buf[:-4] + _padded(pkt_buf) + opts_buf + hdr_buf[-4:]
293
297
300 __byte_order__ = '<'
301
304
305 """Simple pcapng dumpfile writer."""
306
308 """
309 Create a pcapng dumpfile writer for the given fileobj.
310
311 shb can be an instance of SectionHeaderBlock(LE)
312 idb can be an instance of InterfaceDescriptionBlock(LE)
313 """
314 self.__f = fileobj
315 self.__le = sys.byteorder == 'little'
316
317 if shb:
318 self._validate_block('shb', shb, SectionHeaderBlock)
319 if idb:
320 self._validate_block('idb', idb, InterfaceDescriptionBlock)
321
322 if self.__le:
323 shb = shb or SectionHeaderBlockLE()
324 idb = idb or InterfaceDescriptionBlockLE(snaplen=snaplen, linktype=linktype)
325 else:
326 shb = shb or SectionHeaderBlock()
327 idb = idb or InterfaceDescriptionBlock(snaplen=snaplen, linktype=linktype)
328
329 self.__f.write(bytes(shb))
330 self.__f.write(bytes(idb))
331
333 """Check a user-defined block for correct type and endianness"""
334 if not isinstance(blk, expected_cls):
335 raise ValueError('{0}: expecting class {1}'.format(
336 arg_name, expected_cls.__name__))
337
338 if self.__le and blk.__hdr_fmt__[0] == '>':
339 raise ValueError('{0}: expecting class {1}LE on a little-endian system'.format(
340 arg_name, expected_cls.__name__))
341
342 if not self.__le and blk.__hdr_fmt__[0] == '<':
343 raise ValueError('{0}: expecting class {1} on a big-endian system'.format(
344 arg_name, expected_cls.__name__.replace('LE', '')))
345
347 """
348 Write a single packet with its timestamp.
349
350 pkt can be a buffer or an instance of EnhancedPacketBlock(LE)
351 ts is a Unix timestamp in seconds since Epoch (e.g. 1454725786.99)
352 """
353 if isinstance(pkt, EnhancedPacketBlock):
354 self._validate_block('pkt', pkt, EnhancedPacketBlock)
355
356 if ts is not None: # ts as an argument gets precedence
357 ts = int(round(ts * 1e6))
358 elif pkt.ts_high == pkt.ts_low == 0:
359 ts = int(round(time() * 1e6))
360
361 if ts is not None:
362 pkt.ts_high = ts >> 32
363 pkt.ts_low = ts & 0xffffffff
364
365 self.__f.write(bytes(pkt))
366 return
367
368 # pkt is a buffer - wrap it into an EPB
369 if ts is None:
370 ts = time()
371 ts = int(round(ts * 1e6)) # to int microseconds
372
373 s = bytes(pkt)
374 n = len(s)
375
376 kls = EnhancedPacketBlockLE if self.__le else EnhancedPacketBlock
377 epb = kls(ts_high=ts >> 32, ts_low=ts & 0xffffffff, caplen=n, pkt_len=n, pkt_data=s)
378 self.__f.write(bytes(epb))
379
381 self.__f.close()
382
385
386 """Simple pypcap-compatible pcapng file reader."""
387
389 self.name = getattr(fileobj, 'name', '<{0}>'.format(fileobj.__class__.__name__))
390 self.__f = fileobj
391
392 shb = SectionHeaderBlock()
393 buf = self.__f.read(shb.__hdr_len__)
394 if len(buf) < shb.__hdr_len__:
395 raise ValueError('invalid pcapng header')
396
397 # unpack just the header since endianness is not known
398 shb.unpack_hdr(buf)
399 if shb.type != PCAPNG_BT_SHB:
400 raise ValueError('invalid pcapng header: not a SHB')
401
402 # determine the correct byte order and reload full SHB
403 if shb.bom == BYTE_ORDER_MAGIC_LE:
404 self.__le = True
405 buf += self.__f.read(_swap32b(shb.len) - shb.__hdr_len__)
406 shb = SectionHeaderBlockLE(buf)
407 elif shb.bom == BYTE_ORDER_MAGIC:
408 self.__le = False
409 buf += self.__f.read(shb.len - shb.__hdr_len__)
410 shb = SectionHeaderBlock(buf)
411 else:
412 raise ValueError('unknown endianness')
413
414 # check if this version is supported
415 if shb.v_major != PCAPNG_VERSION_MAJOR:
416 raise ValueError('unknown pcapng version {0}.{1}'.format(shb.v_major, shb.v_minor,))
417
418 # look for a mandatory IDB
419 idb = None
420 while 1:
421 buf = self.__f.read(8)
422 if len(buf) < 8:
423 break
424
425 blk_type, blk_len = struct_unpack('<II' if self.__le else '>II', buf)
426 buf += self.__f.read(blk_len - 8)
427
428 if blk_type == PCAPNG_BT_IDB:
429 idb = (InterfaceDescriptionBlockLE(buf) if self.__le
430 else InterfaceDescriptionBlock(buf))
431 break
432 # just skip other blocks
433
434 if idb is None:
435 raise ValueError('IDB not found')
436
437 # set timestamp resolution and offset
438 self._divisor = float(1e6) # defaults
439 self._tsoffset = 0
440 for opt in idb.opts:
441 if opt.code == PCAPNG_OPT_IF_TSRESOL:
442 # if MSB=0, the remaining bits is a neg power of 10 (e.g. 6 means microsecs)
443 # if MSB=1, the remaining bits is a neg power of 2 (e.g. 10 means 1/1024 of second)
444 opt_val = struct_unpack('b', opt.data)[0]
445 pow_num = 2 if opt_val & 0b10000000 else 10
446 self._divisor = float(pow_num ** (opt_val & 0b01111111))
447
448 elif opt.code == PCAPNG_OPT_IF_TSOFFSET:
449 # 64-bit int that specifies an offset (in seconds) that must be added to the
450 # timestamp of each packet
451 self._tsoffset = struct_unpack('<q' if self.__le else '>q', opt.data)[0]
452
453 if idb.linktype in dltoff:
454 self.dloff = dltoff[idb.linktype]
455 else:
456 self.dloff = 0
457
458 self.idb = idb
459 self.snaplen = idb.snaplen
460 self.filter = ''
461 self.__iter = iter(self)
462
463 @property
465 return self.__f.fileno()
466
468 return self.fd
469
471 return self.idb.linktype
472
475
478
480 return next(self.__iter)
481
483 """Collect and process packets with a user callback.
484
485 Return the number of packets processed, or 0 for a savefile.
486
487 Arguments:
488
489 cnt -- number of packets to process;
490 or 0 to process all packets until EOF
491 callback -- function with (timestamp, pkt, *args) prototype
492 *args -- optional arguments passed to callback on execution
493 """
494 processed = 0
495 if cnt > 0:
496 for _ in range(cnt):
497 try:
498 ts, pkt = next(iter(self))
499 except StopIteration:
500 break
501 callback(ts, pkt, *args)
502 processed += 1
503 else:
504 for ts, pkt in self:
505 callback(ts, pkt, *args)
506 processed += 1
507 return processed
508
510 self.dispatch(0, callback, *args)
511
513 while 1:
514 buf = self.__f.read(8)
515 if len(buf) < 8:
516 break
517
518 blk_type, blk_len = struct_unpack('<II' if self.__le else '>II', buf)
519 buf += self.__f.read(blk_len - 8)
520
521 if blk_type == PCAPNG_BT_EPB:
522 epb = EnhancedPacketBlockLE(buf) if self.__le else EnhancedPacketBlock(buf)
523 ts = self._tsoffset + (((epb.ts_high << 32) | epb.ts_low) / self._divisor)
524 yield (ts, epb.pkt_data)
525
526 # just ignore other blocks
527
528
529 #########
530 # TESTS #
531 #########
532
533
def test_shb():
    """Test SHB with options"""
    # little-endian SHB carrying one "user application" option and opt_endofopt
    buf = (
        b'\x0a\x0d\x0d\x0a\x58\x00\x00\x00\x4d\x3c\x2b\x1a\x01\x00\x00\x00\xff\xff\xff\xff\xff\xff'
        b'\xff\xff\x04\x00\x31\x00\x54\x53\x68\x61\x72\x6b\x20\x31\x2e\x31\x30\x2e\x30\x72\x63\x32'
        b'\x20\x28\x53\x56\x4e\x20\x52\x65\x76\x20\x34\x39\x35\x32\x36\x20\x66\x72\x6f\x6d\x20\x2f'
        b'\x74\x72\x75\x6e\x6b\x2d\x31\x2e\x31\x30\x29\x00\x00\x00\x00\x00\x00\x00\x58\x00\x00\x00')

    # the same option as it is expected to be packed (49 data bytes + 3 bytes of padding)
    opt_buf = b'\x04\x00\x31\x00TShark 1.10.0rc2 (SVN Rev 49526 from /trunk-1.10)\x00\x00\x00'

    # block unpacking
    shb = SectionHeaderBlockLE(buf)
    assert shb.type == PCAPNG_BT_SHB
    assert shb.bom == BYTE_ORDER_MAGIC
    assert shb.v_major == 1
    assert shb.v_minor == 0
    assert shb.sec_len == -1
    assert shb.data == ''

    # options unpacking
    assert len(shb.opts) == 2
    assert shb.opts[0].code == PCAPNG_OPT_SHB_USERAPPL
    assert shb.opts[0].data == b'TShark 1.10.0rc2 (SVN Rev 49526 from /trunk-1.10)'
    assert shb.opts[0].len == len(shb.opts[0].data)

    assert shb.opts[1].code == PCAPNG_OPT_ENDOFOPT
    assert shb.opts[1].len == 0

    # option packing
    assert str(shb.opts[0]) == str(opt_buf)
    assert len(shb.opts[0]) == len(opt_buf)
    assert bytes(shb.opts[1]) == b'\x00\x00\x00\x00'

    # block packing
    assert str(shb) == str(buf)
    assert len(shb) == len(buf)
570
def test_idb():
    """Test IDB with options"""
    # little-endian IDB carrying an if_tsresol option and opt_endofopt
    buf = (
        b'\x01\x00\x00\x00\x20\x00\x00\x00\x01\x00\x00\x00\xff\xff\x00\x00\x09\x00\x01\x00\x06\x00'
        b'\x00\x00\x00\x00\x00\x00\x20\x00\x00\x00')

    # block unpacking
    idb = InterfaceDescriptionBlockLE(buf)
    assert idb.type == PCAPNG_BT_IDB
    assert idb.linktype == DLT_EN10MB
    assert idb.snaplen == 0xffff
    assert idb.data == ''

    # options unpacking
    assert len(idb.opts) == 2
    assert idb.opts[0].code == PCAPNG_OPT_IF_TSRESOL
    assert idb.opts[0].len == 1
    assert idb.opts[0].data == b'\x06'

    assert idb.opts[1].code == PCAPNG_OPT_ENDOFOPT
    assert idb.opts[1].len == 0

    # option packing
    assert bytes(idb.opts[0]) == b'\x09\x00\x01\x00\x06\x00\x00\x00'
    assert len(idb.opts[0]) == 8
    assert bytes(idb.opts[1]) == b'\x00\x00\x00\x00'

    # block packing
    assert str(idb) == str(buf)
    assert len(idb) == len(buf)
602
def test_epb():
    """Test EPB with a non-ascii comment option"""
    # little-endian EPB: 74 bytes of packet data followed by a UTF-8 comment option
    buf = (
        b'\x06\x00\x00\x00\x80\x00\x00\x00\x00\x00\x00\x00\x73\xe6\x04\x00\xbe\x37\xe2\x19\x4a\x00'
        b'\x00\x00\x4a\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x08\x00\x45\x00'
        b'\x00\x3c\x5d\xb3\x40\x00\x40\x06\xdf\x06\x7f\x00\x00\x01\x7f\x00\x00\x01\x98\x34\x11\x4e'
        b'\x95\xcb\x2d\x3a\x00\x00\x00\x00\xa0\x02\xaa\xaa\xfe\x30\x00\x00\x02\x04\xff\xd7\x04\x02'
        b'\x08\x0a\x05\x8f\x70\x89\x00\x00\x00\x00\x01\x03\x03\x07\x00\x00\x01\x00\x0a\x00\xd0\xbf'
        b'\xd0\xb0\xd0\xba\xd0\xb5\xd1\x82\x00\x00\x00\x00\x00\x00\x80\x00\x00\x00')

    # block unpacking
    epb = EnhancedPacketBlockLE(buf)
    assert epb.type == PCAPNG_BT_EPB
    assert epb.caplen == len(epb.pkt_data)
    assert epb.pkt_len == len(epb.pkt_data)
    assert epb.caplen == 74
    assert epb.ts_high == 321139
    assert epb.ts_low == 434255806
    assert epb.data == ''

    # options unpacking
    assert len(epb.opts) == 2
    assert epb.opts[0].code == PCAPNG_OPT_COMMENT
    assert epb.opts[0].text == u'\u043f\u0430\u043a\u0435\u0442'

    assert epb.opts[1].code == PCAPNG_OPT_ENDOFOPT
    assert epb.opts[1].len == 0

    # option packing
    assert bytes(epb.opts[0]) == b'\x01\x00\x0a\x00\xd0\xbf\xd0\xb0\xd0\xba\xd0\xb5\xd1\x82\x00\x00'
    assert len(epb.opts[0]) == 16
    assert bytes(epb.opts[1]) == b'\x00\x00\x00\x00'

    # block packing
    assert str(epb) == str(buf)
    assert len(epb) == len(buf)
640
def test_simple_write_read():
    """Test writing a basic pcapng and then reading it"""
    fobj = BytesIO()

    # write one packet with an explicit timestamp, then rewind for reading
    writer = Writer(fobj, snaplen=0x2000, linktype=DLT_LINUX_SLL)
    writer.writepkt(b'foo', ts=1454725786.526401)
    fobj.flush()
    fobj.seek(0)

    reader = Reader(fobj)
    assert reader.snaplen == 0x2000
    assert reader.datalink() == DLT_LINUX_SLL

    ts, buf1 = next(iter(reader))
    assert ts == 1454725786.526401
    assert buf1 == b'foo'

    # test dispatch()
    fobj.seek(0)
    reader = Reader(fobj)
    # the file holds a single packet: the first call processes it, the second finds none
    assert reader.dispatch(1, lambda ts, pkt: None) == 1
    assert reader.dispatch(1, lambda ts, pkt: None) == 0
    fobj.close()
665
def test_custom_read_write():
    """Test a full pcapng file with 1 ICMP packet"""
    # complete little-endian capture: SHB + IDB + one EPB, each with options
    buf = (
        b'\x0a\x0d\x0d\x0a\x7c\x00\x00\x00\x4d\x3c\x2b\x1a\x01\x00\x00\x00\xff\xff\xff\xff\xff\xff'
        b'\xff\xff\x03\x00\x1e\x00\x36\x34\x2d\x62\x69\x74\x20\x57\x69\x6e\x64\x6f\x77\x73\x20\x38'
        b'\x2e\x31\x2c\x20\x62\x75\x69\x6c\x64\x20\x39\x36\x30\x30\x00\x00\x04\x00\x34\x00\x44\x75'
        b'\x6d\x70\x63\x61\x70\x20\x31\x2e\x31\x32\x2e\x37\x20\x28\x76\x31\x2e\x31\x32\x2e\x37\x2d'
        b'\x30\x2d\x67\x37\x66\x63\x38\x39\x37\x38\x20\x66\x72\x6f\x6d\x20\x6d\x61\x73\x74\x65\x72'
        b'\x2d\x31\x2e\x31\x32\x29\x00\x00\x00\x00\x7c\x00\x00\x00\x01\x00\x00\x00\x7c\x00\x00\x00'
        b'\x01\x00\x00\x00\x00\x00\x04\x00\x02\x00\x32\x00\x5c\x44\x65\x76\x69\x63\x65\x5c\x4e\x50'
        b'\x46\x5f\x7b\x33\x42\x42\x46\x32\x31\x41\x37\x2d\x39\x31\x41\x45\x2d\x34\x44\x44\x42\x2d'
        b'\x41\x42\x32\x43\x2d\x43\x37\x38\x32\x39\x39\x39\x43\x32\x32\x44\x35\x7d\x00\x00\x09\x00'
        b'\x01\x00\x06\x00\x00\x00\x0c\x00\x1e\x00\x36\x34\x2d\x62\x69\x74\x20\x57\x69\x6e\x64\x6f'
        b'\x77\x73\x20\x38\x2e\x31\x2c\x20\x62\x75\x69\x6c\x64\x20\x39\x36\x30\x30\x00\x00\x00\x00'
        b'\x00\x00\x7c\x00\x00\x00\x06\x00\x00\x00\x84\x00\x00\x00\x00\x00\x00\x00\x63\x20\x05\x00'
        b'\xd6\xc4\xab\x0b\x4a\x00\x00\x00\x4a\x00\x00\x00\x08\x00\x27\x96\xcb\x7c\x52\x54\x00\x12'
        b'\x35\x02\x08\x00\x45\x00\x00\x3c\xa4\x40\x00\x00\x1f\x01\x27\xa2\xc0\xa8\x03\x28\x0a\x00'
        b'\x02\x0f\x00\x00\x56\xf0\x00\x01\x00\x6d\x41\x42\x43\x44\x45\x46\x47\x48\x49\x4a\x4b\x4c'
        b'\x4d\x4e\x4f\x50\x51\x52\x53\x54\x55\x56\x57\x41\x42\x43\x44\x45\x46\x47\x48\x49\x00\x00'
        b'\x01\x00\x0f\x00\x64\x70\x6b\x74\x20\x69\x73\x20\x61\x77\x65\x73\x6f\x6d\x65\x00\x00\x00'
        b'\x00\x00\x84\x00\x00\x00')

    fobj = BytesIO(buf)

    # test reading
    reader = Reader(fobj)
    assert reader.snaplen == 0x40000
    assert reader.datalink() == DLT_EN10MB

    assert reader.idb.opts[0].data.decode('utf-8') == '\\Device\\NPF_{3BBF21A7-91AE-4DDB-AB2C-C782999C22D5}'
    assert reader.idb.opts[2].data.decode('utf-8') == '64-bit Windows 8.1, build 9600'

    ts, buf1 = next(iter(reader))
    assert ts == 1442984653.2108380
    assert len(buf1) == 74

    assert buf1.startswith(b'\x08\x00\x27\x96')
    assert buf1.endswith(b'FGHI')
    fobj.close()

    # test pcapng customized writing
    shb = SectionHeaderBlockLE(opts=[
        PcapngOptionLE(code=3, data=b'64-bit Windows 8.1, build 9600'),
        PcapngOptionLE(code=4, data=b'Dumpcap 1.12.7 (v1.12.7-0-g7fc8978 from master-1.12)'),
        PcapngOptionLE()
    ])
    idb = InterfaceDescriptionBlockLE(snaplen=0x40000, opts=[
        PcapngOptionLE(code=2, data=b'\\Device\\NPF_{3BBF21A7-91AE-4DDB-AB2C-C782999C22D5}'),
        PcapngOptionLE(code=9, data=b'\x06'),
        PcapngOptionLE(code=12, data=b'64-bit Windows 8.1, build 9600'),
        PcapngOptionLE()
    ])
    epb = EnhancedPacketBlockLE(opts=[
        PcapngOptionLE(code=1, text=b'dpkt is awesome'),
        PcapngOptionLE()
    ], pkt_data=(
        b'\x08\x00\x27\x96\xcb\x7c\x52\x54\x00\x12\x35\x02\x08\x00\x45\x00\x00\x3c\xa4\x40\x00\x00'
        b'\x1f\x01\x27\xa2\xc0\xa8\x03\x28\x0a\x00\x02\x0f\x00\x00\x56\xf0\x00\x01\x00\x6d\x41\x42'
        b'\x43\x44\x45\x46\x47\x48\x49\x4a\x4b\x4c\x4d\x4e\x4f\x50\x51\x52\x53\x54\x55\x56\x57\x41'
        b'\x42\x43\x44\x45\x46\x47\x48\x49'
    ))
    # the written stream must reproduce the reference capture byte for byte
    fobj = BytesIO()
    writer = Writer(fobj, shb=shb, idb=idb)
    writer.writepkt(epb, ts=1442984653.210838)
    assert fobj.getvalue() == buf
    fobj.close()

    # same with timestamps defined inside EPB
    epb.ts_high = 335971
    epb.ts_low = 195806422

    fobj = BytesIO()
    writer = Writer(fobj, shb=shb, idb=idb)
    writer.writepkt(epb)
    assert fobj.getvalue() == buf
    fobj.close()
743
744
if __name__ == '__main__':
    # Run the module's self-tests when executed as a script.
    # TODO: big endian unit tests; could not find any examples..

    test_shb()
    test_idb()
    test_epb()
    test_simple_write_read()
    test_custom_read_write()
    # exercise the repr of a default option (code defaults to opt_endofopt)
    repr(PcapngOptionLE())

    print('Tests Successful...')
756
| Home | Trees | Indices | Help |
|
|---|
| Generated by Epydoc 3.0.1 on Tue Apr 30 11:26:51 2019 | http://epydoc.sourceforge.net |