Airflow でタスクの条件分岐を行う

Airflow の DAG 定義内で、タスクの条件分岐する方法を確認します。Astronomer 社のサイトを参考にしています。

Branching in Airflow

環境

  • Airflow 2.3.2

BranchPythonOperator を利用する

BranchPythonOperator を利用する方法を確認します。以下は参考にした Github 上にあるサンプルコードです。

example_branch_operator.py

DAG ファイル

sample_branching_dag.py

gist.github.com

グラフ図

XComs について

beginning() 関数は、取得した乱数を返す PythonOperator です。ここで取得された乱数の値は、下流branching() 関数にて、条件分岐の値として利用されています。

コード上では明示されていないですが、ここでは XComs という Airflow のタスク間でデータをやり取りする機能が利用されています。

XComs (short for “cross-communications”) are a mechanism that let Tasks talk to each other, as by default Tasks are entirely isolated and may be running on entirely different machines.

XComs

XComs に関しても、Astronomer 社のサイトが参考になります。タスクの戻り値は暗黙的に XComs として扱われ、Airflow 内のメタデータベースに保管される、とのことです。この暗黙的な設定は、タスクのパラメーターで変更することができます。

XComs can be “pushed”, meaning sent by a task, or “pulled”, meaning received by a task. When an XCom is pushed, it is stored in Airflow’s metadata database and made available to all other tasks. Any time a task returns a value (e.g. if your Python callable for your PythonOperator has a return), that value will automatically be pushed to XCom. Tasks can also be configured to push XComs by calling the xcom_push() method. Similarly, xcom_pull() can be used in a task to receive an XCom.

Passing Data Between Airflow Tasks

BranchPythonOperator について

branching() 関数が BranchPythonOperator にあたります。BranchPythonOperator も PythonOperator と同様に decorate することで宣言でき、decorate には airflow.decorators.branch_python.branch_task() を利用します。

BranchPythonOperator は、下流となる task_id を戻り値として指定することで、条件分岐をしてくれます。

It derives the PythonOperator and expects a Python function that returns a single task_id or list of task_ids to follow. The task_id(s) returned should point to a task directly downstream from {self}. All other “branches” or directly downstream tasks are marked with a state of skipped so that these paths can’t move forward. The skipped states are propagated downstream to allow for the DAG state to fill up and the DAG run’s state to be inferred.

class airflow.operators.python.BranchPythonOperator

trigger_rule について

分岐された処理が、再び結合する finishing タスクでは、パラメーター trigger_rule に値 none_failed_min_one_success を明示的に指定しています。

trigger_rule を変更することで、そのタスクが実行される「上流タスクの成功可否条件」を変更できます。

:param trigger_rule: defines the rule by which dependencies are applied for the task to get triggered. Options are: { all_success | all_failed | all_done | all_skipped | one_success | one_failed | none_failed | none_failed_min_one_success | none_skipped | always} default is all_success. Options can be set as string or using the constants defined in the static class airflow.utils.TriggerRule

airflow.models.baseoperator

ShortCircuitOperator を利用する

続いて ShortCircuitOperator を利用する方法を確認します。以下は、Github 上のサンプルコード。

example_short_circuit_operator.py

DAGファイル

gist.github.com

グラフ図

ShortCircuitOperator について

上記のサンプルコードでは、short_circuit_operator という名前のタスクが ShortCircuitOperator を利用している箇所となります。

ShortCircuitOperatorPythonOperator のサブクラスとなり、タスクを宣言する際に実行する Python 関数を指定します。指定した関数の戻り値の真偽値により、下流タスクの実行を制御することができます。

Use the ShortCircuitOperator to control whether a pipeline continues if a condition is satisfied or a truthy value is obtained. The evaluation of this condition and truthy value is done via the output of a python_callable. If the python_callable returns True or a truthy value, the pipeline is allowed to continue and an XCom of the output will be pushed. If the output is False or a falsy value, the pipeline will be short-circuited based on the configured short-circuiting (more on this later). In the example below, the tasks that follow the “condition_is_True” ShortCircuitOperator will execute while the tasks downstream of the “condition_is_False” ShortCircuitOperator will be skipped.

ShortCircuitOperator

class airflow.operators.python.ShortCircuitOperator

ドキュメント内で気になった記述。

ShortCircuitOperator は、ShortCircuitOperator タスクの下流タスク全てを、trigger_rule を無視してスキップさせるが、パラメーター ignore_downstream_trigger_rules を False にした場合、直下の下流タスクのみをスキップさせる、ということでしょうか。

The “short-circuiting” can be configured to either respect or ignore the trigger rule defined for downstream tasks. If ignore_downstream_trigger_rules is set to True, the default configuration, all downstream tasks are skipped without considering the trigger_rule defined for tasks. If this parameter is set to False, the direct downstream tasks are skipped but the specified trigger_rule for other subsequent downstream tasks are respected. In this short-circuiting configuration, the operator assumes the direct downstream task(s) were purposely meant to be skipped but perhaps not other subsequent tasks. This configuration is especially useful if only part of a pipeline should be short-circuited rather than all tasks which follow the ShortCircuitOperator task.

その他

タスクの条件分岐に利用できる、その他 Operator を列挙します。

BranchSQLOperator

SQL クエリ結果の真偽値により、下流のタスクを制御できるもの。

BranchSQLOperator

BranchDayOfWeekOperator

タスクが実行された曜日により制御できるもの。

BranchDayOfWeekOperator

example example_branch_day_of_week_operator.py

BranchDateTimeOperator

タスクが実行された日時により制御できるもの。

BranchDateTimeOperator

example example_branch_datetime_operator.py

LatestOnlyOperator

何らかの理由により、過去に実行されるべきであった DAG のスケジューリングが滞留していた場合、最新スケジューリングでの DAG 実行時のみ下流タスクを実行するもの。

Allows a workflow to skip tasks that are not running during the most recent schedule interval.

If the task is run outside of the latest schedule interval (i.e. external_trigger), all directly downstream tasks will be skipped.

LatestOnlyOperator