在上一篇文章中,“如何从云函数触发Dataform工作流执行,[1]”中,我们讨论了如何从云函数(当前是Cloud Run函数)触发先前配置的Dataform工作流。本文将提供更多使用Python代码来执行Dataform工作流的方法[1]。
照片由 Chris Barbalis 提供,来自 Unsplash
本文假定读者已经熟悉Cloud Run Functions和Dataform的基本概念。
我们来搭建一个数据管道,生成一个 JSON 文件并上传到 GCS 上。接下来,我们可以在 Dataform 中进一步处理文件。我们应该从 Cloud Run 函数触发 Dataform 的工作流执行过程,并将文件路径作为参数传递。
在上一篇文章中,我们讨论了 Dataform 代码周期[2],它包括在创建并保存操作代码到 Git 之后,执行工作流的多个步骤。我们讨论了在创建并保存操作代码到 Git 后,执行 Dataform 代码周期中多个步骤的过程。
- 根据 Git 分支创建发布设置。
- 根据发布设置创建发布构建。
- 根据发布构建创建工作流设置。
- 通过调用工作流来运行其执行。
之前的'article'重点介绍了步骤4,通过执行工作流配置名称来运行工作流,其中工作流配置通过调用工作流实例,运行之前编译的动作代码。
现在,为了在执行前更新动作代码,我们需要在动作查询中添加一个新的变量值。这可以通过在发布配置中使用编译变量来实现[3]。Dataform API 允许我们将四个步骤分为两步,即将步骤1和2合并,步骤3和4合并。
定义参数首先,在开发工作区中打开dataform.json
文件,并添加一个新的变量作为参数。
{
"defaultSchema": "dataform",
"assertionSchema": "df_assertions",
"warehouse": "bigquery",
"defaultDatabase": "my_project",
"defaultLocation": "us-east1",
"vars": {
"变量": "gs://my_bucket/test_file_for_dev_env.json"
}
}
我们可以在开发工作区中添加任何变量的值。例如,我们可以指定一个仅在执行开发工作区中的某个动作时使用的测试文件的路径。由于我们在主配置文件中声明变量时,使用清晰的变量名很重要。(唉,真遗憾,JSON格式不支持注释)。
数据表操作的代码然后,我们来写相关的代码。
配置结构 {
类型: "增量",
模式: "my_dataset",
标签: ["从GCS加载文件"],
安全性: true
}
前置操作 {
IF LENGTH('${dataform.projectConfig.vars.inbound_json_file_path}') > 1 THEN
将数据加载到临时表 loaded_data_table
从文件 (
格式 = 'JSON',
uris = ["${dataform.projectConfig.vars.inbound_json_file_path}"]);
ELSE 否则;
END IF;
}
选择 CURRENT_TIMESTAMP AS created_at,
"${dataform.projectConfig.vars.inbound_json_file_path}" AS file_path,
TO_JSON_STRING(a, true) AS json_data
FROM loaded_data_table a
在config
块中,我们有一个增量表用于添加每个新文件的数据。动作标记为load_file_from_gcs
。我们在调用代码中会用到它。protected
选项用于防止数据丢失。
在 pre-operations
部分中,我们使用 IF-ELSE
结构来检查 inbound_json_file_path
值是否有效。如果有效,将执行 LOAD
语句。否则,使用 RETURN
操作符停止执行操作。LOAD
语句将路径定义为变量的 JSON 文件中的数据加载到临时表中。
在执行 SELECT
语句时,从临时表加载的数据被插入到 json_data
字段中,并且添加了创建时间戳以及文件路径这两列作为额外列。
让我们来看一下 Cloud Run 函数的代码。我们打算调用 Dataform 工作流,并将 JSON 文件路径作为参数传递。
from google.cloud import dataform_v1beta1
df_client = dataform_v1beta1.DataformClient()
def create_compilation_result(repo_uri, git_branch, params):
# 初始化请求参数
compilation_result = dataform_v1beta1.CompilationResult()
compilation_result.git_commitish = git_branch
compilation_result.code_compilation_config.vars = params
# 如果需要添加编译覆盖项
# compilation_result.code_compilation_config.schema_suffix = "prod"
request = dataform_v1beta1.CreateCompilationResultRequest(
parent=repo_uri,
compilation_result=compilation_result,
)
response = df_client.create_compilation_result(request)
return response.name
def create_workflow_invocation(repo_uri, compilation_name, tag_list):
workflow_invocation = dataform_v1beta1.types.WorkflowInvocation(
compilation_result=compilation_name
)
workflow_invocation.invocation_config.included_tags = tag_list
request = dataform_v1beta1.CreateWorkflowInvocationRequest(
parent=repo_uri,
workflow_invocation=workflow_invocation
)
response = df_client.create_workflow_invocation(request)
# 返回0
return 0
def main():
gcp_project = 'my_project'
location = 'us-east1'
repo_name = 'my_repo_name'
repo_uri = f'项目/{gcp_project}/位置/{location}/仓库/{repo_name}'
git_branch = 'my_branch'
params = {
"JSON文件路径": "gs://my_bucket/my_file.json"
}
tag_list = ['load_file_from_gcs']
compilation_name = create_compilation_result(repo_uri, git_branch, params)
create_workflow_invocation(repo_uri, compilation_name, tag_list)
return 0
在 main
函数里,我们首先设置 gcp_project
、location
和 repo_name
变量的值,接着使用这些值组成 repo_uri
。之后,我们设置 git_branch
变量的值并创建包含 json_file_path
值的 params
对象。需要注意的是,如果您使用任何其他编译变量(如环境变量),请确保将这些变量也添加到 params
对象中。
接下来,我们使用repo_uri
、git_branch
和params
作为函数参数调用create_compilation_result
函数。让我们仔细看看create_compilation_result
函数的代码。首先,使用[CompilationResult](https://cloud.google.com/python/docs/reference/dataform/latest/google.cloud.dataform_v1beta1.types.CodeCompilationConfig?hl=en)
类初始化一个compilation_result
对象。通过提供的链接查看类描述,我们可以观察到除了根据发布配置创建编译的常规方法外,它还允许我们通过git_commitish
和code_compilation_config
类属性直接使用编译参数。将git_commitish
设置为git_branch
的值。然后将code_compilation_config.vars
填充为params
。如果您使用编译覆盖(例如schema_suffix
),也可以根据[CodeCompilationConfig](http://CodeCompilationConfig)
规范包含这些覆盖项。生成一个请求,将repo_uri
与初始化的compilation_result
关联起来。请求发送后,它将返回创建的编译结果的名称(response.name
)。
接下来,我们用 repo_uri
,compilation_name
和 tag_list
参数调用 create_workflow_invocation
函数。此函数会初始化一个来自 [WorkflowInvocation](https://cloud.google.com/python/docs/reference/dataform/latest/google.cloud.dataform_v1beta1.types.WorkflowInvocation?hl=en)
类的 workflow_invocation
对象。将 compilation_result
属性设置为 compilation_name
。接下来,用 tag_list
填充 [invocation_config](https://cloud.google.com/python/docs/reference/dataform/latest/google.cloud.dataform_v1beta1.types.InvocationConfig?hl=en)
中的 included_tags
字段。创建一个请求对象,其父级为 repo_uri
,并将初始化的 workflow_invocation
对象作为参数。发送请求以启动工作流调用。
运行完Cloud Run函数后,我们可以在“工作流执行日志”标签中看到Dataform作业的执行。
总结在这篇文章中,我们讨论了如何使用Python代码动态触发Dataform工作流执行,并传递需要处理的参数。通过利用Dataform API,我们可以与Cloud Run Functions等系统集成,并传递给工作流执行处理的参数。随着Dataform的持续发展,我们预计会看到新的功能和优化,从而进一步完善这一工作流执行流程。
谢谢您的阅读。
相关链接- [1] Medium。Alex Feldman,如何从Cloud Functions触发Dataform工作流。
- [2] Google Cloud 文档中的Dataform中的代码生命周期简介。
- [3] Google Cloud 文档中的Dataform,查看发行配置的详情
- [4] Google Cloud 文档中的Dataform,类(Class)CodeCompilationConfig:查看文档
- [5] Google Cloud 文档中的Dataform,类(Class)WorkflowInvocation:查看文档