Golang or NodeJSで AWS Step Functions のActivityを書く

AWS Step Functionsによるピタゴラスイッチの終点で、「通常のEC2上でStep Functionsを待ち受け、RDS等になんか処理をするためのタスク」を書いている。

{
  TaskToken:xxxxx
  Input:<前段のタスクで処理したresponse>
}

ワーカーを作動させていると、こういう感じのデータがStepFunctionsから渡ってくるので、それを元に好きに処理して、SuccessやFailureリクエストをStep Functions側に返せばいい。
とりあえずDBへの登録処理とか、データの読み取りなしで実装してみる。

NodeJS

単純にNodeJSでやるならこういう感じ。

src/worker.ts

const AWS = require("aws-sdk");

const stepfunctions  : AWS.StepFunctions = new AWS.StepFunctions({
    apiVersion: '2016-11-23',
    region : "ap-northeast-1"
});


(async ()=>{
    while (1) {
        await new Promise(async (resolve) => {
            const stepFunctionParam = {
                activityArn: 'arn:aws:states:ap-northeast-1:xxxxxxx:activity:StepFunction-Called', /* required */
            };

            const data = await stepfunctions.getActivityTask(stepFunctionParam).promise();

            if (data != null && data.taskToken != null){
                console.log(data)

         /* ... Do Something !!!!!!! */

                const sendParams = {
                    output: "true", /* required */
                    taskToken: data.taskToken, /* required */
                };   

                await stepfunctions.sendTaskSuccess(sendParams).promise()

            }
            resolve();
        }).catch(error => console.error(error))
    }
})()

 
走れ。
 

AWS_PROFILE=my_aws_profile NODE_ENV=test node build/worker.js

 

タスク部分並列処理にしたかったらawaitやらresolveの位置いじっておく。
とりあえずNodeJSで動作確認したが、この手のコードはgolangがポ〜タビリティ高くて良さそうなので、次はGo言語の入門がてら書いてみる。

Go

あとgoのパッケージマネージャツール(vendor管理)glideも使ってみた。

glide create
glide get github.com/aws/aws-sdk-go

しておく。

main.go

package main

import (
    "fmt"
    "github.com/aws/aws-sdk-go/service/sfn"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/aws"
    "time"
)

func asyncTask(sfnSession *sfn.SFN , resp *sfn.GetActivityTaskOutput){

        /* ... Do Something !!!!!!! */

    params := &sfn.SendTaskSuccessInput{
        Output:    aws.String("true"),      // Required
        TaskToken: resp.TaskToken, // Required
    }
    sfnSession.SendTaskSuccess(params)

}

func loop() {

    awsConfig :=  &aws.Config{Region: aws.String("ap-northeast-1")}

    sess, err := session.NewSession(awsConfig)
    if err != nil {
        fmt.Println("failed to create session,", err)
        return
    }

    sfnSession := sfn.New(sess)

    params := &sfn.GetActivityTaskInput{
        ActivityArn: aws.String("arn:aws:states:ap-northeast-1:xxxxxxxxx:activity:StepFunction-Called"), // Required
    }
    resp, err := sfnSession.GetActivityTask(params)

    if err != nil {
        fmt.Println(err.Error())
        return
    } else if resp.TaskToken != nil {
        fmt.Println("success")

        // 並列処理
        go asyncTask(sfnSession , resp)
    }

    fmt.Println(resp)

}

func main() {

    for {
        loop()
    }

}

 
走れ。

AWS_PROFILE=my_aws_profile go run main.go

awsのconfigとかloopの外でよかったかな。
Go初めてなので参照周りやセッションとかもうちょっと調べる。

 

Read More

ブラウザからS3に巨大なファイルを低メモリで送りつけるアレ

ブラウザから8GB以上の巨大なファイルや多量のファイルをS3に送りつけるには、MultiPart Uploadの機能を使っていく必要がある。
具体的には、データを5MB以上のchunkに分け、分割して送りつけることになる。
データが8GBいかないケースでも、ファイル読み込み量を分割できる機構が使えるので、メモリにも優しい。
それなりに使う場面はありそう。

1.とりあえずドキュメント通り実装して徳を積む

マルチパートアップロードの概要
Multipart Upload API を使用したオブジェクトのアップロード

(1) S3のCORS設定

  • ExposeHeader => ETag を指定する。
  • AllowHeader
<AllowedHeader>Authorization</AllowedHeader>
<AllowedHeader>Content-Type</AllowedHeader>
<AllowedHeader>User-Agent</AllowedHeader>
<AllowedHeader>x-amz-date</AllowedHeader>
<AllowedHeader>x-amz-user-agent</AllowedHeader>
<AllowedHeader>x-amz-storage-class</AllowedHeader>
<AllowedHeader>x-amz-acl</AllowedHeader>

この辺を設定する。PUT権とかはあとで動的に付与する。

(2) ブラウザのfile api

ブラウザのfile api でファイル名やbyte数取得。
 
この段階ではFileReaderによるメモリ上への展開は行わないこと。
 
余裕があればWebWorkerで別スレッド立ててUIスレッドへの影響範囲を小さくする。
 

(3) サーバーから署名付きURL取得

ブラウザからのsignedURL発行要求を認可,返却。
S3のbucketやオブジェクトに対し、PUTやHEADリクエストが通るようにする。
 
この辺までは割といつも通り。
 

(4) S3::createMultipartUpload

S3の該当keyに対して createMultipartUpload 要求。uploadIdを取得。
要するに「今から分割ファイルでアップロードするぜ?」という宣言みたいなの。
このuploadIdは今後のリクエストすべてに同梱することとなる。
 

(5) 分割したblobを送りつける S3::uploadPart

 
FileReaderにいきなり巨大なfileオブジェクト食わすと多分2GBあたりでブラウザが音も立てず死ぬ。
chunk送信用のループ内でFile::slice(startByte , endByte)してblob化して都度捨てるのがミソ

 
この部分しかメモリ上に乗らないようになる。これが可能なのよく知らなかった。
並列アップロード数のロジックとかchunkごとの大きさ(5MB以上)とかは様子見ながら好きにカスタムしたらいいんじゃないかな。
 
この時帰ってくるETag値と、送ったPartNumber値はこういう形式でとっておく。

const multipartMap= { Parts: [] };

//...ループ内
multipartMap.Parts[partNumber - 1] = {
    ETag: 返ってきたETag値,
    PartNumber: partNumber
};

(6) 完了 S3::completeMultipartUpload

 
すべて送り終わったら、completeMultipartUploadリクエストを送る。
この時、先ほどのmultipartMapを同梱して送りつける。
これで、初めてS3のバケット上にオブジェクトができる。
 

Code

4-6まではこんな感じになります。今回はaws-sdk使用。全部Promise返すモードがいつの間にかついてて良い。
(並列アップロードのロジック、chunkごとの大きさ指定(5MB以上)とかエラー処理は記事のためあえて省いてる。)

const upload = async (s3 ,  s3Params , file)=>{

    const mime = Mime.lookup(file.name);
    const multiPartParams = s3Params.ContentType ? s3Params : {ContentType : mime , ...s3Params};
    const allSize = file.size;

    const partSize = 1024 * 1024 * 5; // 5MB/chunk

    const multipartMap = {
        Parts: []
    };

    /*  (4)   */
    const multiPartUploadResult = await s3.createMultipartUpload(multiPartParams).promise();
    const uploadId = multiPartUploadResult.UploadId;

    /*  (5)  */
    let partNum = 0;
    const {ContentType , ...otherParams} = multiPartParams;
    for (let rangeStart = 0; rangeStart < allSize; rangeStart += partSize) {
        partNum++;
        const end = Math.min(rangeStart + partSize, allSize);

        const sendData = await new Promise((resolve)=>{
            let fileReader =  new FileReader();

            fileReader.onload = (event)=>{
                const data = event.target.result;
                let byte = new Uint8Array(data);
                resolve(byte);
                fileReader.abort();
            };
            const blob2 = this.file.slice(rangeStart , end);
            fileReader.readAsArrayBuffer(blob2);
        })

        const progress = end / file.size;
        console.log(`今,${progress * 100}%だよ`);

        const partParams = {
            Body: sendData,
            PartNumber: String(partNum),
            UploadId: uploadId,
            ...otherParams,
        };
        const partUpload = await s3.uploadPart(partParams).promise();

        multipartMap.Parts[partNum - 1] = {
            ETag: partUpload.ETag,
            PartNumber: partNum
        };
    }

    /* (6) */
    const doneParams = {
        ...otherParams,
        MultipartUpload: multipartMap,
        UploadId: uploadId
    };

    await s3.completeMultipartUpload(doneParams)
        .promise()
        .then(()=> alert("Complete!!!!!!!!!!!!!"))
}

 
理屈上、fileObjectの参照が残ってれば,中断やキャンセル、断片ごとのリトライも可能なはず。
S3 Multipart Upload Request のタイムアウト設定を長めにしておいてもいいかも。
 
 

2.めんどくさいのでOSSを使う

 
上記までをライブラリ化しようと思ってたけど、ある意味当然のごとく既に存在した。。
仕様も非同期チェーンも特に覚えたくない人は、早くお家帰りたいのでこっちのありものを使う。
 
EvaporateJS
 
一時停止やキャンセル、リトライ、chunkサイズの指定や同時送信数も指定可能。
webWorker上の別スレッドで動かすモードはついてないみたいだけど、よくまとまっててすごい。
ただしPromise使うので、typescriptやbabel等のプリプロセッサ通さない人は、es6-promiseあたりのポリフィルを念のため入れること。
以上です。

Read More

AWS Lambdaで Slackのslashコマンドを作る(1)

伏見です。
表題の通りやっていきたいと思います。

(1)セットアップ

まずはおもむろにサーバーレスフレームワークのセットアップをします。
2016年9月現在、Serverless Frameworkは1.0@Beta3くらいまで出てますが、
サクッとAWSのlambdaとAPI Gatewayを使う限り、regeonやprofileなどを対話形式で選んでくれるバージョン0.5.6の方が
今の所使いやすいのでこっちを使います。
0.5系のdocumentはこちら 

入れます。

npm install -g serverless

プロジェクト立てます。

serverless project create

対話形式でいろいろ聞いてくるので、聞かれるままに言語やAWSのプロファイルの設定をします。
あ、aws-cliは入っているものとします。

次に、lambdaのファンクションやAPI gatewayのエンドポイントをサクッと立てます。
ここでは、command01と云う名前にしてみます。

serverless function create functions/command01

また対話形式で、言語や作るものを聞かれます。
ここではnodejs4.3を選んでおきます。

Serverless: Please, select a runtime for this new Function
  > nodejs4.3
    python2.7
    nodejs (v0.10, soon to be deprecated)

設定を終えると、functions/command01以下にlambdaファンクションの本体となる handler.jsや、API gateway、lambdaの設定を司るs-function.jsonなどができています。
ファンクションを作ったディレクトリに移動して、npmの設定を行います。
これは、lambdaにdeployする時に、node_modulesが独立しているためです。

cd functions/command01
npm init

ちなみにwebpackを使う方はファンクションごとのこの設定、いりません。babelやtypescriptを使ったり、jsonやimageをバンドルして扱いやすくしたい…あるいは複数のfunctionごとにコードを共有したい…という方もこっちですね。
0.5系向けならwebpack plugin  がとにかく簡単でオススメです。
サクッとやるだけならいらないです。

(2)まずは繋いでみる

とりあえずデプロイします。

lambda。

serverless function deploy

api-gateway。

serverless endpoint deploy

endpoint deploy後に表示されたURLにブラウザで確認します。

ほげ

表示されました。
これでとりあえずのひな形ができました。

実際の機能作成は次の記事で。

Read More

IE11のuserAgent対応javascriptによるIE判定式(全バージョン対応)

IE11は従来のIEと違い、userAgentの中にMSIEがありません。
よって良くある、userAgent内のMSIEindexOf等で探す方法ではIE11がうまく取れません。
jsでIE判定をかます際に色々調べたので情報共有。

javascriptによるIE判定式

var userAgent = window.navigator.userAgent.toLowerCase();
if( userAgent.match(/(msie|MSIE)/) || userAgent.match(/(T|t)rident/) ) {
    var isIE = true;
    var ieVersion = userAgent.match(/((msie|MSIE)\s|rv:)([\d\.]+)/)[3];
    ieVersion = parseInt(ieVersion);
} else {
    var isIE = false;
}

上記コードで
isIEにIEかどうかの判定(boolean型)
ieVersionにバージョン数(int型)
parseIntを挟むかどうかはケースバイケースで。

今後のIEはどうなるか分かりませんがとりあえず現在はこれでいけると思います。

Read More

iPhoneとiPadで、$.on、$.delegateまたは$.bindから割り当てたクリックイベントが発火しない場合の対処法

原因はよくわからないけど、対処は簡単。
clickイベントを割り当てた要素の属性に「onclick=””」を指定する。

Click here

こちらにあった
http://stackoverflow.com/questions/10165141/jquery-on-and-delegate-doesnt-work-on-ipad

Read More

Wheelイベントをオーバーライドして、画面をスクロールせずにwheelDeltaの値を取得する

$(function(){
     function handle(delta) {
                if (delta < 0) console.log("down------------------delta:" + delta);
                else console.log("up--------------------delta:" + delta);
            }

            function wheel(event){
                var delta = 0;
                if (!event) /* For IE. */
                    event = window.event;
                if (event.wheelDelta) { /* IE/Opera. */
                    delta = event.wheelDelta/120;
                    if (window.opera)
                        delta = -delta;
                } else if (event.detail) { /** Mozilla case. */
                delta = -event.detail/3;
                }
                /** If delta is nonzero, handle it.
                 * Basically, delta is now positive if wheel was scrolled up,
                 * and negative, if wheel was scrolled down.
                 */
                if (delta)
                    handle(delta);
                if (event.preventDefault) {
                    event.preventDefault();
                }
                event.returnValue = false;
            }

            if (window.addEventListener) window.addEventListener('DOMMouseScroll', wheel, false);
            window.onmousewheel = document.onmousewheel = wheel;
})

Read More