goodbyegangsterのブログ

備忘録的な

Data Pipeline に関する備忘録

Data Pipeline を触っていた時の備忘録です。

(1) Data Pipelineコンソールからは、作成できない設定が多数

GUIからは PreconditionsResourcesは設定できない。JSONの定義ファイルを作成して、インポートしてあげる。というか、GUIがそもそも分かりにくいので、定義ファイル書いた方が楽。

(2) 定義ファイルにスケジュール情報は記載不要

定義ファイルをData Pipelineコンソールにインポートした際、コンソール上でスケジュール情報を入力することになるので、定義ファイルに記載は冗長になるだけでした。

(3) デフォルトの Ec2Resource でLauchされるAMIは超古い

利用するAMIを指定しない場合、PV AMI 仮想化タイプを利用して起動するので、PV AMIに対応していないインスタンスタイプを利用しようとするとエラーになります。デフォルトのAMIでなく、最近のAMIを指定しておくことが無難と思います。

下記はデフォルトのAMI。2013年のものが起動されてくる。

Ec2Resource - AWS Data Pipeline

(4) Data Pipeline のログ出力先S3には、KEYをちゃんと用意する

Keyを指定してあげないと、以下のようなエラーメッセージを出して、Activityが必ずFailedする。

Unable to upload file:file:/mnt/taskRunner/output/logs/df-aaaaaaaaaaaa/ShellCommandActivity/@ShellCommandActivity_2018-05-01T00:00:00/@ShellCommandActivity_2018-05-01T00:00:00_Attempt=2/StdOutput to bucket: test-datapipeline-log-test key:
(5) parameterのidは my を頭文字に持つ

my から始まるidでないと、pipeline作成時に怒られる。

パラメータ化されたテンプレートを使用したパイプラインの作成 - AWS Data Pipeline

パイプライン定義ファイルを作成する場合、#{myVariable} という構文を使用して変数を指定します。変数の前にプレフィックス my を付ける必要があります。

(6) lateAfterTimeout は、ondemandの設定では使えない

lateAfterTimeout オプション、及びこいつに紐づく onLateAction オプションは、パイプラインのスケジュールタイプが cron の時に利用可能で、 ondemand の時に利用しようとしても、パイプライン作成時に怒られます。

(7) SqlActivityの実行SQL内で変数を利用する方法

変数としたい値を parameter で用意してあげて、 script オプションにて #{パラメータ名} で指定してあげればOK。

RedshiftのVacuum処理の場合、 vacuum #{myRedshiftSchemaName}.#{myRedshiftTableName} みたいな感じ。以下はサンプルJSON

{
  "objects": [
    (略)
    {
      "id": "SqlActivity-test",
      "type": "SqlActivity",
      "name": "SqlActivity-test",
      "database": {
        "ref": "RedshiftCluster"
      },
      "script": "#{mySQLScriptString}",
      "runsOn": {
        "ref": "Ec2Instance"
      }
    },
    (略)
  ],
  "parameters": [
    (略)
    {
      "id": "mySQLScriptString",
      "type": "String",
      "description": "SQL String"
    },
    (略)
  ],
  "values": {
    "myRedshiftSchemaName": "test",
    "myRedshiftTableName": "test_table",
    "mySQLScriptString": "vacuum #{myRedshiftSchemaName}.#{myRedshiftTableName}",
  }
}

マニュアルを読んでいると、 RedshiftDataNode を用意すれば、 #{output.tableName} みたいにデータノードにあたるテーブルを指定できそうな事が記載されていますが、これは HiveActivity でないと利用できない。

(8) ShellCommandActivityのCommandで複数の処理を実行する

(処理1) && (処理2) みたく繋げてあげると、複数処理を実行してくれます。

"command": "(sudo pip install --upgrade --user awscli 2>/dev/null) && (aws s3 mv s3://test-bucket1/test s3://test-bucket2/test)",
(9) ShellCommandActivityで、データのステージングを利用する

ShellCommandActivity では、 S3DataNode をステージングとして利用できます。ステージングって、いまいち分かりにくいですが、変数みたいに扱えるって理解で問題ないです。

下記のようなjson定義を作成した場合、commandオプションで指定している cp {INPUT1_STAGING_DIR} {OUTPUT1_STAGING_DIR}) は、 {INPUT1_STAGING_DIR} はinputオプションで指定したS3フォルダを、 {OUTPUT1_STAGING_DIR} はoutputオプションで指定したS3フォルダを、参照することになります。

この機能を利用するためには、 "stage": "true" とstageオプションを有効にしないといけません。

{
  "id": "ShellCommandActivity",
  "type": "ShellCommandActivity",
  "name": "ShellCommandActivity",
  "command": "cp {INPUT1_STAGING_DIR} {OUTPUT1_STAGING_DIR})",
  "stage": "true",
  "runsOn": {
    "ref": "Ec2Instance"
  },
  "input": {
    "ref": "S3HulftDataNode"
  },
  "output": {
    "ref": "S3RedshiftDataNode"
  }
},

下記は公式ドキュメント。

パイプラインのアクティビティによるデータとテーブルのステージング - AWS Data Pipeline