Airflow の apache-airflow-providers-google
を利用しながら、Airflow における Connection と Hook の概念について確認します。
環境
Connections
とは
Connections
とは、Airflow から Airflow 外部のシステムに接続するために利用する設定とのことです。
Airflow is often used to pull and push data into other systems, and so it has a first-class Connection concept for storing credentials that are used to talk to external systems.
Connections を設定するには、以下の方法があります。
- 環境変数に設定
- external Secret Backend で設定
- HashiCorp Volt や AWS SSM Parameter のような外部サービスを利用する方法
- Airflow のメタデータベースに設定
- WEB-UI や CLI 経由でデータベースに保管する方法
apache-airflow-providers-google
の Connection を設定
Google Cloud 向け provider である apache-airflow-providers-google
を利用して、Connection の設定を試してみます。
Airflow の 公式 コンテナイメージを利用する場合、apache-airflow-providers-google はデフォルトでインストールされています。
$ airflow providers list package_name | description | version =========================================+==============================================================================================+======== ... apache-airflow-providers-google | Google services including: | 7.0.0 | | | - Google Ads https://ads.google.com/ | | - Google Cloud (GCP) https://cloud.google.com/ | | - Google Firebase https://firebase.google.com/ | | - Google LevelDB https://github.com/google/leveldb/ | | - Google Marketing Platform https://marketingplatform.google.com/ | | - Google Workspace https://workspace.google.pl/ (formerly Google Suite) | ...
WEB-UI 上から、Google Cloud 向けの Connections 設定を行います。事前に Google Cloud 側でサービスアカウントを作成し、作成したサービスアカウントの情報を入力します。
Hook
とは
Hook とは、外部のプラットフォームに接続するための high-level interface とのことです。
A Hook is a high-level interface to an external platform that lets you quickly and easily talk to them without having to write low-level code that hits their API or uses special libraries. They’re also often the building blocks that Operators are built out of.
以下で、Hook がどこで利用されているのか意識しながら、apache-airflow-providers-google
を利用した DAG を作成してみます。
apache-airflow-providers-google
を利用した DAG の作成
apache-airflow-providers-google に関する公式ページ。
apache-airflow-providers-google
DAG ファイル
from datetime import datetime | |
from airflow import models | |
from airflow.providers.google.cloud.operators.bigquery import ( | |
BigQueryCreateEmptyDatasetOperator, | |
BigQueryCreateEmptyTableOperator, | |
BigQueryInsertJobOperator, | |
) | |
CONNECTION_ID = "conn_sample_gcp" | |
PROJECT_ID = "sample-project-xxx" | |
DATASET_NAME = "sample" | |
TABLE_NAME = "sample" | |
with models.DAG( | |
dag_id="bq_sample", | |
description="A Sample Google Provider DAG", | |
schedule_interval="@once", | |
start_date=datetime(2022, 1, 1), | |
catchup=False, | |
tags=["example"], | |
) as dag: | |
create_dataset = BigQueryCreateEmptyDatasetOperator( | |
gcp_conn_id=CONNECTION_ID, | |
task_id="create_dataset", | |
dataset_id=DATASET_NAME, | |
location="asia-northeast1", | |
) | |
create_table = BigQueryCreateEmptyTableOperator( | |
bigquery_conn_id=CONNECTION_ID, | |
task_id="create_table", | |
dataset_id=DATASET_NAME, | |
table_id=TABLE_NAME, | |
schema_fields=[ | |
{"name": "id", "type": "STRING", "mode": "REQUIRED"}, | |
{"name": "name", "type": "STRING", "mode": "NULLABLE"} | |
], | |
) | |
insert_data = BigQueryInsertJobOperator( | |
gcp_conn_id=CONNECTION_ID, | |
task_id="insert_query_job", | |
configuration={ | |
"query": { | |
"query": f"INSERT INTO {DATASET_NAME}.{TABLE_NAME} VALUES ('001', 'mikochi');", # noqa: E501 | |
"useLegacySql": False, | |
} | |
}, | |
) | |
create_dataset >> create_table >> insert_data |
Hook の利用箇所
DAG ファイル内で利用した operator たち。
class airflow.providers.google.cloud.operators.bigquery.BigQueryCreateEmptyDatasetOperator
class airflow.providers.google.cloud.operators.bigquery.BigQueryCreateEmptyTableOperator
class airflow.providers.google.cloud.operators.bigquery.BigQueryInsertJobOperator
Hook はどこあるのだろうとソースを見てみると、 Operator 内の インスタンスメソッド内で Google Cloud(BigQuery) 向けの Hook を作成していました。これのことですね。
def execute(self, context: 'Context') -> None: bq_hook = BigQueryHook( gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to, location=self.location, impersonation_chain=self.impersonation_chain, ) try: dataset = bq_hook.create_empty_dataset( project_id=self.project_id, dataset_id=self.dataset_id, dataset_reference=self.dataset_reference, location=self.location, exists_ok=self.exists_ok, ) BigQueryDatasetLink.persist( context=context, task_instance=self, dataset_id=dataset["datasetReference"]["datasetId"], project_id=dataset["datasetReference"]["projectId"], ) except Conflict: dataset_id = self.dataset_reference.get("datasetReference", {}).get("datasetId", self.dataset_id) self.log.info('Dataset %s already exists.', dataset_id)