Airflow の DAG 間で依存関係を設定する
Airflowにて、DAG の依存関係を設定する方法を確認します。今回も Astronomer 社のサイトより、下記ページを参考にしています。
環境
- Apache Airflow 2.3.2
TriggerDagRunOperator
を利用する方法
TriggerDagRunOperator
は、異なる DAG を実行するための Operator です。
The
TriggerDagRunOperator
is an easy way to implement cross-DAG dependencies. This operator allows you to have a task in one DAG that triggers another DAG in the same Airflow environment.The
TriggerDagRunOperator
is ideal in situations where you have one upstream DAG that needs to trigger one or more downstream DAGs, or if you have dependent DAGs that have both upstream and downstream tasks in the upstream DAG (i.e. the dependent DAG is in the middle of tasks in the upstream DAG). Because you can use this operator for any task in your DAG, it is highly flexible.
下記は公式で用意してくれているサンプルコードです。
example_trigger_controller_dag.py
DAG ファイル
trigger_run_dag
trigger_target_dag
グラフ図
DAG 間の依存関係を示す図も、WEB UI 上に表示してくれます。
TriggerRunDagOperator
の使い方
TriggerRunDagOperator
では、パラメーター trigger_dag_id
で指定した DAG を実行してくれます。パラメーター wait_for_completion
にて、TriggerRunDagOperator であるタスクが、呼び出したタスクの終了を待つか否かを指定できます。
class airflow.operators.trigger_dagrun.TriggerDagRunOperator
TriggerRunDagOperator のパラメーター conf
に辞書を渡すことで、起動された DAG の DAG run
の conf パラメーターに値を持たせることができます。DAG run
とは、DAGが実行される都度生成されるオブジェクトとのこと。実際はメタデータベース上で値が管理されているぽいです。
A DAG Run is an object representing an instantiation of the DAG in time. Any time the DAG is executed, a DAG Run is created and all tasks inside it are executed. The status of the DAG Run depends on the tasks states. Each DAG Run is run separately from another, meaning that you can have running DAG many times at the same time.
各 DAG の Operator では、自身の「DAG Run」内の conf パラメーターの値を利用することができます。上のサンプルコードでは、trigger_target_dag 側の print_message 関数がそれです。
When triggering a DAG from the CLI, the REST API or the UI, it is possible to pass configuration for a DAG Run as a JSON blob.
Note: The parameters from
dag_run.conf
can only be used in a template field of an operator.
Passing Parameters when triggering dags
出力されたログを見ると、正しく値を受け取っています。
[2022-06-26, 14:12:50 JST] {task_command.py:370} INFO - Running <TaskInstance: trigger_target_dag.pring_message manual__2022-06-26T05:12:48.263234+00:00 [running]> on host 6670a0ab2606 [2022-06-26, 14:12:50 JST] {taskinstance.py:1571} INFO - Exporting the following env vars: AIRFLOW_CTX_DAG_OWNER=*** AIRFLOW_CTX_DAG_ID=trigger_target_dag AIRFLOW_CTX_TASK_ID=pring_message AIRFLOW_CTX_EXECUTION_DATE=2022-06-26T05:12:48.263234+00:00 AIRFLOW_CTX_TRY_NUMBER=1 AIRFLOW_CTX_DAG_RUN_ID=manual__2022-06-26T05:12:48.263234+00:00 [2022-06-26, 14:12:50 JST] {logging_mixin.py:115} INFO - received message: Mikochi. <= ここ [2022-06-26, 14:12:50 JST] {python.py:173} INFO - Done. Returned value was: None [2022-06-26, 14:12:50 JST] {taskinstance.py:1400} INFO - Marking task as SUCCESS. dag_id=trigger_target_dag, task_id=pring_message, execution_date=20220626T051248, start_date=20220626T051250, end_date=20220626T051250
ExternalTaskSensor
を利用する方法
続いて ExternalTaskSensor
を利用する方法を確認します。起動される DAG 側のタスクに、ExternalTaskSensor
を設定することで、依存関係を設定することができます。
The next method for creating cross-DAG dependencies is to add an
ExternalTaskSensor
to your downstream DAG. The downstream DAG will wait until a task is completed in the upstream DAG before moving on to the rest of the DAG.This method is not as flexible as the
TriggerDagRunOperator
, since the dependency is implemented in the downstream DAG. It is ideal in situations where you have a downstream DAG that is dependent on multiple upstream DAGs.
下記は公式で用意してくれているサンプルコードです。
example_external_task_marker_dag.py
Sensor
とは
ExternalTaskSensor
とは Sensor
の一種となり、そもそも Sensor
とは、何かの出来事を待ち受けるよう設計された Operator
の一種となります。
Sensors are a special type of Operator that are designed to do exactly one thing - wait for something to occur. It can be time-based, or waiting for a file, or an external event, but all they do is wait until something happens, and then succeed so their downstream tasks can run.
Sensor
の起動モードには、以下の3種類があり、「 worker リソースの利用」や「タスク着火条件を気づくタイミング」に差異があります。
Because they are primarily idle, Sensors have three different modes of running so you can be a bit more efficient about using them:
・poke (default): The Sensor takes up a worker slot for its entire runtime
・reschedule: The Sensor takes up a worker slot only when it is checking, and sleeps for a set duration between checks
・smart sensor: There is a single centralized version of this Sensor that batches all executions of it
The poke and reschedule modes can be configured directly when you instantiate the sensor; generally, the trade-off between them is latency. Something that is checking every second should be in poke mode, while something that is checking every minute should be in reschedule mode.
class airflow.sensors.base.BaseSensorOperator
smart sensor は、すでに利用非推奨となっており、smart sensor を、且つ poke モードの Sensor を改善するものとして、 Deferrable Operators
が発表されています。
Deferrable Operators & Triggers
DAG ファイル
グラフ図
ExternalTaskMarker
とは
タスク parent_task では、ExternalTaskMarker
というオペレーターを利用しています。ExternalTaskMarker は、異なる DAG のタスクが、この ExternalTaskMarker のタスクに依存していることを明示的に示すために利用できます。このオペレーターがなくとも ExternalTaskSensor
は正しく動作しますが、コード上で依存関係の明示的に示せる点がメリットでしょうか。且つ、タスク実行結果をクリアする際に、依存関係を持つ下流 DAG のタスクも含めて、クリアできるとのこと。
Use this operator to indicate that a task on a different DAG depends on this task. When this task is cleared with “Recursive” selected, Airflow will clear the task on the other DAG and its downstream tasks recursively. Transitive dependencies are followed until the recursion_depth is reached.
class airflow.sensors.external_task.ExternalTaskMarker
ExternalTaskSensor
とは
ExternalTaskSensor
が、異なる DAG のタスクを待ち受けてくれる Sensor となります。待ち受ける事になる external_dag_id
と external_task_id
を指定して利用します。
class airflow.sensors.external_task.ExternalTaskSensor
ExternalTaskSensor
を利用する注意点として、上流の DAG と下流の DAG の、start_date
と schedule_interval
が一致していなければなりません。ExternalTaskSensor は、各 DAG やタスクの Execution_date
の値を見ているらしく、両者 DAG のこの値が一致していない場合、動作しなくなります。
Also note that in the example above, the upstream DAG and downstream DAG must have the same start date and schedule interval. This is because the
ExternalTaskSensor
will look for completion of the specified task or DAG at the sameexecution_date
. To look for completion of the external task at a different date, you can make use of either of theexecution_delta
orexecution_date_fn
parameters.
仮に両者 DAG の start_date
と schedule_interval
が一致していない場合(つまり、両者 DAG の execution_date の値が一致しない場合 )、その点を補う機能として execution_delta
と execution_date_fn
というパラメーターが ExternalTaskSensor
には用意されています。
・execution_delta (Optional[datetime.timedelta]) – time difference with the previous execution to look at, the default is the same logical date as the current task or DAG. For yesterday, use [positive!] datetime.timedelta(days=1). Either execution_delta or execution_date_fn can be passed to ExternalTaskSensor, but not both.
・execution_date_fn (Optional[Callable]) – function that receives the current execution’s logical date as the first positional argument and optionally any number of keyword arguments available in the context dictionary, and returns the desired logical dates to query. Either execution_delta or execution_date_fn can be passed to ExternalTaskSensor, but not both.
これらパラメーターを利用して、両者 DAG の execution_date をなんとか一致させろ、という事しょうね。