Apache Airflow を理解するため、公式ドキュメントの Quick Start を試してみます。Docker Compose を利用して Airflow 環境の作成ができるようです。
環境
- Ubuntu 20.04.4 LTS (WSL2)
- Docker 20.10.16
- Docker Compose 2.6.0
- Airfow 2.3.2
Airflow 環境の作成
環境作成
compose.yaml
ファイルをダウンロード。
$ curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.3.2/docker-compose.yaml'
Airflow 系の複数サービスと、postgresとredis のサービスが定義されています。
https://airflow.apache.org/docs/apache-airflow/2.3.2/docker-compose.yaml
Airflow のコンテナがマウントするディレクトリ、コンテナが参照するコンフィグファイルを作成します。
$ mkdir -p ./dags ./logs ./plugins $ echo -e "AIRFLOW_UID=$(id -u)" > .env $ cat .env AIRFLOW_UID=1000
Airflow 環境のための、初期化処理用コンテナを実行。
$ docker-compose up airflow-init
apache/airflow
やredis
やpostres
のコンテナイメージを pull してきて、諸々チェックした後、データベースに初期データを作っているぽいです。
pullされたイメージ。
$ docker image ls -a REPOSITORY TAG IMAGE ID CREATED SIZE apache/airflow 2.3.2 0f778fa22e4c 2 days ago 1.06GB redis latest 604d80444252 6 days ago 117MB postgres 13 bb3dc2277987 3 weeks ago 373MB
起動します。
$ docker-compose up
起動されたコンテナ。
$ docker ps CONTAINER ID IMAGE COMMAND CREATED STATUS PORTS NAMES ed69a754612e apache/airflow:2.3.2 "/usr/bin/dumb-init …" 6 minutes ago Up 6 minutes (healthy) 8080/tcp airflow-airflow-triggerer-1 e4c781f96acf apache/airflow:2.3.2 "/usr/bin/dumb-init …" 6 minutes ago Up 6 minutes (healthy) 8080/tcp airflow-airflow-worker-1 12594171d6e5 apache/airflow:2.3.2 "/usr/bin/dumb-init …" 6 minutes ago Up 6 minutes (healthy) 8080/tcp airflow-airflow-scheduler-1 e69c757c635c apache/airflow:2.3.2 "/usr/bin/dumb-init …" 6 minutes ago Up 6 minutes (healthy) 0.0.0.0:8080->8080/tcp airflow-airflow-webserver-1 00d9f81f1bb8 redis:latest "docker-entrypoint.s…" 6 minutes ago Up 6 minutes (healthy) 6379/tcp airflow-redis-1 938abd8f3166 postgres:13 "docker-entrypoint.s…" 6 minutes ago Up 6 minutes (healthy) 5432/tcp airflow-postgres-1
Airflow コマンドの実行
起動している Airflow に対して、 Airflow コマンドを実行してみます。ドキュメント記載のシェルスクリプトをダウンロード。
$ curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.3.2/airflow.sh' $ chmod +x airflow.sh
シェルスクリプトは、airflow-cli という bash -c airflow
コマンドを実行するコンテナに、引数を渡して起動しているものでした。
https://airflow.apache.org/docs/apache-airflow/2.3.2/airflow.sh
airflow version
コマンドを実行。
$ ./airflow.sh version [+] Running 2/0 ⠿ Container airflow-redis-1 Running 0.0s ⠿ Container airflow-postgres-1 Running 0.0s [+] Running 3/3 ⠿ Container airflow-postgres-1 Healthy 2.7s ⠿ Container airflow-redis-1 Healthy 2.6s ⠿ Container airflow-airflow-init-1 Exited 27.8s /home/airflow/.local/lib/python3.7/site-packages/airflow/configuration.py:360: FutureWarning: The auth_backends setting in [api] has had airflow.api.auth.backend.session added in the running config, which is needed by the UI. Please update your config before Apache Airflow 3.0. FutureWarning, 2.3.2
WEB コンソールにアクセス
以下の情報でアクセスできるらしいです。
The webserver is available at:
http://localhost:8080
. The default account has the loginairflow
and the passwordairflow
.
example の DAG が、多数登録されています。
Airflow のアーキテクチャー
Airflow を構成する各コンポーネントたちを確認します。
基本的なアーキテクチャー
構成図。
各コンポーネントの説明。
コンポーネント名 | 役割 |
---|---|
scheduler | handles both triggering scheduled workflows, and submitting Tasks to the executor to run. |
executor | handles running tasks. |
webserver | presents a handy user interface to inspect, trigger and debug the behaviour of DAGs and tasks. |
folder of DAG files | read by the scheduler and executor (and any workers the executor has) |
metadata database | used by the scheduler, executor and webserver to store state. |
Executor について
今回 QuickStart で作成した環境の Executor は、Celery Executor
というものらしいです。
Executor には、下記の通り2種類あるらしく、
There are two types of executor - those that run tasks locally (inside the scheduler process), and those that run their tasks remotely (usually via a pool of workers).
Celery Executor
は、Remote Executor に分類されるものです。
worker をスケールアウトするために利用されているとのこと。
CeleryExecutor is one of the ways you can scale out the number of workers. For this to work, you need to setup a Celery backend (RabbitMQ, Redis, …) and change your airflow.cfg to point the executor parameter to CeleryExecutor and provide the related Celery settings.
そもそも Celery
とは、 OSS の分散タスクキュー/ジョブキューの事です。下記あたりを読むと、 Celery
がしてくれることをイメージできるようなります。
Celery is a simple, flexible, and reliable distributed system to process vast amounts of messages, while providing operations with the tools required to maintain such a system.
It’s a task queue with focus on real-time processing, while also supporting task scheduling.
Celery - Distributed Task Queue
Task queues are used as a mechanism to distribute work across threads or machines.
A task queue’s input is a unit of work called a task. Dedicated worker processes constantly monitor task queues for new work to perform.
Celery communicates via messages, usually using a broker to mediate between clients and workers. To initiate a task the client adds a message to the queue, the broker then delivers that message to a worker.
A Celery system can consist of multiple workers and brokers, giving way to high availability and horizontal scaling.
Airflow のアーキテクチャーでは、以下の部分を担っています。
コンポーネント名 | 役割 |
---|---|
Queue broker | Stores commands for execution |
Result backend | Stores status of completed commands |
シーケンス図を見ると、よりイメージしやすいです。
Trigger Instance について
起動されているコンテナを見ると、 airflow-airflow-triggerer-1
というコンテナがいます。compose.yaml を見ると airflow triggerer
コマンドを実行して、 Trigger Instance を実行するものらしいですが。
こいつが何をしているのかさっぱり分からなかったのですが、下記の方のブログが非常に参考になりました。
公式ドキュメントでは、以下の説明があります。
Standard Operators and Sensors take up a full worker slot for the entire time they are running, even if they are idle; for example, if you only have 100 worker slots available to run Tasks, and you have 100 DAGs waiting on a Sensor that’s currently running but idle, then you cannot run anything else - even though your entire Airflow cluster is essentially idle. reschedule mode for Sensors solves some of this, allowing Sensors to only run at fixed intervals, but it is inflexible and only allows using time as the reason to resume, not anything else.
This is where Deferrable Operators come in. A deferrable operator is one that is written with the ability to suspend itself and free up the worker when it knows it has to wait, and hand off the job of resuming it to something called a Trigger. As a result, while it is suspended (deferred), it is not taking up a worker slot and your cluster will have a lot less resources wasted on idle Operators or Sensors.
Deferrable Operators & Triggers
Operators
とは、DAG ファイルを書くときに指定するあの Operator のこと。Smart Sensors
とは、Deferrable Operators
の先行となった Airflow におけるインフラリソース管理の仕組みのこと。
Smart Sensors This is a deprecated early-access feature that will be removed in Airflow 2.4.0. It is superseded by Deferrable Operators, which offer a more flexible way to achieve efficient long-running sensors, as well as allowing operators to also achieve similar efficiency gains. If you are considering writing a new Smart Sensor, you should instead write it as a Deferrable Operator.
Deferrable Operators
という worker を効率的に利用できる新しい Operator があり、 Trigger Instance
とは、Deferrable Operators で利用される Airflow のインフラリソース管理用プロセスなんだな、ぐらいの理解をしました。
起動しているコンテナと比較
Docker Compose で起動されている各コンテナと、これまで確認したコンポーネントをマッピングしてみます。こんな感じでしょうか。
コンテナ名 | マッピングされるコンポーネント |
---|---|
airflow-airflow-scheduler-1 | scheduler |
airflow-airflow-worker-1 | worker |
airflow-airflow-webserver-1 | webserver |
airflow-airflow-triggerer-1 | Deferrable Operators で利用されるプロセス |
airflow-postgres-1 | metadata database |
airflow-redis-1 | Queue mechanismcelery が担う所 |
Amazon Managed Workflows for Apache Airflow と比較してみる
Amazon Managed Workflows for Apache Airflow
のアーキテクチャー図を眺めてみます。
Google Cloud の Cloud Composer 2 と比較してみる
Cloud Composer 2
のアーキテクチャー図を眺めてみます。