AWSマルチアカウントのS3間のcopyのメモ

AWSアカウントAにIAMユーザーA1,
AWSアカウントBにIAMユーザーB1がいるとする。

アカウントAのS3からアカウントBのS3にオブジェクトのコピーを行うには、通常、ユーザーA1としてアカウントAのS3からオブジェクトをgetし、ユーザーB1としてアカウントBのS3にputする方法が考えられる。
 
そうではなく、S3からS3に直接オブジェクトをcopyしたい。
 
そこで、コピー元の対象bucketのバケットポリシーに以下を追加する。
 

{
   "Version": "2012-10-17",
   "Statement" : {
      "Effect":"Allow",
      "Sid":"ReadAccess",
      "Principal" : {
          "AWS":["arn:aws:iam::アカウントBのユーザーB1"]
      },
      "Action":"s3:GetObject",
      "Resource":"arn:aws:s3:::コピー元対象bucket/*"
   }
}

 
cliでのアクセスは以下の通り。クライアントのコンピュータを介することなく複数のprofileを跨いだcopyやsyncができる。

 

aws s3 cp --profile user_A1 s3://A-bucket/object.png --profile user_B1 s3://B-bucket/

Read More

RxJavaで並列処理と非同期チェーン

今のとこAndroid開発でRxJava使ってる。
待ち受けストリーム的な実装は基本的にしていなくて、jsでいうPromise的な扱いとLinq代わりのコレクション操作をRxJavaに任せてる、という感じ。
適当にやってるとその辺幾つか忘れる項目があるのでメモ。

並列処理

Observable.zipを使う。

Observable<Integer> obs1 = Observable.just(256);
Observable<String> obs2 = Observable.just("String");
Observable<Boolean> obs3 = Observable.just(true);

Observable.zip(obs1, obs2, obs3, (Integer i, String s, Boolean b) -> i + " " + s + " " + b)
 .subscribe(str -> Log.d("debug" , str));

非同期チェーン

複数のServiceとして分けたObservable実装をつないでいくとき、
愚直に書くとどんどんネストが深くなっていって、なんかコールバック地獄みたいになる。
ここではflatMapを使っていく。
flatMapは次の処理にObservableを渡せる。

//observable1 , observable2 , observable3がそれぞれAPIを叩くような処理のとき。

observable1
    .flatMap((result1)->{  
        if (result1の結果){
            return observavle2();
        } else {
            return observable3();
        }
    })
    .subscribeOn(Schedulers.newThread())
    .onBackpressureBuffer()
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe((result)->{
     // onNext
    },(err)->{
      // onError 

    },()->{
      //onComplete
    });

ただ、他の言語でPromise返すような実装もそうだが、もし処理によって返す型が違うObservableを繋がなきゃいけないときの個人的プラクティスはあんまり固まってない。
この辺迷うくらいならAPIを叩くような個々のServiceはObservable単位で切らない方がいい。

Promise代替的なチェーンよりはなんだかんだasync/await的な構文の方が分岐周りとかスッキリするような気になっていて、
それもあって次はネイティブ部分が必要になる処理書くときはkotlin使ってみたい。

それ普通にhandler立てたら??????とか言わない。

Read More

おんやど恵@湯河原でWanoグループの開発合宿してきた (後編)

では前回に引き続き後編です。

一晩明けて 8:00-

朝まで作業していた方が非常に多く、皆さん非常に疲れがたまっていたようです。
やさぐれてます。


 
こちらが朝食のメニューと朝の先輩です。
干物に、湯豆腐などついていました。

うまい

 
 

追加開発 9:00-

寝る部屋を引き払っても会議室は午後まで借りられる!
朝食を終えてからも、各チームの開発はまだまだ続きます。

 
 

成果発表! 12:30-

(1) チーム社内音楽レコメンド

チーム社内音楽レコメンド(チーム不健康)は社内BGMのプレイリストを個々人が追加/お気に入りする機能をほぼ完成させていました。
Dockerのubuntuイメージをベースとして、アプリケーションサーバー側の基本言語はGo,webフレームワークはRevelを使用しています。
それに加えて、いつの間にかAWS Pollyによるテキスト読み上げ/館内放送の仕組みを追加していました。


 

社内に音楽を流している間でも、放送でテキストスピーチによる呼び出しが可能なのだそうです。
恐ろしいですね。どこに呼び出されるんでしょうか。
 
今回の開発は、DockerやGoでのweb開発を皆が試すいい機会にもなったようです。

(2) チーム受付動画配信

弊社受付のプロジェクターにながす動画を操作/プレイリスト変更しようという本プロジェクト。

深夜に一部苦戦していたところはあったようですが、見事形にされていました。
WebSocket経由で、リアルタイムで動画プレイリストのリソース変更・再生/停止操作を行い、それがプロジェクターにつながるクライアントのコンピュータにも反映されます。
ネットワーク越しに某すごーいなアニメを再生/停止するデモを行いました。
Rustの布教も欠かさない!

(3) チームセンサー

オフィスの各部屋の人員管理を一手に担おうというこのプロジェクト。
エンジニアは物理センサー開発をし、デザイナーはGUIの開発を行いました。
運用時にはいくつかの種類のセンサーが使われる模様です。
「オフィスのドアは開いているか?」「予約はされているがミーティングルームは実際に使われているのか?」に対しては、フォトリフレクターなどの装置を用いました。
「トイレに人はいるのか?」には人感センサーなどを用いる話もありました。(どれを使うかまだ決まっていないようです。)

Slackやチーム社内音楽レコメンドの館内放送などとも連携しよう!という意見も出たので、今後の幅が広がりそうです。

(4) チームレンタル

社内端末管理システムに取り組んだこのチーム。
当初の構想に加え、貸出予約期間もオプション設定できるようになり、端末の外部持ち出し時の管理も視野に入ってきました。

パスワード認証だけでなく、「KAIROS」というクラウドの顔認証API経由でも端末利用者の持ち出しを管理することができるようになる予定だそうです。すごい。
苦労した点としては、アプリのカメラ周りをいじるのが初めてだったようで、そこに難儀したようです。

(5) チームVR受付

GearVRによる来客受付システムに取り組んだ僕のチーム。

結果的にはほとんどUnity勉強会!みたいになってしまいました。(というか他に写真なかったんすか!)
実際は、視線で呼び出したいチームのアイコンに視線を合わせ、GearVRのタップで選択していきます。

シーンの切り替えでなく、アニメーションでGUIの変更を管理するのが次の目標です…。

 
 

撤収 14:00-

こうしてWanoグループのエンジニア/デザイナー陣は爽やかな疲れ?と共に1泊2日の開発合宿を終えました。
普段一緒に働いているチーム以外のメンバー以外とも開発を行って、新しい知見を得たり、普段使わない技術にどんどん取り組んでいくことができました。
チーム同士で開発したものを相互に繋ぎこむアイディアも生まれていったので、まだまだ面白いことができそうですね。
おん宿恵さんでの開発合宿、非常に実りのある結果になったかと思います!
 

 

Read More

おんやど恵@湯河原でWanoグループの開発合宿してきた (前編)

さる2017年3月10日/11日に、神奈川県は湯河原温泉の「おんやど恵」さんでWanoグループの開発合宿を行いました。
本稿では、合宿中にあった出来事を幾つかに分けて記事をアップしていこうかなと思います。

開発合宿の目的

今回の開発合宿では、エンジニア/デザイナー陣が5グループほどに分かれて各々のチームで開発を行いました。
開発のメインテーマは、「日常の様々なタスクを技術によって便利にする!」です。

チームは以下の通りです。

(1) チーム社内音楽レコメンド
(2) チームセンサー
(3) チームモバイル端末管理
(4) チーム受付動画配信
(5) チームVR受付

一応それぞれの開発テーマはあるものの、「普段業務で培っている知識を使って役立つものを作る」だけでなく、
なかなか案件では(まだ)使う機会のないIoT周りやマニアックな言語、VR等の技術に挑戦して、**開発チーム全体の徳を高めていこうぜ!**というのも目的の一つとなります。
 
それでは順次、出来事を記載していこうかと思います。

1日目

出発

集合。
最寄りから直接合流する人もいました。(僕も)

東海道線にてひたすら進みます。
各停なのに1駅の区間長いなー!という個人的驚き。

電車内では早速作業を開始する人や、

台湾出身のエンジニアを中心に、食習慣の違いについての討論会も開かれたりしていました。(ただの雑談ともいう)

12:30 湯河原到着

着きました。
当日の湯河原では源頼朝が兵を起した決起祭みたいなのをやってました。祭を見る機会はなかったのですが、あちこちで看板を拝見しました。
 

 
 
湯河原駅からバスで10分ほど、「おん宿恵」さんに着きました!
こちらのお宿は支配人さんが元システムエンジニアなのだそうです。
開発合宿用途として、痒いところに手が届く素敵な設備がしっかり揃っています。素敵。
他のベンチャーさんでの開発合宿でもよく使われているみたいですねー。

 
 
着いて早々とりあえず足湯につかるエンジニア。We are 足湯エンジニア。

13:30 昼食

宿近くの魚繁さんでお昼いただきました。
鯛のかぶと煮派とブリマグロ丼派に分かれました。

際限なく喋り続ける先輩たちを眺めながら、TunecoreのエンジニアYoheiheiくんが 「テレビみたいっすね。」 って言ったのが昼のハイライト。

14:30 – 開発開始

皆急に静かになり、チーム毎にあらかじめ決めていた行程にしたがって、黙々を作業を始めました。
ここで各チームが一体何をやっているかを見ていきましょう。

チーム社内音楽レコメンド

主に社内のBGMには音楽ストリーミングのサービスを利用しています。
ですが、「もっと面白く、個々人が自前でプレイリスト内に音楽を差し込んだり、気に入った曲情報を直接API経由で取得できるようにしよう」というプロジェクトを担当したのがこのチームです。

チームセンサー

このチームが取り組んでいる問題は「オフィスの残員管理」や「社内男子トイレ難民問題の解決」。
これらをラズパイなどのマイコンやセンサーなどを利用したIoTで解決してみよう!というアプローチを取ったのがこのチームです。

何やらいきなりハンダゴテでの作業が始まっていました。

チームレンタル

社内テスト用モバイル端末や社員そのものの人数が増えてきて、弊社でも端末のリアルタイムな管理が必要なフェーズにぼちぼち入ってきました。
それを顔認証APIを使ったり、端末ロックアプリ開発とかして管理していこうぜ!というアプローチを取ったのがこのチームです。
何やらいきなりホワイトボードでシステム全体の相関図を書きつつ、熱い議論が始まっていました。

チーム受付動画配信

新しくEdocodeのエンジニアとして入った藤田さんのプロジェクト。
弊社の受付スペースでプロジェクタを使って流される動画システムのバックエンドを、Rust/WebSocketで書こうというプロジェクト!

この日までにすでに構成図仕上げていらっしゃいました。すごーい!

僕もそのうちWebAssemblyとかRustで吐いて徳を積みたいです。

チーム受付VR

社内受付電話からの内線の多さを解消したい => タブレット用のかっこいい受付アプリ作りたい => それつまんないからVRでやろうぜ!
という流れで、無謀にも始まったVRのプロジェクトです。

Unity入門2,3日目のメンバーのみで始めるVR体験!

 
 
 
和やかな雰囲気の中始まった開発合宿。
一体どうなるのでしょうか(フラグ)

次回に続きます
 

Read More

PostCssで勧告ちょい前くらいまでのCSSをビルドする

最近はGolangの勉強中心のみやっててちょっと疲れたので閑話休題っす。


要約3行

PostCss
ボイラープレート
書く


Example

やったこと

前に「cssのclearfixハックの代わりとなる記法がChromeに実装される」っていうので注目を集めたこともあって、
ちょっとcssビルド周りの設定を見直しがてら、ここで一旦jsやhtmlじゃなくて最新仕様のcssだけpostcssのcliでビルドする、という主旨でやってみます。
使うもの、postCSSのcliツール。

ここ1年以上、postCSS使用の際はビルドツールの雄webpackからpostcssを呼んでいましたが、多分そっちはやりすぎっていう向きも多いかと思うので、今回はcliでのビルドの紹介に留めます。
(というか単体でのビルドやったことなかったので。)


PostCss

概要

PostCSSは各種css変換プラグインを通すためのAPIを提供するツールです。好きなコンパイラ設定を個々に導入できるイメージです。
cssフレームワークのBootstrapはsassからpostcssになったりと、そこそこ使われている場面があります。

PostCSSプラグインには、もちろんSASS記法を高速コンパイルするものもあります。
ですが、SASSが事実上のデファクト(?)とはいえ、独自記法に関しては今回の趣旨と違うため導入手順は割愛します。

プラグインの追加

特にcliで呼ぶ場合、コマンドで指定してもいいですが、npmで各プラグインをインストール後、
こういう設定ファイルを書くと便利です。

{
  "use": [
    "postcss-import",
    "postcss-cssnext",
    "postcss-flexbugs-fixes",
    "postcss-flow-root",
    "postcss-grid-kiss"
  ],
  "postcss-flow-root" : {
    "fallback" : "clearfix"
  },
  "input": "src/index.css",
  "output": "build/bundle.css"
} 

コンパイル

postcss -c .postcssrc.json 

代表的なPostCssプラグイン

cssnext(postcss-cssnext)

cssnextは幾つかのPostCSSプラグインがあらかじめ同梱されてるPostCSSプラグインです。
基本的には策定中のCSS仕様を使えるようにするプラグインであり、CSS版Babelみたいなものです。

今回の趣旨としては大枠としてこれだけを入れとけばOKです。

同梱されているPostCSSプラグインはこんな感じ
個別に無効化することも可能です。

Bootstrapと並ぶcssフレームワークのFoundationもこれを使ってるぽいです。

できるようになることは本家やCSSの仕様見てください。

  • all : initial; のバックポート実装が使えるようになる。
  • css variables が全ブラウザで使えるようになり、カラーパレットやmargin設定の管理がより強固にできるようになる。
  • custom media queriesで各デバイス用のメディアクエリを論理立てて一元管理できるようになる。
  • 同梱されているautoprefixerプラグインでベンダープリフィックス勝手につけてくれる。

と、まあそれなりに利点があります。

あとはColor FunctionなどSASSとかの文化から策定に行ったっぽい奴もありますね。

autoprefixer

おそらく世界で一番使われてるPostCSSプラグインです。前述のcssnextの中にも同梱されています。
moz-* とか webkit-* とかのベンダープリフィックスを自動で付与します。
下記のような感じですね。

.cannot-selectble {
    user-select: none;
}

 

.cannot-selectble {
    -webkit-user-select: none;
       -moz-user-select: none;
        -ms-user-select: none;
            user-select: none;
}

ちょい足し

cssnextに加えて、魔改造オレオレCSS化しないよう気をつけながらテスト的にPostCSSプラグインをいくつか追加してみます。

postcss-flexbugs-fixes

ブラウザごとに起こるflexboxの挙動の違いをある程度吸収します。

postcss-flow-root

幾つかのブラウザ実装が始まったW3Cのdisplay: flow-rootを入れ込みます。
しばらくはオプション指定でclearfixとかいうあのハックまがいの実装を吐いてもらいますが、これであの実装とは事実上別れを告げます。
こういうとこにはflexboxやGrid使いたいのですが。
しばらくは fallback: "clearfix"の設定が必要そう。

postcss-grid-kiss

Safariでも実装が始まったCSS Grid Layoutのプロパティを非対応ブラウザに対しても適用します。
どうやら全部は再現できていないっぽいですが。

postcss-import

webpackなどでimportした他cssファイルを展開するのに使います。
最終的に1ファイルにして、ブラウザ実行時に@import時の負荷を防ぎます。

番外/SASS記法

独自記法があるので今回は見ませんが、SASS記法を提供するためのプラグインでstarが多そうなものに precss , sugarss , postcss-scss 等があります。
他のも探せばあるかと思います。

まとめ

成果物

トランスパイラを通すことで仕様に則った便利機能を一足先に使っていきたいという意味では有用かと思います。

そうではなく、「バグを潰す」という意味では、あまり期待しすぎない方がいい感じです。
単純にトランスパイラでなんとかなる程度のブラウザ間の差分ならかわいいもので、辛さは多分そこで吸収できない先にあるわけなので。

つらつら書きましたが、あんまり最新の仕様を把握しきってないとこもあるかと思うので、もうちょっとなんとかなるぜってご意見あれば、マサカリ等お待ちしております。
以上です。

Read More

Golangのvendor管理周りでちょっとハマった

vendor管理 / glide

Goは1.5あたりからプロジェクトのvendorディレクトリ内のパッケージをグローバルより優先して見に行く設定にできる。

export GO15VENDOREXPERIMENT=1

 

プロジェクト毎のvendor管理のためにパッケージ管理ツールglide などを使うと、基本的にプロジェクトのvendor内に3rdパーティのパッケージがインストールされていく。

glide get した依存モジュールはglide.yamlに自動追記され、vendor以下がバージョン管理対象外でも glide install で別環境でも復元できるようになる。

要するに「プロジェクトごとにモジュールのバージョンとか固定しようぜ」っていう、package.json的なものに近い。

サブパッケージ内からvendor内のパッケージが見つからない

そこで、main.goがsubpackage内のモジュールを呼ぶこういう感じになるとする。
 


projectName │  ├── subpackage │   ├── example │   │   └── example.go │   └───example2 │      └── example2.go ├── glide.lock ├── glide.yaml ├── main.go └── vendor    ├── github.com ├── aws ....

 

上記の構成でいうexample.goからawsをimportしたところ、cannot find package...で怒られた。
結論から言うと、原因はここにかいてあった通りだった。

ダメ

main.go

import (
    "./subpackage/sub"
)
func main() {
    sub.Run()
}

OK

main.go

import (
    "projectName/subpackage/sub"
)
func main() {
    sub.Run()
}

下だと動く。
相対パスでのimportで入れたパッケージからもグローバルのパッケージは呼べたので油断していたが、vendor内にアクセスするには、対象がそのプロジェクトの一部だということを明示しないといけない?っぽい。

Read More

Golang or NodeJSで AWS Step Functions のActivityを書く

AWS Step Functionsによるピタゴラスイッチの終点で、「通常のEC2上でStep Functionsを待ち受け、RDS等になんか処理をするためのタスク」を書いている。

{
  TaskToken:xxxxx
  Input:<前段のタスクで処理したresponse>
}

ワーカーを作動させていると、こういう感じのデータがStepFunctionsから渡ってくるので、それを元に好きに処理して、SuccessやFailureリクエストをStep Functions側に返せばいい。
とりあえずDBへの登録処理とか、データの読み取りなしで実装してみる。

NodeJS

単純にNodeJSでやるならこういう感じ。

src/worker.ts

const AWS = require("aws-sdk");

const stepfunctions  : AWS.StepFunctions = new AWS.StepFunctions({
    apiVersion: '2016-11-23',
    region : "ap-northeast-1"
});


(async ()=>{
    while (1) {
        await new Promise(async (resolve) => {
            const stepFunctionParam = {
                activityArn: 'arn:aws:states:ap-northeast-1:xxxxxxx:activity:StepFunction-Called', /* required */
            };

            const data = await stepfunctions.getActivityTask(stepFunctionParam).promise();

            if (data != null && data.taskToken != null){
                console.log(data)

         /* ... Do Something !!!!!!! */

                const sendParams = {
                    output: "true", /* required */
                    taskToken: data.taskToken, /* required */
                };   

                await stepfunctions.sendTaskSuccess(sendParams).promise()

            }
            resolve();
        }).catch(error => console.error(error))
    }
})()

 
走れ。
 

AWS_PROFILE=my_aws_profile NODE_ENV=test node build/worker.js

 

タスク部分並列処理にしたかったらawaitやらresolveの位置いじっておく。
とりあえずNodeJSで動作確認したが、この手のコードはgolangがポ〜タビリティ高くて良さそうなので、次はGo言語の入門がてら書いてみる。

Go

あとgoのパッケージマネージャツール(vendor管理)glideも使ってみた。

glide create
glide get github.com/aws/aws-sdk-go

しておく。

main.go

package main

import (
    "fmt"
    "github.com/aws/aws-sdk-go/service/sfn"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/aws"
    "time"
)

func asyncTask(sfnSession *sfn.SFN , resp *sfn.GetActivityTaskOutput){

        /* ... Do Something !!!!!!! */

    params := &sfn.SendTaskSuccessInput{
        Output:    aws.String("true"),      // Required
        TaskToken: resp.TaskToken, // Required
    }
    sfnSession.SendTaskSuccess(params)

}

func loop() {

    awsConfig :=  &aws.Config{Region: aws.String("ap-northeast-1")}

    sess, err := session.NewSession(awsConfig)
    if err != nil {
        fmt.Println("failed to create session,", err)
        return
    }

    sfnSession := sfn.New(sess)

    params := &sfn.GetActivityTaskInput{
        ActivityArn: aws.String("arn:aws:states:ap-northeast-1:xxxxxxxxx:activity:StepFunction-Called"), // Required
    }
    resp, err := sfnSession.GetActivityTask(params)

    if err != nil {
        fmt.Println(err.Error())
        return
    } else if resp.TaskToken != nil {
        fmt.Println("success")

        // 並列処理
        go asyncTask(sfnSession , resp)
    }

    fmt.Println(resp)

}

func main() {

    for {
        loop()
    }

}

 
走れ。

AWS_PROFILE=my_aws_profile go run main.go

awsのconfigとかloopの外でよかったかな。
Go初めてなので参照周りやセッションとかもうちょっと調べる。

 

Read More

mozjpeg3.1 を Amazon Linux用にフルビルドするメモ

 
ローカルのMac上で開発してると、一部のnpmモジュールはインストール時に自動でMac用のバイナリ生成のコードをインストールしてパスも通してくれちゃうので、そのまま同梱するとAWS Lambda上では当然動かない。
今回は画像の圧縮でよく使われる mozjpegpngquant がそれに当たった。
Lambdaをコールするたび、いちいちコンテナ上でインストールするのも微妙なので出来ればあらかじめ全部入りの実行ファイルをmakeしておく。
地味にいろいろ迷ったので作業メモする。
 
 
Amazon LinuxのDockerImageかubuntuのImage あたりをpullしてきて,docker-machine上で作業。
 

# 基本
apt-get update
apt-get install -y gcc make wget dpkg 

# mozjpeg用
apt-get install -y yasm nasm

wget https://github.com/mozilla/mozjpeg/releases/download/v3.1/mozjpeg-3.1-release-source.tar.gz
tar -xf mozjpeg-3.1-release-source.tar.gz
cd mozjpeg
./configure --disable-shared --enable-static 
make
make deb

 
ここ大事。 普通にやると多分実行時に symbol jpeg_float_quality_scaling, version LIBJPEG_6.2 not defined in file libjpeg.so.62 with link time reference とかで怒られる。

./configure --disable-shared --enable-static

 
これで可逆圧縮用の jpegtran や非可逆圧縮の cjpg が出来ている。
これらを同梱して既存のモジュールにエイリアス貼ったり愚直に置き換えるなりして Lambda上でも使える。
どうしてもLambda上で動いて欲しいものは1ファイルで実行できる形に持っていきたい。

Read More

AWS Step Functionsでマイクロサービスから状態管理を(なるべく)取り除きたい

書くこと

「プロジェクトごとに何度も作るような処理の一部をマイクロサービス化する」という仕事があって、その一環でStep Functionを使ったので、途中までやっていたことまとめ。
 
 
example project

ピタゴラスイッチ、その辛み

AWS Lambdaは便利だが、個別の非常に短いFunctionを繋いで1つの大きなフローを組む場合、状態管理に非常に気を使う側面があった。
エントリーポイントになるFunctionがそれぞれの個別タスクの処理結果を待ち受けするような処理にすれば話は簡単になるが、
親タスクが子タスクの状態を管理するような構成にすると、エントリーポイントそのもののタイムアウト時間を気にする必要も出てくる。
基本的にはタスク発火のエントリーポイントでは「処理依頼を投げたら投げっぱなし」にしてさっさとその役割を終えたい。

だが、こういうピタゴラスイッチ的な構成を組もうとすると、動作させたい個別のタスクごとにいちいちSQSやらDynamoやらでタスクキュー的な実装をする必要があった。1つの役割をするためだけに、なんか個々のサービスの関係性が複雑になってくる。
 

StepFunctions

昨年末発表のAWS Step Functions で、使ってみたらこの辺がだいぶ楽になりそうだった。
詳細は弊社橋本さんのこれを参照。
 

serverless-stepfunctions

Serverless Framework上でStep Functionsを定義するプラグインを早速作っていた方がいて、とても便利なので使っていく。

テスト的に組んだフローはこんな感じ

  • エントリーポイントのfunctionsではタスク設定を書いたobjectを生成する
  • Parallelステートで同じFunctionに処理を分岐。Filters機能を使って、エントリーポイントのoutputからで反応すべきkey指定
  • 個々のFunctionは確率的にFail、リトライ
  • 並行的に走らせたすべてのタスクが終わったら最後のFunctionsを起動
     

Parallelでは動的に並列タスクを増減できないのが弱点。
 
ここまでが出来るようになったServerlessFrameworkのexampleプロジェクトを、許可を取ったのでリポジトリ公開する。

処理の定義はこんな感じ。yaml見やすい。
 
serverless.yml
 

service: step-test

provider:
  name: aws
  runtime: nodejs4.3
  stage: dev # replace your setting
  profile: dev # replace your setting
  region: ap-northeast-1 # replace your setting

package:
  exclude:
   - src/**
   - node_modules/** # 今回は特に実行時依存モジュールないので外しておく
   - .local/**
   - .git/**

plugins:
  - serverless-step-functions
  - serverless-offline

stepFunctions:
 stateMachines:
   testStateMachine-v5:
     Comment: "Input and Parallel Function"
     StartAt: EntryState
     States:
       EntryState:
         Type: Task
         Resource: entry
         Next: ChildState
       ChildState:
         Type: Parallel
         Next: EndState
         Branches:
          - StartAt: Child0
            States:
              Child0:
                Type: Task
                InputPath: "$.0"
                Resource: childTask
                End: true
                Retry:
                - ErrorEquals:
                  - HandledError
                  IntervalSeconds: 5
                  MaxAttempts: 6
                  BackoffRate: 1
                - ErrorEquals:
                  - States.TaskFailed
                  IntervalSeconds: 5
                  MaxAttempts: 6
                  BackoffRate: 1
          - StartAt: Child1
            States:
              Child1:
                Type: Task
                InputPath: "$.1"
                Resource: childTask
                End: true
                Retry:
                - ErrorEquals:
                  - HandledError
                  IntervalSeconds: 5
                  MaxAttempts: 6
                  BackoffRate: 1
                - ErrorEquals:
                  - States.TaskFailed
                  IntervalSeconds: 5
                  MaxAttempts: 6
                  BackoffRate: 1
          - StartAt: Child2
            States:
              Child2:
                Type: Task
                InputPath: "$.2"
                Resource: childTask
                End: true
                Retry:
                - ErrorEquals:
                  - HandledError
                  IntervalSeconds: 5
                  MaxAttempts: 6
                  BackoffRate: 1
                - ErrorEquals:
                  - States.TaskFailed
                  IntervalSeconds: 5
                  MaxAttempts: 6
                  BackoffRate: 1
       EndState:
         Type: Task
         Resource: lastTask
         End: true

functions:
  entry:
    handler: build/src/testFuncHandler.entry
    timeout :30
    memorySize : 128

  childTask:
    handler: build/src/testFuncHandler.childTask
    timeout : 30
    memorySize : 128

  lastTask:
    handler: build/src/testFuncHandler.lastTask
    timeout : 30
    memorySize : 128



 

エントリーポイント、個別タスクのFunctionの定義はこんな感じ。

module.exports.entry = async (event ,context , callback)=>{

    const hoge = {
        0:{
            id : 0,
            name: "hoge"
        },
        1:{
            id : 1,
            name: "fuga"
        },
        2:{
            id : 2,
            name: "piyo"
        }
    };

    callback(null , hoge);
}

/*
 serverless logs -f childTask --region ap-northeast-1 --profile dev --stage dev --tail --interval 500
 * */
module.exports.childTask = async (event ,context , callback)=>{

    const num = Math.random();
    if (num < 0.25){
        console.log("ERROR!!!")
        const error = new Error("something is wrong");
        return callback(error);
    }

    const name= event.name
    callback(null , {Name : name});
}

/*
 serverless logs -f lastTask --region ap-northeast-1 --profile dev --stage dev --tail --interval 500
* */
module.exports.lastTask = async (event ,context , callback)=>{
    console.log(event)
    callback(null , event);
}

 
AWS console上でこのフローをイメージ化するとこんな感じになる。
 


 

いざタスクをinvokeすると、

sls invoke stepf --state testStateMachine-v5 --stage dev --profile dev --regeon ap-northeast-1 

 
最後のタスクのeventには各並列処理の実行結果の配列が入ってくる。
 

{ executionArn: 'arn:aws:states:ap-northeast-1:xxxxxxx:execution:step-test-dev-testStateMachine-v5:xxxxxxxxx',
  stateMachineArn: 'arn:aws:states:ap-northeast-1:xxxxxxxxxx:stateMachine:step-test-dev-testStateMachine-v5',
  name: 'xxxxxxxxxxxxxxxxxxx',
  status: 'SUCCEEDED',
  startDate: xxxxxx,
  stopDate: xxxxxx,
  input: '{}',
  output: '[{"Name":"hoge"},{"Name":"fuga"},{"Name":"piyo"}]' }

 

まとめ、ToDo

各Functionのリトライ周りがすっきりした。すごい。
今回の要件が限定的なこと、StepFunctionsのテスト的なこともあって「常に複数のParallel Functionを静的に確保する」構成を作ってみたが、やはりここは何か気持ち悪い。
やっぱこれ、動的にParallel叩けるべきじゃないっすかねえ…。

次はこの辺考える。

  • フロー自体がFailした時どこに投げるのが使用側が楽か
  • マイクロサービス化する上で使用側のAPIはどうするべきなのか
  • 最終的にActivtyステートでGoのワーカーを待ち受けさせ、RDSにデータを入れる

Read More

ブラウザからS3に巨大なファイルを低メモリで送りつけるアレ

ブラウザから8GB以上の巨大なファイルや多量のファイルをS3に送りつけるには、MultiPart Uploadの機能を使っていく必要がある。
具体的には、データを5MB以上のchunkに分け、分割して送りつけることになる。
データが8GBいかないケースでも、ファイル読み込み量を分割できる機構が使えるので、メモリにも優しい。
それなりに使う場面はありそう。

1.とりあえずドキュメント通り実装して徳を積む

マルチパートアップロードの概要
Multipart Upload API を使用したオブジェクトのアップロード

(1) S3のCORS設定

  • ExposeHeader => ETag を指定する。
  • AllowHeader
<AllowedHeader>Authorization</AllowedHeader>
<AllowedHeader>Content-Type</AllowedHeader>
<AllowedHeader>User-Agent</AllowedHeader>
<AllowedHeader>x-amz-date</AllowedHeader>
<AllowedHeader>x-amz-user-agent</AllowedHeader>
<AllowedHeader>x-amz-storage-class</AllowedHeader>
<AllowedHeader>x-amz-acl</AllowedHeader>

この辺を設定する。PUT権とかはあとで動的に付与する。

(2) ブラウザのfile api

ブラウザのfile api でファイル名やbyte数取得。
 
この段階ではFileReaderによるメモリ上への展開は行わないこと。
 
余裕があればWebWorkerで別スレッド立ててUIスレッドへの影響範囲を小さくする。
 

(3) サーバーから署名付きURL取得

ブラウザからのsignedURL発行要求を認可,返却。
S3のbucketやオブジェクトに対し、PUTやHEADリクエストが通るようにする。
 
この辺までは割といつも通り。
 

(4) S3::createMultipartUpload

S3の該当keyに対して createMultipartUpload 要求。uploadIdを取得。
要するに「今から分割ファイルでアップロードするぜ?」という宣言みたいなの。
このuploadIdは今後のリクエストすべてに同梱することとなる。
 

(5) 分割したblobを送りつける S3::uploadPart

 
FileReaderにいきなり巨大なfileオブジェクト食わすと多分2GBあたりでブラウザが音も立てず死ぬ。
chunk送信用のループ内でFile::slice(startByte , endByte)してblob化して都度捨てるのがミソ

 
この部分しかメモリ上に乗らないようになる。これが可能なのよく知らなかった。
並列アップロード数のロジックとかchunkごとの大きさ(5MB以上)とかは様子見ながら好きにカスタムしたらいいんじゃないかな。
 
この時帰ってくるETag値と、送ったPartNumber値はこういう形式でとっておく。

const multipartMap= { Parts: [] };

//...ループ内
multipartMap.Parts[partNumber - 1] = {
    ETag: 返ってきたETag値,
    PartNumber: partNumber
};

(6) 完了 S3::completeMultipartUpload

 
すべて送り終わったら、completeMultipartUploadリクエストを送る。
この時、先ほどのmultipartMapを同梱して送りつける。
これで、初めてS3のバケット上にオブジェクトができる。
 

Code

4-6まではこんな感じになります。今回はaws-sdk使用。全部Promise返すモードがいつの間にかついてて良い。
(並列アップロードのロジック、chunkごとの大きさ指定(5MB以上)とかエラー処理は記事のためあえて省いてる。)

const upload = async (s3 ,  s3Params , file)=>{

    const mime = Mime.lookup(file.name);
    const multiPartParams = s3Params.ContentType ? s3Params : {ContentType : mime , ...s3Params};
    const allSize = file.size;

    const partSize = 1024 * 1024 * 5; // 5MB/chunk

    const multipartMap = {
        Parts: []
    };

    /*  (4)   */
    const multiPartUploadResult = await s3.createMultipartUpload(multiPartParams).promise();
    const uploadId = multiPartUploadResult.UploadId;

    /*  (5)  */
    let partNum = 0;
    const {ContentType , ...otherParams} = multiPartParams;
    for (let rangeStart = 0; rangeStart < allSize; rangeStart += partSize) {
        partNum++;
        const end = Math.min(rangeStart + partSize, allSize);

        const sendData = await new Promise((resolve)=>{
            let fileReader =  new FileReader();

            fileReader.onload = (event)=>{
                const data = event.target.result;
                let byte = new Uint8Array(data);
                resolve(byte);
                fileReader.abort();
            };
            const blob2 = this.file.slice(rangeStart , end);
            fileReader.readAsArrayBuffer(blob2);
        })

        const progress = end / file.size;
        console.log(`今,${progress * 100}%だよ`);

        const partParams = {
            Body: sendData,
            PartNumber: String(partNum),
            UploadId: uploadId,
            ...otherParams,
        };
        const partUpload = await s3.uploadPart(partParams).promise();

        multipartMap.Parts[partNum - 1] = {
            ETag: partUpload.ETag,
            PartNumber: partNum
        };
    }

    /* (6) */
    const doneParams = {
        ...otherParams,
        MultipartUpload: multipartMap,
        UploadId: uploadId
    };

    await s3.completeMultipartUpload(doneParams)
        .promise()
        .then(()=> alert("Complete!!!!!!!!!!!!!"))
}

 
理屈上、fileObjectの参照が残ってれば,中断やキャンセル、断片ごとのリトライも可能なはず。
S3 Multipart Upload Request のタイムアウト設定を長めにしておいてもいいかも。
 
 

2.めんどくさいのでOSSを使う

 
上記までをライブラリ化しようと思ってたけど、ある意味当然のごとく既に存在した。。
仕様も非同期チェーンも特に覚えたくない人は、早くお家帰りたいのでこっちのありものを使う。
 
EvaporateJS
 
一時停止やキャンセル、リトライ、chunkサイズの指定や同時送信数も指定可能。
webWorker上の別スレッドで動かすモードはついてないみたいだけど、よくまとまっててすごい。
ただしPromise使うので、typescriptやbabel等のプリプロセッサ通さない人は、es6-promiseあたりのポリフィルを念のため入れること。
以上です。

Read More