Skip to content

Instantly share code, notes, and snippets.

@joshuahendinata
Created January 5, 2023 05:47
Show Gist options
  • Select an option

  • Save joshuahendinata/4093763c7a56c67b069898c39c7a2d94 to your computer and use it in GitHub Desktop.

Select an option

Save joshuahendinata/4093763c7a56c67b069898c39c7a2d94 to your computer and use it in GitHub Desktop.
Google Ads API call from Spark UDF
import string
from google.ads.googleads.client import GoogleAdsClient
from google.protobuf.json_format import MessageToDict
from pyspark.sql import functions as F
from pyspark.sql.types import ArrayType, StringType, StructField, StructType
segment_dates = ['2022-01-01', '2022-01-02']  # dummy values for brevity
gads_account_ids = ['123456789', '987654321']  # dummy values for brevity
# Candidate first characters (0-9, a-z); each one becomes the prefix used in
# the `product_item_id like "<prefix>%"` filter of a single API query.
first_letters = list(string.digits) + list(string.ascii_lowercase)
# One request row per (account, date, prefix) combination, in that nesting
# order: accounts outermost, prefixes innermost.
value_list = [
    {
        "account_id": account_id,
        "segment_date": segment_date,
        "product_partition": first_letter,
    }
    for account_id in gads_account_ids
    for segment_date in segment_dates
    for first_letter in first_letters
]
# Turn the request grid into a DataFrame and shuffle it on all three request
# columns so the rows are spread over many partitions; the more partitions
# hold rows, the more UDF calls can run in parallel.
# NOTE(review): column-based repartitioning is hash-based, so two rows may
# still share a partition -- confirm the resulting distribution if strict
# one-row-per-partition parallelism matters.
request_df = spark.createDataFrame(value_list).repartition(
    "account_id",
    "segment_date",
    "product_partition",
)
# Materialize the cache right away. This is important to BLOCK Spark's
# optimizer from moving the repartition to after the UDF call.
request_df = request_df.cache()
request_df.count()
# Field names below are camelCase because the dicts produced from the Google
# Ads protobuf rows follow the API resource format rather than snake_case,
# e.g. product_item_id --> productItemId.
_segments_struct = StructType([
    StructField("productItemId", StringType()),
])
# Every metric is declared as StringType(): it is the most flexible type and
# can be cast to whatever is needed at the end of the pipeline.
_metrics_struct = StructType([
    StructField(metric_name, StringType())
    for metric_name in ("clicks", "costMicros", "impressions")
])
# Return type of the UDF: an array with one struct per API result row.
ads_metrics_schema = ArrayType(
    StructType([
        StructField("segments", _segments_struct),
        StructField("metrics", _metrics_struct),
    ])
)
@F.udf(returnType=ads_metrics_schema)
def load_from_ads_api(
    # list down all the filter criteria you used to split your query
    segment_date: str,
    account_id: str,
    product_partition: str
) -> list:
    """Fetch shopping performance rows for one (date, account, prefix) slice.

    Builds a Google Ads client, runs a streaming search against
    ``shopping_performance_view`` and returns every result row converted to
    a plain dict.

    Args:
        segment_date: Value compared against ``segments.date`` in the query.
        account_id: Google Ads customer id the request is issued for.
        product_partition: Prefix matched against
            ``segments.product_item_id`` with ``like "<prefix>%"``.

    Returns:
        A list of dicts, one per result row, as produced by
        ``MessageToDict`` (camelCase keys).

    Note:
        ``credentials`` is read from the enclosing scope and is not defined
        in the code shown here; it must be available wherever the UDF runs.
    """
    # More details on https://developers.google.com/google-ads/api/docs/client-libs/python/configuration?hl=en#configuration_using_a_dict
    googleads_client = GoogleAdsClient.load_from_dict(credentials)
    ga_service = googleads_client.get_service("GoogleAdsService")
    query = f"""
        SELECT
            segments.product_item_id,
            metrics.clicks,
            metrics.cost_micros,
            metrics.impressions
        FROM shopping_performance_view
        WHERE segments.date = "{segment_date}"
            AND segments.product_item_id like "{product_partition}%"
    """
    search_request = googleads_client.get_type("SearchGoogleAdsStreamRequest")
    search_request.customer_id = account_id
    search_request.query = query
    # The query will be executed here
    stream = ga_service.search_stream(search_request)
    # MessageToDict only accepts raw protobuf messages. When the client is
    # configured with use_proto_plus=True the rows are proto-plus wrappers
    # that expose the raw message as `_pb`; raw protobuf rows have no such
    # attribute and are passed through unchanged.
    return [
        MessageToDict(getattr(row, "_pb", row))
        for batch in stream
        for row in batch.results
    ]
# Call the API once per request row, then flatten the result: each element
# of the returned array becomes its own row, and the nested metrics/segments
# structs are expanded into top-level columns.
response_df = (
    request_df
    .withColumn(
        "response",
        load_from_ads_api("segment_date", "account_id", "product_partition"),
    )
    .withColumn("result", F.explode("response"))
    .select(
        "account_id",
        "segment_date",
        "result.metrics.*",
        "result.segments.*",
    )
)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment