Skip to content

Instantly share code, notes, and snippets.

@ifuller1
Last active November 12, 2018 16:32
Show Gist options
  • Select an option

  • Save ifuller1/3c261f43cff842f0fd20ff0926cca3f8 to your computer and use it in GitHub Desktop.

Select an option

Save ifuller1/3c261f43cff842f0fd20ff0926cca3f8 to your computer and use it in GitHub Desktop.
import logging
import datetime
import json
from airflow import DAG
from airflow import models
from airflow.hooks.http_hook import HttpHook
from airflow.operators.python_operator import PythonOperator
SYMBOLS = ['AAPL', 'GOOGL', 'SNAP']
YESTERDAY = datetime.datetime.combine(
datetime.datetime.today() - datetime.timedelta(1),
datetime.datetime.min.time())
def say_hello_method(**_context):
logging.info('Hello Cloud Composer!')
def create_pricing_file(**context):
symbol = context['params']['symbol']
iex_hook = HttpHook(
method='GET',
http_conn_id='iex_http_connection'
)
iex_response = iex_hook.run(
f'/1.0/stock/{symbol}/quote',
extra_options={"verify": True},
)
response_json = json.loads(iex_response.text)
pricing_data = {
"symbol": symbol,
"price": response_json['latestPrice']
}
pricing_file = f'/home/airflow/gcs/data/{symbol}.json'
with open(pricing_file, 'w') as text_file:
text_file.write(json.dumps(pricing_data))
DEFAULT_ARGS = {
# Setting start date as yesterday starts the DAG immediately when it is
# detected in the Cloud Storage bucket.
'start_date': YESTERDAY,
'email_on_failure': False,
'email_on_retry': False,
'retries': 2,
'retry_delay': datetime.timedelta(minutes=1),
'project_id': models.Variable.get('gcp_project')
}
with DAG('example',
default_args=DEFAULT_ARGS,
schedule_interval=datetime.timedelta(minutes=90),
catchup=False
) as dag:
say_hello_task = PythonOperator(
task_id='say_hello',
python_callable=say_hello_method,
provide_context=True
)
say_hello_task \
>> [PythonOperator(
task_id=f'create_pricing_file_{symbol}',
params={'symbol': symbol},
python_callable=create_pricing_file,
provide_context=True
) for symbol in SYMBOLS]
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment