Airflow のテストについて

Airflow の DAG ファイルのテスト方法について、調べた内容をまとめます。

環境

実行環境は、下記以前の投稿で作成したローカル上の Airflow 環境になります。

Apache Airflow をローカル環境にインストールする - goodbyegangsterのブログ

参考ドキュメント

参考にした資料たちです。

example の DAG ファイルを無効化

Airflow 側で用意してくれている example や tutorial 用の DAG フォルダを無効化しておきます。これら DAG ファイル内の記述のせいで、煩わしい警告メッセージが大量に出力されてしまいます。

airflow.cfg

# Whether to load the DAG examples that ship with Airflow. It's good to
# get started, but you probably want to set this to ``False`` in a production
# environment
load_examples = False

config reference load_examples

テスト対象の DAG ファイル

gist.github.com

利用できる Airflow CLI Commnad たち

CLI Command で利用できそうなものを確認。

DAG の正常登録を確認

airflow dags show コマンドにて、DAG が正しく登録されているかどうかを確認できます。

$ airflow dags show sample_test_dag
[2022-08-04 20:05:48,219] {dagbag.py:507} INFO - Filling up the DagBag from /home/mikochi/work/airflow/dags
digraph sample_test_dag {
        graph [label=sample_test_dag labelloc=t rankdir=LR]
        get_version [color="#000000" fillcolor="#ffefeb" label=get_version shape=rectangle style="filled,rounded"]
        print_version [color="#000000" fillcolor="#ffefeb" label=print_version shape=rectangle style="filled,rounded"]
        get_version -> print_version
}

airflow dags show

airflow tasks list コマンドでも確認可能です。

$ airflow tasks list -t sample_test_dag
<Task(_PythonDecoratedOperator): get_version>
    <Task(_PythonDecoratedOperator): print_version>

airflow tasks list

DAG および Task の動作テスト

実際に DAG や Task を動かして、処理結果を確認することができます。

airflow dags test コマンドでは、指定した DAG を実行することができます。実行した場合、メタデータベース上にも JOB 実行の実績が記録されます。

$ airflow dags test sample_test_dag 2022-08-04
[2022-08-04 20:09:36,434] {dagbag.py:507} INFO - Filling up the DagBag from /home/mikochi/work/airflow/dags
[2022-08-04 20:09:36,673] {base_executor.py:91} INFO - Adding to queue: ['<TaskInstance: sample_test_dag.get_version backfill__2022-08-04T00:00:00+00:00 [queued]>']
print: 3.7.13 (default, Jul 10 2022, 01:10:55)
[GCC 9.4.0]
[2022-08-04 20:09:41,763] {backfill_job.py:378} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 1 | succeeded: 1 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 1
[2022-08-04 20:09:41,784] {base_executor.py:91} INFO - Adding to queue: ['<TaskInstance: sample_test_dag.print_version backfill__2022-08-04T00:00:00+00:00 [queued]>']
receive: 3.7.13 (default, Jul 10 2022, 01:10:55)
[GCC 9.4.0]
[2022-08-04 20:09:46,652] {dagrun.py:562} INFO - Marking run <DagRun sample_test_dag @ 2022-08-04T00:00:00+00:00: backfill__2022-08-04T00:00:00+00:00, externally triggered: False> successful
[2022-08-04 20:09:46,653] {dagrun.py:622} INFO - DagRun Finished: dag_id=sample_test_dag, execution_date=2022-08-04T00:00:00+00:00, run_id=backfill__2022-08-04T00:00:00+00:00, run_start_date=2022-08-04 11:09:36.603766+00:00, run_end_date=2022-08-04 11:09:46.653258+00:00, run_duration=10.049492, state=success, external_trigger=False, run_type=backfill, data_interval_start=2022-08-04T00:00:00+00:00, data_interval_end=2022-08-05T00:00:00+00:00, dag_hash=None
[2022-08-04 20:09:46,653] {backfill_job.py:378} INFO - [backfill progress] | finished run 1 of 1 | tasks waiting: 0 | succeeded: 2 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0
[2022-08-04 20:09:46,660] {backfill_job.py:879} INFO - Backfill done. Exiting.

airflow dags test

airflow tasks test コマンドでは、Task 単位で実行することができます。Task 単位で実行した場合、メタデータベース上には JOB 実行の実績が記録されません。

$ airflow tasks test sample_test_dag get_version 2022-08-04
airflow tasks test sample_test_dag get_version 2022-08-04
[2022-08-05 01:15:43,066] {dagbag.py:507} INFO - Filling up the DagBag from /home/mikochi/work/airflow/dags
[2022-08-05 01:15:43,078] {taskmixin.py:206} WARNING - Dependency <Task(_PythonDecoratedOperator): get_version>, print_version already registered for DAG: sample_test_dag
[2022-08-05 01:15:43,078] {taskmixin.py:206} WARNING - Dependency <Task(_PythonDecoratedOperator): print_version>, get_version already registered for DAG: sample_test_dag
[2022-08-05 01:15:43,078] {taskmixin.py:206} WARNING - Dependency <Task(_PythonDecoratedOperator): get_version>, print_version already registered for DAG: sample_test_dag
[2022-08-05 01:15:43,078] {taskmixin.py:206} WARNING - Dependency <Task(_PythonDecoratedOperator): print_version>, get_version already registered for DAG: sample_test_dag
[2022-08-05 01:15:43,078] {taskmixin.py:206} WARNING - Dependency <Task(_PythonDecoratedOperator): get_version>, print_version already registered for DAG: sample_test_dag
[2022-08-05 01:15:43,079] {taskmixin.py:206} WARNING - Dependency <Task(_PythonDecoratedOperator): print_version>, get_version already registered for DAG: sample_test_dag
[2022-08-05 01:15:43,079] {taskmixin.py:206} WARNING - Dependency <Task(_PythonDecoratedOperator): get_version>, print_version already registered for DAG: sample_test_dag
[2022-08-05 01:15:43,079] {taskmixin.py:206} WARNING - Dependency <Task(_PythonDecoratedOperator): print_version>, get_version already registered for DAG: sample_test_dag
[2022-08-05 01:15:43,079] {taskmixin.py:206} WARNING - Dependency <Task(_PythonDecoratedOperator): get_version>, print_version already registered for DAG: sample_test_dag
[2022-08-05 01:15:43,079] {taskmixin.py:206} WARNING - Dependency <Task(_PythonDecoratedOperator): print_version>, get_version already registered for DAG: sample_test_dag
[2022-08-05 01:15:43,124] {taskinstance.py:1159} INFO - Dependencies all met for <TaskInstance: sample_test_dag.get_version backfill__2022-08-04T00:00:00+00:00 [success]>
[2022-08-05 01:15:43,141] {taskinstance.py:1159} INFO - Dependencies all met for <TaskInstance: sample_test_dag.get_version backfill__2022-08-04T00:00:00+00:00 [success]>
[2022-08-05 01:15:43,142] {taskinstance.py:1356} INFO -
--------------------------------------------------------------------------------
[2022-08-05 01:15:43,142] {taskinstance.py:1357} INFO - Starting attempt 1 of 2
[2022-08-05 01:15:43,142] {taskinstance.py:1358} INFO -
--------------------------------------------------------------------------------
[2022-08-05 01:15:43,143] {taskinstance.py:1377} INFO - Executing <Task(_PythonDecoratedOperator): get_version> on 2022-08-04 00:00:00+00:00
[2022-08-05 01:15:43,185] {taskinstance.py:1571} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=sample_test_dag
AIRFLOW_CTX_TASK_ID=get_version
AIRFLOW_CTX_EXECUTION_DATE=2022-08-04T00:00:00+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=backfill__2022-08-04T00:00:00+00:00
print: 3.7.13 (default, Jul 10 2022, 01:10:55)
[GCC 9.4.0]
[2022-08-05 01:15:43,186] {python.py:173} INFO - Done. Returned value was: 3.7.13 (default, Jul 10 2022, 01:10:55)
[GCC 9.4.0]
[2022-08-05 01:15:43,198] {taskinstance.py:1400} INFO - Marking task as SUCCESS. dag_id=sample_test_dag, task_id=get_version, execution_date=20220804T000000, start_date=20220804T110936, end_date=20220804T161543

airflow tasks test

Dependency に関する警告が出力されています。下記の GitHub によると、Xcoms を利用しているために起こる問題らしいです。ただ、TaskFlow API Task で書けば警告でないよみたいなコメントもあるのですが、僕の環境では警告メッセージが出力されていました。。。

"already registered for DAG" when dynamically generate tasks

taskmixin.py で除外して出力すると、きれいに表示してくれます。

$ airflow tasks test sample_test_dag get_version 2022-08-04 | grep -v "taskmixin.py"
[2022-08-05 01:17:44,607] {dagbag.py:507} INFO - Filling up the DagBag from /home/mikochi/work/airflow/dags
[2022-08-05 01:17:44,652] {taskinstance.py:1159} INFO - Dependencies all met for <TaskInstance: sample_test_dag.get_version backfill__2022-08-04T00:00:00+00:00 [success]>
[2022-08-05 01:17:44,658] {taskinstance.py:1159} INFO - Dependencies all met for <TaskInstance: sample_test_dag.get_version backfill__2022-08-04T00:00:00+00:00 [success]>
[2022-08-05 01:17:44,658] {taskinstance.py:1356} INFO -
--------------------------------------------------------------------------------
[2022-08-05 01:17:44,658] {taskinstance.py:1357} INFO - Starting attempt 1 of 2
[2022-08-05 01:17:44,658] {taskinstance.py:1358} INFO -
--------------------------------------------------------------------------------
[2022-08-05 01:17:44,659] {taskinstance.py:1377} INFO - Executing <Task(_PythonDecoratedOperator): get_version> on 2022-08-04 00:00:00+00:00
[2022-08-05 01:17:44,695] {taskinstance.py:1571} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=airflow
AIRFLOW_CTX_DAG_ID=sample_test_dag
AIRFLOW_CTX_TASK_ID=get_version
AIRFLOW_CTX_EXECUTION_DATE=2022-08-04T00:00:00+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=backfill__2022-08-04T00:00:00+00:00
print: 3.7.13 (default, Jul 10 2022, 01:10:55)
[GCC 9.4.0]
[2022-08-05 01:17:44,696] {python.py:173} INFO - Done. Returned value was: 3.7.13 (default, Jul 10 2022, 01:10:55)
[GCC 9.4.0]
[2022-08-05 01:17:44,710] {taskinstance.py:1400} INFO - Marking task as SUCCESS. dag_id=sample_test_dag, task_id=get_version, execution_date=20220804T000000, start_date=20220804T110936, end_date=20220804T161744

メタデータベース上に実績が記録されないのであれば、XComs を利用するような処理はテストできないのでは?」と思ったのですが、問題なく実行できました。下記の Stackoverflow によると、JOB 実行の履歴が残らないだけで、XComs にはアクセスがされているようです。

How to test Apache Airflow tasks that uses XCom

DAG Definition Tests

Python の unittest や pytest を利用して、宣言した DAG の定義情報を Validation する方法です。

テストコード

tests/test.py

gist.github.com

実行

$ pytest -s tests/test.py
=============================================================================== test session starts ===============================================================================
platform linux -- Python 3.7.13, pytest-7.1.2, pluggy-1.0.0
rootdir: /home/mikochi/work/airflow
plugins: anyio-3.6.1
collected 1 item                                                                                                                                                                  

tests/test.py [2022-08-05 01:51:31,400] {dagbag.py:507} INFO - Filling up the DagBag from /home/mikochi/work/airflow/dags
<class 'airflow.models.dag.DAG'>
{'_access_control': None,
 '_dag_id': 'sample_test_dag',
 '_default_view': 'grid',
 '_description': 'A sample test DAG',
 '_max_active_tasks': 16,
 '_pickle_id': None,
 '_task_group': <airflow.utils.task_group.TaskGroup object at 0x7f0a5da5b610>,
 'catchup': False,
 'dagrun_timeout': None,
 'default_args': {'retries': 1, 'retry_delay': datetime.timedelta(seconds=300)},
 'doc_md': 'This is a sample test DAG.',
 'edge_info': {},
 'end_date': DateTime(2022, 12, 31, 0, 0, 0, tzinfo=Timezone('UTC')),
 'fileloc': '/home/mikochi/work/airflow/dags/sample_test.py',
 'has_on_failure_callback': False,
 'has_on_success_callback': False,
 'is_paused_upon_creation': None,
 'jinja_environment_kwargs': None,
 'last_loaded': datetime.datetime(2022, 8, 4, 16, 51, 31, 479758, tzinfo=Timezone('UTC')),
 'max_active_runs': 16,
 'on_failure_callback': None,
 'on_success_callback': None,
 'orientation': 'LR',
 'params': <airflow.models.param.ParamsDict object at 0x7f0a5e862490>,
 'partial': False,
 'render_template_as_native_obj': False,
 'safe_dag_id': 'sample_test_dag',
 'schedule_interval': datetime.timedelta(days=1),
 'sla_miss_callback': None,
 'start_date': DateTime(2022, 8, 1, 0, 0, 0, tzinfo=Timezone('UTC')),
 'tags': ['sample'],
 'task_count': 2,
 'task_dict': {'get_version': <Task(_PythonDecoratedOperator): get_version>,
               'print_version': <Task(_PythonDecoratedOperator): print_version>},
 'template_searchpath': None,
 'template_undefined': <class 'jinja2.runtime.StrictUndefined'>,
 'timetable': <airflow.timetables.interval.DeltaDataIntervalTimetable object at 0x7f0a5da5b2d0>,
 'timezone': Timezone('UTC'),
 'user_defined_filters': None,
 'user_defined_macros': None}
tasks: ['get_version', 'print_version']
.

DagBag とは

airflow.models.dagbag とは、以下とのこと。

A dagbag is a collection of dags, parsed out of a folder tree and has high level configuration settings, like what database to use as a backend and what executor to use to fire off tasks.

airflow.models.dagbag

dagbag オブジェクトに dag_id を指定して get_dag() メソッドを実行すると、該当 dag_id の DAG オブジェクトを取得でき、登録された情報を参照できます。その値を確認して、設定忘れがないか(catchup が無効であるかとか)を確認することができます。

Unit Test / End-to-End Pipeline Tests (Data Integrity Testing)

PythonOperator は、それ自体が Python の関数となるため、通常の Python プログラムのように単体テストのコードを書けば良いわけですが、外部のデータベース上データを操作する場合にどうするのか、適当なものがないか調べてみたのですが、やっぱり大変ですね。これはデータパイプライン全体のテストを実行する場合と同じですが、結局テストデータを用意して愚直に実施するしかないのかな、と。

以下のブログでは、その方法が紹介されていましたが、結局以下のような手順でした。

  1. テストデータを用意
  2. Airflow が提供する REST API を叩いて DAG を実行
  3. DAG の終了を待って、データ結果を確認

Testing in Airflow Part 2 — Integration Tests and End-To-End Pipeline Tests

近道はない、と。

Airflow API はこちら。

Airflow API (Stable) (1.0.0)