Cloud Storageが発行してくれるNotificationを、Cloud Pub/Subを介して、Cloud Run上で起動しているFlaskで受け取ります。
環境
構成
構成です。
Cloud Run上にコンテナを建てる
Cloud Pub/SubからのPush通知を受け取るFlaskコンテナを、Cloud Run上に起動します。公式で用意されているチュートリアルを参考にします。
コンテナ用ファイルの作成
Flaskのコンテナを用意します。公式チュートリアルでサンプルコードを用意してくれているため、それを利用します。
run/sample/main.py
Flask向けのmain.py
。
import os import base64 import json from datetime import datetime from flask import Flask, request app = Flask(__name__) @app.route('/', methods=['POST']) def print_notification(): push_message = request.get_json(force=False) data = base64.b64decode(push_message['message']['data']).decode('utf-8') notification = json.loads(data) print(f"{datetime.now()} GCS Notification gs://{notification['bucket']}/{notification['name']}") return ('', 202) if __name__ == '__main__': app.run(debug=True, host='0.0.0.0', port=int(os.environ.get('PORT', 8080)))
Flaskでリッスンするポートにコンテナ環境変数PORT
を指定しています。Cloud Run上コンテナでは、リッスンするポートをコンテナ・デプロイ時に指定することになり、その値は環境変数PORT
に渡されます。デフォルトでは8080
ポートでリッスンするようデプロイされるらしいです。
The container must listen for requests on
0.0.0.0
on the port to which requests are sent. By default, requests are sent to 8080, but you can configure Cloud Run to send requests to the port of your choice. Cloud Run injects thePORT
environment variable into the container. Inside Cloud Run container instances, the value of thePORT
environment variable always reflects the port to which requests are sent. It defaults to8080
.
Listening for requests on the correct port
また、Cloud Pub/SubからPushされるメッセージは以下のフォーマットとなります。実際の本文は[message][data]
部分となり、base64でエンコードされているため、受け取ったメッセージをデコードする処理を追加しています。
{ "message": { "attributes": { "key": "value" }, "data": "SGVsbG8gQ2xvdWQgUHViL1N1YiEgSGVyZSBpcyBteSBtZXNzYWdlIQ==", "messageId": "2070443601311540", "message_id": "2070443601311540", "publishTime": "2021-02-26T19:13:55.749Z", "publish_time": "2021-02-26T19:13:55.749Z" }, "subscription": "projects/myproject/subscriptions/mysubscription" }
[message][data]
に含まれるCloud StorageのNotificationは、以下のフォーマットとなります。
{ "kind": "storage#object", "id": "bucket-name/object-name/1234567890123456", "selfLink": "https://www.googleapis.com/storage/v1/b/bucket-name/o/object-name", "name": "object-name", "bucket": "bucket-name", "generation": "1234567890123456", "metageneration": "1", "contentType": "application/octet-stream", "timeCreated": "2021-07-18T15:58:35.031Z", "updated": "2021-07-18T15:58:35.031Z", "storageClass": "STANDARD", "timeStorageClassUpdated": "2021-07-18T15:58:35.031Z", "size": "398", "md5Hash": "7IpIZhLyyTZt6rOXhjD/2Q==", "mediaLink": "https://www.googleapis.com/download/storage/v1/b/bucket-name/o/object-name?generation=1234567890123456&alt=media", "contentLanguage": "en", "crc32c": "GzLPmQ==", "etag": "CKbKjaX+7PECEAE=" }
run/sample/Dockerfile
利用するDockerfileです。
# Use the official lightweight Python image. # https://hub.docker.com/_/python FROM python:3.9-slim # Allow statements and log messages to immediately appear in the Knative logs ENV PYTHONUNBUFFERED True # Copy local code to the container image. ENV APP_HOME /app WORKDIR $APP_HOME COPY . ./ # Install production dependencies. RUN pip install Flask~=2.0.1 gunicorn~=20.1.0 # Run the web service on container startup. Here we use the gunicorn # webserver, with one worker process and 8 threads. # For environments with multiple CPU cores, increase the number of workers # to be equal to the cores available. # Timeout is set to 0 to disable the timeouts of the workers to allow Cloud Run to handle instance scaling. CMD exec gunicorn --bind :$PORT --workers 1 --threads 8 --timeout 0 main:app
Flaskのほかgunicornをインストールして、CMD命令文ではgunicornの起動コマンドを指定しています。
Gunicorn 'Green Unicorn' is a Python WSGI HTTP Server for UNIX. It's a pre-fork worker model. The Gunicorn server is broadly compatible with various web frameworks, simply implemented, light on server resources, and fairly speedy.
gunicornは、PythonのWSGI Application Serverです。上の引用の通り、HTTP Serverとしてhttpのリクエストを受け取ることができますが、リバースプロキシとして設置したNginxの背後に置くことを推奨されており、純粋なWSGI Application Serverとして利用される事が多い、のでしょうか。今回の構成では、このgunicornがCloud Pub/Subからのpush通知(httpリクエスト)を受け取ることになります。
gunicorn起動コマンドのパラメーターについては下記を参照。--bind
パラメーターで環境変数PORT
が指定されていますが、この値はCloud Runデプロイ時に指定することになります。
run/sample/.dockerignore
.dockerignore
ファイルも用意。
Dockerfile README.md *.pyc *.pyo *.pyd __pycache__ .pytest_cache
コンテナのビルドとプッシュ
コンテナpush先のリポジトリとなる、Artifact RegistryのDockerリポジトリを作成します。Artifact Registryとは、Container Registryを拡張したもので、コンテナイメージのほか、アプリケーションのビルド・デプロイ時に利用するArtifactファイルの管理も行えるフルマネージドサービスとのこと。
以下コマンドにて、Dockerリポジトリを作成します。
$ gcloud artifacts repositories create sample --repository-format=docker --location=asia --description='sample'
[location]-central1-docker.pkg.dev/[project-id]/[リポジトリ名]
gcloud artifacts repositories create
Container Analysisの自動脆弱性スキャンを有効にしておきます。新規push時や新しいCVE公開時に、Artifact Registry内の脆弱性有無を検査してくれます。
$ gcloud services enable containerscanning.googleapis.com
Enabling and disabling automated scanning
Cloud Buildを利用して、Docker buildとpushを行います。Dockerfileの置かれた同一フォルダにて以下コマンドを実行すると、Cloud Build上でDocker Buildされ、タグで指定したArtifact Registryにpushされます。
$ gcloud builds submit --tag asia-docker.pkg.dev/[project-id]/sample/flask:ver1
Cloud Runへコンテナをデプロイ
Artifact Registryにあるコンテナイメージを利用して、Cloud Runにsampleというサービス名にてFlaskをデプロイします。
$ gcloud run deploy sample \ --image=asia-docker.pkg.dev/[project-id]/sample/flask:ver1 \ --port=8080 \ --ingress=internal \ --no-allow-unauthenticated \ --region=asia-northeast1
アクセス元(--ingress
パラメーター)をinternalに限定し、認証のないアクセスを不許可(--no-allow-unauthenticated
パラメーター)とするコンテナをデプロイしています。
Cloud Pub/Subの設定
Cloud Run上のFlaskにpush通知を行うCloud Pub/Subの設定を行います。以下、参考となる公式マニュアル。
Cloud RunへPush用のサービスアカウントの作成
Cloud RunへPush通知するサブスクリプション用のサービスアカウントを作成します。
$ gcloud iam service-accounts create sa-cloud-run-pubsub-invoker \ --display-name="Cloud Run Pub/Sub Invoker"
gcloud iam service-accounts create
Cloud Runのsampleサービスに対して、作成したサービスアカウントよりInvokeできる権限を付与します。
$ gcloud run services add-iam-policy-binding sample \ --member=serviceAccount:sa-cloud-run-pubsub-invoker@[project-id].iam.gserviceaccount.com \ --role=roles/run.invoker \ --region=asia-northeast1
gcloud run services add-iam-policy-binding
トピックの作成
Cloud Pub/Subのトピックを作成します。
$ gcloud pubsub topics create sample-gcs-notification
サブスクリプションの作成
Cloud Pub/Subのサブスクリプションを作成します。
まず、Cloud Pub/Subで利用されるサービスアカウントに、サービスアカウントトークンを作成できる権限を付与します。Cloud Runへ送られるPush通知は、このCloud Pub/Subにて作成されたトークンにて認証処理される事になります。
$ gcloud projects add-iam-policy-binding [project-id] \ --member=serviceAccount:service-123456789012@gcp-sa-pubsub.iam.gserviceaccount.com \ --role=roles/iam.serviceAccountTokenCreator
gcloud projects add-iam-policy-binding
サブスクリプションを作成します。
$ gcloud pubsub subscriptions create sample-push-cloud-run \ --topic=sample-gcs-notification \ --push-endpoint="https://sample-xxxxxxxxxx-an.a.run.app" \ --push-auth-service-account=sa-cloud-run-pubsub-invoker@[project-id].iam.gserviceaccount.com
gcloud pubsub subscriptions create
Cloud Storage Notificationの設定
オブジェクト更新情報をCloud Pub/Subへ連携する、Cloud Storage Notification設定を行います。以下、参考ページです。
サービスエージェントにCloud Pub/Sub向け権限を付与
Cloud Storageで利用されているサービスエージェントに、Cloud Pub/SubへPublishする権限を付与します。サービスエージェントとは、Google Cloudの各サービスをアクセスするために、Google側で管理されたサービスアカウントのことです。
Some Google Cloud services have Google-managed service accounts that allow the services to access your resources. These service accounts are sometimes known as service agents. You might see evidence of these service agents in several different places, including a project's Identity and Access Management (IAM) policy and audit log entries for various services.
以下のコマンドにより、利用されているサービスエージェントを確認します。
$ gsutil kms serviceaccount -p [project-id] service-123456789012@gs-project-accounts.iam.gserviceaccount.com
kms - Configure Cloud KMS encryption
サービスエージェントに権限を付与します。
$ gcloud projects add-iam-policy-binding [project-id] \ --member=serviceAccount:service-123456789012@gs-project-accounts.iam.gserviceaccount.com \ --role=roles/pubsub.publisher
gcloud projects add-iam-policy-binding
Cloud Storageの設定
Cloud Storage Notificationを設定したいバケットを作成します。
$ gsutil mb -l asia-northeast1 gs://pekomiko-chuchu
作成したBucketに、Notificationの設定をします。なお、Notificationのイベント発行条件は以下より選択でき、今回は新規オブジェクト作成時にイベント発行されるOBJECT_FINALIZE
を設定します。
- OBJECT_FINALIZE
- OBJECT_METADATA_UPDATE
- OBJECT_DELETE
- OBJECT_ARCHIVE
$ gsutil notification create -t sample-gcs-notification -f json -e OBJECT_FINALIZE gs://pekomiko-chuchu
テスト
実際にファイルをアップロードして、正しく動作されるか確認してみます。
$ gsutil cp testfile.txt gs://pekomiko-chuchu/sample/
Flaskコンテナ上で出力されるログを、Cloud Loggingより確認します。
$ gcloud logging read "resource.type=cloud_run_revision AND resource.labels.service_name=sample" --project=[project-id] | grep textPayload | grep 'GCS Notification' textPayload: 2021-07-18 16:12:37.294488 GCS Notification gs://pekomiko-chuchu/sample/testfile.txt
ちゃんと出力されています。