Apache Beam に入門する

Apache Beam に入門してみます。ローカル上で Apache Beam を使った簡単なプログラムが動くまでを試してみます。

Apache Beam

GitHub - Apache Beam

Apache Beam SDK for Python

下記あたりを参考にしています。

beam/sdks/python/apache_beam/examples/wordcount.py

Apache Beam Programming Guide

環境

PythonSDK を利用しましたが、Java で書いたほういいのだろうなあ、と学習しながら感じる事はありました。。。

SDK のインストール

SDK は pip でインストールできます。下記は poetry 経由。

$ poetry add apache-beam
$ python
>>> import apache_beam
>>> print(apache_beam.__version__)
2.41.0

テストデータの作成

./testdata/input というテスト用のファイルを用意しておきます。

$ mkdir testdata
$ echo "AAA BBB BBB CCC CCC CCC" > ./testdata/input

コード

./beam/sample.py

import logging
import os

import apache_beam
from apache_beam.io import ReadFromText, WriteToText
from apache_beam.options.pipeline_options import PipelineOptions

RUNNER = "DirectRunner"


class MyOptions(PipelineOptions):
    """コマンドラインオプションを定義したクラス
    https://beam.apache.org/releases/pydoc/2.41.0/apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.PipelineOptions
    """
    @classmethod
    def _add_argparse_args(cls, parser):
        """引数 parser は、Python標準モジュールの argparse.ArgumentParser の wrapper として動作する
        """
        parser.add_argument(
            "--input",
            default="./testdata/input.txt",
            required=True
        )
        parser.add_argument(
            "--output",
            default="./testdata/output.txt",
            required=True
        )


class WordLengthFn(apache_beam.DoFn):
    """ユーザー定義の DoFn。文字数をカウントしているだけの処理。
    apache_beam.DoFn を継承して、カスタムな変換処理を定義することができる
    https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.core.html#apache_beam.transforms.core.DoFn
    """
    def process(self, element):
        """具体的処理を記述するメソッド。processというメソッド名にする"""
        for word in element:
            yield (len(word), word)  # DoFn の戻り値はイテラーブルなものでないといけない

    # 下記のようなメソッドも定義できる
    # def setup(self):
    #     """Called to prepare an instance for processing bundles of elements."""  # noqa: E501

    # def teardown(self):
    #     """Called to use to clean up this instance before it is discarded."""

    # def start_bundle(self):
    #     """Called before a bundle of elements is processed on a worker."""

    # def finish_bundle(self):
    #     """Called after a bundle of elements is processed on a worker."""


def main():
    # オプションの初期化
    options = MyOptions()
    # MyOptions クラスで定義されているオプションを取得
    # https://beam.apache.org/releases/pydoc/2.41.0/apache_beam.options.pipeline_options.html#apache_beam.options.pipeline_options.PipelineOptions.view_as
    args = options.view_as(MyOptions)

    # パイプライン・オブジェクトの作成。コンテキストマネージャーの形だと実行もされる
    # パイプライン宣言時にオプションを引数で渡してあげる
    # https://beam.apache.org/releases/pydoc/2.41.0/apache_beam.pipeline.html#apache_beam.pipeline.Pipeline
    with apache_beam.Pipeline(runner=RUNNER, options=options) as pipeline:

        # 読込処理
        input = (
            pipeline
            # ReadFromTextはテキストファイルを読んでくれるPTransform
            # https://beam.apache.org/releases/pydoc/2.41.0/apache_beam.io.textio.html#apache_beam.io.textio.ReadFromText
            | "Read" >> ReadFromText(args.input)
        )
        print(type(input))
        # <class 'apache_beam.pvalue.PCollection'>

        # 変換処理
        transforming = (
            input
            # ReadFromText は 1 行を 1 element として読み取るので、それを split している
            # https://beam.apache.org/documentation/transforms/python/elementwise/map/
            | "Split" >> apache_beam.Map(lambda row: row.split())
            # ParDo 関数を利用して、作成した DoFn を呼び出している
            # https://beam.apache.org/documentation/transforms/python/elementwise/pardo/
            | "WordLength" >> apache_beam.ParDo(WordLengthFn())
            # GroupByKey は (111, 222) という形のタプルに対して、111 の値をKeyとしてグルーピングする
            # https://beam.apache.org/documentation/transforms/python/aggregation/groupbykey/
            | "GroupByKey" >> apache_beam.GroupByKey()
        )
        transforming | apache_beam.Map(print)
        # (1, ['A', 'Z'])
        # (2, ['BB', 'YY'])
        # (3, ['CCC', 'XXX'])
        # (4, ['DDDD'])

        # 出力処理
        out_file_name_prefix, out_file_name_suffix = os.path.splitext(args.output)  # noqa: E501
        (
            transforming
            # https://beam.apache.org/releases/pydoc/2.41.0/apache_beam.io.textio.html#apache_beam.io.textio.WriteToText
            | "Write" >> WriteToText(
                file_path_prefix=out_file_name_prefix,
                file_name_suffix=out_file_name_suffix
            )
        )


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    main()

実行

$ python -m beam.sample --input ./testdata/input.txt --output ./testdata/output.txt
$ cat ./testdata/output-00000-of-00001.txt
(1, ['A', 'Z'])
(2, ['BB', 'YY'])
(3, ['CCC', 'XXX'])
(4, ['DDDD'])

概念たち

Apache Beam の基本的な概念について、簡単に触れておきます。下記は公式サイト。

Basics of the Beam model

下記の Dataflow のページが、短くまとめられていてわかりやすかったです。

Apache Beam のコンセプト

Pipeline

コード内では with apache_beam.Pipeline(runner=RUNNER, options=options) as pipeline: で作っているところ。

パイプラインは、入力データの読み取り、そのデータの変換、出力データの書き込みに関連する、一連の計算全体をカプセル化したものです。入力ソースと出力シンクは同一でも別の型でもかまいません。ユーザーはデータの形式を変換できます。Apache Beam プログラムでは、まず、Pipeline オブジェクトを構築します。そのオブジェクトを基礎として使用し、パイプラインのデータセットを作成します。各パイプラインは、反復可能な 1 つのジョブを表します。

パイプライン - Apache Beam のコンセプト

PCollection

PCollection とは、分散処理が可能な抽象的なデータセットにあたるものです。Spark における DataFrame(RDD, DataSet) みたいなものか、と考えています。

A PCollection is an unordered bag of elements. Each PCollection is a potentially distributed, homogeneous data set or data stream, and is owned by the specific Pipeline object for which it is created. Multiple pipelines cannot share a PCollection. Beam pipelines process PCollections, and the runner is responsible for storing these elements.

PCollections - Basics of the Beam model

で、PCollection は element の集合となります。element が分かりずらかったのですが、element とは変換処理される際のデータの単位を指しているのかな、みたいな理解をしています。

PTransform/Aggregation

PTransform とは、パイプライン内で実行される処理ステップのこと。PCollection にたいする変換処理/集計処理のほか、データの読込処理やデータの書込処理のことも PTransform と呼んでいます。

宣言したパイプライン内で、データの読込処理の PTransform を実行することで PCollection が作成され、データ変換や集計といった様々な PTransform の処理の間を PCollection のデータがバケツリレーよろしく実行されていき、最後にデータの書込処理の PTransform が実行されるイメージです。上のコードを見た時、apache_beam.Pipeline() でパイプラインを宣言し、そのオブジェクト内で諸々処理をパイプで繋いで記述している点と、上述イメージは繋がると思います。


データの変換や集計処理で利用できる関数として、以下のページにあるものが用意されています。

Python transform catalog overview

ビルトインされた関数以外に、自分で変換処理の関数をつくることもできます。よく利用されるのだろう、というものに DoFn というユーザー定義関数があり、自分で作った関数(実際にはクラスですが)を ParDo 関数にわたすことで自作の処理を行えます。このあたりはコードを見たほうが分かりやすいです。

その他、ユーザーで作成でき、パイプラインの処理内で利用できるものとして下記があります。

User-defined function (UDF) - Basics of the Beam model

Runner

Beam のプログラムが動く環境のこと。ローカル上で動かす場合 DirectRunner で、Dataflow 上の場合、DataflowRunner を利用します。

Beam Capability Matrix