手记

我们如何利用服务器发送事件(SSE)实现实时通知系统

作为Seller Growth团队,我们负责分配、跟踪和管理各种旨在提升卖家参与和使用系统的任务及挑战。

我们的最新目标是在新任务的分配或完成时立即通知卖家。为此,我们考虑了三种可能的方案:HTTP轮询(长轮询)、服务器发送事件(SSE)技术以及WebSocket技术。

在这篇文章里,我们将讨论为什么我们在通知系统中使用SSE(服务器发送事件)。我们将探讨SSE相比我们考虑过的其他选择的优势,并通过使用NestJS框架的TypeScript代码示例来清晰地向您展示。

照片由 Markus Winkler 通过 Unsplash 拍摄。

我们使用的工具

为了实现我们实时通知系统,我们采取了以下几点措施:

  • 服务器发送事件(SSE):我们使用SSE在服务器和客户端之间建立一个实时的单向通道,以便通知一产生就能立即传递给客户端。SSE是一个轻量且简单的解决方案,我们只需要将通知推送到客户端。
  • Redis 发布/订阅:我们使用Redis发布/订阅作为消息系统,使多个发布者可以向多个订阅者发送消息。使用Redis发布/订阅,我们能够将通知分发到多个Pod中,确保所有卖家都能及时接收到通知。
  • 通知写入API:我们开发了一个通知写入API来收集其他应用程序的通知。此API会将收集到的通知发送至Redis。
  • 通知读取API:我们也开发了一个通知读取API,它订阅Redis并监听所有的通知。在收到通知后,读取API会通过SSE将这些通知发送给连接的客户端。

我们为啥选SSE呢?

经过仔细权衡,我们决定SSE是我们的通知系统最佳选择。其中一个主要原因是,虽然HTTP轮询是一种可行的选择,但并不能即时发送通知。使用HTTP轮询时,服务器需要定期检查是否有新的事件,这会耗费大量资源且效率低下,特别是在许多用户订阅了通知的情况下。

SSE,相比之下,提供了一个实时的、单向的通道,从服务器到客户端,使得通知一产生就能立即送达。这对于我们的应用场景来说非常重要,因为我们希望确保卖家能及时收到这些通知。

虽然WebSocket在实时通信中很受欢迎,但它们的实现相对于SSE来说更复杂。WebSocket支持全双工通信,这意味着服务器和客户端可以双向通信。然而,因为我们只需要向客户端推送通知,我们选择了SSE作为更简单和更有效的方法。

总而言之,我们决定使用SSE(服务器发送事件)作为我们的通知系统,因为它能够实现实时通知传递,并且相比WebSocket而言更加简单和轻便。

Redis 发布/订阅

为了从多个应用 pod 发送通知,我们采用了 Redis 发布/订阅功能。Redis 发布/订阅是一种消息系统,允许多个发布者向多个订阅者发送消息。这使我们能够将通知分发到多个 pod,确保所有卖家都能及时收到他们的通知。

    import { Redis } from 'ioredis';  

    @Injectable()  
    export class RedisService implements OnModuleInit {  

      ...  

      async onModuleInit() {  
        this.subscriber = new Redis({  
          sentinels: this.redisSentinelConfig.addresses,  
          name: this.redisSentinelConfig.masterName,  
          password: this.redisSentinelConfig.password,  
        });  

        await this.subscriber.subscribe(this.redisSentinelConfig.channelName, async (err, count) => {  
          if (err) {  
            this.logger.error(`订阅不成功: ${err.message}`);  
            return;  
          }  
          this.logger.log(`订阅成功!目前这客户端已经订阅了 ${count} 个频道`);  
        });  

        this.subscriber.on('message', async (channel, message: string) => {  
          const liveNotification: LiveNotification = JSON.parse(message);  
          await this.liveNotificationService.emit(liveNotification);  
        });  
      }  
    }
系统设计

为了收集来自其他应用程序的通知,我们开发了一个用于写入通知的 API。该 API 将通知发送到 Redis。读取通知的 API 订阅 Redis 并监听所有通知。收到通知后,通过 SSE 将其发送给连接的客户端。

因为多个读取接口可能运行在不同的应用 pod 中,每个读取接口都会收到 Redis 发送的所有通知。不过,这些读取接口会根据当前 pod 中连接的客户端来过滤通知,这确保了相关通知只会发给当前 pod 联接的客户端,从而减少从网络传输的无用数据量。

当通过 Redis 发布订阅为卖家发布通知时,所有读取 API 实例都会接收到该通知。然而,并不是所有的通知都与所有的连接客户端都相关。为了确保只将相关的通知发送给每个连接的客户端,读取 API 根据每个客户端的订阅来过滤通知。

    导入 { AuthGuard } from '@nestjs/passport';  
    导入 { Controller, Sse, UseGuards } from '@nestjs/common';  

    @Controller('/live-notification')  
    导出 class LiveNotificationController {  
      构造函数(private readonly liveNotificationService: LiveNotificationService) {}  

      @Sse()  
      @ApiBearerAuth()  
      @UseGuards(AuthGuard('jwt'))  
      公共 getEventsBySeller(@Tracers() tracers: ITracers, @SellerId() sellerId: number) {  
        返回 this.liveNotificationService.subscribeForSeller(sellerId);  
      }  
    }
    import { EventEmitter } from 'events';  
    import { filter, fromEvent } from 'rxjs';  

    @Injectable()  
    export class LiveNotificationService implements OnModuleInit {  

      private readonly emitter = new EventEmitter();  

      // ...  

      public async emit(data: LiveNotification) {  
        this.emitter.emit('liveNotification', { data });  
      }  

      public subscribeForSeller(sellerId: number) {  
        const source = fromEvent(this.emitter, 'liveNotification');  
        return source.pipe(  
          filter(({ data: liveNotification }) =>   
            liveNotification?.content === 'heartbeat' ||   
            liveNotification?.sellerId === sellerId)  
        );  
      }  
    }
问题来了
垂直扩展的难题:

我们的SSE和基于Redis的通知系统能够及时高效地将通知发送给连接的客户端。然而,随着通知数量的增加,我们预计会遇到垂直扩展的挑战,因为每个读取API都订阅了所有的通知,这可能成为系统的瓶颈。当通过Redis发布订阅向卖家发送通知时,所有读取API的实例都会收到该通知。然而,并非所有的通知都与每个连接的客户端相关。为了确保只有相关的通知被发送给每个连接的客户端,读取API会根据客户端的订阅来筛选通知。如果通知数量显著增多,这个筛选过程可能会变慢,导致通知发送延迟。

心跳包

我们的前端开发人员使用了一个事件源库来发送授权头信息。然而,他们报告说服务器和浏览器之间的连接在一分钟之后关闭了。经过进一步研究,我们发现该库需要发送心跳包来检测断开连接。为了解决这个问题,我们实现了每30秒发送一次心跳包,这解决了问题。

    onModuleInit(): any {  
      setInterval(() => {  
        const emitterListenerCount = this.emitter.listenerCount('liveNotification');  
        this.logger.log(`心跳信号已发送给 SSE 客户端:当前活跃的 emitter 监听数量为 ${emitterListenerCount}`);  
        this.emitter.emit('liveNotification', { data: { content: 'heartbeat' } });  
      }, 30000);  
    }
性能

目前,我们系统中有90,000个并发连接,我们正在运行15个Kubernetes Pod来处理生产环境中的负载。每个Pod大约消耗800MB内存和300Mi CPU资源。

Grafana CPU & 内存图,请求(黄)、限制(绿)、Pods(其他颜色)

总结一下

服务器发送事件(SSE)被证明是我们通知系统中的最佳选择,因为它相比而言更加简单和轻量级,能够提供实时的通知传递。利用Redis的发布/订阅功能,我们能够跨多个Pod分发通知,确保所有客户端都能及时收到通知。我们成功地为客户提供了一个可靠且高效的发送通知解决方案。

要了解更多关于前端细节,实现的细节和实现的内容,我建议阅读我同事写的相关文章。

通过服务器发送事件(Server-Sent Events)为增长中心的30多万卖家提供实时通知medium.com

本文将讨论我们如何利用服务器发送事件来提升30多万卖家的实时体验。

准备好让你的职业生涯更上一层楼了吗?加入我们的活力团队,在Trendyol做出不一样的贡献。

Trendyol - 后端工程师 - 远程从2010年开始,我们一直以充满活力和敏捷的初创公司精神运营。从那时起,我们已经成长为一家十角兽……
0人推荐
随时随地看视频
慕课网APP