正文
1.topic类型的Exchange
我们之前说过,topic类型的Exchange相当于direct类型的模糊匹配版本,可以通过routing key对message进行模糊匹配消费。topic的模糊匹配有两种通配符:
1. 使用*来匹配一个单词(routing key中的单词以“.”分隔)
2. 使用#来匹配0个或多个单词
我们来看代码
消费端
using RabbitMQ.Client;using RabbitMQ.Client.Events;using System;using System.Collections.Generic;using System.Text;using System.Threading;namespace RabbitMQClient
{
    /// <summary>
    /// Topic-exchange consumer: binds a durable queue to the "changeAll" topic
    /// exchange with the pattern "man.#" and prints every message it receives.
    /// </summary>
    class Program
    {
        // Names of the broker objects this consumer declares and binds.
        private const string ExchangeName = "changeAll";
        private const string QueueName = "queueman";

        // "#" matches zero or more words, so this pattern selects every
        // routing key whose first word is "man".
        private const string BindingPattern = "man.#";

        // Connection settings for the broker used by this sample.
        private static readonly ConnectionFactory s_factory = new ConnectionFactory()
        {
            HostName = "39.**.**.**",
            Port = 5672,
            UserName = "root",
            Password = "root",
            VirtualHost = "/"
        };

        /// <summary>
        /// Declares the exchange and queue, binds them, and consumes messages
        /// with manual acknowledgement until a line is read from the console.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            using (IConnection connection = s_factory.CreateConnection())
            using (IModel channel = connection.CreateModel())
            {
                // Declare the topology: exchange first, then the queue, then the binding.
                channel.ExchangeDeclare(exchange: ExchangeName, type: "topic", durable: true, autoDelete: false);
                channel.QueueDeclare(queue: QueueName, durable: true, exclusive: false, autoDelete: false);
                channel.QueueBind(queue: QueueName, exchange: ExchangeName, routingKey: BindingPattern);

                // Per-consumer prefetch setting (global: false) with a count of 50.
                channel.BasicQos(prefetchSize: 0, prefetchCount: 50, global: false);

                var messageConsumer = new EventingBasicConsumer(channel);
                messageConsumer.Received += (sender, delivery) =>
                {
                    byte[] payload = delivery.Body;
                    Console.WriteLine(Encoding.UTF8.GetString(payload));

                    // autoAck is off below, so each delivery is acknowledged explicitly.
                    channel.BasicAck(deliveryTag: delivery.DeliveryTag, multiple: false);
                };

                channel.BasicConsume(queue: QueueName, autoAck: false, consumer: messageConsumer);

                // Block here so the channel and connection stay open while messages arrive.
                Console.ReadLine();
            }
        }
    }
}
生产者代码
using RabbitMQ.Client;using System;using System.Collections.Generic;using System.Text;using System.Threading;using System.Threading.Tasks;namespace RabbitMQConsole
{
    /// <summary>
    /// Topic-exchange producer: publishes four persistent messages to the
    /// "changeAll" topic exchange, each with a different routing key.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Connects to the broker, declares the topic exchange and publishes one
        /// message per routing key; the message body is the routing key itself.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "39.**.**.**";
            factory.Port = 5672;
            factory.VirtualHost = "/";
            factory.UserName = "root";
            factory.Password = "root";

            var exchangeAll = "changeAll";

            // Routing key layout: gender.surname.hairLength
            var routingKeys = new[]
            {
                "man.chen.long",
                "man.liu.long",
                "woman.liu.long",
                "woman.chen.short"
            };

            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchangeAll, type: "topic", durable: true, autoDelete: false);

                var properties = channel.CreateBasicProperties();
                properties.Persistent = true;

                // Publish the messages. In the original paste the first BasicPublish
                // call had been absorbed into a line comment; publishing in a loop
                // restores it and keeps the original order of the four keys.
                foreach (var routingKey in routingKeys)
                {
                    channel.BasicPublish(exchange: exchangeAll,
                                         routingKey: routingKey,
                                         basicProperties: properties,
                                         body: Encoding.UTF8.GetBytes(routingKey));
                }
            }
        }
    }
}
我们先运行消费端,再运行生产端,结果如下
消费端:
2.headers类型的exchange
生产者代码
using RabbitMQ.Client;using System;using System.Collections.Generic;using System.Text;using System.Threading;using System.Threading.Tasks;namespace RabbitMQConsole
{
    /// <summary>
    /// Headers-exchange producer: publishes one persistent message carrying the
    /// header sex=man to the "changeHeader" headers exchange.
    /// </summary>
    class Program
    {
        /// <summary>
        /// Connects to the broker, declares the headers exchange and publishes a
        /// single message with an empty routing key and the header sex=man.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            ConnectionFactory factory = new ConnectionFactory();
            factory.HostName = "39.**.**.**";
            factory.Port = 5672;
            factory.VirtualHost = "/";
            factory.UserName = "root";
            factory.Password = "root";

            var exchangeAll = "changeHeader";

            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchangeAll, type: "headers", durable: true, autoDelete: false);

                var properties = channel.CreateBasicProperties();
                properties.Persistent = true;
                properties.Headers = new Dictionary<string, object>
                {
                    { "sex", "man" }
                };

                // Publish the message. In the original paste this call had been
                // absorbed into a line comment, leaving only its dangling arguments.
                channel.BasicPublish(exchange: exchangeAll,
                                     routingKey: "",
                                     basicProperties: properties,
                                     body: Encoding.UTF8.GetBytes("hihihi"));
            }
        }
    }
}
消费端代码
using RabbitMQ.Client;using RabbitMQ.Client.Events;using System;using System.Collections.Generic;using System.Text;using System.Threading;namespace RabbitMQClient
{
    /// <summary>
    /// Headers-exchange consumer: binds a durable queue to the "changeHeader"
    /// headers exchange with the binding argument sex=man and prints every
    /// message it receives.
    /// </summary>
    class Program
    {
        // Names of the broker objects this consumer declares and binds.
        private const string ExchangeName = "changeHeader";
        private const string QueueName = "queueHeader";

        // Connection settings for the broker used by this sample.
        private static readonly ConnectionFactory s_factory = new ConnectionFactory()
        {
            HostName = "39.**.**.**",
            Port = 5672,
            UserName = "root",
            Password = "root",
            VirtualHost = "/"
        };

        /// <summary>
        /// Declares the exchange and queue, binds them by header, and consumes
        /// messages with manual acknowledgement until a line is read from the console.
        /// </summary>
        /// <param name="args">Command-line arguments (unused).</param>
        static void Main(string[] args)
        {
            using (IConnection connection = s_factory.CreateConnection())
            using (IModel channel = connection.CreateModel())
            {
                channel.ExchangeDeclare(exchange: ExchangeName, type: "headers", durable: true, autoDelete: false);
                channel.QueueDeclare(queue: QueueName, durable: true, exclusive: false, autoDelete: false);

                // The binding is expressed through header arguments; the routing key is empty.
                // NOTE(review): no "x-match" argument is supplied, so the broker's default
                // matching mode applies — confirm against the RabbitMQ documentation.
                var bindingHeaders = new Dictionary<string, object> { { "sex", "man" } };
                channel.QueueBind(QueueName, ExchangeName, "", bindingHeaders);

                // Per-consumer prefetch setting (global: false) with a count of 50.
                channel.BasicQos(prefetchSize: 0, prefetchCount: 50, global: false);

                var messageConsumer = new EventingBasicConsumer(channel);
                messageConsumer.Received += (sender, delivery) =>
                {
                    byte[] payload = delivery.Body;
                    Console.WriteLine(Encoding.UTF8.GetString(payload));

                    // autoAck is off below, so each delivery is acknowledged explicitly.
                    channel.BasicAck(deliveryTag: delivery.DeliveryTag, multiple: false);
                };

                channel.BasicConsume(queue: QueueName, autoAck: false, consumer: messageConsumer);

                // Block here so the channel and connection stay open while messages arrive.
                Console.ReadLine();
            }
        }
    }
}