Trong bài Concurrency pattern trong Go - Phần 1, chúng ta đã tìm hiểu 3 pattern quan trọng là confinement, vòng lặp for select
và ngăn goroutine leak. Ở phần này, chúng ta sẽ tìm hiểu tiếp 3 pattern quan trọng khác, bao gồm: or
channel, xử lý lỗi và pipeline.
1. Or
channel
Ở phần 1, tôi đề cập tới pattern done
channel dùng để dừng một goroutine. Đôi khi, một goroutine có nhiều điều kiện dừng. Pattern or
channel sinh ra để giải quyết bài toán này.
Or
Channel cho phép một goroutine lắng nghe nhiều channel cùng một lúc và phản hồi ngay khi có channel nào đó sẵn sàng. Pattern này tạo ra một channel tổng hợp, giúp đơn giản hóa việc quản lý và lắng nghe trên nhiều goroutine chạy đồng thời, làm cho code gọn gàng và dễ đọc hơn.
Đoạn code trên trả ra kết quả như sau:
done after 1.001204583s
2. Xử lý lỗi
Trong Go, nhiều goroutine chạy đồng thời và độc lập với nhau và với cả process sinh ra chúng. Khi lỗi ở một goroutine, chúng ta sẽ gặp khó khăn xử lý lỗi nếu không có cơ chế giao tiếp phù hợp giữa các goroutine với nhau.
Điển hình là ví dụ dưới:
package main
import (
"fmt"
"net/http"
)
func main() {
done := make(chan interface{})
defer close(done)
urls := []string{"https://www.google.com", "https://badhost", "https://badhost2"}
for response := range checkStatus(done, urls...) {
fmt.Printf("Response: %v\n", response.Status)
}
}
func checkStatus(done <-chan interface{}, urls ...string) <-chan *http.Response {
responses := make(chan *http.Response)
go func() {
defer close(responses)
for _, url := range urls {
resp, err := http.Get(url)
if err != nil {
// Khi lỗi xảy ra, goroutine chỉ biết in ra console mà không có cách nào để thông báo lỗi tới nơi xử lý chúng.
// Và không thể dừng checkStatus khi có lỗi xảy ra.
fmt.Println(err)
continue
}
select {
case <-done:
return
case responses <- resp:
}
}
}()
return responses
}
Nếu có lỗi xảy ra, chương trình sẽ chỉ in ra màn hình kết quả:
Response: 200 OK
Get "https://badhost": dial tcp: lookup badhost: no such host
Get "https://badhost2": dial tcp: lookup badhost: no such host
Solution:
Để xử lý vấn đề này, tôi sẽ trả ra một struct chứa cả data và error thay vì chỉ data chứa http.Response
.
// Tạo thêm một struct chứa cả dữ liệu và error
type Result struct {
Error error
Response *http.Response
}
func checkStatus(done <-chan interface{}, urls ...string) <-chan Result {
results := make(chan Result)
go func() {
defer close(results)
for _, url := range urls {
var result Result
resp, err := http.Get(url)
// Cho thêm error vào kết quả trả ra ở channel
result = Result{Error: err, Response: resp}
select {
case <-done:
return
case results <- result:
}
}
}()
return results
}
func main() {
done := make(chan interface{})
defer close(done)
urls := []string{"https://www.google.com", "https://badhost"}
for result := range checkStatus(done, urls...) {
if result.Error != nil {
// Nếu có lỗi khi chạy goroutine thì in ra lỗi, sau đó, dừng vòng lặp
fmt.Printf("error: %v", result.Error)
return
}
// Nếu không có lỗi thì in kết quả ra màn hình.
fmt.Printf("Response: %v\n", result.Response.Status)
}
}
Khi chạy tới url số 2, checkStatus
lỗi nên chương trình dừng.
Response: 200 OK
Get "https://badhost": dial tcp: lookup badhost: no such host
Ngoài ra, tôi có thể check nếu error vượt quá giới hạn thì mới dừng chương trình.
done := make(chan interface{})
defer close(done)
errCount := 0
urls := []string{"a", "https://www.google.com", "b", "c", "d"}
for result := range checkStatus(done, urls...) {
if result.Error != nil {
fmt.Printf("error: %v\n", result.Error)
errCount++
// Dừng chương trình nếu 3 lỗi xảy ra
if errCount >= 3 {
fmt.Println("Too many errors, breaking!")
break
}
continue
}
fmt.Printf("Response: %v\n", result.Response.Status)
}
Chương trình sẽ chỉ dừng nếu gặp 3 lỗi
error: Get "a": unsupported protocol scheme ""
Response: 200 OK
error: Get "b": unsupported protocol scheme ""
error: Get "c": unsupported protocol scheme ""
Too many errors, breaking!
3. Pipeline
Pipeline là kỹ thuật thiết kế cho phép quản lý luồng xử lý dữ liệu thông qua chuỗi các giai đoạn (gọi là pipeline). Mỗi giai đoạn (gọi là stage) sẽ nhận vào dữ liệu từ một nguồn, xử lý và gửi dữ liệu đã xử lý vào một nguồn khác.
Các stage hoạt động độc lập với nhau và việc chỉnh sửa một stage sẽ không ảnh hưởng tới các stage còn lại, do đó, chúng hoàn toàn có thể chạy đồng thời hoặc chạy theo thứ tự nhất định.
Sau đây là ví dụ đơn giản về stage:
Hàm multiply
và add
được coi là 2 stage khác nhau. Hàmmultiply
trả về giá trị là tích của hai số cho sẵn. Hàm add
trả về tổng của hai số cho sẵn.
Tôi có thể kết hợp hai stage này để trả về pipeline add(multiply(v, 2), 1))
.
Đoạn code trên sẽ in ra kết quả như sau:
6
10
14
18
Trong thực tế, các bài toán thường phức tạp hơn thế này. Trong mỗi vòng for, có thể là một hoặc nhiều pipeline bao gồm các stage đang chạy. Nếu một stage bị lỗi, làm sao để tôi dừng được cả pipeline cũng như dừng toàn bộ các pipeline khác? Hay khi một pipeline hoàn thành, làm sao để tôi dừng các pipeline khác?
Để giải quyết vấn đề trên, tôi sẽ thêm vào hàm add
hai channel done
và in
. Channel done
dùng để dừng stage khi cần thiết. Channel in
chứa dữ liệu cần xử lý.
func add(done, in <-chan int, additive int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
select {
case <-done:
return
case out <- i:
}
out <- n + additive
}
close(out)
}()
return out
}
Tôi cũng thêm tương tự với hàm multiply
.
func multiply(done, in <-chan int, multiplier int) <-chan int {
out := make(chan int)
go func() {
for n := range in {
select {
case <-done:
return
case out <- i:
}
out <- n * multiplier
}
close(out)
}()
return out
}
Pipeline mới của tôi sẽ có dạng như sau:
pipeline := multiply(done, add(done, multiply(done, intStream, 2), 1), 2)
4. Tổng kết
Or
Channel cho phép một goroutine lắng nghe nhiều channel cùng một lúc và phản hồi ngay khi có channel nào đó sẵn sàng.- Trong Go, các tác vụ chạy đồng thời sử dụng goroutine. Các goroutine chạy độc lập với nhau và với cả process sinh ra chúng, nên khi gặp lỗi, cần có cơ chế giao tiếp giữa các goroutine với nhau xử lý lỗi phát sinh.
- Pipeline là kỹ thuật thiết kế cho phép quản lý luồng xử lý dữ liệu thông qua chuỗi các giai đoạn (gọi là pipeline). Mỗi giai đoạn (gọi là stage) sẽ nhận vào dữ liệu từ một nguồn, xử lý và gửi dữ liệu đã xử lý vào một nguồn khác.
5. Tài liệu tham khảo
- Concurrency in Go - Katherine Cox-Buday
- https://go.dev/talks/2012/concurrency.slide
- https://go.dev/blog/pipelines
Bài viết liên quan
Prettier là gì? Công cụ Định dạng mã nguồn tự động cho Lập trình viên
Sep 10, 2024 • 6 min read
Thư viện Husky là gì? Đảm bảo chất lượng Code với Git Hooks và Husky
Sep 08, 2024 • 5 min read
Functional Programming là gì? Giải pháp cho Hệ thống đa luồng và Xử lý song song
Sep 06, 2024 • 6 min read
ESLint là gì? Hướng dẫn cấu hình ESLint cho dự án Typescript
Sep 04, 2024 • 11 min read
Hướng dẫn TypeScript Syntax cơ bản cho người mới - Phần 2
Sep 04, 2024 • 16 min read
Jest là gì? Hướng dẫn cấu hình Jest với Typescript
Sep 02, 2024 • 9 min read