Parquet inspector - stream, filter, select columns, show metadata, bidirectional JSONL conversion
#!/usr/bin/env -S uv run --script
# /// script
# dependencies = [
#     "pyarrow",
#     "fsspec",
#     "click",
#     "s3fs",
#     "gcsfs",
# ]
# ///
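# The block above is PEP 723 inline script metadata: the shebang hands the file
# to `uv run --script`, which resolves and installs these dependencies in an
# ephemeral environment before executing the script.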
import json
import sys
from typing import Optional

import click
import fsspec
import pyarrow as pa
import pyarrow.parquet as pq


@click.command()
@click.argument("path", required=False)
@click.option("--columns", "-c", help="Comma-separated columns to select")
@click.option("--metadata", "-m", is_flag=True, help="Show metadata and exit")
@click.option("--schema", "-s", is_flag=True, help="Show schema and exit")
@click.option("--take", "-n", type=int, help="Take first N rows")
@click.option("--skip", type=int, default=0, help="Skip first N rows")
@click.option("--filter", "-f", "filters", multiple=True, help="Filter: column:substring (can specify multiple)")
@click.option("--row-group", "-g", type=int, help="Read specific row group")
@click.option("--from-jsonl", is_flag=True, help="Read JSONL from stdin, write Parquet to path")
@click.option("--output", "-o", help="Output parquet file (for use with --from-jsonl)")
def main(
    path: Optional[str],
    columns: Optional[str],
    metadata: bool,
    schema: bool,
    take: Optional[int],
    skip: int,
    filters: tuple[str, ...],
    row_group: Optional[int],
    from_jsonl: bool,
    output: Optional[str],
):
| """Inspect and stream Parquet files as JSONL. | |
| Supports local files, S3 (s3://), GCS (gs://), etc. | |
| Supports glob patterns to read multiple files. | |
| Examples: | |
| pq file.parquet | |
| pq s3://bucket/file.parquet --columns id,name --take 10 | |
| pq gs://bucket/file.parquet --filter name:john --filter status:active | |
| pq 'data/train-*-of-00006.parquet' --columns id,text | |
| pq 'gs://bucket/raw/*/data/train-*.parquet' --take 100 | |
| cat file.parquet | pq --schema | |
| pq file.parquet --metadata | |
| cat data.jsonl | pq --from-jsonl -o output.parquet | |
| """ | |
    # Handle JSONL-to-Parquet conversion
    if from_jsonl:
        output_path = output or path
        if not output_path:
            raise click.BadParameter("Must specify an output file with --output or as the path argument")
        records = []
        for line in sys.stdin:
            if line.strip():
                records.append(json.loads(line))
        if not records:
            raise click.ClickException("No records read from stdin")
        table = pa.Table.from_pylist(records)
        pq.write_table(table, output_path)
        click.echo(f"Wrote {len(records)} rows to {output_path}", err=True)
        return
    # Handle stdin if no path provided
    if path is None or path == "-":
        if sys.stdin.isatty():
            raise click.BadParameter("No input file specified and stdin is a terminal")
        # Read from stdin into a temp file
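        # (Parquet keeps its footer metadata at the end of the file, so readers
        # need a seekable source; stdin is not seekable, hence the buffering.)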
        import tempfile

        with tempfile.NamedTemporaryFile(delete=False, suffix=".parquet") as tmp:
            tmp.write(sys.stdin.buffer.read())
            path = tmp.name
    # Always use fsspec.open_files (works for both single files and globs)
    of = fsspec.open_files(path)
    if not of:
        raise click.ClickException(f"No files matched: {path}")
    # For --metadata/--schema, inspect the first matched file and exit.
    # Opening via the OpenFile handle (rather than re-opening by path) keeps
    # the s3://, gs://, etc. protocol intact for remote files.
    if metadata or schema:
        with of[0] as f:
            parquet_file = pq.ParquetFile(f)
            if metadata:
                meta = parquet_file.metadata
                click.echo(f"Created by: {meta.created_by}")
                click.echo(f"Num rows: {meta.num_rows}")
                click.echo(f"Num row groups: {meta.num_row_groups}")
                click.echo(f"Num columns: {meta.num_columns}")
                click.echo(f"Format version: {meta.format_version}")
                click.echo("\nRow groups:")
                for i in range(meta.num_row_groups):
                    rg = meta.row_group(i)
                    click.echo(f"  [{i}] {rg.num_rows} rows, {rg.total_byte_size} bytes")
            if schema:
                click.echo(parquet_file.schema)
        return
    # Parse column selection
    col_list = columns.split(",") if columns else None

    # Parse filters
    filter_dict = {}
    for f in filters:
        if ":" not in f:
            raise click.BadParameter(f"Filter must be column:substring, got: {f}")
        col, val = f.split(":", 1)
        filter_dict[col] = val
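    # Filters combine with AND; each value matches as a substring of str(field).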
    # Iterator to read records from all files in batches
    def iter_records():
        for open_file in of:
            with open_file as f:
                parquet_file = pq.ParquetFile(f)
                if row_group is not None:
                    table = parquet_file.read_row_group(row_group, columns=col_list)
                    yield from table.to_pylist()
                else:
                    # Read in batches for efficiency
                    for batch in parquet_file.iter_batches(columns=col_list, batch_size=1024):
                        yield from batch.to_pylist()
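    # NB: batch_size=1024 bounds peak memory by the batch, not the file size;
    # with a glob, records stream file by file in the order fsspec returns them.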
    # Process records with skip/filter/take
    skipped = 0
    emitted = 0
    for record in iter_records():
        # Apply skip
        if skipped < skip:
            skipped += 1
            continue
        # Apply filters
        if filter_dict:
            matches = True
            for col, substring in filter_dict.items():
                if substring not in str(record.get(col, "")):
                    matches = False
                    break
            if not matches:
                continue
        # Output record
        print(json.dumps(record))
        emitted += 1
        # Apply take limit
        if take is not None and emitted >= take:
            break
| if __name__ == "__main__": | |
| main() |
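To try it end to end, one option is to generate a small Parquet file with pyarrow and run a few of the commands from the docstring against it. A minimal sketch, assuming the script is saved on your PATH as `pq`; the file name sample.parquet and the column values are made up for illustration:

# make_sample.py: build a tiny Parquet file to exercise the inspector
import pyarrow as pa
import pyarrow.parquet as pq

table = pa.table({
    "id": [1, 2, 3],
    "name": ["john", "jane", "john smith"],
    "status": ["active", "inactive", "active"],
})
pq.write_table(table, "sample.parquet")

# Then, for example:
#   pq sample.parquet --schema
#   pq sample.parquet --columns id,name --take 2
#   pq sample.parquet --filter name:john --filter status:active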