手记

用亚马逊SQS(Amazon Simple Queue Service)提升后端系统的性能

我年底假期有空闲时间,想找点乐子来逃避日常工作的枯燥。平时我用JavaScript和TypeScript写代码,还用Python写程序。

巴厘国际机场

几个月前,我开始学习并参与了一个使用Go语言的项目,今天我再次开始用Go编程(说实话,我真的超爱Go)。因此,我决定在假期里做一些有趣的项目,并深入研究消息队列。

首先,我研究了一下Go项目的结构。我发现了一个仓库,https://github.com/golang-standards/project-layout,它分享了关于Go项目标准布局建议的一些见解。(不过说句公道话,Go里并没有官方规定的项目结构标准。)

建设项目

之后,我创建了一个新的项目来创建一个RESTful API。这个API非常简单,它提供一个让新用户注册的功能。成功注册的用户会收到一封欢迎邮件。

  • cmd 是主应用程序
  • db 用来存储迁移记录
  • internal 包含私有的应用程序代码,例如配置信息
  • store 是我们编写应用程序代码的文件夹,在这里我们有用例和仓库。

我采用的是微服务架构,其中包括两个服务,

  1. 第一个服务处理用户的注册。
  2. 接下来的服务会给用户发送一封欢迎邮件。

基本上,我们可以用单体架构或一个单一的服务来实现这一点。不过,我们将其拆分成不同的服务?仅仅是因为我们想解耦这些服务。

解耦可以降低故障的风险,比如当后端服务器过载时,无法有效处理请求,最终可能会宕机。因此,解决这个问题的一个方法是消除各组件之间的依赖关系。我们该怎么做呢?

我们可以用消息队列。消息队列是服务间异步通信的一种方式,在微服务架构中很常见。

使用消息队列的想法并不局限于微服务。在我们发送邮件的场景中,不一定需要立即或实时发送邮件。相反,我们可以采取异步的方法,这就是我们选择使用消息队列的原因。

我们可以使用消息队列工具,例如RabbitMQ。不过,使用RabbitMQ时,我们需要自己管理基础设施。如果您不想自己处理软件管理和维护基础设施,可以选择Amazon SQS。

Amazon SQS

Amazon Simple Queue Service (Amazon SQS) 提供了一个安全、持久且可用的托管队列,帮助您集成和解耦分布式软件系统及组件以便解耦。

为什么我们要用Amazon SQS?

我们希望简化基础设施并避免管理它。Amazon SQS 提供了一种简单且可靠的方式来使用队列来解耦和连接组件(microservices)。

Amazon SQS 的特点
  • Amazon SQS 有两种类型,标准型和先进先出 (FIFO)。
  • 消息和队列的数量是无限的。
  • 加密的消息,使用 AWS 密钥管理服务(AWS KMS)管理的密钥保护 Amazon SQS 队列中的消息内容。
  • 可以与其他 Amazon 服务集成
  • 批处理:可以批量发送、接收或删除最多 10 条消息或 256KB 的数据。
先决条件
  • AWS账号, * AWS访问密钥对

如果你想得到帮助,可以试试这个入门项目:https://github.com/arasopraza/go-rest

所以,我们跳到代码吧~~

在第一个RESTful API项目中,使用以下命令安装Go语言版本的AWS配置和SQS服务SDK v2:

<命令>

请注意,具体的安装命令需要参考AWS SDK for Go的官方文档。

go get github.com/aws/aws-sdk-go-v2/config  // 获取所需的AWS SDK配置包
go get github.com/aws/aws-sdk-go-v2/service/sqs  // 获取所需的AWS SDK SQS服务包

接着,在内部目录中创建一个叫做“sqs”的新目录。

在目录里创建一个名为“sqs.go”的文件。

接下来,写这段代码。

NewClient 函数用于初始化 SQS 客户端并加载本地配置。SendMessage 函数用于将消息发送到 SQS 中。

main.go 文件中像这样导入 SQS 客户端:

    // 从环境变量中获取SQS URL
    queueUrl := os.Getenv("SQS_URL")
    // 使用获取到的URL创建一个新的SQS客户端
    sqsClient := sqs.NewClient(queueUrl)

确认你已经创建了亚马逊SQS队列。

亚马逊 SQS 支持两种队列类型:标准队列和FIFO队列。以下是它们的主要区别:

  • 标准队列:保证至少一次投递,这意味着您的消息可能会被消费不止一次。如果消息顺序不重要,请使用这个。
  • FIFO队列:保证恰好一次投递并维护消息的顺序。如果需要确保消息按正确的顺序投递,请使用这个。

因为在这种情况下,消息顺序并不重要,所以我们使用标准队列。

亚马逊 SQS 仪表盘

接下来,打开你的用例文件。在这种情况下,我们这样写用例。

在这种情况下,我们希望在用户成功注册后发送电子邮件。因此,我们将SQS客户端作为用户用例的依赖项。

CreateUser 函数内部,用户成功注册后,我们会调用 SQS 服务并将包含用户电子邮件的信息发送到 Amazon SQS。发送至 Amazon SQS 的用户电子邮件将由我们的第二个服务来处理。

来设置我们的第二个服务

创建一个用来发送欢迎邮件给用户的第二个服务。这个服务很简单,它只有一个任务,就是发送一封欢迎邮件。

和第一个服务一样,我们需要安装依赖。运行以下命令来安装所需的依赖:

# 这里插入安装依赖的命令
获取 AWS SDK 配置和 SQS 服务的 Go 语言客户端库
go get github.com/aws/aws-sdk-go-v2/config  
go get github.com/aws/aws-sdk-go-v2/service/sqs

我们首先使用 net/smtp 包来配置电子邮件。

在名为“actions”的新目录内创建一个名为 send_email.go 的文件。编写如下代码:

接下来,请在 actions 文件夹内创建一个新的文件 get_message.go,如下所示。

打开文件,如下所示编写代码:

package actions

import (
 "context"
 "log"

 "github.com/aws/aws-sdk-go-v2/aws"
 "github.com/aws/aws-sdk-go-v2/service/sqs"
 "github.com/aws/aws-sdk-go-v2/service/sqs/types"
)

// SQS 动作结构体定义
type SqsActions struct {
 SqsClient *sqs.Client
}

// 获取消息
func (actor SqsActions) GetMessages(ctx context.Context, queueUrl string, maxMessages int32, waitTime int32) ([]types.Message, error) {
 var messages []types.Message
 result, err := actor.SqsClient.ReceiveMessage(ctx, &sqs.ReceiveMessageInput{
  QueueUrl:            aws.String(queueUrl),
  MaxNumberOfMessages: maxMessages,
  WaitTimeSeconds:     waitTime,
 })
 if err != nil {
  log.Printf("未能从队列%s中获取消息,原因:%s\n", queueUrl, err)
 } else {
  messages = result.Messages
 }
 return messages, err
}

// 删除消息
func (actor SqsActions) DeleteMessage(ctx context.Context, queueUrl string, receiptHandle string) (*sqs.DeleteMessageOutput, error) {
 return actor.SqsClient.DeleteMessage(ctx, &sqs.DeleteMessageInput{
  QueueUrl:      aws.String(queueUrl),
  ReceiptHandle: aws.String(receiptHandle),
 })
}

我们创建了两个函数,GetMessagesDeleteMessageGetMessages 获取指定的 SQS 队列中的消息。DeleteMessage 在消费该消息之后,使用它的接收柄从 SQS 队列(Amazon Simple Queue Service 队列)中删除特定的消息。

记住:由于重试或延迟,Amazon SQS 标准队列中的消息可能会被传递和接收多次。处理完消息后删除消息以避免多次传递非常重要。

接着,创建文件 main.go 并写入如下代码:

package main

import ( 
 "context"
 "log"

 "github.com/joho/godotenv"
)

func main() { 
 ctx := context.Background()

 // 加载环境变量
 err := godotenv.Load("../.env") 
 if err != nil { 
  log.Fatal(err)
 } 
}

然后,在 main.go 文件里添加 processMessage 这个函数。

    func processMessage(sqsActions actions.SqsActions, queueUrl string) error {  
     messages, err := sqsActions.GetMessages(context.Background(), queueUrl, int32(10), int32(20))  
     if err != nil {  
      log.Fatalf("无法获取消息: %v", err)  
     }  

     // // 如果队列中没有消息  
     if len(messages) == 0 {  
      fmt.Println("队列中没有新消息。")  
      return nil  
     }  

     // // 打印消息  
     for _, message := range messages {  
      err = actions.SendMail(*message.Body)  
      if err != nil {  
       log.Fatal(err.Error())  
       continue  
      }  

      _, err = sqsActions.DeleteMessage(context.Background(), queueUrl, *message.ReceiptHandle)  
      if err != nil {  
       log.Printf("删除消息时出错: %v", err)  
      } else {  
       log.Printf("删除了消息: %s", *message.Body)  
      }  

      fmt.Printf("邮件发送成功,内容为: %s\n", *message.Body)  
     }  

     return nil  
    }

main.go 文件的函数中添加以下代码:

    sdkConfig, err := config.LoadDefaultConfig(ctx, config.WithRegion("ap-southeast-1"))
    if err != nil {
        fmt.Printf("错误: %v\n", err)
        return
    }

    // 创建一个SQS客户端
    sqsClient := sqs.NewFromConfig(sdkConfig)

    // 创建一个SqsActions实例
    sqsActions := actions.SqsActions{SqsClient: sqsClient}

    // 定义参数
    queueUrl := os.Getenv("SQS_URL")

    log.Printf("消费者服务运行中...\n")
    for {
        err := processMessage(sqsActions, queueUrl)
        if err != nil {
            log.Printf("处理消息时出错: %v\n", err)
        }
        time.Sleep(5 * time.Second)
    }

我们配置了 SDK 和 Amazon SQS 客户端(客户端)。然后,我们调用 processMessage 函数来从 Amazon SQS 接收消息。

以下为 main.go 中的完整代码

测试,

最后,我们可以启动我们的服务。

在第一次运行服务时,执行这个命令 make run app

在第二个服务里,你可以运行这个命令 go run main.go

第二个服务将监听亚马逊的SQS队列。

如果有消息,服务会立即消费它。消息中包含用户的电子邮件地址,该地址已成功注册过。接下来的服务将使用该电子邮件地址作为收件地址。

访问端点localhost:3000/api/v1/user,请求体格式如下所示。

请求包括用户名和电子邮件。

运行 HTTP 请求后,如果一切顺利,你将收到类似这样的回复。

接着去你的第二个服务看看终端里的日志。

当亚马逊SQS(简单队列服务)中有消息时,服务会获取该消息并执行发送邮件的任务,也就是发送邮件。

要确保邮件发送成功,我们可以查看邮箱里的收件箱(一个测试邮箱的工具,Mailtrap)。

真酷!

用户成功收到了欢迎邮件。

在 AWS 控制台里,您还可以通过访问此链接:https://console.aws.amazon.com/sqs/ 来监视您的队列。然后找到其中一个队列,并选择 监视 选项卡。

监控 Amazon SQS 服务

另外,您还可以将Amazon SQS与Amazon CloudWatch集成起来,以便监控更详细的指标数据,并启用日志记录,从而增强可见性。

谢谢你的阅读~~

想联系我吗?  
点击链接:  
我的社交媒体都在下面:  
https://www.linkedin.com/in/arasopraza/  
https://github.com/arasopraza  
https://twitter.com/arsyopraza  
https://arsyopraza.com

参考:

使用 Go 语言 AWS SDK 编写的 Amazon SQS 示例程序docs.aws.amazon.com 什么是 AWS Go v2 SDK?了解 AWS Go v2 SDK 详情docs.aws.amazon.com sqs 包提供了 Amazon 简单队列服务的 API 客户端、操作和参数类型。
0人推荐
随时随地看视频
慕课网APP