Data Pipeline に関する備忘録
Data Pipeline
を触っていた時の備忘録です。
(1) Data Pipelineコンソールからは、作成できない設定が多数
GUIからは Preconditions
や Resources
は設定できない。JSONの定義ファイルを作成して、インポートしてあげる。というか、GUIがそもそも分かりにくいので、定義ファイル書いた方が楽。
(2) 定義ファイルにスケジュール情報は記載不要
定義ファイルをData Pipelineコンソールにインポートした際、コンソール上でスケジュール情報を入力することになるので、定義ファイルに記載は冗長になるだけでした。
(3) デフォルトの Ec2Resource でLauchされるAMIは超古い
利用するAMIを指定しない場合、PV AMI 仮想化タイプを利用して起動するので、PV AMIに対応していないインスタンスタイプを利用しようとするとエラーになります。デフォルトのAMIでなく、最近のAMIを指定しておくことが無難と思います。
下記はデフォルトのAMI。2013年のものが起動されてくる。
Ec2Resource - AWS Data Pipeline
(4) Data Pipeline のログ出力先S3には、KEYをちゃんと用意する
Keyを指定してあげないと、以下のようなエラーメッセージを出して、Activityが必ずFailedする。
Unable to upload file:file:/mnt/taskRunner/output/logs/df-aaaaaaaaaaaa/ShellCommandActivity/@ShellCommandActivity_2018-05-01T00:00:00/@ShellCommandActivity_2018-05-01T00:00:00_Attempt=2/StdOutput to bucket: test-datapipeline-log-test key:
(5) parameterのidは my
を頭文字に持つ
my
から始まるidでないと、pipeline作成時に怒られる。
パラメータ化されたテンプレートを使用したパイプラインの作成 - AWS Data Pipeline
パイプライン定義ファイルを作成する場合、#{myVariable} という構文を使用して変数を指定します。変数の前にプレフィックス my を付ける必要があります。
(6) lateAfterTimeout
は、ondemandの設定では使えない
lateAfterTimeout
オプション、及びこいつに紐づく onLateAction
オプションは、パイプラインのスケジュールタイプが cron
の時に利用可能で、 ondemand
の時に利用しようとしても、パイプライン作成時に怒られます。
(7) SqlActivityの実行SQL内で変数を利用する方法
変数としたい値を parameter
で用意してあげて、 script
オプションにて #{パラメータ名}
で指定してあげればOK。
RedshiftのVacuum処理の場合、 vacuum #{myRedshiftSchemaName}.#{myRedshiftTableName}
みたいな感じ。以下はサンプルJSON。
{ "objects": [ (略) { "id": "SqlActivity-test", "type": "SqlActivity", "name": "SqlActivity-test", "database": { "ref": "RedshiftCluster" }, "script": "#{mySQLScriptString}", "runsOn": { "ref": "Ec2Instance" } }, (略) ], "parameters": [ (略) { "id": "mySQLScriptString", "type": "String", "description": "SQL String" }, (略) ], "values": { "myRedshiftSchemaName": "test", "myRedshiftTableName": "test_table", "mySQLScriptString": "vacuum #{myRedshiftSchemaName}.#{myRedshiftTableName}", } }
マニュアルを読んでいると、 RedshiftDataNode
を用意すれば、 #{output.tableName}
みたいにデータノードにあたるテーブルを指定できそうな事が記載されていますが、これは HiveActivity
でないと利用できない。
(8) ShellCommandActivityのCommandで複数の処理を実行する
(処理1) && (処理2)
みたく繋げてあげると、複数処理を実行してくれます。
"command": "(sudo pip install --upgrade --user awscli 2>/dev/null) && (aws s3 mv s3://test-bucket1/test s3://test-bucket2/test)",
(9) ShellCommandActivityで、データのステージングを利用する
ShellCommandActivity
では、 S3DataNode
をステージングとして利用できます。ステージングって、いまいち分かりにくいですが、変数みたいに扱えるって理解で問題ないです。
下記のようなjson定義を作成した場合、commandオプションで指定している cp {INPUT1_STAGING_DIR} {OUTPUT1_STAGING_DIR})
は、 {INPUT1_STAGING_DIR}
はinputオプションで指定したS3フォルダを、 {OUTPUT1_STAGING_DIR}
はoutputオプションで指定したS3フォルダを、参照することになります。
この機能を利用するためには、 "stage": "true"
とstageオプションを有効にしないといけません。
{ "id": "ShellCommandActivity", "type": "ShellCommandActivity", "name": "ShellCommandActivity", "command": "cp {INPUT1_STAGING_DIR} {OUTPUT1_STAGING_DIR})", "stage": "true", "runsOn": { "ref": "Ec2Instance" }, "input": { "ref": "S3HulftDataNode" }, "output": { "ref": "S3RedshiftDataNode" } },
下記は公式ドキュメント。