Apache Airflow に入門する

Apache Airflow を理解するため、公式ドキュメントの Quick Start を試してみます。Docker Compose を利用して Airflow 環境の作成ができるようです。

Running Airflow in Docker

環境

  • Ubuntu 20.04.4 LTS (WSL2)
  • Docker 20.10.16
  • Docker Compose 2.6.0
  • Airfow 2.3.2

Airflow 環境の作成

環境作成

compose.yaml ファイルをダウンロード。

$ curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.3.2/docker-compose.yaml'

Airflow 系の複数サービスと、postgresとredis のサービスが定義されています。

https://airflow.apache.org/docs/apache-airflow/2.3.2/docker-compose.yaml

Airflow のコンテナがマウントするディレクトリ、コンテナが参照するコンフィグファイルを作成します。

$ mkdir -p ./dags ./logs ./plugins
$ echo -e "AIRFLOW_UID=$(id -u)" > .env
$ cat .env
AIRFLOW_UID=1000

Airflow 環境のための、初期化処理用コンテナを実行。

$ docker-compose up airflow-init

apache/airflowredispostresのコンテナイメージを pull してきて、諸々チェックした後、データベースに初期データを作っているぽいです。

pullされたイメージ。

$ docker image ls -a
REPOSITORY       TAG             IMAGE ID       CREATED       SIZE
apache/airflow   2.3.2           0f778fa22e4c   2 days ago    1.06GB
redis            latest          604d80444252   6 days ago    117MB
postgres         13              bb3dc2277987   3 weeks ago   373MB

起動します。

$ docker-compose up

起動されたコンテナ。

$ docker ps
CONTAINER ID   IMAGE                  COMMAND                  CREATED         STATUS                   PORTS                    NAMES
ed69a754612e   apache/airflow:2.3.2   "/usr/bin/dumb-init …"   6 minutes ago   Up 6 minutes (healthy)   8080/tcp                 airflow-airflow-triggerer-1
e4c781f96acf   apache/airflow:2.3.2   "/usr/bin/dumb-init …"   6 minutes ago   Up 6 minutes (healthy)   8080/tcp                 airflow-airflow-worker-1
12594171d6e5   apache/airflow:2.3.2   "/usr/bin/dumb-init …"   6 minutes ago   Up 6 minutes (healthy)   8080/tcp                 airflow-airflow-scheduler-1
e69c757c635c   apache/airflow:2.3.2   "/usr/bin/dumb-init …"   6 minutes ago   Up 6 minutes (healthy)   0.0.0.0:8080->8080/tcp   airflow-airflow-webserver-1
00d9f81f1bb8   redis:latest           "docker-entrypoint.s…"   6 minutes ago   Up 6 minutes (healthy)   6379/tcp                 airflow-redis-1
938abd8f3166   postgres:13            "docker-entrypoint.s…"   6 minutes ago   Up 6 minutes (healthy)   5432/tcp                 airflow-postgres-1

Airflow コマンドの実行

起動している Airflow に対して、 Airflow コマンドを実行してみます。ドキュメント記載のシェルスクリプトをダウンロード。

$ curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.3.2/airflow.sh'
$ chmod +x airflow.sh

シェルスクリプトは、airflow-cli という bash -c airflow コマンドを実行するコンテナに、引数を渡して起動しているものでした。

https://airflow.apache.org/docs/apache-airflow/2.3.2/airflow.sh

airflow version コマンドを実行。

$ ./airflow.sh version
[+] Running 2/0
 ⠿ Container airflow-redis-1     Running                                                                                                                                                                    0.0s
 ⠿ Container airflow-postgres-1  Running                                                                                                                                                                    0.0s
[+] Running 3/3
 ⠿ Container airflow-postgres-1      Healthy                                                                                                                                                                2.7s
 ⠿ Container airflow-redis-1         Healthy                                                                                                                                                                2.6s
 ⠿ Container airflow-airflow-init-1  Exited                                                                                                                                                                27.8s
/home/airflow/.local/lib/python3.7/site-packages/airflow/configuration.py:360: FutureWarning: The auth_backends setting in [api] has had airflow.api.auth.backend.session added in the running config, which is needed by the UI. Please update your config before Apache Airflow 3.0.
  FutureWarning,
2.3.2

airflow version

WEB コンソールにアクセス

以下の情報でアクセスできるらしいです。

The webserver is available at: http://localhost:8080. The default account has the login airflow and the password airflow.

example の DAG が、多数登録されています。

Airflow のアーキテクチャ

Airflow を構成する各コンポーネントたちを確認します。

基本的なアーキテクチャ

構成図。

Architecture Overview

コンポーネントの説明。

コンポーネント 役割
scheduler handles both triggering scheduled workflows, and submitting Tasks to the executor to run.
executor handles running tasks.
webserver presents a handy user interface to inspect, trigger and debug the behaviour of DAGs and tasks.
folder of DAG files read by the scheduler and executor (and any workers the executor has)
metadata database used by the scheduler, executor and webserver to store state.

Executor について

今回 QuickStart で作成した環境の Executor は、Celery Executor というものらしいです。

Executor には、下記の通り2種類あるらしく、

There are two types of executor - those that run tasks locally (inside the scheduler process), and those that run their tasks remotely (usually via a pool of workers).

Executor Types

Celery Executor は、Remote Executor に分類されるものです。

worker をスケールアウトするために利用されているとのこと。

CeleryExecutor is one of the ways you can scale out the number of workers. For this to work, you need to setup a Celery backend (RabbitMQ, Redis, …) and change your airflow.cfg to point the executor parameter to CeleryExecutor and provide the related Celery settings.

Celery Executor

そもそも Celery とは、 OSS の分散タスクキュー/ジョブキューの事です。下記あたりを読むと、 Celery がしてくれることをイメージできるようなります。

Celery is a simple, flexible, and reliable distributed system to process vast amounts of messages, while providing operations with the tools required to maintain such a system.

It’s a task queue with focus on real-time processing, while also supporting task scheduling.

Celery - Distributed Task Queue

Task queues are used as a mechanism to distribute work across threads or machines.

A task queue’s input is a unit of work called a task. Dedicated worker processes constantly monitor task queues for new work to perform.

Celery communicates via messages, usually using a broker to mediate between clients and workers. To initiate a task the client adds a message to the queue, the broker then delivers that message to a worker.

A Celery system can consist of multiple workers and brokers, giving way to high availability and horizontal scaling.

What’s a Task Queue?

Airflow のアーキテクチャーでは、以下の部分を担っています。

コンポーネント 役割
Queue broker Stores commands for execution
Result backend Stores status of completed commands

Celery Executor Architecture

シーケンス図を見ると、よりイメージしやすいです。

Task execution process

Trigger Instance について

起動されているコンテナを見ると、 airflow-airflow-triggerer-1 というコンテナがいます。compose.yaml を見ると airflow triggerer コマンドを実行して、 Trigger Instance を実行するものらしいですが。

airflow triggerer

こいつが何をしているのかさっぱり分からなかったのですが、下記の方のブログが非常に参考になりました。

Airflow Deferrable Operators

公式ドキュメントでは、以下の説明があります。

Standard Operators and Sensors take up a full worker slot for the entire time they are running, even if they are idle; for example, if you only have 100 worker slots available to run Tasks, and you have 100 DAGs waiting on a Sensor that’s currently running but idle, then you cannot run anything else - even though your entire Airflow cluster is essentially idle. reschedule mode for Sensors solves some of this, allowing Sensors to only run at fixed intervals, but it is inflexible and only allows using time as the reason to resume, not anything else.

This is where Deferrable Operators come in. A deferrable operator is one that is written with the ability to suspend itself and free up the worker when it knows it has to wait, and hand off the job of resuming it to something called a Trigger. As a result, while it is suspended (deferred), it is not taking up a worker slot and your cluster will have a lot less resources wasted on idle Operators or Sensors.

Deferrable Operators & Triggers

Operators とは、DAG ファイルを書くときに指定するあの Operator のこと。Sensors とは、Deferrable Operators の先行となった Airflow におけるインフラリソース管理の仕組みのこと。

Smart Sensors This is a deprecated early-access feature that will be removed in Airflow 2.4.0. It is superseded by Deferrable Operators, which offer a more flexible way to achieve efficient long-running sensors, as well as allowing operators to also achieve similar efficiency gains. If you are considering writing a new Smart Sensor, you should instead write it as a Deferrable Operator.

Smart Sensors

Deferrable Operators という worker を効率的に利用できる新しい Operator があり、 Trigger Instance とは、Deferrable Operators で利用される Airflow のインフラリソース管理用プロセスなんだな、ぐらいの理解をしました。

起動しているコンテナと比較

Docker Compose で起動されている各コンテナと、これまで確認したコンポーネントマッピングしてみます。こんな感じでしょうか。

コンテナ名 マッピングされるコンポーネント
airflow-airflow-scheduler-1 scheduler
airflow-airflow-worker-1 worker
airflow-airflow-webserver-1 webserver
airflow-airflow-triggerer-1 Deferrable Operators で利用されるプロセス
airflow-postgres-1 metadata database
airflow-redis-1 Queue mechanism
celery が担う所

Amazon Managed Workflows for Apache Airflow と比較してみる

Amazon Managed Workflows for Apache Airflowアーキテクチャー図を眺めてみます。

アーキテクチャ

Google Cloud の Cloud Composer 2 と比較してみる

Cloud Composer 2アーキテクチャー図を眺めてみます。

Cloud Composer 環境のアーキテクチャ