Airflow の Connections と Hook の概念を理解する

Airflow の apache-airflow-providers-google を利用しながら、Airflow における Connection と Hook の概念について確認します。

環境

Connections とは

Connections とは、Airflow から Airflow 外部のシステムに接続するために利用する設定とのことです。

Airflow is often used to pull and push data into other systems, and so it has a first-class Connection concept for storing credentials that are used to talk to external systems.

Connections & Hooks

Connections を設定するには、以下の方法があります。

  • 環境変数に設定
  • external Secret Backend で設定
    • HashiCorp Volt や AWS SSM Parameter のような外部サービスを利用する方法
  • Airflow のメタデータベースに設定
    • WEB-UI や CLI 経由でデータベースに保管する方法

Managing Connections

apache-airflow-providers-google の Connection を設定

Google Cloud 向け provider である apache-airflow-providers-google を利用して、Connection の設定を試してみます。

Airflow の 公式 コンテナイメージを利用する場合、apache-airflow-providers-google はデフォルトでインストールされています。

$ airflow providers list
package_name                             | description                                                                                  | version
=========================================+==============================================================================================+========
...
apache-airflow-providers-google          | Google services including:                                                                   | 7.0.0  
                                         |                                                                                              |        
                                         |   - Google Ads https://ads.google.com/                                                       |        
                                         |   - Google Cloud (GCP) https://cloud.google.com/                                             |        
                                         |   - Google Firebase https://firebase.google.com/                                             |        
                                         |   - Google LevelDB https://github.com/google/leveldb/                                        |        
                                         |   - Google Marketing Platform https://marketingplatform.google.com/                          |        
                                         |   - Google Workspace https://workspace.google.pl/ (formerly Google Suite)                    |        
...

airflow providers list

WEB-UI 上から、Google Cloud 向けの Connections 設定を行います。事前に Google Cloud 側でサービスアカウントを作成し、作成したサービスアカウントの情報を入力します。

Hook とは

Hook とは、外部のプラットフォームに接続するための high-level interface とのことです。

A Hook is a high-level interface to an external platform that lets you quickly and easily talk to them without having to write low-level code that hits their API or uses special libraries. They’re also often the building blocks that Operators are built out of.

Hooks

以下で、Hook がどこで利用されているのか意識しながら、apache-airflow-providers-google を利用した DAG を作成してみます。

apache-airflow-providers-google を利用した DAG の作成

apache-airflow-providers-google に関する公式ページ。

apache-airflow-providers-google

DAG ファイル

from datetime import datetime
from airflow import models
from airflow.providers.google.cloud.operators.bigquery import (
BigQueryCreateEmptyDatasetOperator,
BigQueryCreateEmptyTableOperator,
BigQueryInsertJobOperator,
)
CONNECTION_ID = "conn_sample_gcp"
PROJECT_ID = "sample-project-xxx"
DATASET_NAME = "sample"
TABLE_NAME = "sample"
with models.DAG(
dag_id="bq_sample",
description="A Sample Google Provider DAG",
schedule_interval="@once",
start_date=datetime(2022, 1, 1),
catchup=False,
tags=["example"],
) as dag:
create_dataset = BigQueryCreateEmptyDatasetOperator(
gcp_conn_id=CONNECTION_ID,
task_id="create_dataset",
dataset_id=DATASET_NAME,
location="asia-northeast1",
)
create_table = BigQueryCreateEmptyTableOperator(
bigquery_conn_id=CONNECTION_ID,
task_id="create_table",
dataset_id=DATASET_NAME,
table_id=TABLE_NAME,
schema_fields=[
{"name": "id", "type": "STRING", "mode": "REQUIRED"},
{"name": "name", "type": "STRING", "mode": "NULLABLE"}
],
)
insert_data = BigQueryInsertJobOperator(
gcp_conn_id=CONNECTION_ID,
task_id="insert_query_job",
configuration={
"query": {
"query": f"INSERT INTO {DATASET_NAME}.{TABLE_NAME} VALUES ('001', 'mikochi');", # noqa: E501
"useLegacySql": False,
}
},
)
create_dataset >> create_table >> insert_data
gist.github.com

Hook の利用箇所

DAG ファイル内で利用した operator たち。

class airflow.providers.google.cloud.operators.bigquery.BigQueryCreateEmptyDatasetOperator

class airflow.providers.google.cloud.operators.bigquery.BigQueryCreateEmptyTableOperator

class airflow.providers.google.cloud.operators.bigquery.BigQueryInsertJobOperator

Hook はどこあるのだろうとソースを見てみると、 Operator 内の インスタンスメソッド内で Google Cloud(BigQuery) 向けの Hook を作成していました。これのことですね。

def execute(self, context: 'Context') -> None:
        bq_hook = BigQueryHook(
            gcp_conn_id=self.gcp_conn_id,
            delegate_to=self.delegate_to,
            location=self.location,
            impersonation_chain=self.impersonation_chain,
        )

        try:
            dataset = bq_hook.create_empty_dataset(
                project_id=self.project_id,
                dataset_id=self.dataset_id,
                dataset_reference=self.dataset_reference,
                location=self.location,
                exists_ok=self.exists_ok,
            )
            BigQueryDatasetLink.persist(
                context=context,
                task_instance=self,
                dataset_id=dataset["datasetReference"]["datasetId"],
                project_id=dataset["datasetReference"]["projectId"],
            )
        except Conflict:
            dataset_id = self.dataset_reference.get("datasetReference", {}).get("datasetId", self.dataset_id)
            self.log.info('Dataset %s already exists.', dataset_id)