如何使用 While 循环执行 Airflow 运算符

要求:使用 while 循环为每个日期运行 SQL 查询。例如:开始日期选择为 8 月 25 日,结束日期选择为 8 月 28 日。然后 BigQueryOperator 首先运行 8 月 25 日,然后是 8 月 26 日,依此类推,直到我们到达 28 日。


问题:在下面的 DAG 中,它只执行开始日期的查询,然后完成作业。它甚至不会执行/迭代 BigQueryOperator 到下一个日期等等。


from airflow import DAG

from airflow.operators.bash_operator import BashOperator

from airflow.contrib.operators.bigquery_operator import BigQueryOperator

from datetime import date, datetime, timedelta

import datetime


default_args = {

    'owner': 'airflow',

    'start_date': datetime.datetime(2018, 8, 31),

    'email': ['xyz@xyz.com'],

    'email_on_failure': True,

    'retries': 1,

    'retry_delay': timedelta(minutes=10),

    'depends_on_past': False

}


dag = DAG('his_temp',default_args=default_args,schedule_interval=None)


date1 = datetime.date(2018, 8, 25)

date2 = datetime.date(2018, 8, 28)

day = datetime.timedelta(days=1)


while date1 <= date2:

    parameter = {

        'dataset': "projectname.finance",

        'historical_date': date1.strftime('%Y%m%d')

    }



    sqlpartition = BigQueryOperator(

    task_id='execute_sqlpartition',

    use_legacy_sql=False,

    write_disposition='WRITE_TRUNCATE',

    allow_large_results=True,

    bql="sqlqueries/sqlpartition.sql",

    destination_dataset_table=parameter.get('dataset') + "." + "date_partition_" + parameter.get('historical_date'),

    params=parameter,

    bigquery_conn_id='bigquery',

    dag=dag)


    print "data loaded for "+ parameter.get('historical_date')


    date1 = date1 + day   



呼啦一阵风
浏览 265回答 2
2回答

慕姐4208626

Airflow scheduler 的整个概念是它会调度任务,你只需要正确配置它。难怪它会在提到的开始日期运行一次,因为将选择 dag 开始日期,并且由于没有安排每日任务,它将运行一次并停止。您必须在 dag 级别而不是操作员级别进行配置。

互换的青春

您可以在依赖项的末尾添加自触发运算符。类似于以下内容:def trigger_check(context, dag_run_obj):&nbsp; &nbsp; if date1 <= date2:&nbsp; &nbsp; &nbsp; &nbsp; return dag_run_objtrigger = TriggerDagRunOperator(&nbsp; &nbsp; task_id="test_trigger_dagrun",&nbsp; &nbsp; trigger_dag_id="his_temp",&nbsp; &nbsp; python_callable=trigger_check,&nbsp; &nbsp; ... more arguments)op1 >> op2 >> ... >> trigger第一次触发它后,它会循环遍历日期,直到达到 date2 阈值。您必须更加小心地通过将其设为有序的 PythonOperator 或类似的东西来更新日期
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Python