How to run a BigQuery query and then send the output CSV to Google Cloud Storage in Apache Airflow

I need to run a BigQuery script in Python whose output needs to land in Google Cloud Storage as a CSV. At the moment, my script triggers the BigQuery code and saves the result directly to my computer.


However, I need it to run in Airflow, so I can't have any local dependencies.


My current script saves the output to my local machine, and then I have to move it to GCS. I've searched online but couldn't figure it out. (P.S. I'm very new to Python, so apologies in advance if this has been asked before!)


```
import datetime

import pandas as pd
from googleapiclient import discovery
from oauth2client.client import GoogleCredentials


def run_script():
    # Run the query in BigQuery and save the result locally as a CSV.
    df = pd.read_gbq('SELECT * FROM `table/view` LIMIT 15000',
                     project_id='PROJECT',
                     dialect='standard'
                     )
    df.to_csv('XXX.csv', index=False)


def copy_to_gcs(filename, bucket, destination_filename):
    # Upload the local file to a GCS bucket.
    credentials = GoogleCredentials.get_application_default()
    service = discovery.build('storage', 'v1', credentials=credentials)

    body = {'name': destination_filename}
    req = service.objects().insert(bucket=bucket, body=body, media_body=filename)
    resp = req.execute()


current_date = datetime.date.today()
filename = (r"C:\Users\LOCALDRIVE\ETC\ETC\ETC.csv")
bucket = 'My GCS BUCKET'

str_prefix_datetime = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
destfile = 'XXX' + str_prefix_datetime + '.csv'
print('')
```
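(Editor's aside on the local-save step: if the goal is only to get the query result into GCS without touching the local disk, the CSV content can be uploaded straight from memory with the google-cloud-storage client. A minimal sketch, assuming the `google-cloud-storage` and `pandas-gbq` packages are installed; the project, bucket, and object names below are placeholders, not from the original post.)

```
import datetime

import pandas as pd
from google.cloud import storage  # assumes google-cloud-storage is installed


def query_to_gcs():
    # Placeholders 'table/view' and 'PROJECT' carried over from the original script.
    df = pd.read_gbq('SELECT * FROM `table/view` LIMIT 15000',
                     project_id='PROJECT',
                     dialect='standard')

    # Upload the CSV content directly from memory -- no local file needed.
    str_prefix_datetime = datetime.datetime.now().strftime('%Y%m%d_%H%M%S')
    client = storage.Client(project='PROJECT')
    bucket = client.bucket('my-gcs-bucket')  # placeholder bucket name
    blob = bucket.blob('XXX' + str_prefix_datetime + '.csv')
    blob.upload_from_string(df.to_csv(index=False), content_type='text/csv')
```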


胡说叔叔
1 Answer

Smart猫小萌

Airflow provides several operators for working with BigQuery. BigQueryOperator runs a query on BigQuery, and BigQueryToCloudStorageOperator exports a BigQuery table (for example, the destination table of a query) to GCS. You can see an example in the Cloud Composer code samples that runs a query and then exports the result as CSV.

```
# Copyright 2018 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

# Query recent StackOverflow questions.
bq_recent_questions_query = bigquery_operator.BigQueryOperator(
    task_id='bq_recent_questions_query',
    sql="""
    SELECT owner_display_name, title, view_count
    FROM `bigquery-public-data.stackoverflow.posts_questions`
    WHERE creation_date < CAST('{max_date}' AS TIMESTAMP)
        AND creation_date >= CAST('{min_date}' AS TIMESTAMP)
    ORDER BY view_count DESC
    LIMIT 100
    """.format(max_date=max_query_date, min_date=min_query_date),
    use_legacy_sql=False,
    destination_dataset_table=bq_recent_questions_table_id)

# Export query result to Cloud Storage.
export_questions_to_gcs = bigquery_to_gcs.BigQueryToCloudStorageOperator(
    task_id='export_recent_questions_to_gcs',
    source_project_dataset_table=bq_recent_questions_table_id,
    destination_cloud_storage_uris=[output_file],
    export_format='CSV')
```
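The excerpt above references names (max_query_date, min_query_date, bq_recent_questions_table_id, output_file) that are defined elsewhere in the Composer sample. As a rough sketch of how the two operators could be wired into a complete DAG on Airflow 1.10.x, with placeholder values standing in for those names (assumptions, not taken from the sample):

```
import datetime

from airflow import models
from airflow.contrib.operators import bigquery_operator  # Airflow 1.10.x contrib paths
from airflow.contrib.operators import bigquery_to_gcs

# Placeholder values -- replace with your own project, dataset, bucket, and dates.
bq_recent_questions_table_id = 'your-project.your_dataset.recent_questions'
output_file = 'gs://your-bucket/recent_questions.csv'
max_query_date = '2018-02-01'
min_query_date = '2018-01-01'

default_dag_args = {
    'start_date': datetime.datetime(2018, 1, 1),
}

with models.DAG(
        'bq_query_to_gcs_csv',
        schedule_interval=datetime.timedelta(days=1),
        default_args=default_dag_args) as dag:

    # Run the query and write the result to a destination table.
    bq_recent_questions_query = bigquery_operator.BigQueryOperator(
        task_id='bq_recent_questions_query',
        sql="""
        SELECT owner_display_name, title, view_count
        FROM `bigquery-public-data.stackoverflow.posts_questions`
        WHERE creation_date < CAST('{max_date}' AS TIMESTAMP)
          AND creation_date >= CAST('{min_date}' AS TIMESTAMP)
        ORDER BY view_count DESC
        LIMIT 100
        """.format(max_date=max_query_date, min_date=min_query_date),
        use_legacy_sql=False,
        destination_dataset_table=bq_recent_questions_table_id)

    # Export the destination table to GCS as CSV.
    export_questions_to_gcs = bigquery_to_gcs.BigQueryToCloudStorageOperator(
        task_id='export_recent_questions_to_gcs',
        source_project_dataset_table=bq_recent_questions_table_id,
        destination_cloud_storage_uris=[output_file],
        export_format='CSV')

    # Only export once the query's destination table has been written.
    bq_recent_questions_query >> export_questions_to_gcs
```

The `>>` chaining at the end makes the export task wait for the query task, so the CSV in GCS always reflects the freshly written destination table.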