一般来说,消息队列有两种场景,一种是发布者订阅者模式,一种是生产者消费者模式,这里利用redis消息“发布/订阅”来简单实现订阅者模式。
实现之前先过过 redis 发布订阅的一些基础概念和操作。
- Redis 发布订阅 Redis 发布订阅(pub/sub)是一种消息通信模式(Topic):发送者(pub)发送消息,订阅者(sub)接收消息。Redis 客户端可以订阅任意数量的频道。
下图展示了频道 channel1 , 以及订阅这个频道的三个客户端 —— client2 、 client5 和 client1 之间的关系:
当有新消息通过 PUBLISH 命令发送给频道 channel1 时, 这个消息就会被发送给订阅它的三个客户端:
接下来我们通过 Redis-cli 来简单看看效果:
1)通过 publish 来发布 topic message
2)通过 subscribe 来订阅 topic
接下来用springboot2 + spring data redis 来实现来简单实现订阅者模式:
spring data redis实现发布与订阅需要配置以下信息:
- - Topic
- - MessageListener
- - RedisMessageListenerContainer
1). 用到的 starter 只有 spring data redis starter:
dependencies { compile group: 'org.springframework.boot', name: 'spring-boot-starter-redis', version: '1.4.7.RELEASE' testCompile('org.springframework.boot:spring-boot-starter-test')}
2). 配置 spring data redis:
package com.example.demo.redis.listener.redis.config;import com.example.demo.redis.listener.ConsumerRedisListener;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.data.redis.connection.jedis.JedisConnectionFactory;import org.springframework.data.redis.listener.ChannelTopic;import org.springframework.data.redis.listener.RedisMessageListenerContainer;@Configurationpublic class RedisConfig { @Autowired private JedisConnectionFactory jedisConnectionFactory; @Bean public ConsumerRedisListener consumerRedis() { return new ConsumerRedisListener(); } @Bean public ChannelTopic topic() { return new ChannelTopic("string-topic"); } @Bean public RedisMessageListenerContainer redisMessageListenerContainer() { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(jedisConnectionFactory); container.addMessageListener(consumerRedis(),topic()); return container; }}
3) 实现一个String类型的 topic MessageListener
package com.example.demo.redis.listener;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.data.redis.connection.Message;import org.springframework.data.redis.connection.MessageListener;import org.springframework.data.redis.core.StringRedisTemplate;public class ConsumerRedisListener implements MessageListener { @Autowired private StringRedisTemplate stringRedisTemplate; @Override public void onMessage(Message message, byte[] pattern) { doBusiness(message); } /** * 打印 message body 内容 * @param message */ public void doBusiness(Message message) { Object value = stringRedisTemplate.getValueSerializer().deserialize(message.getBody()); System.out.println("consumer message: " + String.valueOf(value)); }}
4) 其它:
记得配置上 redis 相关的配置,最简单的application.properties配置如下:
spring.redis.host=127.0.0.1
spring.redis.port=6379
通过上面四步,简单的订阅者就做好了,通过以下代码可以发布一个消息,同时可以查看到控制台会有订阅者消费信息打印出来:
@Autowired private StringRedisTemplate stringRedisTemplate; @Test public void testRedisStringOps() { stringRedisTemplate.convertAndSend("string-topic","hello world"); }
最后总结下:
用 spring data redis 来实现 redis 订阅者,本质上还是Listener模式,只需要配置Topic, MessageListener 和 RedisMessageListenerContainer就可以了。同时,发布时,只需要使用 redisTemplate 的 convertAndSend方法即可topic来发布message。