Facebook Pixel

Concurrency pattern trong Go - Phần 2

11 Apr, 2024

Ở phần này, chúng ta sẽ tìm hiểu tiếp 3 pattern quan trọng khác, bao gồm: or channel, xử lý lỗi và pipeline.

Concurrency pattern trong Go - Phần 2

Mục Lục

Trong bài Concurrency pattern trong Go - Phần 1, chúng ta đã tìm hiểu 3 pattern quan trọng là confinement, vòng lặp for select và ngăn goroutine leak. Ở phần này, chúng ta sẽ tìm hiểu tiếp 3 pattern quan trọng khác, bao gồm: or channel, xử lý lỗi và pipeline.

1. Or channel

Ở phần 1, tôi đề cập tới pattern done channel dùng để dừng một goroutine. Đôi khi, một goroutine có nhiều điều kiện dừng. Pattern or channel sinh ra để giải quyết bài toán này.

Or Channel cho phép một goroutine lắng nghe nhiều channel cùng một lúc và phản hồi ngay khi có channel nào đó sẵn sàng. Pattern này tạo ra một channel tổng hợp, giúp đơn giản hóa việc quản lý và lắng nghe trên nhiều goroutine chạy đồng thời, làm cho code gọn gàng và dễ đọc hơn.

Go
package main

import (
	"fmt"
	"time"
)

func main() {
	start := time.Now()
	<-Or(
		Sig(2*time.Hour),
		Sig(5*time.Minute),
		Sig(1*time.Second),
		Sig(1*time.Hour),
		Sig(1*time.Minute),
	)
	fmt.Printf("done after %v", time.Since(start))
}

// Hàm Sig tạo ra một channel sẽ đóng sau một khoảng thời gian xác định after.
func Sig(after time.Duration) <-chan any {
	c := make(chan interface{})
	go func() {
    	// Đóng channel sau một khoảng thời gian
		defer close(c)
		time.Sleep(after)
	}()
	return c
}

// Hàm `Or` nhận vào một slice các channel và trả ra duy nhất 1 channel tổng hợp.
func Or(channels ...<-chan any) <-chan any {
	switch len(channels) {
	case 0:
    	// Trường hợp không có channel nào, trả về một channel đã đóng để không xảy ra deadlock.
		closedCh := make(chan any)
		close(closedCh)
        return closedCh
	case 1:
    	// Chỉ có một channel, trả về chính channel đó.
		return channels[0]
	default:
    	// Nếu có từ 2 channel trở lên thì kết hợp nhiều channels thành 1 channel `orDone`
		orDone := make(chan any)
		go func() {
			defer close(orDone)

			select {
			case <-channels[0]:
			case <-channels[1]:
            // Gọi đệ quy cho tới khi channels chỉ còn một phần tử
			case <-Or(append(channels[2:], orDone)...):
			}
		}()
        // Trả ra 1 channel tổng hợp
		return orDone
	}
}
Ví dụ khởi tạo và sử dụng Or channel

Đoạn code trên trả ra kết quả như sau:

Bash
done after 1.001204583s

2. Xử lý lỗi

Trong Go, nhiều goroutine chạy đồng thời và độc lập với nhau và với cả process sinh ra chúng. Khi lỗi ở một goroutine, chúng ta sẽ gặp khó khăn xử lý lỗi nếu không có cơ chế giao tiếp phù hợp giữa các goroutine với nhau.

Điển hình là ví dụ dưới:

Go
package main

import (
	"fmt"
	"net/http"
)

func main() {
	done := make(chan interface{})
	defer close(done)

    urls := []string{"https://www.google.com", "https://badhost", "https://badhost2"}
    for response := range checkStatus(done, urls...) {
        fmt.Printf("Response: %v\n", response.Status)
    }
}

func checkStatus(done <-chan interface{}, urls ...string) <-chan *http.Response {
    responses := make(chan *http.Response)
    go func() {
        defer close(responses)
        for _, url := range urls {
            resp, err := http.Get(url)
            if err != nil {
            	// Khi lỗi xảy ra, goroutine chỉ biết in ra console mà không có cách nào để thông báo lỗi tới nơi xử lý chúng.
                // Và không thể dừng checkStatus khi có lỗi xảy ra.
                fmt.Println(err)
                continue
            }
            select {
            case <-done:
                return
            case responses <- resp:
            }
        }
    }()
    return responses
}

Nếu có lỗi xảy ra, chương trình sẽ chỉ in ra màn hình kết quả:

Bash
Response: 200 OK
Get "https://badhost": dial tcp: lookup badhost: no such host
Get "https://badhost2": dial tcp: lookup badhost: no such host

Solution:

Để xử lý vấn đề này, tôi sẽ trả ra một struct chứa cả data và error thay vì chỉ data chứa http.Response.

Go
// Tạo thêm một struct chứa cả dữ liệu và error
type Result struct {
    Error error
    Response *http.Response
}

func checkStatus(done <-chan interface{}, urls ...string) <-chan Result {
    results := make(chan Result)
    go func() {
        defer close(results)

        for _, url := range urls {
            var result Result
            resp, err := http.Get(url)
            // Cho thêm error vào kết quả trả ra ở channel
            result = Result{Error: err, Response: resp}
            select {
            case <-done:
                return
            case results <- result:
            }
        }
    }()
    return results
}
Go
func main() {
	done := make(chan interface{})
    defer close(done)

    urls := []string{"https://www.google.com", "https://badhost"}
    for result := range checkStatus(done, urls...) {
        if result.Error != nil {
        	// Nếu có lỗi khi chạy goroutine thì in ra lỗi, sau đó, dừng vòng lặp
            fmt.Printf("error: %v", result.Error)
            return
        }
        // Nếu không có lỗi thì in kết quả ra màn hình.
        fmt.Printf("Response: %v\n", result.Response.Status)
    }
}

Khi chạy tới url số 2, checkStatus lỗi nên chương trình dừng.

Go
Response: 200 OK
Get "https://badhost": dial tcp: lookup badhost: no such host

Ngoài ra, tôi có thể check nếu error vượt quá giới hạn thì mới dừng chương trình.

Go
done := make(chan interface{})
defer close(done)

errCount := 0
urls := []string{"a", "https://www.google.com", "b", "c", "d"}
for result := range checkStatus(done, urls...) {
    if result.Error != nil {
        fmt.Printf("error: %v\n", result.Error)
        errCount++
        // Dừng chương trình nếu 3 lỗi xảy ra
        if errCount >= 3 {
            fmt.Println("Too many errors, breaking!")
            break
        }
        continue
    }
    fmt.Printf("Response: %v\n", result.Response.Status)
}

Chương trình sẽ chỉ dừng nếu gặp 3 lỗi

Go
error: Get "a": unsupported protocol scheme ""
Response: 200 OK
error: Get "b": unsupported protocol scheme ""
error: Get "c": unsupported protocol scheme ""
Too many errors, breaking!

3. Pipeline

Pipeline là kỹ thuật thiết kế cho phép quản lý luồng xử lý dữ liệu thông qua chuỗi các giai đoạn (gọi là pipeline). Mỗi giai đoạn (gọi là stage) sẽ nhận vào dữ liệu từ một nguồn, xử lý và gửi dữ liệu đã xử lý vào một nguồn khác.

Pipeline

Các stage hoạt động độc lập với nhau và việc chỉnh sửa một stage sẽ không ảnh hưởng tới các stage còn lại, do đó, chúng hoàn toàn có thể chạy đồng thời hoặc chạy theo thứ tự nhất định.

Sau đây là ví dụ đơn giản về stage:

Go
multiply := func(value, multiplier int) int {
    return value * multiplier
}

add := func(value, additive int) int {
    return value + additive
}
Ví dụ về stage

Hàm multiplyadd được coi là 2 stage khác nhau. Hàmmultiply trả về giá trị là tích của hai số cho sẵn. Hàm add trả về tổng của hai số cho sẵn.

Tôi có thể kết hợp hai stage này để trả về pipeline add(multiply(v, 2), 1)).

Go
ints := []int{1, 2, 3, 4}
for _, v := range ints {
    fmt.Println(multiply(add(multiply(v, 2), 1), 2))
}
Ví dụ về pipeline

Đoạn code trên sẽ in ra kết quả như sau:

Bash
6
10
14
18

Trong thực tế, các bài toán thường phức tạp hơn thế này. Trong mỗi vòng for, có thể là một hoặc nhiều pipeline bao gồm các stage đang chạy. Nếu một stage bị lỗi, làm sao để tôi dừng được cả pipeline cũng như dừng toàn bộ các pipeline khác? Hay khi một pipeline hoàn thành, làm sao để tôi dừng các pipeline khác?

Để giải quyết vấn đề trên, tôi sẽ thêm vào hàm add hai channel donein. Channel done dùng để dừng stage khi cần thiết. Channel in chứa dữ liệu cần xử lý.

Go
func add(done, in <-chan int, additive int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
        	select {
            case <-done:
                return
            case out <- i:
            }
            out <- n + additive
        }
        close(out)
    }()
    return out
}

Tôi cũng thêm tương tự với hàm multiply.

Go
func multiply(done, in <-chan int, multiplier int) <-chan int {
    out := make(chan int)
    go func() {
        for n := range in {
        	select {
            case <-done:
                return
            case out <- i:
            }
            out <- n * multiplier
        }
        close(out)
    }()
    return out
}

Pipeline mới của tôi sẽ có dạng như sau:

Go
pipeline := multiply(done, add(done, multiply(done, intStream, 2), 1), 2)

4. Tổng kết

  • Or Channel cho phép một goroutine lắng nghe nhiều channel cùng một lúc và phản hồi ngay khi có channel nào đó sẵn sàng.
  • Trong Go, các tác vụ chạy đồng thời sử dụng goroutine. Các goroutine chạy độc lập với nhau và với cả process sinh ra chúng, nên khi gặp lỗi, cần có cơ chế giao tiếp giữa các goroutine với nhau xử lý lỗi phát sinh.
  • Pipeline là kỹ thuật thiết kế cho phép quản lý luồng xử lý dữ liệu thông qua chuỗi các giai đoạn (gọi là pipeline). Mỗi giai đoạn (gọi là stage) sẽ nhận vào dữ liệu từ một nguồn, xử lý và gửi dữ liệu đã xử lý vào một nguồn khác.

5. Tài liệu tham khảo

  1. Concurrency in Go - Katherine Cox-Buday
  2. https://go.dev/talks/2012/concurrency.slide
  3. https://go.dev/blog/pipelines

Bài viết liên quan

Lập trình backend expressjs

xây dựng hệ thống microservices
  • Kiến trúc Hexagonal và ứng dụngal font-
  • TypeScript: OOP và nguyên lý SOLIDal font-
  • Event-Driven Architecture, Queue & PubSubal font-
  • Basic scalable System Designal font-

Đăng ký nhận thông báo

Đừng bỏ lỡ những bài viết thú vị từ 200Lab