公式ドキュメントにある tutorial の参考にしつつ、 Airflow の DAG 定義ファイルの書き方を確認します。
環境
DAG 定義ファイルの必要要素
必要となる要素の確認。
サンプルの DAG 定義ファイル
今回作成したサンプルのファイルです。
sample_dag.py
サンプルの DAG 定義ファイル(詳細)
コメントを付与した部分毎に詳細を記載します。
define default arguments for operators.
default arguments とは、DAG をインスタンス化する際に指定できるパラメーターで、その DAG 内で利用される Operator に共通で適用されるパラメーターとなるようです。
default_args (Optional[Dict]) – A dictionary of default parameters to be used as constructor keyword parameters when initialising operators. Note that operators have the same hook, and precede those defined here, meaning that if your dict contains ‘depends_on_past’: True here and ‘depends_on_past’: False in the operator’s call default_args, the actual value will be False.
すべての Operator は、BaseOperator
クラスを必ず継承するようなので、 BaseOperator クラスで定義されているパラメーターを設定する感じでしょうか。
Abstract base class for all operators. Since operators create objects that become nodes in the dag, BaseOperator contains many recursive methods for dag crawling behavior. To derive this class, you are expected to override the constructor as well as the ‘execute’ method.
class airflow.models.baseoperator.BaseOperator
instantiate a DAG.
DAG オブジェクトをインスタンス化します。上のサンプルでは Context Manager を利用していますが、幾つか方法が用意されているようです。
- you can use a standard constructor, passing the dag into any operators you use
- you can use a context manager, which will add the DAG to anything inside it implicitly
- you can use the @dag decorator to turn a function into a DAG generator
インスタンス化する際に、上で定義した Default Parameter の他、スケジュール設定やリトライ時動作などの DAG に関するパラメーターを設定してあげます。
定義されているパラメーターは下記参照。
add documentation to a DAG.
DAG や Task には、ドキュメントを定義できます。
It’s possible to add documentation or notes to your DAGs & task objects that are visible in the web interface (“Graph” & “Tree” for DAGs, “Task Instance Details” for tasks).
DAG に設定したドキュメントは、WEB コンソール上で下記に表示されています。
task definitions
DAG 内で実行されるタスクを宣言していきます。
define the task of bash operator.
BashOperator
は、Bash コマンドを実行できる Operator です。
BashOperator のクラスが下記。タスクもオブジェクトとして存在することになるんですね。
airflow.operators.bash.BashOperator
define the task of bash operator with jinja template.
BashOperator では、実行する bash コマンド部分で jinja のテンプレートを利用できるそうです。
jinja テンプレート内の予約変数たち。
Templates reference - Variables
jinja テンプレート内で利用できるマクロたち。
define the task of python operator.
PythonOperator
は、Python のコードを実行できる Operator です。
PythonOperator のクラスが下記。
class airflow.operators.python.PythonOperator
実行する Python コード内で利用するライブラリは、 DAG ファイルの先頭でインポートするのではなく、実行するコード内(具体的にはタスクが呼び出す関数内)でインポートするのがお作法らしいです。
You should avoid writing the top level code which is not necessary to create Operators and build DAG relations between them. This is because of the design decision for the scheduler of Airflow and the impact the top-level code parsing speed on both performance and scalability of Airflow.
One of the important factors impacting DAG loading time, that might be overlooked by Python developers is that top-level imports might take surprisingly a lot of time and they can generate a lot of overhead and this can be easily avoided by converting them to local imports inside Python callables for example.
define the task of python operator using decorator.
PythonOperator は、呼び出す関数に airflow.decorators.task
を decorate することでも宣言できます。
Airflow task decorator converts any Python function to an Airflow operator. The decorated function can be called once to set the arguments and key arguments for operator execution.
setting up dependencies.
上記までで宣言してきた各タスクに対して、依存関係を設定します。
今回のサンプルファイルでは、宣言した順にタスクを順次実行しています。
DAG の実行
作成した DAG ファイルを指定の場所に置いておくと、Airflow のスケジューラーが読み取って登録してくれます。
登録直後、DAG は pause ステータスとなっているので、 unpuase してあげると、登録されたスケジュールで実行されはじます。
$ airflow dags unpause sample_dag
WEB コンソール上で確認。
タスクのログは、ここから確認できます。