Getting results from BigQueryOperator in Airflow

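I am trying to get the results from a BigQueryOperator in Airflow, but I cannot find a way to do it. I tried calling the next() method on its bq_cursor member (available in 1.10), but it returns None. This is how I tried to do it: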


import datetime
import logging

from airflow import models
from airflow.contrib.operators import bigquery_operator
from airflow.operators import python_operator


yesterday = datetime.datetime.combine(
    datetime.datetime.today() - datetime.timedelta(1),
    datetime.datetime.min.time()
)


def MyChequer(**kwargs):
    big_query_count = bigquery_operator.BigQueryOperator(
        task_id='my_bq_query',
        sql='select count(*) from mydataset.mytable'
    )

    big_query_count.execute(context=kwargs)

    logging.info(big_query_count)
    logging.info(big_query_count.__dict__)
    logging.info(big_query_count.bq_cursor.next())


default_dag_args = {
    'start_date': yesterday,
    'email_on_failure': False,
    'email_on_retry': False,
    'project_id': 'myproject'
}


with models.DAG(
        'bigquery_results_execution',
        # Continue to run DAG once per day
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

    myoperator = python_operator.PythonOperator(
        task_id='threshold_operator',
        provide_context=True,
        python_callable=MyChequer
    )

    # Define DAG
    myoperator

Looking at bigquery_hook.py and bigquery_operator.py, this seems to be the only available way to fetch the results.


德玛西亚99

梦里花落0921

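Whenever I need to get data from a BigQuery query and use it for something, I create my own operator using the BigQuery hook. I usually call it BigQueryToXOperator, and we have a number of these for sending BigQuery data to other internal systems. For example, I have a BigQueryToPubSub operator, which you might find useful as an example of how to query BigQuery, process the results row by row, and send them to Google PubSub. Consider the following generic sample code for doing this yourself:

from json import dumps

from airflow.contrib.hooks.bigquery_hook import BigQueryHook
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults


class BigQueryToXOperator(BaseOperator):
    template_fields = ['sql']
    ui_color = '#000000'

    @apply_defaults
    def __init__(
            self,
            sql,
            keys,
            bigquery_conn_id='bigquery_default',
            delegate_to=None,
            *args,
            **kwargs):
        super(BigQueryToXOperator, self).__init__(*args, **kwargs)
        self.sql = sql
        self.keys = keys  # A list of keys for the columns in the result set of sql
        self.bigquery_conn_id = bigquery_conn_id
        self.delegate_to = delegate_to

    def execute(self, context):
        """
        Run query and handle results row by row.
        """
        cursor = self._query_bigquery()
        for row in cursor.fetchall():
            # Zip keys and row together because the cursor returns a list of lists (not a list of dicts)
            row_dict = dumps(dict(zip(self.keys, row))).encode('utf-8')
            # Do what you want with the row...
            self.handle_row(row_dict)

    def handle_row(self, row):
        """
        Placeholder hook: override this to send the row to your target system.
        By default it only logs the encoded row.
        """
        self.log.info(row)

    def _query_bigquery(self):
        """
        Queries BigQuery and returns a cursor to the results.
        """
        bq = BigQueryHook(bigquery_conn_id=self.bigquery_conn_id,
                          delegate_to=self.delegate_to,
                          use_legacy_sql=False)
        conn = bq.get_conn()
        cursor = conn.cursor()
        cursor.execute(self.sql)
        return cursor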
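The core step of the answer, pairing each positional row with a list of column keys, does not depend on Airflow at all. Below is a minimal sketch of that step, assuming only a DB-API style cursor with execute() and fetchall() (which is how the answer uses the cursor from BigQueryHook). The helper names rows_as_dicts and row_to_message are hypothetical, and an in-memory SQLite database stands in for BigQuery so the sketch runs locally:

```python
import json
import sqlite3  # only used as a local stand-in for the BigQuery cursor
from typing import Any, Dict, Iterator, Sequence


def rows_as_dicts(cursor: Any, sql: str, keys: Sequence[str]) -> Iterator[Dict[str, Any]]:
    """Run `sql` on a DB-API style cursor and yield each row as a dict.

    Hypothetical helper. Assumes rows come back as positional sequences
    whose column order matches `keys`, as the answer's comment describes.
    """
    cursor.execute(sql)
    for row in cursor.fetchall():
        if len(row) != len(keys):
            raise ValueError(f"expected {len(keys)} columns, got {len(row)}")
        yield dict(zip(keys, row))


def row_to_message(row: Dict[str, Any]) -> bytes:
    """Serialise one row to UTF-8 JSON bytes (e.g. a message payload)."""
    return json.dumps(row, sort_keys=True).encode("utf-8")


if __name__ == "__main__":
    # Local demo: SQLite's cursor also implements execute()/fetchall().
    conn = sqlite3.connect(":memory:")
    cur = conn.cursor()
    cur.execute("CREATE TABLE mytable (name TEXT, qty INTEGER)")
    cur.executemany("INSERT INTO mytable VALUES (?, ?)", [("a", 1), ("b", 2)])
    for row in rows_as_dicts(cur, "SELECT name, qty FROM mytable ORDER BY name", ["name", "qty"]):
        print(row_to_message(row))
```

Inside an operator or a PythonOperator callable, the cursor would instead come from BigQueryHook(...).get_conn().cursor(), as in the answer's _query_bigquery. For a single-value query like the question's count, something like next(rows_as_dicts(cursor, sql, ['n']))['n'] should return that value. Passing the keys explicitly, as the answer does, avoids relying on the cursor to report column names.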