在节奏工作流的循环内调用相同的活动

我在 cadence 工作流程中有一个问题,我们可以在 for 循环中使用不同的输入调用相同的活动吗?该代码将是确定性的吗?如果执行工作流的工作人员在执行期间停止并稍后重新启动,那么 cadence 在重新构建工作流时是否能够重播事件。


例如,我有以下代码。


   func init() {

    workflow.RegisterWithOptions(SampleWorkFlow, workflow.RegisterOptions{Name: "SampleWorkFlow"})

    activity.RegisterWithOptions(SampleActivity, activity.RegisterOptions{Name: "SampleActivity"})

    activity.RegisterWithOptions(SecondActivity, activity.RegisterOptions{Name: "SecondActivity"})

}


// SampleWorkFlow comment

func SampleWorkFlow(ctx workflow.Context, input string) error {


    fmt.Println("Workflow started")

    ctx = workflow.WithTaskList(ctx, sampleTaskList)

    ctx = workflow.WithActivityOptions(ctx, conf.ActivityOptions)


    var result string

    err := workflow.ExecuteActivity(ctx, "SampleActivity", input, "string-value").Get(ctx, &result)

    if err != nil {

        return err

    }


    for i := 1; i <= 10; i++ {

        value := i

        workflow.Go(ctx, func(ctx workflow.Context) {

            err := workflow.ExecuteActivity(ctx, "SecondActivity", input, value).Get(ctx, &result)

            if err != nil {

                log.Println("err=", err)

            }

        })

    }


    return nil


}


// SampleActivity comment

func SampleActivity(ctx context.Context, value, v1 string) (string, error) {

    fmt.Println("Sample activity start")

    for i := 0; i <= 10; i++ {

        fmt.Println(i)

    }

    return "Hello " + value, nil

}


// SecondActivity comment

func SecondActivity(ctx context.Context, value int) (string, error) {


    fmt.Println("Second  activity start")


    fmt.Println("value=", value)

    fmt.Println("Second activity going to end")

    return "Hello " + fmt.Sprintf("%d", value), nil

}

在这里,第二个活动在 for 循环中被并行调用。我的第一个问题是,这段代码是确定性的吗?


假设在循环 5 次迭代后,当 i = 5 时,执行此工作流的工作人员终止,如果工作流程在另一个工作人员中启动,cadence 是否能够重播事件?


你能回答我的问题吗?


皈依舞
浏览 162回答 1
1回答

慕标琳琳

是的,这段代码是确定性的。它不调用任何非确定性操作(如随机或 UUID 生成)并用于workflow.Go启动 goroutine。所以它是确定性的。代码的复杂性在定义其确定性方面不起任何作用。无关的尼特。无需在示例中使用 goroutine,因为ExecuteActivity调用已经通过返回 Future 实现了非阻塞。所以样本可以简化为:func SampleWorkFlow(ctx workflow.Context, input string) error {&nbsp; &nbsp; fmt.Println("Workflow started")&nbsp; &nbsp; ctx = workflow.WithTaskList(ctx, sampleTaskList)&nbsp; &nbsp; ctx = workflow.WithActivityOptions(ctx, conf.ActivityOptions)&nbsp; &nbsp; var result string&nbsp; &nbsp; err := workflow.ExecuteActivity(ctx, "SampleActivity", input, "string-value").Get(ctx, &result)&nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; return err&nbsp; &nbsp; }&nbsp; &nbsp; for i := 1; i <= 10; i++ {&nbsp; &nbsp; &nbsp; &nbsp;workflow.ExecuteActivity(ctx, "SecondActivity", input, i)&nbsp; &nbsp; }&nbsp; &nbsp; return nil}请注意,此示例仍然可能不会按照您期望的方式执行,因为它完成了工作流,而无需等待活动完成。所以这些活动甚至都不会开始。这是等待活动完成的代码:func SampleWorkFlow(ctx workflow.Context, input string) error {&nbsp; &nbsp; fmt.Println("Workflow started")&nbsp; &nbsp; ctx = workflow.WithTaskList(ctx, sampleTaskList)&nbsp; &nbsp; ctx = workflow.WithActivityOptions(ctx, conf.ActivityOptions)&nbsp; &nbsp; var result string&nbsp; &nbsp; err := workflow.ExecuteActivity(ctx, "SampleActivity", input, "string-value").Get(ctx, &result)&nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; return err&nbsp; &nbsp; }&nbsp; &nbsp; var results []workflow.Future&nbsp; &nbsp; for i := 1; i <= 10; i++ {&nbsp; &nbsp; &nbsp; &nbsp; future := workflow.ExecuteActivity(ctx, "SecondActivity", input, i)&nbsp; &nbsp; &nbsp; &nbsp; results = append(results, future)&nbsp; &nbsp; }&nbsp; &nbsp; for i := 0; i < 10; i++ {&nbsp; &nbsp; &nbsp; &nbsp; var result string&nbsp; &nbsp; &nbsp; &nbsp; err := results[i].Get(ctx, &result)&nbsp; &nbsp; &nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; log.Println("err=", err)&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }&nbsp; &nbsp; return nil}
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go