日記マン

モチベはGCP, MapReduce, MachineLearning, Docker, [Kotlin/Go/Python] です。アカデミックへの興味は情報幾何や計算論的神経科学など。

社会人1年目振り返り

金沢から上京して、システムエンジニアとしての1年目を終えた。
つらつらと振り返ってみる。

4月入社。最初の2ヶ月に技術研修という題目で外部のスクールに通った。
WebやPCの基礎のキソの座学からJavaによるチーム開発演習までのカリキュラムで、
初学者にとっては重たいだろうけど正直開発経験のある自分にとっては少々退屈だった。
自分はLAMP環境によるWeb開発、MVCアーキテクチャ情報工学修了、
といったどこにでもいるWeb志望学生だったので、経験してきたそれらをなぞったような講習だった。
ただ、Javaでサーバサイドはしたことがなかったので、ひたすら知っていることの復習というわけでもなかったし学びはあった。

6月配属。配属理由は他の部署に比べ、技術的に面白そうだったから。
動画広告のビジネスモデルなどは全く無知だったが、サーバサイド・インフラに好奇心が向いている自分にとっては、
大量のトラフィックやいわゆるビッグデータと対峙することは、やはり学生時代には経験できなかったビジネスの世界ならではのことだから。
ここでアドテク事業あるあるかもしれないが、こういったバックエンド的な技術的課題があるため、
必然的に効率的な最新技術などを取り込むことになる。ウチではGCPでソリューションアーキテクトを行い、
そういったクラウド技術が自分にとってかなりモチベーションだった。
それに対し、フロントエンドなどは技術的にモダンである必要がほとんどない。
そこで自分は大言壮語を吐き、最新の技術に触れたいがためにバックエンドの重たいタスクを欲しがった。

7月ぐらいから自分にとって転機となるようなタスクを振られる。
それがApache Beam(Cloud Dataflow)を使ったストリーミングデータのニアリアルタイム集計基盤の設計・実装だった。
fluentdからPub/Sub、Dataflow、Bigtableなどの技術スタックと対峙した。
はっきりいって、MapReduceを名前ぐらいしか知らなかった自分にとって、とんでもなくデカいタスクだった。
その上、会社にとって知見がなく、いわゆる自分が知見を開けていくことになった。
ただ、それが自分の肌にあった。Apache Beamの理解への学習コストは、おそらく高いと思う。
自分のスキルレベルが低いからなのもあるが、それを抜きにしてもWindow戦略やウォーターマークなど、
どんなデータの集計角度にも対応できるための概念の習得には、一筋縄ではいかなかった。
同年3月に安定版の2.0がリリースされ、文献もほとんどない状態で、ソースコードとリファレンスとドキュメントをなん往復もしながら、実装していった。
体重もかなり減った。ただ、幸いなことにこの技術はモーレツに面白い。それが救いというか、むしろ幸せなことだと思った。

10月ぐらいに、ダッシュボードの設計・実装タスクを振られる。
要件レベルがかなり高く、データ量に対する限界をある程度理解していた自分にとって、
理想の要件を現実的な要件定義に落とし込むために上司を納得させていった。
それでも難しく、かなり苦戦を強いられた。BigQueryを直接ユーザに叩かれると死んでしまうので、
BigQueryからデータを吸い出し、それぞれのユーザが取得したい形に集計した中間表現をDatastoreに格納するDataflowを実装したりした。
これはなかなか難しく金銭的にも苦戦し、正直ビッグデータを抱えるダッシュボード案件はツラい仕事だなと心から思った。
もちろん、ネガティブな感情だけでなく、技術的な難易度の高さがやはり面白かった。いい経験になった。
また、このタスクからDataflowはKotlinで書くことにした。それまではJavaで書いていたが、関数型パラダイムと相性がいいし、なにより好奇心があったため。

11月ぐらいから、GCPユーザたちによるGCPUGというコミュニティのSlackに顔を出すようになった。
Dataflowなどは特に利用ユーザも少なく、(その辺Hadoop, Spark, AWS EMRなどは知見交換が活発だと思う)
喉から手がでるほど情報が欲しかったのもあり、こういったコミュニティはかなりありがたい。

3月からは、打って変わって広告配信APIの設計・実装をしている。
大量トラフィックに対し最適な広告を返すAPIの実装というのは、これまたこの事業でもっともおいしい仕事のひとつだと思う。
さらに嬉しいことに、App Engine/Go言語というモダンな構成要素で、これまた社内初の試みをさせてもらっている。
弊社のミスター投機的実行とは僕のことだ。裏を返せば脆弱性です。(特技 : ライブマイグレーション)
もともとGolangにかなり興味があったので、こうやって早速実務で扱えるなんて非常に恵まれていると思う。
またAPIの設計として、DDDなレイヤアーキテクチャについて勉強中である。
App Engineは処理単位をサービスごとに分割し、サービスの粒度でコンテナとしてバックエンドでプロビジョニングされる。
ソースコードをどのように整理するかということで、レイヤアーキテクチャとは相性がいいと睨んでいる。いずれブログにしたい。

GCP, MapReduce, Apache Beam, AppEngine, Go, 分散NoSQLなど、面白いと思える技術たちに出会い、実践で導入する一年だった。
アドテク事業は魅力的な課題が多く、ユーザの視聴をトラッキングし分析するデータ基盤、配信面のクローリング・スクレイピング解析、
ターゲティング配信、ブランドリフトやコンバージョン計測など、それをいかに解決するかというアーキテクトが面白い。
また、バナー広告ではなく動画広告だからこそできる、ゲーミフィケーションを導入したユーザの視聴体験・感情報酬など、そういったユーザファーストへの挑戦も面白い。
まだまだやりたいことがいっぱいあるので、2年目もがんばります。

Kotlinで書いたプログラムをデーモン稼働させた時のメモ

OS上で常にバックグランド稼働し続けるプロセスをデーモンといいます。
自作のアプリケーションをCentOS6系にてデーモン稼働しておけば、管理が楽です。

普段はcronで定期実行するバッチプログラムを書くことが多く、単純にjarファイルを定期的に実行してました。
ただ、jarプログラム自体を無限ループさせておいて、ずっと稼働しておく、というサービスの要件*1があったので、
init.dでデーモン登録する方法を調べました。

参考になったのはこちらのテンプレートで、ぶっちゃけここを参考にしていただければ事足ります。

fhd/init-script-template - GitHub

この記事では、Kotlinで書いたjarをデーモン化する方法として、備忘録がてらにメモっておきます。

ソースコードはこちら GitHub

プログラム例

全ての依存ライブラリを内包したひとつのスタンドアロンなjarファイルを生成します。
今回はこのjarファイルをデーモンにしたいです。

kotlinのクラス(Kt)をjarにコンパイルするための最低限のpom.xmlです。

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.github.kazukousen</groupId>
    <artifactId>kt_example</artifactId>
    <packaging>jar</packaging>
    <version>1.0</version>

    <name>kt_example</name>
    <url>http://maven.apache.org</url>

    <properties>
        <maven.compiler.target>1.8</maven.compiler.target>
        <maven.compiler.source>1.8</maven.compiler.source>
        <kotlin.version>1.2.10</kotlin.version>
        <main.class>com.github.kazukousen.kt_example.HogeKt</main.class>
        <kotlin.compiler.incremental>true</kotlin.compiler.incremental>
    </properties>

    <dependencies>
        <!-- start Kotlin -->
        <dependency>
            <groupId>org.jetbrains.kotlin</groupId>
            <artifactId>kotlin-stdlib</artifactId>
            <version>${kotlin.version}</version>
        </dependency>
        <dependency>
            <groupId>org.jetbrains.kotlin</groupId>
            <artifactId>kotlin-test</artifactId>
            <version>${kotlin.version}</version>
            <scope>test</scope>
        </dependency>
        <!-- end Kotlin -->
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- start Kotlin -->
            <plugin>
                <groupId>org.jetbrains.kotlin</groupId>
                <artifactId>kotlin-maven-plugin</artifactId>
                <version>${kotlin.version}</version>
                <executions>
                    <execution>
                        <id>compile</id>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                        <configuration>
                            <sourceDirs>
                                <sourceDir>${project.basedir}/src/main/kotlin</sourceDir>
                                <sourceDir>${project.basedir}/src/main/java</sourceDir>
                            </sourceDirs>
                        </configuration>
                    </execution>
                    <execution>
                        <id>test-compile</id>
                        <phase>test-compile</phase>
                        <goals>
                            <goal>test-compile</goal>
                        </goals>
                        <configuration>
                            <sourceDirs>
                                <sourceDir>${project.basedir}/src/test/kotlin</sourceDir>
                                <sourceDir>${project.basedir}/src/test/java</sourceDir>
                            </sourceDirs>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
            <!-- end Kotlin -->
            <!-- start .java -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.1</version>
                <executions>
                    <execution>
                        <id>default-compile</id>
                        <phase>none</phase>
                    </execution>
                    <execution>
                        <id>default-testCompile</id>
                        <phase>none</phase>
                    </execution>
                    <execution>
                        <id>java-compile</id>
                        <phase>compile</phase>
                        <goals>
                            <goal>compile</goal>
                        </goals>
                    </execution>
                    <execution>
                        <id>java-test-compile</id>
                        <phase>test-compile</phase>
                        <goals>
                            <goal>testCompile</goal>
                        </goals>
                    </execution>
                </executions>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
            <!-- end .java -->

            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-jar-plugin</artifactId>
                <version>3.0.2</version>
                <configuration>
                    <archive>
                        <manifest>
                            <addClasspath>true</addClasspath>
                            <mainClass>${main.class}</mainClass>
                        </manifest>
                    </archive>
                </configuration>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                        <configuration>
                            <archive>
                                <manifest>
                                    <mainClass>${main.class}</mainClass>
                                </manifest>
                            </archive>
                            <descriptorRefs>
                                <descriptorRef>jar-with-dependencies</descriptorRef>
                            </descriptorRefs>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>

src/main/kotlin/com/github/kazukousen/kt_example/Hoge.kt は例えばこうです。

package com.github.kazukousen.kt_example

import java.io.File
import java.lang.Runtime
import java.lang.Thread
import java.lang.Thread.sleep
import java.time.ZonedDateTime

object Hoge {
    fun run() {
        Misc.print("Hello, World!")
    }

}

object Misc {
    fun print(s: String) {
        println("[${ZonedDateTime.now()}] ${s}")
    }

    fun printErr(s: String) {
        System.err.println("[${ZonedDateTime.now()}] ${s}")
    }
}

class ShutdownThread : Thread() {

    override fun run() {
        Misc.print("get HUP signal.")
        // ここに安全に終了する処理
        Misc.print("Shutdown.")
    }
}

fun main(args: Array<String>) {

    // HUPをフックする
    Runtime.getRuntime().addShutdownHook(ShutdownThread())

    // 処理
    while(true) {
        Hoge.run()
        Thread.sleep(5000L)
    }
}

例として、5秒ごとにこんにちは世界するしょぼいプログラムです

mvn package

killコマンドなどでHUPシグナルが送られた時に、
処理途中のものを安全に終了させるためにハンドリングしたい場合は、
JVMでは Runtime.getRuntime().addShutdownHook() にThreadクラスのサブクラスを渡すことで、終了処理を追加できます。

さて、これで mvn package を実行すれば、
target/kt_example-1.0-jar-with-dependencies.jar というjarファイルが出来上がります。

試しに実行してみて、ctrl-Cコマンドでkillしてみると、期待した動きをします。

$ java -jar target/kt_example-1.0-jar-with-dependencies.jar
[2018-03-04T13:45:32.238+09:00[Asia/Tokyo]] Hello, World!
[2018-03-04T13:45:37.242+09:00[Asia/Tokyo]] Hello, World!
[2018-03-04T13:45:42.243+09:00[Asia/Tokyo]] Hello, World!
[2018-03-04T13:45:47.248+09:00[Asia/Tokyo]] Hello, World!
^C[2018-03-04T13:45:47.796+09:00[Asia/Tokyo]] get HUP signal.
[2018-03-04T13:45:47.796+09:00[Asia/Tokyo]] Shutdown.

このプログラムをinit.dに登録したいと思います。

init.d

今回のプログラムを ktd というサービス名でデーモン化したいと思います。

service ktd start
service ktd status
service ktd stop
service ktd restart

これらの操作が効くようにしたいです。
後ろにdがつくのは、デーモンという意味で、慣習です。

/etc/init.d/ktd というファイル名で以下のように記述します。
こちらのテンプレートを思い切りパクらせていただきました。
fhd/init-script-template - GitHub

# !/bin/bash
#
# ktd        Startup script for the jobd
#
# chkconfig: - 85 15
# description: The JobWatcher.
# processname: ktd
#
### BEGIN INIT INFO
# Provides: ktd
# Required-Start: $local_fs $remote_fs $network $named
# Required-Stop: $local_fs $remote_fs $network
# Should-Start: distcache
# Short-Description: start and stop ktd
# Description: ktd
### END INIT INFO

# Source function library.
. /etc/rc.d/init.d/functions

dir="/path/to/kt_sample"
cmd="java -jar kt_example-1.0-jar-with-dependencies.jar"

name=`basename ${0}`
pid_file="/var/run/${name}.pid"
stdout_log="/var/log/${name}/running.log"
stderr_log="/var/log/${name}/error.log"

get_pid() {
    cat "${pid_file}"
}

is_running() {
    [ -f "${pid_file}" ] && ps -p `get_pid` > /dev/null 2>&1
}

case "${1}" in
    start)
    if is_running; then
        echo "Already started"
    else
        echo "Starting $name"
        cd "$dir"
        $cmd >> "${stdout_log}" 2>> "${stderr_log}" &
        echo $! > "${pid_file}"
        if ! is_running; then
            echo "Unable to start, see $stdout_log and $stderr_log"
            exit 1
        fi
        echo "OK"
    fi
    ;;
    stop)
    if is_running; then
        echo -n "Stopping ${name}.."
        kill `get_pid`
        for i in 1 2 3 4 5 6 7 8 9 10
        do
            if ! is_running; then
                break
            fi

            echo -n "."
            sleep 1
        done
        echo

        if is_running; then
            echo "Not stopped; may still be shutting down or shutdown may have failed"
            exit 1
        else
            echo "Stopped"
            if [ -f "${pid_file}" ]; then
                rm "${pid_file}"
            fi
        fi
    else
        echo "Not runnning"
    fi
    ;;
    restart)
    $0 stop
    if is_running; then
        echo "Unable to stop, will not attempt to start"
        exit 1
    fi
    $0 start
    ;;
    status)
    if is_running; then
        echo "Running"
    else
        echo "Stopped"
        exit 1
    fi
    ;;
    *)
    echo "Usage: $0 {start|stop|restart|status}"
    exit 1
    ;;
esac

exit 0
}

pidファイルは /var/run/ktd.pid に、
ログは標準出力とエラー出力それぞれが /var/log/ktd/running.log/var/log/ktd/error.log に出力されます。
ディレクトリは事前に用意しておく必要があります。

mkdir /var/log/ktd

あとは、serviceコマンドに登録してあげます。

chmod 755 /etc/init.d/ktd
chown root:root /etc/init.d/ktd
service ktd start
service ktd status # Runningと表示されればOK

chkconfig コマンドでランレベルを設定し、自動起動をオンにしてあげます。

chkconfig --add ktd
chkconfig ktd on
chkconfig --list ktd # 確認

logrotate.d

次に、1日ごとにログを圧縮ファイルにしていきたいです。
logrotateは便利です。

以下の内容を /etc/logrotate.d/ktd というファイルで保存します。

/var/log/ktd/*.log {
    daily
    rotate 7
    missingok
    notifempty
    compress
    postrotate
        /sbin/service ktd restart
    endscript
}

参考: ログローテーションするためのlogrotate設定とちょっとしたtips - Qiita

rotate後に1度リスタートをする理由は、プログラムがログファイルを噛んでしまっているのを解除する必要があるからです。

chown root:root /etc/logrotate.d/ktd

これで1日ごとにログファイルが切り出されます。

まとめ

kotlinの自作プログラムをinit.dでデーモン登録し、logrotateでログ管理するようにしました。
思いの外JVMでHUPシグナルのフックすることが簡単だったので助かりました。
JVMガベージコレクションタイミングや、色々と理解しておくべきものがありそうです。

*1:例えばBigQueryに対し、重めのクエリをユーザドリブンで同期的実行させるのではなく、キューのように非同期ハンドリングするクエリ管理プログラム

Pythonで乃木坂46公式ブログをクローリング・スクレイピングしてCloud Storageに永続化する

tl;dr

2017年も残りわずかなので、乃木坂46のブログをスクレイピングし、Cloud Storageに保存しておきましょう。
卒業していくメンバーのブログも永遠のものとなる!!!!!優勝!!!!!!!!!!!!

注意: 勢いで書いたコードなので、例外周りとか適当です。見逃してください。

環境

  • Python 3.6.2
    • pyenv/virtualenv
  • tools
    • requests
    • BeautifulSoup
    • google.cloud.storage

Install

requirements.txt

requests
BeautifulSoup4
lxml
python-dotenv
google-cloud-storage
six
$ virtualenv -p python3 env
$ source env/bin/activate
$ pip install -r requirements.txt

サービスアカウントの準備

  • クライアントからGCPサービスを扱う際に認証が必要です
  • サービスアカウントを作成しjsonキーファイルのパスを環境変数GOOGLE_APPLICATION_CREDENTIALSに設定します

サービスアカウントの作成

必要な権限

  • ストレージのオブジェクト作成者
  • ストレージのオブジェクト管理者

注意: ストレージのオブジェクト作成者だけでは同名オブジェクトの上書きができません。 オブジェクトを上書きするには管理者の権限も必要です。

認証jsonキーをダウンロードします。

python上から環境変数を設定

import os
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'KEY_FILE_PATH'

設計

バケット nogizaka 構造

  • nogizaka/
    • members.txt
    • member/
      • MEMBER_NAME/
        • detail_urls.txt
        • posts/
          • POST_BLOB

スクレイピング手順

  1. 公式ブログTOPからメンバーリストを抽出し members.txt に格納する
  2. メンバー別もしくは members.txt を基に全てのメンバーブログのスクレイピングを行う

メンバーブログのスクレイピング

  1. メンバーブログTOPから全記事のURLをクローリングする
  2. 各記事に対しスクレイピングを行う。タイトル、内容などを member/MEMBER_NAME/posts/ 配下にblobを保存する

コーディング

Cloud Storage

from google.cloud import storage
import six


def _get_storage_client():
    return storage.Client(
        project=os.environ.get('PROJECT_ID')
    )


def upload_file(file_stream, filename, content_type):
    blob = _get_blob(filename)

    blob.upload_from_string(
        file_stream,
        content_type=content_type)

    url = blob.public_url

    if isinstance(url, six.binary_type):
        url = url.decode('utf-8')

    return url


def download_string(filename):
    blob = _get_blob(filename)
    source = blob.download_as_string().decode()
    return source


def is_exists_file(filename):
    blob = _get_blob(filename)

    return blob.exists()


def _get_blob(filename):
    client = _get_storage_client()
    bucket = client.bucket(os.environ.get('CLOUD_STORAGE_BUCKET'))
    blob = bucket.blob(filename)

    if blob is None:
        from google.cloud.storage import Blob
        blob = Blob(filename, bucket)

    return blob


def read_lines(path):
    """
    ストレージからダウンロードしたファイルを行ごとにの要素で配列にする
    """
    source = download_string(path)
    rows = source.split('\n')
    return rows

これを storage.py として使うことにします。

スクレイピング部分

import requests
from bs4 import BeatulfulSoup

from . import storage

class Blog(object):
    """
    乃木坂公式ブログをスクレイピングする
    """


    URL_PREFIX = 'http://blog.nogizaka46.com/'
    HEADERS = {
        'User-Agent': 'HogeHoge',
    }


    @staticmethod
    def create_members_list(path=None):
        """
        全メンバー情報を更新し, 全記事に対してスクレイピングを行い結果をストレージに保存する
        """
        if path is None:
            path = 'members.txt'

        res = requests.get(Blog.URL_PREFIX, headers=Blog.HEADERS)
        if res.status_code != 200:
            return
        soup = BeautifulSoup(res.text, 'lxml')
        unit_tags = soup.find(attrs={'id': 'sidemember'}).findAll(attrs={'class': 'unit'})
        members = [unit_tag.find('a').get('href').rsplit('/', 1)[1] for unit_tag in unit_tags]
        raw = '\n'.join(members)
        return storage.upload_file(raw, path, 'text/plain')

Blog.create_members_list()公式ブログTOPからメンバーリストを作成し、 Cloud Storageにアップロードします。

members.txt はこんな感じになります。

manatsu.akimoto
erika.ikuta
rina.ikoma
karin.itou
junna.itou
marika.ito
sayuri.inoue
misa.eto
hina.kawago
mahiro.kawamura
hinako.kitano
asuka.saito
chiharu.saito
yuuri.saito
iori.sagara
reika.sakurai
kotoko.sasaki
mai.shiraishi
mai.shinuchi
ayane.suzuki
kazumi.takayama
ranze.terada
kana.nakada
himeka.nakamoto
nanase.nishino
ami.noujo
hina.higuchi
minami.hoshino
miona.hori
sayuri.matsumura
rena.yamazaki
yumi.wakatsuki
miria.watanabe
maaya.wada
third

メンバー別にスクレイピング

members.txt にあるメンバー全員分の全記事をスクレイピングするのはきついので、
個人別に操作が効くような設計にしています。

    def __init__(self, base_url=None, member=None, replace=False):
        if base_url is None and member is not None:
            self.member = member
            self.base_url = Blog.URL_PREFIX + self.member
        elif base_url is not None and member is None:
            self.base_url = base_url
            self.member = self.base_url.rsplit('/', 1)[1]
        elif base_url is None and member is None:
            print('error')
            return
        else:
            self.base_url = base_url
            self.member = member
        self.replace = replace
        self.headers = Blog.HEADERS
        self.detail_urls = []
        self.detail_list_url = 'member/' + self.member + '/detail_urls.txt'
        res = requests.get(self.base_url, headers=self.headers)
        if res.status_code != 200:
            raise Exception()

replace プロパティはBooleanで、False であれば既にスクレイピングし永続化し終わった記事に対してはスルーをします。

    def run_crawling_urls_by_member(self):
        """
        メンバーの全記事のURL情報を更新しストレージに保存する
        """
        self.crawl_urls()
        return self.upload_detail_urls(self.detail_list_url)


    def crawl_urls(self):
        """
        全記事のURLをdetail_urlsに格納する
        """
        month_urls = self._get_month_urls(self.base_url)
        for month in month_urls:
            self._add_detail_link(month)


    def _get_month_urls(self, base_url):
        res = requests.get(base_url, headers=self.headers)
        if res.status_code != 200:
            return
        soup = BeautifulSoup(res.text, 'lxml')
        month_tags = soup.find(attrs={'id': 'sidearchives'}).find('select').findAll('option')
        month_urls = [option['value'] for option in month_tags[1:]]
        return month_urls


    def _add_detail_link(self, url):
        res = requests.get(url, headers=self.headers)
        if res.status_code != 200:
            return
        soup = BeautifulSoup(res.text, 'lxml')
        day_table = soup.find(id='daytable')
        detail_urls = [detail_link.get('href') for detail_link in day_table.find_all('a')]
        self.detail_urls.extend(detail_urls)


    def upload_detail_urls(self, dst_filename):
        """
        detail_urls情報をストレージに保存する
        """
        raw = '\n'.join(self.detail_urls)
        return storage.upload_file(raw, dst_filename, 'text/plain')
  1. メンバーブログの月別リスト(id=sidearchives)から月別にURLを取得します。
  2. 日付テーブル(id=daytable)から日別にURLを取得し、detail_urls プロパティへ格納していきます。
  3. 記事URL情報をCloud Storageに保存します。

member/nanase.nishino/detail_urls.txt
f:id:i101330:20180101001128p:plain

    def run_scraping(self):
        """
        メンバーブログの全記事に対してスクレイピングを行い結果をストレージに保存する
        """
        if not storage.is_exists_file(self.detail_list_url):
            self.run_crawling_urls_by_member()

        detail_urls = storage.read_lines(self.detail_list_url)
        for detail_url in detail_urls:
            file_name = 'member/' + self.member + '/post/' + detail_url.rsplit('/', 1)[1]
            self.upload_post_detail(detail_url, file_name)


    def upload_post_detail(self, url, dst_filename):
        """
        author, title, contentをJson形式でストレージに格納する
        """

        if self.replace is False:
            exist = storage.is_exists_file(dst_filename)
            if exist:
                return

        res = requests.get(url, headers=self.headers)
        if res.status_code != 200:
            return

        soup = BeautifulSoup(res.text, 'lxml')
        author = soup.find(class_='author').text
        title = soup.find(class_='entrytitle').text
        content = soup.find(class_='entrybody')
        output = json.dumps({
            'postUrl': url,
            'author': author,
            'title': title,
            'content': content.prettify(),
        }, ensure_ascii=False)

        return storage.upload_file(
            output,
            dst_filename,
            'application/json',
        )

スクレイピング対象の記事URLとともに、投稿者名、記事タイトル、記事内容をJson形式でblobに保存します。

結果

例えば西野七瀬が2017年7月29日に投稿したブログは
gs://BUCKET_NAME/member/nanase.nishino/post/?d=20120729 に格納され、
中身はこんな感じです。

{
    "postUrl": "http://blog.nogizaka46.com/nanase.nishino/?d=20120729", 
    "author": "西野七瀬",
     "title": " ホームゲートに手を引き込まれないよう",
     "content": "<div class=\"entrybody\">\n <div>\n </div>\n <div>\n  ん~~~~~!ななせまる!!\n </div>\n <div>\n </div>\n <div>\n </div>\n <div>\n  今日は新技術イベントなり\n </div>\n <div>\n  起こしの方は楽しみにしててくださいね\n </div>\n <div>\n  そして乃木どこもぜひ見てください。\n </div>\n <div>\n </div>\n <div>\n </div>\n <div>\n  <img src=\"http://img.nogizaka46.com/blog/photos/entry/2012/07/29/3049950/0000.jpeg\" width=\"240\"/>\n </div>\n <div>\n  ネイル!\n </div>\n <div>\n </div>\n <div>\n </div>\n <div>\n  昨日の夜ぜんぜん寝られへんくて、なんかアプリ探したり考えごとしてました\n </div>\n <div>\n </div>\n <div>\n  そのなかのひとつは歯について!\n </div>\n <div>\n  そういえば小さい頃って歯抜けてたなーって思って\n </div>\n <div>\n  わたしははじめ、ぐらぐらしてきて、そのうち糸一本だけで繋がってる状態になって、それがめっちゃ気になるからベロで遊んでました( ・´ー・`)\n </div>\n <div>\n  歯を一回転してみたり変な方向にねじったりして、抜けることを期待してた\n </div>\n <div>\n  鏡をみながら歯をひっぱっても抜けへんから、諦めてご飯食べてたら、ブチっと歯がとれたことがあった\n </div>\n <div>\n  上の歯はトイレに流す\n </div>\n <div>\n  下の歯は屋根の上に投げる\n </div>\n <div>\n  これが西野家流やねん( ・´ー・`)\n </div>\n <div>\n </div>\n <div>\n  歯が抜けてから処理するまでは引き出しに保管してたんやけど\n </div>\n <div>\n  なんか\"抜けた歯\"が好きで何回も取り出してはいろんな方向から歯をまじまじ見てました\n </div>\n <div>\n  それで昨日、「あ!なな抜けた歯が好きやったんか」って思い出したのさ\n </div>\n <div>\n </div>\n <div>\n  親知らずを歯医者さんで抜いたときも\n </div>\n <div>\n  銀色のプレートに転がってる、すこし血のついた歯をみると「yeah!」って感じでしたな( ・´ー・`)ゼア\n </div>\n <div>\n </div>\n <div>\n  歯について語りすぎた?故に口の中がかゆくなってきた...\n </div>\n <div>\n </div>\n <div>\n  抜けた歯が好きな人\n </div>\n <div>\n  手あーげて!\n </div>\n <div>\n </div>\n <div>\n </div>\n <div>\n  <img src=\"http://img.nogizaka46.com/blog/photos/entry/2012/07/29/3049950/0001.jpeg\" width=\"241\"/>\n </div>\n <div>\n  浪漫より\n </div>\n <div>\n </div>\n <div>\n  バイッッッ\n </div>\n <div>\n </div>\n</div>\n"
}

はい可愛い!!!!!!!!!優勝!!!!!!!!!!!!!!

また、卒業するため近日にブログが閉鎖してしまう中元日芽香伊藤万理華のブログもこんな感じで無事確保できました。

f:id:i101330:20171231231004p:plain

Kotlinでデータ加工クエリのDSLを書く

tl;dr

ありとあらゆるクエリをなんでもかんでもメソッドチェーンDSLで書きたい

メソッドチェーンシンタックスでパイプラインを考える

データ加工にはイマドキMapReduceとしてApache Beam *1 が優れています。
Beam Modelではそれぞれデータ加工ロジックを任意の粒度で切り分け、
パイプラインに肉付けしていくイメージでプログラミングしていきます。

Beamが提供しているCoreクラスを使えばかなり短く処理を記述できますが、
実際のデータ加工ロジックは複雑なことが多いのでDoFnサブクラスを定義し、ParDo処理を行うことが多いです。

BeamはJava版の開発が盛んで、ver.2.2にもなって充実さが増してきました。
しかしJavaの書き方は少し冗長でBeam Modelの関数型ライクなモデリングとは相性が微妙です。
そのため僕は同じJVM言語であるKotlinを選択します。

val p = Pipeline.create(options) // パイプライン作成

p.apply("Read from BigQuery", BigQueryIO.readTableRows().fromQuery(sql)) // データソースからデータを取得
    .apply("Phase 1", ParDo.of(Phase1Fn())) // 加工1
    .apply("Phase 2", ParDo.of(Phase2Fn())) // 加工2
    .apply("Phase 3", ParDo.of(Phase3Fn())) // 加工3
    // ...
    .apply("Write to Datastore", DatastoreIO.write().withProjectId("hogehoge")) // データシンクへデータを書き込み

クエリ = 関数(全データ) という考え方に落とし込むと、
データソースからデータを受け取りメソッドチェーンで関数を複数回適用していき、
手に入れたい形へ持っていく、というBeam Modelは比較的理解しやすいです。
Kotlinはmap, filter, reduceなどコレクション操作が充実しているため、
Beam Modelのデータ加工ロジックをKotlinのシンタックスで一度記述してしまえば
クエリをスムーズに考えることができます。

まずはパイプラインのイメージをDSLで書いてみる

データソースD1D2があるとします。
D1はBigQueryなどのDWHで、大量のユーザの行動ログデータがありここから柔軟な分析をしたいと思います。
D2MySQLで管理されたデータソースであり、ユーザデータなどが別WEBサービスからCRUDな操作が行われているものです。

D1はテーブルが日付分割されて1テーブルに1億レコードほども格納されていて、そこからSQLを叩いてある程度中間表現に絞っても1000万レコードほどが吐き出されるほどのビッグデータだとします。
D2はユーザ数分くらいのレコード数であり、その数は3000ほどだとします。

今回の場合、D1をメインとし、D2が副入力として結合されるパイプラインという構築方法をとります。

KotlinライクなDSLで書いてしまって、そのあとBeam Modelを実際に実装していくというやり方が個人的にしっくりきます。

D2.filter {
    DelFlg == 0
}.map {
    (
        , id
        , name
        , age
    ) as s1
}.forEach { s1 ->
    // D2は最終的にListとして保持する
    // Listを副入力としてD1のパイプラインにjoinする
    D1.filter {
        s1.id == it.id
    }.map {
        (
            s1.id
            s1.name
            s1.age
            it.date
            it.event
        ) as s2
    }
}

こんな感じでクエリをDSLで書いてみてしまえば、頭が整理されます。
あとはBeam Modelを実装していきます。

実際にBeam Modelを書く

// D2 -> List<s1>までのPTransformロジック
@DefaultCoder(AvroCoder::class)
data class UserInfo (
    var id: Long = 0L
    , var name: String = ""
    , var age: Long = 0L
)

class ReadUserFn : JdbcIO.RowMapper<UserInfo> {
    override fun mapRow(rs: ResultSet): UserInfo {
        val id = rs.getLong(1)
        val name = rs.getString(2)
        val age = rs.getLong(3)
        return UserInfo(id, name, age)
    }
}

class UserListFn : CombineFn<UserInfo, UserListFn.Accum, MutableList<UserInfo>>() {
    @DefaultCoder(AvroCoder::class)
    data class Accum (
        var master: MutableList<UserInfo> = mutableListOf<UserInfo>()
    )
    override fun createAccumulator(): Accum = Accum()

    override fun addInput(accum: Accum, input: UserInfo): Accum {
        accum.master.add(input)
        return accum
    }

    override fun mergeAccumulators(accums: Iterable<Accum>): Accum {
        var merged = createAccumulator()
        accums.forEach {
            it.master.forEach { merged.master.add(it) }
        }
        return merged
    }

    override fun extractOutput(accum: Accum): MutableList<UserInfo> = accum.master
}

// D1と副入力List<s1>のマージロジック
@DefaultCoder(AvroCoder::class)
data class UserEventInfo (
    var user: UserInfo
    , var date: String
    , var event: String
)

class MapFn : DoFn<TableRow, UserEventInfo> {
    var view: PCollectionView<MutableList<UserInfo>>? = null

    constructor() {}
    constructor(_view: PCollectionView<MutableList<UserInfo>>) {
        view = _view
    }

    @ProcessElement fun processElement(c: ProcessContext) {
        val row = c.element()
        val side = c.sideInput(view) ?: return

        side.filter { s1 ->
            s1.id == row.get("id").toString().toLongOrNull()
        }.map { s1 ->
            val date = row.get("date").toString() ?: return
            val event = row.get("event").toString() ?: return
            UserEventInfo(s1, date, event)
        }.forEach { s2 ->
            c.output(s2)
        }
    }
}

fun main(args: Array<String>) {
    // ...

    val p = Pipeline.create(options)

    val jdbcInput = JdbcIO.read<UserInfo>()
        .withDataSourceConfiguration(
            JdbcIO.DataSourceConfiguration.create("com.mysql.jdbc.Driver", jdbcUrl)
            .withUsername("username")
            .withPassword("password")
        ).withQuery(d2Sql)
        .withCoder(AvroCoder.of(UserInfo::class.java))
        .withRowMapper(ReadUserFn())

    val view = p.apply("Read from D2", jdbcInput)
        .apply("Create List<s1>", Combine.globally(UserListFn()).asSingletonView())

    p.apply("Read from D1", BigQueryIO.readTableRows().fromQuery(d1Sql))
        .apply("Merge List<s1>", ParDo.of(MapFn(view)).withSideInputs(view))
}

SQL <=> メソッドチェーンの互換

世の中には逆にSQLでなんでも書きたいと考える人がいます。 そんなSQL信者にも朗報として、さきほどのDSLから「SELECT <=> map」「WHERE <=> filter」など変換則を見出しさえすれば、比較的簡単にコンバート可能です。

SELECT s1.id, s1.name, s1.age, date, event
    FROM (
        SELECT id, date, event
            FROM D1
    ) as d1
    JOIN (
        SELECT id, name, age
            FROM D2
            WHERE DelFlg = 0
    ) as s1
    ON d1.id = s1.id

SQLインターフェースが提供されているデータベースによってオプティマイザが異なります。
実際にはJOIN句の左と右どちらがデータサイズが大きいものを選ぶべきかはデータベースによって異なります。

まとめ

大概、パイプラインに関数を適用していくやり方は クエリ = 関数(全データ) という方程式で説明されるようにデータ加工ロジックと相性があいます。
こんな感じでKotlinの簡潔なコレクション操作シンタックスで一度DSLを書いてみて、
そのあとBeam Modelを書いてしまう、というのが思考のコンテキストスイッチが発生せず生産性が高くなると見込んでいます。
楽しいデータフローライフを!!!!!!!!

*1:Cloud DataflowがOSS

VAEと時系列を扱うVRAE

Google日英翻訳がNMT(ニューラルネットワークを利用した翻訳モデル)になったらしいです。
文語ベースでバリバリ翻訳してくれてるみたいで、
単語にマウスオーバー当てても単語間の対応関係がわからなくなったのはさみしいけど、
たしかに精度がよくなったような、そんな気がするぜ。
というわけで英語論文めっちゃ読んだろ。。

VAEで何かしたいなーと思ってて、時系列を扱えるVRAE周りを調べました。

Variational AutoEncoder

Variational AutoEncoderをついにしっかり理解できた気がする。

はっつーさんのkeras実装ブログがとても勉強になりました。
有難うございます。
ralo23.hatenablog.com

Variational AutoEncoderについてまとめたスライドを作りました。

続きを読む

情報幾何学の参考書2つ。

情報幾何学を勉強中。
目的はDeep Neural Networkの解析。

情報幾何学

  • Fisher情報量(情報行列) 確率変数がパラメーターに関して持つ情報量
  • Riemann計量 微分幾何学ででてくる計量

若き頃のC.R.Raoが「Fisher情報行列とRiemann計量って一緒じゃね?」と言い始め、
いろいろな可能性が湧き出てきて、甘利先生、長岡先生たちが理論体系としてまとめあげた。
情報幾何学のはじまりはじまり。

確率分布空間を多様体構造で幾何的に捉える。
狭義では「双対アファイン接続の微分幾何学」。

続きを読む

情報幾何の勉強。機械学習の数理的研究。

(勉強中のブログです。気をつけてるんですが厳密さに欠けることも書きがちなので、あくまで備忘録という免罪符を利用します。暖かい目でみてください。)

tl;dr

情報幾何学という分野を知った

機械学習、深層学習の仕組みを理解する一つの方法論
日本から生まれた理論体系。

続きを読む