
Dynamic SQL in BigQuery with Python

BigQuery on GCP recently added support for dynamic SQL. I want to try it from Cloud Functions.


My BigQuery dynamic SQL (this runs in the UI):


declare cols string;

set cols=(select STRING_AGG (column_name,',') from `my_db.INFORMATION_SCHEMA.COLUMNS` where table_name='tbla');

EXECUTE IMMEDIATE format("""select %s from `my_db.tbla`""",cols);

I want to pass the table_name value from my Python code. Will the Python BigQuery library support this?


Is there any sample Python code?


I tried the following code, without success.

Code 1:

def hello_gcs(event, context):

    table_name='tbla'

    client = bigquery.Client()

    job_config = bigquery.QueryJobConfig(use_legacy_sql=False)

    sql=( '''

declare cols string;

set cols=(select STRING_AGG (column_name,',') from `my_db.INFORMATION_SCHEMA.COLUMNS` where table_name=?);

EXECUTE IMMEDIATE format("""select @ col from `my_db.tbla`""") using cols

''',(table_name))

    query_job = client.query(sql, job_config=job_config)

    results = query_job.result()  

    for row in results:

       print("{} : {} views".format(row.url, row.view_count))


Error:


, line 130, in result raise self._exception google.api_core.exceptions.BadRequest: 400 Query error: Positional parameters are not supported at [3:104]

Code 2:

from google.cloud import bigquery


def hello_gcs(event, context):

    table_name='tbla'

    client = bigquery.Client()

    job_config = bigquery.QueryJobConfig(use_legacy_sql=False)

    sql=( '''

declare cols string;

set cols=(select STRING_AGG (column_name,',') from `my_db.INFORMATION_SCHEMA.COLUMNS` where table_name=%s);

EXECUTE IMMEDIATE format("""select @ col from `my_db.tbla`""") using cols

''',(table_name))

    query_job = client.query(sql, job_config=job_config)

    results = query_job.result()  

    for row in results:

       print("{} : {} views".format(row.url, row.view_count))


Error:


line 130, in result raise self._exception google.api_core.exceptions.BadRequest: 400 Syntax error: Illegal input character "%" at [3:104]


汪汪一只猫
2 answers

哔哔one

Finally, I found the correct syntax. I also noticed some mistakes in my code.

Mistakes:

1. The query assigned to the sql variable, sql=('''.......'''), had a syntax error.
2. The print statement on the last line was wrong: my select query has no url and view_count columns.
3. In dynamic SQL, a string value in the where condition has to be wrapped in single quotes.

Sample working code:

Code 1:

from google.cloud import bigquery

table_name='tbla'
client = bigquery.Client()
job_config = bigquery.QueryJobConfig(use_legacy_sql=False)
sql="declare cols string;set cols=(select STRING_AGG (column_name,',') from `my_db.INFORMATION_SCHEMA.COLUMNS` where table_name='{}');EXECUTE IMMEDIATE format(\"\"\"select %s from `my_db.tbla` \"\"\",cols)".format(table_name)
print(sql)
query_job = client.query(sql, job_config=job_config)
results = query_job.result()
for row in results:
    print(row)

Code 2:

from google.cloud import bigquery

table_name='tbla'
client = bigquery.Client()
job_config = bigquery.QueryJobConfig(use_legacy_sql=False)
sql="declare cols string;set cols=(select STRING_AGG (column_name,',') from `my_db.INFORMATION_SCHEMA.COLUMNS` where table_name='{}');EXECUTE IMMEDIATE format(\"\"\"select ? from `my_db.tbla` \"\"\") using cols".format(table_name)
print(sql)
query_job = client.query(sql, job_config=job_config)
results = query_job.result()
for row in results:
    print(row)

Code 3:

from google.cloud import bigquery

table_name='tbla'
client = bigquery.Client()
job_config = bigquery.QueryJobConfig(use_legacy_sql=False)
sql="declare cols string;set cols=(select STRING_AGG (column_name,',') from `my_db.INFORMATION_SCHEMA.COLUMNS` where table_name='{}');EXECUTE IMMEDIATE format(\"\"\"select @col from `my_db.tbla` \"\"\") using cols as col".format(table_name)
print(sql)
query_job = client.query(sql, job_config=job_config)
results = query_job.result()
for row in results:
    print(row)
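The snippets above splice table_name into the SQL text with str.format, which breaks (or allows injection) if the value contains a quote. A possible alternative, sketched below, is to bind the value as a named query parameter; as far as I know, BigQuery scripts accept named parameters such as @table_name, unlike the positional ? that the question's first error rejects. The helper names build_script and run_script are hypothetical, and the identifier check is an assumption of this sketch, since dataset and table names cannot be bound as parameters.

```python
import re

# Hypothetical helpers for illustration; only the google.cloud.bigquery calls
# inside run_script are library API.
_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def build_script(dataset: str, table: str) -> str:
    """Return a BigQuery script that selects every column of dataset.table."""
    for name in (dataset, table):
        # Identifiers cannot be bound as query parameters, so validate them
        # before splicing them into the SQL text.
        if not _IDENTIFIER.fullmatch(name):
            raise ValueError(f"unsafe identifier: {name!r}")
    return (
        "DECLARE cols STRING;\n"
        "SET cols = (\n"
        "  SELECT STRING_AGG(column_name, ',')\n"
        f"  FROM `{dataset}.INFORMATION_SCHEMA.COLUMNS`\n"
        "  WHERE table_name = @table_name\n"  # value bound at run time
        ");\n"
        f'EXECUTE IMMEDIATE FORMAT("SELECT %s FROM `{dataset}.{table}`", cols);\n'
    )


def run_script(dataset: str, table: str):
    """Run the script; needs google-cloud-bigquery and GCP credentials."""
    # Imported lazily so build_script can be used without the library.
    from google.cloud import bigquery

    client = bigquery.Client()
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter("table_name", "STRING", table)
        ]
    )
    query_job = client.query(build_script(dataset, table), job_config=job_config)
    return query_job.result()


if __name__ == "__main__":
    # Prints the generated script without contacting BigQuery.
    print(build_script("my_db", "tbla"))
```

Calling run_script("my_db", "tbla") inside the Cloud Function would then replace the hand-formatted sql string. Note that the column list still has to go through FORMAT, because a parameter supplies a value, not identifiers.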

慕姐4208626

Jinja2 SQL templates are a better option for building dynamic SQL. Example:

create or replace table {{ params.targetTable }}
as
select
    {{ params.targetColumnList|join(',') }},
    cast(null as timestamp) as begin_timestamp,
    cast(null as timestamp) as end_timestamp
from
    {{ params.sourceTable }};
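To make the template concrete, here is a minimal sketch of rendering it from Python with the jinja2 package. The render_sql helper and the sample table and column names are assumptions for illustration; the rendered string would then be passed to client.query as in the other answer.

```python
from jinja2 import Environment, StrictUndefined

# The template from the answer, unchanged.
TEMPLATE = """\
create or replace table {{ params.targetTable }}
as
select
    {{ params.targetColumnList|join(',') }},
    cast(null as timestamp) as begin_timestamp,
    cast(null as timestamp) as end_timestamp
from
    {{ params.sourceTable }};
"""


def render_sql(params: dict) -> str:
    """Render the SQL template; hypothetical helper, not part of any library."""
    # StrictUndefined makes a missing key raise instead of rendering as empty.
    env = Environment(undefined=StrictUndefined)
    return env.from_string(TEMPLATE).render(params=params)


if __name__ == "__main__":
    # Sample values are made up for the example.
    print(
        render_sql(
            {
                "targetTable": "my_db.tbla_history",
                "targetColumnList": ["col_a", "col_b"],
                "sourceTable": "my_db.tbla",
            }
        )
    )
```

Jinja2 does no SQL escaping, so this approach is only appropriate when the parameter values come from trusted configuration rather than user input.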