Created
January 5, 2023 05:47
-
-
Save joshuahendinata/4093763c7a56c67b069898c39c7a2d94 to your computer and use it in GitHub Desktop.
Google Ads API call from Spark UDF
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import string | |
| from google.ads.googleads.client import GoogleAdsClient | |
| from google.protobuf.json_format import MessageToDict | |
| from pyspark.sql import functions as F | |
| from pyspark.sql.types import ArrayType, StringType, StructField, StructType | |
# Request grid: one entry per (account, date, leading character) combination.
# Each entry later becomes one DataFrame row and therefore one independent
# Google Ads API call, so a large report is split into many small queries.
segment_dates = ['2022-01-01', '2022-01-02']  # dummy values for brevity
gads_account_ids = ['123456789', '987654321']  # dummy values for brevity

# Prefixes used to slice the product item ids: the digits 0-9 followed by a-z.
# NOTE(review): an id that starts with any other character is matched by none
# of these prefixes -- confirm the id space really is limited to this set.
first_letters = list(string.digits) + list(string.ascii_lowercase)

# Cartesian product of the three dimensions, in account -> date -> prefix order.
value_list = [
    {
        "account_id": account_id,
        "segment_date": segment_date,
        "product_partition": first_letter,
    }
    for account_id in gads_account_ids
    for segment_date in segment_dates
    for first_letter in first_letters
]
# Turn the request grid into a DataFrame and spread its rows over partitions
# keyed on all three request columns, so the UDF calls can run in parallel
# instead of being processed in a handful of tasks.
# NOTE(review): column-based repartitioning is hash-based, so a strictly
# unique partition per row is the intent rather than a guarantee -- confirm
# the resulting partition count if full parallelism matters.
request_df = (
    spark.createDataFrame(value_list)
    .repartition("account_id", "segment_date", "product_partition")
)

# Materialise the repartitioned rows now. This is important: it BLOCKS Spark's
# optimizer from moving the repartition to after the UDF call (the original
# write-up refers to this as "Caveat & Lessons Learned #3").
request_df.cache()
request_df.count()
# Return schema of the UDF: an array with one struct per Google Ads result row.
# The protobuf fields returned by the Google Ads API follow its resource
# format, which uses camel case names, e.g. product_item_id --> productItemId,
# so the field names declared here must be camel case as well.
ads_metrics_schema = ArrayType(
    StructType()
    .add("segments", StructType().add("productItemId", StringType()))
    .add(
        "metrics",
        # StringType() is the most flexible type; cast the columns as
        # necessary at the very end of the pipeline.
        StructType()
        .add("clicks", StringType())
        .add("costMicros", StringType())
        .add("impressions", StringType()),
    )
)
@F.udf(returnType=ads_metrics_schema)
def load_from_ads_api(
    # One argument per filter criterion that was used to split the query.
    segment_date: str,
    account_id: str,
    product_partition: str
):
    """Run one slice of the shopping performance report against Google Ads.

    Queries ``shopping_performance_view`` for a single date, restricted to
    product item ids that start with ``product_partition``, streams the
    response for ``account_id`` and converts every returned row to a dict.

    Args:
        segment_date: Date compared against ``segments.date`` in the query.
        account_id: Customer id the search request is sent for.
        product_partition: Prefix used in the ``like`` filter on
            ``segments.product_item_id``.

    Returns:
        A list with one ``MessageToDict`` dict per result row, in stream order.

    The filter values are interpolated into the query text as-is, so callers
    must only pass trusted values. A new client is built on every call.

    NOTE(review): ``credentials`` is not defined in the visible part of this
    file; presumably it is a configuration dict defined elsewhere -- confirm
    it is available wherever the UDF executes.
    NOTE(review): ``MessageToDict`` is applied to the row object directly; if
    the client is configured to return proto-plus wrappers this may need the
    underlying protobuf message instead -- confirm against the client config.
    """
    # More details on https://developers.google.com/google-ads/api/docs/client-libs/python/configuration?hl=en#configuration_using_a_dict
    client = GoogleAdsClient.load_from_dict(credentials)
    search_service = client.get_service("GoogleAdsService")

    gaql = f"""
        SELECT
        segments.product_item_id,
        metrics.clicks,
        metrics.cost_micros,
        metrics.impressions
        FROM shopping_performance_view
        WHERE segments.date = "{segment_date}"
        AND segments.product_item_id like "{product_partition}%"
    """

    request = client.get_type("SearchGoogleAdsStreamRequest")
    request.customer_id = account_id
    request.query = gaql

    # The query is executed here; results arrive as a stream of batches.
    response_stream = search_service.search_stream(request)

    # Flatten every batch into a single list of plain dicts.
    return [
        MessageToDict(ads_row)
        for response in response_stream
        for ads_row in response.results
    ]
# Call the API once per request row; "response" holds the array of result
# structs returned by the UDF for that row.
response_df = request_df.withColumn(
    "response",
    load_from_ads_api(
        F.col("segment_date"),
        F.col("account_id"),
        F.col("product_partition"),
    ),
)

# Unnest the array so that every API result becomes its own row.
response_df = response_df.withColumn("result", F.explode("response"))

# Keep the request keys and flatten the nested metric / segment structs
# into top-level columns.
response_df = response_df.select(
    "account_id",
    "segment_date",
    "result.metrics.*",
    "result.segments.*",
)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment