我正在尝试创建一个需要在多个任务之间传递数据的Flyte工作流程。我查看了文档中的示例之一,但尝试尽可能少地重新创建 blob 传递,但我仍然无法让它工作。
这是我的完整工作流程定义(当然,我的实际用例会产生更多数据):
from flytekit.sdk.tasks import python_task, outputs, inputs
from flytekit.sdk.types import Types
from flytekit.sdk.workflow import workflow_class, Output, Input
@inputs(the_text=Types.String)
@outputs(the_blob=Types.Blob)
@python_task
def create_blob(wf_params, the_text, the_blob):
fname = "a-file.txt"
with open(fname, "w") as f:
f.write(the_text)
the_blob.set(fname)
@inputs(the_blob=Types.Blob)
@outputs(the_text=Types.String)
@python_task
def read_blob(wf_params, the_blob, the_text):
the_blob.download()
with open(the_blob.local_path) as f:
the_text.set(f.read())
@workflow_class
class PassBlob:
input_text = Input(Types.String, required=True, help="The text to write to the file")
create = create_blob(the_text=input_text)
read = read_blob(the_blob=create.outputs.the_blob)
output_text = Output(read.outputs.the_text, sdk_type=Types.String, help="The text read from the file")
此工作流部署成功,当我运行它时,会发生以下情况:
该create任务成功运行,并说明其输出:
the_blob:
type:
single
uri:
/4u/fe0c7c6326294497dac9-create-0/9c684e85918080341a14478b5f013ee6
在任务之间传递 s 的正确方法是什么Types.Blob
?我该如何进行这项工作?
www说
相关分类