阿超
>
springboot-RabbitMQ发送短信
天不为人之恶寒也辍冬,地不为人之恶辽远也辍广。——《荀子》
常见名词
Virtual Hosts
——虚拟主机,一个虚拟主机下可有多个队列
Exchange
——交换机,分发消息到队列中
管理界面
使用默认账户guest
密码guest
登录RabbitMQ
管理界面
这里可以看到我们的端口和相关信息
15672
——管理界面
25672
——RabbitMQ
集群通信端口号
5672
——RabbitMQ
内部通信端口号
快速入门
引入依赖
1 2 3 4 5 6
| <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.10.0</version> </dependency>
|
简单队列
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| package com.ruben.mq.rabbitMQ.simple;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;
import java.nio.charset.StandardCharsets;
public class Send { private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) { channel.queueDeclare(QUEUE_NAME, false, false, false, null); String message = "Hello World!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes(StandardCharsets.UTF_8)); System.out.println(" [x] Sent '" + message + "'"); } } }
|
消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| package com.ruben.mq.rabbitMQ.simple;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory; import com.rabbitmq.client.DeliverCallback;
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(QUEUE_NAME, false, false, false, null); System.out.println(" [*] Waiting for messages."); DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); }; channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } }
|
我们可以在管理界面看到队列和消息情况
我们再创建一个Virtual Host
,点击Add virtual host
我们点击这个Virtual Host
点击Set permission
来设置权限
然后创建队列
这里Durable
表示持久化到磁盘,Transient
表示队列只在内存中存储
这样我们就可以在创建连接时指定Virtual Host
了
MQ
确保消息不丢失
生产者->MQ
Ack
消息确认机制(MQ
收到消息后同步或异步的方式通知生产者)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
|
private static void ACKConfirmDemo() throws Exception { try (Connection connection = RabbitMQConnection.getConnection(); Channel channel = connection.createChannel()) { String msg = "Hino Supa"; channel.confirmSelect(); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes(StandardCharsets.UTF_8)); System.out.println("消息投递成功"); boolean result = channel.waitForConfirms(); if (result) { System.out.println("消息投递成功"); } else { System.out.println("消息投递失败"); } } }
|
事务形式
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
|
private static void transactionDemo() throws Exception { try (Connection connection = RabbitMQConnection.getConnection(); Channel channel = connection.createChannel()) { try { String msg = "Hino Supa"; channel.txSelect(); channel.basicPublish("", QUEUE_NAME, null, msg.getBytes(StandardCharsets.UTF_8)); channel.txCommit(); System.out.println("消息投递成功"); } catch (IOException e) { channel.txRollback(); e.printStackTrace(); } } }
|
MQ
->消费
RabbitMQ
必须要将消息消费成功后才会从mq
服务端中移除
Kafka
不管是消费成功还是失败,都不会立即从mq
服务端中移除,使用offset
记录消息消费情况
工作队列
我们的消费者可根据自身能力调整消费消息数,如果有多个消费者,则每次消费完成都去告诉RabbitMQ
,从而获取下一条/多条消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| Connection connection = RabbitMQConnection.getConnection();
Channel channel = connection.createChannel();
channel.basicQos(2); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, StandardCharsets.UTF_8); System.out.println("消费者获取消息:" + msg); channel.basicAck(envelope.getDeliveryTag(), false); } };
channel.basicConsume(QUEUE_NAME, false, defaultConsumer);
|
发布订阅
Exchange
——交换机,分发消息到队列中
有以下几种交换机direct
,topic
,headers
和fanout
。
Fanout Exchange
:扇形交换机——我们每个消费者都能收到消息
生产者代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| package com.ruben.mq.rabbitMQ.subcrible;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.ruben.mq.rabbitMQ.connection.RabbitMQConnection;
import java.io.IOException; import java.util.concurrent.TimeoutException;
public class ProducerFanout {
private static final String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) throws IOException, TimeoutException { try (Connection connection = RabbitMQConnection.getConnection(); Channel channel = connection.createChannel();) { channel.exchangeDeclare(EXCHANGE_NAME, "fanout", true); String msg = "ruben"; channel.basicPublish(EXCHANGE_NAME, "", null, msg.getBytes()); } }
}
|
消费者1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| package com.ruben.mq.rabbitMQ.subcrible;
import com.rabbitmq.client.*; import com.ruben.mq.rabbitMQ.connection.RabbitMQConnection;
import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException;
public class MailConsumer {
private static final String QUEUE_NAME = "fanout_email_queue";
private static final String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) throws IOException, TimeoutException { System.out.println("邮件消费者..."); Connection connection = RabbitMQConnection.getConnection(); Channel channel = connection.createChannel(); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, StandardCharsets.UTF_8); System.out.println("邮件消费者获取消息:" + msg); } }; channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
} }
|
消费者2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| package com.ruben.mq.rabbitMQ.subcrible;
import com.rabbitmq.client.*; import com.ruben.mq.rabbitMQ.connection.RabbitMQConnection;
import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException;
public class SmsConsumer {
private static final String QUEUE_NAME = "fanout_email_sms";
private static final String EXCHANGE_NAME = "fanout_exchange";
public static void main(String[] args) throws IOException, TimeoutException { System.out.println("短信消费者..."); Connection connection = RabbitMQConnection.getConnection(); final Channel channel = connection.createChannel(); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ""); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, StandardCharsets.UTF_8); System.out.println("短信消费者获取消息:" + msg); } }; channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
} }
|
Direct
:直连交换机——按照指定的routingKey
去分发消息
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| package com.ruben.mq.rabbitMQ.subcrible.direct;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.ruben.mq.rabbitMQ.connection.RabbitMQConnection;
import java.io.IOException; import java.util.concurrent.TimeoutException;
public class ProducerDirect {
private static final String EXCHANGE_NAME = "direct_exchange";
public static void main(String[] args) throws IOException, TimeoutException { try (Connection connection = RabbitMQConnection.getConnection(); Channel channel = connection.createChannel()) { channel.exchangeDeclare(EXCHANGE_NAME, "direct", true); String msg = "ruben"; channel.basicPublish(EXCHANGE_NAME, "sms", null, msg.getBytes()); } }
}
|
消费者email
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| package com.ruben.mq.rabbitMQ.subcrible.direct;
import com.rabbitmq.client.*; import com.ruben.mq.rabbitMQ.connection.RabbitMQConnection;
import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException;
public class MailConsumer {
private static final String QUEUE_NAME = "direct_email_queue";
private static final String EXCHANGE_NAME = "direct_exchange";
public static void main(String[] args) throws IOException, TimeoutException { System.out.println("邮件消费者..."); Connection connection = RabbitMQConnection.getConnection(); final Channel channel = connection.createChannel(); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "email"); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, StandardCharsets.UTF_8); System.out.println("邮件消费者获取消息:" + msg); } }; channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
} }
|
消费者sms
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| package com.ruben.mq.rabbitMQ.subcrible.direct;
import com.rabbitmq.client.*; import com.ruben.mq.rabbitMQ.connection.RabbitMQConnection;
import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException;
public class SmsConsumer {
private static final String QUEUE_NAME = "direct_sms_queue";
private static final String EXCHANGE_NAME = "direct_exchange";
public static void main(String[] args) throws IOException, TimeoutException { System.out.println("短信消费者..."); Connection connection = RabbitMQConnection.getConnection(); final Channel channel = connection.createChannel(); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "sms"); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, StandardCharsets.UTF_8); System.out.println("短信消费者获取消息:" + msg); } }; channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
} }
|
Topic
:主题交换机——消费者的routingKey
使用[主题].*
去匹配生产者发送的routingKey
为[主题].xxx
的消息
生产者,发送routingKey
为supa.sms
的消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| package com.ruben.mq.rabbitMQ.subcrible.topic;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.ruben.mq.rabbitMQ.connection.RabbitMQConnection;
import java.io.IOException; import java.util.concurrent.TimeoutException;
public class ProducerTopic {
private static final String EXCHANGE_NAME = "topic_exchange";
public static void main(String[] args) throws IOException, TimeoutException { try (Connection connection = RabbitMQConnection.getConnection(); Channel channel = connection.createChannel();) { channel.exchangeDeclare(EXCHANGE_NAME, "topic", true); String msg = "ruben"; channel.basicPublish(EXCHANGE_NAME, "supa.sms", null, msg.getBytes()); } }
}
|
消费者,指定routingKey
为ruben.*
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| package com.ruben.mq.rabbitMQ.subcrible.topic;
import com.rabbitmq.client.*; import com.ruben.mq.rabbitMQ.connection.RabbitMQConnection;
import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException;
public class MailConsumer {
private static final String QUEUE_NAME = "topic_email_queue";
private static final String EXCHANGE_NAME = "topic_exchange";
public static void main(String[] args) throws IOException, TimeoutException { System.out.println("邮件消费者..."); Connection connection = RabbitMQConnection.getConnection(); Channel channel = connection.createChannel(); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "ruben.*"); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, StandardCharsets.UTF_8); System.out.println("邮件消费者获取消息:" + msg); } }; channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
} }
|
消费者,指定routingKey
为supa.*
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| package com.ruben.mq.rabbitMQ.subcrible.topic;
import com.rabbitmq.client.*; import com.ruben.mq.rabbitMQ.connection.RabbitMQConnection;
import java.io.IOException; import java.nio.charset.StandardCharsets; import java.util.concurrent.TimeoutException;
public class SmsConsumer {
private static final String QUEUE_NAME = "topic_sms_queue";
private static final String EXCHANGE_NAME = "topic_exchange";
public static void main(String[] args) throws IOException, TimeoutException { System.out.println("短信消费者..."); Connection connection = RabbitMQConnection.getConnection(); Channel channel = connection.createChannel(); channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, "supa.*"); DefaultConsumer defaultConsumer = new DefaultConsumer(channel) { @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { String msg = new String(body, StandardCharsets.UTF_8); System.out.println("短信消费者获取消息:" + msg); } }; channel.basicConsume(QUEUE_NAME, true, defaultConsumer);
} }
|
springboot
整合RabbitMQ
GAV
1 2 3 4 5 6
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> <version>2.4.2</version> </dependency>
|
然后是配置文件和配置类
1 2 3 4 5 6 7
| spring: rabbitmq: addresses: localhost port: 5672 username: guest password: guest virtual-host: /ruben
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| package com.ruben.config;
import org.springframework.amqp.core.Binding; import org.springframework.amqp.core.BindingBuilder; import org.springframework.amqp.core.FanoutExchange; import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class RabbitmqConfig {
public static final String QUEUE_RUBEN_SMS = "queue_ruben_sms";
public static final String EXCHANGE_RUBEN = "exchange_ruben";
public static final String ROUTING_KEY_RUBEN = "ruben.sms";
@Bean public Queue smsQueue() { return new Queue(QUEUE_RUBEN_SMS); }
@Bean public FanoutExchange fanoutExchange() { return new FanoutExchange(EXCHANGE_RUBEN); }
@Bean public Binding bindingSmsFanoutExchange(Queue smsQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(smsQueue).to(fanoutExchange); }
}
|
最后是发消息
1 2 3 4 5 6 7 8 9 10 11 12 13
| @Resource private AmqpTemplate amqpTemplate;
@GetMapping("sendSms/{number}") public AjaxJson sendSms(@PathVariable String number) { String code = new Random().ints(100000, 999999).boxed().findAny().map(String::valueOf).orElseThrow(RuntimeException::new); stringRedisTemplate.opsForValue().set(number, code, 5, TimeUnit.MINUTES); amqpTemplate.send(RabbitmqConfig.EXCHANGE_RUBEN, RabbitmqConfig.ROUTING_KEY_RUBEN, MessageBuilder.withBody(JSON.toJSONString(SmsTO.builder().number(number).code(code).build()).getBytes(StandardCharsets.UTF_8)).build()); return AjaxJson.success("发送成功!"); }
|
然后是消费者这边
先配置上面同样的配置类,然后
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| package com.ruben.rubenproducerdemo.consumer;
import com.alibaba.fastjson.JSON; import com.ruben.rubenproducerdemo.config.RabbitmqConfig; import com.ruben.rubenproducerdemo.pojo.to.SmsTO; import com.ruben.rubenproducerdemo.utils.SmsUtil; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component;
@Component public class SmsConsumer {
@RabbitHandler @RabbitListener(bindings = @QueueBinding(value = @Queue(value = RabbitmqConfig.QUEUE_RUBEN_SMS), exchange = @Exchange(value = RabbitmqConfig.EXCHANGE_RUBEN, type = "fanout"), key = RabbitmqConfig.ROUTING_KEY_RUBEN)) public void consume(Message message) { SmsTO smsTO = JSON.parseObject(message.getBody(), SmsTO.class); SmsUtil.SendSms(smsTO.getNumber(), "SMS_189521312", smsTO.getCode()); }
}
|
然后是发短信的代码,在我之前写过的一篇博客中有
这样就实现了同步返回结果并存入数据库,异步发送验证码短信的业务啦~
死信队列
消息中间件拒收该消息后转移到死信队列中存放,死信队列也可以有交换机、路由key
等
产生原因
1.消息以及过期了都还没被消费
2.队列容量满了
3.消费者多次消费失败
这里我们进行配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81
| package com.ruben.rubenproducerdemo.config;
import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class RabbitmqConfig {
public static final String QUEUE_RUBEN_SMS = "queue_ruben_sms";
public static final String QUEUE_DEAD_RUBEN_SMS = "queue_dead_ruben_sms";
public static final String EXCHANGE_RUBEN = "exchange_ruben";
public static final String EXCHANGE_DEAD_RUBEN = "exchange_dead_ruben";
public static final String ROUTING_KEY_RUBEN = "ruben.sms";
@Bean public Queue smsQueue() { return QueueBuilder .durable(QUEUE_RUBEN_SMS) .ttl(10000) .deadLetterExchange(EXCHANGE_DEAD_RUBEN) .deadLetterRoutingKey(ROUTING_KEY_RUBEN) .build(); }
@Bean public FanoutExchange fanoutExchange() { return ExchangeBuilder.fanoutExchange(EXCHANGE_RUBEN).build(); }
@Bean public Binding bindingSmsFanoutExchange(Queue smsQueue, FanoutExchange fanoutExchange) { return BindingBuilder.bind(smsQueue).to(fanoutExchange); }
}
|
然后是死信消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| package com.ruben.rubenproducerdemo.consumer;
import com.alibaba.fastjson.JSON; import com.rabbitmq.client.AMQP; import com.rabbitmq.client.Channel; import com.ruben.rubenproducerdemo.config.RabbitmqConfig; import com.ruben.rubenproducerdemo.pojo.to.SmsTO; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.AmqpTemplate; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.*; import org.springframework.stereotype.Component;
import javax.annotation.Resource; import java.io.IOException;
@Slf4j @Component public class DeadLetterConsumer {
@RabbitHandler @RabbitListener(bindings = @QueueBinding(value = @Queue(value = RabbitmqConfig.QUEUE_DEAD_RUBEN_SMS), exchange = @Exchange(value = RabbitmqConfig.EXCHANGE_DEAD_RUBEN, type = "fanout"), key = RabbitmqConfig.ROUTING_KEY_RUBEN)) public void deadLetterConsume(Message message, Channel channel) throws IOException { SmsTO smsTO = JSON.parseObject(message.getBody(), SmsTO.class); channel.basicAck(message.getMessageProperties().getDeliveryTag(), true); log.info("死信队列收到" + smsTO); } }
|
配置重试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| spring: rabbitmq: addresses: localhost port: 5672 username: guest password: guest virtual-host: /ruben listener: simple: retry: enabled: true # 最大重试次数 max-attempts: 5 # 重试间隔毫秒 initial-interval: 3000 # 手动ack模式 acknowledge-mode: manual
|