要求:使用 while 循环为每个日期运行 SQL 查询。例如:开始日期选择为 8 月 25 日,结束日期选择为 8 月 28 日。然后 BigQueryOperator 首先运行 8 月 25 日,然后是 8 月 26 日,依此类推,直到我们到达 28 日。
问题:在下面的 DAG 中,它只执行开始日期的查询,然后完成作业。它甚至不会执行/迭代 BigQueryOperator 到下一个日期等等。
from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator
from datetime import date, datetime, timedelta
import datetime
default_args = {
'owner': 'airflow',
'start_date': datetime.datetime(2018, 8, 31),
'email': ['xyz@xyz.com'],
'email_on_failure': True,
'retries': 1,
'retry_delay': timedelta(minutes=10),
'depends_on_past': False
}
dag = DAG('his_temp',default_args=default_args,schedule_interval=None)
date1 = datetime.date(2018, 8, 25)
date2 = datetime.date(2018, 8, 28)
day = datetime.timedelta(days=1)
while date1 <= date2:
parameter = {
'dataset': "projectname.finance",
'historical_date': date1.strftime('%Y%m%d')
}
sqlpartition = BigQueryOperator(
task_id='execute_sqlpartition',
use_legacy_sql=False,
write_disposition='WRITE_TRUNCATE',
allow_large_results=True,
bql="sqlqueries/sqlpartition.sql",
destination_dataset_table=parameter.get('dataset') + "." + "date_partition_" + parameter.get('historical_date'),
params=parameter,
bigquery_conn_id='bigquery',
dag=dag)
print "data loaded for "+ parameter.get('historical_date')
date1 = date1 + day
慕姐4208626
互换的青春
相关分类