kafka AdminClient API超时,等待节点分配

我是Kafka的新手,正在尝试使用AdminClientAPI管理在我的本地计算机上运行的Kafka服务器。我将其设置与Kafka文档的“快速入门”部分完全相同。唯一的区别是我还没有创建任何主题。


在此设置上运行任何外壳程序脚本都没有问题,但是当我尝试运行以下Java代码时:


public class ProducerMain{


    public static void main(String[] args) {

        Properties props = new Properties();

        props.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, 

            "localhost:9092");



        try(final AdminClient adminClient = 

              KafkaAdminClient.create(props)){


            try {

                final NewTopic newTopic = new NewTopic("test", 1, 

                    (short)1);


                final CreateTopicsResult createTopicsResult = 

                    adminClient.createTopics( 

                         Collections.singleton(newTopic));


                createTopicsResult.all().get();


            }catch (InterruptedException | ExecutionException e) {

                e.printStackTrace();

            }

        }

    }

}

错误: TimeoutException: Timed out waiting for a node assignment


Exception in thread "main" java.lang.RuntimeException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.

at ProducerMain.main(ProducerMain.java:41)

    <br>Caused by: java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.

at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)

at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)

at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)

at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:258)

at ProducerMain.main(ProducerMain.java:38)

<br>Caused by: org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment.

我已经在网上搜索了有关可能是什么问题的指示,但到目前为止没有发现任何问题。任何建议都值得欢迎,因为我已经走到尽头了。


BIG阳
浏览 321回答 2
2回答

函数式编程

听起来您的经纪人不健康...该代码可以正常工作public class Main {&nbsp; &nbsp; static final Logger logger = LoggerFactory.getLogger(Main.class);&nbsp; &nbsp; public static void main(String[] args) {&nbsp; &nbsp; &nbsp; &nbsp; Properties properties = new Properties();&nbsp; &nbsp; &nbsp; &nbsp; properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");&nbsp; &nbsp; &nbsp; &nbsp; properties.setProperty(AdminClientConfig.CLIENT_ID_CONFIG, "local-test");&nbsp; &nbsp; &nbsp; &nbsp; properties.setProperty(AdminClientConfig.RETRIES_CONFIG, "3");&nbsp; &nbsp; &nbsp; &nbsp; try (AdminClient client = AdminClient.create(properties)) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; final CreateTopicsResult res = client.createTopics(&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; Collections.singletonList(&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; new NewTopic("foo", 1, (short) 1)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; )&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; );&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; res.all().get(5, TimeUnit.SECONDS);&nbsp; &nbsp; &nbsp; &nbsp; } catch (InterruptedException | ExecutionException | TimeoutException e) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; logger.error("unable to create topic", e);&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; }}而且我可以在代理日志中看到该主题已创建

桃花长相依

我用bitnami / kafka启动了kafka服务,并得到了完全相同的错误。尝试通过此版本启动kafka,它可以正常工作:https : //hub.docker.com/r/wurstmeister/kafka$ docker run -d --name zookeeper-server --network app-tier \&nbsp; -e ALLOW_ANONYMOUS_LOGIN=yes&nbsp; -p 2181:2181 zookeeper:3.6.2$ docker run -d --name kafka-server --network app-tier --publish 9092:9092 \&nbsp; --env KAFKA_ZOOKEEPER_CONNECT=zookeeper-server:2181 \&nbsp; --env KAFKA_ADVERTISED_HOST_NAME=30.225.51.235 \&nbsp; --env KAFKA_ADVERTISED_PORT=9092&nbsp; \&nbsp; wurstmeister/kafka30.225.51.235是主机的IP地址。
打开App,查看更多内容
随时随地看视频慕课网APP

相关分类

Java