在 Fargate 中使用 ZeroMQ Golang

我正在尝试在以 awsvpc 模式在 Fargate 上运行的 ECS 中使用 ZeroMQ。我有 2 个不同的服务,每个服务都运行自己的任务并启用服务发现。


我在一个名为 broker 的微服务中创建了我的 Router 和 Dealer。


front, _ := zmq.NewSocket(zmq.ROUTER)

defer front.Close()

front.Bind("tcp://*:4070")


back, _ := zmq.NewSocket(zmq.DEALER)

defer back.Close()

back.Bind("tcp://*:4080")

然后我将这 2 个套接字添加到轮询器,并有一个等待消息的 for 循环。


我有一个单独的微服务连接到套接字并尝试向经销商发送消息。我已经设置了服务发现,所以我假设我连接的地址是:


“tcp://broker:4070”


下面是来自“serviceA”的代码


func New(ZMQ models.ZMQ) *Requester {

    s, err := zmq.NewSocket(zmq.REQ)

    if err != nil {

        log.Fatalln("shareholder/requester zmq.NewSocket", err)

    }

    p := zmq.NewPoller()

    p.Add(s, zmq.POLLIN)


    log.Println("Requester", ZMQ.Req)

    err = s.Connect("tcp://broker:4070")

    if err != nil {

        log.Print(fmt.Errorf("err is %w", err))

    }


    req := &Requester{

        Poller:  p,

        Retries: 2,

        Socket:  s,

        Timeout: time.Duration(time.Minute),

    }

    runtime.SetFinalizer(req, (*Requester).Close)

    return req

}

然后我使用上面的代码通过我的套接字连接发送消息


_, err := r.Socket.SendMessage(req)

但是,我的代理服务中从未收到我的消息。我可以使用我在服务发现期间注册的主机名在网络上访问我的 REST API,Fargate/ECS/ZeroMQ 有什么我在这里缺少的吗?


陪伴而非守候
浏览 212回答 2
2回答

潇潇雨雨

问:“Fargate/ECS/ZeroMQ 有什么我在这里缺少的吗???”可能是,可能不是。让我们以结构化的方式开始,深入了解根本原因:步骤 0:Broker服务节点提到要使用 ZeroMQ,所以我们将从这一点开始。鉴于您的选择是使用 AccessPoint 到DEALERon address( *:4070 )和 AccessPoint to ROUTERon address ( *:4080 ),并且都使用.bind()-method 来激活-Microservice 节点tcp://内的 -Transport 类Broker,我们的下一步是验证该节点是否以及如何实际对世界其他地方可见。所以,让它运行。第 1 步:视线测试这是测试的第一步Broker--Node,无论它的实现是什么,对“目标受众”实际上是可见的吗?如果没有,ZeroMQ 或其他框架内没有太多可做的,但您的任务是获取地址、L1 信号互连、L2-arp/rarp MAC-检测/映射、L3-路由权限/访问列表/filters/xlations/etc,(动态)DNS-updates 和所有其他配置更新,这样您就可以让世界其他地方的(选择性部分)看到并离成功更近一步.connect()$ #&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;is it L3-(in)-visible # a [ PASS ] | [ FAIL ]$ ping <_a_Broker_Node_Assumed_TCP/IP_Address>&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; # a [ PASS ] | [ FAIL ]第 2 步:端口号 RTO-Test$ #&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; 4070 # a [ PASS ] | [ FAIL ]$ netcat -vz <_a_Broker_Node_visible_TCP/IP_Address> 4070 # a [ PASS ] | [ FAIL ]$ ######$ # OR :$ ######$ telnet&nbsp; &nbsp; &nbsp;<_a_Broker_Node_visible_TCP/IP_Address> 4070 # a [ PASS ] | [ FAIL ]Trying&nbsp;Connected to&nbsp;Escape character is '^]'.https://<_a_Broker_Node_visible_TCP/IP_Address>:4070HTTP/1.1 400 Bad RequestServer: nginxDate: Mon, 03 May 2020 18:14:54 GMTContent-Type: text/htmlContent-Length: 150Connection: close<html><head><title>400 Bad Request</title></head><body><center><h1>400 Bad Request</h1></center><hr><center>nginx</center></body></html>Connection closed by foreign host.$$ //&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;4080 // a [ PASS ] | [ FAIL ]$ telnet <_a_Broker_Node_visible_TCP/IP_Address> 4080 // a [ PASS ] | [ FAIL ]第 3 步:本地消息发送的 RTO 测试替换相当复杂的REQ/ROUTER-Scalable Formal Communications Archetype Pattern 领域,让我们用一个简单的PUSH/PULL-message 传递测试进行测试,它(出于显而易见的原因)与发送消息的预期用途相匹配:package mainimport (&nbsp; &nbsp; zmq "github.com/pebbe/zmq4"&nbsp; &nbsp; "log"&nbsp; &nbsp; "fmt"&nbsp; &nbsp; "time"&nbsp; &nbsp; ...)func PushTASK() {&nbsp; &nbsp; aCtx, err&nbsp; &nbsp; := zmq.NewContext()&nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; log.Fatalln( "__NACK: aCtx instantiation failed in zmq.NewContext()",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; err )&nbsp; &nbsp; }&nbsp; &nbsp; aPusher, err := aCtx.NewSocket( zmq.PUSH )&nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; log.Fatalln( "__NACK: aPusher instantiation failed in aCtxNewSocket()",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; err )&nbsp; &nbsp; }&nbsp; &nbsp; err = aPusher.SetLinger( 0 )&nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; log.Fatalln( "__NACK: aPusher instance failed to .SetLinger()",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; err&nbsp; )&nbsp; &nbsp; }&nbsp; &nbsp; err = aPusher.SetConflate( true )&nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; log.Fatalln( "__NACK: aPusher instance failed to .SetConflate()",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; err&nbsp; )&nbsp; &nbsp; }&nbsp; &nbsp; log.Println( "POSACK: aPusher instantiated and about to .connect( tcp://addr:port#)" )&nbsp; &nbsp; err = aPusher.Connect( "tcp://broker:4070" )&nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; log.Print( fmt.Errorf( "__NACK: aPusher failed to .connect(): %w",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; err )&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;)&nbsp; &nbsp; }&nbsp; &nbsp; log.Println( "POSACK: aPusher RTO and about to .SendMessage*()-loop" )&nbsp; &nbsp; for aPush_NUMBER := 1; aPush_NUMBER < 10000; aPush_NUMBER++ {&nbsp; &nbsp; &nbsp; &nbsp; err = aPusher.SendMessageDontwait( aPush_NUMBER )&nbsp; &nbsp; &nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; log.Print( fmt.Errorf( "__NACK: aPusher failed to .SendMessageDontwait()[%d]: %w",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; aPush_NUMBER,&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; err )&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;)&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; time.Sleep( 0.1 * time.Second )&nbsp; &nbsp; }&nbsp;// ---------------------------------------------------BE NICE TO RESOURCES USED&nbsp; &nbsp; err = aPusher.Disconnect( "tcp://broker:4070" )&nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; log.Print( fmt.Errorf( "__NACK: aPusher failed to .Disconnect( tcp://addr:port ): %w",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; err )&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;)&nbsp; &nbsp; }&nbsp;// ---------------------------------------------------BE NICE TO RESOURCES USED&nbsp; &nbsp; err = aPusher.Close()&nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; log.Print( fmt.Errorf( "__NACK: aPusher failed to .Close(): %w",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; err )&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;)&nbsp; &nbsp; }&nbsp;// ---------------------------------------------------BE NICE TO RESOURCES USED&nbsp; &nbsp; err = aCtx.Term()&nbsp; &nbsp; if err != nil {&nbsp; &nbsp; &nbsp; &nbsp; log.Print( fmt.Errorf( "__NACK: aCtx failed to .Term(): %w",&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; err )&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp;)&nbsp; &nbsp; }&nbsp;// ---------------------------------------------------WE ARE CLEAR TO TERMINATE}第 4 步:远程消息接收的 RTO 测试如果[ PASS ] | [ FAIL ]-tests 都没有崩溃,下一步是反映PUSH"remote" 的 -side 概念Broker,是的,重写它以使用PULL-side 并部署它以查看是否也没有崩溃以及是否消息在仍在运行或重新运行第 3 步时按应有的方式到达。第 5 步:享受 ZeroMQ 的强大功能一旦上述所有测试确实完成了[ PASS ],您不仅可以确定 ZeroMQ 不是阻碍因素,而且还可以将部署的原则增强到任何进一步的用例场景中,给定 L1-/L2-/L3-/ZeroMQ-服务以正确和可验证的方式落实到位。

繁星淼淼

我会将我的想法描述为一个答案,我相信我们可以解决您的问题。所以我认为这是你的设置。服务Anginxbackend通过调用backend:9000backendnginx通过调用nginx:80服务 A 到 Bnginx&nbsp;无法通过broker1:4070cannnginx和backendcann 都不能调用broker1或broker2仅指定name:port。如果容器在不同的服务中运行并且每个服务都有自己的awsvpc,则不能仅通过指定name:port来调用它们。您需要跨服务的连接,从 A 到 B,这意味着您需要适当的服务发现。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Go