@rjpower
Last active November 25, 2025 23:17
Parquet inspector - stream, filter, select columns, show metadata, bidirectional JSONL conversion
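The heavy lifting is a handful of pyarrow calls; below is a minimal standalone sketch of the same JSONL/Parquet round trip the script automates (the file names here are hypothetical, not part of the gist):

    import json
    import pyarrow as pa
    import pyarrow.parquet as pq

    # JSONL -> Parquet: collect dicts and let pyarrow infer the schema.
    with open("example.jsonl") as fh:
        records = [json.loads(line) for line in fh if line.strip()]
    pq.write_table(pa.Table.from_pylist(records), "example.parquet")

    # Parquet -> JSONL: stream the table back out in record batches.
    pf = pq.ParquetFile("example.parquet")
    for batch in pf.iter_batches(batch_size=1024):
        for row in batch.to_pylist():
            print(json.dumps(row))

The full script adds remote filesystems, globbing, column selection, filtering, and metadata inspection on top of this core.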
#!/usr/bin/env -S uv run --script
# /// script
# dependencies = [
# "pyarrow",
# "fsspec",
# "click",
# "s3fs",
# "gcsfs",
# ]
# ///
import json
import sys
from typing import Optional
import click
import fsspec
import pyarrow.parquet as pq
import pyarrow as pa
@click.command()
@click.argument("path", required=False)
@click.option("--columns", "-c", help="Comma-separated columns to select")
@click.option("--metadata", "-m", is_flag=True, help="Show metadata and exit")
@click.option("--schema", "-s", is_flag=True, help="Show schema and exit")
@click.option("--take", "-n", type=int, help="Take first N rows")
@click.option("--skip", type=int, default=0, help="Skip first N rows")
@click.option("--filter", "-f", "filters", multiple=True, help="Filter: column:substring (can specify multiple)")
@click.option("--row-group", "-g", type=int, help="Read specific row group")
@click.option("--from-jsonl", is_flag=True, help="Read JSONL from stdin, write Parquet to path")
@click.option("--output", "-o", help="Output parquet file (for use with --from-jsonl)")
def main(
    path: Optional[str],
    columns: Optional[str],
    metadata: bool,
    schema: bool,
    take: Optional[int],
    skip: int,
    filters: tuple[str, ...],
    row_group: Optional[int],
    from_jsonl: bool,
    output: Optional[str],
):
    """Inspect and stream Parquet files as JSONL.

    Supports local files, S3 (s3://), GCS (gs://), etc.
    Supports glob patterns to read multiple files.

    Examples:

        pq file.parquet
        pq s3://bucket/file.parquet --columns id,name --take 10
        pq gs://bucket/file.parquet --filter name:john --filter status:active
        pq 'data/train-*-of-00006.parquet' --columns id,text
        pq 'gs://bucket/raw/*/data/train-*.parquet' --take 100
        cat file.parquet | pq --schema
        pq file.parquet --metadata
        cat data.jsonl | pq --from-jsonl -o output.parquet
    """
    # Handle JSONL to Parquet conversion
    if from_jsonl:
        output_path = output or path
        if not output_path:
            raise click.BadParameter("Must specify output file with --output or as argument")
        records = []
        for line in sys.stdin:
            if line.strip():
                records.append(json.loads(line))
        if not records:
            raise click.ClickException("No records read from stdin")
        table = pa.Table.from_pylist(records)
        pq.write_table(table, output_path)
        click.echo(f"Wrote {len(records)} rows to {output_path}", err=True)
        return

    # Handle stdin if no path provided
    if path is None or path == "-":
        if sys.stdin.isatty():
            raise click.BadParameter("No input file specified and stdin is a terminal")
        # Read from stdin into temp file
        import tempfile

        with tempfile.NamedTemporaryFile(delete=False, suffix=".parquet") as tmp:
            tmp.write(sys.stdin.buffer.read())
            path = tmp.name

    # Always use fsspec.open_files (works for both single files and globs)
    of = fsspec.open_files(path)
    if not of:
        raise click.ClickException(f"No files matched: {path}")

    # For metadata/schema, show info about first file
    if metadata or schema:
        path = of[0].path
        with fsspec.open(path, "rb") as f:
            parquet_file = pq.ParquetFile(f)
    else:
        # Parse column selection
        col_list = columns.split(",") if columns else None

        # Parse filters
        filter_dict = {}
        for f in filters:
            if ":" not in f:
                raise click.BadParameter(f"Filter must be column:substring, got: {f}")
            col, val = f.split(":", 1)
            filter_dict[col] = val

        # Iterator to read records from all files in batches
        def iter_records():
            for open_file in of:
                with open_file as f:
                    parquet_file = pq.ParquetFile(f)
                    if row_group is not None:
                        table = parquet_file.read_row_group(row_group, columns=col_list)
                        yield from table.to_pylist()
                    else:
                        # Read in batches for efficiency
                        for batch in parquet_file.iter_batches(columns=col_list, batch_size=1024):
                            yield from batch.to_pylist()

        # Process records with skip/filter/take
        skipped = 0
        emitted = 0
        for record in iter_records():
            # Apply skip
            if skipped < skip:
                skipped += 1
                continue
            # Apply filters
            if filter_dict:
                matches = True
                for col, substring in filter_dict.items():
                    if substring not in str(record.get(col, "")):
                        matches = False
                        break
                if not matches:
                    continue
            # Output record
            print(json.dumps(record))
            emitted += 1
            # Apply take limit
            if take is not None and emitted >= take:
                break
        return

    # If we got here, we're showing metadata or schema
    with fsspec.open(path, "rb") as f:
        parquet_file = pq.ParquetFile(f)

        # Show metadata
        if metadata:
            meta = parquet_file.metadata
            click.echo(f"Created by: {meta.created_by}")
            click.echo(f"Num rows: {meta.num_rows}")
            click.echo(f"Num row groups: {meta.num_row_groups}")
            click.echo(f"Num columns: {meta.num_columns}")
            click.echo(f"Format version: {meta.format_version}")
            click.echo(f"\nRow groups:")
            for i in range(meta.num_row_groups):
                rg = meta.row_group(i)
                click.echo(f" [{i}] {rg.num_rows} rows, {rg.total_byte_size} bytes")
            return

        # Show schema
        if schema:
            click.echo(parquet_file.schema)
            return

        # Parse column selection
        col_list = columns.split(",") if columns else None

        # Parse filters
        filter_dict = {}
        for f in filters:
            if ":" not in f:
                raise click.BadParameter(f"Filter must be column:substring, got: {f}")
            col, val = f.split(":", 1)
            filter_dict[col] = val

        # Read data
        if row_group is not None:
            table = parquet_file.read_row_group(row_group, columns=col_list)
        else:
            table = parquet_file.read(columns=col_list)

        # Convert to dict
        records = table.to_pylist()

        # Apply skip
        if skip > 0:
            records = records[skip:]

        # Apply filters
        for col, substring in filter_dict.items():
            records = [r for r in records if substring in str(r.get(col, ""))]

        # Apply take
        if take is not None:
            records = records[:take]

        # Output as JSONL
        for record in records:
            print(json.dumps(record))


if __name__ == "__main__":
    main()