猿问

嵌入式 Kafka 以错误的分区数开始

我在 JUnit 测试中启动了一个 EmbeddedKafka 实例。我可以在应用程序中正确读取已推送到流的记录,但我注意到的一件事是每个主题只有一个分区。谁能解释为什么?


在我的应用程序中,我有以下内容:


List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);

这将返回一个包含一项的列表。当针对具有 3 个分区的本地 Kafka 运行时,它会按预期返回包含 3 个项目的列表。


我的测试看起来像:


@RunWith(SpringRunner.class)

@SpringBootTest

@EmbeddedKafka(partitions = 3)

@ActiveProfiles("inmemory")

@DirtiesContext(classMode = DirtiesContext.ClassMode.AFTER_EACH_TEST_METHOD)

@TestPropertySource(

                locations = "classpath:application-test.properties",

                properties = {"app.onlyMonitorIfDataUpdated=true"})

public class MonitorRestKafkaIntegrationTest {


    @Autowired

    private EmbeddedKafkaBroker embeddedKafkaBroker;


    @Value("${spring.embedded.kafka.brokers}")

    private String embeddedBrokers;


    @Autowired

    private WebApplicationContext wac;


    @Autowired

    private JsonUtility jsonUtility;


    private MockMvc mockMvc;


    @Before

    public void setup() {

            mockMvc = webAppContextSetup(wac).build();

            UserGroupInformation.setLoginUser(UserGroupInformation.createRemoteUser("dummyUser"));

    }


        private ResultActions interactiveMonitoringREST(String eggID, String monitoringParams) throws Exception {

            return mockMvc.perform(post(String.format("/eggs/%s/interactive", eggID)).contentType(MediaType.APPLICATION_JSON_VALUE).content(monitoringParams));

        }


呼唤远方
浏览 138回答 2
2回答

慕容3067478

您需要告诉经纪人预先创建主题...@SpringBootTest@EmbeddedKafka(topics = "foo", partitions = 3)class So57481979ApplicationTests {&nbsp; &nbsp; @Test&nbsp; &nbsp; void testPartitions(@Autowired KafkaAdmin admin) throws InterruptedException, ExecutionException {&nbsp; &nbsp; &nbsp; &nbsp; AdminClient client = AdminClient.create(admin.getConfig());&nbsp; &nbsp; &nbsp; &nbsp; Map<String, TopicDescription> map = client.describeTopics(Collections.singletonList("foo")).all().get();&nbsp; &nbsp; &nbsp; &nbsp; System.out.println(map.values().iterator().next().partitions().size());&nbsp; &nbsp; }}num.partitions或者,如果您希望代理在首次使用时为您自动创建主题,则设置代理属性。我们可能应该根据分区属性自动执行此操作。

慕丝7291255

我发现bootstrapServersPropertyis important in&nbsp;@EmbeddedKafka,它用于填充 中的属性application-test.yml,然后可用于创建消费者/侦听器容器。
随时随地看视频慕课网APP

相关分类

Java
我要回答