在气流中,可以print()按照此处的建议使用简单或记录器写入日志
但是,当尝试在操作员内部打印时,这将不起作用。
我有以下代码:
for i in range(5, 0, -1):
gcs_export_uri_template = ["adstest/{{ macros.ds_format(macros.ds_add(ds, -params.i), '%Y-%m-%d', '%Y/%m/%d') }}/*"]
update_bigquery = GoogleCloudStorageToBigQueryOperator(
dag=dag,
task_id='load_ads_to_BigQuery-{}'.format(i),
bucket=GCS_BUCKET_ID,
destination_project_dataset_table=table_name_template,
source_format='CSV',
source_objects=gcs_export_uri_template,
schema_fields=dc(),
params={'i': i},
create_disposition='CREATE_IF_NEEDED',
write_disposition='WRITE_APPEND',
skip_leading_rows=1,
google_cloud_storage_conn_id=CONNECTION_ID,
bigquery_conn_id=CONNECTION_ID
)
现在说我想打印,"My name is load_ads_to_BigQuery-{}".format{i) 因为你可以看到这个打印是每个操作员的 uniuqe。
如果我这样做:
for i in range(5, 0, -1):
print("My name is load_ads_to_BigQuery-{}".format{i))
gcs_export_uri_template = ...
update_bigquery = GoogleCloudStorageToBigQueryOperator(...)
所有 5 个操作员将打印所有 5 个打印件。在我的情况下这是不正确的。打印件必须在GoogleCloudStorageToBigQueryOperator.
我怎样才能做到这一点?
相关分类