import argparse
import json
import logging
from datetime import datetime, timedelta
from typing import Dict, List, Optional

import requests
from sqlalchemy.dialects.postgresql import insert

from orion.biz.models.meta_ads import MetaAds
from orion.config import settings
from orion.utils.db import DBSession, transactional_session
from orion.utils.sentry import capture_error_to_sentry

logger = logging.getLogger(__name__)

META_ADS_BASE_URL = f"https://graph.facebook.com/{settings.META_ADS_API_VERSION}"


def generate_date_ranges(days: int) -> List[Dict[str, str]]:
    """Generate single-day date ranges going back from today.

    Args:
        days: Number of days requested; the generated window also covers today
            and one extra trailing day, i.e. days + 2 single-day ranges in total.

    Returns:
        List of date ranges in the format [{"since": "YYYY-MM-DD", "until": "YYYY-MM-DD"}, ...]
    """
    date_ranges = []
    today = datetime.now().date()
    for i in range(days + 2):
        target_date = today - timedelta(days=i)
        date_str = target_date.strftime("%Y-%m-%d")
        date_ranges.append({"since": date_str, "until": date_str})
    return date_ranges
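

# Illustrative output (hypothetical dates): generate_date_ranges(1) run on
# 2025-11-11 would produce three single-day ranges:
#   [{"since": "2025-11-11", "until": "2025-11-11"},
#    {"since": "2025-11-10", "until": "2025-11-10"},
#    {"since": "2025-11-09", "until": "2025-11-09"}]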


def get_meta_ads_data(
    campaign_id: str,
    access_token: str,
    date_ranges: List[Dict[str, str]],
    after_cursor: Optional[str] = None,
) -> tuple[list, Optional[str]]:
    """Fetch ads data from the Meta Ads API with paging support.

    Args:
        campaign_id: The Meta Ads campaign ID to fetch data for
        access_token: The Meta Ads API access token
        date_ranges: List of date ranges in the format [{"since": "YYYY-MM-DD", "until": "YYYY-MM-DD"}, ...]
        after_cursor: Pagination cursor for fetching the next page of results

    Returns:
        tuple: (list of ads data, next page cursor)
    """
    url = f"{META_ADS_BASE_URL}/{campaign_id}/insights"
    params = {
        "fields": "impressions,ad_id,campaign_id,adset_id,account_name,ad_name,campaign_name,adset_name,spend,results,clicks,inline_link_clicks,reach",
        "time_ranges": json.dumps(date_ranges),
        "limit": "100",
        "level": "ad",
        "access_token": access_token,
    }
    if after_cursor:
        params["after"] = after_cursor

    try:
        # Timeout added so a stalled request cannot hang the sync indefinitely
        response = requests.get(url, params=params, timeout=60)
        response.raise_for_status()
        data = response.json()

        # Get the next page cursor if available
        next_cursor = None
        if (
            "paging" in data
            and "cursors" in data["paging"]
            and "after" in data["paging"]["cursors"]
        ):
            next_cursor = data["paging"]["cursors"]["after"]

        return data["data"], next_cursor
    except Exception as e:
        logger.error(f"Error fetching Meta Ads data: {str(e)}")
        capture_error_to_sentry(e)
        return [], None
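

# Illustrative shape of the insights response the parser above expects; the
# values shown are hypothetical. Metric fields arrive as strings, which is why
# sync_meta_ads converts them to int/float below:
# {
#     "data": [
#         {"ad_id": "...", "campaign_id": "...", "impressions": "1234",
#          "spend": "5.67", "date_start": "2025-11-10", "date_stop": "2025-11-10", ...}
#     ],
#     "paging": {"cursors": {"before": "...", "after": "..."}}
# }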


def sync_meta_ads(campaign_ids: List[str], days: int):
    """Sync Meta Ads data to database for multiple campaigns.

    Args:
        campaign_ids: List of Meta Ads campaign IDs to sync
        days: Number of days to sync back from yesterday
    """
    access_token = settings.META_ADS_ACCESS_TOKEN
    if not access_token:
        logger.error("Missing required configuration: META_ADS_ACCESS_TOKEN")
        return
    if not campaign_ids:
        logger.error("No campaign IDs provided")
        return

    total_records = 0
    date_ranges = generate_date_ranges(days)

    for campaign_id in campaign_ids:
        logger.info(f"Processing campaign ID: {campaign_id}")
        campaign_records = 0
        next_cursor = None

        while True:
            ads_data, next_cursor = get_meta_ads_data(
                campaign_id, access_token, date_ranges, next_cursor
            )
            if not ads_data:
                break

            with transactional_session(DBSession) as session:
                for ad_data in ads_data:
                    # Convert date_start and date_stop to a single date
                    assert (
                        ad_data["date_start"] == ad_data["date_stop"]
                    ), f'Date range should be the same for all ads: {ad_data["date_start"]} != {ad_data["date_stop"]}'
                    ad_data["date"] = ad_data["date_start"]
                    ad_data.pop("date_start")
                    ad_data.pop("date_stop")

                    # Convert string values to appropriate types
                    ad_data["impressions"] = int(ad_data.get("impressions", 0))
                    ad_data["clicks"] = int(ad_data.get("clicks", 0))
                    ad_data["inline_link_clicks"] = int(
                        ad_data.get("inline_link_clicks", 0)
                    )
                    ad_data["spend"] = float(ad_data.get("spend", 0))
                    ad_data["reach"] = float(ad_data.get("reach", 0))

                    # Create upsert statement
                    stmt = insert(MetaAds).values(**ad_data)
                    stmt = stmt.on_conflict_do_update(
                        index_elements=["ad_id", "date"], set_=ad_data
                    )
                    try:
                        session.execute(stmt)
                    except Exception as e:
                        logger.error(f"Error upserting ad data: {str(e)}")
                        capture_error_to_sentry(e)
                        continue

            campaign_records += len(ads_data)
            logger.info(f"Processed {len(ads_data)} records in current batch")

            if not next_cursor:
                break

        total_records += campaign_records
        logger.info(f"Completed campaign {campaign_id} with {campaign_records} records")

    logger.info(
        f"Successfully synced {total_records} total ads records across {len(campaign_ids)} campaigns"
    )


def main():
    parser = argparse.ArgumentParser(description="Sync Meta Ads data to database")
    parser.add_argument(
        "--campaign-ids",
        required=True,
        nargs="+",
        help="One or more Meta Ads Campaign IDs to sync",
    )
    parser.add_argument(
        "--days",
        required=True,
        type=int,
        help="Number of days to sync back from yesterday",
    )
    parser.add_argument(
        "--log-level",
        default="INFO",
        choices=["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
        help="Set the logging level",
    )
    args = parser.parse_args()

    # Configure logging
    logging.basicConfig(
        level=getattr(logging, args.log_level),
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )

    try:
        sync_meta_ads(args.campaign_ids, args.days)
    except Exception as e:
        logger.error(f"Critical error in sync_meta_ads: {str(e)}")
        capture_error_to_sentry(e)
        raise


if __name__ == "__main__":
    main()
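
A hypothetical invocation, assuming the script is saved as sync_meta_ads.py and the Orion settings provide META_ADS_ACCESS_TOKEN and META_ADS_API_VERSION (campaign IDs are placeholders):

    python sync_meta_ads.py --campaign-ids <CAMPAIGN_ID_1> <CAMPAIGN_ID_2> --days 7 --log-level INFO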