猿问

如何在另一个达格气流中创建达格斯

我正在尝试拥有一个主匕首,它将根据我的需要创建更多的匕首。我在气流.cfg dags_folder中有以下python文件。此代码在数据库中创建主 dag。此主 dag 应读取文本文件,并应为文本文件中的每一行创建 dag。但是,在主 dag 中创建的 dag 不会添加到数据库中。创建它的正确方法是什么?


版本详细信息:


蟒蛇版本:3.7


阿帕奇气流版本:1.10.8


import datetime as dt


from airflow import DAG

from airflow.operators.bash_operator import BashOperator

from airflow.operators.python_operator import PythonOperator


root_dir = "/home/user/TestSpace/airflow_check/res"


print("\n\n ===> \n Dag generator")


default_args = {

    'owner': 'airflow',

    'start_date': dt.datetime(2020, 3, 22, 00, 00, 00),

    'concurrency': 1,

    'retries': 0

}



def greet(_name):

    message = "Greetings {} at UTC: {} Local: {}\n".format(_name, dt.datetime.utcnow(), dt.datetime.now())

    f = open("{}/greetings.txt".format(root_dir), "a+")

    print("\n\n =====> {}\n\n".format(message))

    f.write(message)

    f.close()



def create_dag(dag_name):

    with DAG(dag_name, default_args=default_args,

             schedule_interval='*/2 * * * *',

             catchup=False

             ) as i_dag:

        i_opr_greet = PythonOperator(task_id='greet', python_callable=greet,

                                     op_args=["{}_{}".format("greet", dag_name)])

        i_echo_op = BashOperator(task_id='echo', bash_command='echo `date`')


        i_opr_greet >> i_echo_op

    return i_dag



def create_all_dags():

    all_lines = []

    f = open("{}/../dag_names.txt".format(root_dir), "r")

    for x in f:

        all_lines.append(str(x))

    f.close()


    for line in all_lines:

        print("Dag creation for {}".format(line))

        globals()[line] = create_dag(line)



with DAG('master_dag', default_args=default_args,

         schedule_interval='*/1 * * * *',

         catchup=False

         ) as dag:

    echo_op = BashOperator(task_id='echo', bash_command='echo `date`')

    create_op = PythonOperator(task_id='create_dag', python_callable=create_all_dags)

    echo_op >> create_op


牧羊人nacy
浏览 116回答 2
2回答

慕少森

您有 2 个选项:使用子操作程序:示例 DAG。如果您的计划间隔可以相同,请使用它。编写蟒蛇 DAG 文件:从主 DAG 开始,在包含 DAG 的AIRFLOW_HOME中创建 Python 文件。为此,您可以使用 Jinja2 模板引擎。

明月笑刀无情

看看触发器运行器:https://airflow.apache.org/docs/stable/_api/airflow/operators/dagrun_operator/index.html用法示例:https://github.com/apache/airflow/blob/master/airflow/example_dags/example_trigger_controller_dag.py
随时随地看视频慕课网APP

相关分类

Python
我要回答