Cloud TasksのワーカーとしてCloud Runを利用する

Cloud TasksのQueueに投入されたタスクを、Cloud Run上のコンテナ環境で実行する手順を調べます。

構成

構成図です。

f:id:goodbyegangster:20210731205826p:plain

環境

Cloud Runにコンテナのデプロイ

Cloud Run上のコンテナは、Cloud TasksからのPOST Requestを受ける必要があるため、POSTを受け取れるAPIサーバーのコンテナイメージを用意しておく必要があります。本内容では、コンテナイメージ作成部分の手順は割愛してしまいます。

作成したコンテナイメージをArtifact Registryにpushした後、Cloud Runへデプロイする部分のみ備忘として記載します。

以下コマンドでコンテナへデプロイします。

$ gcloud run deploy worker \
--concurrency=80 \
--cpu=1000m \
--memory=512Mi \
--image=asia-docker.pkg.dev/[prject-id]/sample/flask:ver1 \
--ingress=all \
--no-allow-unauthenticated \
--max-instances=1 \
--port=8080 \
--timeout=60m \
--region=asia-northeast1

gcloud run deploy

--ingress=all--no-allow-unauthenticatedの設定でデプロイしています。Cloud TasksからCloud Pub/SubへのPost Requestをセキュアにしたい訳ですが、以下の理由によりingressの通信をinternalのみに限定することはできません。認証を必須とすることで、セキュアなアクセスを担保しようとしています。

VPC ネットワークから発信されていないトラフィック ソースから内部サービスを呼び出す方法はありません(ただし Pub/Sub または Eventarc は除きます)。つまり、Cloud Scheduler、Cloud Tasks、Workflow では内部サービスを呼び出すことができません。

内部サービスへのアクセス

Cloud TasksのQueueを作成

Queueを作成します。

$ gcloud tasks queues create sample-queue \
--log-sampling-ratio=1.0 \
--max-concurrent-dispatches 1000 \
--max-dispatches-per-second 500 \
--max-attempts=10 \
--max-backoff=3600s \
--min-backoff=10s \
--max-doublings=3

gcloud tasks queues create

--max-doublingsは、タスク失敗時の再試行間隔秒数が、一定になるまでの回数です。

MAX_DOUBLINGS is the maximum number of times that the interval between failed task retries will be doubled before the increase becomes constant.

再試行間隔は--min-backoffの秒数を、再試行の都度2倍されていき、--max-backoffの間隔となるまで拡大します。2倍になる回数は、--max-doublingsで指定した値となります。上記の設定例では、以下の間隔で拡大していきます。

(初回試行) -[10s]-> (再試行1) -[20s]-> (再試行2) -[40s]-> (再試行3) -[80s]-> (再試行4) -[160s]-> (再試行5) -[240s]-> (再試行6)

Cloud Tasks キューの構成

--log-sampling-ratioで指定する値が、Cloud Loggingに出力されるログの割合となります。1.0はすべてのログをCloud Loggingに出力します。

Cloud Logging を使用する

QueueにTaskを投げるサービスアカウントを用意

Task投入用プログラム実行時に利用するサービスアカウントを用意しておきます。

サービスアカウントを作成します。

$ gcloud iam service-accounts create sa-task-pubsub --description="for test"

gcloud iam service-accounts create

作成したサービスアカウントに、必要となるIAM Roleを付与しておきます。サービスアカウント用の権限のほか、Cloud TasksにEnqueueする権限とWorkerであるCloud RunをInvokeする権限を与えておきます。

$ gcloud projects add-iam-policy-binding [prject-id] --member='serviceAccount:sa-task-pubsub@[prject-id].iam.gserviceaccount.com' --role='roles/iam.serviceAccountUser'
$ gcloud projects add-iam-policy-binding [prject-id] --member='serviceAccount:sa-task-pubsub@[prject-id].iam.gserviceaccount.com' --role='roles/cloudtasks.enqueuer'
$ gcloud projects add-iam-policy-binding [prject-id] --member='serviceAccount:sa-task-pubsub@[prject-id].iam.gserviceaccount.com' --role='roles/run.invoker'

gcloud projects add-iam-policy-binding

作成したサービスアカウントの秘密鍵をダウンロードしておきます。

$ gcloud iam service-accounts keys create ./key-sa-task-pubsub.json --iam-account=sa-task-pubsub@[prject-id].iam.gserviceaccount.com --key-file-type=json

gcloud iam service-accounts keys create

Task投入用のプログラムの作成

Cloud TasksのQueue投入用のプログラムを作成します。下記の公式ドキュメントを参考にしています。

HTTP Target タスクの作成

非同期タスクの実行

コンフィグファイルを用意。

config.ini

[GENERAL]
PROJECT = [prject-id]
QUEUE = sample-queue
LOCATION = asia-northeast1
URL = https://sample-abcdefg-an.a.run.app
TASK_NAME_PREFIX = sample-task-
SERVICE_ACCOUNT_EMAIL = sa-task-pubsub@[prject-id].iam.gserviceaccount.com
IN_SECONDS = 0

Cloud TasksのClient Libraryを利用して、Taskを投入します。

CloudTasks

main.py

from google.cloud import tasks_v2
from google.protobuf import timestamp_pb2, duration_pb2
from google import api_core
import datetime
import json
import configparser
import uuid

# config setting
config = configparser.ConfigParser()
config.read('config.ini', encoding='utf-8')

# Create a client.
client = tasks_v2.CloudTasksClient()

# Construct the fully qualified queue name.
parent = client.queue_path(config["GENERAL"]["PROJECT"], config["GENERAL"]["LOCATION"], config["GENERAL"]["QUEUE"])

# Construct the request body.
task = {
    "http_request": {  # Specify the type of request.
        "http_method": tasks_v2.HttpMethod.POST,
        "url": config["GENERAL"]["URL"],  # The full url path that the task will be sent to.
        "oidc_token": {"service_account_email": config["GENERAL"]["SERVICE_ACCOUNT_EMAIL"]},  # authentication for pub/sub
    }
}

# Add request body to task.
# Convert dict to JSON string
payload = json.dumps({ "message": "test message" })
# The API expects a payload of type bytes.
converted_payload = payload.encode()
# Add the payload to the request.
task["http_request"]["body"] = converted_payload
# specify http content-type to application/json
task["http_request"]["headers"] = {"Content-type": "application/json"}

# Add schedule time to task.
in_seconds = int(config["GENERAL"]["IN_SECONDS"])
# Convert "seconds from now" into an rfc3339 datetime string.
d = datetime.datetime.utcnow() + datetime.timedelta(seconds=in_seconds)
# Create Timestamp protobuf.
timestamp = timestamp_pb2.Timestamp()
timestamp.FromDatetime(d)
# Add the timestamp to the tasks.
task["schedule_time"] = timestamp

# Add the deadline to task
dispatch_deadline = duration_pb2.Duration(seconds=1800)
task["dispatch_deadline"] = dispatch_deadline

# Add the name to tasks.
task_name = config["GENERAL"]["TASK_NAME_PREFIX"] + str(uuid.uuid4())[:8]
task["name"] = client.task_path(config["GENERAL"]["PROJECT"], config["GENERAL"]["LOCATION"], config["GENERAL"]["QUEUE"], task_name)

# Define retry
retry = api_core.retry.Retry(
    initial=5.0,  # seconds (default: 0.1)
    maximum=10.0,  # seconds (default: 60.0)
    multiplier=1.0,  # default: 1.3
    deadline=30.0  # seconds (default: 60.0)
)

# Use the client to build and send the task.
response = client.create_task(
    request={"parent": parent, "task": task},
    retry=retry
    )

print(f"Created task {response.name}")

実行

サービスアカウントの秘密鍵環境変数に定義して、Task投入プログラムを実行します。

$ export GOOGLE_APPLICATION_CREDENTIALS=./key-sa-task-pubsub.json
$ python3 main.py

Cloud Loggingにもログが出力されています。

$ gcloud logging read 'resource.type="cloud_tasks_queue" resource.labels.queue_id="sample-queue" resource.labels.target_type="HTTP" timestamp>="2021-07-31T11:00:00Z"'
insertId: 2iclncc2y3
jsonPayload:
  '@type': type.googleapis.com/google.cloud.tasks.logging.v1.TaskActivityLog
  attemptResponseLog:
    dispatchCount: '1'
    maxAttempts: 0
    responseCount: '1'
    scheduleTime: '2021-07-31T11:45:59.060021Z'
    status: OK
    targetAddress: POST https://worker-exb6xe7e5a-an.a.run.app/
    targetType: HTTP
  task: projects/[prject-id]/locations/asia-northeast1/queues/sample-queue/tasks/sample-task-faae211a
logName: projects/[prject-id]/logs/cloudtasks.googleapis.com%2Ftask_operations_log
receiveTimestamp: '2021-07-31T11:46:01.490219832Z'
resource:
  labels:
    location: asia-northeast1
    project_id: [prject-id]
    queue_id: sample-queue
    target_type: HTTP
  type: cloud_tasks_queue
severity: INFO
timestamp: '2021-07-31T11:46:00.271681428Z'
---
insertId: 2iclncc2y2
jsonPayload:
  '@type': type.googleapis.com/google.cloud.tasks.logging.v1.TaskActivityLog
  attemptDispatchLog:
    dispatchCount: '1'
    dispatchReason: PUSH_QUEUE
    maxAttempts: 0
    responseCount: '0'
    scheduleTime: '2021-07-31T11:45:59.060021Z'
    targetAddress: POST https://worker-exb6xe7e5a-an.a.run.app/
    targetType: HTTP
  task: projects/[prject-id]/locations/asia-northeast1/queues/sample-queue/tasks/sample-task-faae211a
logName: projects/[prject-id]/logs/cloudtasks.googleapis.com%2Ftask_operations_log
receiveTimestamp: '2021-07-31T11:46:00.491909215Z'
resource:
  labels:
    location: asia-northeast1
    project_id: [prject-id]
    queue_id: sample-queue
    target_type: HTTP
  type: cloud_tasks_queue
severity: INFO
timestamp: '2021-07-31T11:45:59.061067362Z'
---
insertId: 81p50jcfky
jsonPayload:
  '@type': type.googleapis.com/google.cloud.tasks.logging.v1.TaskActivityLog
  task: projects/[prject-id]/locations/asia-northeast1/queues/sample-queue/tasks/sample-task-faae211a
  taskCreationLog:
    scheduleTime: '2021-07-31T11:45:59.060021Z'
    status: OK
    targetAddress: POST https://worker-exb6xe7e5a-an.a.run.app/
    targetType: HTTP
logName: projects/[prject-id]/logs/cloudtasks.googleapis.com%2Ftask_operations_log
receiveTimestamp: '2021-07-31T11:45:59.177742220Z'
resource:
  labels:
    location: asia-northeast1
    project_id: [prject-id]
    queue_id: sample-queue
    target_type: HTTP
  type: cloud_tasks_queue
severity: INFO
timestamp: '2021-07-31T11:45:58.475179789Z'