Airflow の DAG 定義ファイルに入門する

公式ドキュメントにある tutorial の参考にしつつ、 Airflow の DAG 定義ファイルの書き方を確認します。

tutorial

環境

DAG 定義ファイルの必要要素

必要となる要素の確認。

  • Python Library のインストール
  • DAG オブジェクトのインスタンス
    • 各種引数の設定
  • タスクの宣言
  • タスクの依存関係の定義

サンプルの DAG 定義ファイル

今回作成したサンプルのファイルです。

sample_dag.py

gist.github.com

サンプルの DAG 定義ファイル(詳細)

コメントを付与した部分毎に詳細を記載します。

define default arguments for operators.

default arguments とは、DAG をインスタンス化する際に指定できるパラメーターで、その DAG 内で利用される Operator に共通で適用されるパラメーターとなるようです。

default_args (Optional[Dict]) – A dictionary of default parameters to be used as constructor keyword parameters when initialising operators. Note that operators have the same hook, and precede those defined here, meaning that if your dict contains ‘depends_on_past’: True here and ‘depends_on_past’: False in the operator’s call default_args, the actual value will be False.

class airflow.models.dag.DAG

すべての Operator は、BaseOperator クラスを必ず継承するようなので、 BaseOperator クラスで定義されているパラメーターを設定する感じでしょうか。

Abstract base class for all operators. Since operators create objects that become nodes in the dag, BaseOperator contains many recursive methods for dag crawling behavior. To derive this class, you are expected to override the constructor as well as the ‘execute’ method.

class airflow.models.baseoperator.BaseOperator

instantiate a DAG.

DAG オブジェクトをインスタンス化します。上のサンプルでは Context Manager を利用していますが、幾つか方法が用意されているようです。

  • you can use a standard constructor, passing the dag into any operators you use
  • you can use a context manager, which will add the DAG to anything inside it implicitly
  • you can use the @dag decorator to turn a function into a DAG generator

Declaring a DAG

インスタンス化する際に、上で定義した Default Parameter の他、スケジュール設定やリトライ時動作などの DAG に関するパラメーターを設定してあげます。

定義されているパラメーターは下記参照。

class airflow.models.dag.DAG

add documentation to a DAG.

DAG や Task には、ドキュメントを定義できます。

It’s possible to add documentation or notes to your DAGs & task objects that are visible in the web interface (“Graph” & “Tree” for DAGs, “Task Instance Details” for tasks).

DAG & Task Documentation

DAG に設定したドキュメントは、WEB コンソール上で下記に表示されています。

task definitions

DAG 内で実行されるタスクを宣言していきます。

define the task of bash operator.

BashOperator は、Bash コマンドを実行できる Operator です。

BashOperator

BashOperator のクラスが下記。タスクもオブジェクトとして存在することになるんですね。

airflow.operators.bash.BashOperator

define the task of bash operator with jinja template.

BashOperator では、実行する bash コマンド部分で jinja のテンプレートを利用できるそうです。

jinja テンプレート内の予約変数たち。

Templates reference - Variables

jinja テンプレート内で利用できるマクロたち。

Templates reference - Macros

define the task of python operator.

PythonOperator は、Python のコードを実行できる Operator です。

PythonOperator

PythonOperator のクラスが下記。

class airflow.operators.python.PythonOperator

実行する Python コード内で利用するライブラリは、 DAG ファイルの先頭でインポートするのではなく、実行するコード内(具体的にはタスクが呼び出す関数内)でインポートするのがお作法らしいです。

You should avoid writing the top level code which is not necessary to create Operators and build DAG relations between them. This is because of the design decision for the scheduler of Airflow and the impact the top-level code parsing speed on both performance and scalability of Airflow.

One of the important factors impacting DAG loading time, that might be overlooked by Python developers is that top-level imports might take surprisingly a lot of time and they can generate a lot of overhead and this can be easily avoided by converting them to local imports inside Python callables for example.

Top level Python Code

define the task of python operator using decorator.

PythonOperator は、呼び出す関数に airflow.decorators.task を decorate することでも宣言できます。

Airflow task decorator converts any Python function to an Airflow operator. The decorated function can be called once to set the arguments and key arguments for operator execution.

Python task decorator

setting up dependencies.

上記までで宣言してきた各タスクに対して、依存関係を設定します。

今回のサンプルファイルでは、宣言した順にタスクを順次実行しています。

Setting up Dependencies

DAG の実行

作成した DAG ファイルを指定の場所に置いておくと、Airflow のスケジューラーが読み取って登録してくれます。

登録直後、DAG は pause ステータスとなっているので、 unpuase してあげると、登録されたスケジュールで実行されはじます。

$ airflow dags unpause sample_dag

WEB コンソール上で確認。

タスクのログは、ここから確認できます。