虚拟电脑可以做网站吗,网络工程好找工作吗,批量导文章到wordpress,房产网签备案查询延迟队列
概念
延迟队列#xff08;Delayed Queue#xff09;#xff0c;即消息被发送以后#xff0c;并不想让消费者立刻拿到消息#xff0c;而是等待特定时间后#xff0c;消费者才能拿到这个消息进行消费
应用场景
延迟队列的使用场景有很多#xff0c;比如…延迟队列概念延迟队列Delayed Queue即消息被发送以后并不想让消费者立刻拿到消息而是等待特定时间后消费者才能拿到这个消息进行消费应用场景延迟队列的使用场景有很多比如智能家居用户希望通过手机远程遥控家里的智能设备在指定的时间进行工作这时候就可以将用户指令发送到延迟队列当指令设定的时间到了再将指令推送到智能设备。日常管理预定会议后需要在会议开始前15分钟提醒参会人参加会议用户注册成功后7天后发送短信提高用户活跃度等……RabbitMQ本身没有直接支持延迟队列的功能但是可以通过上一篇文章所介绍的TTL死信队列的方式组合模拟出延迟队列的功能。假设一个应用中需要将每条消息都设置为10秒的延迟生产者通过nomal_exchange普通交换机这个交换机将发送的消息存储在normal_queue普通队列这个队列中。消费者订阅的并非是normal_queue而是dlx_queue死信队列这个队列。当消息从normal_queue这个队列中过期之后被存入dlx_queue这个队列中消费者就恰巧消费到延迟10秒的这条消息。TTL死信队列实现队列TTL死信队列先删除掉我们原来用于测试死信的normal队列(不删除原队列而随意修改队列属性会报异常):声明队列、交换机和绑定关系public static final String NORMAL_QUEUE normal.queue; public static final String NORMAL_EXCHANGE normal.exchange; public static final String DL_QUEUE dl.queue; public static final String DL_EXCHANGE dl.exchange;Bean(normalQueue) public Queue normalQueue(){ return QueueBuilder.durable(Contants.NORMAL_QUEUE) .deadLetterExchange(Contants.DL_EXCHANGE) .deadLetterRoutingKey(dlx) .maxLength(10L) .ttl(20000) .build(); } Bean(normalExchange) public DirectExchange normalExchange(){ return ExchangeBuilder.directExchange(Contants.NORMAL_EXCHANGE).build(); } Bean(normalBinding) public Binding normalBinding(Qualifier(normalQueue) Queue queue, Qualifier(normalExchange)DirectExchange directExchange){ return BindingBuilder.bind(queue).to(directExchange).with(normal); } //死信交换机和队列 Bean(dlQueue) public Queue dlQueue(){ return QueueBuilder.durable(Contants.DL_QUEUE).build(); } Bean(dlExchange) public DirectExchange dlExchange(){ return ExchangeBuilder.directExchange(Contants.DL_EXCHANGE).build(); } Bean(dlBinding) public Binding dlBinding(Qualifier(dlQueue) Queue queue, Qualifier(dlExchange)DirectExchange directExchange){ return BindingBuilder.bind(queue).to(directExchange).with(dlx); }发送消息RequestMapping(/delay) public String delay(){ rabbitTemplate.convertAndSend(Contants.NORMAL_EXCHANGE,normal,ttl test 30s...,message - { message.getMessageProperties().setExpiration(30000);//单位是毫秒设置过期时间为30s return message; }); rabbitTemplate.convertAndSend(Contants.NORMAL_EXCHANGE,normal,ttl3 test 10s...,message - { message.getMessageProperties().setExpiration(10000);//单位是毫秒设置过期时间为10s return message; }); System.out.printf(%tc 消息发送成功,new Date()); System.out.println(); return 消息发送成功; }ListenerComponent public class DLListener { RabbitListener(queues Contants.DL_QUEUE) public void dlHandleMessage(Message message, Channel channel) throws Exception { //消费者逻辑 System.out.printf([dl.queue] %tc 接收到消息:%s,deliveryTag:%d\n,new Date(),new String(message.getBody()), message.getMessageProperties().getDeliveryTag()); System.out.println(); } }调用接口发送请求127.0.0.1:8080/producer/delay观察日志可以发现两条消息都会在队列过期之后被收到也就是说队列里的消息无法自由地自定义过期时间。消息TTL死信队列还是先删除原来的队列声明新的队列 Bean(normalQueue) public Queue normalQueue(){ return QueueBuilder.durable(Contants.NORMAL_QUEUE) .deadLetterExchange(Contants.DL_EXCHANGE) .deadLetterRoutingKey(dlx) .build(); }交换机和绑定关系等复用上面的内容即可……发送消息和Listener也还是使用原来的即可……访问接口127.0.0.1:8080/producer/delay可以看到因为设置消息TTL是在消费者进行消费时进行判定而队列是先进先出的所以延时10s的消息会在30s的消息出队后再被判定为过期。通过控制台也可以发现这两条消息都是30s后才被消费的可以发现上面的两种方式1、队列TTL死信队列需要保证队列中的消息的延迟时间相同如果需要不同的延迟时间就需要创建不同的队列。2、消息TTL死信队列则需要保证延迟时间短的消息比延迟时间长的消息更先入队。都不太灵活那么有没有更灵活的方式呢我们可以使用延迟队列插件来解决这个问题。延迟队列插件实现RabbitMQ官方也提供了一个延迟的插件来实现延迟的功能。参考Scheduling Messages with RabbitMQ | RabbitMQ接下来具体进行操作安装延迟队列插件插件下载地址Releases · rabbitmq/rabbitmq-delayed-message-exchange · GitHub进去之后选择RabbitMQ对应的版本进行下载即可安装之后将文件拖到下图中的路径底下两者取其一即可启动插件拖完之后查看插件列表是否下载成功查看插件列表rabbitmq-plugins list启动插件rabbitmq-plugins enable rabbitmq_delayed_message_exchange重启服务service rabbitmq-server restart验证插件在RabbitMQ管理平台查看新建交换机时是否有延迟消息选项如果有就说明延迟消息插件已经正常运行了。基于延迟插件实现声明交换机、队列和绑定关系通过delayed方法声明当前交换机是能够支持发送延时消息的Configuration public class DelayConfig { Bean(delayQueue) public Queue delayQueue(){ return QueueBuilder.durable(Contants.DELAY_QUEUE).build(); } Bean(delayExchange) public Exchange delayExchange(){ return ExchangeBuilder.directExchange(Contants.DELAY_EXCHANGE).delayed().build(); } Bean(delayBinding) public Binding delayBinding(Qualifier(delayQueue)Queue queue, Qualifier(delayExchange)Exchange delayExchange){ return BindingBuilder.bind(queue).to(delayExchange).with(delay).noargs(); } }生产者RequestMapping(/delay2) public String delay2(){ rabbitTemplate.convertAndSend(Contants.DELAY_EXCHANGE,delay,delay test 30s...,message - { message.getMessageProperties().setDelayLong(30000L);//单位是毫秒设置延迟时间为30s return message; }); rabbitTemplate.convertAndSend(Contants.DELAY_EXCHANGE,delay,delay test 10s...,message - { message.getMessageProperties().setDelayLong(10000L);//单位是毫秒设置延迟时间为10s return message; }); System.out.printf(%tc 消息发送成功,new Date()); System.out.println(); return 消息发送成功; }消费者Component public class DelayListener { RabbitListener(queues Contants.DELAY_QUEUE) public void delayHandleMessage(Message message, Channel channel) throws Exception { //消费者逻辑 System.out.printf([delay.queue] %tc 接收到消息:%s\n,new Date(),new String(message.getBody())); System.out.println(); } }运行程序访问接口127.0.0.1:8080/producer/delay2此时消息就准确延时发送了常见面试题介绍下RabbitMQ的延迟队列延迟队列是一个特殊的队列消息发送之后并不立即给消费者而是等待特定的时间才发送给消费者延迟队列的应用场景有很多比如订单在十分钟内未支付自动取消用户注册成功后3天后发调查问卷用户发起退款24小时后商家未处理则默认同意自动退款……但RabbitMQ本身并没有直接实现延迟队列通常有两种方法TTL死信队列组合的方式使用官方提供的延迟插件实现延迟功能二者对比1、基于死信实现的延迟队列优点灵活不需要额外插件支持缺点存在消息顺序问题并且需要额外的逻辑来处理死信队列的消息增加了系统的复杂性2、基于插件实现的延迟队列优点通过插件可以直接创建延迟队列简化延迟消息的实现而且避免了死信的时序问题缺点需要依赖特定的插件有运维工作和只适用特定版本事务RabbitMQ是基于AMQP协议实现的该协议实现了事务机制因此RabbitMQ也支持事务机制。SpringAMQP也提供了对事务相关的操作。RabbitMQ事务允许开发者确保消息的发送和接收时原子性的要么全部成功·要么全部失败。代码演示演示之前我们需要将发送方消息确认的配置注释掉这里一定要注释掉否则会产生信道冲突一个信道不可以即开启发送方消息确认模式又开启事务模式配置事务管理器Bean(transRabbitTemplate) public RabbitTemplate transRabbitTemplate(ConnectionFactory connectionFactory){ RabbitTemplate rabbitTemplate new RabbitTemplate(connectionFactory); //开启事务 rabbitTemplate.setChannelTransacted(true); return rabbitTemplate; } Bean public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory){ return new RabbitTransactionManager(connectionFactory); }声明队列//事务 public static final String TRANS_QUEUE trans.queue;Bean(transQueue) public Queue transQueue(){ return QueueBuilder.durable(Contants.TRANS_QUEUE).build(); }生产者Transactional RequestMapping(/trans) public String trans(){ System.out.println(trans test...); transRabbitTemplate.convertAndSend(,Contants.TRANS_QUEUE,trans test 1...); int nums 10/0; transRabbitTemplate.convertAndSend(,Contants.TRANS_QUEUE,trans test 2...); return 消息发送成功; }可以看到两条消息均发送失败把Transactional去掉第一条消息发送成功将开启事务这一行注释掉重新发送消息依然收到了第一条消息注释掉事务管理部分消息1发送成功消息分发概念RabbitMQ队列拥有多个消费者时队列会把收到的消息分派给不同的消费者每条消息只会发送给订阅列表里的一个消费者。这种方式非常适合扩展如果现在负载加重那么只需要创建更多的消费者来消费处理消息即可。默认情况下RabbitMQ是以轮询的方法进行分发的而不管消费者是否已经消费并已经确认了消息这种方式是不太合理的试想一下如果某些消费者消费速度慢而其他消费者速度快就可能会导致某些消费者消息积压某些消费者空闲进而应用整体的吞吐量下降。如何处理呢我们可以使用前面章节讲到的channel.basicQos(int prefetehCount)方法来限制当前信道上消费者所能保持的最大未确认消息数量。比如消费端调用了channelbasicQos5RabbitMQ会为该消费者计数发送一条消息计数1消费一条消息计数-1当达到了设定的上限RabbitMQ就不会再向它发送消息了知道消费者确认了某条消息。类似于TCP/IP中的“滑动窗口”。应用场景消息分发的应用场景如下限流非公平分发限流如下使用场景订单系统每秒最多处理5000请求正常情况下订单系统可以正常满足需求但是再秒杀时间点请求瞬间增多每秒1w个请求如果这些请求全部通过MQ发送到订单系统无疑会把订单系统压垮。RabbitMQ提供了限流机制可以控制消费端一次只拉取N个请求。通过设置prefetchCount参数同时也必须要设置消息应答方式为手动应答。prefetchCount控制消费者从队列中预取消息的数量以此来实现流量控制和负载均衡。代码示例配置prefetch参数设置应答方式为手动应答spring: application: name: rabbit-extensions-demo #配置RabbitMQ的基本信息 #amqp://username:passwordIp:port/virtual-host rabbitmq: addresses: amqp://admin:admin106.52.188.165:5672/extension listener: simple: # acknowledge-mode: none #消息接收确认 # acknowledge-mode: auto acknowledge-mode: manual retry: enabled: true # 开启消费者失败重试 initial-interval: 5000ms # 初始失败等待时长为5秒 max-attempts: 5 # 最大重试次数 prefetch: 5 # publisher-confirm-type: correlated #消息发送确认配置交换机、队列和绑定关系Configuration public class DelayConfig { Bean(delayQueue) public Queue delayQueue(){ return QueueBuilder.durable(Contants.DELAY_QUEUE).build(); } Bean(delayExchange) public Exchange delayExchange(){ return ExchangeBuilder.directExchange(Contants.DELAY_EXCHANGE).delayed().build(); } Bean(delayBinding) public Binding delayBinding(Qualifier(delayQueue)Queue queue, Qualifier(delayExchange)Exchange delayExchange){ return BindingBuilder.bind(queue).to(delayExchange).with(delay).noargs(); } }发送消息一次发送20条消息RequestMapping(/qos) public String qos(){ System.out.println(qos test...); //发送普通消息 for (int i 0; i 20; i) { rabbitTemplate.convertAndSend(Contants.QOS_EXCHANGE,qos,qos test...); } return 消息发送成功; }消费者监听Component public class QOSListener { RabbitListener(queues Contants.QOS_QUEUE) public void handleMessage(Message message, Channel channel) throws Exception { long deliveryTag message.getMessageProperties().getDeliveryTag(); try { //消费者逻辑 System.out.printf(消费者接收到消息:%s,deliveryTag:%d\n, new String(message.getBody()), message.getMessageProperties().getDeliveryTag()); //进行业务逻辑处理 System.out.println(模拟业务逻辑处理); // int num 3 / 0; // Thread.sleep(2000); System.out.println(业务逻辑处理完成); //肯定确认 //先将手动确认注掉不然会直接消费掉 //channel.basicAck(deliveryTag, false); } catch (Exception e) { //否定确认 channel.basicNack(deliveryTag, false, true); } } }测试:调用接口127.0.0.1:8080/producer/qos可以看到控制台只打印了5条消息观察管理平台可以发现待发送15条未确认5条。因为代码没有手动ack。把prefetch5注掉再观察运行结果从日志和控制台上可以看到消费者会一次性把这20条消息全部收到管理平台负载均衡非公平分发我们也可以使用此配置来实现“负载均衡”。如下图所示在有两个消费者的情况下一个消费者处理任务非常快另一个非常慢就会造成一个消费者一直很忙而另一个消费者很闲。这是因为RabbitMQ只是在消息进入队列时分派消息。他不考虑消费者未确认消息的数量我们可以使用设置prefetch1的方式告诉RabbitMQ一次只给消费者一条消息也就是说在处理并确认前一条消息之前不要向该消费者发送新消息。相反它会将它分派给下一个不忙的消费者。代码示例修改配置spring: application: name: rabbit-extensions-demo #配置RabbitMQ的基本信息 #amqp://username:passwordIp:port/virtual-host rabbitmq: addresses: amqp://admin:admin106.52.188.165:5672/extension listener: simple: # acknowledge-mode: none #消息接收确认 # acknowledge-mode: auto acknowledge-mode: manual retry: enabled: true # 开启消费者失败重试 initial-interval: 5000ms # 初始失败等待时长为5秒 max-attempts: 5 # 最大重试次数 prefetch: 1 # publisher-confirm-type: correlated #消息发送确认修改消费者代码使用Thread.sleep()模拟一个工作快一个工作慢的场景Component public class QOSListener { RabbitListener(queues Contants.QOS_QUEUE) public void handleMessage(Message message, Channel channel) throws Exception { long deliveryTag message.getMessageProperties().getDeliveryTag(); try { //消费者逻辑 System.out.printf(消费者接收到消息:%s,deliveryTag:%d\n, new String(message.getBody()), message.getMessageProperties().getDeliveryTag()); //进行业务逻辑处理 System.out.println(模拟业务逻辑处理); // int num 3 / 0; Thread.sleep(2000); System.out.println(业务逻辑处理完成); //肯定确认 channel.basicAck(deliveryTag, false); } catch (Exception e) { //否定确认 channel.basicNack(deliveryTag, false, true); } } RabbitListener(queues Contants.QOS_QUEUE) public void handleMessage2(Message message, Channel channel) throws Exception { long deliveryTag message.getMessageProperties().getDeliveryTag(); try { //消费者逻辑 System.out.printf(消费者2接收到消息:%s,deliveryTag:%d\n, new String(message.getBody()), message.getMessageProperties().getDeliveryTag()); //进行业务逻辑处理 System.out.println(模拟业务逻辑处理); // int num 3 / 0; Thread.sleep(1000); System.out.println(业务逻辑处理完成); //肯定确认 channel.basicAck(deliveryTag, false); } catch (Exception e) { //否定确认 channel.basicNack(deliveryTag, false, true); } } }可以看到消费者2因为消费速率快会比消费者1收到更多的消息