猿问

如何使用 RestTemplate 在 qpid 中创建队列?

我正在尝试为使用 RabbitMQ 的应用程序编写集成测试,为此我正在使用 Qpid 代理。我设法启动了服务器并且我的测试正在连接到它,但我需要在启动前在 Qpid 中创建队列。因为我有大量队列,所以我动态创建了 bean:


applicationContext.getBeanFactory().registerSingleton(queueName, queue);

这需要在启动前创建队列。


这是 qpid 配置文件:


{

  "name": "tst",

  "modelVersion": "2.0",

  "defaultVirtualHost" : "default",

  "authenticationproviders" : [ {

    "name" : "noPassword",

    "type" : "Anonymous",

    "secureOnlyMechanisms": []

        },

    {

      "name" : "passwordFile",

      "type" : "PlainPasswordFile",

      "path" : "/src/test/resources/passwd.txt",

      "secureOnlyMechanisms": [],

      "preferencesproviders" : [{

        "name": "fileSystemPreferences",

        "type": "FileSystemPreferences",

        "path" : "${qpid.work_dir}${file.separator}user.preferences.json"

        }

      ]

    }

   ],

  "ports" : [

    {

      "name": "AMQP",

      "port": "5673",

      "authenticationProvider": "passwordFile",

      "protocols": [

        "AMQP_0_10",

        "AMQP_0_8",

        "AMQP_0_9",

        "AMQP_0_9_1"

      ]

    }],

  "virtualhostnodes" : [ {

    "name" : "default",

    "type" : "JSON",

    "virtualHostInitialConfiguration" : "{ \"type\" : \"Memory\" }"

  }]


}

从官方文档(https://qpid.apache.org/releases/qpid-broker-j-7.1.4/book/Java-Broker-Management-Channel-REST-API.html#d0e2130)我读到可以为 REST 调用创建队列,所以我尝试使用 RestTemplate 来实现这一点,但它似乎没有创建队列。


    @BeforeClass

    public static void startup() throws Exception {

        brokerStarter = new BrokerManager();

        brokerStarter.startBroker();


        RestTemplate restTemplate = new RestTemplate();

        restTemplate.put("http://localhost:5673/api/latest/queue/default/queue1", "");

        restTemplate.put("http://localhost:5673/api/latest/queue/default/queue-2", "");

    }

有人可以解释我做错了什么吗?谢谢你!


Smart猫小萌
浏览 125回答 2
2回答

人到中年有点甜

我使用 REST API 解决了同样的问题。为了创建/删除用于集成测试目的的队列,我使用以下配置文件 ( qpid-config.json):{&nbsp; "name": "EmbeddedBroker",&nbsp; "modelVersion": "8.0",&nbsp; "authenticationproviders": [&nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; "name": "anonymous",&nbsp; &nbsp; &nbsp; "type": "Anonymous"&nbsp; &nbsp; }&nbsp; ],&nbsp; "ports": [&nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; "name": "AMQP",&nbsp; &nbsp; &nbsp; "bindingAddress": "localhost",&nbsp; &nbsp; &nbsp; "port": "${qpid.amqp_port}",&nbsp; &nbsp; &nbsp; "protocols": [ "AMQP_1_0" ],&nbsp; &nbsp; &nbsp; "authenticationProvider": "anonymous",&nbsp; &nbsp; &nbsp; "virtualhostaliases" : [ {&nbsp; &nbsp; &nbsp; &nbsp; "name" : "nameAlias",&nbsp; &nbsp; &nbsp; &nbsp; "type" : "nameAlias"&nbsp; &nbsp; &nbsp; }, {&nbsp; &nbsp; &nbsp; &nbsp; "name" : "defaultAlias",&nbsp; &nbsp; &nbsp; &nbsp; "type" : "defaultAlias"&nbsp; &nbsp; &nbsp; }, {&nbsp; &nbsp; &nbsp; &nbsp; "name" : "hostnameAlias",&nbsp; &nbsp; &nbsp; &nbsp; "type" : "hostnameAlias"&nbsp; &nbsp; &nbsp; } ]&nbsp; &nbsp; },&nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; "name" : "HTTP",&nbsp; &nbsp; &nbsp; "port" : "${qpid.http_port}",&nbsp; &nbsp; &nbsp; "protocols" : [ "HTTP" ],&nbsp; &nbsp; &nbsp; "authenticationProvider" : "anonymous"&nbsp; &nbsp; }&nbsp; ],&nbsp; "virtualhostnodes": [&nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; "name": "default",&nbsp; &nbsp; &nbsp; "defaultVirtualHostNode": "true",&nbsp; &nbsp; &nbsp; "type": "Memory",&nbsp; &nbsp; &nbsp; "virtualHostInitialConfiguration": "{\"type\": \"Memory\" }"&nbsp; &nbsp; }&nbsp; ],&nbsp; "plugins" : [&nbsp; &nbsp; {&nbsp; &nbsp; &nbsp; "type" : "MANAGEMENT-HTTP",&nbsp; &nbsp; &nbsp; "name" : "httpManagement"&nbsp; &nbsp; }&nbsp; ]}相关的 Gradle 依赖项:&nbsp; &nbsp; testImplementation("org.apache.qpid:qpid-broker-core:${Versions.qpidBroker}") // tested with 8.0.0&nbsp; &nbsp; testImplementation("org.apache.qpid:qpid-broker-plugins-amqp-1-0-protocol:${Versions.qpidBroker}")&nbsp; &nbsp; testImplementation("org.apache.qpid:qpid-broker-plugins-memory-store:${Versions.qpidBroker}")&nbsp; &nbsp; testImplementation("org.apache.qpid:qpid-broker-plugins-management-http:${Versions.qpidBroker}")&nbsp; &nbsp; testImplementation("org.springframework.boot:spring-boot-starter-webflux")&nbsp; &nbsp; testImplementation("org.projectreactor:reactor-spring:${Versions.reactorSpring}")启动代理的代码(Kotlin):&nbsp; &nbsp; private fun startQpidBroker() {&nbsp; &nbsp; &nbsp; &nbsp; val attributes: MutableMap<String, Any> = HashMap()&nbsp; &nbsp; &nbsp; &nbsp; val initialConfig = EmbeddedAMQPBroker::class.java.classLoader.getResource("qpid-config.json")!!&nbsp; &nbsp; &nbsp; &nbsp; attributes["type"] = "Memory"&nbsp; &nbsp; &nbsp; &nbsp; attributes["initialConfigurationLocation"] = initialConfig.toExternalForm()&nbsp; &nbsp; &nbsp; &nbsp; attributes["startupLoggedToSystemOut"] = true&nbsp; &nbsp; &nbsp; &nbsp; System.setProperty("qpid.amqp_port", "$amqpPort")&nbsp; &nbsp; &nbsp; &nbsp; System.setProperty("qpid.http_port", "$httpPort")&nbsp; &nbsp; &nbsp; &nbsp; // needed to avoid "AMQP precondition failed" due to durable message being sent to non-durable queues&nbsp; &nbsp; &nbsp; &nbsp; System.setProperty("qpid.tests.mms.messagestore.persistence", "true")&nbsp; &nbsp; &nbsp; &nbsp; broker.startup(attributes)&nbsp; &nbsp; }删除/创建队列的代码:&nbsp; &nbsp; private fun recreateQueue(queueName: String) {&nbsp; &nbsp; &nbsp; &nbsp; val client = WebClient.create("http://localhost:${EmbeddedAMQPBroker.httpPort}");&nbsp; &nbsp; &nbsp; &nbsp; try {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; client.method(HttpMethod.DELETE)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .uri("/api/latest/queue/default/$queueName")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .retrieve()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .toBodilessEntity()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .block()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .statusCode&nbsp; &nbsp; &nbsp; &nbsp; } catch (e: WebClientResponseException) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; if (e.statusCode != HttpStatus.NOT_FOUND) { // queue might not yet exist so 404 is acceptable&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; throw e&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; }&nbsp; &nbsp; &nbsp; &nbsp; client.method(HttpMethod.PUT)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .uri("/api/latest/queue/default/default/$queueName")&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .body(BodyInserters.fromValue(mapOf("name" to queueName, "type" to "standard")))&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .retrieve()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .toBodilessEntity()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .block()&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; .statusCode&nbsp; &nbsp; }

至尊宝的传说

我设法通过使用连接工厂解决了这个问题:&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; @Autowired&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ConnectionFactory factory;&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; ....&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; factory.setHost("localhost");&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; factory.setPort(qpid_server_port);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; String queue = "queue-x";&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; channel.queueDeclare(queue, true, false, false, null);&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; //channel.queueBind(queue, "exchange-x" , "routing-key-x");&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; } catch (Exception e) {&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; e.printStackTrace();&nbsp; &nbsp; &nbsp; &nbsp; &nbsp; &nbsp; }
随时随地看视频慕课网APP

相关分类

Java
我要回答