ZIP as a bundle format
At some point in the past, I created a tool for a project (not relevant for this post). This tool generated multiple artefacts from an input (an embeddings file, JSON metadata, and other files). The project was successful, but one thing I didn’t like about the “tool” I wrote was that it was not easy to manage those artefacts. The artefacts had to be uploaded to and downloaded from some remote(s). Apart from that, the code required all the files to be available, in the same directory, and in sync. For example, if you re-generated the embeddings, you had to re-generate the JSON metadata; if you forgot to do so, or the process failed and you didn’t notice, the tool wouldn’t work as expected.
Anyhow, the project succeeded, and the “tool” did what it was supposed to do. However, I kept thinking about what could be done better if I had to do it again, and the answer was finding an easy way to bundle the artefacts. I wanted to bundle the files inside a single file, but in a way that would let me read individual files without extracting them from the bundle. This would make the artefacts much easier to handle, version, and distribute.
After some consideration, I decided to experiment with ZIP files. And here’s the code and explanations from my experiments.
Preamble
One of the reasons I looked into ZIP files is because I got curious once I learned that llamafile[1] and the APE (Actually Portable Executable)[2] formats both use ZIP as a “container” format. So even if the ZIP format wasn’t going to be the best solution for my problem (it turned out to be exactly what I needed), I still wanted to learn a bit more about the ZIP format.
ZIP files
After some online searches and with some help from ChatGPT/Claude, this is what I found out about the ZIP format.
It consists of three main sections:
- A series of local file headers followed by their corresponding file data
- A central directory containing metadata about all files
- An end of central directory record
The structure looks something like this:
[Local File Header 1][File Data 1] # File data follows each header
[Local File Header 2][File Data 2]
...
[Central Directory] # Contains metadata: headers, compression, offsets, attributes
[End of Central Directory Record] # Has Central Directory location, entry count, comments
- File Data: Raw file contents immediately after each Local File Header
- Central Directory: Contains metadata about all files, including:
  - File headers
  - Compression information
  - Offsets to Local File Headers
  - File attributes
- End of Central Directory: Final structure containing:
  - Location of the Central Directory
  - Number of entries
  - Comments
Each local file header has a fixed portion of 30 bytes, followed by a variable-length filename and an optional variable-length extra field. This is the structure (a small parsing sketch follows the table):
| Offset | Bytes | Description             |
|--------|-------|-------------------------|
| 0      | 4     | Signature (0x04034b50)  |
| 4      | 2     | Version needed          |
| 6      | 2     | Flags                   |
| 8      | 2     | Compression method      |
| 10     | 2     | Last mod time           |
| 12     | 2     | Last mod date           |
| 14     | 4     | CRC-32                  |
| 18     | 4     | Compressed size         |
| 22     | 4     | Uncompressed size       |
| 26     | 2     | Filename length (n)     |
| 28     | 2     | Extra field length (m)  |
| 30     | n     | Filename                |
| 30+n   | m     | Extra field             |
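To make that layout concrete, here is a small sketch (not part of the original tool) that parses the fixed 30-byte header of the first entry with Python’s struct module and computes where its data starts. It assumes the archive begins with a local file header, which holds for the simple archives created later in this post.

import struct

def parse_first_local_header(path: str) -> dict:
    """Parse the fixed 30-byte local file header of the first archive entry."""
    with open(path, "rb") as f:
        fixed = f.read(30)
        (
            signature,          # 0x04034b50 for a local file header
            version_needed,
            flags,
            compression,        # 0 = stored, 8 = deflated
            mod_time,
            mod_date,
            crc32,
            compressed_size,
            uncompressed_size,
            name_len,           # n
            extra_len,          # m
        ) = struct.unpack("<IHHHHHIIIHH", fixed)
        if signature != 0x04034B50:
            raise ValueError("not a local file header")
        filename = f.read(name_len).decode("utf-8", errors="replace")
    return {
        "filename": filename,
        "compression": compression,
        "compressed_size": compressed_size,
        # The entry's data starts right after the header: 30 + n + m bytes in
        "data_offset": 30 + name_len + extra_len,
    }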
What makes this format particularly suitable for my needs is that the actual file data follows immediately after each local header. This means you can read individual files without having to process the entire ZIP archive - you just need to know where the file’s data starts and how long it is.
The central directory at the end acts like an index, storing metadata about all files including their offsets, making it efficient to locate specific files within the archive.
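As a rough illustration of how a reader finds that index (again, a hedged sketch rather than anything the tool needs): the End of Central Directory record sits at the very end of the file, carries its own signature (0x06054b50), and stores the offset and entry count of the central directory.

import struct

def locate_central_directory(path: str) -> tuple[int, int]:
    """Return (central_directory_offset, total_entries) from the EOCD record."""
    with open(path, "rb") as f:
        f.seek(0, 2)  # jump to the end of the file
        file_size = f.tell()
        # The EOCD record is 22 bytes plus an optional comment (up to 65535 bytes),
        # so scanning the tail of the file backwards for its signature is enough.
        # (A real reader would also handle the ZIP64 variants.)
        tail_size = min(file_size, 22 + 65535)
        f.seek(file_size - tail_size)
        tail = f.read(tail_size)
    pos = tail.rfind(b"PK\x05\x06")  # EOCD signature, 0x06054b50 little-endian
    if pos == -1:
        raise ValueError("End of Central Directory record not found")
    (_sig, _disk, _cd_disk, _disk_entries, total_entries,
     _cd_size, cd_offset, _comment_len) = struct.unpack("<IHHHHIIH", tail[pos:pos + 22])
    return cd_offset, total_entries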
The code
As an example, I will bundle two text files into a ZIP file and then read them back. This is the code I run at the beginning of my script to “reset” the local files:
from pathlib import Path
sample_files = [Path("t1.txt"), Path("t2.txt")]
archive_name = Path("archive.zip")
def create_sample_files() -> None:
    with open(sample_files[0], "w") as f:
        f.write("Hello, world!\n")
    with open(sample_files[1], "w") as f:
        f.write("foo\nbar\nbaz\n")


def delete_files_if_exists() -> None:
    """
    Delete the archive and the sample files if they exist in the
    current working directory.
    """
    if archive_name.exists():
        archive_name.unlink()
        print(f"Deleted existing archive: {archive_name}")
    for file in sample_files:
        if file.exists():
            file.unlink()
            print(f"Deleted existing file: {file}")
delete_files_if_exists()
create_sample_files()
First, we create an uncompressed ZIP file (using the ZIP_STORED compression mode). This allows us to read individual files directly by knowing their offset and size in the archive.
import zipfile
from pathlib import Path
def create_uncompressed_archive(files: list[Path], archive_name: Path) -> Path:
    """
    Create an uncompressed ZIP archive containing the specified files.

    Args:
        files: List of Path objects to include in the archive
        archive_name: Path object for the ZIP archive to create
    """
    with zipfile.ZipFile(
        archive_name, "w", compression=zipfile.ZIP_STORED
    ) as zf:
        for file in files:
            zf.write(file)
    return archive_name
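As a quick sanity check (not in the original snippet), and assuming the archive from above has been created, we can confirm the entries were stored rather than deflated by looking at each ZipInfo’s compress_type:

with zipfile.ZipFile(archive_name) as zf:
    for info in zf.infolist():
        # For ZIP_STORED entries the compressed and uncompressed sizes match
        assert info.compress_type == zipfile.ZIP_STORED
        print(info.filename, info.file_size, info.compress_size)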
Now we need to read an individual file from the ZIP file. We need to find the offset, and then read the file data.
def get_file_offset(archive_name: Path, filename: str) -> tuple[int, int]:
    """
    Get the offset and size of a file within the ZIP archive.

    Args:
        archive_name: Path object for the ZIP archive
        filename: Name of the file within the archive

    Returns:
        Tuple of (offset, size) where the file data is located
    """
    # ## ZIP File Structure Overview
    # A ZIP file is composed of three main sections:
    #   1. Local File Headers + File Data (repeated for each file)
    #   2. Central Directory
    #   3. End of Central Directory Record
    #
    # [Local File Header 1][File Data 1]  # File data follows each header
    # [Local File Header 2][File Data 2]
    # ...
    # [Central Directory]                 # Contains metadata: headers, compression, offsets, attributes
    # [End of Central Directory Record]   # Has Central Directory location, entry count, comments
    #
    # File Data: Raw file contents immediately after each Local File Header
    # Central Directory: Contains metadata about all files including:
    #   - File headers
    #   - Compression information
    #   - Offsets to Local File Headers
    #   - File attributes
    # End of Central Directory: Final structure containing:
    #   - Location of the Central Directory
    #   - Number of entries
    #   - Comments
    #
    # | Offset | Bytes | Description             |
    # |--------|-------|-------------------------|
    # | 0      | 4     | Signature (0x04034b50)  |
    # | 4      | 2     | Version needed          |
    # | 6      | 2     | Flags                   |
    # | 8      | 2     | Compression method      |
    # | 10     | 2     | Last mod time           |
    # | 12     | 2     | Last mod date           |
    # | 14     | 4     | CRC-32                  |
    # | 18     | 4     | Compressed size         |
    # | 22     | 4     | Uncompressed size       |
    # | 26     | 2     | Filename length (n)     |
    # | 28     | 2     | Extra field length (m)  |
    # | 30     | n     | Filename                |
    # | 30+n   | m     | Extra field             |

    # Size of the fixed portion of the ZIP local file header
    ZIP_LOCAL_HEADER_SIZE = 30

    with zipfile.ZipFile(archive_name, "r") as zf:
        info = zf.getinfo(filename)
        print("info:", info)
        # Will print something like:
        # info: <ZipInfo filename='t2.txt' filemode='-rw-r--r--' file_size=12>
        print("Header offset:", info.header_offset)
        # Will print something like:
        # Header offset: 50
        print("Extra field:", info.extra)
        # Will print something like:
        # Extra field: b''

        # The header_offset points to the local file header.
        # We need to skip past the header to get to the actual data.
        n = len(info.filename)
        m = len(info.extra)
        header_size = ZIP_LOCAL_HEADER_SIZE + n + m
        data_offset = info.header_offset + header_size
        return (data_offset, info.file_size)
That function returns a tuple with two values: the offset and the size of the file data. We can use that to .seek() and .read() the file data from the ZIP file.
import io
from pathlib import Path
def read_file_at_offset(archive_name: Path, filename: str) -> bytes:
    """
    Read file contents directly from the ZIP archive using offsets.

    Args:
        archive_name: Path object for the ZIP archive
        filename: Name of the file within the archive

    Returns:
        Raw bytes of the file content
    """
    offset, size = get_file_offset(archive_name, filename)
    with open(archive_name, "rb") as f:
        f.seek(offset)
        return f.read(size)
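For example, with the archive created above, reading t2.txt back looks like this (get_file_offset will also print its debug output):

content = read_file_at_offset(archive_name, "t2.txt")
print(content.decode())  # foo\nbar\nbaz\n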
And that’s basically it.
We can still make some improvements, though. Instead of reading the full file into memory, we could use an io.BufferedIOBase subclass that allows reading the file in chunks, or even streaming the file data, similar to how you can read a file from disk using open(file). Here’s the final code, with everything put together (local files preparation, ZIP file creation, and reading the file at offset).
import io
import sys
import zipfile
from pathlib import Path
sample_files = [Path("t1.txt"), Path("t2.txt")]
archive_name = Path("archive.zip")
def create_sample_files() -> None:
    with open(sample_files[0], "w") as f:
        f.write("Hello, world!\n")
    with open(sample_files[1], "w") as f:
        f.write("foo\nbar\nbaz\n")


def delete_files_if_exists() -> None:
    """
    Delete the archive and the sample files if they exist in the
    current working directory.
    """
    if archive_name.exists():
        archive_name.unlink()
        print(f"Deleted existing archive: {archive_name}")
    for file in sample_files:
        if file.exists():
            file.unlink()
            print(f"Deleted existing file: {file}")
def create_uncompressed_archive(files: list[Path], archive_name: Path) -> Path:
    """
    Create an uncompressed ZIP archive containing the specified files.

    Args:
        files: List of Path objects to include in the archive
        archive_name: Path object for the ZIP archive to create
    """
    with zipfile.ZipFile(
        archive_name, "w", compression=zipfile.ZIP_STORED
    ) as zf:
        for file in files:
            zf.write(file)
    return archive_name
def list_archive_contents(archive_name: Path) -> Path:
    """
    List the filenames in a ZIP archive.

    Args:
        archive_name: Path object for the ZIP archive to read
    """
    try:
        with zipfile.ZipFile(archive_name, "r") as zf:
            print(f"\nFiles in {archive_name}:")
            for filename in zf.namelist():
                print(f"- {filename}")
            return archive_name
    except Exception as e:
        print(f"Error reading archive: {e}", file=sys.stderr)
        raise
def print_file_info(archive_name: Path, filename: str) -> None:
    with zipfile.ZipFile(archive_name, "r") as zf:
        info = zf.getinfo(filename)
        print(f"File: {filename}")
        print("info:", info)
        # Will print something like:
        # info: <ZipInfo filename='t2.txt' filemode='-rw-r--r--' file_size=12>
        print("Header offset:", info.header_offset)
        # Will print something like:
        # Header offset: 50
        print("Extra field:", info.extra)
        # Will print something like:
        # Extra field: b''
        print(f" Size: {info.file_size} bytes")
def get_file_offset(archive_name: Path, filename: str) -> tuple[int, int]:
    """
    Get the offset and size of a file within the ZIP archive.

    Args:
        archive_name: Path object for the ZIP archive
        filename: Name of the file within the archive

    Returns:
        Tuple of (offset, size) where the file data is located
    """
    # ## ZIP File Structure Overview
    # A ZIP file is composed of three main sections:
    #   1. Local File Headers + File Data (repeated for each file)
    #   2. Central Directory
    #   3. End of Central Directory Record
    #
    # [Local File Header 1][File Data 1]  # File data follows each header
    # [Local File Header 2][File Data 2]
    # ...
    # [Central Directory]                 # Contains metadata: headers, compression, offsets, attributes
    # [End of Central Directory Record]   # Has Central Directory location, entry count, comments
    #
    # File Data: Raw file contents immediately after each Local File Header
    # Central Directory: Contains metadata about all files including:
    #   - File headers
    #   - Compression information
    #   - Offsets to Local File Headers
    #   - File attributes
    # End of Central Directory: Final structure containing:
    #   - Location of the Central Directory
    #   - Number of entries
    #   - Comments
    #
    # | Offset | Bytes | Description             |
    # |--------|-------|-------------------------|
    # | 0      | 4     | Signature (0x04034b50)  |
    # | 4      | 2     | Version needed          |
    # | 6      | 2     | Flags                   |
    # | 8      | 2     | Compression method      |
    # | 10     | 2     | Last mod time           |
    # | 12     | 2     | Last mod date           |
    # | 14     | 4     | CRC-32                  |
    # | 18     | 4     | Compressed size         |
    # | 22     | 4     | Uncompressed size       |
    # | 26     | 2     | Filename length (n)     |
    # | 28     | 2     | Extra field length (m)  |
    # | 30     | n     | Filename                |
    # | 30+n   | m     | Extra field             |

    # Size of the fixed portion of the ZIP local file header
    ZIP_LOCAL_HEADER_SIZE = 30

    with zipfile.ZipFile(archive_name, "r") as zf:
        info = zf.getinfo(filename)
        # The header_offset points to the local file header.
        # We need to skip past the header to get to the actual data.
        n = len(info.filename)
        m = len(info.extra)
        header_size = ZIP_LOCAL_HEADER_SIZE + n + m
        data_offset = info.header_offset + header_size
        return (data_offset, info.file_size)
class ZipFileView(io.BufferedIOBase):
    def __init__(self, archive_path: Path, offset: int, size: int) -> None:
        self._archive_path = archive_path
        self._offset = offset
        self._size = size
        self._pos = 0
        self._file = open(archive_path, "rb")
        self._file.seek(offset)

    def read(self, size: int | None = None) -> bytes:
        # None or a negative size means "read to the end of the view"
        if size is None or size < 0:
            size = self._size - self._pos
        size = min(size, self._size - self._pos)
        if size <= 0:
            return b""
        data = self._file.read(size)
        self._pos += len(data)
        return data

    def seek(self, offset: int, whence: int = io.SEEK_SET) -> int:
        if whence == io.SEEK_SET:
            new_pos = offset
        elif whence == io.SEEK_CUR:
            new_pos = self._pos + offset
        elif whence == io.SEEK_END:
            new_pos = self._size + offset
        else:
            raise ValueError("Invalid whence value")
        # Clamp the position to the bounds of the embedded file
        new_pos = max(0, min(new_pos, self._size))
        self._file.seek(self._offset + new_pos)
        self._pos = new_pos
        return self._pos

    def tell(self) -> int:
        return self._pos

    def close(self) -> None:
        self._file.close()

    def readable(self) -> bool:
        return True

    def seekable(self) -> bool:
        return True

    def write(self, *args, **kwargs) -> int:
        raise io.UnsupportedOperation("write")

    def writable(self) -> bool:
        return False
def read_file_at_offset(archive_name: Path, filename: str) -> ZipFileView:
    """
    Read file contents directly from the ZIP archive using offsets.

    Args:
        archive_name: Path object for the ZIP archive
        filename: Name of the file within the archive

    Returns:
        A ZipFileView over the file's data within the archive
    """
    offset, size = get_file_offset(archive_name, filename)
    return ZipFileView(archive_name, offset, size)
    # Previous version, which read the whole file into memory at once:
    # with open(archive_name, "rb") as f:
    #     f.seek(offset)
    #     return f.read(size)
def main():
    files_to_archive = [Path("t1.txt"), Path("t2.txt")]
    archive_name = Path("archive.zip")

    delete_files_if_exists()
    create_sample_files()

    create_uncompressed_archive(files_to_archive, archive_name)
    print(f"Successfully created {archive_name}")

    list_archive_contents(archive_name)

    print("\n====\n")

    print_file_info(archive_name, "t2.txt")
    offset, size = get_file_offset(archive_name, "t2.txt")
    print(
        f"File 't2.txt' is located at offset {offset} with size {size} bytes"
    )

    print("\n====\n")

    zf = read_file_at_offset(archive_name, "t2.txt")
    print("File contents:")
    print(zf.read().decode())

    return 0


if __name__ == "__main__":
    sys.exit(main())
It should print something like this:
Successfully created archive.zip
Files in archive.zip:
- t1.txt
- t2.txt
====
File: t2.txt
info: <ZipInfo filename='t2.txt' filemode='-rw-r--r--' file_size=12>
Header offset: 50
Extra field: b''
Size: 12 bytes
File 't2.txt' is located at offset 86 with size 12 bytes
====
File contents:
foo
bar
baz
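A small usage note (not part of the script above): because ZipFileView behaves like a readable, seekable stream, the embedded file can also be consumed in chunks instead of all at once, for example:

view = read_file_at_offset(archive_name, "t2.txt")
try:
    # Read the 12-byte entry in 4-byte chunks
    while chunk := view.read(4):
        print(chunk)
finally:
    view.close()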
[1] llamafile, ZIP weights embedding: https://github.com/Mozilla-Ocho/llamafile/blob/ef7321e01cf384ca9cb99178671185d969beb290/README.md#zip-weights-embedding
[2] Cosmopolitan cosmocc, binary archive format: https://github.com/jart/cosmopolitan/blob/102edf4ea2805749856094f21ef249c259e83740/tool/cosmocc/README.md#binary-archive-format; see also “PKZIP Executables Make Pretty Good Containers”: https://justine.lol/ape.html