rabbitmq

RabbitMQ是实现了高级消息队列协议(AMQP)的开源消息代理软件(亦称面向消息的中间件)。RabbitMQ服务器是用Erlang语言编写的,而集群和故障转移是构建在开放电信平台框架上的。所有主要的编程语言均有与代理接口通讯的客户端库。

安装

以上操作均可以使用包管理工具完成

使用docker

docker run -d --hostname my-rabbit --name some-rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management

核心概念

使用

JAVA客户端

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("192.168.182.129");
factory.setUsername("my");
factory.setPassword("123");
factory.setPort(5672);
factory.setVirtualHost("/");

Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare("queue1",false,false,false,null);
String msg = UUID.randomUUID().toString();
channel.basicPublish("","queue1",null,msg.getBytes());
DefaultConsumer consumer = new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("接收到消息:"+new String(body));

    }
};
channel.basicConsume("queue1",true,consumer);

SpringBoot

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
spring:
  rabbitmq:
    addresses: 192.168.182.129
    username: my
    password: 123
    virtual-host: /
@Autowired
private RabbitTemplate rabbitTemplate;

public void sendUser(User user) throws Exception{
    CorrelationData correlationData = new CorrelationData(user.getUsername());
    rabbitTemplate.convertAndSend("user-exchange","user.abcd",user,correlationData);
}
spring.rabbitmq.listener.simple.concurrency=5
spring.rabbitmq.listener.simple.acknowledge-mode=auto
spring.rabbitmq.listener.simple.max-concurrency=10
spring.rabbitmq.listener.simple.prefetch=1
@RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "user-queue"),
        exchange = @Exchange(name = "user-exchange",type = "topic"),
        key = "user.#"
))
@RabbitHandler
public void onMessage(@Payload User user){
    // 当这里抛出异常,会自动进行重试
    log.info("on message:{}",user);
}

消息模型

点对点

批注 2020-03-12 151023

当有多个消费端时,mq会把消息公平分发到每个消费端(轮询)

工作队列

消息转发机制是平均分配,这样就会出现俩个消费者,由于每个消费者处理任务的效率不一,可以通过设置qos的方式来决定消费者的消费能力,从而达到资源的充分利用

channel.basicQos(1);

手动ack后的消费端,mq会继续发消息给它,这样就能达到消费速度更快的客户端消费更多数据

订阅模型-Fanout

Fanout exchange(扇型交换机)将消息路由给绑定到它身上的所有队列

批注 2020-03-12 181837

String exchangeName = "exchange1";
channel.exchangeDeclare(exchangeName,"fanout");
String msg = UUID.randomUUID().toString();
channel.basicPublish(exchangeName,"",null,msg.getBytes());

端a

String queueName = "queue1";
String exchangeName = "exchange1";
channel.queueDeclare(queueName, false, false, false, null);
channel.queueBind(queueName, exchangeName, "");
DefaultConsumer consumer = new DefaultConsumer(channel) {
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("消费者接收到消息:" + new String(body));
    }
};
channel.basicConsume(queueName,true,consumer);

端b

...
String queueName = "queue2";
...

订阅模型-Direct

Direct exchange(直连交换机)是根据消息携带的路由键(routing key)将消息投递给对应队列

20203121903

...
channel.basicPublish(exchangeName,"routing_key",null,msg.getBytes());
...
channel.queueBind(queueName, exchangeName, "routing_key");
...

订阅模型-Topic

Topic类型的Exchange与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。只不过Topic类型Exchange可以让队列在绑定Routing key 的时候使用通配符

202031311031

端a

channel.queueBind(queueName, exchangeName, "#.sms");

端b

channel.queueBind(queueName, exchangeName, "#.email");

当生产者的routingKey为log.sms时,消息会发送到端a

消息确认机制(ACK)

ACK:消费者通知RabbitMQ消息已经接收并且处理完毕了。RabbitMQ就可以删除该条消息了

DefaultConsumer consumer = new DefaultConsumer(channel){
    @Override
    public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
        System.out.println("接收到消息:"+new String(body));
        channel.basicAck(envelope.getDeliveryTag(),false);
    }
};
channel.basicConsume("queue1",false,consumer);

事务

try{
    channel.txSelect();
    String msg = UUID.randomUUID().toString();
    channel.basicPublish(exchangeName,"log.email",null,msg.getBytes());
    channel.txCommit();
}catch (Exception e){
    channel.txRollback();
}

保证幂等性

当引入异常重试机制时,如何保证同一条消息不被重复消费

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          initial-interval: 100ms
          enabled: true
          max-attempts: 3

解决这个问题,需要根据业务的具体情况来分析,可以:

全局消息ID

当消费者处理完一条消息之后,将这个消息ID记录下来,当一条新消息到来之后,要判断是否记录过这条消息的ID,如果是,不再继续往下处理

死信队列

当由于一些诸如队列满或者消息被拒绝等原因,这些消息将被移入到一个备胎队列,死信队列就是专门用来存放这些消息的队列

普通队列绑定私信队列

Map<String, Object> args = new HashMap<>(2);
// 死信队列交换机与死信队列路由键
args.put("deadExchangeName", deadExchangeName);
args.put("deadRoutingKey", deadRoutingKey);
Queue queue = new Queue("user_queue", true, false, false, args);

高可用

创建的 queue,无论元数据还是 queue 里的消息都会存在于多个实例上,就是说,每个 RabbitMQ 节点都有这个 queue 的一个完整镜像,写消息到 queue 的时候,都会自动把消息同步到多个实例的 queue 上

批注 2020-03-18 194849

顺序性

当多个consumer同时消费一个queue时,很有可能造成消费的顺序和存入的顺序不一致,解决方法是: 拆分多个 queue,每个 queue 一个 consumer