在 Apache Airflow 中保存算子的结果

几个运算符允许提取数据,但我从未设法使用结果。


例如:https : //github.com/apache/incubator-airflow/blob/master/airflow/contrib/operators/bigquery_get_data.py


该运算符可以按如下方式调用:


get_data = BigQueryGetDataOperator(

      task_id='get_data_from_bq',

      dataset_id='test_dataset',

      table_id='Transaction_partitions',

      max_results='100',

      selected_fields='DATE',

      bigquery_conn_id='airflow-service-account'

      )

然而,get_data 是 DAG 类型,但第 116 行说“返回 table_data”。需要明确的是,操作员工作并检索数据,我只是不明白如何使用数据检索/数据所在的位置。


如何使用上面的“get_data”获取数据?


弑天下
浏览 160回答 2
2回答

当年话下

您将get_data在下一个任务中使用的方式可以是一个PythonOperator,然后您可以使用它来处理数据。get_data = BigQueryGetDataOperator(      task_id='get_data_from_bq',      dataset_id='test_dataset',      table_id='Transaction_partitions',      max_results='100',      selected_fields='DATE',      bigquery_conn_id='airflow-service-account'      )def process_data_from_bq(**kwargs):      ti = kwargs['ti']      bq_data = ti.xcom_pull(task_ids='get_data_from_bq')      # Now bq_data here would have your data in Python list      print(bq_data)process_data = PythonOperator(      task_id='process_data_from_bq',      python_callable=process_bq_data,      provide_context=True      )get_data >> process_data

Qyouu

返回值保存在Xcom 中。您可以从另一个操作员访问它,如本示例所示。data = ti.xcom_pull(task_ids='get_data_from_bq')
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Python