GCPでオレオレGoogle Analytics(リアルタイム版)を開発する(その3)
前回まででブラウザ→Cloud Functions→Pub/Subとデータを送るところができたので、今回はPub/Sub→Dataflow→BigQueryとデータを格納するところを作る。
1. BigQueryにデータを格納する箱を作る
まずは一番後ろのBigQueryにデータセット・テーブルを作成する。データセットは普通のデータベースでいうところのスキーマ。
データセットは簡単に作れるから特にキャプチャなし、テーブルはとりあえずこんなもんでいいんでなかろうか。timestampはエポックミリ秒なのでInteger、それ以外は全部String。
実運用で大量のデータを計測・分析するなら日付カラムを追加してパーティショニングした方がいいと思うけど、とりあえずこれは実験なので設定しない。確かパーティションの最大数が4000だったと思うけど日付でパーティショニングすれば10年以上のデータもてるんで十分すぎるでしょう。
ちなみに前回Cloud Functionsで作成したJSON形式のメッセージ、keyがカラム名と一致している必要がある。(参考)
Pub/Sub メッセージは JSON 形式となっている必要があります。{"k1":"v1", "k2":"v2"} という形式でフォーマットしたメッセージは、BigQuery テーブルの k1 と k2 の 2 列に文字列データ型で挿入されます。
2. Pub/Subでデータをやり取りするためのトピックを作る
ここは名前指定してトピック作るだけなので簡単。スクリーンショットも不要。
メッセージングプラットフォームに慣れている人にはどうってことはないのかもしれないけれど、Pub/Subでわかりづらかったのは用語。パブリッシャー・トピックくらいまではOKだったんだけどサブスクリプションとサブスクライバーってええ?別物なの?みたいな。
結局のところ、
- パブリッシャー:メッセージの送信者
- トピック:メッセージが一時的に置かれる箱、送信者はトピックを指定してメッセージを送信する
- サブスクリプション:トピックにぶら下がってメッセージを受ける人。トピックとの関係は1 : n。全てのサブスクリプションにメッセージが届けられると、そのトピックからメッセージが消される。
- サブスクライバー:サブスクリプションに届いたメッセージを拾って処理する人(アプリ)。サブスクライバーがメッセージを拾うとそのメッセージはサブスクリプションから削除される。
ということだと思う。細かい話は公式ドキュメントを読んでください。
3. Pub/SubとBigQueryをつなぐDataflowを設定する
ここもまあそんなに難しくないかな・・・「テンプレートからJOBを作成」を選んであとはポチポチ入れるだけ。ここで重要なのはテンプレートに「Pub/Sub Topic to BigQuery」を選ぶこと。
そうすると、データの入り口となるトピック名と格納先となるBigQueryのテーブル名の指定・あとはGCS上の一時領域の指定があるので適切に設定すればOK。
ブラウザからビーコンを送って数分後にBigQueryのテーブルにデータが入ればOK。入らない場合は
- ブラウザのDeveloper Toolでビーコンが正しく飛んで200番のレスポンスがあることを確認
- Cloud FunctionsのログでメッセージがTopicにプッシュされていることを確認
- データフローのログやら見て頑張ってエラー箇所を探す
という感じでデバッグだろうか。
あと、何かしらの不具合によってゴミメッセージがトピックにプッシュされた場合はエラーメッセージ退避用テーブル(元のテーブル名+"_error_records")が自動で作成されてそこに流し込まれていた(Dataflowのテンプレートをよく見ると異常系の処理が定義されていた)。
ここまできてよくわからなかったこと(ほとんどDataflow)
一応データパイプラインを一通り構築できたにはできたけれども依然としていくつかわからないことが残った。おいおい確認していくつもり。
- Dataflowの作成時に指定した一時領域は何に使われるのか。気にしなくてもいいのかもしれないけど。
- Dataflowテンプレートのコードを改変してデプロイできるのか。
- 一度停止したDataflowジョブは再起動できないのか。ひょっとして使い捨て?
- 「Pub/Sub Topic to BigQuery」のテンプレートを使用した場合はサブスクリプションが暗黙的に自動生成されると考えられるが、それはどこで確認できるのか。
GCPでオレオレGoogle Analytics(リアルタイム版)を開発する(その2)
前回の続き。次にCloud Functionsでブラウザから飛んできたビーコンを受ける口を作る。
1. Cloud Functionsを設定
とりあえずエンドポイント名とかは何でもいいと思うけど、注意点は未認証の呼び出しを許可する必要があるくらいだろうか。これだけでAPIコールのエンドポイントができる。なんていい時代なのか。
あとはNode.jsのコードをポチポチ書くくらい。色々弄りながら最終的にこんなコードになった。
/** * Responds to any HTTP request. * * @param {!express:Request} req HTTP request context. * @param {!express:Response} res HTTP response context. */ exports.trackBeacon = (req, res) => { console.log(Date.now()); var messageJson = { "ip" : req.ip ,"ua" : req.headers['user-agent'] ,"uid" : req.query.uid ,"url" : decodeURIComponent(req.query.url) ,"timestamp" : Date.now() }; var img = Buffer.from("iVBORw0KGgoAAAANSUhEUgAAAAEAAAABCAYAAAAfFcSJAAAADUlEQVR42mP8z/C/HgAGgwJ/lK3Q6wAAAABJRU5ErkJggg==", 'base64'); res.writeHead(200, { 'Content-Type': 'image/png', 'Content-Length': img.length }); res.end(img); const {PubSub} = require('@google-cloud/pubsub'); const pubSubClient = new PubSub(); async function publishMessage() { const topicName = 'projects/[プロジェクト名]/topics/[トピック名]'; const dataBuffer = Buffer.from(JSON.stringify(messageJson)); const messageId = await pubSubClient.topic(topicName).publish(dataBuffer); console.log(`Message ${messageId} published.`); } publishMessage().catch(console.error); };
ブラウザから受け取って後続処理に流すのはIPアドレス・User Agent・Unique Identifier(uidパラメタ)・リクエスト発生元URL(urlパラメタ)・タイムスタンプ(エポックミリ秒)。また、画面には1x1の透過PNGを返す。その後にメッセージをPubSubに投げる。package.jsonはこんな感じ。Node.jsとか慣れてないので地味にここの書き方やらバージョン指定に苦戦。
{ "name": "sample-http", "version": "0.0.1", "dependencies": { "@google-cloud/pubsub": "1.7.3" } }
ここに書いたバージョン番号はこのサイトで発見。
www.npmjs.com
2. 合わせてHTMLを変更
上記のパラメタに合わせてHTML側もこんな感じに変更。クッキーの扱いとかがやっつけなのは勘弁してください。
<!doctype html> <html> <body> <h1>Hello World!</h1> <script> //UID生成 var N=48 var uid = getCookie('uid'); if(!uid){ uid=""; for(var i=0; i<N; i++){ uid += S[Math.floor(Math.random()*S.length)]; } document.cookie = "uid=" + uid + "; max-age=36000; secure; samesite=lax;"; } //計測ビーコン var imgUrl = '[Cloud Functionsのエンドポイント]'; var img = document.createElement('img'); img.src=imgUrl + '?uid=' + uid + "&url=" + encodeURIComponent(location.href); document.body.appendChild(img); function getCookie(key){ var cookies = document.cookie; var cookiesArray = cookies.split(';'); for(var c of cookiesArray){ var cArray = c.split('='); if( cArray[0] == key){ return decodeURIComponent(cArray[1]); } } } </script> </body> </html>
GCPでオレオレGoogle Analytics(リアルタイム版)を開発する(その1)
あまりに放置しすぎて存在すら忘れていたこのブログ、久々に手を動かしたので備忘録を残す。
今回作るのはこんな感じ。GAみたくwebサイトのページにタグを埋めてビーコンを飛ばし、それをCloud Functionsで受け取ってBigQueryにストリーミングinsertし、リアルタイムで分析ができる環境を作る。今後いろいろ拡張していくかもしれないけど、取り急ぎは最小限データを流すところだけ頑張る。
この構成の場合、全てのコンポーネントがサーバレスなので何も意識しなくても勝手にスケールしてくれる。いい時代になったなあ・・・
1. まずはwebページを置くためのGCSバケット
というわけで、まずはCloud Functionsから・・・の前にブラウザでアクセスできるwebページを用意する。GCE上にwebサーバを立ててもいいんだけど、別にwebサーバを運用管理するのが今回の目的ではないのでGCSを使ってサクッと実現する(設定次第でCDNとしても使える)。
GCSのページで「バケットを作成」をクリックして
各種情報をつらつらと設定。バケット名はグローバルで一意なので注意。今回も「my-first-bucket12345」という適当な名前を設定してみたらすでに重複していたくらい一意。
ガチにCDNとして運用するならマルチリージョンにするとか日本のデータセンターを選ぶとかした方がいいんだと思うけど、今回はあくまでテストHTMLを置くだけなのであまり考えずにこんな設定。
さて、バケットができたらインターネットアクセスを許可する設定。権限タブから「メンバーを追加」をクリック。
allUsersに対してストレージオブジェクト閲覧者を設定する。
2. HTMLは超最低限でOK
ここまででGCSの設定はOKなので、こんな感じでindex.htmlを作ってアップロード。
<!doctype html> <html> <body> <h1>Hello World!</h1> </body> </html>
URLは
https://storage.googleapis.com/[バケット名]/ファイル名
になるので、アクセスしてこんな感じで見えればOK。
とりあえずHTML配置しておわっちゃったけど汗
次回以降ビーコン計測のあたりをやってきます。
Machene Learning Crash Course 4 - First Steps with TensorFlow #2
さあ、用語も覚えてついにTensorFlow!!
と思ったが、どうやらここからさらに先に進むためにはpandasを学ぶことは避けて通れないようである。pandasはSeriesとDataFrameという2つのデータ構造を提供するためのライブラリ、Seriesはちょっと便利な1次元配列でDataFrameはちょっと便利な2次元配列。
city_names = pd.Series(['San Francisco', 'San Jose', 'Sacramento']) population = pd.Series([852469, 1015785, 485199]) pd.DataFrame({ 'City name': city_names, 'Population': population })
のように自分でSeriesやDataFrameの変数を作ることもできるけれど、大抵はこんな感じでガツンとデータを読み込むんです。
california_housing_dataframe = pd.read_csv("https://download.mlcc.google.com/mledu-datasets/california_housing_train.csv", sep=",")
そして、読み込んだデータを色々したいときはこんな感じ。
california_housing_dataframe.head() #最初の数行を表示する california_housing_dataframe.describe() #各カラムの中央値や平均値・最大最小値などを表示する california_housing_dataframe.['longitude'][0] #longitudeカラムの最初のレコードを取得する california_housing_dataframe.['longitude']*2 #longitudeカラムの全部の値に2をかける
データへのアクセスは連想配列っぽい。まあとにかく、便利な配列ということがわかってれば前に進めそう。
牛歩のような進捗である・・・
Machene Learning Crash Course 4 - First Steps with TensorFlow #1
ついに来たTensorFlow、名前はよく聞くしML=TensorFlow的なイメージはあるけれど全然中身はわからないという。しかもたぶんガンガン機能追加とかされてるから適当に日本語サイトをググっても常に古めの情報になってしまうんでないかという懸念から更に億劫になって結局何もやらないという。今日はこの悪循環をついに断ち切ろうと思う。
First Steps with TensorFlow: Toolkit | Machine Learning Crash Course | Google DevelopersとTensorFlow Guide | TensorFlowを見比べると、どうやらC++のカーネルに対して複数レイヤーでのラッピングが行われた構成になっていて、実際のプログラミングにあたってはLow Level APIとHigh Level APIの両方が使えるようであることがわかる。多分Low Level APIを使って自前でモデルを組むこともできるし、やりたいことが結構一般的な話ならばHigh Level APIを使ってサクッと実現できるよってことなんじゃなかろうか。とはいえ、これ以外にも画像認識のためのVision APIやら文字起こしのためのSpeech APIみたいなものがGoogle Cloudから提供されていることを考えると、オーディエンスのレベルに応じて多くの抽象化レイヤーを提供してる感じ。ちなみにLow Level APIのことをTensorFlow Coreともいったりするみたい。推奨としてはまずいちばんHigh LevelなAPIを使用して、必要に応じてひとつずつ下っていけとのこと。このコースでもEstimatorがメインらしい。
ここで用語の確認。
- tensor : n次元配列のこと、0次元配列(=通常の数値とか文字列とか)も一応この定義によるとtensorの一部らしい。つまり変数とか定数とか、とにかくそういうのは全部tensorぽい。
- rank : tensorの次元数。rank 1 tensorというと1次元配列。
- shape : 配列に入ってる要素の数。
さらに、以下のような図がよく機械学習では出てくるけれど(qiita.comのリンク)、ここで
- graph : このフローチャートみたいな図のこと(処理の流れ)
- operation : 各ノード(○)。データに対する操作。
で各ノードをつないでいる棒がtensorらしい。で、この処理を実際に実行する場合にはsessionを作成して行う模様。多分graphとsessionはオブジェクト指向のclassとinstanceみたいな関係なんだろう。
さらにさらに、ここらへんのアイテムはhyperparameterというらしい。
- steps : iteration回数。1stepは1batchのlossを計算し、これをウェイトの調整に使う。
- batch size : 前回も出てきたけれど、一回のstepで使用するexample数。
なので、
total number of trained examples = batch size * steps
となる。
なお、これに関連する用語で
- period : レポーティングの粒度をコントロールする変数。もしstepが70でperoidが7なら10stepごとにlossが出力される。この値は通常変更しないらしい。(レポーティング用の変数なのでモデルの精度とかには特に影響しない)
Machene Learning Crash Course 3 - Reducing Loss
機械学習はiterative approachによって損失を減らす。iterative approachというのは若干直訳しにくいところがあるけれども、まあ要はたくさんのsampleを処理することによって徐々に損失の少ないweightとbiasによせていく、ということだろう。で、ずっと計算し続けていくとどこかのタイミングでもうweightとbiasがほぼ変わらない段階にたどり着いたら、そのモデルはconverged(収束)したと言える。
ではどのようにモデルをconvergeさせるか。
という最もシンプルなモデルを考えた時に、w1の全ての値についてその損失を計算すると以下のような値の分布になる。
この分布は唯一つの最小値を持ちそこでは傾きが0になるが、まさにそこがこのモデルの収束点である。
(と元の記事では書いてあるけれど、うっかりこれがn次関数で複数の極小点を持つようなexampleだったらどうするんだろう・・・)
このconvergenceをより効率的に求めるために使われる手法がgradient descent(最急降下法)というもの。
これは、このグラフの中でどこか1点を最初に決めて、そこから点をちょっとずつ動かしつつ計算をしてconvergenceを探すアプローチらしい。てかこれなら微分で一発なのでは??これは扱っているモデルがシンプルだからなのか?まあとにかく、実際にTensorFlowを使う際にはこの部分は全部カプセル化されてるから気にする必要ないらしい。
で、次にこの「ちょっとずつ」動かすと言った場合に実際のところどの程度点を移動させるんだろうか。というのがlearning rate(学習率、step sizeとも言うらしい)。このlearning rateのチューニングがキモで、ここを小さくしすぎると永遠にconvergenceに届かないしこれを大きくしてしまうとconvergenceを通り越してしまう(Optimizing Learning Rate | Machine Learning Crash Course | Google Developersで試してみるとわかる)。現実にはlearning rateを最適化することよりもconvergenceにたどり着くことが目的なので、そこそこのlearning rateを見つけるのが重要らしい。(ただここで具体的にどうやってそのそこそこなlearning rateを見つけるかという話には至っていない)
このgradient descentを行う際に一度のiterationで勾配を計算するのに使用するexampleの数をバッチというらしい。ここまでの説明だと上のようなグラフを書くのに使用するexample数って聞こえると思うんだけど、そのさきを読むと
- Stochastic gradient descent(確率的勾配降下法、一度のiterationで1つのexampleを使用する=バッチサイズ1)
- Mini-batch stochastic gradient descent(バッチサイズを10~1000としたもの)
らしい。バッチサイズを1にしたらそもそもグラフとか書けないし、となると結局バッチサイズというのはなんなんだ??という疑問が残る。でも機械学習の仕組み (Vol.4)を見るとややわかりやすいが、やはりここでいうバッチサイズはexample数っぽい。何かしら数学的な調整がうまいこと効いてより少ない計算量でより正確なモデル構築ができるらしい。(全然ピンときてないけど汗)
Machene Learning Crash Course 2 - Descending into ML
教師あり学習(Supervised ML)においてモデルを作るというと、そのモデルは
という式で表される(nはfeatureの数)ので、
- モデルに含めるfeatureを決定する
- トレーニングデータ(labeled examples)を使ってw(weight)とb(bias)を決定する
というステップになる。例えば前回の気温とアイスクリームの売れ行きの例で言えば、featureは気温のみなので
となる。
そして、このモデルと実際のデータの乖離をloss(損失)という。下図で矢印の大きさがloss(すいませんこのimage、本家からお借りしました)。左のモデルよりも右のモデルのほうが損失が少ないのでよりよいモデルであるといえる。
モデルの損失を最小化する手法は損失関数と呼ばれ(多分)、いろいろな種類があるけれどここでは二乗損失(squared less, L2 loss)がチラッと紹介されている。この損失関数は平均二乗誤差(mean square error, MSE)を最小化するようにするということなんだろう、
という禍々しい数式が書いてあるけれど、これはよく見ると単純に
featureの各値についてモデルで算出されたpredictionと実際の値の差分を二乗して、その平均を算出するというだけの話だった。
ちなみにこの方式で計算すると、前回のアイスクリームの売上と気温の関係式は
y' = 1.217582418x1 + 30.16923077
というモデルが構築できた。モデル構築というと物々しいけれど、単純にExcelの関数でちょいちょい計算しただけ(汗)。