Airflow のテストについて
Airflow の DAG ファイルのテスト方法について、調べた内容をまとめます。
環境
実行環境は、下記以前の投稿で作成したローカル上の Airflow 環境になります。
Apache Airflow をローカル環境にインストールする - goodbyegangsterのブログ
参考ドキュメント
参考にした資料たちです。
- Airflow 公式チュートリアル
- Airflow 公式 ベストプラクティス
- Astronomer Testing
- Testing in Airflow Part 1 — DAG Validation Tests, DAG Definition Tests and Unit Tests
- Testing in Airflow Part 2 — Integration Tests and End-To-End Pipeline Tests
example の DAG ファイルを無効化
Airflow 側で用意してくれている example や tutorial 用の DAG フォルダを無効化しておきます。これら DAG ファイル内の記述のせいで、煩わしい警告メッセージが大量に出力されてしまいます。
airflow.cfg
# Whether to load the DAG examples that ship with Airflow. It's good to # get started, but you probably want to set this to ``False`` in a production # environment load_examples = False
config reference load_examples
テスト対象の DAG ファイル
利用できる Airflow CLI Commnad たち
CLI Command で利用できそうなものを確認。
DAG の正常登録を確認
airflow dags show
コマンドにて、DAG が正しく登録されているかどうかを確認できます。
$ airflow dags show sample_test_dag [2022-08-04 20:05:48,219] {dagbag.py:507} INFO - Filling up the DagBag from /home/mikochi/work/airflow/dags digraph sample_test_dag { graph [label=sample_test_dag labelloc=t rankdir=LR] get_version [color="#000000" fillcolor="#ffefeb" label=get_version shape=rectangle style="filled,rounded"] print_version [color="#000000" fillcolor="#ffefeb" label=print_version shape=rectangle style="filled,rounded"] get_version -> print_version }
airflow tasks list
コマンドでも確認可能です。
$ airflow tasks list -t sample_test_dag <Task(_PythonDecoratedOperator): get_version> <Task(_PythonDecoratedOperator): print_version>
DAG および Task の動作テスト
実際に DAG や Task を動かして、処理結果を確認することができます。
airflow dags test
コマンドでは、指定した DAG を実行することができます。実行した場合、メタデータベース上にも JOB 実行の実績が記録されます。
$ airflow dags test sample_test_dag 2022-08-04 [2022-08-04 20:09:36,434] {dagbag.py:507} INFO - Filling up the DagBag from /home/mikochi/work/airflow/dags [2022-08-04 20:09:36,673] {base_executor.py:91} INFO - Adding to queue: ['<TaskInstance: sample_test_dag.get_version backfill__2022-08-04T00:00:00+00:00 [queued]>'] print: 3.7.13 (default, Jul 10 2022, 01:10:55) [GCC 9.4.0] [2022-08-04 20:09:41,763] {backfill_job.py:378} INFO - [backfill progress] | finished run 0 of 1 | tasks waiting: 1 | succeeded: 1 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 1 [2022-08-04 20:09:41,784] {base_executor.py:91} INFO - Adding to queue: ['<TaskInstance: sample_test_dag.print_version backfill__2022-08-04T00:00:00+00:00 [queued]>'] receive: 3.7.13 (default, Jul 10 2022, 01:10:55) [GCC 9.4.0] [2022-08-04 20:09:46,652] {dagrun.py:562} INFO - Marking run <DagRun sample_test_dag @ 2022-08-04T00:00:00+00:00: backfill__2022-08-04T00:00:00+00:00, externally triggered: False> successful [2022-08-04 20:09:46,653] {dagrun.py:622} INFO - DagRun Finished: dag_id=sample_test_dag, execution_date=2022-08-04T00:00:00+00:00, run_id=backfill__2022-08-04T00:00:00+00:00, run_start_date=2022-08-04 11:09:36.603766+00:00, run_end_date=2022-08-04 11:09:46.653258+00:00, run_duration=10.049492, state=success, external_trigger=False, run_type=backfill, data_interval_start=2022-08-04T00:00:00+00:00, data_interval_end=2022-08-05T00:00:00+00:00, dag_hash=None [2022-08-04 20:09:46,653] {backfill_job.py:378} INFO - [backfill progress] | finished run 1 of 1 | tasks waiting: 0 | succeeded: 2 | running: 0 | failed: 0 | skipped: 0 | deadlocked: 0 | not ready: 0 [2022-08-04 20:09:46,660] {backfill_job.py:879} INFO - Backfill done. Exiting.
airflow tasks test
コマンドでは、Task 単位で実行することができます。Task 単位で実行した場合、メタデータベース上には JOB 実行の実績が記録されません。
$ airflow tasks test sample_test_dag get_version 2022-08-04 airflow tasks test sample_test_dag get_version 2022-08-04 [2022-08-05 01:15:43,066] {dagbag.py:507} INFO - Filling up the DagBag from /home/mikochi/work/airflow/dags [2022-08-05 01:15:43,078] {taskmixin.py:206} WARNING - Dependency <Task(_PythonDecoratedOperator): get_version>, print_version already registered for DAG: sample_test_dag [2022-08-05 01:15:43,078] {taskmixin.py:206} WARNING - Dependency <Task(_PythonDecoratedOperator): print_version>, get_version already registered for DAG: sample_test_dag [2022-08-05 01:15:43,078] {taskmixin.py:206} WARNING - Dependency <Task(_PythonDecoratedOperator): get_version>, print_version already registered for DAG: sample_test_dag [2022-08-05 01:15:43,078] {taskmixin.py:206} WARNING - Dependency <Task(_PythonDecoratedOperator): print_version>, get_version already registered for DAG: sample_test_dag [2022-08-05 01:15:43,078] {taskmixin.py:206} WARNING - Dependency <Task(_PythonDecoratedOperator): get_version>, print_version already registered for DAG: sample_test_dag [2022-08-05 01:15:43,079] {taskmixin.py:206} WARNING - Dependency <Task(_PythonDecoratedOperator): print_version>, get_version already registered for DAG: sample_test_dag [2022-08-05 01:15:43,079] {taskmixin.py:206} WARNING - Dependency <Task(_PythonDecoratedOperator): get_version>, print_version already registered for DAG: sample_test_dag [2022-08-05 01:15:43,079] {taskmixin.py:206} WARNING - Dependency <Task(_PythonDecoratedOperator): print_version>, get_version already registered for DAG: sample_test_dag [2022-08-05 01:15:43,079] {taskmixin.py:206} WARNING - Dependency <Task(_PythonDecoratedOperator): get_version>, print_version already registered for DAG: sample_test_dag [2022-08-05 01:15:43,079] {taskmixin.py:206} WARNING - Dependency <Task(_PythonDecoratedOperator): print_version>, get_version already registered for DAG: sample_test_dag [2022-08-05 01:15:43,124] {taskinstance.py:1159} INFO - Dependencies all met for <TaskInstance: sample_test_dag.get_version backfill__2022-08-04T00:00:00+00:00 [success]> [2022-08-05 01:15:43,141] {taskinstance.py:1159} INFO - Dependencies all met for <TaskInstance: sample_test_dag.get_version backfill__2022-08-04T00:00:00+00:00 [success]> [2022-08-05 01:15:43,142] {taskinstance.py:1356} INFO - -------------------------------------------------------------------------------- [2022-08-05 01:15:43,142] {taskinstance.py:1357} INFO - Starting attempt 1 of 2 [2022-08-05 01:15:43,142] {taskinstance.py:1358} INFO - -------------------------------------------------------------------------------- [2022-08-05 01:15:43,143] {taskinstance.py:1377} INFO - Executing <Task(_PythonDecoratedOperator): get_version> on 2022-08-04 00:00:00+00:00 [2022-08-05 01:15:43,185] {taskinstance.py:1571} INFO - Exporting the following env vars: AIRFLOW_CTX_DAG_OWNER=airflow AIRFLOW_CTX_DAG_ID=sample_test_dag AIRFLOW_CTX_TASK_ID=get_version AIRFLOW_CTX_EXECUTION_DATE=2022-08-04T00:00:00+00:00 AIRFLOW_CTX_TRY_NUMBER=1 AIRFLOW_CTX_DAG_RUN_ID=backfill__2022-08-04T00:00:00+00:00 print: 3.7.13 (default, Jul 10 2022, 01:10:55) [GCC 9.4.0] [2022-08-05 01:15:43,186] {python.py:173} INFO - Done. Returned value was: 3.7.13 (default, Jul 10 2022, 01:10:55) [GCC 9.4.0] [2022-08-05 01:15:43,198] {taskinstance.py:1400} INFO - Marking task as SUCCESS. dag_id=sample_test_dag, task_id=get_version, execution_date=20220804T000000, start_date=20220804T110936, end_date=20220804T161543
Dependency に関する警告が出力されています。下記の GitHub によると、Xcoms を利用しているために起こる問題らしいです。ただ、TaskFlow API Task で書けば警告でないよみたいなコメントもあるのですが、僕の環境では警告メッセージが出力されていました。。。
"already registered for DAG" when dynamically generate tasks
taskmixin.py
で除外して出力すると、きれいに表示してくれます。
$ airflow tasks test sample_test_dag get_version 2022-08-04 | grep -v "taskmixin.py" [2022-08-05 01:17:44,607] {dagbag.py:507} INFO - Filling up the DagBag from /home/mikochi/work/airflow/dags [2022-08-05 01:17:44,652] {taskinstance.py:1159} INFO - Dependencies all met for <TaskInstance: sample_test_dag.get_version backfill__2022-08-04T00:00:00+00:00 [success]> [2022-08-05 01:17:44,658] {taskinstance.py:1159} INFO - Dependencies all met for <TaskInstance: sample_test_dag.get_version backfill__2022-08-04T00:00:00+00:00 [success]> [2022-08-05 01:17:44,658] {taskinstance.py:1356} INFO - -------------------------------------------------------------------------------- [2022-08-05 01:17:44,658] {taskinstance.py:1357} INFO - Starting attempt 1 of 2 [2022-08-05 01:17:44,658] {taskinstance.py:1358} INFO - -------------------------------------------------------------------------------- [2022-08-05 01:17:44,659] {taskinstance.py:1377} INFO - Executing <Task(_PythonDecoratedOperator): get_version> on 2022-08-04 00:00:00+00:00 [2022-08-05 01:17:44,695] {taskinstance.py:1571} INFO - Exporting the following env vars: AIRFLOW_CTX_DAG_OWNER=airflow AIRFLOW_CTX_DAG_ID=sample_test_dag AIRFLOW_CTX_TASK_ID=get_version AIRFLOW_CTX_EXECUTION_DATE=2022-08-04T00:00:00+00:00 AIRFLOW_CTX_TRY_NUMBER=1 AIRFLOW_CTX_DAG_RUN_ID=backfill__2022-08-04T00:00:00+00:00 print: 3.7.13 (default, Jul 10 2022, 01:10:55) [GCC 9.4.0] [2022-08-05 01:17:44,696] {python.py:173} INFO - Done. Returned value was: 3.7.13 (default, Jul 10 2022, 01:10:55) [GCC 9.4.0] [2022-08-05 01:17:44,710] {taskinstance.py:1400} INFO - Marking task as SUCCESS. dag_id=sample_test_dag, task_id=get_version, execution_date=20220804T000000, start_date=20220804T110936, end_date=20220804T161744
「メタデータベース上に実績が記録されないのであれば、XComs を利用するような処理はテストできないのでは?」と思ったのですが、問題なく実行できました。下記の Stackoverflow によると、JOB 実行の履歴が残らないだけで、XComs にはアクセスがされているようです。
How to test Apache Airflow tasks that uses XCom
DAG Definition Tests
Python の unittest や pytest を利用して、宣言した DAG の定義情報を Validation する方法です。
テストコード
tests/test.py
実行
$ pytest -s tests/test.py =============================================================================== test session starts =============================================================================== platform linux -- Python 3.7.13, pytest-7.1.2, pluggy-1.0.0 rootdir: /home/mikochi/work/airflow plugins: anyio-3.6.1 collected 1 item tests/test.py [2022-08-05 01:51:31,400] {dagbag.py:507} INFO - Filling up the DagBag from /home/mikochi/work/airflow/dags <class 'airflow.models.dag.DAG'> {'_access_control': None, '_dag_id': 'sample_test_dag', '_default_view': 'grid', '_description': 'A sample test DAG', '_max_active_tasks': 16, '_pickle_id': None, '_task_group': <airflow.utils.task_group.TaskGroup object at 0x7f0a5da5b610>, 'catchup': False, 'dagrun_timeout': None, 'default_args': {'retries': 1, 'retry_delay': datetime.timedelta(seconds=300)}, 'doc_md': 'This is a sample test DAG.', 'edge_info': {}, 'end_date': DateTime(2022, 12, 31, 0, 0, 0, tzinfo=Timezone('UTC')), 'fileloc': '/home/mikochi/work/airflow/dags/sample_test.py', 'has_on_failure_callback': False, 'has_on_success_callback': False, 'is_paused_upon_creation': None, 'jinja_environment_kwargs': None, 'last_loaded': datetime.datetime(2022, 8, 4, 16, 51, 31, 479758, tzinfo=Timezone('UTC')), 'max_active_runs': 16, 'on_failure_callback': None, 'on_success_callback': None, 'orientation': 'LR', 'params': <airflow.models.param.ParamsDict object at 0x7f0a5e862490>, 'partial': False, 'render_template_as_native_obj': False, 'safe_dag_id': 'sample_test_dag', 'schedule_interval': datetime.timedelta(days=1), 'sla_miss_callback': None, 'start_date': DateTime(2022, 8, 1, 0, 0, 0, tzinfo=Timezone('UTC')), 'tags': ['sample'], 'task_count': 2, 'task_dict': {'get_version': <Task(_PythonDecoratedOperator): get_version>, 'print_version': <Task(_PythonDecoratedOperator): print_version>}, 'template_searchpath': None, 'template_undefined': <class 'jinja2.runtime.StrictUndefined'>, 'timetable': <airflow.timetables.interval.DeltaDataIntervalTimetable object at 0x7f0a5da5b2d0>, 'timezone': Timezone('UTC'), 'user_defined_filters': None, 'user_defined_macros': None} tasks: ['get_version', 'print_version'] .
DagBag とは
airflow.models.dagbag
とは、以下とのこと。
A dagbag is a collection of dags, parsed out of a folder tree and has high level configuration settings, like what database to use as a backend and what executor to use to fire off tasks.
dagbag オブジェクトに dag_id を指定して get_dag() メソッドを実行すると、該当 dag_id の DAG オブジェクトを取得でき、登録された情報を参照できます。その値を確認して、設定忘れがないか(catchup が無効であるかとか)を確認することができます。
Unit Test / End-to-End Pipeline Tests (Data Integrity Testing)
PythonOperator は、それ自体が Python の関数となるため、通常の Python プログラムのように単体テストのコードを書けば良いわけですが、外部のデータベース上データを操作する場合にどうするのか、適当なものがないか調べてみたのですが、やっぱり大変ですね。これはデータパイプライン全体のテストを実行する場合と同じですが、結局テストデータを用意して愚直に実施するしかないのかな、と。
以下のブログでは、その方法が紹介されていましたが、結局以下のような手順でした。
- テストデータを用意
- Airflow が提供する REST API を叩いて DAG を実行
- DAG の終了を待って、データ結果を確認
Testing in Airflow Part 2 — Integration Tests and End-To-End Pipeline Tests
近道はない、と。
Airflow API はこちら。