AWS Step Functionsでマイクロサービスから状態管理を(なるべく)取り除きたい
書くこと
「プロジェクトごとに何度も作るような処理の一部をマイクロサービス化する」という仕事があって、その一環でStep Functionを使ったので、途中までやっていたことまとめ。
example project
ピタゴラスイッチ、その辛み
AWS Lambdaは便利だが、個別の非常に短いFunctionを繋いで1つの大きなフローを組む場合、状態管理に非常に気を使う側面があった。
エントリーポイントになるFunctionがそれぞれの個別タスクの処理結果を待ち受けするような処理にすれば話は簡単になるが、
親タスクが子タスクの状態を管理するような構成にすると、エントリーポイントそのもののタイムアウト時間を気にする必要も出てくる。
基本的にはタスク発火のエントリーポイントでは「処理依頼を投げたら投げっぱなし」にしてさっさとその役割を終えたい。
だが、こういうピタゴラスイッチ的な構成を組もうとすると、動作させたい個別のタスクごとにいちいちSQSやらDynamoやらでタスクキュー的な実装をする必要があった。1つの役割をするためだけに、なんか個々のサービスの関係性が複雑になってくる。
StepFunctions
昨年末発表のAWS Step Functions で、使ってみたらこの辺がだいぶ楽になりそうだった。
詳細は弊社橋本さんのこれを参照。
serverless-stepfunctions
Serverless Framework上でStep Functionsを定義するプラグインを早速作っていた方がいて、とても便利なので使っていく。
テスト的に組んだフローはこんな感じ
- エントリーポイントのfunctionsではタスク設定を書いたobjectを生成する
- Parallelステートで同じFunctionに処理を分岐。Filters機能を使って、エントリーポイントのoutputからで反応すべきkey指定
- 個々のFunctionは確率的にFail、リトライ
- 並行的に走らせたすべてのタスクが終わったら最後のFunctionsを起動
Parallelでは動的に並列タスクを増減できないのが弱点。
ここまでが出来るようになったServerlessFrameworkのexampleプロジェクトを、許可を取ったのでリポジトリ公開する。
処理の定義はこんな感じ。yaml見やすい。
serverless.yml
service: step-test
provider:
name: aws
runtime: nodejs4.3
stage: dev # replace your setting
profile: dev # replace your setting
region: ap-northeast-1 # replace your setting
package:
exclude:
- src/**
- node_modules/** # 今回は特に実行時依存モジュールないので外しておく
- .local/**
- .git/**
plugins:
- serverless-step-functions
- serverless-offline
stepFunctions:
stateMachines:
testStateMachine-v5:
Comment: "Input and Parallel Function"
StartAt: EntryState
States:
EntryState:
Type: Task
Resource: entry
Next: ChildState
ChildState:
Type: Parallel
Next: EndState
Branches:
- StartAt: Child0
States:
Child0:
Type: Task
InputPath: "$.0"
Resource: childTask
End: true
Retry:
- ErrorEquals:
- HandledError
IntervalSeconds: 5
MaxAttempts: 6
BackoffRate: 1
- ErrorEquals:
- States.TaskFailed
IntervalSeconds: 5
MaxAttempts: 6
BackoffRate: 1
- StartAt: Child1
States:
Child1:
Type: Task
InputPath: "$.1"
Resource: childTask
End: true
Retry:
- ErrorEquals:
- HandledError
IntervalSeconds: 5
MaxAttempts: 6
BackoffRate: 1
- ErrorEquals:
- States.TaskFailed
IntervalSeconds: 5
MaxAttempts: 6
BackoffRate: 1
- StartAt: Child2
States:
Child2:
Type: Task
InputPath: "$.2"
Resource: childTask
End: true
Retry:
- ErrorEquals:
- HandledError
IntervalSeconds: 5
MaxAttempts: 6
BackoffRate: 1
- ErrorEquals:
- States.TaskFailed
IntervalSeconds: 5
MaxAttempts: 6
BackoffRate: 1
EndState:
Type: Task
Resource: lastTask
End: true
functions:
entry:
handler: build/src/testFuncHandler.entry
timeout :30
memorySize : 128
childTask:
handler: build/src/testFuncHandler.childTask
timeout : 30
memorySize : 128
lastTask:
handler: build/src/testFuncHandler.lastTask
timeout : 30
memorySize : 128
エントリーポイント、個別タスクのFunctionの定義はこんな感じ。
module.exports.entry = async (event ,context , callback)=>{
const hoge = {
0:{
id : 0,
name: "hoge"
},
1:{
id : 1,
name: "fuga"
},
2:{
id : 2,
name: "piyo"
}
};
callback(null , hoge);
}
/*
serverless logs -f childTask --region ap-northeast-1 --profile dev --stage dev --tail --interval 500
* */
module.exports.childTask = async (event ,context , callback)=>{
const num = Math.random();
if (num < 0.25){
console.log("ERROR!!!")
const error = new Error("something is wrong");
return callback(error);
}
const name= event.name
callback(null , {Name : name});
}
/*
serverless logs -f lastTask --region ap-northeast-1 --profile dev --stage dev --tail --interval 500
* */
module.exports.lastTask = async (event ,context , callback)=>{
console.log(event)
callback(null , event);
}
AWS console上でこのフローをイメージ化するとこんな感じになる。
いざタスクをinvokeすると、
sls invoke stepf --state testStateMachine-v5 --stage dev --profile dev --regeon ap-northeast-1
最後のタスクのeventには各並列処理の実行結果の配列が入ってくる。
{ executionArn: 'arn:aws:states:ap-northeast-1:xxxxxxx:execution:step-test-dev-testStateMachine-v5:xxxxxxxxx',
stateMachineArn: 'arn:aws:states:ap-northeast-1:xxxxxxxxxx:stateMachine:step-test-dev-testStateMachine-v5',
name: 'xxxxxxxxxxxxxxxxxxx',
status: 'SUCCEEDED',
startDate: xxxxxx,
stopDate: xxxxxx,
input: '{}',
output: '[{"Name":"hoge"},{"Name":"fuga"},{"Name":"piyo"}]' }
まとめ、ToDo
各Functionのリトライ周りがすっきりした。すごい。
今回の要件が限定的なこと、StepFunctionsのテスト的なこともあって「常に複数のParallel Functionを静的に確保する」構成を作ってみたが、やはりここは何か気持ち悪い。
やっぱこれ、動的にParallel叩けるべきじゃないっすかねえ…。
次はこの辺考える。
- フロー自体がFailした時どこに投げるのが使用側が楽か
- マイクロサービス化する上で使用側のAPIはどうするべきなのか
- 最終的にActivtyステートでGoのワーカーを待ち受けさせ、RDSにデータを入れる