Golang or NodeJSで AWS Step Functions のActivityを書く

AWS Step Functionsによるピタゴラスイッチの終点で、「通常のEC2上でStep Functionsを待ち受け、RDS等になんか処理をするためのタスク」を書いている。

{
  TaskToken:xxxxx
  Input:<前段のタスクで処理したresponse>
}

ワーカーを作動させていると、こういう感じのデータがStepFunctionsから渡ってくるので、それを元に好きに処理して、SuccessやFailureリクエストをStep Functions側に返せばいい。
とりあえずDBへの登録処理とか、データの読み取りなしで実装してみる。

NodeJS

単純にNodeJSでやるならこういう感じ。

src/worker.ts

const AWS = require("aws-sdk");

const stepfunctions  : AWS.StepFunctions = new AWS.StepFunctions({
    apiVersion: '2016-11-23',
    region : "ap-northeast-1"
});


(async ()=>{
    while (1) {
        await new Promise(async (resolve) => {
            const stepFunctionParam = {
                activityArn: 'arn:aws:states:ap-northeast-1:xxxxxxx:activity:StepFunction-Called', /* required */
            };

            const data = await stepfunctions.getActivityTask(stepFunctionParam).promise();

            if (data != null && data.taskToken != null){
                console.log(data)

         /* ... Do Something !!!!!!! */

                const sendParams = {
                    output: "true", /* required */
                    taskToken: data.taskToken, /* required */
                };   

                await stepfunctions.sendTaskSuccess(sendParams).promise()

            }
            resolve();
        }).catch(error => console.error(error))
    }
})()

 
走れ。
 

AWS_PROFILE=my_aws_profile NODE_ENV=test node build/worker.js

 

タスク部分並列処理にしたかったらawaitやらresolveの位置いじっておく。
とりあえずNodeJSで動作確認したが、この手のコードはgolangがポ〜タビリティ高くて良さそうなので、次はGo言語の入門がてら書いてみる。

Go

あとgoのパッケージマネージャツール(vendor管理)glideも使ってみた。

glide create
glide get github.com/aws/aws-sdk-go

しておく。

main.go

package main

import (
    "fmt"
    "github.com/aws/aws-sdk-go/service/sfn"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/aws"
    "time"
)

func asyncTask(sfnSession *sfn.SFN , resp *sfn.GetActivityTaskOutput){

        /* ... Do Something !!!!!!! */

    params := &sfn.SendTaskSuccessInput{
        Output:    aws.String("true"),      // Required
        TaskToken: resp.TaskToken, // Required
    }
    sfnSession.SendTaskSuccess(params)

}

func loop() {

    awsConfig :=  &aws.Config{Region: aws.String("ap-northeast-1")}

    sess, err := session.NewSession(awsConfig)
    if err != nil {
        fmt.Println("failed to create session,", err)
        return
    }

    sfnSession := sfn.New(sess)

    params := &sfn.GetActivityTaskInput{
        ActivityArn: aws.String("arn:aws:states:ap-northeast-1:xxxxxxxxx:activity:StepFunction-Called"), // Required
    }
    resp, err := sfnSession.GetActivityTask(params)

    if err != nil {
        fmt.Println(err.Error())
        return
    } else if resp.TaskToken != nil {
        fmt.Println("success")

        // 並列処理
        go asyncTask(sfnSession , resp)
    }

    fmt.Println(resp)

}

func main() {

    for {
        loop()
    }

}

 
走れ。

AWS_PROFILE=my_aws_profile go run main.go

awsのconfigとかloopの外でよかったかな。
Go初めてなので参照周りやセッションとかもうちょっと調べる。

 

Read More

AWS Step Functionsでマイクロサービスから状態管理を(なるべく)取り除きたい

書くこと

「プロジェクトごとに何度も作るような処理の一部をマイクロサービス化する」という仕事があって、その一環でStep Functionを使ったので、途中までやっていたことまとめ。
 
 
example project

ピタゴラスイッチ、その辛み

AWS Lambdaは便利だが、個別の非常に短いFunctionを繋いで1つの大きなフローを組む場合、状態管理に非常に気を使う側面があった。
エントリーポイントになるFunctionがそれぞれの個別タスクの処理結果を待ち受けするような処理にすれば話は簡単になるが、
親タスクが子タスクの状態を管理するような構成にすると、エントリーポイントそのもののタイムアウト時間を気にする必要も出てくる。
基本的にはタスク発火のエントリーポイントでは「処理依頼を投げたら投げっぱなし」にしてさっさとその役割を終えたい。

だが、こういうピタゴラスイッチ的な構成を組もうとすると、動作させたい個別のタスクごとにいちいちSQSやらDynamoやらでタスクキュー的な実装をする必要があった。1つの役割をするためだけに、なんか個々のサービスの関係性が複雑になってくる。
 

StepFunctions

昨年末発表のAWS Step Functions で、使ってみたらこの辺がだいぶ楽になりそうだった。
詳細は弊社橋本さんのこれを参照。
 

serverless-stepfunctions

Serverless Framework上でStep Functionsを定義するプラグインを早速作っていた方がいて、とても便利なので使っていく。

テスト的に組んだフローはこんな感じ

  • エントリーポイントのfunctionsではタスク設定を書いたobjectを生成する
  • Parallelステートで同じFunctionに処理を分岐。Filters機能を使って、エントリーポイントのoutputからで反応すべきkey指定
  • 個々のFunctionは確率的にFail、リトライ
  • 並行的に走らせたすべてのタスクが終わったら最後のFunctionsを起動
     

Parallelでは動的に並列タスクを増減できないのが弱点。
 
ここまでが出来るようになったServerlessFrameworkのexampleプロジェクトを、許可を取ったのでリポジトリ公開する。

処理の定義はこんな感じ。yaml見やすい。
 
serverless.yml
 

service: step-test

provider:
  name: aws
  runtime: nodejs4.3
  stage: dev # replace your setting
  profile: dev # replace your setting
  region: ap-northeast-1 # replace your setting

package:
  exclude:
   - src/**
   - node_modules/** # 今回は特に実行時依存モジュールないので外しておく
   - .local/**
   - .git/**

plugins:
  - serverless-step-functions
  - serverless-offline

stepFunctions:
 stateMachines:
   testStateMachine-v5:
     Comment: "Input and Parallel Function"
     StartAt: EntryState
     States:
       EntryState:
         Type: Task
         Resource: entry
         Next: ChildState
       ChildState:
         Type: Parallel
         Next: EndState
         Branches:
          - StartAt: Child0
            States:
              Child0:
                Type: Task
                InputPath: "$.0"
                Resource: childTask
                End: true
                Retry:
                - ErrorEquals:
                  - HandledError
                  IntervalSeconds: 5
                  MaxAttempts: 6
                  BackoffRate: 1
                - ErrorEquals:
                  - States.TaskFailed
                  IntervalSeconds: 5
                  MaxAttempts: 6
                  BackoffRate: 1
          - StartAt: Child1
            States:
              Child1:
                Type: Task
                InputPath: "$.1"
                Resource: childTask
                End: true
                Retry:
                - ErrorEquals:
                  - HandledError
                  IntervalSeconds: 5
                  MaxAttempts: 6
                  BackoffRate: 1
                - ErrorEquals:
                  - States.TaskFailed
                  IntervalSeconds: 5
                  MaxAttempts: 6
                  BackoffRate: 1
          - StartAt: Child2
            States:
              Child2:
                Type: Task
                InputPath: "$.2"
                Resource: childTask
                End: true
                Retry:
                - ErrorEquals:
                  - HandledError
                  IntervalSeconds: 5
                  MaxAttempts: 6
                  BackoffRate: 1
                - ErrorEquals:
                  - States.TaskFailed
                  IntervalSeconds: 5
                  MaxAttempts: 6
                  BackoffRate: 1
       EndState:
         Type: Task
         Resource: lastTask
         End: true

functions:
  entry:
    handler: build/src/testFuncHandler.entry
    timeout :30
    memorySize : 128

  childTask:
    handler: build/src/testFuncHandler.childTask
    timeout : 30
    memorySize : 128

  lastTask:
    handler: build/src/testFuncHandler.lastTask
    timeout : 30
    memorySize : 128



 

エントリーポイント、個別タスクのFunctionの定義はこんな感じ。

module.exports.entry = async (event ,context , callback)=>{

    const hoge = {
        0:{
            id : 0,
            name: "hoge"
        },
        1:{
            id : 1,
            name: "fuga"
        },
        2:{
            id : 2,
            name: "piyo"
        }
    };

    callback(null , hoge);
}

/*
 serverless logs -f childTask --region ap-northeast-1 --profile dev --stage dev --tail --interval 500
 * */
module.exports.childTask = async (event ,context , callback)=>{

    const num = Math.random();
    if (num < 0.25){
        console.log("ERROR!!!")
        const error = new Error("something is wrong");
        return callback(error);
    }

    const name= event.name
    callback(null , {Name : name});
}

/*
 serverless logs -f lastTask --region ap-northeast-1 --profile dev --stage dev --tail --interval 500
* */
module.exports.lastTask = async (event ,context , callback)=>{
    console.log(event)
    callback(null , event);
}

 
AWS console上でこのフローをイメージ化するとこんな感じになる。
 


 

いざタスクをinvokeすると、

sls invoke stepf --state testStateMachine-v5 --stage dev --profile dev --regeon ap-northeast-1 

 
最後のタスクのeventには各並列処理の実行結果の配列が入ってくる。
 

{ executionArn: 'arn:aws:states:ap-northeast-1:xxxxxxx:execution:step-test-dev-testStateMachine-v5:xxxxxxxxx',
  stateMachineArn: 'arn:aws:states:ap-northeast-1:xxxxxxxxxx:stateMachine:step-test-dev-testStateMachine-v5',
  name: 'xxxxxxxxxxxxxxxxxxx',
  status: 'SUCCEEDED',
  startDate: xxxxxx,
  stopDate: xxxxxx,
  input: '{}',
  output: '[{"Name":"hoge"},{"Name":"fuga"},{"Name":"piyo"}]' }

 

まとめ、ToDo

各Functionのリトライ周りがすっきりした。すごい。
今回の要件が限定的なこと、StepFunctionsのテスト的なこともあって「常に複数のParallel Functionを静的に確保する」構成を作ってみたが、やはりここは何か気持ち悪い。
やっぱこれ、動的にParallel叩けるべきじゃないっすかねえ…。

次はこの辺考える。

  • フロー自体がFailした時どこに投げるのが使用側が楽か
  • マイクロサービス化する上で使用側のAPIはどうするべきなのか
  • 最終的にActivtyステートでGoのワーカーを待ち受けさせ、RDSにデータを入れる

Read More

AWS Step FunctionsでLambda Functionでのサムネイル画像作成をシンプルに

この記事はWanoグループ Advent Calendar 2016の23日目の記事です。
Wanoグループ内でのLambda推しエンジニアとして、前回のLambda@Edgeに引き続き、2016年Re:Inventで発表された、AWS Step Functionsについての書いてみたいと思います。

AWS Step Functionsで解決できること

なんと言っても以下の2点だと思います。

  • Lambda Functionを小さい単位で維持できる
  • Lambda Function間の連携にSQSやDynamoDB等を使用する必要がなくなる

Wanoでも複数のLambda Functionを活用していますが、1つのFunctionの中でいろいろなことをやりすぎたため、他のサービスにそのまま転用できなかったり、コードの見通しが悪くなったりと言ったことが出てきました。
1つ1つのFunctionを細かく分けて、SQS等を使用して連携されることも検討しましたが、リトライや分岐のためにFunctionを用意しないといけないといったことを考えると、腰が重くなっていました。
その問題点を解決してくれるのがAWS Step Functionsです。

Lambda Functionの事例で見かけるサムネイル画像作成をStep Functionsで

Lambda Functionsの事例として、サムネイル画像を作成するサンプルをよく見かけます。
この記事でもサムネイル画像作成を例にし、以下のサムネイル画像あるあるを解決する処理をLambda FunctionとStep Functionsで実現したいと思います。

サムネイル画像あるあるとは

  1. せっかくサムネイル画像で小さくしたのに、Google PageSpeedで調べたら、ロスレス圧縮したら46%小さくなるよと言われる
  2. 適切なContent-Typeヘッダーが付いておらずブラウザで想定外の挙動をする
  3. Cache-Controllヘッダーが付いておらずブラウザキャッシュが効かない

勝手にあるあると言っているだけで、私が良く忘れるだけという可能性が大きいです。。。が、今回はこれを解決することを前提として、進めたいと思います。

Lambda Functionを実装する

Lambda Functionを小さい単位で維持することが重要なので、以下の機能で分けてLambda Functionを実装したいと思います。

  1. ResizeImage: 画像をリサイズするFunction
  2. MinifyJpeg: JPEG画像をロスレス圧縮するFunction
  3. MinifyPng: PNG画像をロスレス圧縮するFunction
  4. SetMeta: メタデータを設定するFunction

実装できたら、これらのLambda Functionを予めデプロイしておきます。
ちなみに私は以前apexを使っていましたが、最近serverless frameworkに乗り換えを進めています。
その内この辺りも記事にできたらなと。

AWS Step Functionsを設定する

State Machineを作る

Step Functionsの設定画面でState Machineを作ります。
設定ファイルは以下です。

注意
2016年12月23日時点で、State Machineの修正はできません。State Machineの設定ファイルもバージョン管理対象としておくことを強くおすすめします(泣きを見ました。。。)
{
  "Comment": "An example of the Amazon States Language using a choice state.",
  "StartAt": "ResizeImage",
  "States": {
    "ResizeImage": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:FUNCTION_NAME",
      "Next": "JpegOrPng"
    },
    "JpegOrPng": {
      "Type" : "Choice",
      "Choices": [
        {
          "Variable": "$.object.fileType",
          "StringEquals": "jpeg",
          "Next": "MinifyJpeg"
        },
        {
          "Variable": "$.object.fileType",
          "StringEquals": "png",
          "Next": "MinifyPng"
        }
      ],
      "Default": "UnsupportedImage"
    },
    "MinifyJpeg": {
      "Type" : "Task",
      "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:MinifyJpeg",
      "Next": "SetMeta"
    },
    "MinifyPng": {
      "Type" : "Task",
      "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:MinifyPng",
      "Next": "SetMeta"
    },
    "UnsupportedImage": {
      "Type": "Fail",
      "Cause": "No Matches!"
    },
    "SetMeta": {
      "Type": "Task",
      "Resource": "arn:aws:lambda:REGION:ACCOUNT_ID:function:SetMeta",
      "End": true
    }
  }
}

上記ファイルでの設定結果として以下のState Machineができます。

State Machineを実行する

State Machineができたら、早速実行してみましょう。
Lambda Function同様コンソールからJSONを指定して実行できますので、そちらで実行します。

動きました!!
今回は、JPEG画像をイベントとして渡したので、MinifyJpegに遷移しています。
PNG画像を渡せばMinifyPngに遷移する。。。はず(もう23日が終わりそうなので、試す時間がなかったり。。。)

State Machine実行時の注意点

注意点としては、Lambda Functionの実行ではなく、State Machineの実行という点です。
今回StartAtに指定したResizeImageが起動するイベントを実行してもState Machineは実行されず、State Machineの起動が必要になります。
残念ながら、現時点ではState Machineをイベントドリブンにすることはできないようです。
実運用では監視したいイベントを割り当てたLambda Functionを用意して、そちらからState Machineを実行するような構成にする必要があります。

まとめ

いくつかの課題はあるものの、AWS Step Functionsを使うことで、Lambda Function間の連携がシンプルになることは間違いありません。
課題自体も恐らく時間の問題で改善されると思いますし、serverless frameworkといった周辺ツールも続々対応してくると思うので、今後も注目です!!

Read More