日記マン

モチベはビッグデータに対する分散処理、機械学習、深層学習などです。

Pythonで乃木坂46公式ブログをクローリング・スクレイピングしてCloud Storageに永続化する

tl;dr

2017年も残りわずかなので、乃木坂46のブログをスクレイピングし、Cloud Storageに保存しておきましょう。
卒業していくメンバーのブログも永遠のものとなる!!!!!優勝!!!!!!!!!!!!

注意: 勢いで書いたコードなので、例外周りとか適当です。見逃してください。

環境

  • Python 3.6.2
    • pyenv/virtualenv
  • tools
    • requests
    • BeautifulSoup
    • google.cloud.storage

Install

requirements.txt

requests
BeautifulSoup4
lxml
python-dotenv
google-cloud-storage
six
$ virtualenv -p python3 env
$ source env/bin/activate
$ pip install -r requirements.txt

サービスアカウントの準備

  • クライアントからGCPサービスを扱う際に認証が必要です
  • サービスアカウントを作成しjsonキーファイルのパスを環境変数GOOGLE_APPLICATION_CREDENTIALSに設定します

サービスアカウントの作成

必要な権限

  • ストレージのオブジェクト作成者
  • ストレージのオブジェクト管理者

注意: ストレージのオブジェクト作成者だけでは同名オブジェクトの上書きができません。 オブジェクトを上書きするには管理者の権限も必要です。

認証jsonキーをダウンロードします。

python上から環境変数を設定

import os
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = 'KEY_FILE_PATH'

設計

バケット nogizaka 構造

  • nogizaka/
    • members.txt
    • member/
      • MEMBER_NAME/
        • detail_urls.txt
        • posts/
          • POST_BLOB

スクレイピング手順

  1. 公式ブログTOPからメンバーリストを抽出し members.txt に格納する
  2. メンバー別もしくは members.txt を基に全てのメンバーブログのスクレイピングを行う

メンバーブログのスクレイピング

  1. メンバーブログTOPから全記事のURLをクローリングする
  2. 各記事に対しスクレイピングを行う。タイトル、内容などを member/MEMBER_NAME/posts/ 配下にblobを保存する

コーディング

Cloud Storage

from google.cloud import storage
import six


def _get_storage_client():
    return storage.Client(
        project=os.environ.get('PROJECT_ID')
    )


def upload_file(file_stream, filename, content_type):
    blob = _get_blob(filename)

    blob.upload_from_string(
        file_stream,
        content_type=content_type)

    url = blob.public_url

    if isinstance(url, six.binary_type):
        url = url.decode('utf-8')

    return url


def download_string(filename):
    blob = _get_blob(filename)
    source = blob.download_as_string().decode()
    return source


def is_exists_file(filename):
    blob = _get_blob(filename)

    return blob.exists()


def _get_blob(filename):
    client = _get_storage_client()
    bucket = client.bucket(os.environ.get('CLOUD_STORAGE_BUCKET'))
    blob = bucket.blob(filename)

    if blob is None:
        from google.cloud.storage import Blob
        blob = Blob(filename, bucket)

    return blob


def read_lines(path):
    """
    ストレージからダウンロードしたファイルを行ごとにの要素で配列にする
    """
    source = download_string(path)
    rows = source.split('\n')
    return rows

これを storage.py として使うことにします。

スクレイピング部分

import requests
from bs4 import BeatulfulSoup

from . import storage

class Blog(object):
    """
    乃木坂公式ブログをスクレイピングする
    """


    URL_PREFIX = 'http://blog.nogizaka46.com/'
    HEADERS = {
        'User-Agent': 'HogeHoge',
    }


    @staticmethod
    def create_members_list(path=None):
        """
        全メンバー情報を更新し, 全記事に対してスクレイピングを行い結果をストレージに保存する
        """
        if path is None:
            path = 'members.txt'

        res = requests.get(Blog.URL_PREFIX, headers=Blog.HEADERS)
        if res.status_code != 200:
            return
        soup = BeautifulSoup(res.text, 'lxml')
        unit_tags = soup.find(attrs={'id': 'sidemember'}).findAll(attrs={'class': 'unit'})
        members = [unit_tag.find('a').get('href').rsplit('/', 1)[1] for unit_tag in unit_tags]
        raw = '\n'.join(members)
        return storage.upload_file(raw, path, 'text/plain')

Blog.create_members_list()公式ブログTOPからメンバーリストを作成し、 Cloud Storageにアップロードします。

members.txt はこんな感じになります。

manatsu.akimoto
erika.ikuta
rina.ikoma
karin.itou
junna.itou
marika.ito
sayuri.inoue
misa.eto
hina.kawago
mahiro.kawamura
hinako.kitano
asuka.saito
chiharu.saito
yuuri.saito
iori.sagara
reika.sakurai
kotoko.sasaki
mai.shiraishi
mai.shinuchi
ayane.suzuki
kazumi.takayama
ranze.terada
kana.nakada
himeka.nakamoto
nanase.nishino
ami.noujo
hina.higuchi
minami.hoshino
miona.hori
sayuri.matsumura
rena.yamazaki
yumi.wakatsuki
miria.watanabe
maaya.wada
third

メンバー別にスクレイピング

members.txt にあるメンバー全員分の全記事をスクレイピングするのはきついので、
個人別に操作が効くような設計にしています。

    def __init__(self, base_url=None, member=None, replace=False):
        if base_url is None and member is not None:
            self.member = member
            self.base_url = Blog.URL_PREFIX + self.member
        elif base_url is not None and member is None:
            self.base_url = base_url
            self.member = self.base_url.rsplit('/', 1)[1]
        elif base_url is None and member is None:
            print('error')
            return
        else:
            self.base_url = base_url
            self.member = member
        self.replace = replace
        self.headers = Blog.HEADERS
        self.detail_urls = []
        self.detail_list_url = 'member/' + self.member + '/detail_urls.txt'
        res = requests.get(self.base_url, headers=self.headers)
        if res.status_code != 200:
            raise Exception()

replace プロパティはBooleanで、False であれば既にスクレイピングし永続化し終わった記事に対してはスルーをします。

    def run_crawling_urls_by_member(self):
        """
        メンバーの全記事のURL情報を更新しストレージに保存する
        """
        self.crawl_urls()
        return self.upload_detail_urls(self.detail_list_url)


    def crawl_urls(self):
        """
        全記事のURLをdetail_urlsに格納する
        """
        month_urls = self._get_month_urls(self.base_url)
        for month in month_urls:
            self._add_detail_link(month)


    def _get_month_urls(self, base_url):
        res = requests.get(base_url, headers=self.headers)
        if res.status_code != 200:
            return
        soup = BeautifulSoup(res.text, 'lxml')
        month_tags = soup.find(attrs={'id': 'sidearchives'}).find('select').findAll('option')
        month_urls = [option['value'] for option in month_tags[1:]]
        return month_urls


    def _add_detail_link(self, url):
        res = requests.get(url, headers=self.headers)
        if res.status_code != 200:
            return
        soup = BeautifulSoup(res.text, 'lxml')
        day_table = soup.find(id='daytable')
        detail_urls = [detail_link.get('href') for detail_link in day_table.find_all('a')]
        self.detail_urls.extend(detail_urls)


    def upload_detail_urls(self, dst_filename):
        """
        detail_urls情報をストレージに保存する
        """
        raw = '\n'.join(self.detail_urls)
        return storage.upload_file(raw, dst_filename, 'text/plain')
  1. メンバーブログの月別リスト(id=sidearchives)から月別にURLを取得します。
  2. 日付テーブル(id=daytable)から日別にURLを取得し、detail_urls プロパティへ格納していきます。
  3. 記事URL情報をCloud Storageに保存します。

member/nanase.nishino/detail_urls.txt
f:id:i101330:20180101001128p:plain

    def run_scraping(self):
        """
        メンバーブログの全記事に対してスクレイピングを行い結果をストレージに保存する
        """
        if not storage.is_exists_file(self.detail_list_url):
            self.run_crawling_urls_by_member()

        detail_urls = storage.read_lines(self.detail_list_url)
        for detail_url in detail_urls:
            file_name = 'member/' + self.member + '/post/' + detail_url.rsplit('/', 1)[1]
            self.upload_post_detail(detail_url, file_name)


    def upload_post_detail(self, url, dst_filename):
        """
        author, title, contentをJson形式でストレージに格納する
        """

        if self.replace is False:
            exist = storage.is_exists_file(dst_filename)
            if exist:
                return

        res = requests.get(url, headers=self.headers)
        if res.status_code != 200:
            return

        soup = BeautifulSoup(res.text, 'lxml')
        author = soup.find(class_='author').text
        title = soup.find(class_='entrytitle').text
        content = soup.find(class_='entrybody')
        output = json.dumps({
            'postUrl': url,
            'author': author,
            'title': title,
            'content': content.prettify(),
        }, ensure_ascii=False)

        return storage.upload_file(
            output,
            dst_filename,
            'application/json',
        )

スクレイピング対象の記事URLとともに、投稿者名、記事タイトル、記事内容をJson形式でblobに保存します。

結果

例えば西野七瀬が2017年7月29日に投稿したブログは
gs://BUCKET_NAME/member/nanase.nishino/post/?d=20120729 に格納され、
中身はこんな感じです。

{
    "postUrl": "http://blog.nogizaka46.com/nanase.nishino/?d=20120729", 
    "author": "西野七瀬",
     "title": " ホームゲートに手を引き込まれないよう",
     "content": "<div class=\"entrybody\">\n <div>\n </div>\n <div>\n  ん~~~~~!ななせまる!!\n </div>\n <div>\n </div>\n <div>\n </div>\n <div>\n  今日は新技術イベントなり\n </div>\n <div>\n  起こしの方は楽しみにしててくださいね\n </div>\n <div>\n  そして乃木どこもぜひ見てください。\n </div>\n <div>\n </div>\n <div>\n </div>\n <div>\n  <img src=\"http://img.nogizaka46.com/blog/photos/entry/2012/07/29/3049950/0000.jpeg\" width=\"240\"/>\n </div>\n <div>\n  ネイル!\n </div>\n <div>\n </div>\n <div>\n </div>\n <div>\n  昨日の夜ぜんぜん寝られへんくて、なんかアプリ探したり考えごとしてました\n </div>\n <div>\n </div>\n <div>\n  そのなかのひとつは歯について!\n </div>\n <div>\n  そういえば小さい頃って歯抜けてたなーって思って\n </div>\n <div>\n  わたしははじめ、ぐらぐらしてきて、そのうち糸一本だけで繋がってる状態になって、それがめっちゃ気になるからベロで遊んでました( ・´ー・`)\n </div>\n <div>\n  歯を一回転してみたり変な方向にねじったりして、抜けることを期待してた\n </div>\n <div>\n  鏡をみながら歯をひっぱっても抜けへんから、諦めてご飯食べてたら、ブチっと歯がとれたことがあった\n </div>\n <div>\n  上の歯はトイレに流す\n </div>\n <div>\n  下の歯は屋根の上に投げる\n </div>\n <div>\n  これが西野家流やねん( ・´ー・`)\n </div>\n <div>\n </div>\n <div>\n  歯が抜けてから処理するまでは引き出しに保管してたんやけど\n </div>\n <div>\n  なんか\"抜けた歯\"が好きで何回も取り出してはいろんな方向から歯をまじまじ見てました\n </div>\n <div>\n  それで昨日、「あ!なな抜けた歯が好きやったんか」って思い出したのさ\n </div>\n <div>\n </div>\n <div>\n  親知らずを歯医者さんで抜いたときも\n </div>\n <div>\n  銀色のプレートに転がってる、すこし血のついた歯をみると「yeah!」って感じでしたな( ・´ー・`)ゼア\n </div>\n <div>\n </div>\n <div>\n  歯について語りすぎた?故に口の中がかゆくなってきた...\n </div>\n <div>\n </div>\n <div>\n  抜けた歯が好きな人\n </div>\n <div>\n  手あーげて!\n </div>\n <div>\n </div>\n <div>\n </div>\n <div>\n  <img src=\"http://img.nogizaka46.com/blog/photos/entry/2012/07/29/3049950/0001.jpeg\" width=\"241\"/>\n </div>\n <div>\n  浪漫より\n </div>\n <div>\n </div>\n <div>\n  バイッッッ\n </div>\n <div>\n </div>\n</div>\n"
}

はい可愛い!!!!!!!!!優勝!!!!!!!!!!!!!!

また、卒業するため近日にブログが閉鎖してしまう中元日芽香伊藤万理華のブログもこんな感じで無事確保できました。

f:id:i101330:20171231231004p:plain

Kotlinでデータ加工クエリのDSLを書く

tl;dr

ありとあらゆるクエリをなんでもかんでもメソッドチェーンDSLで書きたい

メソッドチェーンシンタックスでパイプラインを考える

データ加工にはイマドキMapReduceとしてApache Beam *1 が優れています。
Beam Modelではそれぞれデータ加工ロジックを任意の粒度で切り分け、
パイプラインに肉付けしていくイメージでプログラミングしていきます。

Beamが提供しているCoreクラスを使えばかなり短く処理を記述できますが、
実際のデータ加工ロジックは複雑なことが多いのでDoFnサブクラスを定義し、ParDo処理を行うことが多いです。

BeamはJava版の開発が盛んで、ver.2.2にもなって充実さが増してきました。
しかしJavaの書き方は少し冗長でBeam Modelの関数型ライクなモデリングとは相性が微妙です。
そのため僕は同じJVM言語であるKotlinを選択します。

val p = Pipeline.create(options) // パイプライン作成

p.apply("Read from BigQuery", BigQueryIO.readTableRows().fromQuery(sql)) // データソースからデータを取得
    .apply("Phase 1", ParDo.of(Phase1Fn())) // 加工1
    .apply("Phase 2", ParDo.of(Phase2Fn())) // 加工2
    .apply("Phase 3", ParDo.of(Phase3Fn())) // 加工3
    // ...
    .apply("Write to Datastore", DatastoreIO.write().withProjectId("hogehoge")) // データシンクへデータを書き込み

クエリ = 関数(全データ) という考え方に落とし込むと、
データソースからデータを受け取りメソッドチェーンで関数を複数回適用していき、
手に入れたい形へ持っていく、というBeam Modelは比較的理解しやすいです。
Kotlinはmap, filter, reduceなどコレクション操作が充実しているため、
Beam Modelのデータ加工ロジックをKotlinのシンタックスで一度記述してしまえば
クエリをスムーズに考えることができます。

まずはパイプラインのイメージをDSLで書いてみる

データソースD1D2があるとします。
D1はBigQueryなどのDWHで、大量のユーザの行動ログデータがありここから柔軟な分析をしたいと思います。
D2MySQLで管理されたデータソースであり、ユーザデータなどが別WEBサービスからCRUDな操作が行われているものです。

D1はテーブルが日付分割されて1テーブルに1億レコードほども格納されていて、そこからSQLを叩いてある程度中間表現に絞っても1000万レコードほどが吐き出されるほどのビッグデータだとします。
D2はユーザ数分くらいのレコード数であり、その数は3000ほどだとします。

今回の場合、D1をメインとし、D2が副入力として結合されるパイプラインという構築方法をとります。

KotlinライクなDSLで書いてしまって、そのあとBeam Modelを実際に実装していくというやり方が個人的にしっくりきます。

D2.filter {
    DelFlg == 0
}.map {
    (
        , id
        , name
        , age
    ) as s1
}.forEach { s1 ->
    // D2は最終的にListとして保持する
    // Listを副入力としてD1のパイプラインにjoinする
    D1.filter {
        s1.id == it.id
    }.map {
        (
            s1.id
            s1.name
            s1.age
            it.date
            it.event
        ) as s2
    }
}

こんな感じでクエリをDSLで書いてみてしまえば、頭が整理されます。
あとはBeam Modelを実装していきます。

実際にBeam Modelを書く

// D2 -> List<s1>までのPTransformロジック
@DefaultCoder(AvroCoder::class)
data class UserInfo (
    var id: Long = 0L
    , var name: String = ""
    , var age: Long = 0L
)

class ReadUserFn : JdbcIO.RowMapper<UserInfo> {
    override fun mapRow(rs: ResultSet): UserInfo {
        val id = rs.getLong(1)
        val name = rs.getString(2)
        val age = rs.getLong(3)
        return UserInfo(id, name, age)
    }
}

class UserListFn : CombineFn<UserInfo, UserListFn.Accum, MutableList<UserInfo>>() {
    @DefaultCoder(AvroCoder::class)
    data class Accum (
        var master: MutableList<UserInfo> = mutableListOf<UserInfo>()
    )
    override fun createAccumulator(): Accum = Accum()

    override fun addInput(accum: Accum, input: UserInfo): Accum {
        accum.master.add(input)
        return accum
    }

    override fun mergeAccumulators(accums: Iterable<Accum>): Accum {
        var merged = createAccumulator()
        accums.forEach {
            it.master.forEach { merged.master.add(it) }
        }
        return merged
    }

    override fun extractOutput(accum: Accum): MutableList<UserInfo> = accum.master
}

// D1と副入力List<s1>のマージロジック
@DefaultCoder(AvroCoder::class)
data class UserEventInfo (
    var user: UserInfo
    , var date: String
    , var event: String
)

class MapFn : DoFn<TableRow, UserEventInfo> {
    var view: PCollectionView<MutableList<UserInfo>>? = null

    constructor() {}
    constructor(_view: PCollectionView<MutableList<UserInfo>>) {
        view = _view
    }

    @ProcessElement fun processElement(c: ProcessContext) {
        val row = c.element()
        val side = c.sideInput(view) ?: return

        side.filter { s1 ->
            s1.id == row.get("id").toString().toLongOrNull()
        }.map { s1 ->
            val date = row.get("date").toString() ?: return
            val event = row.get("event").toString() ?: return
            UserEventInfo(s1, date, event)
        }.forEach { s2 ->
            c.output(s2)
        }
    }
}

fun main(args: Array<String>) {
    // ...

    val p = Pipeline.create(options)

    val jdbcInput = JdbcIO.read<UserInfo>()
        .withDataSourceConfiguration(
            JdbcIO.DataSourceConfiguration.create("com.mysql.jdbc.Driver", jdbcUrl)
            .withUsername("username")
            .withPassword("password")
        ).withQuery(d2Sql)
        .withCoder(AvroCoder.of(UserInfo::class.java))
        .withRowMapper(ReadUserFn())

    val view = p.apply("Read from D2", jdbcInput)
        .apply("Create List<s1>", Combine.globally(UserListFn()).asSingletonView())

    p.apply("Read from D1", BigQueryIO.readTableRows().fromQuery(d1Sql))
        .apply("Merge List<s1>", ParDo.of(MapFn(view)).withSideInputs(view))
}

SQL <=> メソッドチェーンの互換

世の中には逆にSQLでなんでも書きたいと考える人がいます。 そんなSQL信者にも朗報として、さきほどのDSLから「SELECT <=> map」「WHERE <=> filter」など変換則を見出しさえすれば、比較的簡単にコンバート可能です。

SELECT s1.id, s1.name, s1.age, date, event
    FROM (
        SELECT id, date, event
            FROM D1
    ) as d1
    JOIN (
        SELECT id, name, age
            FROM D2
            WHERE DelFlg = 0
    ) as s1
    ON d1.id = s1.id

SQLインターフェースが提供されているデータベースによってオプティマイザが異なります。
実際にはJOIN句の左と右どちらがデータサイズが大きいものを選ぶべきかはデータベースによって異なります。

まとめ

大概、パイプラインに関数を適用していくやり方は クエリ = 関数(全データ) という方程式で説明されるようにデータ加工ロジックと相性があいます。
こんな感じでKotlinの簡潔なコレクション操作シンタックスで一度DSLを書いてみて、
そのあとBeam Modelを書いてしまう、というのが思考のコンテキストスイッチが発生せず生産性が高くなると見込んでいます。
楽しいデータフローライフを!!!!!!!!

*1:Cloud DataflowがOSS

VAEと時系列を扱うVRAE

Google日英翻訳がNMT(ニューラルネットワークを利用した翻訳モデル)になったらしいです。
文語ベースでバリバリ翻訳してくれてるみたいで、
単語にマウスオーバー当てても単語間の対応関係がわからなくなったのはさみしいけど、
たしかに精度がよくなったような、そんな気がするぜ。
というわけで英語論文めっちゃ読んだろ。。

VAEで何かしたいなーと思ってて、時系列を扱えるVRAE周りを調べました。

Variational AutoEncoder

Variational AutoEncoderをついにしっかり理解できた気がする。

はっつーさんのkeras実装ブログがとても勉強になりました。
有難うございます。
ralo23.hatenablog.com

Variational AutoEncoderについてまとめたスライドを作りました。

続きを読む

情報幾何学の参考書2つ。

情報幾何学を勉強中。
目的はDeep Neural Networkの解析。

情報幾何学

  • Fisher情報量(情報行列) 確率変数がパラメーターに関して持つ情報量
  • Riemann計量 微分幾何学ででてくる計量

若き頃のC.R.Raoが「Fisher情報行列とRiemann計量って一緒じゃね?」と言い始め、
いろいろな可能性が湧き出てきて、甘利先生、長岡先生たちが理論体系としてまとめあげた。
情報幾何学のはじまりはじまり。

確率分布空間を多様体構造で幾何的に捉える。
狭義では「双対アファイン接続の微分幾何学」。

続きを読む

情報幾何の勉強。機械学習の数理的研究。

(勉強中のブログです。気をつけてるんですが厳密さに欠けることも書きがちなので、あくまで備忘録という免罪符を利用します。暖かい目でみてください。)

tl;dr

情報幾何学という分野を知った

機械学習、深層学習の仕組みを理解する一つの方法論
日本から生まれた理論体系。

続きを読む

Kerasつかってみた。紹介とか。おすすめサイトとか。

深層学習フレームワークとしてKerasを使ってみたら結構楽しかったです。
直感的なレイヤー型記述はChainerに似てます。
Theano または TensorFlowのラッパーとして動くので低レベル記述も効いて汎用性は高そうです。

おすすめのサイト(随時更新するつもり)

続きを読む

機械学習・深層学習の勉強したこと・したいこと、このブログの方針

ちょっと予告とは違う更新だけど。。

深層学習(Deep Learning)というものを知り、いろいろ調べて勉強したりした一年
紆余曲折がありつつも、Pythonを初めて扱い深層学習以外の機械学習全般に興味を持った
今取り組んでいることは、「機械学習を基礎から身につけ、pythonで実装していく力を身につける。」

ある程度参考書やサイトを漁った結果、自分なりのベストプラクティスが固まりつつある(前もこんなこと言った)

このブログは息抜き程度の備忘録の位置づけにしたいので、技術ブログではないことをあやふやな記述の免罪符にしたい
わりとまじで。切実に。

機械学習のベストプラクティス(今現在)

数学の事前準備

さすがに微分積分と行列、確率はある程度必要になる。
個人的に行列の微分が苦手。現役の学生なので覚えてるが、
一切を忘れてしまったという方は数学の参考書で数式慣れしたほうがいいかも

超初心者

Python触ったことないYO!なひと。

Numpyの勉強はここがためになる
機械学習の Python との出会い — 機械学習の Python との出会い

PythonじゃないけどMATLABに似たようなやつでOctaveという言語がある。
Courseraのビデオ学習が勉強になる。
www.coursera.org

日本語の字幕がユーモアあって楽しい。
ちなみにNg先生は深層学習での大家でもある

Kaggle(様々な問題(データセット)に世界中のデータサイエンティストが正答率を競い合うプログラミングサイト)に登録もしておく

qiita.com

いずれはKaggleに!と常々思っていたのでこのQiita記事の後押しもあり登録してみた
スコアの高いコードが参考になる

深層学習を学びたいひとは、大変だけど論文サーベイもできたらいいと思う
この一年で1000以上も投稿されてさすがに読みきれないので、
slideshareでまとめてくれてるひとのスライドにお世話になることも多い
Twitterはさみとぎ屋というアカウントで技術や論文概要をRTしたりしてるので
よかったらフォローしてください

続きを読む