日記マン

動画広告プロダクトしてます。Go, Kubernetesが好きです。

Apache Beam Streaming をGCPサービスに書き込む備忘録

Apache Beam のストリーミングモードによる実装 (Cloud Pub/Subからプルしてくるとか) で、

ファイルストレージサービスである Google Cloud Storage ,
NoSQLの Cloud Datastore ,
そして Cloud Firestore ,

へ書き込む方法を個人的備忘録メモ。
公式のexamplesにはなく、それでもわりと実装する機会があるので。

Google Cloud Storage に書き込む

FileIO.writeDynamic を使うことで、
流れてくるデータ InputT から動的に DestT を決定し、
DestT をもとに withNaming を用いて柔軟にファイル名を決定できる。
ファイルに書き込む内容などのファイルリソースに対するハンドリングは FileIO.Sink<InputT> クラスを拡張して行う。

val toGCS = FileIO.writeDynamic<DestT, InputT>()
    .by { (InputT) -> DestT }
    .withDestinationCoder(CoderT)
    .withNaming { (DestT) -> FileIO.Write.FileNaming { (_, _, _, _, _) -> String } }
    .via(MySink())
    .withNumShards(1)
    .to(path) // path = "gs://.../"

class MySink: FileIO.Sink<InputT> {
    var writer: PrintWriter? = null

    override fun flush() {
        writer!!.flush()
    }

    override fun open(channel: WritableByteChannel?) {
        writer = PrintWriter(Channels.newOutputStream(channel!!))
    }

    override fun write(element: InputT?) {
        writer ?: return
        element ?: return
        // val output = ...
        writer!!.println(output)
    }
}

Cloud Datastore に書き込む

Cloud Datastore のIO DatastoreIO が提供されているがなんだかうまくいかないので
自分でDoFnを拡張した。

Datastore クライアントは Serializable ではないため DoFnサブクラスのコンストラクタに渡すことはできない。
Datastore クライアントの初期化に必要な Options は、 startBundle メソッドの引数 StartBundleContext から取得できる。

Element 1つ1つを書き込むのではなく、BatchPutが望ましい。
500/50/5 書き込みルールがあるため、余裕を持って 200 に設定している。

compile 'com.google.cloud:google-cloud-datastore:1.73.0'
class CloudDatastoreWriteFn(val kind: String): DoFn<InputT, Void>() {
    private val BATCH_LIMIT_SIZE = 200
    private var client: Datastore? = null
    private var mutations = mutableListOf<InputT>()

    @StartBundle
    fun startBundle(c: StartBundleContext) {
        val options = c.pipelineOptions.`as`(GcpOptions::class.java)
        client = DatastoreOptions.newBuilder()
                .setCredentials(options.gcpCredential)
                .setProjectId(options.project)
                .build().service
    }

    @ProcessElement
    fun processElement(c: ProcessContext) {
        mutations.add(c.element())
        if (mutations.size > BATCH_LIMIT_SIZE) {
            flushBatch()
        }
    }

    @FinishBundle
    fun finishBundle() {
        flushBatch()
    }

    private fun flushBatch() {
        val batch = client!!.newBatch()
        mutations.forEach {
            val key = client!!.newKeyFactory().setKind(kind).newKey(KeyT)
            val entity = Entity.newBuilder(key)
                    // .set(String, ValueT)
                    // ...
                    .build()

            batch.put(entity)
        }
        val result = batch.submit()
        mutations = mutableListOf()
    }
}

Cloud Firestore

Firestore はそもそもIOが提供されていない。
前述のDatastoreとやり方はあまり変わらない。

compile 'com.google.firebase:firebase-admin:6.8.1'
class CloudFirestoreWriteFn(val collection: String): DoFn<InputT, Void>() {
    private val BATCH_LIMIT_SIZE = 200
    private var client: Firestore? = null
    private var mutations = mutableListOf<InputT>()

    @StartBundle
    fun startBundle(c: StartBundleContext) {
        val options = c.pipelineOptions.`as`(CustomOptions::class.java)
        client = FirestoreOptions.newBuilder()
                .setCredentials(options.gcpCredential)
                .setProjectId(options.project)
                .setTimestampsInSnapshotsEnabled(true)
                .build().service
    }

    @ProcessElement
    fun processElement(c: ProcessContext) {
        mutations.add(c.element())
        if (mutations.size > BATCH_LIMIT_SIZE) {
            flushBatch()
        }
    }

    @FinishBundle
    fun finishBundle() {
        flushBatch()
    }

    private fun flushBatch() {
        val batch = client!!.batch()
        mutations.forEach {
            val ref = client!!.collection(collection).document(InputT::document)
            batch.set(ref, InputT::value) // batch.set(DocumentReference, HashMap<String, Any>)
        }
        val result = batch.commit()
        mutations = mutableListOf()
    }
}