Dataflow に入門する

Apache Beam のプログラムを、Google Cloud の Dataflow 上で動かしてみます。GCS 上の SJIS のファイルを、BigQuery にロードする処理を試してみます。

下記の資料を参考にしています。

PipelineOptions を設定する

Google BigQuery I/O connector

環境

事前準備

インプット用の CSV ファイル

以下の CSV ファイルを、適当な GCS 上のパスにアップロードしておきます。

gs://project-id-xxx-test/input/sample_sjis.csv

"氏名","世代","デビュー日"
"さくらみこ","0","2018-08-01"
"兎田ぺこら","3","2019-07-17"

今回は文字コード SJIS のファイルを対象としてみます。

$ nkf --guess ./sample_sjis.csv
Shift_JIS (CRLF)

ロード先の BigQuery テーブル情報

データをロードする BigQuery テーブルを作成しておきます。Apache Beam 側の処理でテーブルを作成することもできますが、今回は単純な INSERT の処理を試してみます。

スキーマ定義。

Field name Type Mode
name STRING REQUIRED
gen INTEGER REQUIRED
debut_date DATE REQUIRED

IAM 設定

実行にあたり必要となる IAM 権限を設定します。

Dataflow にジョブを投入するユーザーアカウントに、必要となるロールを付与します。

$ gcloud projects add-iam-policy-binding project-id-xxx --member="user:zunko@example.com" --role="roles/iam.serviceAccountUser"
$ gcloud projects add-iam-policy-binding project-id-xxx --member="user:zunko@example.com" --role="roles/dataflow.developer"

Dataflow のジョブが投入された後、Apache Beam での処理で利用されるサービスアカウントを作成します。このアカウントが Cloud Storage や BigQuery にアクセスすることになります。ジョブ投入時に指定しない場合、Compute Engine のデフォルトサービスアカウント PROJECT_NUMBER-compute@developer.gserviceaccount.com が利用されます。

$ # サービスアカウントの新規作成
$ gcloud iam service-accounts create kiritan --display-name="Dataflow test service account"
$ gcloud iam service-accounts list | grep kiritan
Dataflow test service account           kiritan@project-id-xxx.iam.gserviceaccount.com                      False
$
$ # ロールを付与
$ gcloud projects add-iam-policy-binding project-id-xxx --member="serviceAccount:kiritan@project-id-xxx.iam.gserviceaccount.com" --role=roles/dataflow.admin
$ gcloud projects add-iam-policy-binding project-id-xxx --member="serviceAccount:kiritan@project-id-xxx.iam.gserviceaccount.com" --role=roles/dataflow.worker
$ gcloud projects add-iam-policy-binding project-id-xxx --member="serviceAccount:kiritan@project-id-xxx.iam.gserviceaccount.com" --role=roles/storage.objectAdmin
$ gcloud projects add-iam-policy-binding project-id-xxx --member="serviceAccount:kiritan@project-id-xxx.iam.gserviceaccount.com" --role=roles/bigquery.admin

Google Cloud 上のパイプラインのセキュリティと権限

コード

./beam/sample_gcp.py

import csv
import datetime
import logging

import apache_beam
from apache_beam.coders.coders import Coder
from apache_beam.io import BigQueryDisposition, ReadFromText, WriteToBigQuery
from apache_beam.io.gcp.internal.clients import bigquery
from apache_beam.options.pipeline_options import (GoogleCloudOptions,
                                                  PipelineOptions,
                                                  SetupOptions,
                                                  StandardOptions)


class CustomOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument("--input", required=True)
        parser.add_argument("--dataset", required=True)
        parser.add_argument("--table", required=True)


class StrCp932Coder(Coder):
    """SJIS向けの Coder を作成
    https://beam.apache.org/releases/pydoc/2.41.0/apache_beam.coders.coders.html#apache_beam.coders.coders.Coder
    """
    def encode(self, value):
        return value.encode("cp932")

    def decode(self, value):
        return value.decode("cp932")

    def is_deterministic(self):
        return True


class TableSchema:
    """データロード先 BigQuery テーブルのスキーマ定義"""
    table_schema = {
        "fields": [
            {"name": "name", "type": "STRING", "mode": "REQUIRED"},
            {"name": "gen", "type": "INTEGER", "mode": "REQUIRED"},
            {"name": "debut_date", "type": "DATE", "mode": "REQUIRED"}
        ]
    }


class CsvToDict:
    @classmethod
    def csv_to_dict(cls, element):
        """与えられた element を BigQuery のカラム名のペアとなる辞書にして返す"""
        keys = []
        for field in TableSchema.table_schema["fields"]:
            for key, value in field.items():
                if key == "name":
                    keys.append(value)
        return {key: value for key, value in zip(keys, element)}


def get_now() -> str:
    """現在日時を YYYYMMDDHHMMSS で返す"""
    now = datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=9)))
    return now.strftime("%Y%m%d%H%M%S")


def main():
    options = CustomOptions()
    custom_args = options.view_as(CustomOptions)
    options.view_as(SetupOptions).save_main_session = True

    # 標準オプション
    # https://beam.apache.org/releases/pydoc/2.41.0/apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.StandardOptions
    standard_args = options.view_as(StandardOptions)
    print(f"runner: {standard_args.runner}")
    # runner: DataflowRunner
    print(f"streaming: {standard_args.streaming}")
    # streaming: False

    # Dataflow 実行用オプション
    # Dataflowで実行する時用のオプションクラスが既に用意されている
    # https://beam.apache.org/releases/pydoc/2.41.0/apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.GoogleCloudOptions
    gcp_args = options.view_as(GoogleCloudOptions)
    gcp_args.job_name = gcp_args.job_name + "-" + get_now()
    print(f"job_name: {gcp_args.job_name}")
    # job_name: sample-20221016233208
    print(f"project: {gcp_args.project}")
    # project: project-id-xxx
    print(f"region: {gcp_args.region}")
    # region: asia-northeast1
    print(f"service_account_email: {gcp_args.service_account_email}")
    # service_account_email: kiritan@project-id-xxx.iam.gserviceaccount.com
    print(f"temp_location: {gcp_args.temp_location}")
    # temp_location: gs://project-id-xxx-test/tmp/

    # パイプラインの作成
    with apache_beam.Pipeline(runner=standard_args.runner, options=options) as pipeline:  # noqa: E501

        # 読込処理
        input = (
            pipeline
            | "Read" >> ReadFromText(
                custom_args.input,
                # SJIS用の Coder を指定
                coder=StrCp932Coder(),
                skip_header_lines=1
            )
        )

        # 変換処理
        transforming = (
            input
            # elementを分けずに、カンマ区切りで 1 つのリストにしてあげる
            | "read_csv" >> apache_beam.Map(lambda line: next(csv.reader([line], delimiter=',', quotechar='"')))  # noqa: E501
            # 辞書に変換
            # BigQuery にデータロードするには、{BigQueryカラム名: 値} という形の辞書にする必要がある
            | "csv_to_dict" >> apache_beam.Map(CsvToDict.csv_to_dict)
        )

        # ロード先の BigQuery テーブル名を設定
        # https://beam.apache.org/documentation/io/built-in/google-bigquery/#using-a-tablereference
        table_spec = bigquery.TableReference(
            projectId=gcp_args.project,
            datasetId=custom_args.dataset,
            tableId=custom_args.table
        )

        # データロード処理
        # https://beam.apache.org/documentation/io/built-in/google-bigquery/#writing-to-bigquery
        (
            transforming
            | "load_to_bq" >> WriteToBigQuery(
                table_spec,
                schema=TableSchema.table_schema,
                # テーブルがない場合の動作を指定(今回は作らない)
                # https://beam.apache.org/documentation/io/built-in/google-bigquery/#create-disposition
                create_disposition=BigQueryDisposition.CREATE_NEVER,
                # データロード時の動作を指定(今回は追記)
                # https://beam.apache.org/documentation/io/built-in/google-bigquery/#write-disposition
                write_disposition=BigQueryDisposition.WRITE_APPEND
            )
        )


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    main()

実行

BigQueryのテーブルに、レコードが作成されているはずです。

$ python -m beam.sample_gcp \
 --runner DataflowRunner \
 --project project-id-xxx \
 --region asia-northeast1 \
 --job_name sample \
 --service_account_email kiritan@project-id-xxx.iam.gserviceaccount.com \
 --input gs://project-id-xxx-test/input/sample_sjis.csv \
 --temp_location gs://project-id-xxx-test/tmp/ \
 --dataset sample \
 --table sample

Dataflow で動かすには runner として DataflowRunner を指定すれば良いわけですが、その他に指定可能なパラメーターとして、以下があります。

他の Cloud Dataflow パイプライン オプションの設定