Airflow の DAG 間で依存関係を設定する

Airflowにて、DAG の依存関係を設定する方法を確認します。今回も Astronomer 社のサイトより、下記ページを参考にしています。

Cross-DAG Dependencies

環境

TriggerDagRunOperator を利用する方法

TriggerDagRunOperator は、異なる DAG を実行するための Operator です。

The TriggerDagRunOperator is an easy way to implement cross-DAG dependencies. This operator allows you to have a task in one DAG that triggers another DAG in the same Airflow environment.

The TriggerDagRunOperator is ideal in situations where you have one upstream DAG that needs to trigger one or more downstream DAGs, or if you have dependent DAGs that have both upstream and downstream tasks in the upstream DAG (i.e. the dependent DAG is in the middle of tasks in the upstream DAG). Because you can use this operator for any task in your DAG, it is highly flexible.

下記は公式で用意してくれているサンプルコードです。

example_trigger_controller_dag.py

example_trigger_target_dag.py

DAG ファイル

trigger_run_dag

gist.github.com

trigger_target_dag

gist.github.com

グラフ図

DAG 間の依存関係を示す図も、WEB UI 上に表示してくれます。

TriggerRunDagOperator の使い方

TriggerRunDagOperator では、パラメーター trigger_dag_id で指定した DAG を実行してくれます。パラメーター wait_for_completion にて、TriggerRunDagOperator であるタスクが、呼び出したタスクの終了を待つか否かを指定できます。

class airflow.operators.trigger_dagrun.TriggerDagRunOperator

TriggerRunDagOperator のパラメーター conf に辞書を渡すことで、起動された DAG の DAG run の conf パラメーターに値を持たせることができます。DAG run とは、DAGが実行される都度生成されるオブジェクトとのこと。実際はメタデータベース上で値が管理されているぽいです。

A DAG Run is an object representing an instantiation of the DAG in time. Any time the DAG is executed, a DAG Run is created and all tasks inside it are executed. The status of the DAG Run depends on the tasks states. Each DAG Run is run separately from another, meaning that you can have running DAG many times at the same time.

DAG Runs

airflow.models.dagrun

各 DAG の Operator では、自身の「DAG Run」内の conf パラメーターの値を利用することができます。上のサンプルコードでは、trigger_target_dag 側の print_message 関数がそれです。

When triggering a DAG from the CLI, the REST API or the UI, it is possible to pass configuration for a DAG Run as a JSON blob.

Note: The parameters from dag_run.conf can only be used in a template field of an operator.

Passing Parameters when triggering dags

出力されたログを見ると、正しく値を受け取っています。

[2022-06-26, 14:12:50 JST] {task_command.py:370} INFO - Running <TaskInstance: trigger_target_dag.pring_message manual__2022-06-26T05:12:48.263234+00:00 [running]> on host 6670a0ab2606
[2022-06-26, 14:12:50 JST] {taskinstance.py:1571} INFO - Exporting the following env vars:
AIRFLOW_CTX_DAG_OWNER=***
AIRFLOW_CTX_DAG_ID=trigger_target_dag
AIRFLOW_CTX_TASK_ID=pring_message
AIRFLOW_CTX_EXECUTION_DATE=2022-06-26T05:12:48.263234+00:00
AIRFLOW_CTX_TRY_NUMBER=1
AIRFLOW_CTX_DAG_RUN_ID=manual__2022-06-26T05:12:48.263234+00:00
[2022-06-26, 14:12:50 JST] {logging_mixin.py:115} INFO - received message: Mikochi.    <= ここ
[2022-06-26, 14:12:50 JST] {python.py:173} INFO - Done. Returned value was: None
[2022-06-26, 14:12:50 JST] {taskinstance.py:1400} INFO - Marking task as SUCCESS. dag_id=trigger_target_dag, task_id=pring_message, execution_date=20220626T051248, start_date=20220626T051250, end_date=20220626T051250

ExternalTaskSensor を利用する方法

続いて ExternalTaskSensor を利用する方法を確認します。起動される DAG 側のタスクに、ExternalTaskSensor を設定することで、依存関係を設定することができます。

The next method for creating cross-DAG dependencies is to add an ExternalTaskSensor to your downstream DAG. The downstream DAG will wait until a task is completed in the upstream DAG before moving on to the rest of the DAG.

This method is not as flexible as the TriggerDagRunOperator, since the dependency is implemented in the downstream DAG. It is ideal in situations where you have a downstream DAG that is dependent on multiple upstream DAGs.

下記は公式で用意してくれているサンプルコードです。

example_external_task_marker_dag.py

Sensor とは

ExternalTaskSensor とは Sensor の一種となり、そもそも Sensor とは、何かの出来事を待ち受けるよう設計された Operator の一種となります。

Sensors are a special type of Operator that are designed to do exactly one thing - wait for something to occur. It can be time-based, or waiting for a file, or an external event, but all they do is wait until something happens, and then succeed so their downstream tasks can run.

Sensor の起動モードには、以下の3種類があり、「 worker リソースの利用」や「タスク着火条件を気づくタイミング」に差異があります。

Because they are primarily idle, Sensors have three different modes of running so you can be a bit more efficient about using them:

・poke (default): The Sensor takes up a worker slot for its entire runtime

・reschedule: The Sensor takes up a worker slot only when it is checking, and sleeps for a set duration between checks

・smart sensor: There is a single centralized version of this Sensor that batches all executions of it

The poke and reschedule modes can be configured directly when you instantiate the sensor; generally, the trade-off between them is latency. Something that is checking every second should be in poke mode, while something that is checking every minute should be in reschedule mode.

Sensors

class airflow.sensors.base.BaseSensorOperator

smart sensor は、すでに利用非推奨となっており、smart sensor を、且つ poke モードの Sensor を改善するものとして、 Deferrable Operators が発表されています。

Deferrable Operators & Triggers

DAG ファイル

gist.github.com

グラフ図

ExternalTaskMarker とは

タスク parent_task では、ExternalTaskMarker というオペレーターを利用しています。ExternalTaskMarker は、異なる DAG のタスクが、この ExternalTaskMarker のタスクに依存していることを明示的に示すために利用できます。このオペレーターがなくとも ExternalTaskSensor は正しく動作しますが、コード上で依存関係の明示的に示せる点がメリットでしょうか。且つ、タスク実行結果をクリアする際に、依存関係を持つ下流 DAG のタスクも含めて、クリアできるとのこと。

Use this operator to indicate that a task on a different DAG depends on this task. When this task is cleared with “Recursive” selected, Airflow will clear the task on the other DAG and its downstream tasks recursively. Transitive dependencies are followed until the recursion_depth is reached.

class airflow.sensors.external_task.ExternalTaskMarker

ExternalTaskSensor とは

ExternalTaskSensor が、異なる DAG のタスクを待ち受けてくれる Sensor となります。待ち受ける事になる external_dag_idexternal_task_id を指定して利用します。

class airflow.sensors.external_task.ExternalTaskSensor

ExternalTaskSensor を利用する注意点として、上流の DAG と下流の DAG の、start_dateschedule_interval が一致していなければなりません。ExternalTaskSensor は、各 DAG やタスクの Execution_date の値を見ているらしく、両者 DAG のこの値が一致していない場合、動作しなくなります。

Also note that in the example above, the upstream DAG and downstream DAG must have the same start date and schedule interval. This is because the ExternalTaskSensor will look for completion of the specified task or DAG at the same execution_date. To look for completion of the external task at a different date, you can make use of either of the execution_delta or execution_date_fn parameters.

仮に両者 DAG の start_dateschedule_interval が一致していない場合(つまり、両者 DAG の execution_date の値が一致しない場合 )、その点を補う機能として execution_deltaexecution_date_fn というパラメーターが ExternalTaskSensor には用意されています。

execution_delta (Optional[datetime.timedelta]) – time difference with the previous execution to look at, the default is the same logical date as the current task or DAG. For yesterday, use [positive!] datetime.timedelta(days=1). Either execution_delta or execution_date_fn can be passed to ExternalTaskSensor, but not both.

execution_date_fn (Optional[Callable]) – function that receives the current execution’s logical date as the first positional argument and optionally any number of keyword arguments available in the context dictionary, and returns the desired logical dates to query. Either execution_delta or execution_date_fn can be passed to ExternalTaskSensor, but not both.

これらパラメーターを利用して、両者 DAG の execution_date をなんとか一致させろ、という事しょうね。