Skip to content

Instantly share code, notes, and snippets.

@wooparadog
Created November 11, 2025 07:31
Show Gist options
  • Select an option

  • Save wooparadog/e3c3848299abd008264bb7806c71c24f to your computer and use it in GitHub Desktop.

Select an option

Save wooparadog/e3c3848299abd008264bb7806c71c24f to your computer and use it in GitHub Desktop.
import argparse
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional
import requests
from sqlalchemy.dialects.postgresql import insert
from orion.biz.models.meta_ads import MetaAds
from orion.config import settings
from orion.utils.db import DBSession, transactional_session
from orion.utils.sentry import capture_error_to_sentry
# Module-level logger, named after this module per project convention.
logger = logging.getLogger(__name__)
# Root URL for the Meta (Facebook) Graph API; the version segment comes
# from configuration so all requests in this module are pinned to it.
META_ADS_BASE_URL = f"https://graph.facebook.com/{settings.META_ADS_API_VERSION}"
def generate_date_ranges(days: int) -> List[Dict[str, str]]:
    """Build single-day Meta Ads time ranges walking backwards from today.

    Produces one ``{"since": d, "until": d}`` entry per calendar day,
    newest first, covering ``days + 2`` days in total (today plus a
    two-day cushion beyond the requested window — presumably so
    late-arriving stat adjustments get re-fetched; confirm with callers).

    Args:
        days: Number of days requested (the function emits two extra).

    Returns:
        List of ranges like ``[{"since": "YYYY-MM-DD", "until": "YYYY-MM-DD"}, ...]``.
    """
    anchor = datetime.now().date()
    return [
        {"since": day, "until": day}
        for day in (
            (anchor - timedelta(days=offset)).strftime("%Y-%m-%d")
            for offset in range(days + 2)
        )
    ]
def get_meta_ads_data(
    campaign_id: str,
    access_token: str,
    date_ranges: List[Dict[str, str]],
    after_cursor: Optional[str] = None,
) -> tuple[list, Optional[str]]:
    """Fetch one page of ad-level insights from the Meta Ads API.

    Args:
        campaign_id: The Meta Ads campaign ID to fetch data for.
        access_token: The Meta Ads API access token.
        date_ranges: List of date ranges in format
            [{"since": "YYYY-MM-DD", "until": "YYYY-MM-DD"}, ...].
        after_cursor: Pagination cursor for fetching the next page of results.

    Returns:
        tuple: (list of ads data, next page cursor or None when exhausted).
        On any request or parse failure the error is logged and reported to
        Sentry, and ([], None) is returned so callers stop paging.
    """
    url = f"{META_ADS_BASE_URL}/{campaign_id}/insights"
    params = {
        "fields": "impressions,ad_id,campaign_id,adset_id,account_name,ad_name,campaign_name,adset_name,spend,results,clicks,inline_link_clicks,reach",
        "time_ranges": json.dumps(date_ranges),
        "limit": "100",
        "level": "ad",
        "access_token": access_token,
    }
    if after_cursor:
        params["after"] = after_cursor
    try:
        # Explicit timeout: without one, a stalled connection would hang
        # the whole sync forever (requests has no default timeout).
        response = requests.get(url, params=params, timeout=30)
        response.raise_for_status()
        data = response.json()
    except (requests.RequestException, ValueError) as e:
        # RequestException covers connection/HTTP errors; ValueError covers
        # malformed JSON bodies (json.JSONDecodeError subclasses it).
        logger.error("Error fetching Meta Ads data: %s", e)
        capture_error_to_sentry(e)
        return [], None
    # Missing keys simply mean "no more pages" — no need to probe each level.
    next_cursor = data.get("paging", {}).get("cursors", {}).get("after")
    return data.get("data", []), next_cursor
def sync_meta_ads(campaign_ids: List[str], days: int):
"""Sync Meta Ads data to database for multiple campaigns.
Args:
campaign_ids: List of Meta Ads campaign IDs to sync
days: Number of days to sync back from yesterday
"""
access_token = settings.META_ADS_ACCESS_TOKEN
if not access_token:
logger.error("Missing required configuration: META_ADS_ACCESS_TOKEN")
return
if not campaign_ids:
logger.error("No campaign IDs provided")
return
total_records = 0
date_ranges = generate_date_ranges(days)
for campaign_id in campaign_ids:
logger.info(f"Processing campaign ID: {campaign_id}")
campaign_records = 0
next_cursor = None
while True:
ads_data, next_cursor = get_meta_ads_data(
campaign_id, access_token, date_ranges, next_cursor
)
if not ads_data:
break
with transactional_session(DBSession) as session:
for ad_data in ads_data:
# Convert date_start and date_stop to a single date
assert (
ad_data["date_start"] == ad_data["date_stop"]
), f'Date range should be the same for all ads: {ad_data["date_start"]} != {ad_data["date_stop"]}'
ad_data["date"] = ad_data["date_start"]
ad_data.pop("date_start")
ad_data.pop("date_stop")
# Convert string values to appropriate types
ad_data["impressions"] = int(ad_data.get("impressions", 0))
ad_data["clicks"] = int(ad_data.get("clicks", 0))
ad_data["inline_link_clicks"] = int(
ad_data.get("inline_link_clicks", 0)
)
ad_data["spend"] = float(ad_data.get("spend", 0))
ad_data["reach"] = float(ad_data.get("reach", 0))
# Create upsert statement
stmt = insert(MetaAds).values(**ad_data)
stmt = stmt.on_conflict_do_update(
index_elements=["ad_id", "date"], set_=ad_data
)
try:
session.execute(stmt)
except Exception as e:
logger.error(f"Error upserting ad data: {str(e)}")
capture_error_to_sentry(e)
continue
campaign_records += len(ads_data)
logger.info(f"Processed {len(ads_data)} records in current batch")
if not next_cursor:
break
total_records += campaign_records
logger.info(f"Completed campaign {campaign_id} with {campaign_records} records")
logger.info(
f"Successfully synced {total_records} total ads records across {len(campaign_ids)} campaigns"
)
def main():
    """Command-line entry point: parse arguments, configure logging, run the sync."""
    arg_parser = argparse.ArgumentParser(description="Sync Meta Ads data to database")
    arg_parser.add_argument(
        "--campaign-ids",
        nargs="+",
        required=True,
        help="One or more Meta Ads Campaign IDs to sync",
    )
    arg_parser.add_argument(
        "--days",
        type=int,
        required=True,
        help="Number of days to sync back from yesterday",
    )
    arg_parser.add_argument(
        "--log-level",
        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
        default="INFO",
        help="Set the logging level",
    )
    options = arg_parser.parse_args()

    # Translate the level name (e.g. "INFO") into its logging constant.
    logging.basicConfig(
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
        level=getattr(logging, options.log_level),
    )

    try:
        sync_meta_ads(options.campaign_ids, options.days)
    except Exception as e:
        # Log and report, then re-raise so the process exits non-zero.
        logger.error("Critical error in sync_meta_ads: %s", e)
        capture_error_to_sentry(e)
        raise


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment