Skip to content

Instantly share code, notes, and snippets.

@lucasc896
Last active April 28, 2016 15:50
Show Gist options
  • Select an option

  • Save lucasc896/61b7602fefd313965c0583ac24974ea8 to your computer and use it in GitHub Desktop.

Select an option

Save lucasc896/61b7602fefd313965c0583ac24974ea8 to your computer and use it in GitHub Desktop.
Airflow Example DAG
"""
Python test code for airflow investigations
"""
from airflow import DAG
from airflow.operators import PythonOperator
from datetime import datetime, timedelta
from modules import print_date, boto_workout, spark_test
# Defaults applied to every task created with dag=my_dag.
default_args = dict(
    owner='airflow',
    # Each run is scheduled independently of the previous run's outcome.
    depends_on_past=False,
    # Naive datetime (no tzinfo). NOTE(review): timezone is whatever the
    # scheduler assumes for naive values — confirm.
    start_date=datetime(2016, 4, 11, 13, 20),
    # NOTE(review): this address looks like a web-scraper e-mail-obfuscation
    # placeholder rather than a real mailbox — confirm before enabling mail.
    email=['[email protected]'],
    # Both e-mail notification flags are off.
    email_on_failure=False,
    email_on_retry=False,
    # A failed task gets one more attempt, five minutes later.
    retries=1,
    retry_delay=timedelta(minutes=5),
)
# The DAG every task below attaches to. The cron expression fires every
# five minutes.
# NOTE(review): with start_date in 2016 and a 5-minute interval, the
# scheduler may try to backfill a very large number of runs — confirm this
# is intended for the Airflow version in use.
my_dag = DAG(
    dag_id='python_test_dag',
    default_args=default_args,
    schedule_interval='*/5 * * * *',
)
# Root task. It calls print_date.main(string='hello').
# Its return value is presumably the XCom that receive_xcom reads — confirm
# in modules/print_date.
task_print_date = PythonOperator(
    task_id='print_date',
    dag=my_dag,
    python_callable=print_date.main,
    op_kwargs=dict(string='hello'),
)
# It calls print_date.receive_xcom with the Airflow task context
# (provide_context=True). Presumably this is so the callable can pull the
# XCom pushed by print_date — verify against modules/print_date.
task_receive_xcom = PythonOperator(
    task_id='receive_xcom',
    dag=my_dag,
    provide_context=True,
    python_callable=print_date.receive_xcom,
)
# It runs boto_workout.main with no arguments.
task_boto_workout = PythonOperator(
    dag=my_dag,
    task_id='boto_workout',
    python_callable=boto_workout.main,
)
# It runs spark_test.spark_word_count with no arguments.
task_spark_test = PythonOperator(
    dag=my_dag,
    task_id='spark_test',
    python_callable=spark_test.spark_word_count,
)
# Dependency graph, written in data-flow order:
#   print_date -> boto_workout  -> spark_test
#   print_date -> receive_xcom  -> spark_test
# spark_test waits for both branches to finish.
task_print_date.set_downstream(task_boto_workout)
task_print_date.set_downstream(task_receive_xcom)
task_receive_xcom.set_downstream(task_spark_test)
task_boto_workout.set_downstream(task_spark_test)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment