Skip to content

Kafka

Установка kafka

  • Установить сам пакет
  • Запустить kafka в docker
shell
docker pull apache/kafka:3.7.0
docker run -p 9092:9092 apache/kafka:3.7.0

Создать топик

Команды нужно выполнять в папке с исходниками kafka

shell
bin/kafka-topics.sh --create --topic topic-name --bootstrap-server localhost:9092

Создать producer и отправить события в топик

shell
bin/kafka-console-producer.sh --topic topic-name --bootstrap-server localhost:9092

Создать consumer и считать события из топика

shell
bin/kafka-console-consumer.sh --topic topic-name --from-beginning --bootstrap-server localhost:9092

Работа с kafka из vscode

  • Установить расширение jeppeandersen.vscode-kafka
  • Можно увидеть брокеры, топики, консьюмер группы
  • Можно запустить consumer на выбранном топике
  • Можно удалить consumer

Работа с kafka из node.js приложения

  • Установить пакет kafkajs
shell
npm install kafkajs

Пример использования kafkajs:

js
const { Kafka } = require('kafkajs');

const kafka = new Kafka({
  clientId: 'kafka-node-js',
  brokers: ['localhost:9092'],
});

const producer = kafka.producer();
const consumer = kafka.consumer({ groupId: 'kafka-node-js-consumer-group' });

const run = async () => {
  // Producing
  await producer.connect();
  await producer.send({
    topic: 'topic-name',
    messages: [{ value: 'Hello from node.js' }],
  });

  // Consuming
  await consumer.connect();
  await consumer.subscribe({ topic: 'topic-name', fromBeginning: true });

  await consumer.run({
    eachMessage: async ({ partition, message }) => {
      console.log({
        partition,
        offset: message.offset,
        value: message.value.toString(),
      });
    },
  });
};

run().catch(console.error);

Работа с kafka из bun.js приложения

  • Установить пакет kafkajs
shell
bun add kafkajs

Пример использования kafkajs:

js
import { Kafka } from 'kafkajs'

const kafka = new Kafka({
  clientId: 'kafka-bun-js',
  brokers: ['localhost:9092'],
});

const producer = kafka.producer();
const consumer = kafka.consumer({ groupId: 'kafka-bun-js-consumer-group' });

const run = async () => {
  // Producing
  await producer.connect();
  await producer.send({
    topic: 'topic-name',
    messages: [{ value: 'Hello from bun.js' }],
  });

  // Consuming
  await consumer.connect();
  await consumer.subscribe({ topic: 'topic-name', fromBeginning: true });

  await consumer.run({
    eachMessage: async ({ partition, message }) => {
      console.log({
        partition,
        offset: message.offset,
        value: message.value?.toString() || '',
      });
    },
  });
};

run().catch(console.error);

Работа с kafka из go приложения

  • Установить пакет kafka-go
shell
go get github.com/segmentio/kafka-go

Пример использования kafka-go:

Бесконечный цикл при чтении сообщений не выполняется постоянно, каждый раз происходит ожидание следующего сообщения

go
package main

import (
	"context"
	"fmt"
	"log"
	"time"

	"github.com/segmentio/kafka-go"
)

func main() {
	go readMessages()
	writeMessage()

	for {
		// Предотвращает завершение программы.
		time.Sleep(time.Hour)
	}
}

func readMessages() {
	r := kafka.NewReader(kafka.ReaderConfig{
		Brokers:   []string{"localhost:9092"},
		Topic:     "quickstart-events",
		Partition: 0,
	})

	for {
		m, err := r.ReadMessage(context.Background())

		if err != nil {
			break
		}

		fmt.Printf("message at offset %d: %s = %s\n", m.Offset, string(m.Key), string(m.Value))
	}

	if err := r.Close(); err != nil {
		log.Fatal("failed to close reader:", err)
	}
}

func writeMessage() {
	w := &kafka.Writer{
		Addr:  kafka.TCP("localhost:9092"),
		Topic: "quickstart-events",
	}

	err := w.WriteMessages(context.Background(),
		kafka.Message{
			Value: []byte("Hello kafka from golang"),
		},
	)

	if err != nil {
		log.Fatal("failed to write messages:", err)
	}

	if err := w.Close(); err != nil {
		log.Fatal("failed to close writer:", err)
	}
}