@irajnazamdev
Created August 25, 2025 16:10
End-to-end Python pipeline: scrape jobs, normalize data, infer entities, score matches, store in S3, load PostgreSQL, monitor workflows.
#!/usr/bin/env python3
"""
Job Data Pipeline: Scrape, Clean, Score, Store
"""
import logging
import time

import boto3
import pandas as pd
import psycopg2
import requests
from bs4 import BeautifulSoup
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

# Logging config
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("JobPipeline")

# AWS S3 client
s3 = boto3.client("s3")
BUCKET = "job-data-lake"

# PostgreSQL config (in a real deployment, pull credentials from the environment
# or a secrets manager instead of hard-coding them)
DB_CONFIG = {
    "dbname": "jobsdb",
    "user": "admin",
    "password": "secret",
    "host": "localhost",
    "port": "5432",
}

# Scrape function
def scrape_jobs(url):
    """Fetch a listings page and return one row per posting."""
    logger.info("Scraping jobs...")
    resp = requests.get(url, timeout=30)
    resp.raise_for_status()
    soup = BeautifulSoup(resp.text, "html.parser")
    jobs = []
    for div in soup.find_all("div", class_="job"):
        title = div.find("h2")
        desc = div.find("p")
        if title is None or desc is None:
            continue  # skip listings missing a title or description
        jobs.append({
            "title": title.get_text(strip=True),
            "desc": desc.get_text(strip=True),
        })
    return pd.DataFrame(jobs)
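
# The scraper assumes listings shaped roughly like the hypothetical markup below
# (the real page at https://example.com/jobs may differ; adjust the selectors in
# scrape_jobs to match the actual structure):
#
#   <div class="job">
#     <h2>Data Engineer</h2>
#     <p>Build and maintain batch ETL pipelines...</p>
#   </div>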

# Clean/normalize
def clean_data(df):
    logger.info("Cleaning data...")
    df["title"] = df["title"].str.lower().str.strip()
    df["desc"] = df["desc"].str.replace("\n", " ", regex=False)
    return df.drop_duplicates()

# Simple ML inference: score each posting by its mean TF-IDF cosine similarity
# to every other posting (higher = closer to the corpus as a whole)
def score_jobs(df):
    logger.info("Scoring jobs...")
    vectorizer = TfidfVectorizer(stop_words="english")
    tfidf = vectorizer.fit_transform(df["desc"])
    scores = cosine_similarity(tfidf, tfidf)
    df["score"] = scores.mean(axis=1)
    return df

# Store to S3
def upload_s3(df, key):
    logger.info("Uploading to S3...")
    s3.put_object(Bucket=BUCKET, Key=key, Body=df.to_csv(index=False).encode("utf-8"))

# Load into Postgres
def load_postgres(df):
    logger.info("Loading into Postgres...")
    conn = psycopg2.connect(**DB_CONFIG)
    try:
        with conn.cursor() as cur:
            for _, row in df.iterrows():
                cur.execute(
                    "INSERT INTO jobs (title, description, score) VALUES (%s, %s, %s)",
                    (row["title"], row["desc"], row["score"]),
                )
        conn.commit()
    finally:
        conn.close()
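
# load_postgres assumes a "jobs" table already exists. Below is a minimal sketch of
# a compatible schema; the column types are assumptions, not part of the original
# gist. Call ensure_table() once before load_postgres if the table may not exist.
def ensure_table():
    conn = psycopg2.connect(**DB_CONFIG)
    try:
        with conn.cursor() as cur:
            cur.execute(
                """
                CREATE TABLE IF NOT EXISTS jobs (
                    id SERIAL PRIMARY KEY,
                    title TEXT NOT NULL,
                    description TEXT,
                    score DOUBLE PRECISION
                )
                """
            )
        conn.commit()
    finally:
        conn.close()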

# Main workflow
if __name__ == "__main__":
    start = time.time()
    df = scrape_jobs("https://example.com/jobs")
    df = clean_data(df)
    df = score_jobs(df)
    upload_s3(df, "jobs/data.csv")
    load_postgres(df)
    logger.info("Pipeline finished in %.2fs", time.time() - start)