Introduction
In this blog post, I’ll dive into the Spring Cloud Stream API, which lives under the Spring umbrella. The API has been around for some time and has many beneficial use cases. I felt the need to write this post because, as often happens, at work we were looking for a way to streamline the integration among our microservices.
Sample Project
I have prepared two demonstration projects: one named “Automation” that creates events, and the other named “Pki” that consumes all produced events. Upon successful validation, the event is persisted in an in-memory database. Automation has a simple REST endpoint that takes a Certificate Order and proceeds with the stream. Here is an illustration of what we are going to achieve:
Requirements
To run this project, you will need the following setup:
- Maven 3,
- Java 8,
- Docker
Running a Rabbit MQ inside Docker:
docker run -d --hostname my-rabbit --name some-rabbit -p 15672:15672 -p 5672:5672 rabbitmq:3-management
Sending a Certification Request to the Automation API:
curl -X POST http://localhost:7000/api/certificates -H "Content-Type: application/json" -d "{\"commonName\":\"tugrulaslan.com\",\"algorithm\":\"sha256\"}"
Running Tests
Both projects have their Unit and Integration tests. You can have a peek at them to find out how each module works.
Concept
The Spring Cloud Stream API decouples application code from the heavy burden of tightly integrated, provider-specific code, as well as from the settings and details of the exchange systems. By using the Spring Cloud Stream API, applications stay agnostic to those details and focus only on communication. The API ensures that the queues and topics in the messaging tool are created and that the connection among the modules is established.
Applications run their business code and emit or receive events through inputs and outputs, which we will look at in the upcoming chapters. The figure below, taken from the project documentation, demonstrates the integration layout:
Communication and Interfaces
As I stated earlier, with the Cloud Stream API we use just interfaces for the communication. Since the two sides of the stream differ, I am going to cover the details in two separate chapters, on the Producer and the Consumer.
Furthermore, the API itself provides a basic way to exchange messages over the default interfaces, namely “Sink”, “Source”, and “Processor”. However, I am taking my sample to the next level with its own custom interfaces to demonstrate more features. Eventually, the sample code will end up using a Topic Exchange, with which each consumer receives only the types of events it is interested in. You can see all the target samples and the Topic exchange type in the reference post[1]
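To make the Topic Exchange idea a bit more tangible, here is a minimal plain-Java sketch of how a binding pattern is compared against a routing key: “*” stands for exactly one word and “#” for zero or more words. The class TopicKeyMatcher is hypothetical and only models the matching rule; it is not RabbitMQ’s or Spring’s implementation, and the sample projects do not contain it.

```java
/** Illustrative model of topic-exchange matching; not the broker's real code. */
class TopicKeyMatcher {

    /** Returns true when the dot-separated routing key satisfies the binding pattern. */
    static boolean matches(String bindingPattern, String routingKey) {
        String[] pattern = bindingPattern.split("\\.");
        // An empty routing key has zero words, not one empty word.
        String[] key = routingKey.isEmpty() ? new String[0] : routingKey.split("\\.");
        return matches(pattern, 0, key, 0);
    }

    private static boolean matches(String[] pattern, int p, String[] key, int k) {
        if (p == pattern.length) {
            // Pattern exhausted: it is a match only if the key is exhausted too.
            return k == key.length;
        }
        String word = pattern[p];
        if (word.equals("#")) {
            // '#' may absorb zero or more words, so try every possible split.
            for (int next = k; next <= key.length; next++) {
                if (matches(pattern, p + 1, key, next)) {
                    return true;
                }
            }
            return false;
        }
        if (k == key.length) {
            return false; // pattern still expects a word, but the key has none left
        }
        if (word.equals("*") || word.equals(key[k])) {
            return matches(pattern, p + 1, key, k + 1);
        }
        return false;
    }

    public static void main(String[] args) {
        // The key used later in this post, checked against a few patterns.
        String key = "creds.certificate.ordered";
        System.out.println(matches("creds.certificate.ordered", key));
        System.out.println(matches("creds.certificate.*", key));
        System.out.println(matches("creds.*", key));
    }
}
```

A consumer bound with “creds.certificate.*” would therefore see every certificate event, while one bound with the full key sees only the ordered ones.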
Producer Side
On this side, we have the project Automation, which is responsible for producing the events that are triggered via the REST API. First of all, let’s look at the programming side of the API and discover the interface called “CertificateEventStreamSource”
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;

public interface CertificateEventStreamSource {

    @Output("certificateOrderedChannel")
    MessageChannel certificateOrderedChannel();
}
Let me elaborate on the concepts here:
- @Output: The annotation names the communication channel and marks it as an output, which makes the interface a Producer. The name given in the annotation must match the binding name in the configuration file, which we will see soon,
- MessageChannel: The interface provides the way to send the event to the MQ
For now, this is all your application needs to know about the communication. As a last step, we need to annotate the class that uses this interface, shown here on the class called “CertificateService”:
@EnableBinding(CertificateEventStreamSource.class)
Our next target is the configuration, and the steps are very simple. Spring Boot provides auto-configuration out of the box for many projects, which saves you from defining default entries such as the Rabbit host, port, etc. You can see it in the documentation[2]
application.yaml
spring.cloud.stream.bindings.certificateOrderedChannel.destination=CREDS_EXCHANGE
spring.cloud.stream.rabbit.bindings.certificateOrderedChannel.producer.routing-key-expression='creds.certificate.ordered'
Let’s break the entries down for the explanation:
destination: It points to the Exchange on the Broker. The binding name in the property key (certificateOrderedChannel) needs to match the one defined in the interface’s @Output annotation, so that the framework can bind this interface. The value is the name of the Exchange that will appear in RabbitMQ,
routing-key-expression: The second and last entry sets the routing key of the event. The value is a SpEL expression, which is why the fixed key is wrapped in single quotes. Rabbit uses this key to route the event, and the designated consumers later receive the events published with this key.
If you would like to know more about the available Producer bindings, consult the API documentation[3]. At this point, we have configured our Automation API to work fully with the Event Stream. No further configuration is required, and now we can focus on our consumer.
Consumer Side
For the consumption of the events, we will look at the “PKI” component, which consumes the events, runs some validation and then persists the event in the database. The elaboration of this component differs from the Producer Automation, because I have enhanced this consumer with some beneficial use cases that I will explain here.
Before diving into the details, I’ll show the interface first, followed by the configuration and so on. Now let’s look at “CertificateEventStreamSource”
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.messaging.SubscribableChannel;

public interface CertificateEventStreamSource {

    String CERTIFICATE_ORDERED_CHANNEL = "certificateOrderedSubscribableChannel";

    @Input(CERTIFICATE_ORDERED_CHANNEL)
    SubscribableChannel certificateOrdered();
}
Let’s look into what we have here and shed some light on it:
- @Input: The annotation names the communication channel and marks it as an input, which makes the interface a Consumer. The name given in the annotation must correspond to the binding name in the properties,
- SubscribableChannel: The interface provides the way to receive the event from the MQ
These two are semantically the same as in the Producer’s interface. Additionally, we need to annotate the class “CertificateOrderedListener” with:
@EnableBinding(CertificateEventStreamSource.class)
So far we have integrated our application with the API programmatically. Now we’ll move on to the configuration, which tells the Cloud Stream API how to manage our stream and some failure scenarios:
application.yaml
spring.cloud.stream.bindings.certificateOrderedSubscribableChannel.destination=CREDS_EXCHANGE
spring.cloud.stream.rabbit.bindings.certificateOrderedSubscribableChannel.consumer.binding-routing-key=creds.certificate.ordered
spring.cloud.stream.rabbit.bindings.certificateOrderedSubscribableChannel.consumer.bind-queue=true
spring.cloud.stream.bindings.certificateOrderedSubscribableChannel.group=certificateOrderedQueue
spring.cloud.stream.rabbit.bindings.certificateOrderedSubscribableChannel.consumer.auto-bind-dlq=true
spring.cloud.stream.rabbit.bindings.certificateOrderedSubscribableChannel.consumer.republishToDlq=true
spring.cloud.stream.bindings.certificateOrderedSubscribableChannel.consumer.max-attempts=2
Let’s dive into the details. First of all, notice the repeated “certificateOrderedSubscribableChannel” after each “bindings” entry, which refers to the value in the @Input annotation. These two must match, otherwise the consumer will never be bound properly. Now let’s move on to each definition:
destination: It defines the exchange that we will connect to,
binding-routing-key: this is the consumer’s counterpart of the routing key we saw in the Producer. It is the key with which the queue is bound to the exchange, so it determines which events reach this consumer. More details are given in the Producer’s section,
bind-queue: declares the queue and binds it to the destination exchange,
group: grouping is a vital feature and deserves a slightly longer explanation. By setting this value, you make the consumers in the same group compete for messages: each message is delivered to only one member of the group, in a so-called “round robin” fashion. If the group is not defined, each consumer gets every message. With the Rabbit binder the group also names the queue, here “CREDS_EXCHANGE.certificateOrderedQueue”. The explanation might be confusing, so let me visualize it
More information and available settings are in the documentation[4]
auto-bind-dlq: It sets up a DLQ and configures the original queue to forward rejections into this queue. In the following chapters, I’ll give an example from the application code,
republishToDlq: The binder republishes the failed message to the DLQ with the exception information in the headers,
max-attempts: It sets the maximum number of attempts to process the message when errors occur, counting the first attempt. With the value 2, the message is tried twice in total,
You can find more about the available use cases for the Exceptions and the retry mechanisms in the documentation[5]
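Since the group setting is the one that tends to confuse, here is a minimal plain-Java sketch of the delivery rule described above: every group gets each event once, and the members of a group take turns. The class ConsumerGroupSketch and the instance names in it are hypothetical. A consumer without a group is modelled as a group of its own, and a real broker’s distribution also depends on things like prefetch and consumer speed, so treat the strict alternation as a simplification.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Illustrative model of consumer groups; not how the binder is implemented. */
class ConsumerGroupSketch {

    /**
     * Delivers every event once per group. Inside a group the members take
     * turns (round robin), so each member sees only a share of the events.
     *
     * @param groups group name mapped to its non-empty list of consumer instances
     * @param events events in publishing order
     * @return consumer instance mapped to the events it received
     */
    static Map<String, List<String>> deliver(Map<String, List<String>> groups, List<String> events) {
        Map<String, List<String>> received = new LinkedHashMap<>();
        for (List<String> members : groups.values()) {
            for (String member : members) {
                received.put(member, new ArrayList<>());
            }
        }
        int position = 0;
        for (String event : events) {
            for (List<String> members : groups.values()) {
                // Only one member per group gets the event.
                String target = members.get(position % members.size());
                received.get(target).add(event);
            }
            position++;
        }
        return received;
    }

    public static void main(String[] args) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        // Two PKI instances sharing the group from the configuration above.
        groups.put("certificateOrderedQueue", Arrays.asList("pki-1", "pki-2"));
        // A hypothetical second application with its own group.
        groups.put("auditQueue", Arrays.asList("audit-1"));
        System.out.println(deliver(groups, Arrays.asList("e1", "e2", "e3", "e4")));
    }
}
```

In this model the two PKI instances split the events between them, while the separate audit group still receives all of them.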
Exception Handling
The Spring Cloud Stream API lets you define only a strategy for the failure cases in your application, and it takes care of the rest of the details. When your application throws an exception, the API wraps it and handles the situation according to your configuration. In the above configuration, I have defined strategies with retry attempts and the Dead Letter Queue.
In the PKI application, I have a minor demonstration of this case, which you can trigger with the Certification request below:
curl -X POST http://localhost:7000/api/certificates -H "Content-Type: application/json" -d "{\"commonName\":\"tugrulaslan.com\",\"algorithm\":\"sha1\"}"
After PKI receives such an event, it will attempt to process it twice and then move it to the Dead Letter Queue “CREDS_EXCHANGE.certificateOrderedQueue.dlq”
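The retry and Dead Letter Queue flow can be sketched in plain Java as follows. This is only a model of the behaviour that max-attempts and republishToDlq configure, not the binder’s code: the class RetryThenDlqSketch is hypothetical, the rule that “sha1” is rejected is my assumption about what the validation does, and the header name “x-exception-message” is used for illustration and should be checked against the binder documentation.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

/** Illustrative model of "retry, then dead-letter"; not the binder's real code. */
class RetryThenDlqSketch {

    /** A failed message together with the headers added when it is republished. */
    static final class DeadLetter {
        final String payload;
        final Map<String, String> headers;

        DeadLetter(String payload, Map<String, String> headers) {
            this.payload = payload;
            this.headers = headers;
        }
    }

    final List<DeadLetter> deadLetterQueue = new ArrayList<>();
    int attemptsMade = 0;

    /** Tries the handler up to maxAttempts times; returns true on success. */
    boolean deliver(String payload, int maxAttempts, Consumer<String> handler) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException lastFailure = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            attemptsMade++;
            try {
                handler.accept(payload);
                return true;
            } catch (RuntimeException e) {
                lastFailure = e; // remember the cause of the most recent failure
            }
        }
        // Attempts exhausted: republish with the failure recorded in a header.
        Map<String, String> headers = new LinkedHashMap<>();
        headers.put("x-exception-message", String.valueOf(lastFailure.getMessage()));
        deadLetterQueue.add(new DeadLetter(payload, headers));
        return false;
    }

    public static void main(String[] args) {
        RetryThenDlqSketch binder = new RetryThenDlqSketch();
        // Assumed validation rule: orders asking for sha1 are rejected.
        Consumer<String> listener = order -> {
            if (order.contains("sha1")) {
                throw new IllegalArgumentException("weak algorithm");
            }
        };
        boolean delivered = binder.deliver("tugrulaslan.com/sha1", 2, listener);
        System.out.println(delivered + " after " + binder.attemptsMade + " attempts");
    }
}
```

With max-attempts set to 2, the listener is invoked twice for the failing order before the message lands in the dead letter list.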
Benefits
I have listed some of the benefits that I personally believe the Cloud Stream API will bring to your projects:
- Applications are kept apart from heavy settings,
- Abstraction from the inner workings of the exchange communication and from specific API implementations,
- Yet another abstraction benefit: you can swap one Message Broker for another and your application will not be aware of it, see the list of binders[6],
- Easy management of exception cases and retry strategies,
- Spring Cloud Sleuth integrates with it, so once Sleuth is on the classpath you can easily trace the messages,
- Since it’s a Spring project, other native projects can be easily hooked up,
- Automatic marshaling and unmarshaling in the event exchange right out of the box without any prior configuration
References
[1] https://www.cloudamqp.com/blog/2015-09-03-part4-rabbitmq-for-beginners-exchanges-routing-keys-bindings.html
[2] https://docs.spring.io/spring-boot/docs/current/reference/html/appendix-auto-configuration-classes.html
[3] https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/_configuration_options.html#_producer_properties
[4] https://docs.spring.io/spring-cloud-stream/docs/current/reference/html/_configuration_options.html#_consumer_properties
[5] https://docs.spring.io/spring-cloud-stream/docs/current/reference/htmlsingle/#spring-cloud-stream-overview-error-handling
[6] https://cloud.spring.io/spring-cloud-stream/spring-cloud-stream.html#_binder_implementations