Dataflow modelができるまで

この記事は、「CyberAgent 19新卒 エンジニア Advent Calendar 2018」18日目のものです。面白い記事が上がってきているので興味のある方は見てみてください。

今回は最近バイトで触れる機会が多いDataflow modelの話をしたいと思います。Dataflow modelと言ってピンとこない人はGCPのCloud Dataflowのことだと思っていいただければ大丈夫です。この記事では、分散処理基盤のちょとした歴史を振り返りながら、どのような要素でDataflow modelが成り立っているかみたいなことを、自分のための整理も含め軽くまとめます。前半部分は「Spark in Action」という本の著者が、Hadoopについての歴史(The history of Hadoop)をいい感じにまとめてくれてあるので、その内容を参考にしています。詳しめに知りたくなった人は読んでみてください。

はじめに、Dataflow modelをまったく知らない人のために軽く説明すると、Dataflow modelとは、膨大なリアルタイムデータをどう捌いていくかを記述するためのモデルです。ユーザー数の多いサービスでも、リアルタイムにデータを処理して検索を行うためにインデキシングしたり、ユーザの行動分析したりできるようになります。このリアルタイムデータの処理みたいな文言だけ聞くと、簡単そうに聞こえますがDataflow modelを実際に実現するためには、長い(と行っても十数年くらい)技術の積み重ねがありました。始まりは、GoogleのWebクローラーの処理が一つのマシンでは追いつかなくなるところからでした。そこから処理をスケールさせるために様々な試みが行われます。

HDFSの原形となるNDFS

処理をスケールさせるためにはまず、一つのマシンでは収まり切らなくなったディスクの管理をなんとかするところから入ります。このとき生まれたのが現在のHDFSの原形となるNDFS(Nutchさんが作ったNutch Distributed File System)という分散ファイルシステムです。このNDFSが登場したことによって、ディスクの管理は隠蔽され、複数のマシンのディスクを一つのファイルシステムとして扱えるようになります。さらに、ディスクが故障しても、複数マシンにデータが分散して置かれているので、データが失われるリスクが減りました。ファイルシステムが故障を前提として設計されたことで、高価な信頼性の高いディスクを買う必要もなくなりました。

MapReduceによる分散処理

ファイルシステムが出来上がると次は、そのシステムに適した処理基盤が求められることになります。ファイルシステムがスケーリングしても処理するコアの能力が1インスタンス分ではなんの意味もありません。そこで生まれたのがMapReduceという概念です。MapReduceでは、MapとReduceという二つの処理を組み合わせてデータの集計を行っていきます。文章中の単語数のカウントを例に上げて説明すると図のような処理になります。ここでは、Map処理で文章中の単語を<単語,単語の数>という構造にマッピングし、Reduce処理でマッピングしたデータを少しずつ集めて合計の値を計算しています。 MapReduceでは、このようなモデルで処理を行うことで、複数マシンの処理能力を効率よく利用していきます。

分散処理基盤の実装

HDFSやMapReduceはApache Hadoopというプロジェクト内で開発が進められていきました。このとき、耐障害性を保ったまま、各サーバーに役割をうまく割り振るため、様々な試みがされてきました。大きな試みを一つ上げると、初期のHadoopではMapReduceを行うサーバーがリソースの割当や耐障害性を持たせるための役割をすべて担っていました。そこで、途中からYARNというそれらのMapReduceの本質とは関係のない部分を引き受けるサービスが分離しました。このYARNの独立はMapReduce以外のフレームワークの開発にも大きな貢献をしました。この後説明するストリーミング処理等もそれらの恩恵をうけて開発されています。

ストリーミング処理の実現

大量のデータを処理できる基盤が出来上がると、今度はそれをいかにリアルタイムなデータに適用していくかという話になってきます。MapReduceの処理を1時間おきに定期実行していたらすべての処理が終わるまで、直近1時間のデータは確認できないわけです。そこで、登場するのがApache SparkやApache Flinkなどのストリーミング処理を行えるフレームワークです。これらのフレームワークはキューイングサービスや、HDFSなどと連携して、入力されたデータから順番に処理を行い、終わったものから次のサービスに渡すというようなことができます。これによって、必要な処理結果がほぼリアルタイムに得られるシステムが組めるようになります。

Dataflow modelによる処理の記述

リアルタイムにデータが処理実現できるようになって、いよいよやることなくなってきたかと思うと実はそうでもありません。このリアルタイムなデータは到達順が保証されてなかったりする関係で、ログの前後関係を考慮した処理を記述するような場合、とてもコードが複雑になります。なので、この複雑な処理を一般化して、枠組みを作る必要が出てきます。そこで、登場するのが今回の主題であるDataflow modelです。実際にDataflow modelでは以下のようなWindowという概念が作られ、時系列データを一定時間ごとに区切って行う処理を記述できるようになっています。 Dataflow modelは現在Apache Beamという名前で開発が行われており、上記の処理モデルが既存のApache SparkやApache Flinkに適用できるようになっています。そのため、いろいろなフレームワークの学習をしなくてもとりあえずApache Beamだけ知っておけば、複数の環境で同じ処理が実行できることになるので、学習コストの面でも大きなメリットがあります。

まとめ

今回は、分散システムから初めて、Dataflow modelが出来上がるまでの流れをざっくり遡ってみました。正直この歴史を知らなくても、Dataflow model自体(Apache Beam)だけ知っていれば処理の記述はできます。しかし、処理基盤を独自で持つような場合は、今日やった歴史(それらの概念を含むコード)に触れる機会も多いと思うので、知っておいて損はないのかなって思います。

以上、ちょっとしたDataflow modelとそれにまつわる歴史の話でした。

Redisを使ったwork queueの作成(k8s用)

最近k8sを使ってウェブクローラーを作っているのですが、処理をスケールさせるためにwork queueを作成する必要があったのでその方法をまとめておきます。k8sのチュートリアルを見ているとRedisを使ったwork queueの作成方法が出てきますが、今回はGoでこれを実現したかったのでGoのライブラリを使って処理を記述していきます。

とりあえずk8sをいじりだすといろいろややこしくなるので、ローカルでRedisを立ち上げてタスクを登録するプログラムとそれを読むプログラムを書いていきます。

RedisはQuick startあたりを見てそれぞれの環境で立ち上がるようにしてください。

このRedis上にGoのライブラリを使ってwork queueを作っていきます。

使うライブラリはこちら。

https://github.com/adjust/redismq

これを使ってタスクを登録するコードを書くとこんな感じになります。

func main() {
    // Redisのホスト、ポート、パスワード(設定してない場合は空文字列)、データベース番号、Queueの名前を指定
    workQueue := redismq.CreateQueue(
        "localhost", "6379", "", 0, "test_queue")
    // 文字列を3つ登録
    for _, task := range []string{"task1", "task2", "task3"} {
        err := workQueue.Put(task)
        if err != nil {
            panic(err)
        }
    }
}

これを実行するとRedis上にタスクが登録されていることがわかります。

$ redis-cli keys "*"                                                                           !+[master]
1) "redismq::test_queue"
2) "redismq::queues"
$ redis-cli lrange redismq::test_queue 0 2
1) "{\"Payload\":\"task3",\"CreatedAt\":\"2018-08-31T03:37:24.306993098+09:00\"}"
2) "{\"Payload\":\"task2",\"CreatedAt\":\"2018-08-31T03:37:24.306471076+09:00\"}"
3) "{\"Payload\":\"task1",\"CreatedAt\":\"2018-08-31T03:37:24.306012003+09:00\"}"

登録できたらこれをこんな感じのworkerで読み込んでいきます。

func main() {
    workQueue := redismq.CreateQueue(
        "localhost", "6379", "", 0, "test_queue")
    // コンシューマーを作成
    consumer, err := workQueue.AddConsumer("test_consumer")
    if err != nil {
        panic(err)
    }
    // 前回のプログラムがAckを返さず終了していると処理がコケるのでconsumerを初期化
    err = consumer.ResetWorking()
    if err != nil {
        panic(err)
    }
    for {
        // 一つずつtaskを取得
        pkg, err := consumer.Get()
        if err != nil {
            panic(err)
        }
        // taskの中身を表示
        fmt.Println(pkg.Payload)
        [必要な処理を実行]
        // 処理が終了したらAckを返してtaskをconsumerから削除
        pkg.Ack()
    }
}

これを実行するとRedis上にconsumerが作成されてそこへtaskが移動します。実際に確認してみると指定した名前でconsumer(test_queue::workers)が作成されています。かるく動作を確認してみた結果consumer.Get()を実行した時点でredismq::test_queueからtest_queue::workersにtaskが移動して、pkg.Ack()でtest_queue::workers内のtaskが消される様子が見えました。

redis-cli keys "*"                                                                           !+[master]
1) "test_queue::workers"
2) "redismq::test_queue"
3) "redismq::queues"

実際にこれをk8sで実行しようと思うと先程のworkerのプログラムをスケールさせて、複数実行することになります。しかし、そのままのプログラムでworkerを複数立ち上げるとAckを返していない状態で一つのconsumerから複数のデータを取りににいってしまい処理が固まります。なので、実際にはconsumerをPodごとに新しく作ってtaskを取得していく必要があります。k8sのpod内でpodごとにユニークなidを取得する必要が出てきますがそれはどうやら、k8sの機能をうまく使ってやれば取得できるみたいです。

とりあえずこれでk8s上での処理の流れは見通しが経ったのでウェブクローラー作成引き続き頑張ります。

Minikubeが起動しない問題

MinikubeがLinux上でうまく起動しなくてだいぶ苦戦したのでまとめておきます。

[環境]

  • Manjaro Linux
  • minikube v0.28.2

[問題]
minikubeを起動した後「Starting cluster components…」の部分で固まる。

$ minikube start
Starting local Kubernetes v1.10.0 cluster...
Starting VM...
Getting VM IP address...
Moving files into cluster...
Downloading kubeadm v1.10.0
Downloading kubelet v1.10.0
Finished Downloading kubelet v1.10.0
Finished Downloading kubeadm v1.10.0
Setting up certs...
Connecting to cluster...
Setting up kubeconfig...
Starting cluster components...

結論から話すとbootstrapperを指定して$ minikube start –bootstrapper=localkubeのように起動するとうまくいくようです。

https://github.com/kubernetes/minikube/issues/2765←すごく長いissueが過去に上がってた。

キャッシュが残っているとうまく動かない場合もあるようなので上記のコマンドでうまくいかない人は$ minikube delete && rm ~/.minikubeで一旦キャッシュを消してみるのもありかも。

ただ、issue内の議論を見た感じ根本的な議論はされてなくて、動かなくなる原因はいまいちわかってないみたい。最後の方で「初期のコメントからまったく有益な情報が書き込まれていない」みたいなコメントがあってissueもロックされていました。今回指定したlocakubeも非推奨のbootstrapperで近いうちにサポートが切れる可能性もあるので一時的な回避方法です。

本当はしっかり内部まで調べてデバッグできたらいいのですが、なかなか難しそうなので、ひとまず問題が解決するのを待ちます。

ブログの更新頻度落とします

ブログ毎日更新は週一更新にしようと思います。純粋にきついのと、長期的にやりたい事柄が進まなくなったことが主な要因です。あと、わざわざ共有するような内容でもない記事が増えたこともあります。そのため、もう少し踏み込んだ作業をしてまとめて行くためにも更新頻度は週一にします。

ただ、何も考えずに毎日の更新を止めてしまうと、全く作業しない日が出てきそうなので自分の中でマイルストーンは決めて、週末にはちゃんとブログ書こうと思います。

今年は学生最後の年になるので、エンジニアとしてのスキルに幅を持たせるためにあと半年しっかり走り切りたいと思います。よろしくお願いします。

エンジニアの守備範囲

10人以下のプロジェクトを動かすようなときに、純粋なプログラミング周りの知識だけでエンジニアをやっていると、他の業種と意思疎通をとる上で障害が発生するので、エンジニアもある程度守備範囲を広くもつべきだと最近思います。しかし、プログラミング周りの知識だけでも学習にそうとうな時間を要するのに、それ以外の知識も含め学習を進めるとなると現実的に無理が出てきます。そのため、自分の専門外はある程度妥協しつつ効率よく情報を仕入れていく必要があります。

具体的には、自分の専門外の領域(デザインやビジネス)ではなぜそういった決断に至るのかなんとなく理解できる程度の知識を身に着けていくことになると思います。しかし、このなんとなく理解というのが意外と難しく、やりこみすぎるといくらでも掘り下げられるのでエンジニアとしての職務がおろそかになり、逆にやらなすぎるとそれぞれの領域の人間がなにを考えているのか理解できずに実装を行うための議論がなかなか進まないといった事態になります。

デザイナーの仕事を例として上げると、特定のサービスをブランディングをするときにコンセプトから考えてカラーやフォント等を統一し、サービス内でのユースケースも考慮した上で最終的な決断をしていくといった一般的な流れはざっくり理解しておく必要があります。これがまったく理解できていないと、実装コストを考慮したスケジュールのすり合わせで、それぞれの項目の重要度の理解に多くの時間を費やすことになります。逆に最初からその重要性を理解していればコードを組むときに変更の可能性を考慮したコードが書けたり、戦略上失敗に終わりそうなところは一時的な実装で妥協したりといった柔軟な対応ができます。ビジネスサイドの決定も同じで、ある程度先にあるビジョンを共有できていれば、無駄な実装を減らすことができます。

ただ、あれも知りたいこれも知りたいで深入りしすぎるのもお互いの作業が重複して全体で見たときのパフォーマンスが低下するので妥協点は必要です。そういう意味では他領域の人と飲み会で話たり愚痴を言い合ったりするみたいな場が意外とちょうどいいなと最近思っています。あと、別領域の人に好きな本を一冊聞いて読んで見るというのもなかなかよかったりします。

結論、機会があればエンジニア以外の人に積極的に絡んでいきたい今日この頃です。

 

仮想通貨ボットで稼ぐ

不労所得って夢ですよね。もし、生活に困らないだけの収入が不労所得として実現したら、仕事という概念にとらわれず好きなことだけに専念できるわけです。自分への投資という意味でも最高の環境が手に入ります。

というわけで今日は、僕がこの欲求に突き動かされ開発したBitCoinの取引ボットの話をします。

結論からいくと現状大して儲かっていません。しかも、得られる利益に対して仮想通貨の値動きの影響が大きいので非常に不安定です。値上がり狙いの取引と大差ない…

ロジックの話をすると基本は取引所間の差額を抜くアービトラージをやっています。取引所はBitflyerの値段をベースに手数料の低いZaifで価格差が出たときに取引を行っています。それに加え、ただ単純に差額が出たときだけの取引をするといつまで経っても、どちらかの取引所の方が価格が高いみたいなことが続くので、ある程度の期間でその差が埋まるように直近の取引の情報を考慮してバイアスをかけています。

理論的には価格の上下がランダムで、BitCoinの価格が一定であればこれで利益が出るはずです。しかし実際には、BitCoinの価格の上下があったり、板情報を取得してから取引を行うまでの遅延等で利益が出たりでなかったりです。BitCoinの価格の下落もあり、結果的には今年の5月頭あたりから運用して現状ほぼプラマイゼロです。

さらに、目的が不労所得であるにもかかわらず、時期によって取引の特徴が変わってくる関係で各種パラメータの調整は定期的にやったりしています。本末転倒もいいところ。正直現状労力に見合った収入が得られていないのでこれからの運用どうしたものか悩んでいるところです。

そろそろ、アービトラージとは別に新しい指標を取り入れたロジックは考えていかないといけなさそうです。不労所得の夢は遠い。

 

バブルチャートでスキルセット紹介

ポートフォリオ更新しました。デザイナーでもなんでもないけどなんちゃってポートフォリオです。

https://tdaira.github.io/

今回は、先日実験していたバブルチャート(D3.jsで遊んでみた)を使って、いままで触れてきた技術の一覧が見られるようにしました。なかなかいい感じの動きになったと思います。

こだわりポイントとしては、関連性の強い技術(CloudであればCloud、言語であれば言語)が一定周期でまとまってサイズ変更されるところです。ただある程度ランダム性はもたせてあるので見てて飽きないと思います。

コード読みたい方はHTMLベタ書きで汚いですがこちらからどうぞ。

https://github.com/tdaira/tdaira.github.io/blob/master/index.html

D3.jsで遊んでみた

リアルタイムで更新されるバブルチャートを作りたかったのでD3.jsを使って試行錯誤してみました。

今回は、静止しているバブルチャートにアニメーションを加えていきます。もとのコードはbl.ocks.orgで公開されているものを使わせていただきました。

https://bl.ocks.org/mbostock/4063269

早速実際に書いたアニメーションでバブルのサイズと位置が変わっていくコードを貼ります。

var svg = d3.select("svg"),
    width = +svg.attr("width"),
    height = +svg.attr("height");

var format = d3.format(",d");

var color = d3.scaleOrdinal(d3.schemeCategory20c);

var pack = d3.pack()
    .size([width, height])
    .padding(1.5);

d3.csv("flare.csv", function(d) {
    d.value = +d.value;
    if (d.value) return d;
}, function(error, classes) {
    if (error) throw error;

    var root = d3.hierarchy({children: classes})
        .sum(function(d) { return d.value; })
        .each(function(d) {
            if (id = d.data.id) {
                var id, i = id.lastIndexOf(".");
                d.id = id;
                d.package = id.slice(0, i);
                d.class = id.slice(i + 1);
            }
        });

    var node = svg.selectAll(".node")
        .data(pack(root).leaves())
        .enter().append("g")
        .attr("class", "node")
        .attr("transform", function(d) { return "translate(" + d.x + "," + d.y + ")"; });

    var circle = node.append("circle")
        .attr("id", function(d) { return d.id; })
        .attr("r", function(d) { return d.r; })
        .style("fill", function(d) { return color(d.package); });

    node.append("clipPath")
        .attr("id", function(d) { return "clip-" + d.id; })
        .append("use")
        .attr("xlink:href", function(d) { return "#" + d.id; });

    node.append("text")
        .attr("clip-path", function(d) { return "url(#clip-" + d.id + ")"; })
        .selectAll("tspan")
        .data(function(d) { return d.class.split(/(?=[A-Z][^A-Z])/g); })
        .enter().append("tspan")
        .attr("x", 0)
        .attr("y", function(d, i, nodes) { return 13 + (i - nodes.length / 2 - 0.5) * 10; })
        .text(function(d) { return d; });

    node.append("title")
        .text(function(d) { return d.id + "\n" + format(d.value); });

    // ***********************  ここから書き換え *************************** //

    // バブルのサイズ変更を連続的に行う関数を呼び出し
    repeat();

    function repeat() {
        // ツリーの重みを計算する関数を更新
        root.sum(function (d) {
                d.value = d.value + (Math.random() - 0.5) * 10000;
                return d.value;
            });

        // nodeの情報を上書き
        svg.selectAll(".node")
            .data(pack(root).leaves());

        // 半径を5秒かけて更新
        circle
            .transition()
            .duration(5000)
            .attr("r", function(d) { return d.r; });

        var transitions = 0;
        // 位置を5秒かけて更新
        node
            .transition()
            .duration(5000)
            .attr("transform", function (d) {
                return "translate(" + d.x + "," + d.y + ")";
            }).on("start", function() {
                transitions++;
            }).on("end", function() {
                if( --transitions === 0 ) {
                    // すべての位置変更が終わったら次の変更を呼び出し
                    repeat();
                }
            });
    }
});

最初の設定は全部そのまま使って最後にアニメーション用のコードを挿入しています。手順としては乱数で重みを設定するための関数を設定した後、svgのパラメーターを更新していってます。更新は基本的に重みから自動で計算される値を使っていきます。値の計算は.data(pack(root).leaves())の中でやっています。位置変更の更新完了タイミングは判定方法がいまいちわからなかったのでtransitionsカウンタを作って判定しています。

実際に動いているコードはこちらから見られます。

https://tdaira.github.io/bubble_chart/

やってみた感想ですが正直D3.jsわけがわからなかったです。値はどうやらSelectionクラスが一貫して保持してるようでいろんなところから呼び出せるのですが、そのときのラムダ関数の指定方法とかでなかなか苦戦しました。selectAll()で何もないSelectionにいきなりselectする仕様もなかなか謎でした。また呼び出しのタイミングを間違えると実行したときに怒られることが多くて辛かったです。ただ、慣れればかっこいいチャートが柔軟に色々つくれるんだろうなという印象でした。

JS力つけてちゃんと使えるようにしたい。

 

ブログ連投14日目

ブログを毎日投稿し始めて二週間経ちましたが、連日投稿するという制限にいろいろと無理もあるので、ここで一旦今の考えをまとめておきます。

毎日投稿することで、ふんわりとした内容の記事はだいぶ多くなりました。自分の中の理想としては、日々やった技術的な内容でおもしろい部分をコンパクトに記事にしていきたいと思っていたのですが、やったことが形にならないとどうしても言語を触った感想とか、自分の好みの話とかがメインになります。そうすると結局この話を他の人に共有する意味あるの?っていう記事になることが多くなります。

ただ、進捗がない中無理やりまとめた記事の中にも、まとめてよかったと思える記事はありました。というのも、言語化はごまかしが効かないので対象となる分野の整理が自分の中で進みます。TDDの話しなんかは、文章に起こすことで自分の癖を把握して思考手順の最適化を図るのにだいぶ役立っていると思います。

しかし、純粋な技術の記事以外だと現状自分で書き終わって、これはまとめてよかったと思える記事はまだ現状多くありません。そのため、ふんわりした感想を書いている記事を、感想だけで終わらせるのではなくその思考に至った過程までちゃんとロジカルに判断して解説するところまでもっていく必要があるのだと思います。

おそらくこの作業は毎日やるのは相当しんどいです。正直やめたいとも思います。でもここで諦めたら、一生言語化が苦手な人間で終わってしまいそうなのでもう少し頑張ってみたいと思います。

しばらくは糞みたいな記事を量産すると思いますがよろしくお願いします。

Manjaro Linuxいいよ

最近Manjaro LinuxというLinuxのディストリビューションが調子いいので紹介したいと思います。

もともと、僕は憧れもあってブートローダーやGUI環境のインストール等を全部マニュアルでやるArch LinuxというOSを使っていました。しかし、半年くらいしてブートローダー周りやアップデートの関係で動作が不安定になった場合の対応が辛くなり乗り換え先を考えるようになりました。そして候補として上がったのがManjaro Linuxです。

Manjaro LinuxはベースがArch Linuxなので、その利点である豊富なドキュメント(ArchWiki)がそのまま活かせます。さらに、インストールされるソフトウェアはある程度事前にチェックされたものを公式のリポジトリからダウンロードできるので不具合も比較的起こりずらくなっています。チェックされた上で最終的にはローリング・リリースなのでUbuntu等のディストリビューションよりは早く最新の修正がかかります。

実際に使ってみると、インストールはUbuntu等のディストリビューションと同じようにGUIから完了させることができ簡単にデスクトップ環境が立ち上がります。インストール後に、ドライバの不具合等で動作が不安定になるといったような場合も、ArchWikiから情報取得ができるので問題への対処がしやすいです。

情報の充実度合いでみるとUbuntu等のメジャーなディストリビューションも魅力的ですが、常にシステムの更新が行われれるManjaro Linuxも魅力的なので興味のある人は使ってみてください。