猿问

气流 - 创建 dag 和任务动态地为一个对象创建管道

在气流中,我想将一些表从 pg 导出到 BQ。


task1: get the max id from BQ

task2: export the data from PG (id>maxid)

task3: GCS to BQ stage

task4: BQ stage to BQ main

但有一个小挑战,日程间隔不同。所以我创建了一个 JSON 文件来告诉同步间隔。因此,如果是 2 分钟,那么它将使用 DAG upsert_2mins,否则将使用 10 分钟间隔 ( upsert_10mins) 。我使用这个语法来动态生成它。


JSON 配置文件:


{

    "tbl1": ["update_timestamp", "2mins", "stg"],

    "tbl2": ["update_timestamp", "2mins", "stg"]

}

它实际上创建了 dag,但问题是来自 Web UI,我能够看到最后一个表的任务。但它必须显示 2 个表的任务。

慕码人2483693
浏览 86回答 1
1回答

慕森卡

您的代码正在创建 2 个 dags,每个表一个,但用第二个覆盖第一个。我的建议是将 JSON 文件的格式更改为:{    "2mins": [                "tbl1": ["update_timestamp", "stg"],                "tbl2": ["update_timestamp", "stg"]             ],    "10mins": [                "tbl3": ["update_timestamp", "stg"],                "tbl4": ["update_timestamp", "stg"]             ]}让您的代码迭代计划并为每个表创建所需的任务(您将需要两个循环):# looping on the schedules to create two dagsfor schedule, tables in config.items():cron_time = '*/10 * * * *'if schedule== '2mins':    cron_time = '*/20 * * * *'dag_id = 'upsert_every_{}'.format(schedule)dag = DAG(    dag_id ,    default_args=default_args,    description='Incremental load - Every 10mins',    schedule_interval=cron_time,    catchup=False,    max_active_runs=1,    doc_md = docs)# Looping over the tables to create the tasks for # each table in the current schedulefor table_name, table_config in tables.items():    max_ts = PythonOperator(        task_id="get_maxts_{}".format(table_name),        python_callable=get_max_ts,        op_kwargs={'tablename':table_name, 'dag': dag},        provide_context=True,        dag=dag    )    export_gcs = PythonOperator(        task_id='export_gcs_{}'.format(table_name),        python_callable=pgexport,        op_kwargs={'tablename':table_name, 'dag': dag},        provide_context=True,        dag=dag    )    stg_load = PythonOperator(        task_id='stg_load_{}'.format(table_name),        python_callable=stg_bqimport,        op_kwargs={'tablename':table_name, 'dag': dag},        provide_context=True,        dag=dag    )        merge = PythonOperator(        task_id='merge_{}'.format(table_name),        python_callable=prd_merge,        op_kwargs={'tablename':table_name, 'dag': dag},        provide_context=True,        dag=dag    )        # Tasks for the same table will be chained    max_ts >> export_gcs >> stg_load >> merge# DAG is created among the global objectsglobals()[dag_id] = dag
随时随地看视频慕课网APP

相关分类

Python
我要回答