Airflow の apache-airflow-providers-google
を利用しながら、Airflow における Connection と Hook の概念について確認します。
環境
Connections
とは
Connections
とは、Airflow から Airflow 外部のシステムに接続するために利用する設定とのことです。
Airflow is often used to pull and push data into other systems, and so it has a first-class Connection concept for storing credentials that are used to talk to external systems.
Connections を設定するには、以下の方法があります。
- 環境変数に設定
- external Secret Backend で設定
- HashiCorp Volt や AWS SSM Parameter のような外部サービスを利用する方法
- Airflow のメタデータベースに設定
- WEB-UI や CLI 経由でデータベースに保管する方法
apache-airflow-providers-google
の Connection を設定
Google Cloud 向け provider である apache-airflow-providers-google
を利用して、Connection の設定を試してみます。
Airflow の 公式 コンテナイメージを利用する場合、apache-airflow-providers-google はデフォルトでインストールされています。
$ airflow providers list package_name | description | version =========================================+==============================================================================================+======== ... apache-airflow-providers-google | Google services including: | 7.0.0 | | | - Google Ads https://ads.google.com/ | | - Google Cloud (GCP) https://cloud.google.com/ | | - Google Firebase https://firebase.google.com/ | | - Google LevelDB https://github.com/google/leveldb/ | | - Google Marketing Platform https://marketingplatform.google.com/ | | - Google Workspace https://workspace.google.pl/ (formerly Google Suite) | ...
WEB-UI 上から、Google Cloud 向けの Connections 設定を行います。事前に Google Cloud 側でサービスアカウントを作成し、作成したサービスアカウントの情報を入力します。
Hook
とは
Hook とは、外部のプラットフォームに接続するための high-level interface とのことです。
A Hook is a high-level interface to an external platform that lets you quickly and easily talk to them without having to write low-level code that hits their API or uses special libraries. They’re also often the building blocks that Operators are built out of.
以下で、Hook がどこで利用されているのか意識しながら、apache-airflow-providers-google
を利用した DAG を作成してみます。
apache-airflow-providers-google
を利用した DAG の作成
apache-airflow-providers-google に関する公式ページ。
apache-airflow-providers-google
DAG ファイル
Hook の利用箇所
DAG ファイル内で利用した operator たち。
class airflow.providers.google.cloud.operators.bigquery.BigQueryCreateEmptyDatasetOperator
class airflow.providers.google.cloud.operators.bigquery.BigQueryCreateEmptyTableOperator
class airflow.providers.google.cloud.operators.bigquery.BigQueryInsertJobOperator
Hook はどこあるのだろうとソースを見てみると、 Operator 内の インスタンスメソッド内で Google Cloud(BigQuery) 向けの Hook を作成していました。これのことですね。
def execute(self, context: 'Context') -> None: bq_hook = BigQueryHook( gcp_conn_id=self.gcp_conn_id, delegate_to=self.delegate_to, location=self.location, impersonation_chain=self.impersonation_chain, ) try: dataset = bq_hook.create_empty_dataset( project_id=self.project_id, dataset_id=self.dataset_id, dataset_reference=self.dataset_reference, location=self.location, exists_ok=self.exists_ok, ) BigQueryDatasetLink.persist( context=context, task_instance=self, dataset_id=dataset["datasetReference"]["datasetId"], project_id=dataset["datasetReference"]["projectId"], ) except Conflict: dataset_id = self.dataset_reference.get("datasetReference", {}).get("datasetId", self.dataset_id) self.log.info('Dataset %s already exists.', dataset_id)