Excel/Sheets ➜ export UTF-8 CSV in Git ➜ FastAPI reads & validates on startup ➜ serves responses.
import pandas as pd

# Avoid reading formulas/macros; target a single sheet
df = pd.read_excel("source.xlsx", sheet_name="data", engine="openpyxl")
df.to_csv("export.csv", index=False, encoding="utf-8")
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

# Read CSV with explicit dtypes when possible
df = pd.read_csv(
    "input.csv",
    dtype={"id": "string", "category": "string"},
    parse_dates=["event_date"],
    keep_default_na=False, na_values=["", "NA", "N/A", "null"],
)

# Light normalization
df["id"] = df["id"].str.strip()
df = df.drop_duplicates(subset=["id"])

# Quick validations (see the pandera sketch below, or Great Expectations, for richer tests)
assert df["id"].notnull().all(), "Null IDs found"
assert {"id", "category", "event_date"}.issubset(df.columns)

# Write Parquet (snappy compression)
table = pa.Table.from_pandas(df, preserve_index=False)
pq.write_table(table, "dataset.parquet", compression="snappy")
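# Optional, richer validation with pandera: a minimal sketch, assuming
# `pip install pandera`. The checks mirror the columns used above and are
# illustrative assumptions, not part of the original gist.
import pandera
from pandera import Check, Column

validation_schema = pandera.DataFrameSchema(
    {
        "id": Column(str, Check.str_length(min_value=1), unique=True),
        "category": Column(str, nullable=True),
        "event_date": Column("datetime64[ns]"),
    },
    coerce=True,  # coerce columns to the declared dtypes before checking
)

# Raises pandera.errors.SchemaError with per-column details if df does not conform.
df = validation_schema.validate(df)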
# Option A — Read-only API (edit in Excel/Sheets, serve from CSV)
# main.py
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field, ValidationError
from typing import List, Optional
import pandas as pd
from functools import lru_cache
from pathlib import Path

DATA_PATH = Path("data/dataset.csv")

class Item(BaseModel):
    id: str = Field(..., min_length=1)
    name: str
    category: Optional[str] = None
    value: Optional[float] = None
    # add your columns here with types

def _load_csv() -> List[Item]:
    df = pd.read_csv(
        DATA_PATH,
        dtype={"id": "string", "name": "string", "category": "string"},
        keep_default_na=False,
        na_values=["", "NA", "N/A", "null"],
    )
    # light normalization & checks
    if "id" not in df.columns:
        raise RuntimeError("CSV missing required 'id' column.")
    df["id"] = df["id"].str.strip()
    if df["id"].duplicated().any():
        dups = df[df["id"].duplicated()]["id"].tolist()
        raise RuntimeError(f"Duplicate ids in CSV: {dups[:5]}")
    try:
        return [Item(**rec) for rec in df.to_dict(orient="records")]
    except ValidationError as e:
        raise RuntimeError(f"Schema/typing problem: {e}")

@lru_cache(maxsize=1)
def get_data(items_mtime: float) -> List[Item]:
    # cache invalidates when the file mtime changes
    return _load_csv()

app = FastAPI()

def _mtime() -> float:
    return DATA_PATH.stat().st_mtime

@app.get("/items", response_model=List[Item])
def list_items(category: Optional[str] = None, q: Optional[str] = None):
    data = get_data(_mtime())
    rows = data
    if category:
        rows = [r for r in rows if (r.category or "").lower() == category.lower()]
    if q:
        ql = q.lower()
        rows = [r for r in rows if ql in (r.name or "").lower()]
    return rows

@app.get("/items/{item_id}", response_model=Item)
def get_item(item_id: str):
    data = get_data(_mtime())
    for r in data:
        if r.id == item_id:
            return r
    raise HTTPException(status_code=404, detail="Not found")
| """Why this works for you | |
| Tiny, fast, and dependency-light. | |
| Editors can keep using Excel/Sheets; you gatekeep with types and duplicate checks. | |
| The API auto-reloads data when the CSV file changes (via mtime-aware cache). | |
| Tips | |
| Standardize CSV: UTF-8, comma delimiter, headers in row 1, no formulas. | |
| Keep a small schema.yml (columns + types + allowed values) alongside the CSV for review. | |
| If you want stricter checks, add pandera or Great Expectations in a pre-commit/CI step. | |
| """ | |
# Option B — If the API must support writes (POST/PUT/DELETE)
"""Flow: Excel/Sheets ➜ export CSV ➜ one-shot import to SQLite ➜ FastAPI reads/writes
SQLite ➜ optional nightly export back to CSV/Parquet (a hedged export sketch follows
import_csv.py below).
"""
# models.py
from sqlmodel import SQLModel, Field
from typing import Optional

class Item(SQLModel, table=True):
    id: str = Field(primary_key=True)
    name: str
    category: Optional[str] = None
    value: Optional[float] = None
# app.py
from fastapi import FastAPI, HTTPException
from sqlmodel import SQLModel, Session, create_engine, select
from models import Item

engine = create_engine("sqlite:///data.db", connect_args={"check_same_thread": False})
app = FastAPI()

@app.on_event("startup")  # newer FastAPI prefers a lifespan handler, but this still works
def on_startup():
    SQLModel.metadata.create_all(engine)

@app.get("/items")
def list_items(category: str | None = None, q: str | None = None):
    with Session(engine) as s:
        stmt = select(Item)
        rows = s.exec(stmt).all()
        if category:
            rows = [r for r in rows if (r.category or "").lower() == category.lower()]
        if q:
            ql = q.lower()
            rows = [r for r in rows if ql in (r.name or "").lower()]
        return rows

@app.get("/items/{item_id}")
def get_item(item_id: str):
    with Session(engine) as s:
        item = s.get(Item, item_id)
        if not item:
            raise HTTPException(404, "Not found")
        return item

@app.post("/items", status_code=201)
def create_item(item: Item):
    with Session(engine) as s:
        if s.get(Item, item.id):
            raise HTTPException(409, "ID already exists")
        s.add(item)
        s.commit()
        s.refresh(item)
        return item
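# Hedged sketch of the PUT/DELETE counterparts mentioned in the Option B header.
# The route shapes and the field-by-field update are illustrative assumptions,
# not part of the original gist.
@app.put("/items/{item_id}")
def update_item(item_id: str, item: Item):
    with Session(engine) as s:
        existing = s.get(Item, item_id)
        if not existing:
            raise HTTPException(404, "Not found")
        existing.name = item.name
        existing.category = item.category
        existing.value = item.value
        s.add(existing)
        s.commit()
        s.refresh(existing)
        return existing

@app.delete("/items/{item_id}", status_code=204)
def delete_item(item_id: str):
    with Session(engine) as s:
        item = s.get(Item, item_id)
        if not item:
            raise HTTPException(404, "Not found")
        s.delete(item)
        s.commit()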
# import_csv.py
import pandas as pd
from sqlmodel import Session
from models import Item
from app import engine

df = pd.read_csv(
    "data/dataset.csv",
    dtype={"id": "string"},
    keep_default_na=False,
    na_values=["", "NA", "N/A", "null"],
)

with Session(engine) as s:
    for rec in df.to_dict(orient="records"):
        s.merge(Item(**rec))  # insert-or-update by primary key
    s.commit()
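# export_nightly.py
# Hedged sketch of the "optional nightly export back to CSV/Parquet" step from the
# Option B flow above; the file name and output paths are assumptions. Run it from
# cron or any scheduler.
import pandas as pd
from app import engine

# SQLModel names the table after the lowercased class name ("item") by default.
df = pd.read_sql_table("item", engine)
df.to_csv("data/dataset.csv", index=False, encoding="utf-8")
df.to_parquet("data/dataset.parquet", index=False, compression="snappy")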