athena、Digdag、Embulkを使って予算消化済みの広告配信を停止する
こんにちは。Wano株式会社で動画広告配信のエンジニアをやっている島袋です。
直近1ヶ月で取り組んでいた、広告の配信停止処理の改善について紹介したいと思います。
予算とは?消化とは?配信停止とは?人生とは?
私が所属しているVeleT事業部では、動画広告配信のDSP事業をやっております。
広告配信はクライアントから頂いた予算に基づいて配信を行うので、予算を超過して配信しないように、適切な停止処理を行う必要があります。
(配信と、コストの発生が非同期であるため、予算ぴったりで配信を停止するというのは難しく、超過幅をなるべく小さくする必要があります。)
(なお、1日あたりの予算設定があるので、超過額の分は後続の日程の予算を調整します。)
既存の構成(Kinesis Data Stream + Lambda)
既存の仕組みは、広告配信サーバから発生する配信実績ログをFluentdを使ってKinesis Data Streamに流し、Lambdaがそれを取り出して料金計算をし、案件毎の予算と突合し、超過しているものについては配信を停止するというものでした。
こういう流れです。
Webサーバ(nginx)->Fluentd->Kinesis Data Stream->Lambda
この構成だと、ログの流量が少ないうちは問題なく動作するのですが、LambdaがRDS(MySQL)を読み書きしているため、ログの流量が大きくなってくると、Lambdaのところで詰まりだし、配信停止が間に合わなくなります。
(RDSでのデータの持ち方の問題もありますが)
改善
Lambdaで逐次処理せず、いったんs3に貯める
kinesis data streamから先を以下のように変更しました
kinesis data stream->kinesis data firehose->s3
ログの配置先を、Glue Data Catalogでテーブル定義しておき、年、月、日、時の単位でパーティションを設定しておきます。
athenaを使って、ログのカウントをとる
athenaはAWSのフルマネージドprestoみたいなやつです。
詳細は省きますが、現在時刻のパーティションに絞って、(動画、配信先メディア、Vast Tracking Event)でgroup byをかけて、何行あるかカウントをとります。
(Vast Tracking Eventというのは、VASTと呼ばれる動画広告配信の共通フォーマットで定義されたイベントのことです。例:再生開始、一時停止など)
こんな感じで
SELECT
動画ID, メディアID, イベントID, COUNT(*) AS CNT
FROM 配信実績ログ
WHERE YEAR = "2019" and MONTH = "03" and DAY = "15" and HOUR = "01"
GROUP BY 動画ID, メディアID, イベントID
athenaのクエリ結果をEmbulkでMySQLに挿入する
athenaのクエリ結果はcsvフォーマットでs3に保存されるので、Embulkを使ってMySQLに一括挿入します。
Embulkはトレジャーデータが開発したOSSのバルクローダーです。
Embulkは入力元、出力先の対応プラグインを導入し、設定ファイルを書くだけでお手軽にバルクロードができるすごいやつです。大好き
コスト計算、予算超過判定
上記で挿入したデータに基づきコスト計算、予算超過判定をSQLで実行します。(MySQLで)
予算超過した動画の配信停止
あとはredisから対象の動画IDを取り除くことで、配信停止になります。
(配信サーバにredisを参照させることで別のつらみもあるのですが、それはまた別の話)
一連の処理をDigdagで実装する
上記の一連のフローをDigdagで実装しました。
Digdagはトレジャーデータが開発したOSSのワークフローエンジンです。
上記のフローはシェルスクリプトでもいけるんですが、Digdagだと宣言的にフローがかけるし、リトライ処理などもお手軽です。
このDigdagで実装したバッチを1分周期で実行させてます。
改善後
s3のパーティションに貯まってる分を一括で処理するので、kinesisで詰まるといったことがなく、超過幅を小さくすることに成功しました(具体的な数値は出せないですが。。。)
しかし、この構成は以下の問題があります。
微妙に超過判定対象から漏れるログが出てくる
1分周期で動作するということは、59分に起動して、終了したあとの時刻が変わるまでのタイミングにs3に到着したログについては判定から漏れてしまいます。
(時刻が変わったら、処理対象のパーティションも変わるため)
(クライアントに請求する料金については別途バッチで計算しているので、漏れはありません)
現在は許容範囲ですが、1分あたりのログの流量が増えてくると無視できなくなります。
athenaが安定しない
athenaはたまにHIVE_CURSOR_ERRORでクエリが失敗します。(クエリ自体には問題がない)
athenaはAWSのユーザが計算リソース(EMRのクラスタ?)を共有するようなモデルなのか、リソースの逼迫によるエラーと思わしき状況に度々出くわします。
本来なら自前でEMRのクラスタを使ってprestoを利用したほうがよいのでしょうが、クラスタのお守りをしたくありません。。。。!!!!(そこに人的リソースを避けないというのが大きいです)
バッチの突き抜けの恐れ
今回のDigdagバッチは1分内に完了しているので、1分周期で実行できているのですが、ログ量の増加によって実行時間が長くなってくると(いわゆる突き抜け)、予算超過判定の間隔が大きくなって、超過幅が増大する恐れがあります。
(とはいえ、ログ量増加で処理時間が長くなるのはathenaのところなので、prestoに移行してノード数を増やせばなんとかなるのではと考えていますが)
感想
ピタゴラスイッチ作るの疲れた
広告配信サーバの高速化、スケールアウトの知見などはググると結構出てくるのですが、配信停止処理についてはあまり話が出てこないので、他社さんの事例を知りたいところです。
「俺が本物の配信停止のやり方を見せてやるよ」という方がいらっしゃいましたら、是非うちにいらしてください。
自由度の高い自社サービスでIPOを目指す!アドテクノロジーエンジニア