Cloud TasksのQueueに投入されたタスクを、Cloud Run上のコンテナ環境で実行する手順を調べます。
構成
構成図です。
環境
Cloud Runにコンテナのデプロイ
Cloud Run上のコンテナは、Cloud TasksからのPOST Requestを受ける必要があるため、POSTを受け取れるAPIサーバーのコンテナイメージを用意しておく必要があります。本内容では、コンテナイメージ作成部分の手順は割愛してしまいます。
作成したコンテナイメージをArtifact Registryにpushした後、Cloud Runへデプロイする部分のみ備忘として記載します。
以下コマンドでコンテナへデプロイします。
$ gcloud run deploy worker \ --concurrency=80 \ --cpu=1000m \ --memory=512Mi \ --image=asia-docker.pkg.dev/[prject-id]/sample/flask:ver1 \ --ingress=all \ --no-allow-unauthenticated \ --max-instances=1 \ --port=8080 \ --timeout=60m \ --region=asia-northeast1
--ingress=all
で--no-allow-unauthenticated
の設定でデプロイしています。Cloud TasksからCloud Pub/SubへのPost Requestをセキュアにしたい訳ですが、以下の理由によりingressの通信をinternalのみに限定することはできません。認証を必須とすることで、セキュアなアクセスを担保しようとしています。
VPC ネットワークから発信されていないトラフィック ソースから内部サービスを呼び出す方法はありません(ただし Pub/Sub または Eventarc は除きます)。つまり、Cloud Scheduler、Cloud Tasks、Workflow では内部サービスを呼び出すことができません。
Cloud TasksのQueueを作成
Queueを作成します。
$ gcloud tasks queues create sample-queue \ --log-sampling-ratio=1.0 \ --max-concurrent-dispatches 1000 \ --max-dispatches-per-second 500 \ --max-attempts=10 \ --max-backoff=3600s \ --min-backoff=10s \ --max-doublings=3
--max-doublings
は、タスク失敗時の再試行間隔秒数が、一定になるまでの回数です。
MAX_DOUBLINGS
is the maximum number of times that the interval between failed task retries will be doubled before the increase becomes constant.
再試行間隔は--min-backoff
の秒数を、再試行の都度2倍されていき、--max-backoff
の間隔となるまで拡大します。2倍になる回数は、--max-doublings
で指定した値となります。上記の設定例では、以下の間隔で拡大していきます。
(初回試行) -[10s]-> (再試行1) -[20s]-> (再試行2) -[40s]-> (再試行3) -[80s]-> (再試行4) -[160s]-> (再試行5) -[240s]-> (再試行6)
--log-sampling-ratio
で指定する値が、Cloud Loggingに出力されるログの割合となります。1.0
はすべてのログをCloud Loggingに出力します。
QueueにTaskを投げるサービスアカウントを用意
Task投入用プログラム実行時に利用するサービスアカウントを用意しておきます。
サービスアカウントを作成します。
$ gcloud iam service-accounts create sa-task-pubsub --description="for test"
gcloud iam service-accounts create
作成したサービスアカウントに、必要となるIAM Roleを付与しておきます。サービスアカウント用の権限のほか、Cloud TasksにEnqueueする権限とWorkerであるCloud RunをInvokeする権限を与えておきます。
$ gcloud projects add-iam-policy-binding [prject-id] --member='serviceAccount:sa-task-pubsub@[prject-id].iam.gserviceaccount.com' --role='roles/iam.serviceAccountUser' $ gcloud projects add-iam-policy-binding [prject-id] --member='serviceAccount:sa-task-pubsub@[prject-id].iam.gserviceaccount.com' --role='roles/cloudtasks.enqueuer' $ gcloud projects add-iam-policy-binding [prject-id] --member='serviceAccount:sa-task-pubsub@[prject-id].iam.gserviceaccount.com' --role='roles/run.invoker'
gcloud projects add-iam-policy-binding
作成したサービスアカウントの秘密鍵をダウンロードしておきます。
$ gcloud iam service-accounts keys create ./key-sa-task-pubsub.json --iam-account=sa-task-pubsub@[prject-id].iam.gserviceaccount.com --key-file-type=json
gcloud iam service-accounts keys create
Task投入用のプログラムの作成
Cloud TasksのQueue投入用のプログラムを作成します。下記の公式ドキュメントを参考にしています。
コンフィグファイルを用意。
config.ini
[GENERAL] PROJECT = [prject-id] QUEUE = sample-queue LOCATION = asia-northeast1 URL = https://sample-abcdefg-an.a.run.app TASK_NAME_PREFIX = sample-task- SERVICE_ACCOUNT_EMAIL = sa-task-pubsub@[prject-id].iam.gserviceaccount.com IN_SECONDS = 0
Cloud TasksのClient Libraryを利用して、Taskを投入します。
main.py
from google.cloud import tasks_v2 from google.protobuf import timestamp_pb2, duration_pb2 from google import api_core import datetime import json import configparser import uuid # config setting config = configparser.ConfigParser() config.read('config.ini', encoding='utf-8') # Create a client. client = tasks_v2.CloudTasksClient() # Construct the fully qualified queue name. parent = client.queue_path(config["GENERAL"]["PROJECT"], config["GENERAL"]["LOCATION"], config["GENERAL"]["QUEUE"]) # Construct the request body. task = { "http_request": { # Specify the type of request. "http_method": tasks_v2.HttpMethod.POST, "url": config["GENERAL"]["URL"], # The full url path that the task will be sent to. "oidc_token": {"service_account_email": config["GENERAL"]["SERVICE_ACCOUNT_EMAIL"]}, # authentication for pub/sub } } # Add request body to task. # Convert dict to JSON string payload = json.dumps({ "message": "test message" }) # The API expects a payload of type bytes. converted_payload = payload.encode() # Add the payload to the request. task["http_request"]["body"] = converted_payload # specify http content-type to application/json task["http_request"]["headers"] = {"Content-type": "application/json"} # Add schedule time to task. in_seconds = int(config["GENERAL"]["IN_SECONDS"]) # Convert "seconds from now" into an rfc3339 datetime string. d = datetime.datetime.utcnow() + datetime.timedelta(seconds=in_seconds) # Create Timestamp protobuf. timestamp = timestamp_pb2.Timestamp() timestamp.FromDatetime(d) # Add the timestamp to the tasks. task["schedule_time"] = timestamp # Add the deadline to task dispatch_deadline = duration_pb2.Duration(seconds=1800) task["dispatch_deadline"] = dispatch_deadline # Add the name to tasks. task_name = config["GENERAL"]["TASK_NAME_PREFIX"] + str(uuid.uuid4())[:8] task["name"] = client.task_path(config["GENERAL"]["PROJECT"], config["GENERAL"]["LOCATION"], config["GENERAL"]["QUEUE"], task_name) # Define retry retry = api_core.retry.Retry( initial=5.0, # seconds (default: 0.1) maximum=10.0, # seconds (default: 60.0) multiplier=1.0, # default: 1.3 deadline=30.0 # seconds (default: 60.0) ) # Use the client to build and send the task. response = client.create_task( request={"parent": parent, "task": task}, retry=retry ) print(f"Created task {response.name}")
実行
サービスアカウントの秘密鍵を環境変数に定義して、Task投入プログラムを実行します。
$ export GOOGLE_APPLICATION_CREDENTIALS=./key-sa-task-pubsub.json $ python3 main.py
Cloud Loggingにもログが出力されています。
$ gcloud logging read 'resource.type="cloud_tasks_queue" resource.labels.queue_id="sample-queue" resource.labels.target_type="HTTP" timestamp>="2021-07-31T11:00:00Z"'
insertId: 2iclncc2y3 jsonPayload: '@type': type.googleapis.com/google.cloud.tasks.logging.v1.TaskActivityLog attemptResponseLog: dispatchCount: '1' maxAttempts: 0 responseCount: '1' scheduleTime: '2021-07-31T11:45:59.060021Z' status: OK targetAddress: POST https://worker-exb6xe7e5a-an.a.run.app/ targetType: HTTP task: projects/[prject-id]/locations/asia-northeast1/queues/sample-queue/tasks/sample-task-faae211a logName: projects/[prject-id]/logs/cloudtasks.googleapis.com%2Ftask_operations_log receiveTimestamp: '2021-07-31T11:46:01.490219832Z' resource: labels: location: asia-northeast1 project_id: [prject-id] queue_id: sample-queue target_type: HTTP type: cloud_tasks_queue severity: INFO timestamp: '2021-07-31T11:46:00.271681428Z' --- insertId: 2iclncc2y2 jsonPayload: '@type': type.googleapis.com/google.cloud.tasks.logging.v1.TaskActivityLog attemptDispatchLog: dispatchCount: '1' dispatchReason: PUSH_QUEUE maxAttempts: 0 responseCount: '0' scheduleTime: '2021-07-31T11:45:59.060021Z' targetAddress: POST https://worker-exb6xe7e5a-an.a.run.app/ targetType: HTTP task: projects/[prject-id]/locations/asia-northeast1/queues/sample-queue/tasks/sample-task-faae211a logName: projects/[prject-id]/logs/cloudtasks.googleapis.com%2Ftask_operations_log receiveTimestamp: '2021-07-31T11:46:00.491909215Z' resource: labels: location: asia-northeast1 project_id: [prject-id] queue_id: sample-queue target_type: HTTP type: cloud_tasks_queue severity: INFO timestamp: '2021-07-31T11:45:59.061067362Z' --- insertId: 81p50jcfky jsonPayload: '@type': type.googleapis.com/google.cloud.tasks.logging.v1.TaskActivityLog task: projects/[prject-id]/locations/asia-northeast1/queues/sample-queue/tasks/sample-task-faae211a taskCreationLog: scheduleTime: '2021-07-31T11:45:59.060021Z' status: OK targetAddress: POST https://worker-exb6xe7e5a-an.a.run.app/ targetType: HTTP logName: projects/[prject-id]/logs/cloudtasks.googleapis.com%2Ftask_operations_log receiveTimestamp: '2021-07-31T11:45:59.177742220Z' resource: labels: location: asia-northeast1 project_id: [prject-id] queue_id: sample-queue target_type: HTTP type: cloud_tasks_queue severity: INFO timestamp: '2021-07-31T11:45:58.475179789Z'