Last active
November 12, 2018 16:32
-
-
Save ifuller1/3c261f43cff842f0fd20ff0926cca3f8 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| import logging | |
| import datetime | |
| import json | |
| from airflow import DAG | |
| from airflow import models | |
| from airflow.hooks.http_hook import HttpHook | |
| from airflow.operators.python_operator import PythonOperator | |
# Ticker symbols for which a pricing task is created.
SYMBOLS = ['AAPL', 'GOOGL', 'SNAP']

# Midnight (naive, local time) at the start of the previous calendar day.
YESTERDAY = datetime.datetime.combine(
    datetime.date.today() - datetime.timedelta(days=1),
    datetime.time.min,
)
def say_hello_method(**_context):
    """Log a fixed greeting at INFO level; any keyword arguments are ignored."""
    greeting = 'Hello Cloud Composer!'
    logging.info(greeting)
def create_pricing_file(**context):
    """Fetch the latest quote for one symbol and save it as a JSON file.

    The symbol is read from ``context['params']['symbol']``. The quote is
    requested with a GET through the ``iex_http_connection`` connection id,
    and the ``latestPrice`` field of the JSON response is written, together
    with the symbol, to ``/home/airflow/gcs/data/<symbol>.json``.

    Raises:
        KeyError: if ``params`` or ``symbol`` is missing from the context,
            or the decoded response has no ``latestPrice`` key.
        ValueError: if the response body is not valid JSON.
    """
    ticker = context['params']['symbol']

    # Build the hook and issue the request in one chained expression.
    quote_response = HttpHook(
        method='GET',
        http_conn_id='iex_http_connection'
    ).run(
        f'/1.0/stock/{ticker}/quote',
        extra_options={"verify": True},
    )
    quote = json.loads(quote_response.text)

    # Only the symbol and its latest price are kept from the response.
    record = {
        "symbol": ticker,
        "price": quote['latestPrice']
    }

    destination = f'/home/airflow/gcs/data/{ticker}.json'
    with open(destination, 'w') as out_file:
        out_file.write(json.dumps(record))
# Default arguments handed to the DAG through its ``default_args`` parameter.
DEFAULT_ARGS = dict(
    # With a start date of yesterday the DAG starts immediately once it is
    # detected in the Cloud Storage bucket.
    start_date=YESTERDAY,
    email_on_failure=False,
    email_on_retry=False,
    retries=2,
    retry_delay=datetime.timedelta(minutes=1),
    # Looked up from the 'gcp_project' Variable when this module is loaded.
    project_id=models.Variable.get('gcp_project'),
)
# The 'example' DAG: one greeting task followed by a pricing task per symbol.
with DAG(
    'example',
    default_args=DEFAULT_ARGS,
    schedule_interval=datetime.timedelta(minutes=90),
    catchup=False,
) as dag:
    say_hello_task = PythonOperator(
        task_id='say_hello',
        python_callable=say_hello_method,
        provide_context=True,
    )

    # One operator per symbol; each receives its symbol through ``params``.
    pricing_tasks = [
        PythonOperator(
            task_id=f'create_pricing_file_{symbol}',
            params={'symbol': symbol},
            python_callable=create_pricing_file,
            provide_context=True,
        )
        for symbol in SYMBOLS
    ]

    # Every pricing task is set downstream of the greeting task.
    say_hello_task >> pricing_tasks
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment