Cloud Storage NotificationをCloud Run上コンテナで受け取る

Cloud Storageが発行してくれるNotificationを、Cloud Pub/Subを介して、Cloud Run上で起動しているFlaskで受け取ります。

環境

構成

構成です。

f:id:goodbyegangster:20210722171815p:plain

Cloud Run上にコンテナを建てる

Cloud Pub/SubからのPush通知を受け取るFlaskコンテナを、Cloud Run上に起動します。公式で用意されているチュートリアルを参考にします。

Python サービスをビルドしてデプロイする

コンテナ用ファイルの作成

Flaskのコンテナを用意します。公式チュートリアルでサンプルコードを用意してくれているため、それを利用します。

run/sample/main.py

Flask向けのmain.py

import os
import base64
import json
from datetime import datetime 

from flask import Flask, request

app = Flask(__name__)

@app.route('/', methods=['POST'])
def print_notification():
    push_message = request.get_json(force=False)
    data = base64.b64decode(push_message['message']['data']).decode('utf-8')
    notification = json.loads(data)
    print(f"{datetime.now()}  GCS Notification gs://{notification['bucket']}/{notification['name']}")
    return ('', 202)

if __name__ == '__main__':
    app.run(debug=True, host='0.0.0.0', port=int(os.environ.get('PORT', 8080)))

Flaskでリッスンするポートにコンテナ環境変数PORTを指定しています。Cloud Run上コンテナでは、リッスンするポートをコンテナ・デプロイ時に指定することになり、その値は環境変数PORTに渡されます。デフォルトでは8080ポートでリッスンするようデプロイされるらしいです。

The container must listen for requests on 0.0.0.0 on the port to which requests are sent. By default, requests are sent to 8080, but you can configure Cloud Run to send requests to the port of your choice. Cloud Run injects the PORT environment variable into the container. Inside Cloud Run container instances, the value of the PORT environment variable always reflects the port to which requests are sent. It defaults to 8080.

Listening for requests on the correct port

また、Cloud Pub/SubからPushされるメッセージは以下のフォーマットとなります。実際の本文は[message][data]部分となり、base64エンコードされているため、受け取ったメッセージをデコードする処理を追加しています。

{
    "message": {
        "attributes": {
            "key": "value"
        },
        "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==",
        "messageId": "2070443601311540",
        "message_id": "2070443601311540",
        "publishTime": "2021-02-26T19:13:55.749Z",
        "publish_time": "2021-02-26T19:13:55.749Z"
    },
    "subscription": "projects/myproject/subscriptions/mysubscription"
}

push サブスクリプションの使用

[message][data]に含まれるCloud StorageのNotificationは、以下のフォーマットとなります。

{
  "kind": "storage#object",
  "id": "bucket-name/object-name/1234567890123456",
  "selfLink": "https://www.googleapis.com/storage/v1/b/bucket-name/o/object-name",
  "name": "object-name",
  "bucket": "bucket-name",
  "generation": "1234567890123456",
  "metageneration": "1",
  "contentType": "application/octet-stream",
  "timeCreated": "2021-07-18T15:58:35.031Z",
  "updated": "2021-07-18T15:58:35.031Z",
  "storageClass": "STANDARD",
  "timeStorageClassUpdated": "2021-07-18T15:58:35.031Z",
  "size": "398",
  "md5Hash": "7IpIZhLyyTZt6rOXhjD/2Q==",
  "mediaLink": "https://www.googleapis.com/download/storage/v1/b/bucket-name/o/object-name?generation=1234567890123456&alt=media",
  "contentLanguage": "en",
  "crc32c": "GzLPmQ==",
  "etag": "CKbKjaX+7PECEAE="
}

run/sample/Dockerfile

利用するDockerfileです。

# Use the official lightweight Python image.
# https://hub.docker.com/_/python
FROM python:3.9-slim

# Allow statements and log messages to immediately appear in the Knative logs
ENV PYTHONUNBUFFERED True

# Copy local code to the container image.
ENV APP_HOME /app
WORKDIR $APP_HOME
COPY . ./

# Install production dependencies.
RUN pip install Flask~=2.0.1 gunicorn~=20.1.0

# Run the web service on container startup. Here we use the gunicorn
# webserver, with one worker process and 8 threads.
# For environments with multiple CPU cores, increase the number of workers
# to be equal to the cores available.
# Timeout is set to 0 to disable the timeouts of the workers to allow Cloud Run to handle instance scaling.
CMD exec gunicorn --bind :$PORT --workers 1 --threads 8 --timeout 0 main:app

Flaskのほかgunicornをインストールして、CMD命令文ではgunicornの起動コマンドを指定しています。

Gunicorn 'Green Unicorn' is a Python WSGI HTTP Server for UNIX. It's a pre-fork worker model. The Gunicorn server is broadly compatible with various web frameworks, simply implemented, light on server resources, and fairly speedy.

gunicorn

gunicornは、PythonWSGI Application Serverです。上の引用の通り、HTTP Serverとしてhttpのリクエストを受け取ることができますが、リバースプロキシとして設置したNginxの背後に置くことを推奨されており、純粋なWSGI Application Serverとして利用される事が多い、のでしょうか。今回の構成では、このgunicornがCloud Pub/Subからのpush通知(httpリクエスト)を受け取ることになります。

gunicorn起動コマンドのパラメーターについては下記を参照。--bindパラメーターで環境変数PORTが指定されていますが、この値はCloud Runデプロイ時に指定することになります。

Running Gunicorn

run/sample/.dockerignore

.dockerignoreファイルも用意。

Dockerfile
README.md
*.pyc
*.pyo
*.pyd
__pycache__
.pytest_cache

コンテナのビルドとプッシュ

コンテナpush先のリポジトリとなる、Artifact RegistryのDockerリポジトリを作成します。Artifact Registryとは、Container Registryを拡張したもので、コンテナイメージのほか、アプリケーションのビルド・デプロイ時に利用するArtifactファイルの管理も行えるフルマネージドサービスとのこと。

以下コマンドにて、Dockerリポジトリを作成します。

$ gcloud artifacts repositories create sample --repository-format=docker --location=asia --description='sample'

以下の命名規則リポジトリが作成されます。

[location]-central1-docker.pkg.dev/[project-id]/[リポジトリ名]

gcloud artifacts repositories create

Container Analysisの自動脆弱性スキャンを有効にしておきます。新規push時や新しいCVE公開時に、Artifact Registry内の脆弱性有無を検査してくれます。

$ gcloud services enable containerscanning.googleapis.com

Enabling and disabling automated scanning

Cloud Buildを利用して、Docker buildとpushを行います。Dockerfileの置かれた同一フォルダにて以下コマンドを実行すると、Cloud Build上でDocker Buildされ、タグで指定したArtifact Registryにpushされます。

$ gcloud builds submit --tag asia-docker.pkg.dev/[project-id]/sample/flask:ver1

Dockerfile によるビルド

Cloud Runへコンテナをデプロイ

Artifact Registryにあるコンテナイメージを利用して、Cloud Runにsampleというサービス名にてFlaskをデプロイします。

$ gcloud run deploy sample \
--image=asia-docker.pkg.dev/[project-id]/sample/flask:ver1 \
--port=8080 \
--ingress=internal \
--no-allow-unauthenticated \
--region=asia-northeast1

アクセス元(--ingressパラメーター)をinternalに限定し、認証のないアクセスを不許可(--no-allow-unauthenticatedパラメーター)とするコンテナをデプロイしています。

gcloud run deploy

Cloud Pub/Subの設定

Cloud Run上のFlaskにpush通知を行うCloud Pub/Subの設定を行います。以下、参考となる公式マニュアル。

Pub/Sub の push によるトリガー

Cloud RunへPush用のサービスアカウントの作成

Cloud RunへPush通知するサブスクリプション用のサービスアカウントを作成します。

$ gcloud iam service-accounts create sa-cloud-run-pubsub-invoker \
--display-name="Cloud Run Pub/Sub Invoker"

gcloud iam service-accounts create

Cloud Runのsampleサービスに対して、作成したサービスアカウントよりInvokeできる権限を付与します。

$ gcloud run services add-iam-policy-binding sample \
--member=serviceAccount:sa-cloud-run-pubsub-invoker@[project-id].iam.gserviceaccount.com \
--role=roles/run.invoker \
--region=asia-northeast1

gcloud run services add-iam-policy-binding

トピックの作成

Cloud Pub/Subのトピックを作成します。

$ gcloud pubsub topics create sample-gcs-notification

gcloud pubsub topics create

サブスクリプションの作成

Cloud Pub/Subのサブスクリプションを作成します。

まず、Cloud Pub/Subで利用されるサービスアカウントに、サービスアカウントトークンを作成できる権限を付与します。Cloud Runへ送られるPush通知は、このCloud Pub/Subにて作成されたトークンにて認証処理される事になります。

push エンドポイントによる認証と承認

$ gcloud projects add-iam-policy-binding [project-id] \
--member=serviceAccount:service-123456789012@gcp-sa-pubsub.iam.gserviceaccount.com \
--role=roles/iam.serviceAccountTokenCreator

gcloud projects add-iam-policy-binding

サブスクリプションを作成します。

$ gcloud pubsub subscriptions create sample-push-cloud-run \
--topic=sample-gcs-notification \
--push-endpoint="https://sample-xxxxxxxxxx-an.a.run.app" \
--push-auth-service-account=sa-cloud-run-pubsub-invoker@[project-id].iam.gserviceaccount.com

gcloud pubsub subscriptions create

Cloud Storage Notificationの設定

オブジェクト更新情報をCloud Pub/Subへ連携する、Cloud Storage Notification設定を行います。以下、参考ページです。

Cloud Storage の Pub/Sub 通知の構成

サービスエージェントにCloud Pub/Sub向け権限を付与

Cloud Storageで利用されているサービスエージェントに、Cloud Pub/SubへPublishする権限を付与します。サービスエージェントとは、Google Cloudの各サービスをアクセスするために、Google側で管理されたサービスアカウントのことです。

Some Google Cloud services have Google-managed service accounts that allow the services to access your resources. These service accounts are sometimes known as service agents. You might see evidence of these service agents in several different places, including a project's Identity and Access Management (IAM) policy and audit log entries for various services.

Service agents

以下のコマンドにより、利用されているサービスエージェントを確認します。

$ gsutil kms serviceaccount -p [project-id]
service-123456789012@gs-project-accounts.iam.gserviceaccount.com

kms - Configure Cloud KMS encryption

サービスエージェントに権限を付与します。

$ gcloud projects add-iam-policy-binding [project-id] \
--member=serviceAccount:service-123456789012@gs-project-accounts.iam.gserviceaccount.com \
--role=roles/pubsub.publisher

gcloud projects add-iam-policy-binding

Cloud Storageの設定

Cloud Storage Notificationを設定したいバケットを作成します。

$ gsutil mb -l asia-northeast1 gs://pekomiko-chuchu

gsutil mb

作成したBucketに、Notificationの設定をします。なお、Notificationのイベント発行条件は以下より選択でき、今回は新規オブジェクト作成時にイベント発行されるOBJECT_FINALIZEを設定します。

  • OBJECT_FINALIZE
  • OBJECT_METADATA_UPDATE
  • OBJECT_DELETE
  • OBJECT_ARCHIVE

イベントの種類

$ gsutil notification create -t sample-gcs-notification -f json -e OBJECT_FINALIZE gs://pekomiko-chuchu

gsutil notification

テスト

実際にファイルをアップロードして、正しく動作されるか確認してみます。

$ gsutil cp testfile.txt gs://pekomiko-chuchu/sample/

cp - Copy files and objects

Flaskコンテナ上で出力されるログを、Cloud Loggingより確認します。

$ gcloud logging read "resource.type=cloud_run_revision AND resource.labels.service_name=sample" --project=[project-id] | grep textPayload | grep 'GCS Notification'
textPayload: 2021-07-18 16:12:37.294488  GCS Notification gs://pekomiko-chuchu/sample/testfile.txt

ちゃんと出力されています。