AWS Stepfunctionsのエラー捕捉のデザインパターン
この記事は WanoグループAdvent Calendar 2018 1日目の記事になります。
なんとまあ今年もアドベントカレンダーの季節です。1年速い……..
re:invent 2018で発表されたStepfunctionsのアップデート
今年も re:invent 2018で大量のサーバーレス周りのアップデートが施されました。
StepFunctionsもその一つで、複数のサービスとの連携が新たに発表されました。
New – Compute, Database, Messaging, Analytics, and Machine Learning Integration for AWS Step Functions
個人的に気になっているのは、FargateタスクやBatchタスクの直接の実行と待ち受けがステートマシン上で可能になったことです。
実行ワークフロー上でコンテナを使うタスクが必要になった時でも、ActivityによるTaskTokenの引き渡しや、やイケてないポーリングループによる実行結果待ちなどが不要になります。
また、DynamoDBと直接の値の参照と変更がステートマシンの文法で可能になったことで、ワークフロー固有ロジック/context的な部分をユースケース層的な感じでステートマシン上に閉じ込め、各実行タスクは自身の作業に集中できるようになり、DRYな環境が保ちやすくなったのかな、と思います。
不満点
とはいえ、まだまだワークフローエンジンとして自分が欲しかったものとして「もうちょっと惜しいなー」という環境であることも確かです。
- Activity や Lambda のイベントにデフォルトで「context_id」的なものを引き継ぎたい
- 「待ち受け」をスッキリ書くためのステートマシン自体の入れ子/再利用機能
- 動的なパラレル機構(これはフォーラムでも前向きに検討中とされていた)
- ステートメント言語自体のアップデート。ステート毎の入出力の柔軟化
- runtime error捕捉もしくはステート遷移のCloudWatch Event対応
…等々。
StepFunctionsにおけるエラー捕捉
なんか色々書いてしまいましたが、今回は、最後のエラー捕捉の話と現状での対策について書いてみようかな、と思います。
StepFunctionsでステートマシン実行時のエラーを捕捉するにはいくつかの方法があるのですが、ここでなかなかハマりどころがありました。
- try/catchパターン
- Runtimeエラー捕捉パターン
この2つについてご紹介します。
try/catchパターン実装
愚直なcatchの問題
Step Functionsではタスクごとにエラーをcatchし、独自の例外処理に分岐できる機構があります。
その時にはもちろん、各タスクごとにエラー処理を書く必要があるのですが、やっとみるとこれがなかなか曲者です。
つらつら書いていくと以下のようになります。
{
"StartAt": "ランダムにエラーを起こすタスク1",
"TimeoutSeconds": 10,
"States": {
"ランダムにエラーを起こすタスク1": {
"InputPath": "$",
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:xxxx:function:lambda-for-blog",
"Next": "ランダムにエラーを起こすタスク2",
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Next": "失敗時になんかするタスク"
}
]
},
"ランダムにエラーを起こすタスク2": {
"InputPath": "$",
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:xxxx:function:lambda-for-blog",
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Next": "失敗時になんかするタスク"
}
],
"Next": "ランダムにエラーを起こすタスク3"
},
"ランダムにエラーを起こすタスク3": {
"InputPath": "$",
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:xxxx:function:lambda-for-blog",
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Next": "失敗時になんかするタスク"
}
],
"Next": "成功"
},
"失敗時になんかするタスク": {
"Type": "Pass",
"Next": "失敗"
},
"失敗": {
"Type": "Fail"
},
"成功": {
"Type": "Pass",
"End": true
}
}
}
実行
同じ例外処理に行きつきたいだけなのに、Catch部分の設定が頻出し、冗長です。
ステートマシン図上で見たときも、なにやらごちゃごちゃとしてしまいます。
とりあえずParallelタスクで包む
そこで、シンプルな try/catchを実装するには以下のようなパターンがあります。
直列実行でもとりあえずParallelで包んじゃうことです。
{
"StartAt": "初期タスク",
"TimeoutSeconds": 10,
"States": {
"初期タスク": {
"Type": "Parallel",
"Next": "成功",
"Catch": [
{
"ErrorEquals": [
"States.ALL"
],
"Next": "失敗時になんかするタスク"
}
],
"Branches": [
{
"StartAt": "ランダムにエラーを起こすタスク1",
"States": {
"ランダムにエラーを起こすタスク1": {
"InputPath": "$",
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:xxxxx:function:lambda-for-blog",
"Next": "ランダムにエラーを起こすタスク2"
},
"ランダムにエラーを起こすタスク2": {
"InputPath": "$",
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:xxxxx:function:lambda-for-blog",
"End" : true
}
}
}
]
},
"失敗時になんかするタスク": {
"Type": "Pass",
"Next": "失敗"
},
"失敗": {
"Type": "Fail"
},
"成功": {
"Type": "Pass",
"End": true
}
}
}
実行
Runtimeエラー捕捉パターン実装
runtimeエラーが捕捉できない
しかし、これでも1つ問題があります。
エラー待ち受けパターンの "States.ALL"
でもRuntimeエラーの捕捉は出来ないということです。
Runtimeエラーというのは、各種タスクの実行の「中」で出力されたエラーではありません。
多くはステート間のjsonの引き渡し中に発生し、値の捕捉に失敗するケースで発生します。
基本的には起こさないのが当然だろう、という見方もあるのですが、複雑なアプリケーションになり、大きなjsonを引き渡し加工していくようなユースケースになると、十分起こりえるかな、とも思います。
その際、確実にこのワークフローのエラー自体を捕捉し、何か個別にアクションを起こす必要が生じます。
ですが、Cloudwatchアラームでは個別のステートマシンの実行内容については感知できませんし、 CloudWatch EventでもStepFunctionsのステートの変更イベントは 個別に捕捉できない ので、そちらで対応するのも現在のところ無理そうです。
ステートマシンのエラー捕捉用のステートマシン
苦肉の策として、ステートマシンの特定の実行自体をモニタリングするステートマシンを作る、というパターンがあります。
具体的には、実行したいStepFunctions(A)の実行時に、同時にそのarnを引き渡し、監視用のステートマシンを実行させます。
監視用ステートマシンは、引き渡されたarnから監視対象のステートマシンが完了かエラー状態になるまで随時ループで読み取っていきます。
{
"StartAt": "Wait",
"States": {
"Wait": {
"Type": "Wait",
"Seconds": 10,
"Next": "WatchStateMachineStatus",
"OutputPath": "$"
},
"WatchStateMachineStatus": {
"Type": "Task",
"Resource": "arn:aws:lambda:ap-northeast-1:xxxxxx:function:stepfuncrions-runtime-error-kansi",
"Next": "StatusCheck",
"ResultPath": "$",
"OutputPath": "$"
},
"StatusCheck": {
"Type": "Choice",
"OutputPath": "$",
"Choices": [
{
"Variable": "$.status",
"StringEquals": "RUNNING",
"Next": "Wait"
},
{
"Variable": "$.status",
"StringEquals": "SUCCEEDED",
"Next": "End"
},
{
"Variable": "$.status",
"StringEquals": "ERROR",
"Next": "DispatchFail"
}
],
"Default": "DispatchFail"
},
"DispatchFail": {
"Type": "Pass",
"Next": "End"
},
"End": {
"Type": "Pass",
"End": true
}
}
}
監視用Lambda Functions
const AWS = require("aws-sdk");
const stepfunctions = new AWS.StepFunctions({apiVersion: '2016-11-23' , region : "ap-northeast-1"});
exports.handler = async (event) => {
const params = {
executionArn: event.arn
};
const r = await stepfunctions.describeExecution(params).promise();
const status = r.status;
if (status == `RUNNING`){
return {
arn: event.arn,
status : `RUNNING`
}
}
if (status == `SUCCEEDED`){
return {
arn: event.arn,
status : `SUCCEEDED`
}
}
return {
arn: event.arn,
status : `ERROR`
}
};
まとめ
以上2パターンをご紹介しました。
StepFunctionsはちょっとした非同期タスクつなぎのユースケースでは使えるのですが、
もっと複雑なワークフローを構築する必要が出てくると、まだまだごちゃごちゃとしたハックが必要になってしまうので、業務ではSQSベースのオレオレワークフローエンジンを実装してしまいました。
ですが、「どこでなにが起こっているか」の図示やワークフローごとのログの集約が図れるので、基本的にはとてもいいツールだと思っています。
他で挙げた不満点に関しても実は度々フォーラムで上がっているのを見かけるので、対応されるととっても嬉しいな!と期待しております。