JMS callback returns duplicate JMSMessageID

I'm currently trying to send a batch of simple messages to a queue using plain Java.


public AtomicReference<Message> doSend(String message, String queue){
    try (JMSContext context = connectionFactory.createContext()) {
        TextMessage textMessage = context.createTextMessage(message);
        final AtomicReference<Message> msg = new AtomicReference<>();
        msg.set(textMessage);
        log.info("Sending message to queue {}", queue);
        context.createProducer().send(createDestination(context, queue), textMessage);
        log.info("Message sent to queue {}, messageId provided {}", queue, msg.get().getJMSMessageID());
        return msg;
    }
    catch (Exception e) {
        log.error("Failed to send message to queue",e);
        throw new SipJmsException("Failed to send message to queue", e);
    }
}

private Destination createDestination(JMSContext context, String queue){
    log.debug("Creating destination queue {} connection",queue);
    return context.createQueue(queue);
}

I send N messages in a row, and the log shows that the same JMSMessageID is generated every time.


[main] Sending message to queue TEST_QUEUE
[main] Message sent to queue TEST_QUEUE, messageId provided ID:414d5120444556494d53514d20202020551c3f5d81619824
[main] Sending message to queue TEST_QUEUE
[main] Message sent to queue TEST_QUEUE, messageId provided ID:414d5120444556494d53514d20202020551c3f5d83619824

etc.


As far as I know, the JMSMessageID should be unique, and collisions would cause problems.


The O'Reilly book says:


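The JMSMessageID is a String value that uniquely identifies a message. How unique the identifier is depends on the vendor. The JMSMessageID can be useful for historical repositories in JMS consumer applications where messages need to be uniquely indexed. Used in conjunction with the JMSCorrelationID, the JMSMessageID is also useful for correlating messages: String messageid = message.getJMSMessageID();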


So why is the MessageId not unique? (It even appears to be the same between application runs.)


四季花海
3 Answers

慕斯王

The message IDs are unique. I've marked the digit that differs with *:
414d5120444556494d53514d20202020551c3f5d81619824
                                         *
414d5120444556494d53514d20202020551c3f5d83619824
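When two long hex IDs look identical at a glance, a character-by-character comparison makes the difference easy to spot. The following is only a small sketch of that idea; the class name `MessageIdDiff` and the helper `diffPositions` are made up for illustration and are not part of JMS or any MQ client library.

```java
import java.util.ArrayList;
import java.util.List;

public class MessageIdDiff {

    /** Returns the zero-based indexes at which the two strings differ. */
    public static List<Integer> diffPositions(String a, String b) {
        List<Integer> positions = new ArrayList<>();
        int common = Math.min(a.length(), b.length());
        for (int i = 0; i < common; i++) {
            if (a.charAt(i) != b.charAt(i)) {
                positions.add(i);
            }
        }
        // Characters that exist in only one of the strings also count as differences.
        for (int i = common; i < Math.max(a.length(), b.length()); i++) {
            positions.add(i);
        }
        return positions;
    }

    public static void main(String[] args) {
        // The two IDs from the question's log, without the "ID:" prefix.
        String first  = "414d5120444556494d53514d20202020551c3f5d81619824";
        String second = "414d5120444556494d53514d20202020551c3f5d83619824";
        System.out.println(diffPositions(first, second)); // prints [41]
    }
}
```

An empty list would mean the two IDs really are identical; here a single position is reported, which is the digit marked with * above.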

撒科打诨

I wrote a simple JMS program that puts 5 messages on a queue and prints the JMSMessageID after each put. Sample output:

2019/08/13 19:15:18.824 MQTestJMS11x5: testConn: successfully connected.
2019/08/13 19:15:18.845 MQTestJMS11x5: testConn: successfully opened TEST.Q1
2019/08/13 19:15:18.845 MQTestJMS11x5: sendMsg: Sending request to queue:///TEST.Q1
2019/08/13 19:15:18.845 MQTestJMS11x5: sendMsg:
2019/08/13 19:15:18.887 MQTestJMS11x5: sendMsg: Sent message: MessageId=ID:414d51204d515754312020202020202028cd525d24201102
2019/08/13 19:15:18.887 MQTestJMS11x5: sendMsg: Sent message: MessageId=ID:414d51204d515754312020202020202028cd525d24201103
2019/08/13 19:15:18.888 MQTestJMS11x5: sendMsg: Sent message: MessageId=ID:414d51204d515754312020202020202028cd525d24201104
2019/08/13 19:15:18.889 MQTestJMS11x5: sendMsg: Sent message: MessageId=ID:414d51204d515754312020202020202028cd525d24201105
2019/08/13 19:15:18.889 MQTestJMS11x5: sendMsg: Sent message: MessageId=ID:414d51204d515754312020202020202028cd525d24201106
2019/08/13 19:15:18.892 MQTestJMS11x5: testConn: Closed session
2019/08/13 19:15:18.892 MQTestJMS11x5: testConn: Stopped connection
2019/08/13 19:15:18.893 MQTestJMS11x5: testConn: Closed connection

Note that every message ID is unique. Here is the JMS program that generated the output:

import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Hashtable;

import javax.jms.*;

import com.ibm.mq.jms.*;
import com.ibm.msg.client.wmq.WMQConstants;

/**
 * Program Name
 *  MQTestJMS11x5
 *
 * Description
 *  This java JMS class will connect to a remote queue manager and put 5 messages to a queue.
 *
 * Sample Command Line Parameters
 *  -m MQA1 -h 127.0.0.1 -p 1414 -c TEST.CHL -q TEST.Q1 -u UserID -x Password
 *
 * @author Roger Lacroix
 */
public class MQTestJMS11x5
{
   private static final SimpleDateFormat  LOGGER_TIMESTAMP = new SimpleDateFormat("yyyy/MM/dd HH:mm:ss.SSS");

   private Hashtable<String,String> params;
   private MQQueueConnectionFactory mqQCF = null;

   /**
    * The constructor
    */
   public MQTestJMS11x5()
   {
      super();
      params = new Hashtable<String,String>();
   }

   /**
    * Make sure the required parameters are present.
    * @return true/false
    */
   private boolean allParamsPresent()
   {
      boolean b = params.containsKey("-h") && params.containsKey("-p") &&
                  params.containsKey("-c") && params.containsKey("-m") &&
                  params.containsKey("-q") &&
                  params.containsKey("-u") && params.containsKey("-x");
      if (b)
      {
         try
         {
            Integer.parseInt((String) params.get("-p"));
         }
         catch (NumberFormatException e)
         {
            b = false;
         }
      }

      return b;
   }

   /**
    * Extract the command-line parameters and initialize the MQ variables.
    * @param args
    * @throws IllegalArgumentException
    */
   private void init(String[] args) throws IllegalArgumentException
   {
      if (args.length > 0 && (args.length % 2) == 0)
      {
         for (int i = 0; i < args.length; i += 2)
         {
            params.put(args[i], args[i + 1]);
         }
      }
      else
      {
         throw new IllegalArgumentException();
      }

      if (allParamsPresent())
      {
         try
         {
            mqQCF = new MQQueueConnectionFactory();
            mqQCF.setQueueManager((String) params.get("-m"));
            mqQCF.setHostName((String) params.get("-h"));
            mqQCF.setChannel((String) params.get("-c"));
            mqQCF.setTransportType(WMQConstants.WMQ_CM_CLIENT);
            try
            {
               mqQCF.setPort(Integer.parseInt((String) params.get("-p")));
            }
            catch (NumberFormatException e)
            {
               mqQCF.setPort(1414);
            }
         }
         catch (JMSException e)
         {
            MQTestJMS11x5.logger("getLinkedException()=" + e.getLinkedException());
            MQTestJMS11x5.logger(e.getLocalizedMessage());
            e.printStackTrace();
            throw new IllegalArgumentException();
         }
         catch (Exception e)
         {
            MQTestJMS11x5.logger(e.getLocalizedMessage());
            e.printStackTrace();
            throw new IllegalArgumentException();
         }
      }
      else
      {
         throw new IllegalArgumentException();
      }
   }

   /**
    * Test the connection to the queue manager.
    */
   private void testConn()
   {
      QueueConnection conn = null;
      QueueSession session = null;
      Queue myQ = null;

      try
      {
         conn = mqQCF.createQueueConnection((String) params.get("-u"), (String) params.get("-x"));
         conn.start();

         session = conn.createQueueSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
         MQTestJMS11x5.logger("successfully connected.");

         myQ = session.createQueue((String) params.get("-q"));
         MQTestJMS11x5.logger("successfully opened "+ (String) params.get("-q"));

         MQDestination mqd = (MQDestination) myQ;
         mqd.setTargetClient(WMQConstants.WMQ_CLIENT_JMS_COMPLIANT);

         sendMsg( session, myQ);
      }
      catch (JMSException e)
      {
         MQTestJMS11x5.logger("getLinkedException()=" + e.getLinkedException());
         MQTestJMS11x5.logger(e.getLocalizedMessage());
         e.printStackTrace();
      }
      catch (Exception e)
      {
         MQTestJMS11x5.logger(e.getLocalizedMessage());
         e.printStackTrace();
      }
      finally
      {
         try
         {
            if (session != null)
            {
               session.close();
               MQTestJMS11x5.logger("Closed session");
            }
         }
         catch (Exception ex)
         {
            MQTestJMS11x5.logger("session.close() : " + ex.getLocalizedMessage());
         }

         try
         {
            if (conn != null)
            {
               conn.stop();
               MQTestJMS11x5.logger("Stopped connection");
            }
         }
         catch (Exception ex)
         {
            MQTestJMS11x5.logger("connection.stop() : " + ex.getLocalizedMessage());
         }

         try
         {
            if (conn != null)
            {
               conn.close();
               MQTestJMS11x5.logger("Closed connection");
            }
         }
         catch (Exception ex)
         {
            MQTestJMS11x5.logger("connection.close() : " + ex.getLocalizedMessage());
         }
      }
   }

   /**
    * Send 5 messages to a queue and log the JMSMessageID of each.
    * @throws JMSException
    */
   private void sendMsg(QueueSession session, Queue myQ) throws JMSException
   {
      QueueSender sender = null;
      TextMessage msg = null;

      try
      {
         MQTestJMS11x5.logger("Sending request to " + myQ.getQueueName());
         MQTestJMS11x5.logger("");

         sender = session.createSender(myQ);

         for (int i=0; i < 5; i++)
         {
            msg = session.createTextMessage();
            msg.setText("This is test message # " + (i+1));

            sender.send(msg);
            MQTestJMS11x5.logger("Sent message: MessageId="+msg.getJMSMessageID());
         }
      }
      finally
      {
         try
         {
            if (sender != null)
               sender.close();
         }
         catch (Exception ex)
         {
            MQTestJMS11x5.logger("sender.close() : " + ex.getLocalizedMessage());
         }
      }
   }

   /**
    * A simple logger method
    * @param data
    */
   public static void logger(String data)
   {
      String className = Thread.currentThread().getStackTrace()[2].getClassName();

      // Remove the package info.
      if ( (className != null) && (className.lastIndexOf('.') != -1) )
         className = className.substring(className.lastIndexOf('.')+1);

      System.out.println(LOGGER_TIMESTAMP.format(new Date())+" "+className+": "+Thread.currentThread().getStackTrace()[2].getMethodName()+": "+data);
   }

   /**
    * mainline
    * @param args
    */
   public static void main(String[] args)
   {
      MQTestJMS11x5 write = new MQTestJMS11x5();

      try
      {
         write.init(args);
         write.testConn();
      }
      catch (IllegalArgumentException e)
      {
         MQTestJMS11x5.logger("Usage: java MQTestJMS11x5 -m QueueManagerName -h host -p port -c channel -q JMS_Queue_Name -u UserID -x Password");
         System.exit(1);
      }
      catch (Exception e)
      {
         MQTestJMS11x5.logger(e.getLocalizedMessage());
         System.exit(1);
      }

      System.exit(0);
   }
}
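It may also help to see why these IDs share such a long common prefix. As far as I understand the IBM MQ documentation, a queue-manager-generated message ID starts with a product identifier ("AMQ "), followed by the queue manager name padded to 12 characters, and only the trailing bytes vary (reportedly derived from the system clock plus a counter). Treat that layout as an assumption rather than a guarantee. The sketch below simply decodes the first 16 bytes of an ID as ASCII; `MessageIdPrefix` and `decodeAsciiPrefix` are hypothetical names, not part of any MQ API.

```java
import java.nio.charset.StandardCharsets;

public class MessageIdPrefix {

    /** Decodes the first byteCount bytes of a hex string as ASCII text. */
    public static String decodeAsciiPrefix(String hex, int byteCount) {
        byte[] bytes = new byte[byteCount];
        for (int i = 0; i < byteCount; i++) {
            // Two hex digits make up one byte.
            bytes[i] = (byte) Integer.parseInt(hex.substring(2 * i, 2 * i + 2), 16);
        }
        return new String(bytes, StandardCharsets.US_ASCII);
    }

    public static void main(String[] args) {
        // First ID from the sample output above.
        String id = "ID:414d51204d515754312020202020202028cd525d24201102";
        String hex = id.substring(3); // drop the "ID:" prefix

        // The brackets make the trailing padding spaces visible.
        System.out.println("[" + decodeAsciiPrefix(hex, 16) + "]");
        // The remaining 8 bytes are the part that changes from message to message.
        System.out.println(hex.substring(32));
    }
}
```

For the ID above, the decoded prefix is "AMQ MQWT1" followed by padding spaces, and the remainder is 28cd525d24201102. Applying the same helper to the IDs in the question yields "AMQ DEVIMSQM" plus padding, which is why every ID from that queue manager starts with the same 32 hex digits.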

狐的传说

final AtomicReference<Message> msg = new AtomicReference<>();
Why are you using "final"? Remove it and try again.
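For what it's worth, `final` on a local variable in Java only prevents the variable from being reassigned; it does not change what is stored in the object it refers to, so it is unlikely to influence the generated message ID. A minimal sketch of that behaviour, using a plain String instead of a JMS Message so it runs without a broker (the class and method names are made up for illustration):

```java
import java.util.concurrent.atomic.AtomicReference;

public class FinalLocalDemo {

    /** Stores the value through a final local reference. */
    public static String withFinal(String value) {
        final AtomicReference<String> ref = new AtomicReference<>();
        ref.set(value); // allowed: final forbids reassigning ref, not mutating its target
        return ref.get();
    }

    /** Same logic with a non-final local reference. */
    public static String withoutFinal(String value) {
        AtomicReference<String> ref = new AtomicReference<>();
        ref.set(value);
        return ref.get();
    }

    public static void main(String[] args) {
        // Both variants return exactly what was stored.
        System.out.println(withFinal("ID:1").equals(withoutFinal("ID:1"))); // prints true
    }
}
```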