Facebook Pixel

Kafka cơ bản: Cách sử dụng Kafka với Confluent & Go

26 Aug, 2023

Cập nhật kiến thức Kafka cơ bản cho các bạn mới bắt đầu. Bài viết này sẽ hướng dẫn bạn cách sử dụng Kafka với Confluent & Go. Lưu lại ngay nhé!

Kafka cơ bản: Cách sử dụng Kafka với Confluent & Go

Mục Lục

Cập nhật kiến thức Kafka cơ bản cho các bạn mới bắt đầu. Bài viết này sẽ hướng dẫn bạn cách sử dụng Kafka với Confluent & Go. Lưu lại ngay nhé!

Apache Kafka (gọi tắt là Kafka) là một nền tảng phân phối sự kiện phân tán mã nguồn mở được phát triển bởi Apache Software Foundation và được viết bằng JavaScala.

Kafka được tạo ra để giải quyết những thách thức trong việc xử lý lượng dữ liệu khổng lồ trong thời gian thực (real-time), cho phép các ứng dụng xuất bản (publish), đăng ký (subscribe), lưu trữ (store) và xử lý (process) các luồng bản ghi (streaming event) một cách hiệu quả.

Đọc thêm: Kafka là gì? Các thành phần trong Apache Kafka

Kafka là gì? Các thành phần trong Kafka
Kafka là một nền tảng phân phối sự kiện phân tán mã nguồn mở được phát triển bởi Apache Software Foundation và được viết bằng Java và Scala

Với sự ra đời của Confluent Cloud, các nhà phát triển có thể tận dụng Kafka mà không phải đối mặt với sự phức tạp của việc quản lý cơ sở hạ tầng. Confluent Cloud cung cấp dịch vụ Kafka được tự chủ quản lý với các tính năng như tự động mở rộng quy mô, lưu giữ dữ liệu và tích hợp liền mạch với các thành phần khác của Confluent Platform.

Trong bài này, chúng ta sẽ khám phá cách sử dụng Kafka với Confluent Cloud và Go. 200Lab sẽ hướng dẫn từ việc tạo tài khoản Confluent Cloud, thiết lập Kafka Cluster đến khởi tạo clients (producers và consumers) trong Go để gửi và đọc tin nhắn từ Kafka.

Ngoài giải pháp có sẵn của Confluent, bạn có thể tự host Kafka Cluster bằng Docker.

Toàn bộ source code của bài nằm ở repo Github kafka-demo.

Xem Thêm Khóa Học Lập Trình

Yêu cầu kỹ thuật khi sử dụng Kafka với Confluent và Go

Sau đây là những công nghệ bạn cần phải cài đặt nếu muốn sử dụng Kafka với Confluent và Go!

1. Confluent

1.1. Confluent Cloud (CCloud)

Confluent Cloud (gọi tắt là CCloud) là một dịch vụ truyền dữ liệu linh hoạt, có thể mở rộng, dựa trên Apache Kafka và được phân phối dưới dạng dịch vụ tự chủ quản lý.

Bạn cần phải đăng ký tài khoản Confluent Cloud thông qua giao diện web gọi là Confluent Cloud Console. Sau khi đăng ký thành công, bạn sẽ được tặng $400 đô dùng thử trong vòng 1 tháng.

Confluent cung cấp 3 cách để tương tác với Confluent Cloud:

  1. Confluet CLI (giao diện dòng lệnh)
  2. Confluent Cloud Console (giao diện web)
  3. Confluent Cloud APIs (sử dụng APIs)

Đọc thêm: Tài liệu hướng dẫn sử dụng Confluent Cloud của Confluent

1.2. Confluent CLI (không bắt buộc)

Confluent CLI (Command-line Interface) là giao diện dòng lệnh của Confluent, cho phép các nhà phát triển quản lý Confluent Cloud và Confluent Platform.

Để cài đặt và sử dụng các lệnh cơ bản trong Confluent CLI, xin bạn vui lòng đọc bài Hướng dẫn cài đặt và sử dụng Confluent CLI.

2. Go

Chúng ta sẽ dùng ngôn ngữ Go phiên bản mới nhất (1.20) để tạo producers và subscribers (gọi chung là clients) cho Kafka cluster. Bạn có thể cài đặt Go dễ dàng thông qua hướng dẫn cài đặt trên trang chủ của Go.

Đọc thêm: Golang là gì? Backend Developer có nên học Golang

Golang là gì? Backend Developer có nên học Golang?
Go (hay Golang) là ngôn ngữ lập trình mã nguồn mở giúp xây dựng phần mềm dễ dàng, tin cậy và hiệu quả do các kỹ sư hàng đầu Google phát triển

Kafka cơ bản: Cách sử dụng Kafka với Confluent & Go

Bước 1: Tạo topic bên trong Kafka Cluster

Ở mục viết về Confluent phía trên, 200Lab đã liệt kê ra 3 cách để tương tác với Confluent Cloud. Trong phạm vi bài viết, 200Lab chỉ hướng dẫn 2 cách phổ biến là:

  1. Dùng Confluent CLI (giao diện dòng lệnh)
  2. Dùng Confluent Cloud Console (giao diện web)

Cách 1: Dùng Confluent CLI

1. Tạo cluster

Bỏ qua bước này nếu bạn đã có sẵn một cluster.

Bash
confluent kafka cluster create <name> --cloud <cloud provider> --region <cloud region>
Câu lệnh tạo cluster trong Confluent CLI

Ví dụ

Bash
confluent kafka cluster create dev0 --cloud aws --region us-east-1
Ví dụ về câu lệnh tạo cluster trong Confluent CLI

2. Tạo topic bên trong cluster vừa tạo ở bước 1

cluster ID lấy từ output sau khi tạo cluster của bước 2.

Bash
confluent kafka topic create <name> --cluster <cluster ID>
Câu lệnh tạo topic trong Confluent CLI

Ví dụ

Bash
confluent kafka topic create test_topic --cluster <ID of the dev0 cluster>
Ví dụ về câu lệnh tạo topic trong Confluent CLI

Nếu bạn đã có sẵn cluster, chạy câu lệnh sau để lấy thông tin (bao gồm ID) của tất cả cluster.

Bash
confluent kafka cluster list
Câu lệnh lấy thông tin của tất cả các clusters

3. Tạo API key cho cluster

API key dùng để xác thực kết nối giữa clients với cluster.

Bash
confluent api-key create --resource <cluster ID>
tạo API key cho cluster

Cách 2: Dùng Cloud Console

1. Tạo cluster

Có nhiều lựa chọn để tạo cluster tương ứng với các cấu hình khác nhau. Ở phạm vi demo của bài này, chúng ta sẽ chọn cluster Basic.

kafka-co-ban
Giao diện tạo Kafka cluster trên Confluent Cloud

2. Tạo topic bên trong cluster vừa tạo ở bước 1

Từ menu điều hướng, click vào Topics, sau đó click vào nút Add Topic.

kafka-co-ban

Hiện tại, Confluent Cloud không giới hạn số partitions của topic, giá trị mặc định là 6.

kafka-co-ban
Giao diện tạo Kafka topic trên Confluent Cloud

Việc lựa chọn số partition phụ thuộc vào nhiều yếu tố như:

  • Resource overhead (Chi phí tài nguyên): Mỗi partitions yêu cầu một số tài nguyên hệ thống về bộ nhớ và xử lý tệp. Số lượng partitions rất cao có thể chiếm dụng tài nguyên của brokers.
  • Latency (Độ trễ): Số lượng partitions cao có thể dẫn tới tăng thời gian xử lý gây ra bởi chi phí quản lý và điều phối số lượng lớn partitions.
  • ZooKeeper load (Mức tải của ZooKeeper): ZooKeeper dùng để quản lý metadata của Kafka. Nếu partitions tăng số lượng lớn thì ZooKeeper phải quan lý nhiều metadata hơn dẫn tới mức tải của ZooKeeper sẽ tăng theo.
  • Throughput (Thông lượng): Tăng partitions giúp tăng thông lượng do nhiều consumers cùng đọc đồng thời từ nhiều partitions khác nhau.
  • Scalability (Độ mở rộng): Số parttions tăng sẽ giúp việc xử lý events tốt hơn giữa các brokers.

3. Tạo API key cho cluster

Từ menu điều hướng, click vào API Keys, sau đó click vào nút Add Key. Sau đó, điền mô tả cho key (nếu cần) và tải xuống file chứa key và secret.

kafka-co-ban
Giao diện tạo API key cho cluster

Bước 2: Setup producer

Confluent cung cấp sẵn một thư viện confluent-kafka-go để giúp clients kết nối với Confluent Cloud thông qua ngôn ngữ Go. Ngoài ra, bạn có thể xem code mẫu cho nhiều trường hợp khác nhau ở folder /sample.

Trong bài, 200Lab sẽ khởi tạo một project Go có cấu trúc như sau:

Bash
|-- kafka-using-confluent/
	|-- consumer.go
	|-- producer.go
	|-- README.md
	|-- go.mod
	|-- go.sum
Cấu trúc của project chứa client code

Mở terminal, khởi tạo dự án Go chứa clients code.

Bash
# Tạo folder để chứa source code
mkdir kafka-using-confluent
# Khởi tạo Go project
go mod init kafka-demo
Khởi tạo project Go chứa client code

Tạo file producer.go trong folder kafka-using-confluent để gửi message tới Kafka.

Bash
touch producer.go
Tạo file `producer.go`

Message của chúng ta có dạng JSON như sau:

JSON
{
  "id": 5950,
  "quantity": 3,
  "item_type": "guitar",
  "price_per_unit": "BMg="
}
JSON Message

Nội dung file producer.go như sau:

Go
# producer.go
package main

import (
	"fmt"
	"os"
	"time"

	"github.com/confluentinc/confluent-kafka-go/kafka"
	"github.com/confluentinc/confluent-kafka-go/schemaregistry"
	"github.com/confluentinc/confluent-kafka-go/schemaregistry/serde"
	"github.com/confluentinc/confluent-kafka-go/schemaregistry/serde/jsonschema"
)

// Purchase is a simple record example
type Purchase struct {
	Id           int    `json:"id"`
	Quantity     int    `json:"quantity"`
	ItemType     string `json:"item_type"`
	PricePerUnit string `json:"price_per_unit"`
}

func main() {
	// Get env
	if len(os.Args) < 6 {
		fmt.Fprintf(os.Stderr, "Usage: %s <bootstrap-servers> <group> <topics..>\n", os.Args[0])
		os.Exit(1)
	}

	bootstrapServers := os.Args[1]
	clusterApiKey := os.Args[2]
	clusterApiSecret := os.Args[3]
	topic := os.Args[4]
	schemaRegistryUrl := os.Args[5]
	schemaRegistryApiKey := os.Args[6]
	schemaRegistryApiSecret := os.Args[7]

	hostname, err := os.Hostname()
	if err != nil {
		fmt.Printf("Failed to get hostname: %s", err)
		os.Exit(1)
	}

	// Create producer
	p, err := kafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers": bootstrapServers,
		"sasl.mechanisms":   "PLAIN",
		"security.protocol": "SASL_SSL",
		"client.id":         hostname,
		"sasl.username":     clusterApiKey,
		"sasl.password":     clusterApiSecret,
		"acks":              "all",
	})
	if err != nil {
		fmt.Printf("Failed to create producer: %s", err)
		os.Exit(1)
	}

	// Create schema register client
	client, err := schemaregistry.NewClient(schemaregistry.NewConfigWithAuthentication(
		schemaRegistryUrl,
		schemaRegistryApiKey,
		schemaRegistryApiSecret,
	))
	if err != nil {
		fmt.Printf("Failed to create schema registry client: %s\n", err)
		os.Exit(1)
	}

	ser, err := jsonschema.NewSerializer(client, serde.ValueSerde, jsonschema.NewSerializerConfig())
	if err != nil {
		fmt.Printf("Failed to create serializer: %s\n", err)
		os.Exit(1)
	}

	// Serialize message
	msg := Purchase{
		Id:           5950,
		Quantity:     3,
		ItemType:     "guitar",
		PricePerUnit: "BMg=",
	}
	msgBytes, err := ser.Serialize(topic, &msg)
	if err != nil {
		fmt.Printf("Failed to serialize payload: %s\n", err)
		os.Exit(1)
	}

	// Asynchronous writes
	deliveryChan := make(chan kafka.Event, 10000)
	err = p.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &topic, Partition: kafka.PartitionAny},
		Value:          msgBytes,
		Key:            []byte(time.Now().UTC().Format(time.RFC1123)),
		Headers:        []kafka.Header{{Key: "myTestHeader", Value: []byte("header values are binary")}},
	}, deliveryChan)
	if err != nil {
		fmt.Printf("Failed to deliver message: %s", err)
		os.Exit(1)
	}

	// Go-routine to handle message delivery reports and possibly other event types (errors, stats, etc)
	go func() {
		for e := range p.Events() {
			switch ev := e.(type) {
			case *kafka.Message:
				if ev.TopicPartition.Error != nil {
					fmt.Printf("Failed to deliver: %v\n", ev.TopicPartition.Error)
				} else {
					fmt.Printf("Delivered message to topic %s partition [%d] @ offset %v\n",
						*ev.TopicPartition.Topic, ev.TopicPartition.Partition, ev.TopicPartition.Offset)
				}
			case kafka.Error:
				// Generic client instance-level errors, such as
				// broker connection failures, authentication issues, etc.
				//
				// These errors should generally be considered informational
				// as the underlying client will automatically try to
				// recover from any errors encountered, the application
				// does not need to take action on them.
				fmt.Printf("Error: %v\n", ev)
			default:
				fmt.Printf("Ignored event: %s\n", ev)
			}
		}
	}()

	// Wait for all messages to be delivered
	// Flush and close the producer and the events channel
	for p.Flush(1000) > 0 {
		fmt.Print("Still waiting to flush outstanding messages\n")
	}
	p.Close()
}
Source code cho file producer.go

Chúng ta sẽ cần chạy 2 terminal. Terminal 1 cho producer tạo message tới Kafka. Terminal 2 cho consumer đọc message từ Kafka.

Chạy dòng lệnh sau trên terminal 1, truyền vào các tham số cần thiết để tạo producer và gửi message tới Kafka.

Bash
go run producer.go \
<bootstrap-servers> \
<cluster-api-key> \
<cluster-api-secret> \
<topic> \
<schema-registry-url> \
<schema-registry-api-key> \
<schema-registry-api-secret>
Script tạo producer

Trong câu lệnh trên, ta có các tham số cần thiết là:

  1. bootstrap-servers: danh sách các brokers mà clients sẽ tương tác với. Ví dụ, host1:port1,host2:port2.
  2. cluster-api-key: API key của cluster.
  3. cluster-api-secret: API secret của cluster.
  4. topic: topic mà producer sẽ gửi message tới.
  5. schema-registry-url: API endpoint để các ứng dựng tương tác với Schema Registry.

Một event mới tạo ra sẽ trông như sau ở Confluent Console:

kafka-co-ban
Event mới tạo trên Cluster
kafka-co-ban
Value của event mới tạo trên Cluster

Bước 3: Setup consumer

  • Tạo file consumer.go trong folder kafka-using-confluent. File này dùng để tạo consumer để đọc message từ Kafka và hiển thị ra console.
Go
# consumer.go
package main

import (
	"fmt"
	"os"
	"os/signal"
	"syscall"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	if len(os.Args) < 5 {
		fmt.Fprintf(os.Stderr, "Usage: %s <bootstrap-servers> <group> <topics..>\n", os.Args[0])
		os.Exit(1)
	}

	bootstrapServers := os.Args[1]
	clusterApiKey := os.Args[2]
	clusterApiSecret := os.Args[3]
	group := os.Args[4]
	topics := os.Args[5:]

	sigchan := make(chan os.Signal, 1)
	signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM)

	c, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers": bootstrapServers,
		// Avoid connecting to IPv6 brokers:
		// This is needed for the ErrAllBrokersDown show-case below
		// when using localhost brokers on OSX, since the OSX resolver
		// will return the IPv6 addresses first.
		// You typically don't need to specify this configuration property.
		"broker.address.family": "v4",
		"sasl.mechanisms":       "PLAIN",
		"security.protocol":     "SASL_SSL",
		"sasl.username":         clusterApiKey,
		"sasl.password":         clusterApiSecret,
		"group.id":              group,
		"session.timeout.ms":    6000,
		// Start reading from the first message of each assigned
		// partition if there are no previously committed offsets
		// for this group.
		"auto.offset.reset": "earliest",
		// Whether we store offsets automatically.
		"enable.auto.offset.store": false,
	})
	if err != nil {
		fmt.Fprintf(os.Stderr, "Failed to create consumer: %s\n", err)
		os.Exit(1)
	}

	fmt.Printf("Created Consumer %v\n", c)

	if err := c.SubscribeTopics(topics, nil); err != nil {
		fmt.Fprintf(os.Stderr, "Failed to subcribe topics %s %s\n", topics, err)
		os.Exit(1)
	}

	run := true

	for run {
		select {
		case sig := <-sigchan:
			fmt.Printf("Caught signal %v: terminating\n", sig)
			run = false
		default:
			// Retrieve records one-by-one
			ev := c.Poll(100)
			if ev == nil {
				continue
			}

			switch e := ev.(type) {
			case *kafka.Message:
				// Process the message received.
				fmt.Printf("%% Message on %s:\n%s\n",
					e.TopicPartition, string(e.Value))
				if e.Headers != nil {
					fmt.Printf("%% Headers: %v\n", e.Headers)
				}

				// We can store the offsets of the messages manually or let
				// the library do it automatically based on the setting
				// enable.auto.offset.store. Once an offset is stored, the
				// library takes care of periodically committing it to the broker
				// if enable.auto.commit isn't set to false (the default is true).
				// By storing the offsets manually after completely processing
				// each message, we can ensure atleast once processing.
				if _, err := c.StoreMessage(e); err != nil {
					fmt.Fprintf(os.Stderr, "%% Error storing offset after message %s:\n", e.TopicPartition)
				}
			case kafka.Error:
				// Errors should generally be considered
				// informational, the client will try to automatically recover.
				// But in this example we choose to terminate the application if all brokers are down.
				fmt.Fprintf(os.Stderr, "%% Error: %v: %v\n", e.Code(), e)
				if e.Code() == kafka.ErrAllBrokersDown {
					run = false
				}
			default:
				fmt.Printf("Ignored %v\n", e)
			}
		}
	}

	fmt.Printf("Closing consumer\n")
	c.Close()
}
Source code cho file consumer.go

Mở terminal 2, chạy dòng lệnh sau để tạo consumer đọc message từ Kafka.

Bash
# Chạy consumer, truyền vào các tham số cần thiết
go run consumer.go \
<bootstrap-servers> \
<cluster-api-key> \
<cluster-api-secret> \
<consumer-group-id> \
<topic-1> \
<topic-2> \
...
<topic-N>
Câu lệnh tạo consumer

Trong câu lệnh trên, ta có các tham số cần thiết là:

  1. bootstrap-servers: danh sách các brokers mà clients sẽ tương tác với. Ví dụ, host1:port1,host2:port2.
  2. cluster-api-key: API key của cluster
  3. cluster-api-secret: API secret của cluster
  4. consumer-group-id: ID của consumer group
  5. topic-1, topic-2, topic-N: consumers subscribe tới nhiều topic 1, 2, ..., N.

Tổng kết

Trong bài viết, chúng ta đã cùng nhau thiết lập Kafka trên Confluent Cloud, khởi tạo clients bằng ngôn ngữ Go để gửi và nhận messages. Hi vọng bài viết sẽ giúp bạn trang bị kiến thức cơ bản về cách sử dụng Kafka trong Confluent Cloud và Go.

(*) Bài viết này có tham khảo thông tin ở Confluent & Tổng quan về Schema Registry.

Vậy là giờ đây, bạn đã có được những kiến thức bổ ích về Kafka cơ bản cũng nhưng cách sử dụng Kafka với Confluent & Go. Học vấn là chặng hành trình dành đòi hỏi nhiều kiên nhẫn. Hãy đọc thêm các bài viết mới trên Trang Blog Công Nghệ & Lập Trình của 200Lab nhé.

Xem Thêm Khóa Học Lập Trình

Một vài bài viết liên quan có thể bạn sẽ thích:

Kafka là gì? Các thành phần trong Kafka
Redux là gì? Tìm hiểu Redux cơ bản cho người mới bắt đầu
Quản lý Schema và Avro Serialization trong hệ thống Apache Kafka
Blockchain là gì? Ưu & nhược điểm của các ứng dụng Blockchain
Flutter cơ bản: Điều cần biết khi lập trình ứng dụng đầu tiên

Bài viết liên quan

Lập trình backend expressjs

xây dựng hệ thống microservices
  • Kiến trúc Hexagonal và ứng dụngal font-
  • TypeScript: OOP và nguyên lý SOLIDal font-
  • Event-Driven Architecture, Queue & PubSubal font-
  • Basic scalable System Designal font-

Đăng ký nhận thông báo

Đừng bỏ lỡ những bài viết thú vị từ 200Lab