オモンパカリスト

深層学習、計算論的神経科学に興味あります

Kotlinでデータ加工クエリのDSLを書く

tl;dr

ありとあらゆるクエリをなんでもかんでもメソッドチェーンDSLで書きたい

メソッドチェーンシンタックスでパイプラインを考える

データ加工にはイマドキMapReduceとしてApache Beam *1 が優れています。
Beam Modelではそれぞれデータ加工ロジックを任意の粒度で切り分け、
パイプラインに肉付けしていくイメージでプログラミングしていきます。

Beamが提供しているCoreクラスを使えばかなり短く処理を記述できますが、
実際のデータ加工ロジックは複雑なことが多いのでDoFnサブクラスを定義し、ParDo処理を行うことが多いです。

BeamはJava版の開発が盛んで、ver.2.2にもなって充実さが増してきました。
しかしJavaの書き方は少し冗長でBeam Modelの関数型ライクなモデリングとは相性が微妙です。
そのため僕は同じJVM言語であるKotlinを選択します。

val p = Pipeline.create(options) // パイプライン作成

p.apply("Read from BigQuery", BigQueryIO.readTableRows().fromQuery(sql)) // データソースからデータを取得
    .apply("Phase 1", ParDo.of(Phase1Fn())) // 加工1
    .apply("Phase 2", ParDo.of(Phase2Fn())) // 加工2
    .apply("Phase 3", ParDo.of(Phase3Fn())) // 加工3
    // ...
    .apply("Write to Datastore", DatastoreIO.write().withProjectId("hogehoge")) // データシンクへデータを書き込み

クエリ = 関数(全データ) という考え方に落とし込むと、
データソースからデータを受け取りメソッドチェーンで関数を複数回適用していき、
手に入れたい形へ持っていく、というBeam Modelは比較的理解しやすいです。
Kotlinはmap, filter, reduceなどコレクション操作が充実しているため、
Beam Modelのデータ加工ロジックをKotlinのシンタックスで一度記述してしまえば
クエリをスムーズに考えることができます。

まずはパイプラインのイメージをDSLで書いてみる

データソースD1D2があるとします。
D1はBigQueryなどのDWHで、大量のユーザの行動ログデータがありここから柔軟な分析をしたいと思います。
D2MySQLで管理されたデータソースであり、ユーザデータなどが別WEBサービスからCRUDな操作が行われているものです。

D1はテーブルが日付分割されて1テーブルに1億レコードほども格納されていて、そこからSQLを叩いてある程度中間表現に絞っても1000万レコードほどが吐き出されるほどのビッグデータだとします。
D2はユーザ数分くらいのレコード数であり、その数は3000ほどだとします。

今回の場合、D1をメインとし、D2が副入力として結合されるパイプラインという構築方法をとります。

KotlinライクなDSLで書いてしまって、そのあとBeam Modelを実際に実装していくというやり方が個人的にしっくりきます。

D2.filter {
    DelFlg == 0
}.map {
    (
        , id
        , name
        , age
    ) as s1
}.forEach { s1 ->
    // D2は最終的にListとして保持する
    // Listを副入力としてD1のパイプラインにjoinする
    D1.filter {
        s1.id == it.id
    }.map {
        (
            s1.id
            s1.name
            s1.age
            it.date
            it.event
        ) as s2
    }
}

こんな感じでクエリをDSLで書いてみてしまえば、頭が整理されます。
あとはBeam Modelを実装していきます。

実際にBeam Modelを書く

// D2 -> List<s1>までのPTransformロジック
@DefaultCoder(AvroCoder::class)
data class UserInfo (
    var id: Long = 0L
    , var name: String = ""
    , var age: Long = 0L
)

class ReadUserFn : JdbcIO.RowMapper<UserInfo> {
    override fun mapRow(rs: ResultSet): UserInfo {
        val id = rs.getLong(1)
        val name = rs.getString(2)
        val age = rs.getLong(3)
        return UserInfo(id, name, age)
    }
}

class UserListFn : CombineFn<UserInfo, UserListFn.Accum, MutableList<UserInfo>>() {
    @DefaultCoder(AvroCoder::class)
    data class Accum (
        var master: MutableList<UserInfo> = mutableListOf<UserInfo>()
    )
    override fun createAccumulator(): Accum = Accum()

    override fun addInput(accum: Accum, input: UserInfo): Accum {
        accum.master.add(input)
        return accum
    }

    override fun mergeAccumulators(accums: Iterable<Accum>): Accum {
        var merged = createAccumulator()
        accums.forEach {
            it.master.forEach { merged.master.add(it) }
        }
        return merged
    }

    override fun extractOutput(accum: Accum): MutableList<UserInfo> = accum.master
}

// D1と副入力List<s1>のマージロジック
@DefaultCoder(AvroCoder::class)
data class UserEventInfo (
    var user: UserInfo
    , var date: String
    , var event: String
)

class MapFn : DoFn<TableRow, UserEventInfo> {
    var view: PCollectionView<MutableList<UserInfo>>? = null

    constructor() {}
    constructor(_view: PCollectionView<MutableList<UserInfo>>) {
        view = _view
    }

    @ProcessElement fun processElement(c: ProcessContext) {
        val row = c.element()
        val side = c.sideInput(view) ?: return

        side.filter { s1 ->
            s1.id == row.get("id").toString().toLongOrNull()
        }.map { s1 ->
            val date = row.get("date").toString() ?: return
            val event = row.get("event").toString() ?: return
            UserEventInfo(s1, date, event)
        }.forEach { s2 ->
            c.output(s2)
        }
    }
}

fun main(args: Array<String>) {
    // ...

    val p = Pipeline.create(options)

    val jdbcInput = JdbcIO.read<UserInfo>()
        .withDataSourceConfiguration(
            JdbcIO.DataSourceConfiguration.create("com.mysql.jdbc.Driver", jdbcUrl)
            .withUsername("username")
            .withPassword("password")
        ).withQuery(d2Sql)
        .withCoder(AvroCoder.of(UserInfo::class.java))
        .withRowMapper(ReadUserFn())

    val view = p.apply("Read from D2", jdbcInput)
        .apply("Create List<s1>", Combine.globally(UserListFn()).asSingletonView())

    p.apply("Read from D1", BigQueryIO.readTableRows().fromQuery(d1Sql))
        .apply("Merge List<s1>", ParDo.of(MapFn(view)).withSideInputs(view))
}

SQL <=> メソッドチェーンの互換

世の中には逆にSQLでなんでも書きたいと考える人がいます。 そんなSQL信者にも朗報として、さきほどのDSLから「SELECT <=> map」「WHERE <=> filter」など変換則を見出しさえすれば、比較的簡単にコンバート可能です。

SELECT s1.id, s1.name, s1.age, date, event
    FROM (
        SELECT id, date, event
            FROM D1
    ) as d1
    JOIN (
        SELECT id, name, age
            FROM D2
            WHERE DelFlg = 0
    ) as s1
    ON d1.id = s1.id

SQLインターフェースが提供されているデータベースによってオプティマイザが異なります。
実際にはJOIN句の左と右どちらがデータサイズが大きいものを選ぶべきかはデータベースによって異なります。

まとめ

大概、パイプラインに関数を適用していくやり方は クエリ = 関数(全データ) という方程式で説明されるようにデータ加工ロジックと相性があいます。
こんな感じでKotlinの簡潔なコレクション操作シンタックスで一度DSLを書いてみて、
そのあとBeam Modelを書いてしまう、というのが思考のコンテキストスイッチが発生せず生産性が高くなると見込んでいます。
楽しいデータフローライフを!!!!!!!!

*1:Cloud DataflowがOSS

VAEと時系列を扱うVRAE

Google日英翻訳がNMT(ニューラルネットワークを利用した翻訳モデル)になったらしいです。
文語ベースでバリバリ翻訳してくれてるみたいで、
単語にマウスオーバー当てても単語間の対応関係がわからなくなったのはさみしいけど、
たしかに精度がよくなったような、そんな気がするぜ。
というわけで英語論文めっちゃ読んだろ。。

VAEで何かしたいなーと思ってて、時系列を扱えるVRAE周りを調べました。

Variational AutoEncoder

Variational AutoEncoderをついにしっかり理解できた気がする。

はっつーさんのkeras実装ブログがとても勉強になりました。
有難うございます。
ralo23.hatenablog.com

Variational AutoEncoderについてまとめたスライドを作りました。

続きを読む

情報幾何学の参考書2つ。

情報幾何学を勉強中。
目的はDeep Neural Networkの解析。

情報幾何学

  • Fisher情報量(情報行列) 確率変数がパラメーターに関して持つ情報量
  • Riemann計量 微分幾何学ででてくる計量

若き頃のC.R.Raoが「Fisher情報行列とRiemann計量って一緒じゃね?」と言い始め、
いろいろな可能性が湧き出てきて、甘利先生、長岡先生たちが理論体系としてまとめあげた。
情報幾何学のはじまりはじまり。

確率分布空間を多様体構造で幾何的に捉える。
狭義では「双対アファイン接続の微分幾何学」。

続きを読む

情報幾何の勉強。機械学習の数理的研究。

(勉強中のブログです。気をつけてるんですが厳密さに欠けることも書きがちなので、あくまで備忘録という免罪符を利用します。暖かい目でみてください。)

tl;dr

情報幾何学という分野を知った

機械学習、深層学習の仕組みを理解する一つの方法論
日本から生まれた理論体系。

続きを読む

Kerasつかってみた。紹介とか。おすすめサイトとか。

深層学習フレームワークとしてKerasを使ってみたら結構楽しかったです。
直感的なレイヤー型記述はChainerに似てます。
Theano または TensorFlowのラッパーとして動くので低レベル記述も効いて汎用性は高そうです。

おすすめのサイト(随時更新するつもり)

続きを読む

機械学習・深層学習の勉強したこと・したいこと、このブログの方針

ちょっと予告とは違う更新だけど。。

深層学習(Deep Learning)というものを知り、いろいろ調べて勉強したりした一年
紆余曲折がありつつも、Pythonを初めて扱い深層学習以外の機械学習全般に興味を持った
今取り組んでいることは、「機械学習を基礎から身につけ、pythonで実装していく力を身につける。」

ある程度参考書やサイトを漁った結果、自分なりのベストプラクティスが固まりつつある(前もこんなこと言った)

このブログは息抜き程度の備忘録の位置づけにしたいので、技術ブログではないことをあやふやな記述の免罪符にしたい
わりとまじで。切実に。

機械学習のベストプラクティス(今現在)

数学の事前準備

さすがに微分積分と行列、確率はある程度必要になる。
個人的に行列の微分が苦手。現役の学生なので覚えてるが、
一切を忘れてしまったという方は数学の参考書で数式慣れしたほうがいいかも

超初心者

Python触ったことないYO!なひと。

Numpyの勉強はここがためになる
機械学習の Python との出会い — 機械学習の Python との出会い

PythonじゃないけどMATLABに似たようなやつでOctaveという言語がある。
Courseraのビデオ学習が勉強になる。
www.coursera.org

日本語の字幕がユーモアあって楽しい。
ちなみにNg先生は深層学習での大家でもある

Kaggle(様々な問題(データセット)に世界中のデータサイエンティストが正答率を競い合うプログラミングサイト)に登録もしておく

qiita.com

いずれはKaggleに!と常々思っていたのでこのQiita記事の後押しもあり登録してみた
スコアの高いコードが参考になる

深層学習を学びたいひとは、大変だけど論文サーベイもできたらいいと思う
この一年で1000以上も投稿されてさすがに読みきれないので、
slideshareでまとめてくれてるひとのスライドにお世話になることも多い
Twitterはさみとぎ屋というアカウントで技術や論文概要をRTしたりしてるので
よかったらフォローしてください

続きを読む

確率的なDeep Learning。生成モデル。~その1~

深層学習(Deep Learning)がバズワードになって何年目だ。
画像認識コンテストでDeepなヤツが優勝したのが2012年で4年前だ。
Deepの先駆者DBN[Hinton 2009]からだと7年か。
2014-2015の間だけでも1500もの関連論文が発表されているらしい。鬼のようだ。

軽く歴史を振り返ると、制限付きボルツマンマシンを「多段に積み上げた」(深層と呼ばれる所以)
Deep Belief Network[Hinton 2009]が新時代の幕上げだった。

Deepという名前もここから実用的に使われだしたっぽい、多分。

そして今、非常に気になってるのがVAEとGAN、そしてEBMにGANのテクを使ったヤーツ[Kim+ 2016]。
こいつらは生成モデルの最新技術なヤーツ達だ。
こいつらについてちょっと紹介したいので、その前に統計的機械学習とボルツマンマシンについてまとめたあと
この人類の希望達を語っていこうと思う。

別に論文じゃないし精査せずに抽象的に書いてく。厳密性や信憑性は求めないでください

続きを読む