在 Airflow 中的组件之间传输数据

我对 Airflow 很陌生,并且已经阅读了大部分文档。从文档中,我了解到 DAG 中组件之间的小数据可以使用 XCom 类共享。DAG 中发布数据的组件必须推送,订阅数据的组件必须拉取。


但是,我对推和拉的语法部分不是很清楚。我指的是关于文档的XCom 部分并开发了一个代码模板。假设我有以下代码,它只有两个组件,一个 pusher 和一个 puller。pusher 发布 puller 必须消耗的当前时间并写入日志文件。


from datetime import datetime

from airflow import DAG

from airflow.operators.python_operator import PythonOperator


log_file_location = '/usr/local/airflow/logs/time_log.log'


default_args = {'owner':'apache'}

dag = DAG('pushpull', default_args = default_args)


def push_function():

    #push this data on the DAG as key-value pair

    return(datetime.now()) #current time


def pull_function():

    with open(log_file_location, 'a') as logfile:

        current_time = '' #pull data from the pusher as key - value pair

        logfile.writelines('current time = '+current_time)

    logfile.close()


with dag:

    t1 = PythonOperator(

        task_id = 'pusher', 

        python_callable = push_function)


    t2 = PythonOperator(

        task_id = 'puller', 

        python_callable = pull_function)


    t2.set_upstream(t1)

我需要 Airflow 大师在两种语法上的帮助:

  1. 如何从推送功能连同键推送数据

  2. 如何获得 pull 函数使用 key 拉取数据。


潇湘沐
浏览 412回答 1
1回答

天涯尽头无女友

使用密钥推送到 Xcom 的示例:def push_function(**context):    msg='the_message'    print("message to push: '%s'" % msg)    task_instance = context['task_instance']    task_instance.xcom_push(key="the_message", value=msg)使用密钥拉到 Xcom 的示例:def pull_function(**kwargs):    ti = kwargs['ti']    msg = ti.xcom_pull(task_ids='push_task',key='the_message')    print("received message: '%s'" % msg)示例 DAY:from datetime import datetime, timedeltafrom airflow.models import DAGfrom airflow.operators.python_operator import PythonOperatorDAG = DAG(  dag_id='simple_xcom',  start_date=datetime(2017, 10, 26),  schedule_interval=timedelta(1))def push_function(**context):    msg='the_message'    print("message to push: '%s'" % msg)    task_instance = context['task_instance']    task_instance.xcom_push(key="the_message", value=msg)push_task = PythonOperator(    task_id='push_task',     python_callable=push_function,    provide_context=True,    dag=DAG)def pull_function(**kwargs):    ti = kwargs['ti']    msg = ti.xcom_pull(task_ids='push_task',key='the_message')    print("received message: '%s'" % msg)pull_task = PythonOperator(    task_id='pull_task',     python_callable=pull_function,    provide_context=True,    dag=DAG)push_task >> pull_task
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Python