Golang or NodeJSで AWS Step Functions のActivityを書く
AWS Step Functionsによるピタゴラスイッチの終点で、「通常のEC2上でStep Functionsを待ち受け、RDS等になんか処理をするためのタスク」を書いている。
{
TaskToken:xxxxx
Input:<前段のタスクで処理したresponse>
}
ワーカーを作動させていると、こういう感じのデータがStepFunctionsから渡ってくるので、それを元に好きに処理して、SuccessやFailureリクエストをStep Functions側に返せばいい。
とりあえずDBへの登録処理とか、データの読み取りなしで実装してみる。
NodeJS
単純にNodeJSでやるならこういう感じ。
src/worker.ts
const AWS = require("aws-sdk");
const stepfunctions : AWS.StepFunctions = new AWS.StepFunctions({
apiVersion: '2016-11-23',
region : "ap-northeast-1"
});
(async ()=>{
while (1) {
await new Promise(async (resolve) => {
const stepFunctionParam = {
activityArn: 'arn:aws:states:ap-northeast-1:xxxxxxx:activity:StepFunction-Called', /* required */
};
const data = await stepfunctions.getActivityTask(stepFunctionParam).promise();
if (data != null && data.taskToken != null){
console.log(data)
/* ... Do Something !!!!!!! */
const sendParams = {
output: "true", /* required */
taskToken: data.taskToken, /* required */
};
await stepfunctions.sendTaskSuccess(sendParams).promise()
}
resolve();
}).catch(error => console.error(error))
}
})()
走れ。
AWS_PROFILE=my_aws_profile NODE_ENV=test node build/worker.js
タスク部分並列処理にしたかったらawaitやらresolveの位置いじっておく。
とりあえずNodeJSで動作確認したが、この手のコードはgolangがポ〜タビリティ高くて良さそうなので、次はGo言語の入門がてら書いてみる。
Go
あとgoのパッケージマネージャツール(vendor管理)glideも使ってみた。
glide create
glide get github.com/aws/aws-sdk-go
しておく。
main.go
package main
import (
"fmt"
"github.com/aws/aws-sdk-go/service/sfn"
"github.com/aws/aws-sdk-go/aws/session"
"github.com/aws/aws-sdk-go/aws"
"time"
)
func asyncTask(sfnSession *sfn.SFN , resp *sfn.GetActivityTaskOutput){
/* ... Do Something !!!!!!! */
params := &sfn.SendTaskSuccessInput{
Output: aws.String("true"), // Required
TaskToken: resp.TaskToken, // Required
}
sfnSession.SendTaskSuccess(params)
}
func loop() {
awsConfig := &aws.Config{Region: aws.String("ap-northeast-1")}
sess, err := session.NewSession(awsConfig)
if err != nil {
fmt.Println("failed to create session,", err)
return
}
sfnSession := sfn.New(sess)
params := &sfn.GetActivityTaskInput{
ActivityArn: aws.String("arn:aws:states:ap-northeast-1:xxxxxxxxx:activity:StepFunction-Called"), // Required
}
resp, err := sfnSession.GetActivityTask(params)
if err != nil {
fmt.Println(err.Error())
return
} else if resp.TaskToken != nil {
fmt.Println("success")
// 並列処理
go asyncTask(sfnSession , resp)
}
fmt.Println(resp)
}
func main() {
for {
loop()
}
}
走れ。
AWS_PROFILE=my_aws_profile go run main.go
awsのconfigとかloopの外でよかったかな。
Go初めてなので参照周りやセッションとかもうちょっと調べる。