Apache Beam:无法通过 docker-compose 访问 Pub/Sub

我已经构建了一个软件,它使用 GCP Pub/Sub 作为消息队列,使用 Apache Beam 构建管道,使用 Flask 构建网络服务器。它在生产中运行顺利,但我无法将所有部分与 docker-compose 连接在一起,特别是 Apache Beam 管道。


我遵循Dataflow 管道和 pubsub 模拟器,通过将localhostSO 答案中docker-compose.yaml的


  pubsub_emulator:

    build: docker_images/message_queue

    ports:

      - 8085:8085


  webserver:

    build: docker_images/webserver

    environment:

      PUBSUB_EMULATOR_HOST: pubsub_emulator:8085

      PUBSUB_PROJECT_ID: my-dev

    restart: unless-stopped

    ports:

      - 8899:8080

    depends_on:

      - pubsub_emulator


   pipeline:

    build: docker_images/pipeline

    environment:

      PUBSUB_EMULATOR_HOST: pubsub_emulator:8085

      PUBSUB_PROJECT_ID: my-dev

    restart: unless-stopped

    depends_on:

      - pubsub_emulator

网络服务器能够访问 Pub/Sub 模拟器并生成主题。


但是,管道在启动时失败,并显示MalformedURLException:


Caused by: java.lang.IllegalArgumentException: java.net.MalformedURLException: no protocol: pubsub_emulator:8085/v1/projects/my-dev/subscriptions/sync_beam_1702190853678138166

管道的选项看起来不错,我用以下方式定义它们:


final String pubSubEmulatorHost = System.getenv("PUBSUB_EMULATOR_HOST"); 


BasePipeline.PipeOptions options = PipelineOptionsFactory.fromArgs(args).withValidation()

                                .as(BasePipeline.PipeOptions.class);


options.as(DataflowPipelineOptions.class).setStreaming(true);


options.as(PubsubOptions.class).setPubsubRootUrl(pubSubEmulatorHost);


Pipeline pipeline = Pipeline.create(options);

有人知道正在发生的事情以及如何解决它吗?唯一的解决方案是否意味着将模拟器和管道设置在同一个 docker 中?


婷婷同学_
浏览 116回答 1
1回答

收到一只叮咚

您可以尝试将值更改为以下内容:http://pubsub_emulator:8085作为抱怨丢失的错误protocol,预计会出现http在您的情况下根据Apache Beam SDK,该值应为完全限定的 URL:// getPubsubRootUrl@Default.String(value="https://pubsub.googleapis.com") @Hiddenjava.lang.String getPubsubRootUrl()// Root URL for use with the Google Cloud Pub/Sub API.但是,如果您来自 python 背景,您会注意到这里显示的使用gRPC Python的Python SDK只需要包含地址和端口的服务器地址# A snippet from google-cloud-python library.if os.environ.get("PUBSUB_EMULATOR_HOST"):    kwargs["channel"] = grpc.insecure_channel(        target=os.environ.get("PUBSUB_EMULATOR_HOST")    )grpc.insecure_channel(target, options=None)Creates an insecure Channel to a server.The returned Channel is thread-safe.Parameters: target – The server address
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java