AWS StepFunctionsのParametersフィールド対応で何が実現されるのか
前回の記事で述べた通り、今年のre:inventではAWS StepFunctionsに対して多くのアップデートがありました。
多くの各種サービスと直接やりとりができるようになりましたが、その副次的な恩恵(?)として、Taskの命令に Parameters
フィールドが対応しました。
よく考えたらこれは結構求めていたものじゃないのか?という気づきがあり、本稿を書いています。
正直何のサービスと繋がろうがこれの方が嬉しいかも。
今回は、これによって何が実現できるのか説明したいと思います。
Parametersフィールドとは
公式の解説
今回追加されたParametersフィールドでは、 ステートマシン上でハードコードしたjson構造と渡ってきたinputの合成ができます。
こんな感じです。
{
"パッケージング": {
"Type": "Task",
"Parameters": {
"query": "/delivery/package",
"payload": {
"original_viideo": "$[0].result.task1",
"preview_video": "$[0].result.task2",
"thumbnail": "$[1].result.task3"
}
},
"Resource": "arn:aws:states:ap-northeast-1:xxxx:activity:blog-demo-activity",
"ResultPath": "$",
"End": true
}
}
要するに今までは相当無理やりなことしないと実現できなかったわけですが、ステートマシン上で定義したjsonと合成できることで、次のフローが組めるようになったかと思います。
- Activityが1つで済む (つまりワーカーが1つで済む)
- ワーカーに渡ってくるのはParameters部分だけなので、タスク実行側はアプリ1つで柔軟に分岐処理を実行でき、かつ自分の責務に専念できる
ActivityTaskと合わせることによるコンテキストと各タスクの処理の分離化
弊社でありそうな次のユースケースを考えてみました。
- 動画と画像を納品するサービスである
- 納品形式のフォーマットはストアによって違い、それぞれ独立したステートマシン
- あるストア向けのステートマシンで、
- オリジナル長の動画をエンコードする
- オリジナル長の動画からプレビュー用に切り出す
- サムネイルをこのストア向けのフォーマットにエンコードする
生成されたストアA納品用ステートマシン図
超スッキリ
ストアA定義
{
"StartAt": "ダミー入力",
"States": {
"ダミー入力": {
"Type": "Pass",
"Result": {
"context_id": 123456789,
"resources": {
"image": {
"bucket": "xxxxxxx",
"key": "yyyyyyy"
},
"video": {
"bucket": "xxxxxxx",
"key": "yyyyyyy"
}
},
"result": {}
},
"Next": "all-task"
},
"all-task": {
"Type": "Parallel",
"Next": "パッケージング",
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Next": "失敗"
}
],
"Branches": [
{
"StartAt": "動画のエンコード",
"States": {
"動画のエンコード": {
"Type": "Task",
"Parameters": {
"query": "/video/encode/original/",
"payload": {
"format" : "storeA",
"resource": "$.resources.video"
}
},
"Resource": "arn:aws:states:ap-northeast-1:xxx:activity:blog-demo-activity",
"ResultPath": "$.result.task1",
"Next": "短縮版動画のエンコード"
},
"短縮版動画のエンコード": {
"Type": "Task",
"Parameters": {
"query": "/video/encode/preview/",
"payload": {
"start_time": "12",
"end_time": "42",
"resource": "$.result.task1"
}
},
"Resource": "arn:aws:states:ap-northeast-1:xxx:activity:blog-demo-activity",
"ResultPath": "$.result.task2",
"End": true
}
}
},
{
"StartAt": "サムネイルのエンコード",
"States": {
"サムネイルのエンコード": {
"Type": "Task",
"Parameters": {
"query": "/image/encode/task1",
"payload": {
"output_width": 640,
"output-height": 480,
"resource": "$.resources.image"
}
},
"Resource": "arn:aws:states:ap-northeast-1:xxx:activity:blog-demo-activity",
"ResultPath": "$.result.task3",
"End": true
}
}
}
]
},
"パッケージング": {
"Type": "Task",
"Parameters": {
"query": "/delivery/package",
"payload": {
"original_viideo": "$[0].result.task1",
"preview_video": "$[0].result.task2",
"thumbnail": "$[1].result.task3"
}
},
"Resource": "arn:aws:states:ap-northeast-1:330798132035:activity:blog-demo-activity",
"ResultPath": "$",
"End": true
},
"失敗": {
"Type": "Pass",
"End": true
}
}
}
Activityワーカー側
柔軟な入力処理のおかげで、Activityワーカーアプリの購読Arnは1つで済むようになります。
今回はテスト用にLambda(nodejs)のコードで書きましたが、通常通りEC2上などで実行しても話は一緒です。
const AWS = require("aws-sdk");
const stepfunctions = new AWS.StepFunctions();
exports.handler = async (event) => {
const params = {
activityArn: process.env.ACTIVITY_ARN, /* required */
};
const startTimeSec = new Date().getTime() / 1000;
const limit = 1;
let nonExistCount = 0;
while (((new Date().getTime() / 1000) - startTimeSec < 300) && nonExistCount < limit ){
const receiveEvent = await stepfunctions.getActivityTask(params).promise();
console.log(JSON.stringify(receiveEvent));
if (!receiveEvent.taskToken){
nonExistCount++;
continue;
}
const taskToken = receiveEvent.taskToken;
const input = JSON.parse(receiveEvent.input);
const query = input.query;
const payload = input.payload;
const result = await handleTaskRouting(query , payload , taskToken);
console.log(result);
if (result.isSyncTask) {
console.log("処理の成功を通知します");
await stepfunctions.sendTaskSuccess({
taskToken : taskToken,
output : JSON.stringify(result.output)
}).promise();
}
}
console.log("処理を終了します");
};
const handleTaskRouting = async (query , payload , taskToken)=>{
// queryによって命令を分岐し、同期処理を実行したり、他のサービスに移譲したり...
// ここでは同期的レスポンスのダミーを返す
return {
isSyncTask : true,
output : {
bucket : "@@@@@@@@@@",
key : "[[[[[[[[[[[["
}
}
};
ここでは、ワーカー側はquery
フィールドによって処理のルーティングが柔軟にできるようにしました。
ここから先は、同期的ににSuccess
を返そうが、MediaConvertや外部APIなど他の非同期サービスにtaskToken
を渡して処理を移譲しようがやれることは自由となります。
実行
いいぞ
まとめ
入出力に柔軟性がなさすぎて複雑なワークフローエンジンとしてのStepFunctionsの使用を見送っていたところもあったのですが、今回の合成のアップデート1つででかなり使いやすくなったかと思います、
ステートマシンが事情ありきの謎のポーリングループで埋まると可読性も悪いですし、自分は割とこのパターンが好きかなあ、というところです。