SpringBoot 集成RabbitMQ

SpringBoot 集成RabbitMQ

杰子学编程 77 2022-06-01

SpringBoot 集成RabbitMQ

一、Docker安装Rabbit MQ

运行下面命令,docker 可自动拉取镜像,并启动mq。

docker run -d --name rabbitmq -p 5671:5671 -p 5672:5672 -p 4369:4369 -p 25672:25672 -p 15671:15671 -p 15672:15672 rabbitmq:management

我们执行完成后可以运行docker ps 查看下mq运行情况

WX20211007-094431@2x

我们看到RabbitMq已经启动成功,我们在浏览器中打开ip:15672显示如下:

WX20211007-095121@2x

输入用户名密码,默认用户名guest/guest;登录成功后显示如下界面。

WX20211007-095138@2x

至此,RabbitMQ安装完成。

二、SpringBoot项目初始化

我们使用Spring initalizr初始化SpringBoot 项目,Spring initalizr

WX20211007-101630@2x

这里我们通过Spring官网初始化项目,并添加RabbitMQ的依赖,我们直接点击生成,代码会自动下载下来,我们将下载的代码导入到idea中(我这里的idea是社区版不支持Spring,故在官网初始化项目)。

WX20211007-102149@2x

项目导入到idea后,我们新创建个controller包,在包中创建IndexController.class。我们使用创建的Controller测试下我们的工程,在IndexController.class中我们添加一下内容:

@RestController
public class IndexController {
    @GetMapping("/index")
    public String index() {
        return "Hello RabbitMQ";
    }
}

启动工程,在浏览器中访问127.0.0.1:8080/index可以看到浏览器中出现“Hello RabbitMQ”,说明我们的工程初始化没有问题。

WX20211007-102529@2x

三、SpringBoot配置RabbitMQ

3.1、创建RabbitMqConfig

默认RabbitMQ序列化方式是SerializerMessageConverter序列化器,这么我们使用Jackson2JsonMessageConverter序列化器。我们需要设置下,内容如下:

@Configuration
public class RabbitMqTemplateConfig {

    @Bean
    public RabbitTemplate rabbitTemplate(CachingConnectionFactory connectionFactory) {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
        rabbitTemplate.setMessageConverter(jackson2JsonMessageConverter());
        return rabbitTemplate;
    }

    @Bean
    public MessageConverter jackson2JsonMessageConverter() {
        return new Jackson2JsonMessageConverter();
    }
}

完善SpringBoot配置文件,配置文件内容如下:

spring.rabbitmq.host=110.40.141.168
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/

这里我们使用的是application.properties,而非yaml。使用yaml的可以自行转换下。

3.2、创建队列常量类SimpleMqConstant

这里只做简单的功能演示,我们把队列的名称统一定义在常量类SimpleMqConstant类中,后续我们扩展其他队列方便维护。

/**
 * @Author julyWhj
 * @Description 默认的交换机测试$
 * @Date 2021/10/7 10:52 上午
 **/
public class SimpleMqConstant {

    /**
     * 处理对象的MQ队列
     */
    public static final String HANDLER_OBJECT_QUEUE_NAME = "com.july.mq.simple.object";

}

这里我们定义队列名称叫:com.july.mq.simple.object;

3.3、创建Simple对象

这里我们创建一个Simple对象,使用该对象进行序列化发送。

/**
 * @Author julyWhj
 * @Description Simple对象$
 * @Date 2021/10/7 10:55 上午
 **/
@Data
@Builder
@AllArgsConstructor
@NoArgsConstructor
public class Simple implements Serializable {
    private String name;
    private String no;
    private int age;
    private String phone;
    private Date createTime;
}

3.4、创建队列消费者SimpleConsumer

我们创建SimpleConsumer类,做为队列的消费者,内容如下:

/**
 * @Author julyWhj
 * @Description 消费者$
 * @Date 2021/10/7 10:57 上午
 **/
@Component
@Slf4j
public class SimpleConsumer {


    @RabbitListener(queuesToDeclare = @Queue(SimpleMqConstant.HANDLER_OBJECT_QUEUE_NAME))
    @RabbitHandler
    public void receiveObject(Simple simple) throws JsonProcessingException {
        ObjectMapper objectMapper = new ObjectMapper();
        String message = objectMapper.writeValueAsString(simple);
        log.info("simple consumer receive the object:{}", message);
    }
}

这里我们使用@RabbitListener(queuesToDeclare = @Queue(SimpleMqConstant.HANDLER_OBJECT_QUEUE_NAME)) 其中queuesToDeclare它可以在队列SimpleMqConstant.HANDLER_OBJECT_QUEUE_NAME不存在的时候自动创建队列,不会出现reply-code=404, reply-text=NOT_FOUND - no exchange 'XXX' in vhost '/', class-id=50, method-id=的异常。

这里我们接收到消息后,只做打印处理,不做其他处理。

3.5、创建队列生产者SimpleProducer

队列生产者SimpleProducer内容如下:

/**
 * @Author julyWhj
 * @Description 生产者$
 * @Date 2021/10/7 10:54 上午
 **/
@Component
public class SimpleProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 消息体为对象。配置MessageConverter为Jackson2JsonMessageConverter即可
     *
     * @param simple
     */
    public void sendOrderMessage(Simple simple) {
        rabbitTemplate.convertAndSend(SimpleMqConstant.HANDLER_OBJECT_QUEUE_NAME, simple);
    }
}

生产者内容很简单,接收Simple对象,调用convertAndSend 方法发送对象。

3.6、创建单元测试类SimpleMqTest

这里我们使用单元测试进行消息的发送和接收测试,测试类内容如下:

/**
 * @Author julyWhj
 * @Description $
 * @Date 2021/10/7 10:58 上午
 **/
@SpringBootTest
@Slf4j
public class SimpleMqTest {
    @Autowired
    private SimpleProducer simpleProducer;

    @Test
    public void testSimple() throws Exception {
        for (int i = 0; i < 10; i++) {
            simpleProducer.sendOrderMessage(Simple.builder()
                    .createTime(new Date())
                    .name("JulyWhj")
                    .age(i)
                    .no("ID-0001")
                    .phone("138XXXXXXXX")
                    .build());
        }
    }
}

我们运行单元测试,看下执行结果:

WX20211007-114619@2x

可以看到,消费者成功接收到10条数据,并成功打印出来。

四、思考:我们这样写会存在什么问题?

我们这样写会存在一个致命的问题,消息丢失

如何造成的消息丢失,我们应该怎么处理保证消息不丢失。后续的文章会为大家逐一分析。这里我们先简单的使用SpringBoot连接MQ,进行收发消息的Demo。

源码我上传github中,需要的可自行下载。


# Java # SpringBoot # RabbitMq