Source code for demoparser.bytebuffer
from demoparser.bitbuffer import Bitbuffer
import struct
from demoparser.structures import CommandHeader
[docs]class Bytebuffer:
r"""Parse a stream of bytes from a .DEM file.
This class provdes convenience methods for parsing
.DEM files. It handles unpacking bytes to different
data types, reading variable-length integers, reading
strings, and creating Bitbuffers.
:param data: Buffer data
:type data: bytes
:Example:
>>> b = Bytebuffer(b'\x00\xf1\xaa')
>>> b.read(1)
0
>>> b.read_short()
43761
"""
def __init__(self, data):
self.data = data
self.index = 0
[docs] def read(self, num_bytes):
"""Read `num_bytes` bytes from buffer."""
d = self.data[self.index:self.index + num_bytes]
self.index += num_bytes
return d
[docs] def read_command_header(self):
"""Read the 6 byte command header.
See :ref:`CommandHeader description <header-format>`
for more details.
:returns: CommandHeader instance
"""
return CommandHeader.from_data(self.read(6))
[docs] def read_command_data(self):
"""Read command info structure.
This is not used by the parser.
:returns: bytes
"""
# This data fits the structure described in
# structures.py:CommandInfo. This data seems to always
# be all 0s though. It doesn't appear to be very useful
# and it is very expensive to create one of these structures
# for each message.
self.read(152)
[docs] def read_sequence_data(self):
"""Read two integers.
This data is not used by the parser.
:returns: Tuple of integers
"""
return struct.unpack("<ii", self.read(8))
[docs] def read_user_command(self):
"""Read a user command."""
seq = struct.unpack("<i", self.read(4))[0]
length, buf = self.read_raw_data()
return seq
[docs] def read_packet_data(self):
r"""Read a demo packet.
Each packet consists of a command and data.
The command is an ID that references either a NET\_ or
SVC\_ class defined in netmessages.proto.
The data is used to instantiate the class referred to
by command.
:returns: Tuple (command, data)
"""
length = struct.unpack("<i", self.read(4))[0]
index = 0
while index < length:
cmd = self.read_varint()
size = self.read_varint()
data = self.read(size)
index = index + size + \
self._varint_size(cmd) + self._varint_size(size)
yield cmd, size, data
[docs] def read_raw_data(self):
"""Read number of bytes specified by a signed int.
First a 32-bit signed integer is read, then
that number of bytes is read from the stream.
:returns: Tuple (bytes_read, bytes)
"""
length = struct.unpack("<i", self.read(4))[0]
buf = self.read(length)
return length, buf
[docs] def read_bitstream(self):
"""Create a Bitbuffer from a number of bytes.
The numbers of bytes to include in the Bitbuffer is
read. Then that number of bytes is used to instantiate
a Bitbuffer instance.
:returns: Bitbuffer instance
"""
length, buf = self.read_raw_data()
return Bitbuffer(buf)
[docs] def read_var_bytes(self):
"""Read number of bytes specified by a varint.
First a varint is read and then the number
of bytes specified by the varint are read.
:returns: bytestring
"""
length = self.read_varint()
return self.read(length)
[docs] def read_string(self):
r"""Read a zero-terminated string.
Reads characters until \\0 is encountered.
:returns: str
"""
output = []
while True:
char = struct.unpack("B", self.read(1))[0]
if char == 0:
break
output.append(chr(char))
return "".join(output)
[docs] def read_varint(self):
"""Read a variable-length integer.
:returns: Integer
"""
b = 0
count = 0
result = 0
cont = True
while cont:
data = self.read(1)
b = struct.unpack("B", data)
b = b[0]
if count < 5:
result |= (b & 0x7F) << (7 * count)
count += 1
cont = b & 0x80
return result
[docs] def read_short(self):
"""Read a 16-bit unsigned short."""
return struct.unpack("H", self.read(2))[0]
def _varint_size(self, value):
if (value < 1 << 7):
return 1
elif (value < 1 << 14):
return 2
elif (value < 1 << 21):
return 3
elif (value < 1 << 28):
return 4
else:
return 5