Airflow là gì? Nền tảng Orchestration - Điều phối Workflow chuyên nghiệp
25 Sep, 2025
Hướng nội
AuthorApache Airflow là nền tảng mã nguồn mở được viết bằng Python để tạo, lập lịch và giám sát các workflow phức tạp

Mục Lục
Đối với Data Engineer, việc điều phối ETL/ELT bằng cron và script rời rạc dễ dẫn đến lỗi phụ thuộc ngầm, khó giám sát và khó mở rộng. Airflow giúp bạn định nghĩa, lên lịch, theo dõi và khôi phục workflow dưới dạng DAG một cách trực quan và có thể mở rộng. Hãy cùng mình tìm hiểu Airflow là gì, kèm theo ví dụ chi tiết để có thể bạn áp dụng ngay.
1. Airflow là gì?
Apache Airflow là nền tảng mã nguồn mở được viết bằng Python để tạo, lập lịch và giám sát các workflow phức tạp. Khác với các hệ thống lập lịch truyền thống, Airflow cho phép bạn định nghĩa toàn bộ quy trình nghiệp vụ dưới dạng code Python (Workflow as Code), mang lại khả năng version control, testing và collaboration vượt trội.
Ví dụ: Thay vì quản lý hàng chục cron job riêng lẻ với log files rải rác, bạn có thể định nghĩa toàn bộ pipeline ETL trong một file Python duy nhất với đầy đủ dependency tracking và monitoring.
2. Vì sao Airflow ra đời?
Trước khi có Airflow, các developer thường xây dựng pipeline data bằng cron kết hợp với shell scripts.
Ví dụ: Bạn cần xây dựng pipeline ETL hàng ngày để:
- Extract data từ database production
- Transform data (làm sạch, tính toán)
- Load vào data warehouse
- Gửi email báo cáo
Cách implement truyền thống với cron
# File crontab
0 2 * * * /scripts/extract_data.sh
0 3 * * * /scripts/transform_data.sh
0 4 * * * /scripts/load_to_dwh.sh
0 5 * * * /scripts/send_email_report.sh
Những vấn đề gặp phải:
- Không có dependency management: Nếu script
extract_data.shchạy lâu hơn dự kiến,transform_data.shvẫn chạy đúng giờ và fail - Monitoring khó khăn: Phải check manual từng log file riêng lẻ
- Retry phức tạp: Khi một job fail, phải manually trigger lại cả chain
- Khó debug: Không có visibility vào trạng thái tổng thể của pipeline
- Scalability issues: Khó phân phối workload trên nhiều server
Airflow ra đời năm 2014 tại Airbnb để giải quyết chính những vấn đề này bằng cách:
- Định nghĩa workflow dưới dạng code Python
- Tự động quản lý dependencies giữa các tasks
- Cung cấp giao diện web để giám sát trực quan
- Tự động retry tasks failed với configurable retry policy
- Distributed execution cho workloads lớn
3. Các chức năng chính của Airflow
3.1 Tạo Workflow linh hoạt
Airflow cho phép bạn xây dựng pipeline một cách tự động thông qua code Python. Khác với các hệ thống truyền thống yêu cầu config tĩnh, với Airflow, bạn có thể:
- Tạo workflow dựa trên điều kiện runtime
- Sinh tự động các task dựa trên dữ liệu đầu vào
- Thay đổi luồng công việc theo logic nghiệp vụ
Ví dụ: Khi xây dựng pipeline xử lý dữ liệu cho nhiều clients, thay vì tạo DAG riêng cho từng client, bạn có thể tạo một DAG template và khởi tạo tự động:
for client in get_active_clients():
dag_id = f'process_{client}_data'
globals()[dag_id] = create_client_dag(client)
3.2. Lập lịch và Kích hoạt thông minh
Airflow cung cấp hệ thống lập lịch mạnh mẽ vượt xa khả năng của cron:
- Lập lịch linh hoạt: Hỗ trợ cron expressions, khoảng thời gian cố định, hoặc kết hợp cả hai
- Trigger đa dạng: Manual trigger qua UI, API trigger, hoặc event-based triggering thông qua sensors
- Backfill và Catchup: Tự động xử lý các khoảng thời gian bị miss, cực kỳ hữu ích khi deploy pipeline mới hoặc sau thời gian downtime
Trong khi cron chỉ chạy job tại thời điểm cố định mà không quan tâm đến trạng thái của job trước đó, Airflow đảm bảo các dependency được thỏa mãn trước khi trigger job tiếp theo.
3.3 Giám sát và Cảnh báo toàn diện
Giao diện web của Airflow cung cấp khả năng giám sát cực kì trực quan:
- Tree View: Xem lịch sử execution của toàn bộ DAG qua các ngày
- Graph View: Hiển thị trực quan mối quan hệ phụ thuộc giữa các tasks
- Task Duration Analysis: Theo dõi hiệu năng và phát hiện các tasks chạy lâu bất thường
- Tự động cảnh báo: Gửi notification qua email, Slack, PagerDuty khi tasks failed hoặc retry
Khi một task extract data từ API bị failed do timeout, Airflow tự động retry 3 lần (configurable) và gửi alert đến team ngay sau lần retry cuối cùng failed, giúp giảm thời gian phát hiện và xử lý sự cố từ hours xuống minutes.
3.4 Kho Plugin phong phú
Airflow được thiết kế để dễ dàng tích hợp với hầu hết các hệ thống khác:
- Hàng trăm operators tích hợp sẵn: PostgreSQLOperator, BigQueryOperator, S3Operator,...
- Custom Operators: Dễ dàng tạo operators custom cho hệ thống nội bộ
- Hooks: Quản lý kết nối đến các services (databases, APIs, cloud services)
- Plugins System: Thêm tính năng mới vào web UI hoặc extend core functionality
Ví dụ: Bạn có thể tạo custom operator để tương tác với internal API của công ty:
class InternalAPIOperator(BaseOperator):
def __init__(self, endpoint, method='GET', **kwargs):
super().__init__(**kwargs)
self.endpoint = endpoint
self.method = method
def execute(self, context):
# Logic to call internal API
response = call_internal_api(self.endpoint, self.method)
return response
3.5 Khả năng Mở rộng theo nhu cầu
Airflow hỗ trợ multiple executors để đáp ứng nhu cầu scale khác nhau:
- LocalExecutor: Chạy task song song trên một server
- CeleryExecutor: Distributed task queue, scale horizontally bằng cách thêm workers
- KubernetesExecutor: Tự động khởi chạy tasks trên Kubernetes cluster
Một công ty có thể bắt đầu với LocalExecutor trên single server, khi volume data tăng lên thì chuyển sang CeleryExecutor với multiple workers, và cuối cùng sử dụng KubernetesExecutor để auto-scale dựa trên workload.
4. Hướng dẫn cài đặt Airflow bằng Docker
- Bước 1: Tạo project structure và requirements (file này optional, bạn không muốn cài thêm thư viện thì bỏ qua file này).
mkdir airflow-project && cd airflow-project
mkdir -p dags logs plugins
- Bước 2: Tạo docker-compose.yml, mình sẽ chạy airflow standalone để demo, khi lên production thì các bạn không nên dùng mode này nhé.
services:
# Airflow Database
airflow-db:
image: postgres:13
environment:
POSTGRES_USER: airflow
POSTGRES_PASSWORD: airflow
POSTGRES_DB: airflow
ports:
- "5433:5432"
volumes:
- airflow_data:/var/lib/postgresql/data
restart: always
# Airflow
airflow:
image: apache/airflow:latest-python3.11
environment:
AIRFLOW__CORE__EXECUTOR: LocalExecutor
AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@airflow-db:5432/airflow
AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
# Metadata DB connection
PYTHONPATH: /opt/airflow/dags
volumes:
- ./dags:/opt/airflow/dags
- ./logs:/opt/airflow/logs
- ./requirements.txt:/requirements.txt
ports:
- "8080:8080"
depends_on:
- airflow-db
command: >
bash -c "
(airflow db migrate &&
pip install -r /requirements.txt) &&
airflow standalone
"
restart: always
volumes:
airflow_data:
- Bước 3: Khởi tạo container bằng lệnh
docker compose up - Bước 4: Truy cập Web UI
- Truy cập URL:http://localhost:8080
- Sử dụng username: admin, password sẽ được tìm thấy bên trong container, bạn truy cập vào container airflow webserver bằng câu lệnh docker
exec -it container_name bash, sau đócatfile chứa password, file này sẽ nằm ngay tại airflow home nên bạn vào là sẽ thấy nó liền.
5. Demo Pipeline ETL hoàn chỉnh với Airflow
Pipeline này sẽ mô phỏng quy trình báo cáo doanh thu hàng ngày mà nhiều công ty đang sử dụng.
# dags/daily_sales_report.py
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.email import EmailOperator
from airflow.operators.bash import BashOperator
from datetime import datetime, timedelta
import pandas as pd
import requests
default_args = {
'owner': 'data_team',
'depends_on_past': False,
'start_date': datetime(2023, 1, 1),
'retries': 3,
'retry_delay': timedelta(minutes=5),
'email_on_failure': True,
'email_on_retry': True,
}
def extract_sales_data(**kwargs):
"""Mock function to extract data from API"""
print("Extracting sales data from API...")
# Giả lập API call
data = {
'date': ['2023-10-01', '2023-10-01', '2023-10-02'],
'product': ['A', 'B', 'A'],
'revenue': [100, 200, 150]
}
df = pd.DataFrame(data)
df.to_csv('/tmp/sales_data_raw.csv', index=False)
return '/tmp/sales_data_raw.csv'
def transform_sales_data(**kwargs):
"""Clean and transform sales data"""
print("Transforming sales data...")
ti = kwargs['ti']
input_file = ti.xcom_pull(task_ids='extract_sales_data')
df = pd.read_csv(input_file)
# Data cleaning
df['date'] = pd.to_datetime(df['date'])
# Aggregation
daily_sales = df.groupby('date')['revenue'].sum().reset_index()
daily_sales['revenue'] = daily_sales['revenue'].astype(int)
daily_sales.to_csv('/tmp/daily_sales_report.csv', index=False)
return '/tmp/daily_sales_report.csv'
def load_to_database(**kwargs):
"""Mock function to load data to database"""
print("Loading data to database...")
ti = kwargs['ti']
input_file = ti.xcom_pull(task_ids='transform_sales_data')
# Giả lập database load
df = pd.read_csv(input_file)
print(f"Loaded {len(df)} records to database")
# Trả về summary để sử dụng trong email
total_revenue = df['revenue'].sum()
return f"Total revenue: ${total_revenue}"
with DAG(
'daily_sales_report',
default_args=default_args,
description='Daily sales ETL pipeline',
schedule_interval='@daily',
catchup=False,
tags=['sales', 'reporting'],
) as dag:
extract_task = PythonOperator(
task_id='extract_sales_data',
python_callable=extract_sales_data,
)
transform_task = PythonOperator(
task_id='transform_sales_data',
python_callable=transform_sales_data,
)
load_task = PythonOperator(
task_id='load_to_database',
python_callable=load_to_database,
)
send_email = EmailOperator(
task_id='send_report_email',
to='data-team@company.com',
subject='Daily Sales Report - {{ ds }}',
html_content="""
<h3>Daily Sales Report</h3>
<p>Date: {{ ds }}</p>
<p>Summary: {{ ti.xcom_pull(task_ids='load_to_database') }}</p>
<p>Pipeline completed successfully!</p>
""",
)
# Define dependencies
extract_task >> transform_task >> load_task >> send_email
# Parallel task example
data_validation = BashOperator(
task_id='validate_data',
bash_command='echo "Running data validation checks..."',
)
transform_task >> data_validation
6. Kết luận
Apache Airflow là công cụ phù hợp cho việc xây dựng và quản lý workflow, đặc biệt là các developer làm việc trong lĩnh vực data engineering và automation. Với khả năng mở rộng, cộng đồng hỗ trợ lớn và sự linh hoạt trong việc định nghĩa pipeline bằng code, Airflow xứng đáng là sự lựa chọn hàng đầu để quản lý các quy trình phức tạp.