Redis——发布和订阅
发布日期:2021-06-24 04:57:57 浏览次数:8 分类:技术文章

本文共 3153 字,大约阅读时间需要 10 分钟。

发布与订阅(又称pub/sub),订阅者(listener)负责订阅频道(channel),发送者(publisher)负责向频道发送二进制字符串消息(binary string message).每当有消息被发送给指定频道的时候,频道都所有订阅者都会收到消息。

Redis提供都5个发布订阅命令:

命令 描述
Redis Psubscribe 命令 订阅一个或多个符合给定模式的频道。
Redis Pubsub 命令 查看订阅与发布系统状态。
Redis Publish 命令 将信息发送到指定的频道。
Redis Punsubscribe 命令 退订所有给定模式的频道。
Redis Subscribe 命令 订阅给定的一个或多个频道的信息。
Redis Unsubscribe 命令 指退订给定的频道。

使用实例:

  首先需要一个订阅者(listener)这里建立一个名为Subscriber的类:

public class Subscriber extends JedisPubSub {    public void onMessage(String channel, String message) {        System.out.println("onMessage channel = " + channel+ "message =" + message);    }    public void onSubscribe(String channel, int subscribedChannels) {        System.out.println(String.format("subscribe redis channel success, channel %s, subscribedChannels %d",                channel, subscribedChannels));    }    public void onUnsubscribe(String channel, int subscribedChannels) {        System.out.println(String.format("unsubscribe redis channel, channel %s, subscribedChannels %d",                channel, subscribedChannels));    }}

这个类继承自JedisPubSub,其中onMessage负责接收订阅频道消息后,业务处理逻辑,onSubscribe负责初始化订阅时候的处理,onUnsubscribe取消订阅时候的处理。

然后在定义一个类起一个线程来进行subscribe操作,因为我们需要订阅者一直在线,当发布者一发送消息到相应的频道时,能做出反应

public class SubThread extends Thread {    JedisPool pool;    private final Subscriber subscriber = new Subscriber();    private final String channel = "xx";    public SubThread( JedisPool pool) {        super("SubThread");        this.pool = pool;    }    @Override    public void run() {        System.out.println(String.format("subscribe redis, channel %s, thread will be blocked", channel));        Jedis jedis = pool.getResource();            try {                jedis.subscribe(subscriber, channel);            } catch (Exception e) {                System.out.println(String.format("subsrcibe channel error, %s", e));            } finally {                if (jedis != null) {                    jedis.close();                }            }    }}

再然后就是发布者:

public class Publisher {    JedisPool pool;    public Publisher( JedisPool pool) {        this.pool = pool;    }    public void start() {        Jedis jedis = pool.getResource();        while(true) {            jedis.publish("xx", "233");            try{                Thread.sleep(5000);            }catch (Exception e){                e.printStackTrace();            }        }    }}

再然后就是主函数的调用:

public class test1 {    public static void main(String[] args) throws Exception{        //连接本地的 Redis 服务        Jedis jedis = new Jedis("localhost");        System.out.println("连接成功");        //查看服务是否运行        System.out.println("服务正在运行: "+jedis.ping());        JedisPool pool = new JedisPool("localhost", 6379);        SubThread subThread = new SubThread(pool);        subThread.start();        Publisher publisher = new Publisher(pool);        publisher.start();    }

因为Jedis不是线程安全的,JedisPool是线程安全的,所以这里使用JedisPool。

输出:

连接成功服务正在运行: PONGsubscribe redis, channel xx, thread will be blockedsubscribe redis channel success, channel xx, subscribedChannels 1onMessage channel = xxmessage =233onMessage channel = xxmessage =233

 

转载于:https://www.cnblogs.com/xxbbtt/p/7864953.html

转载地址:https://blog.csdn.net/weixin_30346033/article/details/96147888 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:基于cglib反射的数据库查询结果封装为对象(简单的orm实现)
下一篇:opengl in medical imaging

发表评论

最新留言

留言是一种美德,欢迎回访!
[***.207.175.100]2024年04月17日 06时50分41秒