Apache Beam のプログラムを、Google Cloud の Dataflow 上で動かしてみます。GCS 上の SJIS のファイルを、BigQuery にロードする処理を試してみます。
下記の資料を参考にしています。
環境
事前準備
インプット用の CSV ファイル
以下の CSV ファイルを、適当な GCS 上のパスにアップロードしておきます。
gs://project-id-xxx-test/input/sample_sjis.csv
"氏名","世代","デビュー日" "さくらみこ","0","2018-08-01" "兎田ぺこら","3","2019-07-17"
$ nkf --guess ./sample_sjis.csv Shift_JIS (CRLF)
ロード先の BigQuery テーブル情報
データをロードする BigQuery テーブルを作成しておきます。Apache Beam 側の処理でテーブルを作成することもできますが、今回は単純な INSERT の処理を試してみます。
- データセット名
- sample
- テーブル名
- sample
スキーマ定義。
Field name | Type | Mode |
---|---|---|
name | STRING | REQUIRED |
gen | INTEGER | REQUIRED |
debut_date | DATE | REQUIRED |
IAM 設定
実行にあたり必要となる IAM 権限を設定します。
Dataflow にジョブを投入するユーザーアカウントに、必要となるロールを付与します。
$ gcloud projects add-iam-policy-binding project-id-xxx --member="user:zunko@example.com" --role="roles/iam.serviceAccountUser" $ gcloud projects add-iam-policy-binding project-id-xxx --member="user:zunko@example.com" --role="roles/dataflow.developer"
Dataflow のジョブが投入された後、Apache Beam での処理で利用されるサービスアカウントを作成します。このアカウントが Cloud Storage や BigQuery にアクセスすることになります。ジョブ投入時に指定しない場合、Compute Engine のデフォルトサービスアカウント PROJECT_NUMBER-compute@developer.gserviceaccount.com
が利用されます。
$ # サービスアカウントの新規作成 $ gcloud iam service-accounts create kiritan --display-name="Dataflow test service account" $ gcloud iam service-accounts list | grep kiritan Dataflow test service account kiritan@project-id-xxx.iam.gserviceaccount.com False $ $ # ロールを付与 $ gcloud projects add-iam-policy-binding project-id-xxx --member="serviceAccount:kiritan@project-id-xxx.iam.gserviceaccount.com" --role=roles/dataflow.admin $ gcloud projects add-iam-policy-binding project-id-xxx --member="serviceAccount:kiritan@project-id-xxx.iam.gserviceaccount.com" --role=roles/dataflow.worker $ gcloud projects add-iam-policy-binding project-id-xxx --member="serviceAccount:kiritan@project-id-xxx.iam.gserviceaccount.com" --role=roles/storage.objectAdmin $ gcloud projects add-iam-policy-binding project-id-xxx --member="serviceAccount:kiritan@project-id-xxx.iam.gserviceaccount.com" --role=roles/bigquery.admin
Google Cloud 上のパイプラインのセキュリティと権限
コード
./beam/sample_gcp.py
import csv import datetime import logging import apache_beam from apache_beam.coders.coders import Coder from apache_beam.io import BigQueryDisposition, ReadFromText, WriteToBigQuery from apache_beam.io.gcp.internal.clients import bigquery from apache_beam.options.pipeline_options import (GoogleCloudOptions, PipelineOptions, SetupOptions, StandardOptions) class CustomOptions(PipelineOptions): @classmethod def _add_argparse_args(cls, parser): parser.add_argument("--input", required=True) parser.add_argument("--dataset", required=True) parser.add_argument("--table", required=True) class StrCp932Coder(Coder): """SJIS向けの Coder を作成 https://beam.apache.org/releases/pydoc/2.41.0/apache_beam.coders.coders.html#apache_beam.coders.coders.Coder """ def encode(self, value): return value.encode("cp932") def decode(self, value): return value.decode("cp932") def is_deterministic(self): return True class TableSchema: """データロード先 BigQuery テーブルのスキーマ定義""" table_schema = { "fields": [ {"name": "name", "type": "STRING", "mode": "REQUIRED"}, {"name": "gen", "type": "INTEGER", "mode": "REQUIRED"}, {"name": "debut_date", "type": "DATE", "mode": "REQUIRED"} ] } class CsvToDict: @classmethod def csv_to_dict(cls, element): """与えられた element を BigQuery のカラム名のペアとなる辞書にして返す""" keys = [] for field in TableSchema.table_schema["fields"]: for key, value in field.items(): if key == "name": keys.append(value) return {key: value for key, value in zip(keys, element)} def get_now() -> str: """現在日時を YYYYMMDDHHMMSS で返す""" now = datetime.datetime.now(datetime.timezone(datetime.timedelta(hours=9))) return now.strftime("%Y%m%d%H%M%S") def main(): options = CustomOptions() custom_args = options.view_as(CustomOptions) options.view_as(SetupOptions).save_main_session = True # 標準オプション # https://beam.apache.org/releases/pydoc/2.41.0/apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.StandardOptions standard_args = options.view_as(StandardOptions) print(f"runner: {standard_args.runner}") # runner: DataflowRunner print(f"streaming: {standard_args.streaming}") # streaming: False # Dataflow 実行用オプション # Dataflowで実行する時用のオプションクラスが既に用意されている # https://beam.apache.org/releases/pydoc/2.41.0/apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.GoogleCloudOptions gcp_args = options.view_as(GoogleCloudOptions) gcp_args.job_name = gcp_args.job_name + "-" + get_now() print(f"job_name: {gcp_args.job_name}") # job_name: sample-20221016233208 print(f"project: {gcp_args.project}") # project: project-id-xxx print(f"region: {gcp_args.region}") # region: asia-northeast1 print(f"service_account_email: {gcp_args.service_account_email}") # service_account_email: kiritan@project-id-xxx.iam.gserviceaccount.com print(f"temp_location: {gcp_args.temp_location}") # temp_location: gs://project-id-xxx-test/tmp/ # パイプラインの作成 with apache_beam.Pipeline(runner=standard_args.runner, options=options) as pipeline: # noqa: E501 # 読込処理 input = ( pipeline | "Read" >> ReadFromText( custom_args.input, # SJIS用の Coder を指定 coder=StrCp932Coder(), skip_header_lines=1 ) ) # 変換処理 transforming = ( input # elementを分けずに、カンマ区切りで 1 つのリストにしてあげる | "read_csv" >> apache_beam.Map(lambda line: next(csv.reader([line], delimiter=',', quotechar='"'))) # noqa: E501 # 辞書に変換 # BigQuery にデータロードするには、{BigQueryカラム名: 値} という形の辞書にする必要がある | "csv_to_dict" >> apache_beam.Map(CsvToDict.csv_to_dict) ) # ロード先の BigQuery テーブル名を設定 # https://beam.apache.org/documentation/io/built-in/google-bigquery/#using-a-tablereference table_spec = bigquery.TableReference( projectId=gcp_args.project, datasetId=custom_args.dataset, tableId=custom_args.table ) # データロード処理 # https://beam.apache.org/documentation/io/built-in/google-bigquery/#writing-to-bigquery ( transforming | "load_to_bq" >> WriteToBigQuery( table_spec, schema=TableSchema.table_schema, # テーブルがない場合の動作を指定(今回は作らない) # https://beam.apache.org/documentation/io/built-in/google-bigquery/#create-disposition create_disposition=BigQueryDisposition.CREATE_NEVER, # データロード時の動作を指定(今回は追記) # https://beam.apache.org/documentation/io/built-in/google-bigquery/#write-disposition write_disposition=BigQueryDisposition.WRITE_APPEND ) ) if __name__ == "__main__": logging.getLogger().setLevel(logging.INFO) main()
実行
BigQueryのテーブルに、レコードが作成されているはずです。
$ python -m beam.sample_gcp \ --runner DataflowRunner \ --project project-id-xxx \ --region asia-northeast1 \ --job_name sample \ --service_account_email kiritan@project-id-xxx.iam.gserviceaccount.com \ --input gs://project-id-xxx-test/input/sample_sjis.csv \ --temp_location gs://project-id-xxx-test/tmp/ \ --dataset sample \ --table sample
Dataflow で動かすには runner として DataflowRunner を指定すれば良いわけですが、その他に指定可能なパラメーターとして、以下があります。