Microservices with Spring Boot: Event-Driven Architecture with Apache Kafka

Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

Kafka combines three key capabilities so you can implement your use cases for event streaming end-to-end with a single battle-tested solution:

  1. To publish (write) and subscribe to (read) streams of events, including continuous import/export of your data from other systems.

  2. To store streams of events durably and reliably for as long as you want.

  3. To process streams of events as they occur or retrospectively.

And all this functionality is provided in a distributed, highly scalable, elastic, fault-tolerant, and secure manner. Kafka can be deployed on bare-metal hardware, virtual machines, and containers, and on-premises as well as in the cloud. You can choose between self-managing your Kafka environments and using fully managed services offered by a variety of vendors.

In this article, we will work with the Wikimedia event stream.

We will set up a Kafka cluster on our local machine and build two microservices: one that publishes messages to Kafka, and another that consumes the messages and stores them in a MySQL database.

Set up Kafka on your machine

  1. Download Kafka from kafka.apache.org/downloads

  2. For instructions to set up the Kafka environment, refer to kafka.apache.org/quickstart

For Mac and Linux users,

Open a terminal and navigate to the downloaded Kafka directory.

Start the Zookeeper service

bin/zookeeper-server-start.sh config/zookeeper.properties

In another terminal instance, start the Kafka broker service.

bin/kafka-server-start.sh config/server.properties

For Windows users

Open a command prompt and navigate to the downloaded Kafka directory.

Start the Zookeeper service

bin\windows\zookeeper-server-start.bat config\zookeeper.properties

In another command prompt instance, start the Kafka broker service.

bin\windows\kafka-server-start.bat config\server.properties

The Kafka broker will now be up on port 9092 (the default).
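
To quickly verify that the broker is reachable, you can list the existing topics with the CLI bundled in the Kafka distribution (run from the Kafka directory):

bin/kafka-topics.sh --bootstrap-server localhost:9092 --list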

Set up the Producer Microservice

  1. Go to start.spring.io

  2. For this article, we will create a Maven project. Add the following dependencies:

  • Spring for Apache Kafka

  • Lombok

  3. Generate the project and open it in any IDE (like IntelliJ, Eclipse, VSCode, etc.).

Now in the producer microservice, we have the following dependencies:

producer/pom.xml

    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>

    </dependencies>

To process the Wikimedia stream, we need to create an event source and an event handler. For this purpose, we will add the following dependencies to the pom.xml of the producer microservice.

        <!-- https://mvnrepository.com/artifact/com.launchdarkly/okhttp-eventsource -->
        <dependency>
            <groupId>com.launchdarkly</groupId>
            <artifactId>okhttp-eventsource</artifactId>
            <version>2.5.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.13.2</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.13.4.1</version>
        </dependency>

Finally, the dependencies section in the producer's pom.xml will look like this:

producer/pom.xml

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.launchdarkly/okhttp-eventsource -->
        <dependency>
            <groupId>com.launchdarkly</groupId>
            <artifactId>okhttp-eventsource</artifactId>
            <version>2.5.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.13.2</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind -->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.13.4.1</version>
        </dependency>

    </dependencies>

Next, we will add the Kafka-related properties to the application.properties file.

application.properties

spring.kafka.producer.bootstrap-servers: localhost:9092
spring.kafka.producer.key-serializer: org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer: org.apache.kafka.common.serialization.StringSerializer

For this article, we will serialize and deserialize the messages as plain Strings, hence we specify the StringSerializer here in the producer; the consumer will use the matching StringDeserializer.

Next, we will create a class that holds the name of the topic we will publish to, as a static constant.

producer/KafkaTopic.java

public class KafkaTopic
{
    public static final String WIKIMEDIA_STREAM_TOPIC = "wikimedia_stream_topic";
}

Next, we will create a configuration class that exposes a NewTopic bean; Spring Boot's auto-configured KafkaAdmin will use it to create the topic on the broker if it does not already exist.

producer/KafkaTopicConfig.java

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class KafkaTopicConfig
{
     @Bean
     public NewTopic topic()
     {
          return TopicBuilder.name(KafkaTopic.WIKIMEDIA_STREAM_TOPIC)
                  .build();
     }
}
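
By default, the topic is created with the broker's default partition and replication settings. Optionally, you can set them explicitly on the builder; a variant with illustrative values:

     @Bean
     public NewTopic topic()
     {
          return TopicBuilder.name(KafkaTopic.WIKIMEDIA_STREAM_TOPIC)
                  .partitions(3)  // illustrative; more partitions allow more parallel consumers
                  .replicas(1)    // cannot exceed the broker count (we run a single broker)
                  .build();
     }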

Next, we will create an implementation of the EventHandler interface (from okhttp-eventsource) to process the event stream and publish each message to Kafka.

This class takes the topic name and a KafkaTemplate as constructor parameters. For this article, we will only provide a meaningful implementation for the onMessage(..) method.

producer/WikimediaChangesHandler.java

import com.launchdarkly.eventsource.EventHandler;
import com.launchdarkly.eventsource.MessageEvent;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;

public class WikimediaChangesHandler implements EventHandler {

    private static final Logger LOGGER = LoggerFactory.getLogger(WikimediaChangesHandler.class);
    private final KafkaTemplate<String, String> kafkaTemplate;
    private final String topic;

    public WikimediaChangesHandler(KafkaTemplate<String, String> kafkaTemplate, String topic) {
        this.kafkaTemplate = kafkaTemplate;
        this.topic = topic;
    }
    @Override
    public void onMessage(String s, MessageEvent messageEvent) throws Exception {

        kafkaTemplate.send(topic, messageEvent.getData());

        LOGGER.info(String.format("Event data published -> %s", messageEvent.getData()));

    }

    @Override
    public void onOpen() throws Exception { }

    @Override
    public void onClosed() throws Exception { }

    @Override
    public void onComment(String s) throws Exception { }

    @Override
    public void onError(Throwable throwable) { }
}
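
Note that kafkaTemplate.send(..) is asynchronous and returns immediately. If you also want to log delivery failures, you can attach a callback to the returned future. A minimal sketch of an alternative onMessage(..), assuming spring-kafka 2.x, where send(..) returns a ListenableFuture:

    @Override
    public void onMessage(String event, MessageEvent messageEvent) {
        kafkaTemplate.send(topic, messageEvent.getData())
                .addCallback(
                        // invoked once the broker has acknowledged the record
                        result -> LOGGER.info("Published to partition {}",
                                result.getRecordMetadata().partition()),
                        // invoked if the send ultimately fails
                        ex -> LOGGER.error("Failed to publish event", ex));
    }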

Next, let's create a producer class that connects to the event stream and starts publishing.

producer/WikimediaProducer.java

import com.launchdarkly.eventsource.EventHandler;
import com.launchdarkly.eventsource.EventSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;

import java.net.URI;
import java.util.concurrent.TimeUnit;

@Service
public class WikimediaProducer
{
    private static final Logger LOGGER = LoggerFactory.getLogger(WikimediaProducer.class);

    private final KafkaTemplate<String, String> kafkaTemplate;

    public WikimediaProducer(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void publishMessageToKafka() throws InterruptedException {
        String topic = KafkaTopic.WIKIMEDIA_STREAM_TOPIC;

        EventHandler eventHandler = new WikimediaChangesHandler(kafkaTemplate,topic);

        String url = "https://stream.wikimedia.org/v2/stream/recentchange";

        EventSource.Builder builder = new EventSource.Builder(eventHandler, URI.create(url));

        EventSource eventSource = builder.build();
        eventSource.start();

        // Keep the stream open for 5 minutes for simulation purposes.
        TimeUnit.MINUTES.sleep(5);
    }

}

To test our service, we will create an implementation of CommandLineRunner so that when the application starts, our publisher starts publishing.

producer/StreamSimulationRunner.java

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.stereotype.Component;

@Component
public class StreamSimulationRunner implements CommandLineRunner {

    @Autowired
    private WikimediaProducer wikimediaProducer;

    @Override
    public void run(String... args) throws Exception
    {
        wikimediaProducer.publishMessageToKafka();
    }
}
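
Start the producer from the project root (this assumes the Maven wrapper that start.spring.io generates; a plain mvn spring-boot:run works as well):

./mvnw spring-boot:run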

Set up the Consumer Microservice

  1. Go to start.spring.io

  2. For this article, we will create a Maven project. Add the following dependencies:

  • Spring for Apache Kafka

  • Lombok

  • Spring Data JPA

  • MySQL Driver

  3. Generate the project and open it in any IDE (like IntelliJ, Eclipse, VSCode, etc.).

The consumer microservice will have the following dependencies:

consumer/pom.xml

    <dependencies>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-jpa</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>com.mysql</groupId>
            <artifactId>mysql-connector-j</artifactId>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>

    </dependencies>

Next, we will add the Kafka consumer, MySQL, and JPA configurations in the application.properties file.

consumer/application.properties

# Kafka consumer configs
spring.kafka.consumer.bootstrap-servers: localhost:9092
spring.kafka.consumer.group-id: dbConsumerGroup
spring.kafka.consumer.auto-offset-reset: earliest
spring.kafka.consumer.key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

# MySQL configs
spring.datasource.url=jdbc:mysql://localhost:3306/wikimediadb
spring.datasource.username=<your username>
spring.datasource.password=<your password>


# JPA configs
spring.jpa.properties.hibernate.dialect=org.hibernate.dialect.MySQL8Dialect
spring.jpa.hibernate.ddl-auto=update
spring.jpa.properties.hibernate.show_sql=true
spring.jpa.properties.hibernate.use_sql_comments=true
spring.jpa.properties.hibernate.format_sql=true
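
Note that with spring.jpa.hibernate.ddl-auto=update, Hibernate will create the table for us, but not the database itself. Create the schema once in your MySQL client before starting the service:

CREATE DATABASE wikimediadb;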

As in the producer, we will create a class that holds the name of the topic, this time the topic the consumer will subscribe to, as a static constant.

consumer/KafkaTopic.java

public class KafkaTopic
{
    public static final String WIKIMEDIA_STREAM_TOPIC = "wikimedia_stream_topic";
}

Next, we will create an entity for our Wikimedia stream message. The event payload can easily exceed the default VARCHAR(255) column length, so we annotate the field with @Lob, which Hibernate maps to a large text column (LONGTEXT on MySQL).

consumer/entity/WikimediaModel.java

import lombok.Getter;
import lombok.Setter;

import javax.persistence.*;

@Entity
@Table(name = "wikimedia_recent_changes")
@Getter
@Setter
public class WikimediaModel
{
    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    @Lob
    private String wikimediaEventData;
}

Next, we will create a JPA repository for persisting the data in the database.

consumer/WikimediaRepository.java

import com.umang345.wikimediaconsumermicroservice.entity.WikimediaModel;
import org.springframework.data.jpa.repository.JpaRepository;
import org.springframework.stereotype.Repository;

@Repository
public interface WikimediaRepository extends JpaRepository<WikimediaModel,Long>
{ }

Next, we will create a consumer class that consumes messages from Kafka and, using the JPA repository, saves them to the database. Note that the groupId on the listener matches the spring.kafka.consumer.group-id we configured earlier.

consumer/KafkaWikiDBConsumer.java

import com.umang345.wikimediaconsumermicroservice.config.KafkaTopic;
import com.umang345.wikimediaconsumermicroservice.entity.WikimediaModel;
import com.umang345.wikimediaconsumermicroservice.repository.WikimediaRepository;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

@Service
public class KafkaWikiDBConsumer
{
    private static final Logger LOGGER = LoggerFactory.getLogger(KafkaWikiDBConsumer.class);

    private final WikimediaRepository dataRepository;

    public KafkaWikiDBConsumer(WikimediaRepository dataRepository) {
        this.dataRepository = dataRepository;
    }

    @KafkaListener(topics = KafkaTopic.WIKIMEDIA_STREAM_TOPIC, groupId = "dbConsumerGroup")
    public void consume(String eventMessage)
    {
        LOGGER.info(String.format("Event message consumed -> %s", eventMessage));

        WikimediaModel wikimediaModel = new WikimediaModel();
        wikimediaModel.setWikimediaEventData(eventMessage);

        dataRepository.save(wikimediaModel);
    }
}

Test

Start the Kafka broker and then start both services. In the console you can watch messages being published to Kafka by the producer and consumed by the consumer, and you can verify that they are being stored in the database.
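
You can also inspect the topic directly with the console consumer bundled with Kafka, and count the persisted rows with a query such as SELECT COUNT(*) FROM wikimedia_recent_changes; in MySQL:

bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic wikimedia_stream_topic --from-beginning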

I hope you found the article useful.

Happy Coding :)