Facebook Pixel

Airflow là gì? Nền tảng Orchestration - Điều phối Workflow chuyên nghiệp

25 Sep, 2025

Apache Airflow là nền tảng mã nguồn mở được viết bằng Python để tạo, lập lịch và giám sát các workflow phức tạp

Airflow là gì? Nền tảng Orchestration - Điều phối Workflow chuyên nghiệp

Mục Lục

Đối với Data Engineer, việc điều phối ETL/ELT bằng cron và script rời rạc dễ dẫn đến lỗi phụ thuộc ngầm, khó giám sát và khó mở rộng. Airflow giúp bạn định nghĩa, lên lịch, theo dõi và khôi phục workflow dưới dạng DAG một cách trực quan và có thể mở rộng. Hãy cùng mình tìm hiểu Airflow là gì, kèm theo ví dụ chi tiết để có thể bạn áp dụng ngay.

1. Airflow là gì?

Apache Airflow là nền tảng mã nguồn mở được viết bằng Python để tạo, lập lịch và giám sát các workflow phức tạp. Khác với các hệ thống lập lịch truyền thống, Airflow cho phép bạn định nghĩa toàn bộ quy trình nghiệp vụ dưới dạng code Python (Workflow as Code), mang lại khả năng version control, testing và collaboration vượt trội.

Ví dụ: Thay vì quản lý hàng chục cron job riêng lẻ với log files rải rác, bạn có thể định nghĩa toàn bộ pipeline ETL trong một file Python duy nhất với đầy đủ dependency tracking và monitoring.

2. Vì sao Airflow ra đời?

Trước khi có Airflow, các developer thường xây dựng pipeline data bằng cron kết hợp với shell scripts.

Ví dụ: Bạn cần xây dựng pipeline ETL hàng ngày để:

  1. Extract data từ database production
  2. Transform data (làm sạch, tính toán)
  3. Load vào data warehouse
  4. Gửi email báo cáo

Cách implement truyền thống với cron

Txt
# File crontab
0 2 * * * /scripts/extract_data.sh
0 3 * * * /scripts/transform_data.sh
0 4 * * * /scripts/load_to_dwh.sh
0 5 * * * /scripts/send_email_report.sh

Những vấn đề gặp phải:

  1. Không có dependency management: Nếu script extract_data.sh chạy lâu hơn dự kiến, transform_data.sh vẫn chạy đúng giờ và fail
  2. Monitoring khó khăn: Phải check manual từng log file riêng lẻ
  3. Retry phức tạp: Khi một job fail, phải manually trigger lại cả chain
  4. Khó debug: Không có visibility vào trạng thái tổng thể của pipeline
  5. Scalability issues: Khó phân phối workload trên nhiều server

Airflow ra đời năm 2014 tại Airbnb để giải quyết chính những vấn đề này bằng cách:

  • Định nghĩa workflow dưới dạng code Python
  • Tự động quản lý dependencies giữa các tasks
  • Cung cấp giao diện web để giám sát trực quan
  • Tự động retry tasks failed với configurable retry policy
  • Distributed execution cho workloads lớn

3. Các chức năng chính của Airflow

3.1 Tạo Workflow linh hoạt

Airflow cho phép bạn xây dựng pipeline một cách tự động thông qua code Python. Khác với các hệ thống truyền thống yêu cầu config tĩnh, với Airflow, bạn có thể:

  • Tạo workflow dựa trên điều kiện runtime
  • Sinh tự động các task dựa trên dữ liệu đầu vào
  • Thay đổi luồng công việc theo logic nghiệp vụ

Ví dụ: Khi xây dựng pipeline xử lý dữ liệu cho nhiều clients, thay vì tạo DAG riêng cho từng client, bạn có thể tạo một DAG template và khởi tạo tự động:

Python
for client in get_active_clients():
    dag_id = f'process_{client}_data'
    globals()[dag_id] = create_client_dag(client)

3.2.  Lập lịch và Kích hoạt thông minh

Airflow cung cấp hệ thống lập lịch mạnh mẽ vượt xa khả năng của cron:

  • Lập lịch linh hoạt: Hỗ trợ cron expressions, khoảng thời gian cố định, hoặc kết hợp cả hai
  • Trigger đa dạng: Manual trigger qua UI, API trigger, hoặc event-based triggering thông qua sensors
  • Backfill và Catchup: Tự động xử lý các khoảng thời gian bị miss, cực kỳ hữu ích khi deploy pipeline mới hoặc sau thời gian downtime

Trong khi cron chỉ chạy job tại thời điểm cố định mà không quan tâm đến trạng thái của job trước đó, Airflow đảm bảo các dependency được thỏa mãn trước khi trigger job tiếp theo.

3.3 Giám sát và Cảnh báo toàn diện

Giao diện web của Airflow cung cấp khả năng giám sát cực kì trực quan:

  • Tree View: Xem lịch sử execution của toàn bộ DAG qua các ngày
  • Graph View: Hiển thị trực quan mối quan hệ phụ thuộc giữa các tasks
  • Task Duration Analysis: Theo dõi hiệu năng và phát hiện các tasks chạy lâu bất thường
  • Tự động cảnh báo: Gửi notification qua email, Slack, PagerDuty khi tasks failed hoặc retry

Khi một task extract data từ API bị failed do timeout, Airflow tự động retry 3 lần (configurable) và gửi alert đến team ngay sau lần retry cuối cùng failed, giúp giảm thời gian phát hiện và xử lý sự cố từ hours xuống minutes.

3.4 Kho Plugin phong phú

Airflow được thiết kế để dễ dàng tích hợp với hầu hết các hệ thống khác:

  • Hàng trăm operators tích hợp sẵn: PostgreSQLOperator, BigQueryOperator, S3Operator,...
  • Custom Operators: Dễ dàng tạo operators custom cho hệ thống nội bộ
  • Hooks: Quản lý kết nối đến các services (databases, APIs, cloud services)
  • Plugins System: Thêm tính năng mới vào web UI hoặc extend core functionality

Ví dụ: Bạn có thể tạo custom operator để tương tác với internal API của công ty:

Python
class InternalAPIOperator(BaseOperator):
    def __init__(self, endpoint, method='GET', **kwargs):
        super().__init__(**kwargs)
        self.endpoint = endpoint
        self.method = method
        
    def execute(self, context):
        # Logic to call internal API
        response = call_internal_api(self.endpoint, self.method)
        return response

3.5 Khả năng Mở rộng theo nhu cầu

Airflow hỗ trợ multiple executors để đáp ứng nhu cầu scale khác nhau:

  • LocalExecutor: Chạy task song song trên một server
  • CeleryExecutor: Distributed task queue, scale horizontally bằng cách thêm workers
  • KubernetesExecutor: Tự động khởi chạy tasks trên Kubernetes cluster

Một công ty có thể bắt đầu với LocalExecutor trên single server, khi volume data tăng lên thì chuyển sang CeleryExecutor với multiple workers, và cuối cùng sử dụng KubernetesExecutor để auto-scale dựa trên workload.

4. Hướng dẫn cài đặt Airflow bằng Docker

  • Bước 1: Tạo project structure và requirements (file này optional, bạn không muốn cài thêm thư viện thì bỏ qua file này).
Bash
mkdir airflow-project && cd airflow-project
mkdir -p dags logs plugins
  • Bước 2: Tạo docker-compose.yml, mình sẽ chạy airflow standalone để demo, khi lên production thì các bạn không nên dùng mode này nhé.
Yml
services:
  # Airflow Database
  airflow-db:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    ports:
      - "5433:5432"
    volumes:
      - airflow_data:/var/lib/postgresql/data
    restart: always

  # Airflow
  airflow:
    image: apache/airflow:latest-python3.11
    environment:
      AIRFLOW__CORE__EXECUTOR: LocalExecutor
      AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@airflow-db:5432/airflow
      AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
      # Metadata DB connection
      PYTHONPATH: /opt/airflow/dags
    volumes:
      - ./dags:/opt/airflow/dags
      - ./logs:/opt/airflow/logs
      - ./requirements.txt:/requirements.txt
    ports:
      - "8080:8080"
    depends_on:
      - airflow-db
    command: >
      bash -c "
      (airflow db migrate &&
      pip install -r /requirements.txt) &&
      airflow standalone
      "
    restart: always

volumes:
  airflow_data:
  • Bước 3: Khởi tạo container bằng lệnh docker compose up
  • Bước 4: Truy cập Web UI
    • Truy cập URL:http://localhost:8080
    • Sử dụng username: admin, password sẽ được tìm thấy bên trong container, bạn truy cập vào container airflow webserver bằng câu lệnh docker exec -it container_name bash , sau đó cat file chứa password, file này sẽ nằm ngay tại airflow home nên bạn vào là sẽ thấy nó liền.

5. Demo Pipeline ETL hoàn chỉnh với Airflow

Pipeline này sẽ mô phỏng quy trình báo cáo doanh thu hàng ngày mà nhiều công ty đang sử dụng.

Python
# dags/daily_sales_report.py

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.email import EmailOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
import pandas as pd
import requests

default_args = {
    'owner': 'data_team',
    'depends_on_past': False,
    'start_date': datetime(2023, 1, 1),
    'retries': 3,
    'retry_delay': timedelta(minutes=5),
    'email_on_failure': True,
    'email_on_retry': True,
}

def extract_sales_data(**kwargs):
    """Mock function to extract data from API"""
    print("Extracting sales data from API...")
    # Giả lập API call
    data = {
        'date': ['2023-10-01', '2023-10-01', '2023-10-02'],
        'product': ['A', 'B', 'A'],
        'revenue': [100, 200, 150]
    }
    df = pd.DataFrame(data)
    df.to_csv('/tmp/sales_data_raw.csv', index=False)
    return '/tmp/sales_data_raw.csv'

def transform_sales_data(**kwargs):
    """Clean and transform sales data"""
    print("Transforming sales data...")
    ti = kwargs['ti']
    input_file = ti.xcom_pull(task_ids='extract_sales_data')
    
    df = pd.read_csv(input_file)
    
    # Data cleaning
    df['date'] = pd.to_datetime(df['date'])
    
    # Aggregation
    daily_sales = df.groupby('date')['revenue'].sum().reset_index()
    daily_sales['revenue'] = daily_sales['revenue'].astype(int)
    
    daily_sales.to_csv('/tmp/daily_sales_report.csv', index=False)
    return '/tmp/daily_sales_report.csv'

def load_to_database(**kwargs):
    """Mock function to load data to database"""
    print("Loading data to database...")
    ti = kwargs['ti']
    input_file = ti.xcom_pull(task_ids='transform_sales_data')
    
    # Giả lập database load
    df = pd.read_csv(input_file)
    print(f"Loaded {len(df)} records to database")
    
    # Trả về summary để sử dụng trong email
    total_revenue = df['revenue'].sum()
    return f"Total revenue: ${total_revenue}"

with DAG(
    'daily_sales_report',
    default_args=default_args,
    description='Daily sales ETL pipeline',
    schedule_interval='@daily',
    catchup=False,
    tags=['sales', 'reporting'],
) as dag:

    extract_task = PythonOperator(
        task_id='extract_sales_data',
        python_callable=extract_sales_data,
    )

    transform_task = PythonOperator(
        task_id='transform_sales_data',
        python_callable=transform_sales_data,
    )

    load_task = PythonOperator(
        task_id='load_to_database',
        python_callable=load_to_database,
    )

    send_email = EmailOperator(
        task_id='send_report_email',
        to='data-team@company.com',
        subject='Daily Sales Report - {{ ds }}',
        html_content="""
        <h3>Daily Sales Report</h3>
        <p>Date: {{ ds }}</p>
        <p>Summary: {{ ti.xcom_pull(task_ids='load_to_database') }}</p>
        <p>Pipeline completed successfully!</p>
        """,
    )

    # Define dependencies
    extract_task >> transform_task >> load_task >> send_email

    # Parallel task example
    data_validation = BashOperator(
        task_id='validate_data',
        bash_command='echo "Running data validation checks..."',
    )

    transform_task >> data_validation

6. Kết luận

Apache Airflow là công cụ phù hợp cho việc xây dựng và quản lý workflow, đặc biệt là các developer làm việc trong lĩnh vực data engineering và automation. Với khả năng mở rộng, cộng đồng hỗ trợ lớn và sự linh hoạt trong việc định nghĩa pipeline bằng code, Airflow xứng đáng là sự lựa chọn hàng đầu để quản lý các quy trình phức tạp.

Bài viết liên quan

Đăng ký nhận thông báo

Đừng bỏ lỡ những bài viết thú vị từ 200Lab