在 Flyte 中的任务之间传递 blob

我正在尝试创建一个需要在多个任务之间传递数据的Flyte工作流程。我查看了文档中的示例之一,但尝试尽可能少地重新创建 blob 传递,但我仍然无法让它工作。

这是我的完整工作流程定义(当然,我的实际用例会产生更多数据):

from flytekit.sdk.tasks import python_task, outputs, inputs

from flytekit.sdk.types import Types

from flytekit.sdk.workflow import workflow_class, Output, Input



@inputs(the_text=Types.String)

@outputs(the_blob=Types.Blob)

@python_task

def create_blob(wf_params, the_text, the_blob):

    fname = "a-file.txt"

    with open(fname, "w") as f:

        f.write(the_text)


    the_blob.set(fname)



@inputs(the_blob=Types.Blob)

@outputs(the_text=Types.String)

@python_task

def read_blob(wf_params, the_blob, the_text):

    the_blob.download()

    with open(the_blob.local_path) as f:

        the_text.set(f.read())



@workflow_class

class PassBlob:

    input_text = Input(Types.String, required=True, help="The text to write to the file")


    create = create_blob(the_text=input_text)

    read = read_blob(the_blob=create.outputs.the_blob)


    output_text = Output(read.outputs.the_text, sdk_type=Types.String, help="The text read from the file")

此工作流部署成功,当我运行它时,会发生以下情况:


该create任务成功运行,并说明其输出:


the_blob:

  type:

    single

  uri:

    /4u/fe0c7c6326294497dac9-create-0/9c684e85918080341a14478b5f013ee6

在任务之间传递 s 的正确方法是什么Types.Blob?我该如何进行这项工作?



慕婉清6462132
浏览 54回答 1
1回答

www说

我假设您正在本地沙箱环境中运行它(您正在使用 minio,它是我们在沙箱环境中部署的测试 blob 存储)。您能否分享您用于注册工作流程的 Flytekit.config 文件。因此,Flyte 根据您的配置方式自动将中间数据存储在存储桶 (S3 / GCS) 中。前缀设置用于自动将数据上传到配置的存储桶和前缀 https://github.com/lyft/flytesnacks/blob/b980963e48eac4ab7e4a9a3e58b353ad523cee47/cookbook/sandbox.config#L7v0.7.0 之前的版本 - 使用配置中的分片格式化程序设置 - https://github.com/lyft/flytesnacks/blob/b980963e48eac4ab7e4a9a3e58b353ad523cee47/cookbook/sandbox.config#L14-L17另请告诉我们您正在运行的 Flyte 版本。请加入 Slack 频道,我可以帮助您入门。抱歉给大家带来的麻烦
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Python