
Stream a CSV from S3 to a parquet file

I wanted to query a large TSV file stored in S3. To achieve this, I decided to convert it to Parquet and query it using DuckDB. However, I didn’t want to download the full file and then convert it. Instead, I wanted to stream the CSV file directly from S3 and write the output to a Parquet file. Here are a few approaches that worked quite nicely.

Setup

I’m running these experiments on an EC2 instance with 8 cores and 32GB of RAM. The data is stored in a 700GB gp2 volume with 2100 IOPS.

As a sample file, I’m using a gzip-compressed TSV file. The file is 4.1 GiB compressed.

Using subprocesses and the AWS CLI

The first approach is quite simple, flexible, and fast. We use the aws CLI to get the file from S3: passing - as the destination of the aws s3 cp command streams the data to standard output, and from there we can pipe it to pyarrow or to another process to decompress the data first.

This TSV file is not UTF-8-encoded, so I need a custom csv.ReadOptions object. I’m also using parseopt = csv.ParseOptions(delimiter="\t") to tell pyarrow this is a tab-separated file. Here’s the code with explanatory comments. The trick is using stdout=subprocess.PIPE and then passing the .stdout attribute of the subprocess as the input to the pyarrow csv reader.

from pyarrow import csv
import pyarrow.parquet as pq
import subprocess
import sys

# Get S3 URI from arguments
uri = sys.argv[1]

# Stream data from S3
s3cp = subprocess.Popen(["aws", "s3", "cp", uri, "-"], stdout=subprocess.PIPE)
# Decompress the data and pipe the output to stdout
decomp = subprocess.Popen(
    ["gzip", "-d", "-c"], stdin=s3cp.stdout, stdout=subprocess.PIPE
)

# Just to tell the type checker that stdout is not None
assert decomp.stdout


readopt = csv.ReadOptions(encoding="latin-1")
parseopt = csv.ParseOptions(delimiter="\t")

# Use the `stdout` from the `decomp` subprocess as the input.
reader = csv.open_csv(
    decomp.stdout,
    read_options=readopt,
    parse_options=parseopt,
)


# Write to parquet in batches
writer = pq.ParquetWriter("output.parquet", schema=reader.schema)

while True:
    try:
        writer.write_batch(reader.read_next_batch())
    except StopIteration:
        break


writer.close()
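
One thing the script above doesn’t check is whether the subprocesses actually succeeded. Here’s a minimal sketch of that cleanup, reusing the s3cp and decomp variables from the script above: if the download gets interrupted, aws and gzip exit with a non-zero code, and it’s better to fail loudly than to keep a silently truncated Parquet file.

for proc in (decomp, s3cp):
    proc.wait()
    if proc.returncode != 0:
        raise RuntimeError(f"{proc.args} exited with code {proc.returncode}")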

Other advantages

By chaining subprocesses, we can manipulate the data however we want before passing it to the pyarrow reader. This has a couple of other advantages:

  1. Support for other compression formats. pyarrow can automatically decompress gzipped files, but not zstd files. With subprocesses, supporting another format is just a matter of swapping the decompression command (see the sketch after this list).

  2. Fewer dependencies. pyarrow already bundles some dependencies to create virtual filesystems, but in general this approach lets us do the job without having to worry about setting up boto3 and other libraries. If the aws CLI works, the subprocess will work.
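
For example, here’s a minimal sketch of the zstd case. It assumes the zstd CLI is installed and uses a hypothetical s3://bucket/data.tsv.zst object; only the decompression command changes, and the rest of the script stays the same.

import subprocess

s3cp = subprocess.Popen(
    ["aws", "s3", "cp", "s3://bucket/data.tsv.zst", "-"], stdout=subprocess.PIPE
)
decomp = subprocess.Popen(
    ["zstd", "-d", "-c"], stdin=s3cp.stdout, stdout=subprocess.PIPE
)
# `decomp.stdout` can now be passed to `csv.open_csv` exactly as before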

Using pyarrow.fs

Now, instead of using the aws CLI, we will use a pyarrow filesystem. Everything else is similar to the previous approach, except that pyarrow also takes care of the gzip decompression for us. Here’s the commented code:

import sys

import pyarrow.parquet as pq
from pyarrow import csv, fs

readopt = csv.ReadOptions(encoding="latin-1")
parseopt = csv.ParseOptions(delimiter="\t")

# Create filesystem
s3, path = fs.FileSystem.from_uri(sys.argv[1])

# Open a file handle pointing to the S3 file. `open_input_stream` defaults to
# compression="detect", so the gzip stream is decompressed automatically based
# on the file extension
fh = s3.open_input_stream(path)

reader = csv.open_csv(
    fh,
    read_options=readopt,
    parse_options=parseopt,
)


writer = pq.ParquetWriter("output.parquet", schema=reader.schema)

while True:
    try:
        writer.write_batch(reader.read_next_batch())
    except StopIteration:
        break


writer.close()
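
If the default credential chain doesn’t pick up the region or credentials you expect, the filesystem can also be constructed explicitly instead of being derived from the URI. A minimal sketch, where the region is a placeholder and credentials can be passed as extra keyword arguments if needed:

import sys

from pyarrow import fs

# Build the filesystem explicitly instead of relying on `FileSystem.from_uri`
s3 = fs.S3FileSystem(region="us-east-1")

# `open_input_stream` expects a bucket/key path, without the `s3://` prefix
fh = s3.open_input_stream(sys.argv[1].removeprefix("s3://"))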

Alternative (s3fs)

This is similar to the approach above. Right before publishing this post, I ran into some issues with AWS permissions when using the pyarrow filesystem, so this script uses s3fs directly instead of pyarrow’s built-in classes.

import pyarrow.parquet as pq
import s3fs
from pyarrow import csv
import sys

# Create filesystem
s3 = s3fs.S3FileSystem()

readopt = csv.ReadOptions(encoding="latin-1")
parseopt = csv.ParseOptions(delimiter="\t")

# Open the file. This requires just the bucket/key path, not the full URI,
# so we remove the `s3://` prefix. Note the use of `removeprefix` instead of
# `lstrip`, which would strip any leading "s", "3", ":" or "/" characters from
# the bucket name itself
fh = s3.open(
    sys.argv[1].removeprefix("s3://"),
    compression="gzip",
)

reader = csv.open_csv(
    fh,
    read_options=readopt,
    parse_options=parseopt,
)


writer = pq.ParquetWriter("output.parquet", schema=reader.schema)

while True:
    try:
        writer.write_batch(reader.read_next_batch())
    except StopIteration:
        break


writer.close()
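
With the Parquet file written, the original goal from the intro is straightforward. A minimal sketch of the query step, assuming the duckdb Python package is installed:

import duckdb

con = duckdb.connect()
# Count the rows in the Parquet file produced by any of the scripts above
print(con.execute("SELECT COUNT(*) FROM 'output.parquet'").fetchone())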