• home
  • AWS Step Functionsでマイクロサービスから状態管理を(なるべく)取り除きたい

AWS Step Functionsでマイクロサービスから状態管理を(なるべく)取り除きたい

書くこと

「プロジェクトごとに何度も作るような処理の一部をマイクロサービス化する」という仕事があって、その一環でStep Functionを使ったので、途中までやっていたことまとめ。
 
 
example project

ピタゴラスイッチ、その辛み

AWS Lambdaは便利だが、個別の非常に短いFunctionを繋いで1つの大きなフローを組む場合、状態管理に非常に気を使う側面があった。
エントリーポイントになるFunctionがそれぞれの個別タスクの処理結果を待ち受けするような処理にすれば話は簡単になるが、
親タスクが子タスクの状態を管理するような構成にすると、エントリーポイントそのもののタイムアウト時間を気にする必要も出てくる。
基本的にはタスク発火のエントリーポイントでは「処理依頼を投げたら投げっぱなし」にしてさっさとその役割を終えたい。

だが、こういうピタゴラスイッチ的な構成を組もうとすると、動作させたい個別のタスクごとにいちいちSQSやらDynamoやらでタスクキュー的な実装をする必要があった。1つの役割をするためだけに、なんか個々のサービスの関係性が複雑になってくる。
 

StepFunctions

昨年末発表のAWS Step Functions で、使ってみたらこの辺がだいぶ楽になりそうだった。
詳細は弊社橋本さんのこれを参照。
 

serverless-stepfunctions

Serverless Framework上でStep Functionsを定義するプラグインを早速作っていた方がいて、とても便利なので使っていく。

テスト的に組んだフローはこんな感じ

  • エントリーポイントのfunctionsではタスク設定を書いたobjectを生成する
  • Parallelステートで同じFunctionに処理を分岐。Filters機能を使って、エントリーポイントのoutputからで反応すべきkey指定
  • 個々のFunctionは確率的にFail、リトライ
  • 並行的に走らせたすべてのタスクが終わったら最後のFunctionsを起動
     

Parallelでは動的に並列タスクを増減できないのが弱点。
 
ここまでが出来るようになったServerlessFrameworkのexampleプロジェクトを、許可を取ったのでリポジトリ公開する。

処理の定義はこんな感じ。yaml見やすい。
 
serverless.yml
 

service: step-test

provider:
  name: aws
  runtime: nodejs4.3
  stage: dev # replace your setting
  profile: dev # replace your setting
  region: ap-northeast-1 # replace your setting

package:
  exclude:
   - src/**
   - node_modules/** # 今回は特に実行時依存モジュールないので外しておく
   - .local/**
   - .git/**

plugins:
  - serverless-step-functions
  - serverless-offline

stepFunctions:
 stateMachines:
   testStateMachine-v5:
     Comment: "Input and Parallel Function"
     StartAt: EntryState
     States:
       EntryState:
         Type: Task
         Resource: entry
         Next: ChildState
       ChildState:
         Type: Parallel
         Next: EndState
         Branches:
          - StartAt: Child0
            States:
              Child0:
                Type: Task
                InputPath: "$.0"
                Resource: childTask
                End: true
                Retry:
                - ErrorEquals:
                  - HandledError
                  IntervalSeconds: 5
                  MaxAttempts: 6
                  BackoffRate: 1
                - ErrorEquals:
                  - States.TaskFailed
                  IntervalSeconds: 5
                  MaxAttempts: 6
                  BackoffRate: 1
          - StartAt: Child1
            States:
              Child1:
                Type: Task
                InputPath: "$.1"
                Resource: childTask
                End: true
                Retry:
                - ErrorEquals:
                  - HandledError
                  IntervalSeconds: 5
                  MaxAttempts: 6
                  BackoffRate: 1
                - ErrorEquals:
                  - States.TaskFailed
                  IntervalSeconds: 5
                  MaxAttempts: 6
                  BackoffRate: 1
          - StartAt: Child2
            States:
              Child2:
                Type: Task
                InputPath: "$.2"
                Resource: childTask
                End: true
                Retry:
                - ErrorEquals:
                  - HandledError
                  IntervalSeconds: 5
                  MaxAttempts: 6
                  BackoffRate: 1
                - ErrorEquals:
                  - States.TaskFailed
                  IntervalSeconds: 5
                  MaxAttempts: 6
                  BackoffRate: 1
       EndState:
         Type: Task
         Resource: lastTask
         End: true

functions:
  entry:
    handler: build/src/testFuncHandler.entry
    timeout :30
    memorySize : 128

  childTask:
    handler: build/src/testFuncHandler.childTask
    timeout : 30
    memorySize : 128

  lastTask:
    handler: build/src/testFuncHandler.lastTask
    timeout : 30
    memorySize : 128



 

エントリーポイント、個別タスクのFunctionの定義はこんな感じ。

module.exports.entry = async (event ,context , callback)=>{

    const hoge = {
        0:{
            id : 0,
            name: "hoge"
        },
        1:{
            id : 1,
            name: "fuga"
        },
        2:{
            id : 2,
            name: "piyo"
        }
    };

    callback(null , hoge);
}

/*
 serverless logs -f childTask --region ap-northeast-1 --profile dev --stage dev --tail --interval 500
 * */
module.exports.childTask = async (event ,context , callback)=>{

    const num = Math.random();
    if (num < 0.25){
        console.log("ERROR!!!")
        const error = new Error("something is wrong");
        return callback(error);
    }

    const name= event.name
    callback(null , {Name : name});
}

/*
 serverless logs -f lastTask --region ap-northeast-1 --profile dev --stage dev --tail --interval 500
* */
module.exports.lastTask = async (event ,context , callback)=>{
    console.log(event)
    callback(null , event);
}

 
AWS console上でこのフローをイメージ化するとこんな感じになる。
 


 

いざタスクをinvokeすると、

sls invoke stepf --state testStateMachine-v5 --stage dev --profile dev --regeon ap-northeast-1 

 
最後のタスクのeventには各並列処理の実行結果の配列が入ってくる。
 

{ executionArn: 'arn:aws:states:ap-northeast-1:xxxxxxx:execution:step-test-dev-testStateMachine-v5:xxxxxxxxx',
  stateMachineArn: 'arn:aws:states:ap-northeast-1:xxxxxxxxxx:stateMachine:step-test-dev-testStateMachine-v5',
  name: 'xxxxxxxxxxxxxxxxxxx',
  status: 'SUCCEEDED',
  startDate: xxxxxx,
  stopDate: xxxxxx,
  input: '{}',
  output: '[{"Name":"hoge"},{"Name":"fuga"},{"Name":"piyo"}]' }

 

まとめ、ToDo

各Functionのリトライ周りがすっきりした。すごい。
今回の要件が限定的なこと、StepFunctionsのテスト的なこともあって「常に複数のParallel Functionを静的に確保する」構成を作ってみたが、やはりここは何か気持ち悪い。
やっぱこれ、動的にParallel叩けるべきじゃないっすかねえ…。

次はこの辺考える。

  • フロー自体がFailした時どこに投げるのが使用側が楽か
  • マイクロサービス化する上で使用側のAPIはどうするべきなのか
  • 最終的にActivtyステートでGoのワーカーを待ち受けさせ、RDSにデータを入れる