GCPでオレオレGoogle Analytics(リアルタイム版)を開発する(その3)
前回まででブラウザ→Cloud Functions→Pub/Subとデータを送るところができたので、今回はPub/Sub→Dataflow→BigQueryとデータを格納するところを作る。
1. BigQueryにデータを格納する箱を作る
まずは一番後ろのBigQueryにデータセット・テーブルを作成する。データセットは普通のデータベースでいうところのスキーマ。
データセットは簡単に作れるから特にキャプチャなし、テーブルはとりあえずこんなもんでいいんでなかろうか。timestampはエポックミリ秒なのでInteger、それ以外は全部String。
実運用で大量のデータを計測・分析するなら日付カラムを追加してパーティショニングした方がいいと思うけど、とりあえずこれは実験なので設定しない。確かパーティションの最大数が4000だったと思うけど日付でパーティショニングすれば10年以上のデータもてるんで十分すぎるでしょう。
ちなみに前回Cloud Functionsで作成したJSON形式のメッセージ、keyがカラム名と一致している必要がある。(参考)
Pub/Sub メッセージは JSON 形式となっている必要があります。{"k1":"v1", "k2":"v2"} という形式でフォーマットしたメッセージは、BigQuery テーブルの k1 と k2 の 2 列に文字列データ型で挿入されます。
2. Pub/Subでデータをやり取りするためのトピックを作る
ここは名前指定してトピック作るだけなので簡単。スクリーンショットも不要。
メッセージングプラットフォームに慣れている人にはどうってことはないのかもしれないけれど、Pub/Subでわかりづらかったのは用語。パブリッシャー・トピックくらいまではOKだったんだけどサブスクリプションとサブスクライバーってええ?別物なの?みたいな。
結局のところ、
- パブリッシャー:メッセージの送信者
- トピック:メッセージが一時的に置かれる箱、送信者はトピックを指定してメッセージを送信する
- サブスクリプション:トピックにぶら下がってメッセージを受ける人。トピックとの関係は1 : n。全てのサブスクリプションにメッセージが届けられると、そのトピックからメッセージが消される。
- サブスクライバー:サブスクリプションに届いたメッセージを拾って処理する人(アプリ)。サブスクライバーがメッセージを拾うとそのメッセージはサブスクリプションから削除される。
ということだと思う。細かい話は公式ドキュメントを読んでください。
3. Pub/SubとBigQueryをつなぐDataflowを設定する
ここもまあそんなに難しくないかな・・・「テンプレートからJOBを作成」を選んであとはポチポチ入れるだけ。ここで重要なのはテンプレートに「Pub/Sub Topic to BigQuery」を選ぶこと。
そうすると、データの入り口となるトピック名と格納先となるBigQueryのテーブル名の指定・あとはGCS上の一時領域の指定があるので適切に設定すればOK。
ブラウザからビーコンを送って数分後にBigQueryのテーブルにデータが入ればOK。入らない場合は
- ブラウザのDeveloper Toolでビーコンが正しく飛んで200番のレスポンスがあることを確認
- Cloud FunctionsのログでメッセージがTopicにプッシュされていることを確認
- データフローのログやら見て頑張ってエラー箇所を探す
という感じでデバッグだろうか。
あと、何かしらの不具合によってゴミメッセージがトピックにプッシュされた場合はエラーメッセージ退避用テーブル(元のテーブル名+"_error_records")が自動で作成されてそこに流し込まれていた(Dataflowのテンプレートをよく見ると異常系の処理が定義されていた)。
ここまできてよくわからなかったこと(ほとんどDataflow)
一応データパイプラインを一通り構築できたにはできたけれども依然としていくつかわからないことが残った。おいおい確認していくつもり。
- Dataflowの作成時に指定した一時領域は何に使われるのか。気にしなくてもいいのかもしれないけど。
- Dataflowテンプレートのコードを改変してデプロイできるのか。
- 一度停止したDataflowジョブは再起動できないのか。ひょっとして使い捨て?
- 「Pub/Sub Topic to BigQuery」のテンプレートを使用した場合はサブスクリプションが暗黙的に自動生成されると考えられるが、それはどこで確認できるのか。