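Overview
RocketMQ ships with a console and a set of command-line tools that administrators use to manage topics, clusters, brokers, and related information.
In RocketMQ's bin directory you will find the mqadmin script.
Running the mqadmin script without arguments prints the list of all operations that mqadmin supports.
To look up the detailed usage of a particular operation, run:
mqadmin help <command name>
For example: mqadmin help updateTopic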
Inspecting the mqadmin script
The mqadmin script delegates to the tools command and passes org.apache.rocketmq.tools.command.MQAdminStartup as the startup class.
Structure of the tools module
The MQAdminStartup startup class
public static void main(String[] args) {
    main0(args, null);
}

public static void main0(String[] args, RPCHook rpcHook) {
    System.setProperty(RemotingCommand.REMOTING_VERSION_KEY, Integer.toString(MQVersion.CURRENT_VERSION));

    //PackageConflictDetect.detectFastjson();

    initCommand();

    try {
        initLogback();
        switch (args.length) {
            case 0:
                printHelp();
                break;
            case 2:
                if (args[0].equals("help")) {
                    SubCommand cmd = findSubCommand(args[1]);
                    if (cmd != null) {
                        Options options = ServerUtil.buildCommandlineOptions(new Options());
                        options = cmd.buildCommandlineOptions(options);
                        if (options != null) {
                            ServerUtil.printCommandLineHelp("mqadmin " + cmd.commandName(), options);
                        }
                    } else {
                        System.out.printf("The sub command %s not exist.%n", args[1]);
                    }
                    break;
                }
            case 1:
            default:
                SubCommand cmd = findSubCommand(args[0]);
                if (cmd != null) {
                    String[] subargs = parseSubArgs(args);

                    Options options = ServerUtil.buildCommandlineOptions(new Options());
                    final CommandLine commandLine =
                        ServerUtil.parseCmdLine("mqadmin " + cmd.commandName(), subargs,
                            cmd.buildCommandlineOptions(options), new PosixParser());
                    if (null == commandLine) {
                        return;
                    }

                    if (commandLine.hasOption('n')) {
                        String namesrvAddr = commandLine.getOptionValue('n');
                        System.setProperty(MixAll.NAMESRV_ADDR_PROPERTY, namesrvAddr);
                    }

                    cmd.execute(commandLine, options, rpcHook);
                } else {
                    System.out.printf("The sub command %s not exist.%n", args[0]);
                }
                break;
        }
    } catch (Exception e) {
        e.printStackTrace();
    }
}
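1. It first calls initCommand() to register all commands.
2. It initializes logging.
3. It inspects the arguments passed to main.
3.1 With no arguments, it prints the general help.
3.2 With exactly two arguments, where the first is help and the second is the name of a command registered by initCommand(), it calls ServerUtil.printCommandLineHelp() to print that command's help. If the name is unknown, it reports that the sub command does not exist.
3.3 In every other case (one argument, two arguments whose first is not help, or more than two arguments), the first argument is treated as a command name. If it matches a command registered by initCommand(), that command's execute() method is called: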
cmd.execute(commandLine, options, rpcHook);
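The branching described above, including the fall-through from the two-argument case into the default case, can be sketched with the standard library alone. This is a simplified illustration and not RocketMQ code: the class DispatchSketch, the KNOWN list, and the returned strings are all hypothetical stand-ins for findSubCommand() and the real actions.

```java
import java.util.Arrays;
import java.util.List;

class DispatchSketch {
    // Hypothetical stand-in for the command names registered by initCommand().
    static final List<String> KNOWN = Arrays.asList("updateTopic", "deleteTopic", "topicList");

    /** Returns a description of the branch that main0 would take for these arguments. */
    static String dispatch(String[] args) {
        switch (args.length) {
            case 0:
                return "print general help";
            case 2:
                if (args[0].equals("help")) {
                    return KNOWN.contains(args[1])
                        ? "print help for " + args[1]
                        : "unknown sub command " + args[1];
                }
                // Not a help request: fall through and treat args[0] as a command name.
            case 1:
            default:
                return KNOWN.contains(args[0])
                    ? "execute " + args[0]
                    : "unknown sub command " + args[0];
        }
    }

    public static void main(String[] args) {
        System.out.println(dispatch(new String[] {}));
        System.out.println(dispatch(new String[] {"help", "updateTopic"}));
        System.out.println(dispatch(new String[] {"topicList", "-n"}));
        System.out.println(dispatch(new String[] {"updateTopic", "-t", "demo"}));
    }
}
```

Note that a two-argument call such as topicList -n is not a help request, so it reaches the same code path as a call with one or with many arguments.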
The initCommand() method
public static void initCommand() {
    initCommand(new UpdateTopicSubCommand());
    initCommand(new DeleteTopicSubCommand());
    initCommand(new UpdateSubGroupSubCommand());
    initCommand(new DeleteSubscriptionGroupCommand());
    initCommand(new UpdateBrokerConfigSubCommand());
    initCommand(new UpdateTopicPermSubCommand());

    initCommand(new TopicRouteSubCommand());
    initCommand(new TopicStatusSubCommand());
    initCommand(new TopicClusterSubCommand());

    initCommand(new BrokerStatusSubCommand());
    initCommand(new QueryMsgByIdSubCommand());
    initCommand(new QueryMsgByKeySubCommand());
    initCommand(new QueryMsgByUniqueKeySubCommand());
    initCommand(new QueryMsgByOffsetSubCommand());

    initCommand(new PrintMessageSubCommand());
    initCommand(new PrintMessageByQueueCommand());
    initCommand(new SendMsgStatusCommand());
    initCommand(new BrokerConsumeStatsSubCommad());

    initCommand(new ProducerConnectionSubCommand());
    initCommand(new ConsumerConnectionSubCommand());
    initCommand(new ConsumerProgressSubCommand());
    initCommand(new ConsumerStatusSubCommand());
    initCommand(new CloneGroupOffsetCommand());

    initCommand(new ClusterListSubCommand());
    initCommand(new TopicListSubCommand());

    initCommand(new UpdateKvConfigCommand());
    initCommand(new DeleteKvConfigCommand());

    initCommand(new WipeWritePermSubCommand());
    initCommand(new ResetOffsetByTimeCommand());

    initCommand(new UpdateOrderConfCommand());
    initCommand(new CleanExpiredCQSubCommand());
    initCommand(new CleanUnusedTopicCommand());

    initCommand(new StartMonitoringSubCommand());
    initCommand(new StatsAllSubCommand());

    initCommand(new AllocateMQSubCommand());

    initCommand(new CheckMsgSendRTCommand());
    initCommand(new CLusterSendMsgRTCommand());

    initCommand(new GetNamesrvConfigCommand());
    initCommand(new UpdateNamesrvConfigCommand());
    initCommand(new GetBrokerConfigCommand());

    initCommand(new QueryConsumeQueueCommand());
    initCommand(new SendMessageCommand());
    initCommand(new ConsumeMessageCommand());
}
As the class names suggest, they map one-to-one onto the command names that mqadmin prints when it is run from the console.
The initCommand(SubCommand) method
protected static List<SubCommand> subCommandList = new ArrayList<SubCommand>();

public static void initCommand(SubCommand command) {
    subCommandList.add(command);
}
This overload simply appends each command to a List.
The SubCommand interface
Every operation command implements the SubCommand interface:
public interface SubCommand {
    String commandName();

    String commandDesc();

    Options buildCommandlineOptions(final Options options);

    void execute(final CommandLine commandLine, final Options options, RPCHook rpcHook) throws SubCommandException;
}
1. commandName(): the command name
2. commandDesc(): the command description
3. buildCommandlineOptions(): builds the command-line options the command accepts
4. execute(): runs the command
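To see how the list-based registry and the interface fit together, here is a minimal standard-library sketch. MiniCommand is a hypothetical, reduced stand-in for SubCommand (name and description only), and find() is only an assumption about what a lookup like findSubCommand() might do, since its body is not shown in this article.

```java
import java.util.ArrayList;
import java.util.List;

class RegistrySketch {
    /** Hypothetical, reduced version of SubCommand: only name and description. */
    interface MiniCommand {
        String commandName();
        String commandDesc();
    }

    // Plays the role of subCommandList.
    static final List<MiniCommand> COMMANDS = new ArrayList<>();

    /** Plays the role of initCommand(SubCommand): append to the list. */
    static void register(final String name, final String desc) {
        COMMANDS.add(new MiniCommand() {
            @Override public String commandName() { return name; }
            @Override public String commandDesc() { return desc; }
        });
    }

    /** Assumed lookup: linear scan by name, null when nothing matches. */
    static MiniCommand find(String name) {
        for (MiniCommand cmd : COMMANDS) {
            if (cmd.commandName().equals(name)) {
                return cmd;
            }
        }
        return null;
    }

    public static void main(String[] args) {
        register("updateTopic", "Update or create topic");
        register("hello", "A made-up command for this sketch");

        MiniCommand cmd = find("updateTopic");
        System.out.println(cmd.commandName() + ": " + cmd.commandDesc());
        System.out.println(find("noSuchCommand")); // null, so the caller reports an unknown command
    }
}
```

Because every command exposes its own name, the dispatcher never needs a hard-coded mapping; adding a command is a matter of registering one more object.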
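Source analysis: creating a topic
Let's use the topic-creation command to examine how a command is implemented.
updateTopic is the command that creates a topic.
The help command (mqadmin help updateTopic) lists the parameters that updateTopic supports.
Next, let's walk through the implementation of the UpdateTopicSubCommand class.
UpdateTopicSubCommand walkthrough
commandName()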
@Override
public String commandName() {
    return "updateTopic";
}
Returns the command name.
commandDesc()
@Override
public String commandDesc() {
    return "Update or create topic";
}
Returns the command description.
buildCommandlineOptions()
@Override
public Options buildCommandlineOptions(Options options) {
    Option opt = new Option("b", "brokerAddr", true, "create topic to which broker");
    opt.setRequired(false);
    options.addOption(opt);

    opt = new Option("c", "clusterName", true, "create topic to which cluster");
    opt.setRequired(false);
    options.addOption(opt);

    opt = new Option("t", "topic", true, "topic name");
    opt.setRequired(true);
    options.addOption(opt);

    opt = new Option("r", "readQueueNums", true, "set read queue nums");
    opt.setRequired(false);
    options.addOption(opt);

    opt = new Option("w", "writeQueueNums", true, "set write queue nums");
    opt.setRequired(false);
    options.addOption(opt);

    opt = new Option("p", "perm", true, "set topic's permission(2|4|6), intro[2:W 4:R; 6:RW]");
    opt.setRequired(false);
    options.addOption(opt);

    opt = new Option("o", "order", true, "set topic's order(true|false)");
    opt.setRequired(false);
    options.addOption(opt);

    opt = new Option("u", "unit", true, "is unit topic (true|false)");
    opt.setRequired(false);
    options.addOption(opt);

    opt = new Option("s", "hasUnitSub", true, "has unit sub (true|false)");
    opt.setRequired(false);
    options.addOption(opt);

    return options;
}
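This method defines the options the command accepts, together with their descriptions. Only -t (the topic name) is required.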
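The help text for -p gives the permission values as 2 (W), 4 (R), and 6 (RW), which reads naturally as a pair of bit flags. The sketch below decodes such a value on that assumption; PermSketch and describe() are hypothetical names used only for illustration and are not part of the tools module.

```java
class PermSketch {
    static final int WRITE = 2; // per the -p help text: 2 = W
    static final int READ = 4;  // per the -p help text: 4 = R

    /** Decodes a permission value into a short label, assuming bit-flag semantics. */
    static String describe(int perm) {
        StringBuilder sb = new StringBuilder();
        if ((perm & READ) != 0) {
            sb.append('R');
        }
        if ((perm & WRITE) != 0) {
            sb.append('W');
        }
        return sb.length() == 0 ? "none" : sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(describe(2)); // W
        System.out.println(describe(4)); // R
        System.out.println(describe(6)); // RW
    }
}
```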
The execute() method
@Override
public void execute(final CommandLine commandLine, final Options options,
    RPCHook rpcHook) throws SubCommandException {
    DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
    defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));

    try {
        TopicConfig topicConfig = new TopicConfig();
        topicConfig.setReadQueueNums(8);
        topicConfig.setWriteQueueNums(8);
        topicConfig.setTopicName(commandLine.getOptionValue('t').trim());

        // readQueueNums
        if (commandLine.hasOption('r')) {
            topicConfig.setReadQueueNums(Integer.parseInt(commandLine.getOptionValue('r').trim()));
        }

        // writeQueueNums
        if (commandLine.hasOption('w')) {
            topicConfig.setWriteQueueNums(Integer.parseInt(commandLine.getOptionValue('w').trim()));
        }

        // perm
        if (commandLine.hasOption('p')) {
            topicConfig.setPerm(Integer.parseInt(commandLine.getOptionValue('p').trim()));
        }

        boolean isUnit = false;
        if (commandLine.hasOption('u')) {
            isUnit = Boolean.parseBoolean(commandLine.getOptionValue('u').trim());
        }

        boolean isCenterSync = false;
        if (commandLine.hasOption('s')) {
            isCenterSync = Boolean.parseBoolean(commandLine.getOptionValue('s').trim());
        }

        int topicCenterSync = TopicSysFlag.buildSysFlag(isUnit, isCenterSync);
        topicConfig.setTopicSysFlag(topicCenterSync);

        boolean isOrder = false;
        if (commandLine.hasOption('o')) {
            isOrder = Boolean.parseBoolean(commandLine.getOptionValue('o').trim());
        }
        topicConfig.setOrder(isOrder);

        if (commandLine.hasOption('b')) {
            String addr = commandLine.getOptionValue('b').trim();

            defaultMQAdminExt.start();
            defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);

            if (isOrder) {
                String brokerName = CommandUtil.fetchBrokerNameByAddr(defaultMQAdminExt, addr);
                String orderConf = brokerName + ":" + topicConfig.getWriteQueueNums();
                defaultMQAdminExt.createOrUpdateOrderConf(topicConfig.getTopicName(), orderConf, false);
                System.out.printf("%s", String.format("set broker orderConf. isOrder=%s, orderConf=[%s]",
                    isOrder, orderConf.toString()));
            }
            System.out.printf("create topic to %s success.%n", addr);
            System.out.printf("%s", topicConfig);
            return;

        } else if (commandLine.hasOption('c')) {
            String clusterName = commandLine.getOptionValue('c').trim();

            defaultMQAdminExt.start();

            Set<String> masterSet =
                CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
            for (String addr : masterSet) {
                defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
                System.out.printf("create topic to %s success.%n", addr);
            }

            if (isOrder) {
                Set<String> brokerNameSet =
                    CommandUtil.fetchBrokerNameByClusterName(defaultMQAdminExt, clusterName);
                StringBuilder orderConf = new StringBuilder();
                String splitor = "";
                for (String s : brokerNameSet) {
                    orderConf.append(splitor).append(s).append(":")
                        .append(topicConfig.getWriteQueueNums());
                    splitor = ";";
                }
                defaultMQAdminExt.createOrUpdateOrderConf(topicConfig.getTopicName(),
                    orderConf.toString(), true);
                System.out.printf("set cluster orderConf. isOrder=%s, orderConf=[%s]", isOrder, orderConf);
            }

            System.out.printf("%s", topicConfig);
            return;
        }

        ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
    } catch (Exception e) {
        throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
    } finally {
        defaultMQAdminExt.shutdown();
    }
}
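As the code above shows, much of the method is spent parsing the commandLine arguments.
The parsed values populate a TopicConfig object, whose read and write queue counts default to 8 unless -r or -w overrides them.
DefaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig) is then called to create the topic.
The code also shows that only one of -b and -c takes effect.
-b creates the topic on the specified broker.
-c creates the topic on every master broker of the specified cluster.
-b is checked first: if -b is given, the topic is created on that broker only and any cluster named by -c is ignored. If neither option is given, the command only prints its help.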
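The precedence between -b and -c, and the order configuration string that the cluster branch assembles, can be illustrated with a small standard-library sketch. Everything here is hypothetical: TargetSketch, resolveTargets(), buildOrderConf(), the cluster map, and the sample addresses are stand-ins for what execute() obtains through DefaultMQAdminExt and CommandUtil.

```java
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class TargetSketch {
    /** -b wins over -c; with neither option there is no target (the command prints help). */
    static List<String> resolveTargets(String brokerAddr, String clusterName,
                                       Map<String, List<String>> mastersByCluster) {
        if (brokerAddr != null) {
            return Collections.singletonList(brokerAddr.trim());
        }
        if (clusterName != null) {
            return mastersByCluster.getOrDefault(clusterName.trim(), Collections.<String>emptyList());
        }
        return Collections.emptyList();
    }

    /** Joins "brokerName:writeQueueNums" entries with ';', mirroring the loop in the cluster branch. */
    static String buildOrderConf(Collection<String> brokerNames, int writeQueueNums) {
        StringBuilder sb = new StringBuilder();
        String separator = "";
        for (String name : brokerNames) {
            sb.append(separator).append(name).append(':').append(writeQueueNums);
            separator = ";";
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        // Made-up cluster layout for the sketch.
        Map<String, List<String>> masters = new HashMap<>();
        masters.put("DefaultCluster", Arrays.asList("10.0.0.1:10911", "10.0.0.2:10911"));

        System.out.println(resolveTargets("10.0.0.9:10911", "DefaultCluster", masters)); // -b wins
        System.out.println(resolveTargets(null, "DefaultCluster", masters));             // all masters
        System.out.println(buildOrderConf(Arrays.asList("broker-a", "broker-b"), 8));    // broker-a:8;broker-b:8
    }
}
```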
Author: jijs
Link: https://www.jianshu.com/p/2aa73bf5ce3f