在Spring Cloud 分布式消息—Spring Cloud Stream 简介与入门一篇我们简单了介绍了Spring Cloud Stream,并且使用Spring Cloud Stream提供的默认通道input,output简单的做了一个示例,本篇我们将会使用自定义通道做一个示例,并且介绍Spring Cloud Stream的高级应用,如果对Spring Cloud Stream 不了解可以先去阅读Spring Cloud 分布式消息—Spring Cloud Stream 简介与入门一篇文章。
在上一篇我们介绍了两个注解@Input和@Output分别用于定义输入和输出通道,Spring Cloud Stream定义的SInk和Source就是使用了这两个注解。我们自定义通道也需要使用这两个注解,使用这两个注解,我们很容易定义输入和输出通道,比如我们定义一个日志的输入输出通道,代码如下:
//定义日志输入通道 public interface LogSink { String INPUT = "logInput"; @Input("logInput") SubscribableChannel input(); } //定义日志输出通道 public interface LogSource { String OUTPUT = "logOutput"; @Output("logOutput") MessageChannel output(); }在编写在通道之后,我们需要在配置文件中定义通道,将通道绑定到binder,并且为binder绑定到消息中间件,上面的通道配置如下:
spring: cloud: stream: bindings: #用于绑定通道到binder logInput: #ChannelName 这里是输入通道的名称,如果@Input不指定默认为方法名 destination: log #从哪里接收消息,这里指topic或者队列名称,在rabbit中为exchange binder: logBinder #绑定到名为logBinder的binder logOutput: #ChannelName 这里是输出通道的名称,如果@Output不指定默认为方法名 destination: log #将消息发送到哪里,这里指topic或者队列名称,在rabbit中为exchange binder: logBinder #绑定到名为logBinder的binder content-type: binders: #配置binder logBinder: #配置名称为hello1的binder type: rabbit #binder类型为rabbitMq environment: #配置运行环境 spring: rabbitmq: host: 10.0.10.63 #地址 port: 5672 #端口 username: guest #用户名 password: guest #密码 server: port: 8092然后需要@EnableBinding注解绑定通道即可,消息接收的代码如下所示,发送消息的代码这里不再展示,只需要@EnableBinding(LogSource.class),然后使用@Autowire注解将LogSource注入即可。
@EnableBinding(LogSink.class) public class LogReceiver { private static Logger logger = LoggerFactory.getLogger(LogReceiver.class); @StreamListener(LogSink.INPUT) public void receive(String payload) { logger.info("Received: " + payload); } }上面的代码中,我们将输入通道、输出通道分别定义在不同的接口中,如果通道特别多,则需要定义多个接口,因此我们可以将多个通道定义在一个接口内,@EnableBinding也只需要绑定一个接口即可。如下代码,我们只需要@EnableBinding(LogChannel.class)即可,在发送消息时使用@Autowire注解LogChannel接口。
public interface LogChannel { String INPUT = "service1logInput"; @Input("service1logInput") SubscribableChannel service1input(); String INPUT2 = "service2logInput"; @Input("service1logInput") SubscribableChannel service2input(); String OUTPUT = "service1logOutput"; @Output("service1logOutput") MessageChannel service1logOutput(); String OUTPUT2 = "service2logOutput"; @Output("service12ogOutput") MessageChannel service2logOutput(); }Spring Cloud Stream建立在Enterprise Integration Patterns定义的概念和模式的基础上,并依赖于其内部实现,该内部实现依赖于Spring项目组合中已建立且流行的Enterprise Integration Patterns实现: Spring Integration Framework。因此,它自然支持Spring Integration已经建立的基础,语义和配置选项。比如你可以使用@InboundChannelAdapter将Source的输出通道附加到MessageSource上,同样,可以在提供Processor绑定合约的消息处理程序方法的实现时使用@Transformer或@ServiceActivator ,代码示例如下所示:
@EnableBinding(Source.class) public class TimerSource { @Bean @InboundChannelAdapter(value = Source.OUTPUT, poller = @Poller(fixedDelay = "10", maxMessagesPerPoll = "1")) public MessageSource<String> timerMessageSource() { return () -> new GenericMessage<>("Hello Spring Cloud Stream"); } } @EnableBinding(Processor.class) public class TransformProcessor { @Transformer(inputChannel = Processor.INPUT, outputChannel = Processor.OUTPUT) public Object transform(String message) { return message.toUpperCase(); } }关于@StreamListener注解,它是Spring Cloud Stream 对Spring Integration的补充,像Spring Messaging的其他注解(@JmsListener)一样能为我们提供诸如路由等功能。对于使用@StreamListener注解的方法,你可以返回一个数据,但是你必须使用@SendTo注解指定该方法返回的数据的输出的目的地,代码如下所示:
@EnableBinding(Processor.class) public class TransformProcessor { @Autowired VotingService votingService; @StreamListener(Processor.INPUT) @SendTo(Processor.OUTPUT) public VoteResult handle(Vote vote) { return votingService.record(vote); } }Spring Cloud Stream支持基于@SpringListener注解上的条件,将消息转发给多个处理方法,这些方法不能有返回值并且是单独的消息处理方法。我们可以为@StreamListener的condition参数传入一个指定的SpEL表达式。这样只有匹配到表达式的处理器才会被调用。代码示例如下:
@EnableBinding(Sink.class) @EnableAutoConfiguration public static class TestPojoWithAnnotatedArguments { @StreamListener(target = Sink.INPUT, condition = "headers['type']=='bogey'") public void receiveBogey(@Payload BogeyPojo bogeyPojo) { // handle the message } @StreamListener(target = Sink.INPUT, condition = "headers['type']=='bacall'") public void receiveBacall(@Payload BacallPojo bacallPojo) { // handle the message } }发布-订阅模型使通过共享topic让应用程序之间交互更加容易,但是通过创建给定应用程序的多个实例进行扩展的能力同样重要。这样做时,会将应用程序的不同实例置于竞争的消费者关系中,在该消费者关系中,只有一个实例可以处理给定消息。Spring Cloud Stream提供了消费组的概念可以通过spring.cloud.stream.bindings.<channelName>.group配置一个组别。如下图中的消费者 将会使用该配置:spring.cloud.stream.bindings.<channelName>.group=hdfsWrite或者 spring.cloud.stream.bindings.<channelName>.group=average.
订阅给定目标的所有组都将收到已发布数据的副本,但是每个组中只有一个成员从该目标接收给定消息。默认情况下,当未指定组时,Spring Cloud Stream会将应用程序分配给一个匿名且独立的单成员使用者组,该使用者组与所有其他使用者组具有发布-订阅关系。
除了分组,
Spring Cloud Stream提供了对给定应用程序的多个实例之间的数据分区的支持。在分区方案中,物理通信介质(例如代理主题)被视为结构化为多个分区。一个或多个生产者应用程序实例将数据发送到多个消费者应用程序实例,并确保由共同特征标识的数据由同一消费者实例处理。Spring Cloud Stream提供了一种通用抽象,用于以统一的方式实现分区处理用例。因此,无论代理本身是自然分区(例如,Kafka)还是非自然分区(例如,RabbitMQ),都可以使用分区。分区结构与所需配置如下所示:
#下面是生产者配置 #通过该参数指定了分区键的表达式规则 spring.cloud.stream.bindings.<channel-name>.producer.partitionKeyExpression=payload #指定了消息分区的数量。 spring.cloud.stream.bindings.<channel-name>.producer.partitionCount=2 #下面是消费者配置 #开启消费者分区功能 spring.cloud.stream.bindings.<channel-name>.consumer.partitioned=true #指定了当前消费者的总实例数量 spring.cloud.stream.instanceCount=2 #设置当前实例的索引号,从 0 开始 spring.cloud.stream.instanceIndex=1本篇介绍了Spring Cloud Stream 如何自定义通道名称和Spring Cloud Stream 分组和分区和路由等高级应用。下一篇我们会介绍Spring Cloud Stream的异常处理和一些高级配置的知识。