日記マン

モチベはGCP, MapReduce, MachineLearning, Docker, [Kotlin/Go/Python] です。アカデミックへの興味は情報幾何や計算論的神経科学など。

Kotlinでデータ加工クエリのDSLを書く

tl;dr

ありとあらゆるクエリをなんでもかんでもメソッドチェーンDSLで書きたい

メソッドチェーンシンタックスでパイプラインを考える

データ加工にはイマドキMapReduceとしてApache Beam *1 が優れています。
Beam Modelではそれぞれデータ加工ロジックを任意の粒度で切り分け、
パイプラインに肉付けしていくイメージでプログラミングしていきます。

Beamが提供しているCoreクラスを使えばかなり短く処理を記述できますが、
実際のデータ加工ロジックは複雑なことが多いのでDoFnサブクラスを定義し、ParDo処理を行うことが多いです。

BeamはJava版の開発が盛んで、ver.2.2にもなって充実さが増してきました。
しかしJavaの書き方は少し冗長でBeam Modelの関数型ライクなモデリングとは相性が微妙です。
そのため僕は同じJVM言語であるKotlinを選択します。

val p = Pipeline.create(options) // パイプライン作成

p.apply("Read from BigQuery", BigQueryIO.readTableRows().fromQuery(sql)) // データソースからデータを取得
    .apply("Phase 1", ParDo.of(Phase1Fn())) // 加工1
    .apply("Phase 2", ParDo.of(Phase2Fn())) // 加工2
    .apply("Phase 3", ParDo.of(Phase3Fn())) // 加工3
    // ...
    .apply("Write to Datastore", DatastoreIO.write().withProjectId("hogehoge")) // データシンクへデータを書き込み

クエリ = 関数(全データ) という考え方に落とし込むと、
データソースからデータを受け取りメソッドチェーンで関数を複数回適用していき、
手に入れたい形へ持っていく、というBeam Modelは比較的理解しやすいです。
Kotlinはmap, filter, reduceなどコレクション操作が充実しているため、
Beam Modelのデータ加工ロジックをKotlinのシンタックスで一度記述してしまえば
クエリをスムーズに考えることができます。

まずはパイプラインのイメージをDSLで書いてみる

データソースD1D2があるとします。
D1はBigQueryなどのDWHで、大量のユーザの行動ログデータがありここから柔軟な分析をしたいと思います。
D2MySQLで管理されたデータソースであり、ユーザデータなどが別WEBサービスからCRUDな操作が行われているものです。

D1はテーブルが日付分割されて1テーブルに1億レコードほども格納されていて、そこからSQLを叩いてある程度中間表現に絞っても1000万レコードほどが吐き出されるほどのビッグデータだとします。
D2はユーザ数分くらいのレコード数であり、その数は3000ほどだとします。

今回の場合、D1をメインとし、D2が副入力として結合されるパイプラインという構築方法をとります。

KotlinライクなDSLで書いてしまって、そのあとBeam Modelを実際に実装していくというやり方が個人的にしっくりきます。

D2.filter {
    DelFlg == 0
}.map {
    (
        , id
        , name
        , age
    ) as s1
}.forEach { s1 ->
    // D2は最終的にListとして保持する
    // Listを副入力としてD1のパイプラインにjoinする
    D1.filter {
        s1.id == it.id
    }.map {
        (
            s1.id
            s1.name
            s1.age
            it.date
            it.event
        ) as s2
    }
}

こんな感じでクエリをDSLで書いてみてしまえば、頭が整理されます。
あとはBeam Modelを実装していきます。

実際にBeam Modelを書く

// D2 -> List<s1>までのPTransformロジック
@DefaultCoder(AvroCoder::class)
data class UserInfo (
    var id: Long = 0L
    , var name: String = ""
    , var age: Long = 0L
)

class ReadUserFn : JdbcIO.RowMapper<UserInfo> {
    override fun mapRow(rs: ResultSet): UserInfo {
        val id = rs.getLong(1)
        val name = rs.getString(2)
        val age = rs.getLong(3)
        return UserInfo(id, name, age)
    }
}

class UserListFn : CombineFn<UserInfo, UserListFn.Accum, MutableList<UserInfo>>() {
    @DefaultCoder(AvroCoder::class)
    data class Accum (
        var master: MutableList<UserInfo> = mutableListOf<UserInfo>()
    )
    override fun createAccumulator(): Accum = Accum()

    override fun addInput(accum: Accum, input: UserInfo): Accum {
        accum.master.add(input)
        return accum
    }

    override fun mergeAccumulators(accums: Iterable<Accum>): Accum {
        var merged = createAccumulator()
        accums.forEach {
            it.master.forEach { merged.master.add(it) }
        }
        return merged
    }

    override fun extractOutput(accum: Accum): MutableList<UserInfo> = accum.master
}

// D1と副入力List<s1>のマージロジック
@DefaultCoder(AvroCoder::class)
data class UserEventInfo (
    var user: UserInfo
    , var date: String
    , var event: String
)

class MapFn : DoFn<TableRow, UserEventInfo> {
    var view: PCollectionView<MutableList<UserInfo>>? = null

    constructor() {}
    constructor(_view: PCollectionView<MutableList<UserInfo>>) {
        view = _view
    }

    @ProcessElement fun processElement(c: ProcessContext) {
        val row = c.element()
        val side = c.sideInput(view) ?: return

        side.filter { s1 ->
            s1.id == row.get("id").toString().toLongOrNull()
        }.map { s1 ->
            val date = row.get("date").toString() ?: return
            val event = row.get("event").toString() ?: return
            UserEventInfo(s1, date, event)
        }.forEach { s2 ->
            c.output(s2)
        }
    }
}

fun main(args: Array<String>) {
    // ...

    val p = Pipeline.create(options)

    val jdbcInput = JdbcIO.read<UserInfo>()
        .withDataSourceConfiguration(
            JdbcIO.DataSourceConfiguration.create("com.mysql.jdbc.Driver", jdbcUrl)
            .withUsername("username")
            .withPassword("password")
        ).withQuery(d2Sql)
        .withCoder(AvroCoder.of(UserInfo::class.java))
        .withRowMapper(ReadUserFn())

    val view = p.apply("Read from D2", jdbcInput)
        .apply("Create List<s1>", Combine.globally(UserListFn()).asSingletonView())

    p.apply("Read from D1", BigQueryIO.readTableRows().fromQuery(d1Sql))
        .apply("Merge List<s1>", ParDo.of(MapFn(view)).withSideInputs(view))
}

SQL <=> メソッドチェーンの互換

世の中には逆にSQLでなんでも書きたいと考える人がいます。 そんなSQL信者にも朗報として、さきほどのDSLから「SELECT <=> map」「WHERE <=> filter」など変換則を見出しさえすれば、比較的簡単にコンバート可能です。

SELECT s1.id, s1.name, s1.age, date, event
    FROM (
        SELECT id, date, event
            FROM D1
    ) as d1
    JOIN (
        SELECT id, name, age
            FROM D2
            WHERE DelFlg = 0
    ) as s1
    ON d1.id = s1.id

SQLインターフェースが提供されているデータベースによってオプティマイザが異なります。
実際にはJOIN句の左と右どちらがデータサイズが大きいものを選ぶべきかはデータベースによって異なります。

まとめ

大概、パイプラインに関数を適用していくやり方は クエリ = 関数(全データ) という方程式で説明されるようにデータ加工ロジックと相性があいます。
こんな感じでKotlinの簡潔なコレクション操作シンタックスで一度DSLを書いてみて、
そのあとBeam Modelを書いてしまう、というのが思考のコンテキストスイッチが発生せず生産性が高くなると見込んでいます。
楽しいデータフローライフを!!!!!!!!

*1:Cloud DataflowがOSS