Stream a CSV from S3 to a parquet file
I wanted to query a large TSV file stored in S3. To achieve this, I decided to convert it to Parquet and query it using DuckDB. However, I didn’t want to download the full file and then convert it. Instead, I wanted to stream the CSV file directly from S3 and write the output to a Parquet file. Here are a couple of approaches that worked quite nicely.
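Once the Parquet file exists, querying it with DuckDB is straightforward. Here’s a minimal sketch (the file name and query are just illustrative):
import duckdb

# Query the Parquet file produced by the scripts below.
con = duckdb.connect()
print(con.execute("SELECT count(*) FROM 'output.parquet'").fetchall())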
Setup
I’m running these experiments on an EC2 instance with 8 cores and 32GB of RAM. The data is stored in a 700GB gp2 volume with 2100 IOPS.
As a sample file, I’m using a gzip-compressed TSV file. The file is 4.1 GiB compressed.
Using subprocesses and the AWS CLI
The first approach is quite simple, flexible, and fast. We can use the aws CLI to get the file from S3. We can use - as the destination of the aws s3 cp command and the data will be streamed to standard output. We can then pipe this data to pyarrow, or to another process to decompress the data first.
This TSV file is not utf-8, so I need to use a custom csv.ReadOptions object. I’m also using parseopt = csv.ParseOptions(delimiter="\t") to tell pyarrow this is a tab-separated file. Here’s the code with explanatory comments. The trick is using stdout=subprocess.PIPE and then passing the .stdout property of the subprocess as the input to the pyarrow csv reader.
from pyarrow import csv
import pyarrow.parquet as pq
import subprocess
import sys
# Get S3 URI from arguments
uri = sys.argv[1]
# Stream data from S3
s3cp = subprocess.Popen(["aws", "s3", "cp", uri, "-"], stdout=subprocess.PIPE)
# Decompress the data and pipe the output to stdout
decomp = subprocess.Popen(
    ["gzip", "-d", "-c"], stdin=s3cp.stdout, stdout=subprocess.PIPE
)
# Just to tell the type checker that stdout is not None
assert decomp.stdout
readopt = csv.ReadOptions(encoding="latin-1")
parseopt = csv.ParseOptions(delimiter="\t")
# Use the `stdout` from the `decomp` subprocess as the input.
reader = csv.open_csv(
    decomp.stdout,
    read_options=readopt,
    parse_options=parseopt,
)
# Write to parquet in batches
writer = pq.ParquetWriter("output.parquet", schema=reader.schema)
while True:
    try:
        writer.write_batch(reader.read_next_batch())
    except StopIteration:
        break
writer.close()
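One thing the script above doesn’t do is check whether the subprocesses succeeded. A small, optional addition, reusing the variable names from the script:
# After closing the writer, make sure both subprocesses exited cleanly.
for proc in (s3cp, decomp):
    if proc.wait() != 0:
        raise RuntimeError(f"{proc.args} exited with a non-zero status")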
Other advantages
pyarrow can automatically decompress some formats, but by chaining subprocesses we can manipulate the data however we want before passing it to the pyarrow reader. This gives us a couple of extra advantages:
- Support other compression formats. pyarrow can automatically decompress gzipped files, but not zstd files. With chained subprocesses it’s easy to swap in any decompressor (see the sketch after this list).
- Fewer dependencies. pyarrow already bundles some dependencies to create virtual filesystems, but in general this approach lets us do the job without having to worry about setting up boto3 and other stuff. If the aws CLI works, the subprocess will work.
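For example, if the file were compressed with zstd instead of gzip, only the decompression subprocess would change. A sketch, assuming the zstd CLI is installed:
# Same pipeline as before, but decompress zstd instead of gzip.
decomp = subprocess.Popen(
    ["zstd", "-d", "-c"], stdin=s3cp.stdout, stdout=subprocess.PIPE
)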
Using pyarrow.fs
Now, instead of using the aws CLI, we will use a pyarrow filesystem. Everything else is similar to the previous approach. Here’s the commented code:
import sys
import pyarrow.parquet as pq
from pyarrow import csv, fs
readopt = csv.ReadOptions(encoding="latin-1")
parseopt = csv.ParseOptions(delimiter="\t")
# Create filesystem
s3, path = fs.FileSystem.from_uri(sys.argv[1])
# Open a file handle pointing to the S3 file
fh = s3.open_input_stream(path)
reader = csv.open_csv(
    fh,
    read_options=readopt,
    parse_options=parseopt,
)
writer = pq.ParquetWriter("output.parquet", schema=reader.schema)
while True:
    try:
        writer.write_batch(reader.read_next_batch())
    except StopIteration:
        break
writer.close()
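Note that here I’m relying on pyarrow detecting the gzip compression from the .gz extension. If I remember the API correctly, open_input_stream also accepts a compression argument, so it can be set explicitly when the key has no useful extension:
# Explicitly declare the stream as gzip-compressed instead of relying
# on extension-based detection.
fh = s3.open_input_stream(path, compression="gzip")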
Alternative (s3fs)
Similar to the approach above. Right before publishing this post I was having some issues with AWS permissions using the pyarrow filesystem, so this script uses s3fs directly instead of pyarrow’s built-in classes.
import pyarrow.parquet as pq
import s3fs
from pyarrow import csv
import sys
# Create filesystem
s3 = s3fs.S3FileSystem()
readopt = csv.ReadOptions(encoding="latin-1")
parseopt = csv.ParseOptions(delimiter="\t")
# Open file, this requires just the path, not the full URI,
# so we remove the `s3://` prefix. (str.lstrip strips a set of
# characters, not a prefix, so removeprefix is safer; Python 3.9+.)
fh = s3.open(
    sys.argv[1].removeprefix("s3://"),
    compression="gzip",
)
reader = csv.open_csv(
    fh,
    read_options=readopt,
    parse_options=parseopt,
)
writer = pq.ParquetWriter("output.parquet", schema=reader.schema)
while True:
    try:
        writer.write_batch(reader.read_next_batch())
    except StopIteration:
        break
writer.close()
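Whichever variant you use, a quick sanity check is to inspect the Parquet metadata without loading the data (file name as in the scripts above):
import pyarrow.parquet as pq

# Read only the footer metadata: row count and schema.
pf = pq.ParquetFile("output.parquet")
print(pf.metadata.num_rows)
print(pf.schema_arrow)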