本文共 4940 字,大约阅读时间需要 16 分钟。
在之前的章节中,所有消费者和生产者均通过@EnableBinding定义,此方式能够快速的构建生产消费关系,但仔细想想,如果我们需要根据一定的条件决策消息生产者将消息发往哪个通道,貌似当前简单粗暴的方式无法满足。如此常见的场景,springcloud必然会帮我们想到,通过BinderAwareChannelResolver的bean实例即可实现动态通道的选择,其会伴随@EnableBinding注解自动完成注册。
本章概要
1、BinderAwareChannelResolver的应用
2、ExpressionEvaluatingRouter的应用
BinderAwareChannelResolver的应用
- 首先来看BinderAwareChannelResolver的直接应用,为了方便场景模拟,采用一个rest api方式触发消息的生产发送。
消费者Receiver工程改造
1、在MySink中添加如下两个动态接收通道,dynamic1-channel与dynamic1-channel
package com.cloud.shf.stream.sink;public interface MySink { /*********************************动态通道选择示例******************************/ String DYNAMIC1_CHANNEL = "dynamic1-channel"; String DYNAMIC2_CHANNEL = "dynamic2-channel"; @Input(DYNAMIC1_CHANNEL) SubscribableChannel dynamic1Input(); @Input(DYNAMIC2_CHANNEL) SubscribableChannel dynamic2Input();}
2、在SinkReceiver.class中添加对上述两个通道的监听,并打印接收内容
/*********************************动态通道选择示例******************************/@StreamListener(value = MySink.DYNAMIC1_CHANNEL)public void dynamic1Receiver(@Payload User user) { LOGGER.info("Received-{} from {} channel age: {}", active, MySink.DYNAMIC1_CHANNEL, user.getAge());} @StreamListener(value = MySink.DYNAMIC2_CHANNEL)public void dynamic2Receiver(@Payload User user) { LOGGER.info("Received-{} from {} channel age: {}", active, MySink.DYNAMIC2_CHANNEL, user.getAge());}
生产者Sender工程改造
添加一个DynamicDestinationController类,提供一个rest-api协助进行场景模拟
package com.cloud.shf.stream.controller;@EnableBinding@Controllerpublic class DynamicDestinationController { @Autowired private BinderAwareChannelResolver resolver; /************************************方式一************************************/ @RequestMapping(path = "/{dest}", method = RequestMethod.POST, consumes = "*/*") @ResponseStatus(HttpStatus.ACCEPTED) public void handleRequest(@PathVariable("dest") String dest, @RequestBody String body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType) { sendMessage(body, dest, contentType); } private void sendMessage(String body, String dest, Object contentType) { resolver.resolveDestination(dest).send(MessageBuilder.createMessage(body, new MessageHeaders(Collections.singletonMap(MessageHeaders.CONTENT_TYPE, contentType)))); }}
Note
- 直接注入BinderAwareChannelResolver的bean实例即可
- 通过PathVariable属性dest值模拟通道名称
- boby作为消息体
- contentType作为消息的头信息
服务验证
1、启动receiver、sender两个工程;
2、多次通过curl请求api如下(curl -H "Content-Type: application/json" -X POST -d "{\"username\":\"song\",\"age\":12}" http://localhost:9000/dynamic*-channel)
此时可以看到receiver工程的控制台打印如下
其打印的来源通道与请求中的占位符完全匹配,继续观察sender控制台日志,由于原来并没有在sernder中定义相关通道描述,故首次触发指定通道即可看到如下日志记录:
小节,由此可以看到,根据占位符dest动态路由成功,准确的被发送至预期的消息通道。实际应用中,如果我们预先知道可能的动态路由通道名称,则可以通过spring.cloud.stream.dynamicDestinations配置白名单,只有预设定的通道名称方会被动态绑定,避免创建大量无效的通道信息,浪费资源。
ExpressionEvaluatingRouter的应用
- 通过下图可以BinderAwareChannelResolver类的定义
其实现了Spring Integration的DestinationResolver接口,并且BinderAwareChannelResolver实例可以被注入在其他的components实例中,本小节将实现将BinderAwareChannelResolver实例注入在ExpressionEvaluatingRouter中实现消息通道的动态绑定。
1、在DynamicDestinationController类中添加如下实现
/************************************方式二************************************/@RequestMapping(path = "/", method = RequestMethod.POST, consumes = "application/json")@ResponseStatus(HttpStatus.ACCEPTED)public void handleRequest(@RequestBody User body, @RequestHeader(HttpHeaders.CONTENT_TYPE) Object contentType, @RequestHeader(name = "dest", required = false) String dest) { Mapheaders = new HashMap<>(2); headers.put(MessageHeaders.CONTENT_TYPE, contentType); if (!StringUtils.isEmpty(dest)) { headers.put("dest", dest); } sendMessage(body, headers);} private void sendMessage(User body, Map headers) { routerChannel().send(MessageBuilder.createMessage(body, new MessageHeaders(headers)));} @Bean(name = "router-channel")public MessageChannel routerChannel() { return new DirectChannel();} @Bean@ServiceActivator(inputChannel = "router-channel")public ExpressionEvaluatingRouter router() { ExpressionEvaluatingRouter router = new ExpressionEvaluatingRouter(new SpelExpressionParser().parseExpression("headers[dest]")); //作用于通过spel表达式没有获取到对应的通道信息 router.setDefaultOutputChannelName("dynamic1-channel"); router.setChannelResolver(resolver); return router;}
Note
- 通过头信息中的dest属性作为动态绑定的依据;如果未设定dest则采用默认dynamic1-channel作为消息通道
- 通过Spel表达式获取头信息中的dest属性值(headers[dest])
- 将BinderAwareChannelResolver注入至ExpressionEvaluatingRouter实例中
- 其中org.springframework.integration.handler.ExpressionEvaluatingMessageProcessor实现了对Message的处理,故可以通过此处看到我们消息包含的消息体、消息体具体信息,从而更好的编写Spel表达式,主要代码如下:
2、再次通过curl多次请求如下(curl -H "Content-Type: application/json" -H "dest:dynamic1-channel" -X POST -d "{\"username\":\"song\",\"age\":12}" http://localhost:9000/)
receiver工程控制台如下:
此时可以看到,接收的消息来源通道与请求头呼应,特别关注第5个case,其并未设定dest属性,故采用了默认的dynamic1-channel通道。
转载地址:https://lux-sun.blog.csdn.net/article/details/107481277 如侵犯您的版权,请留言回复原文章的地址,我们会给您删除此文章,给您带来不便请您谅解!