SpringCloud - Stream 动态绑定消息通道
发布日期:2021-06-30 23:28:51 浏览次数:2 分类:技术文章

本文共 4940 字,大约阅读时间需要 16 分钟。

在之前的章节中,所有消费者和生产者均通过@EnableBinding定义,此方式能够快速的构建生产消费关系,但仔细想想,如果我们需要根据一定的条件决策消息生产者将消息发往哪个通道,貌似当前简单粗暴的方式无法满足。如此常见的场景,springcloud必然会帮我们想到,通过BinderAwareChannelResolver的bean实例即可实现动态通道的选择,其会伴随@EnableBinding注解自动完成注册。

 

本章概要

1、BinderAwareChannelResolver的应用

2、ExpressionEvaluatingRouter的应用

 

BinderAwareChannelResolver的应用

  • 首先来看BinderAwareChannelResolver的直接应用,为了方便场景模拟,采用一个rest api方式触发消息的生产发送。

 

消费者Receiver工程改造

1、在MySink中添加如下两个动态接收通道,dynamic1-channel与dynamic1-channel

package com.cloud.shf.stream.sink;public interface MySink {    /*********************************动态通道选择示例******************************/    String DYNAMIC1_CHANNEL = "dynamic1-channel";    String DYNAMIC2_CHANNEL = "dynamic2-channel";     @Input(DYNAMIC1_CHANNEL)    SubscribableChannel dynamic1Input();     @Input(DYNAMIC2_CHANNEL)    SubscribableChannel dynamic2Input();}

2、在SinkReceiver.class中添加对上述两个通道的监听,并打印接收内容

/*********************************动态通道选择示例******************************/@StreamListener(value = MySink.DYNAMIC1_CHANNEL)public void dynamic1Receiver(@Payload User user) {    LOGGER.info("Received-{} from {} channel age: {}", active, MySink.DYNAMIC1_CHANNEL, user.getAge());} @StreamListener(value = MySink.DYNAMIC2_CHANNEL)public void dynamic2Receiver(@Payload User user) {    LOGGER.info("Received-{} from {} channel age: {}", active, MySink.DYNAMIC2_CHANNEL, user.getAge());}

 

生产者Sender工程改造

添加一个DynamicDestinationController类,提供一个rest-api协助进行场景模拟

package com.cloud.shf.stream.controller;@EnableBinding@Controllerpublic class DynamicDestinationController {     @Autowired    private BinderAwareChannelResolver resolver;     /************************************方式一************************************/    @RequestMapping(path = "/{dest}", method = RequestMethod.POST, consumes = "*/*")    @ResponseStatus(HttpStatus.ACCEPTED)    public void handleRequest(@PathVariable("dest") String dest,                              @RequestBody String body,                              @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) {        sendMessage(body, dest, contentType);    }     private void sendMessage(String body, String dest, Object contentType) {        resolver.resolveDestination(dest).send(MessageBuilder.createMessage(body,                new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType))));    }}

Note

  • 直接注入BinderAwareChannelResolver的bean实例即可
  • 通过PathVariable属性dest值模拟通道名称
  • boby作为消息体
  • contentType作为消息的头信息

 

服务验证

1、启动receiver、sender两个工程;

2、多次通过curl请求api如下(curl -H "Content-Type: application/json" -X POST -d "{\"username\":\"song\",\"age\":12}" http://localhost:9000/dynamic*-channel)

此时可以看到receiver工程的控制台打印如下

其打印的来源通道与请求中的占位符完全匹配,继续观察sender控制台日志,由于原来并没有在sernder中定义相关通道描述,故首次触发指定通道即可看到如下日志记录:

小节,由此可以看到,根据占位符dest动态路由成功,准确的被发送至预期的消息通道。实际应用中,如果我们预先知道可能的动态路由通道名称,则可以通过spring.cloud.stream.dynamicDestinations配置白名单,只有预设定的通道名称方会被动态绑定,避免创建大量无效的通道信息,浪费资源。

 

ExpressionEvaluatingRouter的应用

  • 通过下图可以BinderAwareChannelResolver类的定义

其实现了Spring Integration的DestinationResolver接口,并且BinderAwareChannelResolver实例可以被注入在其他的components实例中,本小节将实现将BinderAwareChannelResolver实例注入在ExpressionEvaluatingRouter中实现消息通道的动态绑定。

1、在DynamicDestinationController类中添加如下实现

/************************************方式二************************************/@RequestMapping(path = "/", method = RequestMethod.POST, consumes = "application/json")@ResponseStatus(HttpStatus.ACCEPTED)public void handleRequest(@RequestBody User body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType, @RequestHeader(name = "dest", required = false) String dest) {    Map
headers = new HashMap<>(2); headers.put(MessageHeaders.CONTENT_TYPE, contentType); if (!StringUtils.isEmpty(dest)) { headers.put("dest", dest); } sendMessage(body, headers);} private void sendMessage(User body, Map
headers) { routerChannel().send(MessageBuilder.createMessage(body, new MessageHeaders(headers)));} @Bean(name = "router-channel")public MessageChannel routerChannel() { return new DirectChannel();} @Bean@ServiceActivator(inputChannel = "router-channel")public ExpressionEvaluatingRouter router() { ExpressionEvaluatingRouter router = new ExpressionEvaluatingRouter(new SpelExpressionParser().parseExpression("headers[dest]")); //作用于通过spel表达式没有获取到对应的通道信息 router.setDefaultOutputChannelName("dynamic1-channel"); router.setChannelResolver(resolver); return router;}

Note

  • 通过头信息中的dest属性作为动态绑定的依据;如果未设定dest则采用默认dynamic1-channel作为消息通道
  • 通过Spel表达式获取头信息中的dest属性值(headers[dest])
  • 将BinderAwareChannelResolver注入至ExpressionEvaluatingRouter实例中
  • 其中org.springframework.integration.handler.ExpressionEvaluatingMessageProcessor实现了对Message的处理,故可以通过此处看到我们消息包含的消息体、消息体具体信息,从而更好的编写Spel表达式,主要代码如下:

2、再次通过curl多次请求如下(curl -H "Content-Type: application/json" -H "dest:dynamic1-channel" -X POST -d "{\"username\":\"song\",\"age\":12}" http://localhost:9000/)

receiver工程控制台如下:

此时可以看到,接收的消息来源通道与请求头呼应,特别关注第5个case,其并未设定dest属性,故采用了默认的dynamic1-channel通道。

转载地址:https://lux-sun.blog.csdn.net/article/details/107481277 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!

上一篇:DOS - Windows 上 telnet 测试端口号
下一篇:程序人生 - 当HR压你价,说你只值7K,你该怎么回答?

发表评论

最新留言

感谢大佬
[***.8.128.20]2024年04月12日 16时34分06秒