Your code is creating 2 DAGs, one per table, but the second one overwrites the first. My suggestion is to change the format of the JSON file to:

```json
{
  "2mins": {
    "tbl1": ["update_timestamp", "stg"],
    "tbl2": ["update_timestamp", "stg"]
  },
  "10mins": {
    "tbl3": ["update_timestamp", "stg"],
    "tbl4": ["update_timestamp", "stg"]
  }
}
```

and have your code iterate over the schedules, creating the required tasks for each table (you will need two loops):

```python
# Looping over the schedules to create one DAG per schedule
for schedule, tables in config.items():
    cron_time = '*/10 * * * *'
    if schedule == '2mins':
        cron_time = '*/2 * * * *'

    dag_id = 'upsert_every_{}'.format(schedule)
    dag = DAG(
        dag_id,
        default_args=default_args,
        description='Incremental load - Every {}'.format(schedule),
        schedule_interval=cron_time,
        catchup=False,
        max_active_runs=1,
        doc_md=docs
    )

    # Looping over the tables to create the tasks for
    # each table in the current schedule
    for table_name, table_config in tables.items():
        max_ts = PythonOperator(
            task_id="get_maxts_{}".format(table_name),
            python_callable=get_max_ts,
            op_kwargs={'tablename': table_name, 'dag': dag},
            provide_context=True,
            dag=dag
        )
        export_gcs = PythonOperator(
            task_id='export_gcs_{}'.format(table_name),
            python_callable=pgexport,
            op_kwargs={'tablename': table_name, 'dag': dag},
            provide_context=True,
            dag=dag
        )
        stg_load = PythonOperator(
            task_id='stg_load_{}'.format(table_name),
            python_callable=stg_bqimport,
            op_kwargs={'tablename': table_name, 'dag': dag},
            provide_context=True,
            dag=dag
        )
        merge = PythonOperator(
            task_id='merge_{}'.format(table_name),
            python_callable=prd_merge,
            op_kwargs={'tablename': table_name, 'dag': dag},
            provide_context=True,
            dag=dag
        )

        # Tasks for the same table are chained together
        max_ts >> export_gcs >> stg_load >> merge

    # Register the DAG among the global objects so the scheduler picks it up
    globals()[dag_id] = dag
```
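The loop above assumes `config` already holds the parsed JSON. If you are not loading it yet, a minimal sketch would look like this (the file name `tables.json` is hypothetical, adjust it to wherever your config actually lives):

```python
import json
import os

# Hypothetical location: a tables.json file sitting next to this DAG file
config_path = os.path.join(os.path.dirname(__file__), 'tables.json')
with open(config_path) as f:
    config = json.load(f)
```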
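Also note that `table_config` (the `["update_timestamp", "stg"]` values) is unused in the loop above. If your callables need those values, and assuming they mean the incremental timestamp column and the staging dataset (that meaning is my guess, not something your question states), you could pass them through `op_kwargs` as well, e.g. for the first task:

```python
max_ts = PythonOperator(
    task_id="get_maxts_{}".format(table_name),
    python_callable=get_max_ts,
    # ts_column / stg_dataset are hypothetical parameter names;
    # rename them to whatever get_max_ts actually expects
    op_kwargs={
        'tablename': table_name,
        'ts_column': table_config[0],
        'stg_dataset': table_config[1],
        'dag': dag
    },
    provide_context=True,
    dag=dag
)
```

With the config shown above, this produces two DAGs, `upsert_every_2mins` and `upsert_every_10mins`, each containing one `get_maxts_* >> export_gcs_* >> stg_load_* >> merge_*` chain per table.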