Airflow の Connections と Hook の概念を理解する

Airflow の apache-airflow-providers-google を利用しながら、Airflow における Connection と Hook の概念について確認します。

環境

Connections とは

Connections とは、Airflow から Airflow 外部のシステムに接続するために利用する設定とのことです。

Airflow is often used to pull and push data into other systems, and so it has a first-class Connection concept for storing credentials that are used to talk to external systems.

Connections & Hooks

Connections を設定するには、以下の方法があります。

  • 環境変数に設定
  • external Secret Backend で設定
    • HashiCorp Volt や AWS SSM Parameter のような外部サービスを利用する方法
  • Airflow のメタデータベースに設定
    • WEB-UI や CLI 経由でデータベースに保管する方法

Managing Connections

apache-airflow-providers-google の Connection を設定

Google Cloud 向け provider である apache-airflow-providers-google を利用して、Connection の設定を試してみます。

Airflow の 公式 コンテナイメージを利用する場合、apache-airflow-providers-google はデフォルトでインストールされています。

$ airflow providers list
package_name                             | description                                                                                  | version
=========================================+==============================================================================================+========
...
apache-airflow-providers-google          | Google services including:                                                                   | 7.0.0  
                                         |                                                                                              |        
                                         |   - Google Ads https://ads.google.com/                                                       |        
                                         |   - Google Cloud (GCP) https://cloud.google.com/                                             |        
                                         |   - Google Firebase https://firebase.google.com/                                             |        
                                         |   - Google LevelDB https://github.com/google/leveldb/                                        |        
                                         |   - Google Marketing Platform https://marketingplatform.google.com/                          |        
                                         |   - Google Workspace https://workspace.google.pl/ (formerly Google Suite)                    |        
...

airflow providers list

WEB-UI 上から、Google Cloud 向けの Connections 設定を行います。事前に Google Cloud 側でサービスアカウントを作成し、作成したサービスアカウントの情報を入力します。

Hook とは

Hook とは、外部のプラットフォームに接続するための high-level interface とのことです。

A Hook is a high-level interface to an external platform that lets you quickly and easily talk to them without having to write low-level code that hits their API or uses special libraries. They’re also often the building blocks that Operators are built out of.

Hooks

以下で、Hook がどこで利用されているのか意識しながら、apache-airflow-providers-google を利用した DAG を作成してみます。

apache-airflow-providers-google を利用した DAG の作成

apache-airflow-providers-google に関する公式ページ。

apache-airflow-providers-google

DAG ファイル

gist.github.com

Hook の利用箇所

DAG ファイル内で利用した operator たち。

class airflow.providers.google.cloud.operators.bigquery.BigQueryCreateEmptyDatasetOperator

class airflow.providers.google.cloud.operators.bigquery.BigQueryCreateEmptyTableOperator

class airflow.providers.google.cloud.operators.bigquery.BigQueryInsertJobOperator

Hook はどこあるのだろうとソースを見てみると、 Operator 内の インスタンスメソッド内で Google Cloud(BigQuery) 向けの Hook を作成していました。これのことですね。

def execute(self, context: 'Context') -> None:
        bq_hook = BigQueryHook(
            gcp_conn_id=self.gcp_conn_id,
            delegate_to=self.delegate_to,
            location=self.location,
            impersonation_chain=self.impersonation_chain,
        )

        try:
            dataset = bq_hook.create_empty_dataset(
                project_id=self.project_id,
                dataset_id=self.dataset_id,
                dataset_reference=self.dataset_reference,
                location=self.location,
                exists_ok=self.exists_ok,
            )
            BigQueryDatasetLink.persist(
                context=context,
                task_instance=self,
                dataset_id=dataset["datasetReference"]["datasetId"],
                project_id=dataset["datasetReference"]["projectId"],
            )
        except Conflict:
            dataset_id = self.dataset_reference.get("datasetReference", {}).get("datasetId", self.dataset_id)
            self.log.info('Dataset %s already exists.', dataset_id)