Consume Messages With Spring Cloud Stream Kafka

Spring Cloud Stream is a very complex topic and a remarkable piece of technology. It builds on other intricate Spring technologies like Spring Integration and Spring Cloud Function, and when you add Apache Kafka to the mix, you have a steep learning curve on your hands.

There is a lot of documentation to read and comprehend, and I do not think it helps that your first interaction with the technology is by showing off. Here is the sample in the “Introducing Spring Cloud Stream” section.

import java.util.function.Function;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class SampleApplication {

    public static void main(String[] args) {
        SpringApplication.run(SampleApplication.class, args);
    }

    @Bean
    public Function<String, String> uppercase() {
        return value -> value.toUpperCase();
    }
}

That supposedly is a fully functioning application. The uppercase() method consumes and produces simultaneously, essentially turning it into a way software can pleasure itself. To understand this example, you must know about all the Spring Boot auto-configuration magic happening in the background. Otherwise, it is an opaque, magical, and indecipherable showpiece.

This post will show a practical example of a simple consumer application receiving messages from a Kafka cluster. This was my use case, and while the documentation contains a ton of helpful information, it initially only confused me, since I came to the technology with fresh eyes. As a result, it took me a long time to put all the pieces together and understand what was going on.

Brokering Messages

The first thing you need is a Kafka cluster. The project’s website has a very concise and excellent quick-start guide. I followed it to the letter, and it immediately worked on my Mac. Note that Kafka does not run natively on Windows. You will have to rely on WSL for that.

My example consumes messages from a topic called “fake-news” and the consumer group “impartial-outlet”.

$ bin/kafka-topics.sh --create --topic fake-news --bootstrap-server localhost:9092

You can write messages to it using the kafka-console-producer.

$ bin/kafka-console-producer.sh --topic fake-news --bootstrap-server localhost:9092

I went with a JSON message format deserialized into POJOs by the application. For some fun, I added a nested object.

{
    "type": "fake-news",
    "referrer": "anonymous",
    "payload": {
        "alternativeFact": "Episode One is the best Star Wars movie"
    }
}

The “type” and “referrer” fields could be helpful in a finished application, for example to describe the purpose of the content and who sent it. They could come in handy in a request-response situation where the consumer must post the response to the correct topic. I do not use them in this example, but I wanted to include them for a more realistic scenario.
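
One practical note for testing: the console producer treats every line you type as a separate message, so you have to paste the JSON as a single line, for example:

$ bin/kafka-console-producer.sh --topic fake-news --bootstrap-server localhost:9092
> {"type": "fake-news", "referrer": "anonymous", "payload": {"alternativeFact": "Episode One is the best Star Wars movie"}}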

Indisputable Fact: I Want Code!

Spring Cloud Stream gives you two options to consume data: a Function<T,R> or a Consumer<T>. Both are standard Java interfaces that have existed since Java 8. The Consumer<T> interface was my tool of choice to remove any ambiguity. I wanted to ingest data and do something with it, nothing more.

Therefore, I defined a bean of that type.

@Bean
public Consumer<Message<RequestMessage>> requestMessageConsumer() {
    return message -> { /* do something */ };
}

However, I am not a fan of the inline business logic shown by the documentation. As such, I created a dedicated class, a @RestController, if you will, that implements the Consumer<T> interface.

RequestMessage is my POJO that represents the JSON format listed earlier. The Message interface comes from Spring.
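
A minimal sketch of such a POJO, matching the JSON above and written in the same Lombok style as the rest of the code, could look like this:

import lombok.Data;

@Data
public class RequestMessage {
    private String type;
    private String referrer;
    private Payload payload;

    // Nested object for the "payload" element of the JSON.
    @Data
    public static class Payload {
        private String alternativeFact;
    }
}

With the POJO in place, here is the consumer itself.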

import java.util.function.Consumer;

import org.springframework.messaging.Message;

import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;

@Slf4j
@RequiredArgsConstructor
public class RequestMessageConsumer implements Consumer<Message<RequestMessage>> {

    private final MessageAggregationService messageAggregationService;

    // Unwrap the Spring Message envelope and hand the payload to the business logic.
    @Override
    public void accept(Message<RequestMessage> requestMessageMessage) {
        messageAggregationService.evaluateMessage(requestMessageMessage.getPayload());
    }
}

As you can see, it requires another bean to which it hands off the message. I see it as the messaging equivalent of a @RestController. It receives the data, validates it, and then passes it on to the business logic.
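
The MessageAggregationService itself is ordinary business logic and not the focus of this post. A placeholder version is enough to make the example compile; the method body below is purely illustrative:

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class MessageAggregationService {

    public void evaluateMessage(RequestMessage requestMessage) {
        // Placeholder: the real business logic goes here.
        log.info("Received alternative fact: {}",
                requestMessage.getPayload().getAlternativeFact());
    }
}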

The bean definitions now look like the following.

@Bean
public MessageAggregationService messageAggregationService() {
    return new MessageAggregationService();
}

@Bean
public RequestMessageConsumer requestMessageConsumer() {
    // Wire the business logic into the consumer that Spring Cloud Stream will bind.
    return new RequestMessageConsumer(messageAggregationService());
}

Demystify With Configuration

Earlier I mentioned that the first example in the docs is “magical” and “indecipherable”. How can this be codified into configuration and turned into something comprehensible?

Let me show you the complete configuration, and then I will explain what is happening.

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: "localhost:9092"
          autoCreateTopics: false
      function:
        definition: requestMessageConsumer;fakeNewsError
      bindings:
        requestMessageConsumer-in-0:
          destination: fake-news
          group: impartial-outlet
          error-handler-definition: fakeNewsError
          consumer:
            maxAttempts: 1

The configuration consists of two parts: the generic Spring Cloud Stream portion and the Kafka-specific portion. The kafka element easily identifies the latter, so let me start there.

spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: "localhost:9092"
          autoCreateTopics: false

Every compatible system has its own documentation and set of configuration options. You can find the complete documentation for the Kafka Binder here. My simple example only needs to tell the binder where to find the broker it should connect to. I include autoCreateTopics as an example that there is more to configure, and because I do not like that type of automation.

Binders, like the Kafka Binder, allow many different types of systems to interact with Spring Cloud Stream in a way that is transparent to your business logic. For example, your code does not have to change when you switch from Kafka to RabbitMQ. Change the binder, and you should be done. “Should” because I have not verified this. But that is the beauty of this design.
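
Concretely, a binder is just a dependency on the classpath. Assuming a Maven build (the build file is not part of this walkthrough), the Kafka binder comes in like this, and switching to RabbitMQ would mean exchanging it for spring-cloud-stream-binder-rabbit:

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>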

Next, I defined the beans I want Spring Cloud Stream to consider. The function.definition property takes care of that. Here I listed all the bean names, separated by semicolons. You can even do functional composition and create a pipeline for more complex scenarios; this way, you can keep your bean logic small and testable and use composition to achieve your goal. That topic is way beyond this blog post, so I only mention it as an FYI (there is a small illustration after the snippet below). The important part: use a semicolon instead of a comma. Spring Cloud Stream will interpret the latter as composition.

spring:
  cloud:
    stream:
      function:
        definition: requestMessageConsumer;fakeNewsError
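
Purely as a hypothetical illustration of the composition syntax: a definition like the one below would pipe every message through a validate function before it reaches the consumer. The validate bean is made up for this example, and the input binding name would change to match the composed definition.

spring:
  cloud:
    stream:
      function:
        definition: validate|requestMessageConsumer;fakeNewsError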

Lastly, I defined my application’s list of input (and output for producers) bindings.

spring:
  cloud:
    stream:
      bindings:
        requestMessageConsumer-in-0:
          destination: fake-news
          group: impartial-outlet
          error-handler-definition: fakeNewsError
          consumer:
            maxAttempts: 1

The definition tells Spring Cloud Stream which beans to use, and the bindings describe the data source or destination, gluing it all together. You can read about the rules in the documentation. The short version is this:

  • A binding name starts with the bean’s name.
  • -in or -out defines whether it is an input (consumer) or an output (producer) binding.
  • The -0 at the end is an index; it only becomes interesting for functions with multiple inputs or outputs. I have not run into a situation where I needed more than one, so for this simple consumer, it is -0.
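
Applied to this example, the naming works out like this (the publisher is a hypothetical bean, only there to show the -out variant):

requestMessageConsumer-in-0   ->  input binding for the requestMessageConsumer bean
fakeNewsPublisher-out-0       ->  output binding for a hypothetical fakeNewsPublisher producer bean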

As you may have deduced from the designators “fake-news” and “impartial-outlet” I mentioned earlier, destination refers to the Kafka topic and group to the consumer group. Producers also use the property destination to define the Kafka topic. Keep in mind that this configuration is technology-agnostic. In my case, it refers to Kafka topics and consumer groups. It may map to different concepts in RabbitMQ or other systems.

Something is Wrong

So what about this “fakeNewsError” error-handler-definition? What does that do? As the name suggests, it is an error handler. If you do not define one, you will end up with unhelpful stack traces when 💩 hits the fan.

Caused by: org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'kafka-127127710.requestMessageConsumer-in-0.errors'.

Spring Cloud Stream triggers this handler in two situations: when your application receives data that Spring cannot deserialize into the POJO you defined (in which case your application code is never called at all), and when your business logic throws an exception. Both cases end up in the error handler.
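
The fakeNewsError bean itself never appears above, so here is a minimal sketch. It assumes the Consumer<ErrorMessage> shape described in the Spring Cloud Stream error handling documentation and a surrounding configuration class annotated with @Slf4j so that log is available:

import java.util.function.Consumer;

import org.springframework.context.annotation.Bean;
import org.springframework.messaging.support.ErrorMessage;

@Bean
public Consumer<ErrorMessage> fakeNewsError() {
    // getOriginalMessage() is the message that failed; getPayload() is the Throwable.
    return errorMessage -> log.error("Could not process {}",
            errorMessage.getOriginalMessage(), errorMessage.getPayload());
}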

Initially, researching this error led me to this GitHub issue comment. Searching the documentation proved difficult, even knowing what I was looking for. You can find an explanation in the Spring Cloud Stream error handling section. Unfortunately, the error message does not hint precisely where to look. Every application’s function bean name will be different, and even the “kafka-127127710” prefix changed from run to run. That makes it hard to scour the Interwebs for a solution, and the reference documentation contains no mention of the error message either, which would allow all-knowing Google to index it. I spent hours trying to find the source of the error.

Big Picture

You can find the complete example on GitHub.

Famous Last Words

As I mentioned earlier, every supported message or event broker should work as long as you know how to configure it. The coded business logic does not change – which is the beauty of Spring Cloud Stream. I use Kafka in this example since the end goal was to connect to Azure Event Hub using the Kafka interface. I can run Kafka locally, simplifying my development process before deploying to Azure. I could exchange the binders, replace Kafka with Event Hub in the final product, and use Spring profiles to separate the local config from the production config.
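
I have not built that production setup here, but the profile separation could look roughly like the sketch below. The production values are placeholders, and a real Event Hub connection also needs SASL settings that are omitted:

spring:
  config:
    activate:
      on-profile: local
  cloud:
    stream:
      kafka:
        binder:
          brokers: "localhost:9092"
---
spring:
  config:
    activate:
      on-profile: production
  cloud:
    stream:
      kafka:
        binder:
          # Placeholder for the Kafka endpoint of your Event Hub namespace.
          brokers: "my-namespace.servicebus.windows.net:9093"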

The business logic consists of standard Java functions with no mention of KafkaTemplate or other technology-specific integration into my code. Imagine changing the messaging system when your code is littered with specific library code. Not so with Spring Cloud Stream, but this flexibility also comes with a price: a steep learning curve. Many technologies are at play here, and I have not even touched on Project Reactor, which introduces Mono and Flux.

I hope I get around to showing the producer I created in conjunction with the consumer. For reasons I detailed last year, I’m in a giant coding-writing slump that I’m still trying to crawl out of.

Anyway. I hope this was useful, and thank you for reading.
