Airflow でタスクの条件分岐を行う
Airflow の DAG 定義内で、タスクの条件分岐する方法を確認します。Astronomer 社のサイトを参考にしています。
環境
- Airflow 2.3.2
BranchPythonOperator
を利用する
BranchPythonOperator
を利用する方法を確認します。以下は参考にした Github 上にあるサンプルコードです。
DAG ファイル
sample_branching_dag.py
グラフ図
XComs について
beginning()
関数は、取得した乱数を返す PythonOperator
です。ここで取得された乱数の値は、下流の branching()
関数にて、条件分岐の値として利用されています。
コード上では明示されていないですが、ここでは XComs
という Airflow のタスク間でデータをやり取りする機能が利用されています。
XComs (short for “cross-communications”) are a mechanism that let Tasks talk to each other, as by default Tasks are entirely isolated and may be running on entirely different machines.
XComs に関しても、Astronomer 社のサイトが参考になります。タスクの戻り値は暗黙的に XComs として扱われ、Airflow 内のメタデータベースに保管される、とのことです。この暗黙的な設定は、タスクのパラメーターで変更することができます。
XComs can be “pushed”, meaning sent by a task, or “pulled”, meaning received by a task. When an XCom is pushed, it is stored in Airflow’s metadata database and made available to all other tasks. Any time a task returns a value (e.g. if your Python callable for your PythonOperator has a return), that value will automatically be pushed to XCom. Tasks can also be configured to push XComs by calling the
xcom_push()
method. Similarly,xcom_pull()
can be used in a task to receive an XCom.
Passing Data Between Airflow Tasks
BranchPythonOperator
について
branching()
関数が BranchPythonOperator にあたります。BranchPythonOperator も PythonOperator と同様に decorate することで宣言でき、decorate には airflow.decorators.branch_python.branch_task()
を利用します。
BranchPythonOperator は、下流となる task_id を戻り値として指定することで、条件分岐をしてくれます。
It derives the PythonOperator and expects a Python function that returns a single task_id or list of task_ids to follow. The task_id(s) returned should point to a task directly downstream from {self}. All other “branches” or directly downstream tasks are marked with a state of skipped so that these paths can’t move forward. The skipped states are propagated downstream to allow for the DAG state to fill up and the DAG run’s state to be inferred.
class airflow.operators.python.BranchPythonOperator
trigger_rule
について
分岐された処理が、再び結合する finishing タスクでは、パラメーター trigger_rule
に値 none_failed_min_one_success
を明示的に指定しています。
trigger_rule を変更することで、そのタスクが実行される「上流タスクの成功可否条件」を変更できます。
:param trigger_rule: defines the rule by which dependencies are applied for the task to get triggered. Options are:
{ all_success | all_failed | all_done | all_skipped | one_success | one_failed | none_failed | none_failed_min_one_success | none_skipped | always}
default isall_success
. Options can be set as string or using the constants defined in the static classairflow.utils.TriggerRule
ShortCircuitOperator
を利用する
続いて ShortCircuitOperator
を利用する方法を確認します。以下は、Github 上のサンプルコード。
example_short_circuit_operator.py
DAGファイル
グラフ図
ShortCircuitOperator
について
上記のサンプルコードでは、short_circuit_operator という名前のタスクが ShortCircuitOperator
を利用している箇所となります。
ShortCircuitOperator
は PythonOperator
のサブクラスとなり、タスクを宣言する際に実行する Python 関数を指定します。指定した関数の戻り値の真偽値により、下流タスクの実行を制御することができます。
Use the ShortCircuitOperator to control whether a pipeline continues if a condition is satisfied or a truthy value is obtained. The evaluation of this condition and truthy value is done via the output of a
python_callable
. If thepython_callable
returns True or a truthy value, the pipeline is allowed to continue and an XCom of the output will be pushed. If the output is False or a falsy value, the pipeline will be short-circuited based on the configured short-circuiting (more on this later). In the example below, the tasks that follow the “condition_is_True” ShortCircuitOperator will execute while the tasks downstream of the “condition_is_False” ShortCircuitOperator will be skipped.
class airflow.operators.python.ShortCircuitOperator
ドキュメント内で気になった記述。
ShortCircuitOperator は、ShortCircuitOperator タスクの下流タスク全てを、trigger_rule を無視してスキップさせるが、パラメーター ignore_downstream_trigger_rules
を False にした場合、直下の下流タスクのみをスキップさせる、ということでしょうか。
The “short-circuiting” can be configured to either respect or ignore the trigger rule defined for downstream tasks. If
ignore_downstream_trigger_rules
is set to True, the default configuration, all downstream tasks are skipped without considering thetrigger_rule
defined for tasks. If this parameter is set to False, the direct downstream tasks are skipped but the specifiedtrigger_rule
for other subsequent downstream tasks are respected. In this short-circuiting configuration, the operator assumes the direct downstream task(s) were purposely meant to be skipped but perhaps not other subsequent tasks. This configuration is especially useful if only part of a pipeline should be short-circuited rather than all tasks which follow the ShortCircuitOperator task.
その他
タスクの条件分岐に利用できる、その他 Operator を列挙します。
BranchSQLOperator
SQL クエリ結果の真偽値により、下流のタスクを制御できるもの。
BranchDayOfWeekOperator
タスクが実行された曜日により制御できるもの。
example example_branch_day_of_week_operator.py
BranchDateTimeOperator
タスクが実行された日時により制御できるもの。
example example_branch_datetime_operator.py
LatestOnlyOperator
何らかの理由により、過去に実行されるべきであった DAG のスケジューリングが滞留していた場合、最新スケジューリングでの DAG 実行時のみ下流タスクを実行するもの。
Allows a workflow to skip tasks that are not running during the most recent schedule interval.
If the task is run outside of the latest schedule interval (i.e. external_trigger), all directly downstream tasks will be skipped.