[AWS Summit Tokyo 2017 Day3] AWSが支えるEightのリコメンデーションエンジンの裏側

名刺管理で有名な株式会社SanSanさんのアプリ、「Eight」での事例です。
とても濃い内容で、StepFunctionsの待ち受け運用とか非常に参考になりました。

ビジネス

  • 個人向け名刺管理アプリ、Eight
  • 名刺の唾がりをビジネスに変える/つながった相手とコミュニケーション
  • 日本の名刺交換の10%をさばいている!(!)
  • 名刺交換のUpdate.相手が転職しても繋がりを維持する
  • 「つながり」をリコメンデーションエンジンで

問題/旧リコメンデーション

  • RedshiftとEC2上のリコメンド計算エンジンで計算、CloudSearchでリコメンド
  • Redshift、複雑怪奇なクエリ、集計性能に依存
  • CloudSearch,全文検索エンジンとしてではなく関係性スコアのソートのみに使っていた
  • パフォーマンスダウン!
  • オペレーションコスト!

リコメンドアーキテクチャ刷新。

  • データ分析
  • アルゴリズム
  • リアルタイム性 <= もっともここを大事に

リアルタイムリコメンデーションエンジンを作り直そう

  • ログデータは今まで通りRedShift
  • 中間データ更新にKinesis + Lambda
  • 中間データはDynamoDBをストレージに

  • => 2ヶ月でできた!

構成

ストリーム
* Kinesis,DynamoDB Stream
コンシューマ
* Lambda
ストレージ
* RedShift,DynamoDB,ElastuCache

  • 名刺間のレーテイングデータをひたすらDynamoとLambdaで生成する。
  • データが更新されたらSQS発火、EC2上のワーカーが中間データから実際のリコメンドデータを生成

コツ

  • KinesisはPut Recordsでリクエストをまとめる
  • メモリ設定でリソースを増やす
  • DynamoDBはなるべくBatchWriteを使う
  • パフォーマンスが大幅に改善された

  • Function数の増加問題

    • Functionをシンプルにしすぎると数が増えすぎて管理できない問題
    • ある意味Function内でルーティングすることでストリームの種類によって処理を分岐する
  • ストリーム – コンシューマ問題
    • StreamとLambdaがお互いを維持できるバランスにならない
    • ストリームをLambdaが書き直して差戻すFunctionを作る(分身の術)

残った問題と解決

  • レーティングデータの陳腐化
  • アルゴリズムが変わり中間データの価値がなくなる
  • RedShiftにある過去ログから全て中間データを作り直した! => Data Pipelineを利用して数時間で再生成!
  • リコメンドのマイグレーション

    • まずLambdaを停止
    • 再生成してから再起動
    • まるで心臓バイパス手術
    • ダウンタイムなしでやるにはワークフロー大事
  • そこでちょうどリリースされたAWS Step Function

    • Lambdaの処理完了
    • DataPipilineの処理まち
    • タイムアウトはStepFunctionの定間隔リトライが使える!!!!!!!!

サーバーレスの利点

* スケーラブル
* 柔軟性
* 気軽さ

Read More

[AWS Summit Tokyo 2017 Day3] AWS Lambdaで変わるバッチの世界 – CPUトータル100時間を10分で終わらせるには –

ERPの開発をしている 株式会社ワークスアプリケーションさんでの事例です。
どちらかというとエンタープライズ向けっぽい会社さんですね。
ClosureコンパイラをLambda上に載せる、VPC多用の必要がどうしてもある、あたりがならではという感じで面白かったです。

事例

  • 画面の高速描画のための前処理

問題

  • HTML,JS,CSSの最適化
  • HTMLテンプレートの事前コンパイル
  • よくある構成
    • Hadoop,Spark,インスタンス並列化
    • ガチでやると100時間
    • 長い!コスト高い!
    • 終わらないインスタンス最適化作業
    • 10分で支度しな(意訳)

そこでLambda

Lambdaの特徴

  • スケーリング管理コスト0
  • インスタンス管理コスト0
  • 100ms単位の課金
  • 選べるランタイム / 箇所ごとに柔軟に言語を選ぼう
  • 流動的な分散数と処理時間にマッチ
    => やってみようと思った

Lambda

  • Before: 以前の処理
    • javaのwebフレームワークが画面のリストを作成/HTMLを最適化/jsのコンパイル/cddのコンパイル
    • この辺のフローが同期的
    • 画面が大きくなるとスケールしない
    • Closure CompilerをSpringFWで動かしていた
    • Less(CSS)コンパイル => SpringFW上で動作
  • After: 最適化
    • jar作成時にコードにキャッシュしとける情報はする
    • HTMLを最適化するだけのfunctionとして整理
    • GoogleClojureコンパイラ単体で動作するFunctionに
    • LessのコンパイルのFunctionはnodejsとして分離

SpringFrameworkをLambdaで動かすか否か

  • ガチで動かすと占有時間が長い
    • => そこでコンストラクタだけを使う
  • コンテナのコールドスタートも適切に利用
    • => 初期化処理にシングルトンを利用して余計な処理を省く

起動とプロセス管理

  • S3にファイルをputすることで処理を与える。
  • 実行ファイル名のサフィックスで処理の分岐を行う
  • ピタゴラ処理のエビデンスにPending,RunnningなどステータスをS3に随時吐く(!)
  • 進捗はs3上に置いたプロセス進捗ファイルだけをlistすることで確認する

Trouble Shooting

  • 速度は思ったより早くない

    • 128MB => 1546MB
    • CPUパワーはメモリ容量に比例 1.5GBだと2コア/3コア利用可能
  • 同時実行数上限3000で運用
    だが、実行数が上がらない,なぜ。。。 => VPCのせい

  • Lessのコンパイルがエラー

    • Lambdaの起動よりS3のPut時間が後の場合があった <= !?
    • s3のイベントを使う場合は結果整合性であることを忘れないように
  • s3のイベントが登録されていない

    • Ansibleで1つのbucketに100Function設定しているはず
    • 1この登録を1回回すんじゃなく1度で100イベント登録したらうまくいった
  • 想定金額より多い

    • Cloudwatch Logsのログのせい

デプロイ

  • nodeはzip,javaはjar
  • お客さんごとに別VPC/1bucket。イベント当てるのはAnsibleでやる

まとめ

  • 10分半になった!

課題

  • 今度はDBがスケールしなくて高負荷に耐えられない問題

Read More

[AWS Summit Tokyo Day3] AWS Lambdaを使ったモバイルバックエンドのサーバーレス開発事例

「株式会社ワイヤアンドワイヤレス」さんでのモバイルアプリのバックエンドのサーバーレス化の事例をお聞きしてきました。
ロギングやデプロイフローなど、みんなどうしてるんだろー?って思ってところが概観できたと思います。

ビジネス

  • 公衆WIFI事業者
  • TRABEL_JAPAN WIFI
  • 外国人旅行者向け。国内にはユーザー動向や広告配信を提供

システム構成

  • v1
    • ELB,EC2等を利用した一般的な構成
    • ロジックがクライアントに集中してしまっていいた
  • v2
    • ELBをもうひとつ前に
  • v3
    • API GatewayとLambdaを全面に出した

サーバーレス?

  • 用意するものはコードと構成パラメータ
  • API Gateway通った上でEC2で
  • 基本はAPI Gatewayを前に置くベース
  • Dynamo,S3への書き込み

API Gareway / Lambdaの実装方針

  • Lambdaファンクション自体は同じものを使って、ビルドスクリプト側で環境を分ける

苦労した点

  • Lambdaの起動時間 (Java)
    • VPC内の運用、ENIをアタッチする時間が膨大
    • CloudwatchのイベントでVPCないのLambdaを定期的に起動してあっためる
    • 起動時間が短縮される
  • Gatewayのレイテンシ
    • 常に同期処理になるのでAPI Gatewayとの相性は良い
    • なのでLambdaを非同期でつなぐことで処理が一定になる
  • E2Eテスト問題
  • ログ/エラー箇所の問題

APIのリソースとLambdaのデプロイ

  • API GatewayはSwaggerで管理できる
  • APIリソースとlambdaを1対1にした
  • 負荷が高い時は例外的に別の関数
  • SAMを使っている

Lambda関数の言語間検証

  • JVM系はまあ遅い
  • CPUがっつり使う処理があればJavaとかのが早いかも

Lambdaのテスト

  • ユニットテスト
    • AWSサービスのモックを内製
    • pythonのMagic Mock
    • あるいはAWS上に環境を準備
  • クラウド使ってるんだから
    • モック使わずに
    • SAMでサービスをデプロイ
    • テスト終わってから殺す

パッケージング/デプロイ

  • プレフィックスをつけて試験/本番をデプロイ
  • SAM => GitLab => Jenkins

ロギング

  • aws-cli/jqでパースする
  • アプリケーションログにはrequestIdを出力
  • X-Rayでトランザクション数/Durationが確認可能
  • agentが入れられない代わりにx-rayがある

まとめ

  • 導入に関しては開発のシステム構成の概念が違うためそれなりの学習は必要になる

Read More

[AWS Summit Tokyo 2017 Day3] EPSONにおけるサーバーレスアーキテクチャ導入事例

Day3です。コンピューターのバッテリーがやばいです。
ブロックチェーンやKinesisに関してのセッションを今日は拝聴していました。
本稿では、EPSON社のサーバーレスの導入事例のセッションの様子を記そうと思います。

サービス

プリンターからのレシートを解析、DB上にストアするアプリケーション
その情報をネイティブアプリから見れるサービス

課題

AWS上でサービスを展開する上で、オンプレの思考のまま設計
Web API => EC2インスタンスにwebアプリを置いてビジネスロジックを持ち込む => RDSにレシートデータ蓄積

  • 密結合
  • 毎回インフラ側にスケール依頼をしていて立ち行かなくなってきた
  • 同期処理でのブロック処理

プリンシプル

  • スケーラブルにする
  • サービス間を疎結合にする
  • マネージドサービスの活用

アーキテクチャ

AWSのマネージドサービスをフル活用することで活用する
(ほぼ)世界の各ロケーションでフルスケーラブル/高可用の環境が構築できた。

  • 情報の格納

    • プリンター => EC2 => Kinesis => lambdaでレシートをS3に格納
    • s3のレシートのputからSQSを発火、AWS Beanstalk上のworkerで解析。
    • 解析結果をさらにKinesis Streamに入れ込み、Auroraへ格納
  • ユーザーアプリからサービスにアクセスする

    • 実際にユーザーのクライアントアプリからレシートの検索結果を見る。APIGateWay/LambdaでAPIを書いている。
    • このLambdaはS3やAuroraにアクセスしてユーザーに情報を返す。
  • システムのOverViewを作っておくと、自分が今何をしているのかとか、

  • 本格実装して半年で軌道に乗った

「新しい作り方」に会社として向き合うには

  • 啓発/習得/権限
    • 背中を押す経験 -> ワークショップの開催
    • ベンダーロックインの議論にははじめの実績で答えを出す
    • 権限と責任範囲を明確にした
    • 細かく作って細かく壊す
    • ビジネスロジックを最適に分割/コスト資産

まとめ

  • ビジネスは容赦なく変化し、アーキテクチャは破綻する
  • 踏み出す勇気と受けいれる心
  • 現実的なフルスタックエンジニアリング
  • サーバーレスによって「担当範囲のスコープ」だけでなく「価値提供に集中できるチーム/メンバーが自己成長できるチーム」が出来上がった

Read More

[AWS Summit Tokyo Day2] 中堅企業での事例に学ぶAWS活用サクセスストーリー

AWS Summit Tokyoが始まりました!
これから3日間、気になった講演ごとにメモしたことをレポしようかな!と思います!

早速始めます。

表題の通り、最初の記事は 中堅企業での事例に学ぶAWS活用サクセスストーリーです。


Rekognition活用事例 – 千株式会社

AWSへの移行

  • イベントフォトのネット販売サービス
  • 2013-2016にオンプレから移行
  • 大容量の画像をNFSに保持していたのを、 s3に移行した

Rekognitionの活用

  • 深層学習による顔認識サービス

  • 運動会などの膨大な画像の中で、「自分の子供だけこの写真の中から集めたい」という要求への対応

  • 顔に対してcreate_indexする()

  • 現在Tokyo Regeonでサポートされていない => Rekognitionは同リージョンのs3しか使えない
    => そこで、オレゴンで顔index作成後、オレゴンのs3からは画像のレプリカを消すことにした

  • 顔検索APIはcroudfront => API Gateway => Lambda => Rekognition の流れで使う


AWS Direct Connectの活用 コールダイレクト株式会社

  • コールセンターシステム をオンプレからAWS Direct Connectに移行
  • Amazon Connectというクラウド型のコンタクトセンターを使う

東横インIT集客ソリューション

東横インの予約システムの開発をしている会社さん。

セキュリティの担保

  • PCI DSSというセキュリティ要件に準拠した決済システムを構築する必要がある。

  • PCI DSSに準拠してクレジットカード情報等を安全に保持するには? => AWS KMSの利用!

  • オンプレで暗号化すると開発・運用コストが辛い

  • IAMで取り出し、Cloudtrailに取り出し記録が残るのでPCI DSSに準拠できた!


株式会社千代田グラビヤ

社内システムのAWSヘの移行

各SaaSの接続

  • 勤怠システムや社内スケジュールを様々な外部サービスをまたいで使っている。
  • そのHUbとしてMasterのDBをRDS Oracleで保持
  • EBSボリュームの使用

株式会社アトラ工

求人メディアGreenのオンプレからAWSヘの移行

AWS DMSの使用

  • 他社クラウドRDBをマイグレーションするサービスを使って、随時データベースをAWSのAuroraに移行した。
  • ダウンタイムがほとんどなく移行が完了

まとめ

xxxがしたい!という要求があれば、AWSバートナーにどんどん聞いてください!とのことでした。

Read More

AWSマルチアカウントのS3間のcopyのメモ

AWSアカウントAにIAMユーザーA1,
AWSアカウントBにIAMユーザーB1がいるとする。

アカウントAのS3からアカウントBのS3にオブジェクトのコピーを行うには、通常、ユーザーA1としてアカウントAのS3からオブジェクトをgetし、ユーザーB1としてアカウントBのS3にputする方法が考えられる。
 
そうではなく、S3からS3に直接オブジェクトをcopyしたい。
 
そこで、コピー元の対象bucketのバケットポリシーに以下を追加する。
 

{
   "Version": "2012-10-17",
   "Statement" : {
      "Effect":"Allow",
      "Sid":"ReadAccess",
      "Principal" : {
          "AWS":["arn:aws:iam::アカウントBのユーザーB1"]
      },
      "Action":"s3:GetObject",
      "Resource":"arn:aws:s3:::コピー元対象bucket/*"
   }
}

 
cliでのアクセスは以下の通り。クライアントのコンピュータを介することなく複数のprofileを跨いだcopyやsyncができる。

 

aws s3 cp --profile user_A1 s3://A-bucket/object.png --profile user_B1 s3://B-bucket/

Read More

AWS Elastic TranscoderとKey Management Serviceを使って素敵にHTTP Live Streaming

HTTP Live Streamingとは

HTTP Live Streaming(HLS)というのがあります。
https://developer.apple.com/streaming/

Apple神が作った映像や音声のストリーミングプロトコルです。
ざっくり言うと、音声ファイルを短く分割したリソースファイル(.ts)と、分割したファイルを管理するプレイリストファイル(.m3u8)の2つを使って、HTTPプロトコルにのっとってダウンロードしつつ再生すればいいじゃん、的なやつです。
既存のプロトコルベースだし実装が単純なので色んな所で使われています。
最近だとAbemaTVとか。

これが便利なのは、リソースの暗号化と、回線に応じたリソースファイルの出し分けが規定されているのです。
https://tools.ietf.org/html/draft-pantos-http-live-streaming-20#section-4.3.2.4
https://tools.ietf.org/html/draft-pantos-http-live-streaming-20#section-4.3.4.2

この規定にのっとって、複数のビットレートのリソースファイルと暗号化キーを用意しておけば、あとは何も考えずに対応するプレイヤーにぶち込めが勝手にいい感じに再生してくれる、と。
あら便利。
というわけで、コイツをAWSのElastic Transcoderと、暗号化キーを管理するKey Management Serviceを使って、リソースファイルと暗号化を自動化できないかなー、と思ってやってみたらできたのでメモ。

作り方

  1. Identity and Access Management(IAM)のページのメニューにある、Encryption keys(日本語だと暗号化キー)の項目からKey Management Serviceに飛ぶ。(わかるかこんなの)
  2. KMSでで暗号化キーを作る。キモはTranscoderで使うS3のバケットと同じリージョンで作ることと、キー管理者とキーユーザーに”Elastic_Transcoder_Default_Role”を指定すること。これをやらないと何度やってもエンコードエラーになる。
  3. TranscoderでPipelineを作る。ここではEncryptionの項目に、先程KMSで設定したマスターキーのARNの設定をすること。

  1. TranscoderのJobを作る。出力するビットレート毎にOutput Detailsを作り、Playlistの項目は一つだけでOutputs in Master Playlistの項目に、Output Detailsで設定したOutputをすべて追加する。

  1. おもむろにCreate New Jobを押して出来上がりを待つ

確認

ファイルの確認

今回エンコードされたファイルはこんな感じ。

Test001
├── 160k
│   ├── track.key
│   ├── track.m3u8
│   ├── track00000.ts
│   ├── track00001.ts
│   ├── track00002.ts
│   ├...
│   └── track00064.ts
├── 64k
│   ├── track.key
│   ├── track.m3u8
│   ├── track00000.ts
│   ├── track00001.ts
│   ├── track00002.ts
│   ├...
│   └── track00064.ts
└── Test001.m3u8

マスタープレイリストのTest001.m3u8の中身はこんな。

#EXTM3U
#EXT-X-STREAM-INF:PROGRAM-ID=1,BANDWIDTH=189000,CODECS="mp4a.40.2"
160k/track.m3u8
#EXT-X-STREAM-INF:PROGRAM-ID=1,BANDWIDTH=89000,CODECS="mp4a.40.5"
64k/track.m3u8

それぞれのリソースのプレイリストはこんな感じ。

#EXTM3U
#EXT-X-VERSION:3
#EXT-X-MEDIA-SEQUENCE:0
#EXT-X-ALLOW-CACHE:YES
#EXT-X-TARGETDURATION:11
#EXT-X-KEY:METHOD=AES-128,URI="track.key",IV=0x420473ab30beeaabfe1aa878fda4b312
#EXTINF:10.007800,
track00000.ts
#EXTINF:10.007789,
track00001.ts
#EXTINF:9.984567,
track00002.ts
...
#EXTINF:0.116089,
track00064.ts
#EXT-X-ENDLIST

問題なさげ。

再生テスト

Test001ディレクトリー以下を全部ダウンロードしてきて、マスタープレイリストをVLCメディアプレイヤーにつっこんだらちゃんと再生できるか確認。

あと、
http://dev.classmethod.jp/smartphone/iphone/network-link-conditioner/
にあるNetwork Link Conditionerを使って回線速度を制限して、ちゃんと64kbpsのファイルがダウンロードされるかどうかを串刺して確認。

できた。

Read More

Golang or NodeJSで AWS Step Functions のActivityを書く

AWS Step Functionsによるピタゴラスイッチの終点で、「通常のEC2上でStep Functionsを待ち受け、RDS等になんか処理をするためのタスク」を書いている。

{
  TaskToken:xxxxx
  Input:<前段のタスクで処理したresponse>
}

ワーカーを作動させていると、こういう感じのデータがStepFunctionsから渡ってくるので、それを元に好きに処理して、SuccessやFailureリクエストをStep Functions側に返せばいい。
とりあえずDBへの登録処理とか、データの読み取りなしで実装してみる。

NodeJS

単純にNodeJSでやるならこういう感じ。

src/worker.ts

const AWS = require("aws-sdk");

const stepfunctions  : AWS.StepFunctions = new AWS.StepFunctions({
    apiVersion: '2016-11-23',
    region : "ap-northeast-1"
});


(async ()=>{
    while (1) {
        await new Promise(async (resolve) => {
            const stepFunctionParam = {
                activityArn: 'arn:aws:states:ap-northeast-1:xxxxxxx:activity:StepFunction-Called', /* required */
            };

            const data = await stepfunctions.getActivityTask(stepFunctionParam).promise();

            if (data != null && data.taskToken != null){
                console.log(data)

         /* ... Do Something !!!!!!! */

                const sendParams = {
                    output: "true", /* required */
                    taskToken: data.taskToken, /* required */
                };   

                await stepfunctions.sendTaskSuccess(sendParams).promise()

            }
            resolve();
        }).catch(error => console.error(error))
    }
})()

 
走れ。
 

AWS_PROFILE=my_aws_profile NODE_ENV=test node build/worker.js

 

タスク部分並列処理にしたかったらawaitやらresolveの位置いじっておく。
とりあえずNodeJSで動作確認したが、この手のコードはgolangがポ〜タビリティ高くて良さそうなので、次はGo言語の入門がてら書いてみる。

Go

あとgoのパッケージマネージャツール(vendor管理)glideも使ってみた。

glide create
glide get github.com/aws/aws-sdk-go

しておく。

main.go

package main

import (
    "fmt"
    "github.com/aws/aws-sdk-go/service/sfn"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/aws"
    "time"
)

func asyncTask(sfnSession *sfn.SFN , resp *sfn.GetActivityTaskOutput){

        /* ... Do Something !!!!!!! */

    params := &sfn.SendTaskSuccessInput{
        Output:    aws.String("true"),      // Required
        TaskToken: resp.TaskToken, // Required
    }
    sfnSession.SendTaskSuccess(params)

}

func loop() {

    awsConfig :=  &aws.Config{Region: aws.String("ap-northeast-1")}

    sess, err := session.NewSession(awsConfig)
    if err != nil {
        fmt.Println("failed to create session,", err)
        return
    }

    sfnSession := sfn.New(sess)

    params := &sfn.GetActivityTaskInput{
        ActivityArn: aws.String("arn:aws:states:ap-northeast-1:xxxxxxxxx:activity:StepFunction-Called"), // Required
    }
    resp, err := sfnSession.GetActivityTask(params)

    if err != nil {
        fmt.Println(err.Error())
        return
    } else if resp.TaskToken != nil {
        fmt.Println("success")

        // 並列処理
        go asyncTask(sfnSession , resp)
    }

    fmt.Println(resp)

}

func main() {

    for {
        loop()
    }

}

 
走れ。

AWS_PROFILE=my_aws_profile go run main.go

awsのconfigとかloopの外でよかったかな。
Go初めてなので参照周りやセッションとかもうちょっと調べる。

 

Read More

mozjpeg3.1 を Amazon Linux用にフルビルドするメモ

 
ローカルのMac上で開発してると、一部のnpmモジュールはインストール時に自動でMac用のバイナリ生成のコードをインストールしてパスも通してくれちゃうので、そのまま同梱するとAWS Lambda上では当然動かない。
今回は画像の圧縮でよく使われる mozjpegpngquant がそれに当たった。
Lambdaをコールするたび、いちいちコンテナ上でインストールするのも微妙なので出来ればあらかじめ全部入りの実行ファイルをmakeしておく。
地味にいろいろ迷ったので作業メモする。
 
 
Amazon LinuxのDockerImageかubuntuのImage あたりをpullしてきて,docker-machine上で作業。
 

# 基本
apt-get update
apt-get install -y gcc make wget dpkg 

# mozjpeg用
apt-get install -y yasm nasm

wget https://github.com/mozilla/mozjpeg/releases/download/v3.1/mozjpeg-3.1-release-source.tar.gz
tar -xf mozjpeg-3.1-release-source.tar.gz
cd mozjpeg
./configure --disable-shared --enable-static 
make
make deb

 
ここ大事。 普通にやると多分実行時に symbol jpeg_float_quality_scaling, version LIBJPEG_6.2 not defined in file libjpeg.so.62 with link time reference とかで怒られる。

./configure --disable-shared --enable-static

 
これで可逆圧縮用の jpegtran や非可逆圧縮の cjpg が出来ている。
これらを同梱して既存のモジュールにエイリアス貼ったり愚直に置き換えるなりして Lambda上でも使える。
どうしてもLambda上で動いて欲しいものは1ファイルで実行できる形に持っていきたい。

Read More

AWS Step Functionsでマイクロサービスから状態管理を(なるべく)取り除きたい

書くこと

「プロジェクトごとに何度も作るような処理の一部をマイクロサービス化する」という仕事があって、その一環でStep Functionを使ったので、途中までやっていたことまとめ。
 
 
example project

ピタゴラスイッチ、その辛み

AWS Lambdaは便利だが、個別の非常に短いFunctionを繋いで1つの大きなフローを組む場合、状態管理に非常に気を使う側面があった。
エントリーポイントになるFunctionがそれぞれの個別タスクの処理結果を待ち受けするような処理にすれば話は簡単になるが、
親タスクが子タスクの状態を管理するような構成にすると、エントリーポイントそのもののタイムアウト時間を気にする必要も出てくる。
基本的にはタスク発火のエントリーポイントでは「処理依頼を投げたら投げっぱなし」にしてさっさとその役割を終えたい。

だが、こういうピタゴラスイッチ的な構成を組もうとすると、動作させたい個別のタスクごとにいちいちSQSやらDynamoやらでタスクキュー的な実装をする必要があった。1つの役割をするためだけに、なんか個々のサービスの関係性が複雑になってくる。
 

StepFunctions

昨年末発表のAWS Step Functions で、使ってみたらこの辺がだいぶ楽になりそうだった。
詳細は弊社橋本さんのこれを参照。
 

serverless-stepfunctions

Serverless Framework上でStep Functionsを定義するプラグインを早速作っていた方がいて、とても便利なので使っていく。

テスト的に組んだフローはこんな感じ

  • エントリーポイントのfunctionsではタスク設定を書いたobjectを生成する
  • Parallelステートで同じFunctionに処理を分岐。Filters機能を使って、エントリーポイントのoutputからで反応すべきkey指定
  • 個々のFunctionは確率的にFail、リトライ
  • 並行的に走らせたすべてのタスクが終わったら最後のFunctionsを起動
     

Parallelでは動的に並列タスクを増減できないのが弱点。
 
ここまでが出来るようになったServerlessFrameworkのexampleプロジェクトを、許可を取ったのでリポジトリ公開する。

処理の定義はこんな感じ。yaml見やすい。
 
serverless.yml
 

service: step-test

provider:
  name: aws
  runtime: nodejs4.3
  stage: dev # replace your setting
  profile: dev # replace your setting
  region: ap-northeast-1 # replace your setting

package:
  exclude:
   - src/**
   - node_modules/** # 今回は特に実行時依存モジュールないので外しておく
   - .local/**
   - .git/**

plugins:
  - serverless-step-functions
  - serverless-offline

stepFunctions:
 stateMachines:
   testStateMachine-v5:
     Comment: "Input and Parallel Function"
     StartAt: EntryState
     States:
       EntryState:
         Type: Task
         Resource: entry
         Next: ChildState
       ChildState:
         Type: Parallel
         Next: EndState
         Branches:
          - StartAt: Child0
            States:
              Child0:
                Type: Task
                InputPath: "$.0"
                Resource: childTask
                End: true
                Retry:
                - ErrorEquals:
                  - HandledError
                  IntervalSeconds: 5
                  MaxAttempts: 6
                  BackoffRate: 1
                - ErrorEquals:
                  - States.TaskFailed
                  IntervalSeconds: 5
                  MaxAttempts: 6
                  BackoffRate: 1
          - StartAt: Child1
            States:
              Child1:
                Type: Task
                InputPath: "$.1"
                Resource: childTask
                End: true
                Retry:
                - ErrorEquals:
                  - HandledError
                  IntervalSeconds: 5
                  MaxAttempts: 6
                  BackoffRate: 1
                - ErrorEquals:
                  - States.TaskFailed
                  IntervalSeconds: 5
                  MaxAttempts: 6
                  BackoffRate: 1
          - StartAt: Child2
            States:
              Child2:
                Type: Task
                InputPath: "$.2"
                Resource: childTask
                End: true
                Retry:
                - ErrorEquals:
                  - HandledError
                  IntervalSeconds: 5
                  MaxAttempts: 6
                  BackoffRate: 1
                - ErrorEquals:
                  - States.TaskFailed
                  IntervalSeconds: 5
                  MaxAttempts: 6
                  BackoffRate: 1
       EndState:
         Type: Task
         Resource: lastTask
         End: true

functions:
  entry:
    handler: build/src/testFuncHandler.entry
    timeout :30
    memorySize : 128

  childTask:
    handler: build/src/testFuncHandler.childTask
    timeout : 30
    memorySize : 128

  lastTask:
    handler: build/src/testFuncHandler.lastTask
    timeout : 30
    memorySize : 128



 

エントリーポイント、個別タスクのFunctionの定義はこんな感じ。

module.exports.entry = async (event ,context , callback)=>{

    const hoge = {
        0:{
            id : 0,
            name: "hoge"
        },
        1:{
            id : 1,
            name: "fuga"
        },
        2:{
            id : 2,
            name: "piyo"
        }
    };

    callback(null , hoge);
}

/*
 serverless logs -f childTask --region ap-northeast-1 --profile dev --stage dev --tail --interval 500
 * */
module.exports.childTask = async (event ,context , callback)=>{

    const num = Math.random();
    if (num < 0.25){
        console.log("ERROR!!!")
        const error = new Error("something is wrong");
        return callback(error);
    }

    const name= event.name
    callback(null , {Name : name});
}

/*
 serverless logs -f lastTask --region ap-northeast-1 --profile dev --stage dev --tail --interval 500
* */
module.exports.lastTask = async (event ,context , callback)=>{
    console.log(event)
    callback(null , event);
}

 
AWS console上でこのフローをイメージ化するとこんな感じになる。
 


 

いざタスクをinvokeすると、

sls invoke stepf --state testStateMachine-v5 --stage dev --profile dev --regeon ap-northeast-1 

 
最後のタスクのeventには各並列処理の実行結果の配列が入ってくる。
 

{ executionArn: 'arn:aws:states:ap-northeast-1:xxxxxxx:execution:step-test-dev-testStateMachine-v5:xxxxxxxxx',
  stateMachineArn: 'arn:aws:states:ap-northeast-1:xxxxxxxxxx:stateMachine:step-test-dev-testStateMachine-v5',
  name: 'xxxxxxxxxxxxxxxxxxx',
  status: 'SUCCEEDED',
  startDate: xxxxxx,
  stopDate: xxxxxx,
  input: '{}',
  output: '[{"Name":"hoge"},{"Name":"fuga"},{"Name":"piyo"}]' }

 

まとめ、ToDo

各Functionのリトライ周りがすっきりした。すごい。
今回の要件が限定的なこと、StepFunctionsのテスト的なこともあって「常に複数のParallel Functionを静的に確保する」構成を作ってみたが、やはりここは何か気持ち悪い。
やっぱこれ、動的にParallel叩けるべきじゃないっすかねえ…。

次はこの辺考える。

  • フロー自体がFailした時どこに投げるのが使用側が楽か
  • マイクロサービス化する上で使用側のAPIはどうするべきなのか
  • 最終的にActivtyステートでGoのワーカーを待ち受けさせ、RDSにデータを入れる

Read More