Dataflow(Streaming) に入門する

バッチ処理は前回試したので、Streaming の処理を作成してみます。Cloud Pub/Sub からデータを Subscribe して、Cloud Storage 上に Object を作成してみます。

参考です。

Dataflow を使用して Pub/Sub からメッセージをストリーミングする

上記チュートリアルで利用される beam のコード

8. Windowing - Apache Beam Programming Guide

環境

事前作業

Cloud Storage のバケットを作成。

$ gsutil mb -c standard -l asia-northeast1 -b on gs://project-id-xxx-test

mb - Make buckets

Cloud Pub/Sub の Topic を作成。

$ gcloud pubsub topics create sample-streaming \
 --message-retention-duration=30m

gcloud pubsub topics create

コード

beam.sample_streaming.py

import datetime
import logging
import random  # noqa: F401

import apache_beam
from apache_beam.io import ReadFromPubSub
from apache_beam.io.gcp.gcsio import GcsIO
from apache_beam.options.pipeline_options import (PipelineOptions,
                                                  SetupOptions,
                                                  StandardOptions)
from apache_beam.transforms.window import SlidingWindows


class CustomOptions(PipelineOptions):
    @classmethod
    def _add_argparse_args(cls, parser):
        parser.add_argument("--input_topic", required=True)
        parser.add_argument("--output", required=True)


class Format(apache_beam.DoFn):
    """Subscribe したメッセージをフォーマットする"""
    def process(self, element):
        """"""
        yield {
            "body": element.data.decode("utf8"),
            "message_id": element.message_id,
            "attributes": element.attributes
        }


class WriteGCS(apache_beam.DoFn):
    """GCSに書き出す処理"""
    def __init__(self, output):
        self.output = output

    # DoFn より window 関連のパラメーターを利用する
    # https://beam.apache.org/documentation/programming-guide/#other-dofn-parameters
    def process(self, element, window=apache_beam.DoFn.WindowParam):
        # window.start, window.end のパラメーターは、Beam 内で定義されたタイムスタンプ型となる
        # https://beam.apache.org/releases/pydoc/current/apache_beam.transforms.window.html#apache_beam.transforms.window.BoundedWindow
        # 定義されているタイムスタンプ型
        # https://beam.apache.org/releases/pydoc/2.41.0/apache_beam.utils.timestamp.html#apache_beam.utils.timestamp.Timestamp
        JST = datetime.timedelta(hours=9)
        window_start = window.start.to_utc_datetime() + JST
        window_end = window.end.to_utc_datetime() + JST
        FORMAT = "%Y%m%d%H%M%S"
        filename = "-".join([
            self.output,
            window_start.strftime(FORMAT),
            window_end.strftime(FORMAT)
        ])

        # key 情報は不要のため捨ててしまう
        _, msgs = element
        # Google Cloud Storage client を用意して、オブジェクトを作成する
        # https://beam.apache.org/releases/pydoc/2.41.0/apache_beam.io.gcp.gcsio.html#apache_beam.io.gcp.gcsio.GcsIO
        google_storage_client = GcsIO()
        with google_storage_client.open(filename, "w") as f:
            for msg in msgs:
                f.write(f"{msg}\n".encode("utf8"))


def main():
    options = CustomOptions()
    custom_args = options.view_as(CustomOptions)
    options.view_as(SetupOptions).save_main_session = True

    standard_args = options.view_as(StandardOptions)
    standard_args.streaming = True

    # パイプラインのオブジェクトを生成
    pipeline = apache_beam.Pipeline(runner=standard_args.runner, options=options)  # noqa: E501

    # 読込処理
    input = (
        pipeline
        # subscriber となる
        # https://beam.apache.org/releases/pydoc/2.41.0/apache_beam.io.gcp.pubsub.html#apache_beam.io.gcp.pubsub.ReadFromPubSub
        | "Read from Pub/Sub" >> ReadFromPubSub(
            topic=custom_args.input_topic,
            id_label="message_id",  # DirectRunner で実行時は invalid になる
            with_attributes=True
        )
    )

    # windowing 関数
    windowing = (
        input
        # WindowIntoは、windowing 関数を element に適用するもの
        # https://beam.apache.org/releases/pydoc/2.41.0/apache_beam.transforms.core.html#apache_beam.transforms.core.WindowInto
        # SlidingWindowsは、sliding window 処理を行う
        # https://beam.apache.org/releases/pydoc/2.41.0/apache_beam.transforms.window.html#apache_beam.transforms.window.SlidingWindows
        | "Window" >> apache_beam.WindowInto(SlidingWindows(size=60, period=30))  # noqa: E501
    )

    # 集約処理
    grouping = (
        windowing
        | "Format" >> apache_beam.ParDo(Format())
        # 各 element に一意なkeyを付与
        | "Add key" >> apache_beam.WithKeys(lambda _: "key")
        # 仮にランダムな値を key とすると、window 内でさらに key 毎にグルーピングされることになる
        # | "Add key" >> apache_beam.WithKeys(lambda _: random.randint(0, 1))
        # window と key 単位でグルーピングされる。Key は一意なので、単純に Window 単位でグルーピングされる
        | "Group by key" >> apache_beam.GroupByKey()
    )

    # 出力処理
    (
        grouping
        | "Write" >> apache_beam.ParDo(WriteGCS(custom_args.output))
    )

    # パイプラインの実行
    pipeline.run()
    # pipeline.run().wait_until_finish()  # DirectRunner でのテスト用


if __name__ == "__main__":
    logging.getLogger().setLevel(logging.INFO)
    main()

Dataflow のジョブ投入

$ python -m beam.sample_streaming \
 --runner DataflowRunner \
 --project project-id-xxx \
 --region asia-northeast1 \
 --job_name sample-streaming \
 --service_account_email kiritan@project-id-xxx.iam.gserviceaccount.com \
 --max_num_workers 2 \
 --input_topic projects/project-id-xxx/topics/sample-streaming \
 --output gs://project-id-xxx-test/streaming/output/output \
 --temp_location gs://project-id-xxx-test/streaming/temp

Cloud Pub/Sub に Publish

Cloud Pub/Sub に Publish するコードです。10 秒間隔で Publish します。

publisher.py

gist.github.com

実行すると、Dataflow 側がデータを拾い始めます。

$ python -m publisher --project project-id-xxx --topic sample-streaming

Cloud Storage 上のファイルの確認

gs://project-id-xxx-test/streaming/output/output-20221020074500-20221020074600

{'body': '{"uid": "2851a98c-cd50-4bc8-bbf3-9a17060e477e", "datetime": "2022-10-20 07:45:08", "no": "0", "value": "49"}', 'message_id': '6016843242117485', 'attributes': {'org_uid': '2851a98c-cd50-4bc8-bbf3-9a17060e477e'}}
{'body': '{"uid": "6ccf2300-26b6-45d1-b1d4-47aa2c25d53e", "datetime": "2022-10-20 07:45:38", "no": "0", "value": "57"}', 'message_id': '6016847762304497', 'attributes': {'org_uid': '6ccf2300-26b6-45d1-b1d4-47aa2c25d53e'}}
{'body': '{"uid": "562a9efb-72ef-4d8f-af5a-3c44d1a7ee1d", "datetime": "2022-10-20 07:45:18", "no": "1", "value": "30"}', 'message_id': '6016846110586740', 'attributes': {'org_uid': '562a9efb-72ef-4d8f-af5a-3c44d1a7ee1d'}}
{'body': '{"uid": "8320043b-d2e6-4a42-b098-7ee105c3baf3", "datetime": "2022-10-20 07:45:28", "no": "2", "value": "74"}', 'message_id': '6016842701412836', 'attributes': {'org_uid': '8320043b-d2e6-4a42-b098-7ee105c3baf3'}}
{'body': '{"uid": "b9ac4b6d-94db-47e9-afcb-79dd4dd3367f", "datetime": "2022-10-20 07:45:48", "no": "1", "value": "37"}', 'message_id': '6016842238058663', 'attributes': {'org_uid': 'b9ac4b6d-94db-47e9-afcb-79dd4dd3367f'}}
{'body': '{"uid": "db7e1658-3a2b-4dab-a19e-a5f5f9044e2c", "datetime": "2022-10-20 07:45:58", "no": "2", "value": "91"}', 'message_id': '6016843002430175', 'attributes': {'org_uid': 'db7e1658-3a2b-4dab-a19e-a5f5f9044e2c'}}

gs://project-id-xxx-test/streaming/output/output-20221020074530-20221020074630

{'body': '{"uid": "6ccf2300-26b6-45d1-b1d4-47aa2c25d53e", "datetime": "2022-10-20 07:45:38", "no": "0", "value": "57"}', 'message_id': '6016847762304497', 'attributes': {'org_uid': '6ccf2300-26b6-45d1-b1d4-47aa2c25d53e'}}
{'body': '{"uid": "b9ac4b6d-94db-47e9-afcb-79dd4dd3367f", "datetime": "2022-10-20 07:45:48", "no": "1", "value": "37"}', 'message_id': '6016842238058663', 'attributes': {'org_uid': 'b9ac4b6d-94db-47e9-afcb-79dd4dd3367f'}}
{'body': '{"uid": "db7e1658-3a2b-4dab-a19e-a5f5f9044e2c", "datetime": "2022-10-20 07:45:58", "no": "2", "value": "91"}', 'message_id': '6016843002430175', 'attributes': {'org_uid': 'db7e1658-3a2b-4dab-a19e-a5f5f9044e2c'}}
{'body': '{"uid": "7a1dffc0-bbfe-44dc-960c-d7caea22a491", "datetime": "2022-10-20 07:46:08", "no": "0", "value": "20"}', 'message_id': '6016846545057854', 'attributes': {'org_uid': '7a1dffc0-bbfe-44dc-960c-d7caea22a491'}}
{'body': '{"uid": "989719f7-f54f-45ef-9122-c24ffb908d37", "datetime": "2022-10-20 07:46:18", "no": "1", "value": "67"}', 'message_id': '6016841863443605', 'attributes': {'org_uid': '989719f7-f54f-45ef-9122-c24ffb908d37'}}
{'body': '{"uid": "0d1e8322-3e1e-48e3-975e-2ab1b8acc1a0", "datetime": "2022-10-20 07:46:28", "no": "2", "value": "46"}', 'message_id': '6016846308842395', 'attributes': {'org_uid': '0d1e8322-3e1e-48e3-975e-2ab1b8acc1a0'}}

message_id 6016847762304497 6016842238058663 6016843002430175 は両ファイルに存在し、sliding window の window 戦略により多重に処理されていることを確認できます。

概念たち

Streaming のジョブを作成する際に出てくる Apache Beam の概念たちをざっくり確認します。

Windowing

windowing の処理は、1 つの PCollecition 内の個々の element に対し、主にタイムスタンプに応じて細分化していくものです。言い換えると、PCollection のデータに対し、window という単位を作ってグルーピングしていくイメージです。例えば、ストリーミングデータを処理していて、集計処理を行う PTransform があった場合、どの単位で集計がなされるのか謎ですよね。window という単位は、そーゆー時に利用されるデータの単位になります。

Windowing subdivides a PCollection into windows according to the timestamps of its individual elements. Windows enable grouping operations over unbounded collections by dividing the collection into windows of finite collections.

windowing 関数は、window の単位の戦略を定義する箇所となります。PCollection 内の個々の element は、基本的には 1 つの window 内にしか存在できないものですが、windowing 関数が指示をした場合、element はそれぞれ異なる window 内に複製することもできます。また、この Windowing 関数も、ユーザー定義関数として自分でつくることもできます。

A windowing function tells the runner how to assign elements to one or more initial windows, and how to merge windows of grouped elements. Each element in a PCollection can only be in one window, so if a windowing function specifies multiple windows for an element, the element is conceptually duplicated into each of the windows and each element is identical except for its window.

高度なコンセプト

ビルトインされている windowing 関数です。

種類名 説明 利用ケース
Fixed time windows (“tumbling windows”) PCollection に与えられたタイムスタンプを基に、一定の時間間隔で、データを重なり無く、window にわけていくもの
Sliding time windows (“hopping windows”) これも時間間隔で window を管理するが、period という概念のもと、一定の期間の element を複数の window に重複して持つことになる
例えば、window の間隔が 60 秒で、period の間隔が 30 秒の場合、各 window は 30 秒ごとに開始され、30 秒分の element が複数の window に存在することになる
平均値を求める処理等
Per-session windows タイムスタンプで window が管理されるものでなく、element 内の key により window を分けるもの
結果として、全く window されないギャップの期間が生まれる
アプリ上のユーザーのマウス動作といったユーザーの行動分析等
Single global window デフォルト
単一のグローバルな window で管理される

文章を読むより、以下の公式ページの図を見る方が分かりやすいです。

8.2.1. Fixed time windows

8.2.2. Sliding time windows

8.2.3. Session windows

Watermarks

watermark とは、データイベントが発生した時刻(event time)と、パイプラインでデータを処理した時刻 (processing time) 間の、ラグを管理するものとなります。言い換えると、本来より遅れて到達・処理されたデータがきた時、どの程度の猶予を windowing に許容するか、という水準になります。

以下の例が分かりやすいです。1 つの window が閉じるまでの間で、遊びを持たせておいてくれるのですね。

For example, let’s say we have a PCollection that’s using fixed-time windowing, with windows that are five minutes long. For each window, Beam must collect all the data with an event time timestamp in the given window range (between 0:00 and 4:59 in the first window, for instance). Data with timestamps outside that range (data from 5:00 or later) belong to a different window.

However, data isn’t always guaranteed to arrive in a pipeline in time order, or to always arrive at predictable intervals. Beam tracks a watermark, which is the system’s notion of when all data in a certain window can be expected to have arrived in the pipeline. Once the watermark progresses past the end of a window, any further element that arrives with a timestamp in that window is considered late data.

From our example, suppose we have a simple watermark that assumes approximately 30s of lag time between the data timestamps (the event time) and the time the data appears in the pipeline (the processing time), then Beam would close the first window at 5:30. If a data record arrives at 5:34, but with a timestamp that would put it in the 0:00-4:59 window (say, 3:38), then that record is late data.

8.4. Watermarks and late data

この猶予時間である wartermark の水準は、データソースによりデフォルトで設定されるらしいですが、明示的に指定することもできるようです。

8.4.1. Managing late data

Trigger

window に集められたデータに Aggregation の変換を行う際、どのタイミングで集計結果を定めるのか、その条件を定義したものが trigger となります。デフォルトでは、watermark で決められたタイミングが trigger のタイミングとなりますが、これを変更することもできます。

When collecting and grouping data into windows, Beam uses triggers to determine when to emit the aggregated results of each window (referred to as a pane). If you use Beam’s default windowing configuration and default trigger, Beam outputs the aggregated result when it estimates all data has arrived, and discards all subsequent data for that window.

trigger 種類 説明
Event time triggers デフォルト
各 element のタイムスタンプに基づき動作
Processing time triggers パイプラインの(指定されてステージにおける)処理時間に基づき動作
Data-driven triggers 到着したデータがある特定の条件に満たしている場合に動作
Composite triggers 上記複数のトリガーを組み合わせて動作

9. Triggers