工作流基本上是一系列按顺序执行的任务列表,每个任务可以选择将其输出作为下一个任务的输入。这里有一些工作流的不同应用场景,我们举几个例子。
用户注册的流程
- 注册一个用户,包括将用户头像上传到存储空间,然后在数据库中创建用户并将头像链接与其关联,最后发送验证邮件。
 - 用户通过网站的联系表单发送消息时,我们将他们作为新的潜在客户添加到CRM中,将他们的消息推送到Slack销售通道,然后确认他们是否为新联系人,并相应发送营销邮件。
 
在这份指南中,我们将一步步地使用构建器模式来创建我们自己的工作流处理程序,并且为了使这个过程更加有趣,我们将使用注册的例子。
在开始之前,请确保您具备以下条件:在开始实现之前,
- Node.js 和 npm
 - 一些基本的 TypeScript 知识
 
在进入正题之前,我们先试着看看如何用基本的方法来实现这个流程。
1 创建一个新的Node.js应用项目    mkdir workflow-sample  
    cd workflow-sample  
    # 使用npm init -y 初始化项目,自动配置 package.json
    npm init -y
 2. 添加 TypeScript 支持
首先,我们将安装所需的软件包
在命令行中输入以下命令来安装:
npm install -D typescript ts-node @types/node 
接下来,我们将在根文件夹中创建一个新的 tsconfig.json 文件,并粘贴以下代码:
{  
  "compilerOptions": {  
    "target": "目标",  
    "module": "模块",  
    "outDir": "输出目录",  
    "esModuleInterop": "es模块互操作",  
    "forceConsistentCasingInFileNames": "强制文件名大小写一致",  
    "strict": "严格",  
    "skipLibCheck": "跳过库检查"  
  }  
}
为了能通过命令“npm run dev”运行代码,我们将在package.json中添加这个脚本。
{  
  ...  
  "scripts": {  
    "dev": "ts-node index.ts"  // 开发脚本,运行 ts-node index.ts
  }  
}
最后,我们将创建一个新的 index.ts 文件并开始写代码
3. 创建注册功能部分
由于本指南的目标是处理工作流而不是深入探讨每一步的工作原理,我们将采用一种叫做 — 关注分离 的方法。
注册流程中有多个关注点,我们将主要功能拆分成多个子功能,只关心每个子功能的输入和输出。
所以,我们在 index.ts 文件中的主要功能如下:
    // 注册需要用户的电子邮件、密码和图像文件
    // 这可以通过创建一个API并使用类似multer的库来完成
    const register = async (email: string, password: string, image: Blob) => {
      try {
        // 1. 此函数上传图像文件并返回该图像的链接
        let imageLink = await uploadImage(image);
        // 2. 此函数将用户存入数据库
        let savedUser = await saveUser({ email, password, imageLink });
        // 3. 此函数向用户发送确认邮件
        let result = await sendVerificationEmail(email);
      } catch (error) {
        console.log("处理失败了");
      }
    };
现在,我们将尝试模拟每个函数的输入和输出情况。因为每个子功能运行都需要花费一定的时间,我们将使用一个自定义的超时函数来模拟这种情况。
为了做到这一点,让我们把这个代码添加到 index.ts 文件中
    function timeout(ms: number) {  
      return new Promise((resolve) => setTimeout(resolve, ms));  
    }  
    // 1. 这个函数上传图像文件并返回图像链接  
    const uploadImage = async (image: Blob) => {  
      await timeout(1000);  
      return { imageLink: "图像链接" };  
    };  
    // 2. 这个函数将用户信息保存到数据库  
    const saveUser = async ({  
      email,  
      password,  
      imageLink,  
    }: {  
      email: string;  
      password: string;  
      imageLink: string;  
    }) => {  
      await timeout(1000);  
      return {  
        id: "1",  
        email,  
        password,  
        image: imageLink,  
      };  
    };  
    // 3. 这个函数向用户发送验证邮件  
    const sendVerificationEmail = async ({ email }: { email: string }) => {  
      await timeout(1000);  
      console.log("邮件已成功发送至 " + email + "。");  
    };
 关于这种方法的问题
即使这个过程成功完成,还是有一些问题需要处理,促使我们创建一个处理这些问题的流程。
第一个问题是缺少重试机制。所以,例如,如果由于连接问题存储服务没有响应,整个流程就会失败。这时你可能会说,我们可以尝试在流程失败时重新运行它,虽然这个解决方案有效,但它仍有一个缺点,即如果错误不是暂时的连接问题,就会进入无限循环。
第二个问题是,例如,当第二步失败时,我们需要重新从第一步开始运行所有步骤。所以,如果第一步很复杂,需要10秒钟才能运行,那么每次后续步骤失败时,都需要重新运行第一步。
接下来,我们将使用建造者模式创建一个类,来解决所有提到的问题,这个模式简洁实用。
创建我们的工作流类(workflow类)根据我们看到的问题,我们的班级应该能够符合几个条件:
- 我们可以创建一个函数管道
 - 每个函数的输出可以作为下一个函数的输入
 - 如果一个函数抛出错误,应从该函数开始重新尝试
 - 工作流可以设置重试次数限制(这意味着如果设置了3次重试限制,一旦达到3次仍然失败,整个工作流将会被终止)
 
执行工作流中的各个步骤
让我们创建一个新的 /core/workflow.ts 并从其中导出一个类
    export class 工作流 {  
      private 重试次数限制: number;  
      constructor(重试次数限制: number = 3) {  
        this.重试次数限制 = 重试次数限制;  
      }  
      static 创建工作流程实例(  
        重试次数限制: number,  
        回调函数: (工作流: 工作流) => void  
      ): 工作流 {  
        const 实例 = new 工作流(重试次数限制);  
        回调函数(实例);  
        return 实例;  
      }  
      async 执行(): Promise<void> {}  
    }
我们的类中有一个“createWorkflow”方法,可以设置重试次数的限制,并将创建的工作流实例传递给回调函数。调用“run”方法将触发工作流的开始。
我们现在一个个开始看看能不能符合这些条件。
1- 我们可以设立一个函数的流水线。我们希望我们的类可以创建任意数量的步骤(函数),并且可以创建最终步骤。为此,我们将添加两个方法,‘create’方法会创建一个新的StepFunction,以及‘finally’方法将设置最终的FinallyFunction。让我们修改我们的类。
    type StepFunction = (input: any) => Promise<any>;  
    type FinallyFunction = (input: any) => void;  
    /**
* 这是一个用于实现工作流处理的类
     */
    export class Workflow {  
      private steps: StepFunction[] = [];  
      private retryLimit: number;  
      private finallyCallback?: FinallyFunction;  
      constructor(retryLimit: number = 3) {  
        this.retryLimit = retryLimit;  
        // 构造函数用于初始化工作流实例,并设置重试限制
      }  
      /**
* 创建一个工作流实例并调用回调函数传递该实例
       */
      static createWorkflow(  
        retryLimit: number,  
        callback: (workflow: Workflow) => void  
      ): Workflow {  
        const workflow = new Workflow(retryLimit);  
        callback(workflow);  
        return workflow;  
      }  
      /**
* 创建一个步骤函数,并将其添加到步骤数组中
       */
      create(stepFunction: StepFunction): this {  
        this.steps.push(stepFunction);  
        return this;  
      }  
      /**
* 设置一个最终回调函数
       */
      finally(callback: FinallyFunction) {  
        this.finallyCallback = callback;  
      }  
      /**
* 异步执行工作流,返回一个任何类型的Promise对象
       */
      async run(): Promise<void> {}  
    }
你可能已经注意到了 “create” 方法返回了“this”,其目的是实现链式调用(你会看到它是如何工作的)。
对于其他情况,我们将修改我们程序的“运行”方法。
2- 每个函数的输出都能够传递给下一个函数作为输入    // core/workflow.ts    
    async run(initialInput?: any): Promise<void> {  
        // 提供初始输入给第一个步骤来启动流程  
        let input = initialInput;  
        for (let i = 0; i < this.steps.length; i++) {  
          const step = this.steps[i];  
          try {  
              // 将输入传递给下一个步骤  
              input = await step(input);  
            } catch (error) {  
              console.error("因步骤出错导致工作流中断。");  
              break;  
            }  
        }  
        if (this.finallyCallback) {  
          try {  
            this.finallyCallback(input);  
          } catch (error) {  
            console.error("最终步骤出错:", error);  
          }  
        }  
      }
 3- 处理错误与重试
为了处理重试情况,我们将不再抛出错误来终止工作流,而是会重新运行这个步骤,直到达到重试次数的上限。为此,我们将按照以下方式调整我们的方法。
    // core/workflow.ts
    async run(initialInput?: any): Promise<void> {
        // 提供初始输入给第一个步骤执行
        let input = initialInput;
        let attempts = 0;
        let success = false;
        for (let i = 0; i < this.steps.length; i++) {
          const step = this.steps[i];
          // 在未达到重试限制并且步骤未成功的情况下,继续重试
          // 这也确保了我们从失败的步骤重新开始执行
          while (attempts < this.retryLimit && !success) {
            try {
              // 传递步骤的输入到下一个步骤
              input = await step(input);
              success = true;
            } catch (error) {
              attempts++;
              if (attempts === this.retryLimit) {
                console.error(
                  `步骤 ${i + 1} 在尝试了 ${attempts} 次后失败,错误信息为:`,
                  error
                );
              }
            }
          }
          // 如果在所有尝试之后步骤仍然失败,工作流将被中断并终止
          if (!success) {
            console.error("由于步骤失败,工作流将被中断并终止。");
            break;
          }
        }
        if (this.finallyCallback) {
          try {
            this.finallyCallback(input);
          } catch (error) {
            console.error("在最终步骤中出现错误:", error);
          }
        }
      }
这样一来,我们的工作流类就符合所有条件并解决了所有问题。
使用我们的工作流对象在这里,我们将用我们自己的类来处理注册过程。
    import { Workflow } from "./core/workflow";  
    const register = (email: string, password: string, image: Blob) => {  
      Workflow.createWorkflow(3, (workflow) => {  
        workflow  
          .create(上传图片功能)  
          .create(({ imageLink }) =>  
            保存用户信息({  
              email,  
              password,  
              imageLink,  
            })  
          )  
          .finally(发送验证邮件的步骤);  
      }).run(image);  
    };  
    register("ob.mokhfi@gmail.com", "password", new Blob());
尽管这种方法完全可行,但建议保持所有步骤的输入和输出的可见性。我们可以这样调整注册功能:
const register = (email: string, password: string, image: Blob) => {  
  Workflow.createWorkflow(3, (workflow) => {  
    workflow  
      .create(async (image) => {  
        let imageLink = await uploadImage(image);  
        return { imageLink };  
      })  
      .create(async ({ imageLink }) => {  
        let user = await saveUser({  
          email,  
          password,  
          imageLink,  
        });  
        return user; // 返回 { id, email, password, image }  
      })  
      .finally(async ({ email }) => {  
        await sendVerificationEmail给({ email });  
        // 如果你在API中使用这个工作流,你可以在最后这里响应  
        // res.status(200).send("用户创建成功")  
      });  
  }).run(image);  
};
 下一步会是什么?
我知道!这本指南太枯燥了,细节太多。当然,你可以直接查看 workflow.ts 文件的代码,并了解最后部分的使用方法。编写这个指南的目的就是让你理解代码的每个部分,这样你就能自己动手修改了。
例如,你可以在这个流程中添加一个额外的条件,来测试自己是否理解。
如果在多次尝试后仍有一步失败,工作流应执行回滚。
试着在相同的注册过程中测试它,通过在保存用户时触发错误,并通过回滚删除上传到存储的图片文件。
感谢阅读!请随时欢迎提供您的反馈,告诉我们怎样可以做得更好🙏