rand[om]

med ∩ ml

ZIP as a bundle format

At some point in the past, I created a tool for a project (not relevant for this post). This tool generated multiple artefacts from an input (an embeddings file, JSON metadata, and other files). The project was successful, but one thing I didn’t like about the “tool” I wrote was that those artefacts were not easy to manage. They had to be uploaded to and downloaded from some remote(s). Apart from that, the code required all the files to be available, in the same directory, and in sync. For example, if you re-generated the embeddings, you had to re-generate the JSON metadata; if you forgot to do so, or the process failed and you didn’t notice, the tool wouldn’t work as expected.

Anyhow, the project succeeded, and the “tool” did what it was supposed to do. However, I kept thinking about what could be done better if I had to do it again, and the answer was finding an easy way to bundle the artefacts. I wanted to bundle the files inside a single file, but in a way that lets me read individual files without extracting them from the bundle. This would make the artefacts easier to handle, version, and distribute.

After some consideration, I decided to experiment with ZIP files. And here’s the code and explanations from my experiments.

Preamble

One of the reasons I looked into ZIP files is because I got curious once I learned that llamafile1 and the APE (Actually Portable Executable)2 formats both use ZIP as a “container” format. So even if the ZIP format wasn’t going to be the best solution for my problem (it turned out to be exactly what I needed), I still wanted to learn a bit more about the ZIP format.

ZIP files

After some online searches and with some help from ChatGPT/Claude, this is what I found out about the ZIP format.

It consists of three main sections:

  1. A series of local file headers followed by their corresponding file data
  2. A central directory containing metadata about all files
  3. An end of central directory record

The structure looks something like this:

[Local File Header 1][File Data 1]  # File data follows each header
[Local File Header 2][File Data 2]
...
[Central Directory]  # Contains metadata: headers, compression, offsets, attributes
[End of Central Directory Record]  # Has Central Directory location, entry count, comments
  • File Data: Raw file contents immediately after each Local File Header
  • Central Directory: Contains metadata about all files including:
    • File headers
    • Compression information
    • Offsets to Local File Headers
    • File attributes
  • End of Central Directory: Final structure containing:
    • Location of the Central Directory
    • Number of entries
    • Comments
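To see those sections in a real archive, here’s a small sketch using only the stdlib. It builds a tiny ZIP in memory and locates the End of Central Directory record by its signature (assuming no archive comment, so the record is the last 22 bytes of the file):

```python
import io
import struct
import zipfile

# Build a tiny ZIP in memory so we can poke at its raw bytes.
buf = io.BytesIO()
with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_STORED) as zf:
    zf.writestr("a.txt", "hello")
    zf.writestr("b.txt", "world")
data = buf.getvalue()

# The End of Central Directory record starts with signature 0x06054b50,
# which is b"PK\x05\x06" in little-endian byte order.
eocd_offset = data.rfind(b"PK\x05\x06")
(sig, disk, cd_disk, entries_disk, entries_total,
 cd_size, cd_offset, comment_len) = struct.unpack(
    "<IHHHHIIH", data[eocd_offset:eocd_offset + 22]
)

print("total entries:", entries_total)          # 2
print("central directory offset:", cd_offset)

# Central directory entries start with signature 0x02014b50.
assert data[cd_offset:cd_offset + 4] == b"PK\x01\x02"
```

The EOCD record is what tools read first: it tells them where the central directory lives, and the central directory in turn points at each local file header.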

Each local file header has a fixed 30-byte portion, followed by a variable-length filename and an optional extra field. This is the structure:

| Offset | Bytes | Description            |
|--------|-------|------------------------|
| 0      | 4     | Signature (0x04034b50) |
| 4      | 2     | Version needed         |
| 6      | 2     | Flags                  |
| 8      | 2     | Compression method     |
| 10     | 2     | Last mod time          |
| 12     | 2     | Last mod date          |
| 14     | 4     | CRC-32                 |
| 18     | 4     | Compressed size        |
| 22     | 4     | Uncompressed size      |
| 26     | 2     | Filename length (n)    |
| 28     | 2     | Extra field length (m) |
| 30     | n     | Filename               |
| 30+n   | m     | Extra field            |
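The 30-byte fixed portion can be unpacked directly with `struct` (a sketch; the field order follows the table above):

```python
import io
import struct
import zipfile

buf = io.BytesIO()
with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_STORED) as zf:
    zf.writestr("hello.txt", "Hello, world!")
data = buf.getvalue()

# Unpack the 30-byte fixed portion of the first local file header.
(sig, version, flags, method, mtime, mdate,
 crc, csize, usize, name_len, extra_len) = struct.unpack(
    "<IHHHHHIIIHH", data[:30]
)

assert sig == 0x04034B50  # local file header signature
name = data[30:30 + name_len].decode()
print(name, usize)  # hello.txt 13
```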

What makes this format particularly suitable for my needs is that the actual file data follows immediately after each local header. This means you can read individual files without having to process the entire ZIP archive - you just need to know where the file’s data starts and how long it is.

The central directory at the end acts like an index, storing metadata about all files including their offsets, making it efficient to locate specific files within the archive.

The code

As an example, I will try bundling two TXT files into a ZIP file, then read them back. This is the code I’m running at the beginning of my script to “reset” the local files:

from pathlib import Path

sample_files = [Path("t1.txt"), Path("t2.txt")]
archive_name = Path("archive.zip")


def create_sample_files() -> None:
    with open(sample_files[0], "w") as f:
        f.write("Hello, world!\n")
    with open(sample_files[1], "w") as f:
        f.write("foo\nbar\nbaz\n")


def delete_files_if_exists() -> None:
    """
    Delete the archive file if it exists in the current working directory.
    """
    if archive_name.exists():
        archive_name.unlink()
        print(f"Deleted existing archive: {archive_name}")
    for file in sample_files:
        if file.exists():
            file.unlink()
            print(f"Deleted existing file: {file}")

delete_files_if_exists()
create_sample_files()

First, we create an uncompressed ZIP file (using ZIP_STORED compression mode). This allows us to read individual files directly by knowing their offset and size in the archive.

import zipfile
from pathlib import Path


def create_uncompressed_archive(files: list[Path], archive_name: Path) -> Path:
    """
    Create an uncompressed ZIP archive containing the specified files.

    Args:
        files: List of Path objects to include in the archive
        archive_name: Path object for the ZIP archive to create
    """
    with zipfile.ZipFile(
        archive_name, "w", compression=zipfile.ZIP_STORED
    ) as zf:
        for file in files:
            zf.write(file)
    return archive_name
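A quick sanity check that `ZIP_STORED` really leaves the data uncompressed: for every entry in a stored archive, the compressed size equals the uncompressed size. A minimal in-memory sketch:

```python
import io
import zipfile

buf = io.BytesIO()
with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_STORED) as zf:
    zf.writestr("t1.txt", "Hello, world!\n")
    zf.writestr("t2.txt", "foo\nbar\nbaz\n")

with zipfile.ZipFile(buf) as zf:
    for info in zf.infolist():
        # Stored entries are written verbatim, so both sizes match.
        assert info.compress_type == zipfile.ZIP_STORED
        assert info.compress_size == info.file_size
```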

Now we need to read an individual file from the ZIP file. We need to find the offset, and then read the file data.

def get_file_offset(archive_name: Path, filename: str) -> tuple[int, int]:
    """
    Get the offset and size of a file within the ZIP archive.

    Args:
        archive_name: Path object for the ZIP archive
        filename: Name of the file within the archive

    Returns:
        Tuple of (offset, size) where the file data is located
    """

    # ## ZIP File Structure Overview
    # A ZIP file is composed of three main sections:
    # 1. Local File Headers + File Data (repeated for each file)
    # 2. Central Directory
    # 3. End of Central Directory Record

    # [Local File Header 1][File Data 1]  # File data follows each header
    # [Local File Header 2][File Data 2]
    # ...
    # [Central Directory]  # Contains metadata: headers, compression, offsets, attributes
    # [End of Central Directory Record]  # Has Central Directory location, entry count, comments

    # File Data: Raw file contents immediately after each Local File Header
    # Central Directory: Contains metadata about all files including:
    #   - File headers
    #   - Compression information
    #   - Offsets to Local File Headers
    #   - File attributes
    # End of Central Directory: Final structure containing:
    #   - Location of the Central Directory
    #   - Number of entries
    #   - Comments

    # | Offset | Bytes | Description             |
    # |--------|-------|-------------------------|
    # | 0      | 4     | Signature (0x04034b50) |
    # | 4      | 2     | Version needed         |
    # | 6      | 2     | Flags                  |
    # | 8      | 2     | Compression method     |
    # | 10     | 2     | Last mod time          |
    # | 12     | 2     | Last mod date          |
    # | 14     | 4     | CRC-32                 |
    # | 18     | 4     | Compressed size        |
    # | 22     | 4     | Uncompressed size      |
    # | 26     | 2     | Filename length (n)    |
    # | 28     | 2     | Extra field length (m) |
    # | 30     | n     | Filename               |
    # | 30+n   | m     | Extra field            |

    # Size of the fixed portion of the ZIP local file header
    ZIP_LOCAL_HEADER_SIZE = 30

    with zipfile.ZipFile(archive_name, "r") as zf:
        info = zf.getinfo(filename)

        print("info:", info)
        # Will print something like:
        # info: <ZipInfo filename='t2.txt' filemode='-rw-r--r--' file_size=12>
        print("Header offset:", info.header_offset)
        # Will print something like:
        # Header offset: 50
        print("Extra field:", info.extra)
        # Will print something like:
        # Extra field: b''

        # The header_offset points to the local file header
        # We need to skip past the header to get to the actual data
        n = len(info.filename)
        m = len(info.extra)
        header_size = ZIP_LOCAL_HEADER_SIZE + n + m
        data_offset = info.header_offset + header_size
        return (data_offset, info.file_size)

That function returns a tuple with two values: the offset and the size of the file data. We can use that to .seek() and .read() the file data from the ZIP file.

import io
from pathlib import Path


def read_file_at_offset(archive_name: Path, filename: str) -> bytes:
    """
    Read file contents directly from the ZIP archive using offsets.

    Args:
        archive_name: Path object for the ZIP archive
        filename: Name of the file within the archive

    Returns:
        Raw bytes of the file content
    """
    offset, size = get_file_offset(archive_name, filename)
    with open(archive_name, "rb") as f:
        f.seek(offset)
        return f.read(size)

And that’s basically it.
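One caveat with the offset calculation above: `ZipInfo.extra` comes from the *central directory* entry, and the local header’s extra field is not guaranteed to be identical (some tools write different extra data in each, e.g. for zip64 or extended timestamps); `len(info.filename)` also counts characters rather than encoded bytes. A more defensive variant (my own sketch; `get_file_offset_robust` is not part of the code above) reads the two length fields from the local header itself:

```python
import struct
import zipfile
from pathlib import Path


def get_file_offset_robust(archive_name: Path, filename: str) -> tuple[int, int]:
    """
    Like get_file_offset, but reads the filename/extra lengths from the
    local file header itself instead of trusting the central directory copy.
    """
    with zipfile.ZipFile(archive_name, "r") as zf:
        info = zf.getinfo(filename)
    with open(archive_name, "rb") as f:
        f.seek(info.header_offset)
        header = f.read(30)
        assert header[:4] == b"PK\x03\x04"  # local file header signature
        # Offsets 26 and 28 hold the filename and extra field lengths.
        name_len, extra_len = struct.unpack("<HH", header[26:30])
    data_offset = info.header_offset + 30 + name_len + extra_len
    return (data_offset, info.file_size)
```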

We can still make some improvements, though. Instead of reading the full file into memory, we could use an io.BufferedIOBase subclass that allows reading the file in chunks, or even streaming the file data, similar to how you can read a file from disk using open(file). Here’s the final code, with everything put together (local files preparation, ZIP file creation, and reading the file at offset).

import io
import sys
import zipfile
from pathlib import Path

sample_files = [Path("t1.txt"), Path("t2.txt")]
archive_name = Path("archive.zip")


def create_sample_files() -> None:
    with open(sample_files[0], "w") as f:
        f.write("Hello, world!\n")
    with open(sample_files[1], "w") as f:
        f.write("foo\nbar\nbaz\n")


def delete_files_if_exists() -> None:
    """
    Delete the archive file if it exists in the current working directory.
    """
    if archive_name.exists():
        archive_name.unlink()
        print(f"Deleted existing archive: {archive_name}")
    for file in sample_files:
        if file.exists():
            file.unlink()
            print(f"Deleted existing file: {file}")


def create_uncompressed_archive(files: list[Path], archive_name: Path) -> Path:
    """
    Create an uncompressed ZIP archive containing the specified files.

    Args:
        files: List of Path objects to include in the archive
        archive_name: Path object for the ZIP archive to create
    """
    with zipfile.ZipFile(
        archive_name, "w", compression=zipfile.ZIP_STORED
    ) as zf:
        for file in files:
            zf.write(file)
    return archive_name


def list_archive_contents(archive_name: Path) -> Path:
    """
    List the filenames in a ZIP archive.

    Args:
        archive_name: Path object for the ZIP archive to read
    """
    try:
        with zipfile.ZipFile(archive_name, "r") as zf:
            print(f"\nFiles in {archive_name}:")
            for filename in zf.namelist():
                print(f"- {filename}")
        return archive_name
    except Exception as e:
        print(f"Error reading archive: {e}", file=sys.stderr)
        raise


def print_file_info(archive_name: Path, filename: str) -> None:
    with zipfile.ZipFile(archive_name, "r") as zf:
        info = zf.getinfo(filename)
        print(f"File: {filename}")
        print("info:", info)
        # Will print something like:
        # info: <ZipInfo filename='t2.txt' filemode='-rw-r--r--' file_size=12>
        print("Header offset:", info.header_offset)
        # Will print something like:
        # Header offset: 50
        print("Extra field:", info.extra)
        # Will print something like:
        # Extra field: b''
        print(f" Size: {info.file_size} bytes")


def get_file_offset(archive_name: Path, filename: str) -> tuple[int, int]:
    """
    Get the offset and size of a file within the ZIP archive.

    Args:
        archive_name: Path object for the ZIP archive
        filename: Name of the file within the archive

    Returns:
        Tuple of (offset, size) where the file data is located
    """

    # ## ZIP File Structure Overview
    # A ZIP file is composed of three main sections:
    # 1. Local File Headers + File Data (repeated for each file)
    # 2. Central Directory
    # 3. End of Central Directory Record

    # [Local File Header 1][File Data 1]  # File data follows each header
    # [Local File Header 2][File Data 2]
    # ...
    # [Central Directory]  # Contains metadata: headers, compression, offsets, attributes
    # [End of Central Directory Record]  # Has Central Directory location, entry count, comments

    # File Data: Raw file contents immediately after each Local File Header
    # Central Directory: Contains metadata about all files including:
    #   - File headers
    #   - Compression information
    #   - Offsets to Local File Headers
    #   - File attributes
    # End of Central Directory: Final structure containing:
    #   - Location of the Central Directory
    #   - Number of entries
    #   - Comments

    # | Offset | Bytes | Description             |
    # |--------|-------|-------------------------|
    # | 0      | 4     | Signature (0x04034b50) |
    # | 4      | 2     | Version needed         |
    # | 6      | 2     | Flags                  |
    # | 8      | 2     | Compression method     |
    # | 10     | 2     | Last mod time          |
    # | 12     | 2     | Last mod date          |
    # | 14     | 4     | CRC-32                 |
    # | 18     | 4     | Compressed size        |
    # | 22     | 4     | Uncompressed size      |
    # | 26     | 2     | Filename length (n)    |
    # | 28     | 2     | Extra field length (m) |
    # | 30     | n     | Filename               |
    # | 30+n   | m     | Extra field            |

    # Size of the fixed portion of the ZIP local file header
    ZIP_LOCAL_HEADER_SIZE = 30

    with zipfile.ZipFile(archive_name, "r") as zf:
        info = zf.getinfo(filename)

        # The header_offset points to the local file header
        # We need to skip past the header to get to the actual data
        n = len(info.filename)
        m = len(info.extra)
        header_size = ZIP_LOCAL_HEADER_SIZE + n + m
        data_offset = info.header_offset + header_size
        return (data_offset, info.file_size)


class ZipFileView(io.BufferedIOBase):
    def __init__(self, archive_path: Path, offset: int, size: int) -> None:
        self._archive_path = archive_path
        self._offset = offset
        self._size = size
        self._pos = 0
        self._file = open(archive_path, "rb")
        self._file.seek(offset)

    def read(self, size: int | None = None) -> bytes:
        # io consumers may pass -1 (or None) to mean "read everything".
        if size is None or size < 0:
            size = self._size - self._pos

        size = min(size, self._size - self._pos)
        if size <= 0:
            return b""

        data = self._file.read(size)
        self._pos += len(data)
        return data

    def seek(self, offset: int, whence: int = io.SEEK_SET) -> int:
        if whence == io.SEEK_SET:
            new_pos = offset
        elif whence == io.SEEK_CUR:
            new_pos = self._pos + offset
        elif whence == io.SEEK_END:
            new_pos = self._size + offset
        else:
            raise ValueError("Invalid whence value")

        new_pos = max(0, min(new_pos, self._size))
        self._file.seek(self._offset + new_pos)
        self._pos = new_pos
        return self._pos

    def tell(self) -> int:
        return self._pos

    def close(self) -> None:
        self._file.close()
        super().close()  # mark the stream itself as closed

    def readable(self) -> bool:
        return True

    def seekable(self) -> bool:
        return True

    def write(self, *args, **kwargs) -> int:
        raise io.UnsupportedOperation("write")

    def writable(self) -> bool:
        return False


def read_file_at_offset(archive_name: Path, filename: str) -> ZipFileView:
    """
    Read file contents directly from the ZIP archive using offsets.

    Args:
        archive_name: Path object for the ZIP archive
        filename: Name of the file within the archive

    Returns:
        A ZipFileView over the file's data
    """
    offset, size = get_file_offset(archive_name, filename)
    return ZipFileView(archive_name, offset, size)


def main():
    files_to_archive = [Path("t1.txt"), Path("t2.txt")]
    archive_name = Path("archive.zip")

    delete_files_if_exists()
    create_sample_files()
    create_uncompressed_archive(files_to_archive, archive_name)
    print(f"Successfully created {archive_name}")
    list_archive_contents(archive_name)

    print("\n====\n")
    print_file_info(archive_name, "t2.txt")
    offset, size = get_file_offset(archive_name, "t2.txt")
    print(
        f"File 't2.txt' is located at offset {offset} with size {size} bytes"
    )
    print("\n====\n")
    zf = read_file_at_offset(archive_name, "t2.txt")
    print("File contents:")
    print(zf.read().decode())

    return 0


if __name__ == "__main__":
    sys.exit(main())

It should print something like this:

Successfully created archive.zip

Files in archive.zip:
- t1.txt
- t2.txt

====

File: t2.txt
info: <ZipInfo filename='t2.txt' filemode='-rw-r--r--' file_size=12>
Header offset: 50
Extra field: b''
 Size: 12 bytes
File 't2.txt' is located at offset 86 with size 12 bytes

====

File contents:
foo
bar
baz
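As a closing note: if you don’t need the raw offsets, the stdlib can already stream an entry for you. ZipFile.open() returns a file-like object that reads (and decompresses, if needed) on demand; what the custom ZipFileView buys you is reading straight from the archive with plain seek/read calls, without going through zipfile on every access. A quick sketch of the stdlib route:

```python
import io
import zipfile

buf = io.BytesIO()
with zipfile.ZipFile(buf, "w", compression=zipfile.ZIP_STORED) as zf:
    zf.writestr("t2.txt", "foo\nbar\nbaz\n")

# ZipFile.open() returns a readable, file-like ZipExtFile
# that streams the entry's data on demand.
with zipfile.ZipFile(buf) as zf, zf.open("t2.txt") as f:
    print(f.read(4))  # b'foo\n'
    print(f.read())   # b'bar\nbaz\n'
```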