Produce Messages With Spring Cloud Stream Kafka

In a recent post, I explained how to create a Kafka Consumer application with Spring Boot using Spring Cloud Stream with the Kafka Binder. In this installment, I explain how to build the other side of the Kafka connection: the producer.

The main concepts are the same. The most significant change is that instead of a Consumer<T>, you implement a Supplier<T>.

Brokering Messages

The first thing you need is a Kafka cluster. The project’s website has a very concise and excellent quick-start guide. I followed it to the letter, and it immediately worked on my Mac. Note that Kafka does not run natively on Windows. You will have to rely on WSL for that.

(I honestly just copied this section from the other blog post).

Producing Messages to Broker

Spring Cloud Stream supports two programming models for producers: imperative and reactive. The difference lies in who triggers the supplier and how often. In the imperative approach, Spring Cloud Stream invokes Supplier::get() periodically, every second by default. In the reactive style, the framework invokes it only once, because the returned Flux<T> is itself a continuous stream of data. It usually does not make sense to invoke it more often (exceptions prove the rule).
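For contrast, a minimal sketch of an imperative supplier could look like the following (heartbeatProducer is a hypothetical bean, not part of the example project; the polling interval is configurable via spring.cloud.stream.poller.fixed-delay):

@Bean
public Supplier<RequestMessage> heartbeatProducer() {
    // Invoked by the framework's poller, every second by default
    return () -> new RequestMessage("heartbeat", "Scheduler",
            new RequestMessagePayload("tick"));
}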

I will show a reactive implementation based on Flux<T> and Sinks.Many<T>. When you add data to the reactive stream, it is pushed to the message broker.

Here are the bean definitions. You can think of the Sinks.Many<T> bean as an “OutputStream” to the Flux<T>. The code does not interact with the flux but instead utilizes the sink to publish messages to the stream. You can find way more details in the Reactor Core documentation.

@Configuration
public class ProducerConfig {

    @Bean
    public Sinks.Many<Message<RequestMessage>> messageSink() {
        // Unicast: exactly one subscriber (the binder); messages emitted
        // before that subscriber attaches are buffered
        return Sinks.many().unicast().onBackpressureBuffer();
    }

    @Bean
    public RequestMessageProducer requestMessageProducer() {
        return new RequestMessageProducer(messageSink());
    }
}

And here is the implementation. As in the consumer example, I prefer keeping business logic out of configuration classes. Therefore, I have created a RequestMessageProducer class that implements the Supplier<T> interface.

@Slf4j
@RequiredArgsConstructor
public class RequestMessageProducer
        implements Supplier<Flux<Message<RequestMessage>>> {

    private final Sinks.Many<Message<RequestMessage>> messageSink;

    public void sendMessage(String alternativeFact) {

        var payload = new RequestMessagePayload(alternativeFact);
        var message = new RequestMessage("lie", "Liar", payload);

        // FAIL_FAST: throw immediately if the sink cannot accept the emission
        messageSink.emitNext(new GenericMessage<>(message),
                Sinks.EmitFailureHandler.FAIL_FAST);
    }

    @Override
    public Flux<Message<RequestMessage>> get() {
        // Called once by Spring Cloud Stream; the returned Flux is bound
        // to the output destination
        return messageSink.asFlux()
                .doOnNext(m -> log.info("Manually sending message {}", m))
                .doOnError(t -> log.error("Error encountered", t));
    }
}

The get() method exists only for Spring Cloud Stream and is not supposed to be used by any client code. sendMessage() is the tool for that. As you can see, it utilizes the Sinks.Many::emitNext() method to add data to the stream. The Sinks.Many::asFlux() method turns it into something you can use in a reactive message supplier.
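To trigger a message from client code, you might wire the producer into a controller, for example (FakeNewsController and its endpoint are a hypothetical sketch, not part of the example project):

@RestController
@RequiredArgsConstructor
public class FakeNewsController {

    private final RequestMessageProducer producer;

    // Pushes the request body into the sink; the bound Flux
    // then delivers it to Kafka
    @PostMapping("/facts")
    public ResponseEntity<Void> publish(@RequestBody String alternativeFact) {
        producer.sendMessage(alternativeFact);
        return ResponseEntity.accepted().build();
    }
}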

I have chosen the most straightforward way to implement a producer. Be aware that the documentation recommends the tryEmit* methods instead.
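Following that advice, sendMessage() could inspect the emission result instead of throwing, roughly like this sketch:

public void sendMessage(String alternativeFact) {
    var payload = new RequestMessagePayload(alternativeFact);
    var message = new RequestMessage("lie", "Liar", payload);

    // tryEmitNext() reports failures via an EmitResult instead of throwing
    Sinks.EmitResult result = messageSink.tryEmitNext(new GenericMessage<>(message));
    if (result.isFailure()) {
        log.warn("Could not emit message: {}", result);
    }
}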

Let’s YAML the Configuration

The configuration is very similar to the consumer example. If you understand how that works, the producer is a piece of cake.

server:
  port: 8081
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          autoCreateTopics: false
      function:
        definition: requestMessageProducer
      bindings:
        requestMessageProducer-out-0:
          destination: fake-news
          producer:
            partitionKeyExpression: payload.type

The Kafka settings are the same. The producer bean is listed in the spring.cloud.stream.function.definition property to instruct Spring Cloud Stream to consider RequestMessageProducer for binding. In the bindings section, you configure the supplier bean as an output that publishes to the “fake-news” Kafka topic.

I added a producer setting that is technically optional. If your messages must be processed in order and your topic has more than one partition, you must specify a partition key. It ensures that messages with the same key always end up in the same partition and, consequently, at the same consumer. You do that through the partitionKeyExpression producer property.

payload is an identifier provided by Spring Cloud Stream. It refers to your message object, which is RequestMessage in this example. You can use SpEL to describe the partition key. If you wanted RequestMessagePayload#alternativeFact to be the key, you would set payload.payload.alternativeFact as the value. It looks odd in my example because I also named the actual payload object “payload”.
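For illustration, the binding would then look like this:

bindings:
  requestMessageProducer-out-0:
    destination: fake-news
    producer:
      partitionKeyExpression: payload.payload.alternativeFact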

Big Picture

You can find the complete example on GitHub.

Famous Last Words

I created separate applications to demonstrate how to create a producer and a consumer using Spring Cloud Stream and the Kafka Binder. If you want, you can put both in a single application, of course. It depends on the use case, and a more complex application will likely have several inputs and outputs simultaneously.
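If you combined them in one application, you would list both functions in the definition property, separated by a semicolon (assuming the consumer bean from the previous post is called requestMessageConsumer):

spring:
  cloud:
    stream:
      function:
        definition: requestMessageProducer;requestMessageConsumer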

I have successfully used the same configuration to connect a Kafka Binder-based application to Azure Event Hubs. It was a struggle, but only because the documentation contradicts itself. Once I knew how things worked, I only had to replace the broker and configure the desired authentication method. However, this is a topic for another day, and I will leave you with this cliffhanger.

Thank you for reading.
