Skip to content

MappedBinaryIO: test implementation of an alternative to KaitaiStream - maybe #76

@Hypnootika

Description

@Hypnootika

Hello everyone,

I'm fairly new to working with binary files and Kaitai Struct. I love it, but unfortunately I don't like the ReadWriteStruct.

I created a different approach based on the Python runtime, and I would like some feedback about possible improvements and/or about why it is not suitable for Kaitai.

Please be kind to me; this is my first "package" and definitely the first mmap implementation I have created.

The overall intention is that, if you like the approach, I would try to convert it and improve it further (and create a new/different compiler mode).

If you see mistakes or illogical implementations, please tell me. I want to learn!

Edit 1: Note that a lot of the functions Kaitai needs are obviously missing. This only covers the use case I am currently building it around. Take it as a prototype for a possible mmap approach.

Edit 2: About the performance: I can't really say much at the moment, but just from testing this I already noticed a gain in speed (the IDE runs the code a lot faster). That's obviously a really poor comparison, but if someone is interested, I could run proper tests as well.

import os
import struct
from mmap import mmap, ACCESS_COPY
from typing import List, Union


class Parser:
    """Pack and unpack fixed-width numbers identified by short pattern IDs.

    A pattern ID is a string such as ``"u4le"`` or ``"f8be"``; it is used as
    the key into ``struct_mapping`` (pre-compiled ``struct.Struct`` objects)
    and ``range_mapping`` (inclusive ``(min, max)`` bounds checked before
    packing).
    """

    struct_mapping = {
        "u2be": struct.Struct(">H"),
        "u4be": struct.Struct(">I"),
        "u8be": struct.Struct(">Q"),
        "u2le": struct.Struct("<H"),
        "u4le": struct.Struct("<I"),
        "u8le": struct.Struct("<Q"),
        "s1": struct.Struct("b"),
        "s2be": struct.Struct(">h"),
        "s4be": struct.Struct(">i"),
        "s8be": struct.Struct(">q"),
        "s2le": struct.Struct("<h"),
        "s4le": struct.Struct("<i"),
        "s8le": struct.Struct("<q"),
        "f4be": struct.Struct(">f"),
        "f8be": struct.Struct(">d"),
        "f4le": struct.Struct("<f"),
        "f8le": struct.Struct("<d"),
        "u1": struct.Struct("B"),
    }

    # Largest finite 4-byte float, decoded from its bit pattern 0x7F7FFFFF.
    # The previous bound of 3.4e38 was slightly too small and rejected values
    # that the "f" struct format can pack.
    _F32_MAX = struct.unpack(">f", b"\x7f\x7f\xff\xff")[0]

    range_mapping = {
        "u2be": (0, 65535),
        "u4be": (0, 4294967295),
        "u8be": (0, 18446744073709551615),
        "u2le": (0, 65535),
        "u4le": (0, 4294967295),
        "u8le": (0, 18446744073709551615),
        "s1": (-128, 127),
        "s2be": (-32768, 32767),
        "s4be": (-2147483648, 2147483647),
        "s8be": (-9223372036854775808, 9223372036854775807),
        "s2le": (-32768, 32767),
        "s4le": (-2147483648, 2147483647),
        "s8le": (-9223372036854775808, 9223372036854775807),
        "u1": (0, 255),
        "f4be": (-_F32_MAX, _F32_MAX),
        # The former literal 1.8e308 exceeds the largest Python float and
        # therefore evaluated to infinity; the bound is now spelled out so
        # the (unchanged) behaviour is visible: every non-NaN float passes.
        "f8be": (float("-inf"), float("inf")),
        "f4le": (-_F32_MAX, _F32_MAX),
        "f8le": (float("-inf"), float("inf")),
    }

    @classmethod
    def is_value_in_range(cls, pattern_id: str, value: Union[int, float]) -> bool:
        """Return whether ``value`` lies within the bounds of ``pattern_id``.

        Both bounds are inclusive. NaN compares false against every bound and
        is therefore reported as out of range.

        Raises:
            ValueError: if ``pattern_id`` has no entry in ``range_mapping``.
        """
        try:
            min_value, max_value = cls.range_mapping[pattern_id]
        except KeyError:
            raise ValueError(f"Pattern ID {pattern_id} not found.") from None
        return min_value <= value <= max_value

    @classmethod
    def pack_value(cls, pattern_id: str, value: Union[int, float]) -> bytes:
        """Serialize ``value`` using the struct format of ``pattern_id``.

        Raises:
            ValueError: if the pattern ID is unknown or ``value`` is outside
                the range registered for it.
        """
        if not cls.is_value_in_range(pattern_id, value):
            raise ValueError(f"Value {value} out of range for pattern ID {pattern_id}.")
        struct_pattern = cls.struct_mapping.get(pattern_id)
        if struct_pattern is None:
            raise ValueError(f"Invalid pattern ID {pattern_id}.")
        return struct_pattern.pack(value)

    def read(self, data: bytes, pattern_id: str) -> bytes:
        """Return the leading bytes of ``data`` that one value occupies.

        An unknown ``pattern_id`` is treated as a zero-sized pattern, so an
        empty bytes object is returned instead of raising.
        """
        struct_pattern = self.struct_mapping.get(pattern_id)
        size = 0 if struct_pattern is None else struct_pattern.size
        return data[:size]

    def read_value(self, data: bytes, pattern_id: str) -> Union[int, float]:
        """Decode one value of type ``pattern_id`` from the start of ``data``.

        Raises:
            KeyError: if ``pattern_id`` is unknown.
            struct.error: if ``data`` is shorter than the pattern size.
        """
        # unpack_from reads in place, so no intermediate slice is created.
        return self.struct_mapping[pattern_id].unpack_from(data)[0]

    def read_array(
        self, data: bytes, count: int, pattern_id: str
    ) -> List[Union[int, float]]:
        """Decode ``count`` consecutive values of type ``pattern_id``.

        Raises:
            KeyError: if ``pattern_id`` is unknown.
            struct.error: if ``data`` holds fewer than ``count`` values.
        """
        struct_pattern = self.struct_mapping[pattern_id]
        size = struct_pattern.size
        return [
            struct_pattern.unpack_from(data, position)[0]
            for position in range(0, count * size, size)
        ]


class BaseMappedBinary:
    """Open a file, memory-map it and track a read/write cursor.

    The map is created with ``ACCESS_COPY``: per the ``mmap`` documentation,
    writes to such a map affect memory only and are not written back to the
    underlying file.

    Instances are context managers; leaving the ``with`` block calls
    ``close()``.
    """

    def __init__(self, file_path: str, output_file_path: Union[str, None] = None):
        """Open ``file_path`` (creating it if missing) and map its contents.

        Raises:
            ValueError: if the file is empty, because an empty file cannot be
                memory-mapped. This includes the case where the file did not
                exist and was just created here.
        """
        self.file_path = file_path
        self.output_file_path = output_file_path
        mode = "r+b" if os.path.exists(self.file_path) else "w+b"
        self.file = open(self.file_path, mode)
        try:
            self.mapped_file = mmap(self.file.fileno(), 0, access=ACCESS_COPY)
        except BaseException:
            # Do not leak the descriptor when mapping fails.
            self.file.close()
            raise
        self.offset = 0
        self.parser = Parser()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def _read_from_offset(self, size: int) -> bytes:
        """Return up to ``size`` bytes at the cursor without moving it."""
        return self.mapped_file[self.offset : self.offset + size]

    def _update_offset(self, size: int):
        """Advance the cursor by ``size`` bytes."""
        self.offset += size

    def close(self):
        """Close the memory map and then the file."""
        try:
            self.mapped_file.close()
        finally:
            # The file must be released even if closing the map fails.
            self.file.close()

    def seek(self, offset: int) -> int:
        """Move the cursor to the absolute ``offset`` and return it."""
        self.offset = offset
        return self.offset

    def tell(self) -> int:
        """Return the current cursor offset."""
        return self.offset

    def flush(self):
        """Flush the memory map.

        NOTE(review): with ``ACCESS_COPY`` changes are not propagated to the
        file, so this presumably does not persist anything; confirm intent.
        """
        self.mapped_file.flush()


class MappedBinaryReader(BaseMappedBinary):
    """Sequential reader over the memory-mapped file.

    Unless stated otherwise, every ``read_*`` method consumes the bytes it
    returns by advancing the cursor. Counts read from the stream are encoded
    as ``"u4le"``.
    """

    def __init__(self, file_path: str):
        super().__init__(file_path, output_file_path=None)

    def read(self, pattern_id: str) -> bytes:
        """Return the raw bytes of one ``pattern_id`` value at the cursor.

        The cursor is not advanced.
        NOTE(review): every other reader advances; confirm that this method
        is meant to be a peek.
        """
        return self.parser.read(
            self._read_from_offset(self.parser.struct_mapping[pattern_id].size),
            pattern_id,
        )

    def read_value(self, pattern_id: str) -> Union[int, float]:
        """Decode one value of type ``pattern_id`` and advance past it."""
        size = self.parser.struct_mapping[pattern_id].size
        value = self.parser.read_value(self._read_from_offset(size), pattern_id)
        self._update_offset(size)
        return value

    def read_array(self, count: int, pattern_id: str) -> List[Union[int, float]]:
        """Decode ``count`` values of type ``pattern_id`` and advance past them."""
        size = self.parser.struct_mapping[pattern_id].size
        values = self.parser.read_array(
            self._read_from_offset(count * size), count, pattern_id
        )
        self._update_offset(count * size)
        return values

    def read_string(self, count: int) -> str:
        """Read ``count`` bytes, decode them as UTF-8 and advance past them."""
        value = self._read_from_offset(count).decode("utf-8")
        self._update_offset(count)
        return value

    def read_string_array(self, count: int) -> List[str]:
        """Read ``count`` strings of ``count`` bytes each.

        NOTE(review): ``count`` is used both as the number of strings and as
        the byte length of each string; confirm this matches the file format.
        """
        return [self.read_string(count) for _ in range(count)]

    def read_string_array_with_count(self) -> List[str]:
        """Read a count, then a string array sized by it."""
        count = self.read_value("u4le")
        return self.read_string_array(count)

    def read_string_with_count(self) -> str:
        """Read a byte count, then a UTF-8 string of that many bytes."""
        count = self.read_value("u4le")
        return self.read_string(count)

    def read_bytes(self, count: int) -> bytes:
        """Read ``count`` raw bytes and advance past them.

        Previously the cursor stayed in place, so the next read returned the
        same bytes again, unlike ``read_string`` and ``read_value``.
        """
        value = self._read_from_offset(count)
        self._update_offset(count)
        return value

    def read_bytes_with_count(self) -> bytes:
        """Read a byte count, then that many raw bytes, advancing past both."""
        count = self.read_value("u4le")
        return self.read_bytes(count)

    def read_value_array_with_count(self, pattern_id: str) -> List[Union[int, float]]:
        """Read a count, then that many values of type ``pattern_id``."""
        count = self.read_value("u4le")
        return self.read_array(count, pattern_id)

    def read_value_array(self, count: int, pattern_id: str) -> List[Union[int, float]]:
        """Alias of ``read_array``."""
        return self.read_array(count, pattern_id)


class MappedBinaryWriter(BaseMappedBinary):
    """Collect serialized values in an in-memory ``bytes`` buffer.

    Nothing here writes to the mapped file; output accumulates in
    ``self.data`` and is retrieved with ``get_data()``. Counts are encoded as
    ``"u4le"``.
    """

    def __init__(self, file_path: str):
        super().__init__(file_path, output_file_path=None)
        self.data = b""

    def get_data(self) -> bytes:
        """Return everything written so far."""
        return self.data

    def write(self, pattern_id: str, value: Union[int, float]) -> None:
        """Append ``value`` packed as ``pattern_id``.

        Raises:
            ValueError: if the pattern ID is unknown or the value is out of
                range for it.
        """
        self.data += self.parser.pack_value(pattern_id, value)

    def write_value(self, pattern_id: str, value: Union[int, float]) -> None:
        """Alias of ``write``."""
        self.write(pattern_id, value)

    def write_array(self, pattern_id: str, values: List[Union[int, float]]) -> None:
        """Append each of ``values`` packed as ``pattern_id``."""
        for value in values:
            self.write_value(pattern_id, value)

    def write_value_array(
        self, pattern_id: str, values: List[Union[int, float]]
    ) -> None:
        """Alias of ``write_array``."""
        self.write_array(pattern_id, values)

    def write_bytes(self, value: bytes) -> None:
        """Append raw bytes unchanged."""
        self.data += value

    def write_bytes_with_count(self, value: bytes) -> None:
        """Append the byte length of ``value`` followed by ``value``."""
        self.write_value("u4le", len(value))
        self.write_bytes(value)

    def write_string(self, value: str) -> None:
        """Append ``value`` encoded as UTF-8, without any length prefix."""
        self.data += value.encode("utf-8")

    def write_string_array(self, values: List[str]) -> None:
        """Append each string as UTF-8, back to back.

        NOTE(review): no per-string length or terminator is written, so the
        boundaries between strings are not recoverable from the output alone.
        """
        for value in values:
            self.write_string(value)

    def write_string_array_with_count(self, values: List[str]) -> None:
        """Append the number of strings followed by the strings themselves."""
        self.write_value("u4le", len(values))
        self.write_string_array(values)

    def write_string_with_count(self, value: str) -> None:
        """Append the UTF-8 byte length of ``value`` followed by its bytes.

        The prefix counts encoded bytes rather than characters, because the
        two differ for non-ASCII text and the reader consumes ``count`` bytes.
        """
        encoded = value.encode("utf-8")
        self.write_bytes_with_count(encoded)

    def write_value_array_with_count(
        self, pattern_id: str, values: List[Union[int, float]]
    ) -> None:
        """Append the number of values followed by the packed values."""
        self.write_value("u4le", len(values))
        self.write_array(pattern_id, values)


class MappedBinaryIO(MappedBinaryReader, MappedBinaryWriter):
    """Combined reader/writer that delegates to one instance of each.

    The parent ``__init__`` methods are not run. State lives on ``self.reader``
    and ``self.writer``, and the properties below forward the attributes that
    inherited methods expect (``parser``, ``mapped_file``, ``offset``,
    ``data``) to those objects. Without them, inherited helpers such as
    ``read_string`` or ``write_string`` fail with ``AttributeError``.
    """

    def __init__(self, file_path: str, output_file_path: Union[str, None] = None):
        """Open ``file_path`` once for reading and once for writing.

        ``output_file_path`` defaults to ``file_path + ".bin"``.
        """
        self.file_path = file_path
        self.output_file_path = (
            file_path + ".bin" if output_file_path is None else output_file_path
        )
        self.reader = MappedBinaryReader(self.file_path)
        try:
            self.writer = MappedBinaryWriter(self.file_path)
        except BaseException:
            # Do not leave the reader's file and map open on failure.
            self.reader.close()
            raise

    @property
    def parser(self):
        """Parser used by inherited read/write helpers (the reader's)."""
        return self.reader.parser

    @parser.setter
    def parser(self, value):
        self.reader.parser = value
        self.writer.parser = value

    @property
    def mapped_file(self):
        """Memory map that inherited read helpers slice (the reader's)."""
        return self.reader.mapped_file

    @mapped_file.setter
    def mapped_file(self, value):
        self.reader.mapped_file = value

    @property
    def offset(self):
        """Read cursor, shared with ``self.reader``."""
        return self.reader.offset

    @offset.setter
    def offset(self, value):
        self.reader.offset = value

    @property
    def data(self):
        """Output buffer, shared with ``self.writer``."""
        return self.writer.data

    @data.setter
    def data(self, value):
        self.writer.data = value

    def read_value(self, pattern_id: str) -> Union[int, float]:
        """Decode one value through the reader and advance its cursor."""
        return self.reader.read_value(pattern_id)

    def write_value(self, pattern_id: str, value: Union[int, float]) -> None:
        """Append one packed value to the writer's buffer."""
        self.writer.write_value(pattern_id, value)

    def flush(self) -> None:
        """Flush the writer's memory map."""
        self.writer.flush()

    def seek(self, offset: int) -> int:
        """Move the reader's cursor to ``offset`` and return it."""
        return self.reader.seek(offset)

    def tell(self) -> int:
        """Return the reader's cursor offset."""
        return self.reader.tell()

    def close(self) -> None:
        """Close the reader and the writer."""
        try:
            self.reader.close()
        finally:
            # Close the writer even if closing the reader fails.
            self.writer.close()

And a test file class:



class ExpFile(MappedBinaryIO):
    """Example file type: a 4-byte magic followed by three header fields.

    The header is parsed during construction. ``write_to_file`` serializes
    the same fields to ``output_file_path``.
    """

    def __init__(self, file_path: str, output_file_path: str = None):
        super().__init__(file_path)
        self._read()
        self.data = self.writer.get_data()
        # The parent was given only file_path, so the output path is set here.
        self.output_file_path = (
            file_path + ".bin" if output_file_path is None else output_file_path
        )
        self.mapped_file = self.reader.mapped_file

    def _read(self):
        """Parse magic, version, uk and header_size from the reader."""
        source = self.reader
        self.magic = source.read_string(4)
        self.version = source.read_value("u2le")
        self.uk = source.read_value("u4le")
        self.header_size = source.read_value("u4le")

    def __repr__(self):
        field_names = ("magic", "version", "uk", "header_size")
        rendered = ", ".join(
            f"self.{name}={getattr(self, name)!r}" for name in field_names
        )
        return f"ExpFile({rendered})"

    def _write(self):
        """Append the header fields to the writer and return its buffer.

        The writer buffer is not cleared first, so each call appends the
        header again after whatever was written before.
        """
        sink = self.writer
        sink.write_string(self.magic)
        sink.write("u2le", self.version)
        sink.write("u4le", self.uk)
        sink.write("u4le", self.header_size)
        return sink.get_data()

    def write_to_file(self):
        """Write the serialized buffer to ``output_file_path``."""
        with open(self.output_file_path, "wb") as out_file:
            payload = self._write()
            out_file.write(payload)


if __name__ == "__main__":
    import sys

    # An optional first argument overrides the author's sample file path.
    input_path = sys.argv[1] if len(sys.argv) > 1 else r"D:\binparser\eso0001.dat"

    # The context manager closes both memory maps and file handles on exit,
    # including when parsing or writing raises.
    with ExpFile(input_path) as mt:
        mt.write_to_file()
        print(mt)
        print(mt.tell())

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions