
Build a Kafka application in Python and Go


Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

Some of its benefits are high throughput, horizontal scalability, durable storage, and built-in fault tolerance.

Real-life use cases for Kafka include clickstream analysis, credit card fraud detection, and Internet of Things (IoT) telemetry. Overall, Kafka is a great platform to use if you need real-time data processing.

Components

The high-level architecture of Kafka is illustrated below. It may look a bit confusing at first, so we will decompose it, and hopefully it will be clear by the end of this section.

[Figure: high-level Kafka architecture]

At a high level: producers publish messages to topics, which are split into partitions and stored on brokers; consumers, usually organized into consumer groups, subscribe to those topics and read the messages, while ZooKeeper coordinates the brokers in the cluster.

Kafka implementation (on-premises)

We will go through the steps you need to get Kafka up and running on your own server. Before starting, you should have Apache Kafka installed on your machine. In case you don’t, please follow these walk-throughs for Ubuntu and macOS.

Once Kafka is on your machine, follow these steps to get started:

  1. Start ZooKeeper (Kafka depends on it): sudo systemctl start zookeeper
  2. Start Kafka: sudo systemctl start kafka
  3. Check Kafka status (it should be active): sudo systemctl status kafka
  4. Enable ZooKeeper and Kafka at boot: sudo systemctl enable zookeeper kafka
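
The examples that follow publish to a topic named test-topic. Depending on your broker settings, Kafka may auto-create topics on first use; if yours does not, here is a minimal sketch for creating the topic with kafka-python's admin client (the partition and replication settings are just assumptions for this tutorial):

from kafka.admin import KafkaAdminClient, NewTopic

# Connect to the local broker
admin = KafkaAdminClient(bootstrap_servers=['localhost:9092'])

# Create 'test-topic' with a single partition and no replication
admin.create_topics([
    NewTopic(name='test-topic', num_partitions=1, replication_factor=1)
])

admin.close()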

Python implementation

Producer

First, we create the producer, which will publish messages to the topic. The Python examples use the kafka-python package (pip install kafka-python).

from kafka import KafkaProducer

# Create a Kafka producer instance
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'], # list of brokers
    value_serializer=lambda x: x.encode('utf-8') # Convert data to bytes
)

# Send a message to a topic
producer.send(
    # Topic you will be publishing to
    topic='test-topic',
    # Partition you will be publishing to; the full list of partitions
    # is available via producer.partitions_for('test-topic')
    partition=0,
    # Must be bytes, or serializable to bytes via value_serializer
    value='Hello, Kafka!'
)

# Make all buffered records available to be published
producer.flush()
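
Note that send() is asynchronous: it appends the record to an internal buffer and returns a future, which is why the flush() call above is needed. If you want to confirm delivery and get the partition and offset the broker assigned (as the Go example below prints), a minimal sketch is to block on that future:

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=lambda x: x.encode('utf-8')
)

# send() returns a future; get() blocks until the broker acknowledges receipt
future = producer.send(topic='test-topic', value='Hello, Kafka!')
metadata = future.get(timeout=10)

print(f"Message sent to partition {metadata.partition} at offset {metadata.offset}")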

Consumer

We will create a consumer that subscribes to the topic we published to with the producer.

from kafka import KafkaConsumer

# Create a Kafka consumer instance
consumer = KafkaConsumer(
    'test-topic',
    # list of brokers
    bootstrap_servers=['localhost:9092'],
    # Start from the earliest message in the topic
    auto_offset_reset='earliest',
    # The id of the consumer group this consumer belongs to
    group_id='test-group',
    # Convert bytes back to a string
    value_deserializer=lambda x: x.decode('utf-8')
)

# Read messages from the topic
for message in consumer:
    print(message.value)
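
The value_serializer and value_deserializer hooks are also the natural place to handle structured payloads. As an illustrative variation on the examples above (the field names are made up for this sketch), you could exchange JSON instead of plain strings:

import json

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    # Serialize Python dicts to JSON bytes
    value_serializer=lambda x: json.dumps(x).encode('utf-8')
)

producer.send(topic='test-topic', value={'event': 'click', 'user_id': 42})
producer.flush()

On the consumer side, the matching deserializer would be value_deserializer=lambda x: json.loads(x.decode('utf-8')).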

Go implementation

Producer

First, we create the producer, which will publish messages to the topic. The Go examples use the Shopify/sarama client library (go get github.com/Shopify/sarama).

package main

import (
    "fmt"
    "log"

    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()
    config.Producer.Return.Successes = true

    producer, err := sarama.NewSyncProducer(
        []string{"localhost:9092"}, // Arrays of brokers
        config,
    )
    if err != nil {
        log.Fatalf("Failed to start producer: %s", err)
    }
    // make sure you close the producer
    defer func() {
        if err := producer.Close(); err != nil {
            log.Fatalf("Failed to close producer cleanly: %s", err)
        }
    }()

    topic := "test-group"
    msg := &sarama.ProducerMessage{
        Topic: topic,                                 // Topic name
        Value: sarama.StringEncoder("Hello, Kafka!"), // message content
    }

    partition, offset, err := producer.SendMessage(msg)
    if err != nil {
        log.Fatalf("Failed to send message to Kafka: %s", err)
    }

    fmt.Printf("Message sent to partition %d at offset %d\n",
        partition,
        offset)
}
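
Note that SyncProducer blocks until the broker acknowledges each message, which is why SendMessage can return the assigned partition and offset directly. Sarama also provides an AsyncProducer that trades this simplicity for higher throughput.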

Consumer
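
As in the Python example, the consumer subscribes to the topic the producer published to, this time reading directly from a single partition.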

package main

import (
    "fmt"
    "log"

    "github.com/Shopify/sarama"
)

func main() {
    config := sarama.NewConfig()
    config.Consumer.Return.Errors = true

    consumer, err := sarama.NewConsumer([]string{"localhost:9092"}, config)
    if err != nil {
        log.Fatalf("Failed to start consumer: %s", err)
    }
    // make sure you close the consumer
    defer func() {
        if err := consumer.Close(); err != nil {
            log.Fatalf("Failed to close consumer cleanly: %s", err)
        }
    }()

    topic := "test-group"
    partition := int32(0)
    // Start from the earliest message in the topic
    offset := sarama.OffsetOldest

    partitionConsumer, err := consumer.ConsumePartition(topic, partition, offset)
    if err != nil {
        log.Fatalf("Failed to start partition consumer: %s", err)
    }
    defer func() {
        if err := partitionConsumer.Close(); err != nil {
            log.Fatalf("Failed to close partition consumer cleanly: %s", err)
        }
    }()

    for {
        select {
        case msg := <-partitionConsumer.Messages():
            fmt.Printf("Received message: %s\n", msg.Value)
        case err := <-partitionConsumer.Errors():
            // Log and keep consuming rather than exiting on the first error
            log.Printf("Error consuming message: %s", err)
        }
    }
}
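
The for/select loop blocks forever, so the deferred Close calls above will not run on a normal exit; in a real application you would typically add a case that listens for an OS signal (via os/signal and signal.Notify) and breaks out of the loop so the consumer can shut down cleanly.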