2008-05-11

Parallel Pythonで分散処理

Erlangでなくて、Pythonで分散処理を書く意義はなんだろうかと考えた。
Erlangはたしかに分散処理が得意なんだろうけど、いろいろ調べた感じでは、複雑な数値計算などの分散処理には向いていないというウワサだ。良く知られているTwitterや通信の例のようなシンプルな処理を膨大な量さばくにはいいようだけど。

計算を分散で行う場合、本来はGoogleが採用しているように、C++をベースにすべきだろう。
(ただし、GoogleではSawzallという独自言語で記述し、それをC++に変換して実行するそうな。)

そうなると、「なぜPythonで分散処理?」というのが重要になる訳だが、おそらく以下のようなことだろうか。

  1. 既存の豊富なモジュール(しかも多くはCで書かれている)を使える。
  2. C、C++で書かれたルーチンをswig等でPythonに連結し、それを分散させれば、実質C、C++で実行しているようなものだろう。
  3. 簡単で読みやすい。(重要!)

そこで、まさに自分が今仕事で考えているものの要件でぴったりの問題があった。
問題
文の列(数十文程度)の各文どうしの総当たりのdiff(通常の行単位ではなく、
単語単位でのdiff)を取り、diffで異なった割合を数値にした総当たり表を
リアルタイムで返す。

都合のいいことに、Pythonには標準でdiffを取るためのライブラリdifflibがついているので、こむずかしいdiff計算はそいつに丸投げできる。
これが、文の数が10、20程度なら分散させずに普通に解いても速度上問題はない。
しかし、これが40、50になってくると、ちょっと「リアルタイム」とは言えなくなってくる。
(文の数をnとすると、対称であることと、自分自身とのdiffは計算しないとしても、n*(n-1)/2回diffを取る必要がある。)

ということで、これを分散処理させてみよう。

準備
Parallel Pythonを使う。インストールは通常のpython setup.py install 一発で完了。
実験に使ったソースdiffdist.pyは本記事末尾に添付した。
以下のように実行できる。
分散処理で働いてもらいたいマシン群で、Parallel Pythonのサーバーを起動しておく。
ppserver.pyコマンドはParallel Pythonをインストールした時に、pythonコマンドと同じディレクトリ(Linux, OSX)又は、c:¥Python25¥Scritpsフォルダ(Windows)に格納されているものです。
$ /PATH/TO/PYTHONBIN/ppserver.py

あとは、diffdist.pyのppservers変数(113行あたり)にサーバー群のアドレスを記入しておいて、
$ ./diffdist.py

で実行。
(※Parallel Python本家ドキュメントに従ってauto discovery機能を使えば、IPアドレスの設定記入を省く事もできる。)
今回は、140文(合計サイズ32kb程度)の英文の総当たり表を計算した。(今回このデータは添付しない。)
英文データファイルの各行どうしで、単語ごとのdiffを取るというサンプルになっている。

結果
まず普通に分散させないコードだとどれぐらいになるかというと、
ayu@~/work/difftest% time ./diffdist.py 
Starting pp with 2 workers
./diffdist.py 26.64s user 0.36s system 97% cpu 27.705 total

ていう感じ。余裕で20秒以上かかります。

これを、自分の手元マシン(MacBook白2GHz Intel Core2 Duo & メモリ2G)でppserverを起動して分散処理させると、
ayu@~/work/difftest% time ./diffdist.py 
Starting pp with 2 workers
./diffdist.py 14.22s user 0.62s system 90% cpu 16.412 total

ここに、さらに別マシン(DELL SC1420、Intel Xeon 2.8GHz & メモリ1G)を追加すると、、、
ayu@~/work/difftest% time ./diffdist.py 
Starting pp with 2 workers
./diffdist.py 9.62s user 0.59s system 89% cpu 11.394 total

順調に速くなっている。。
おまけだが、我が家の物置で眠っていたSharpのメビウス君(AMD Duron 897MHz & メモリ256M)を追加すると、、、
ayu@~/work/difftest% time ./diffdist.py 
Starting pp with 2 workers
./diffdist.py 8.03s user 0.58s system 85% cpu 10.100 total

ほんのちょっとだけど速くなるね!

この調子で、どんどんマシンを追加していけば、この処理はすごく速くなっていくだろう。
明日会社で本格的に試してみよう。

ソース diffdist.py
#!/usr/local/Python25/bin/python
# _*_ coding: utf-8 _*_
# Copyright (C) 2007 Ayukawa Hiroshi
import sys
import difflib
from itertools import islice
from collections import defaultdict

import pp

def getdiffrate(src1, src2):
"""
diffを取って、異なった割合(diff割合)を返す。
"""
SPC = " "
same = 0
diff = 0
for x in difflib.ndiff(src1, src2):
if x[0] == SPC:
same += 2 #一致部分カウントはsrc1とsrc2の分で+2
else:
diff += 1
return 100 * same / float(same+diff)

def diffrow(src):
"""
一つの文と、対比する複数の文のdiff割合をバッチ的に計算する。

引数 src
[(文番号, 原文, [(対比する文1の番号,対比する文1), (対比する文2の番号,対比する文), ..]), ...]

返り値
[(文番号, 対比する文の番号, 原文と対比文のdiff割合), ...]
"""
ans = []
for i1, s1, osrc in src:
for i2, s2 in osrc:
rate = getdiffrate(s1, s2)
ans.append((i1, i2, rate))
return ans

def d_diffmatrix(job_server, srcs, unit=140):
"""
文一覧を与えて、各文同士の間のdiff割合行列を返す。(分散処理バージョン)

引数
job_server 分散処理サーバー
srcs 文一覧(文字列の配列)
unit バッチ処理する単位(文の数)

返り値
行列データ(dict型データで、res[1][2]のようにdiff割合を格納しています。)
ただし、対角線成分上には、同じ文同士のdiff割合を格納すべきですが、計算を省略するため、
0.0を格納しています。
"""
#各タスク実行に必要なデータを作成する。
task = [] #タスクに与えるデータ格納用
subtask = [] #サブタスクとりわけ用
sublen = 0 #サブタスクの分量計算用
#全タスクを、文の数に従って、サブタスクに分割します。
#サブタスクごとに分散処理させます。
for i1, s1 in enumerate(srcs):
osrc = list(islice(enumerate(srcs), i1))
subtask.append((i1, s1, osrc))
sublen += len(osrc)
if sublen > unit: #文の数がunitを超えたら、一回分としてtaskに格納。
task.append(subtask)
subtask = []
sublen = 0
if subtask:
task.append(subtask)

#分散処理します。
jobs = [] #実行ジョブ格納用
for t in task:
jobs.append(job_server.submit(diffrow,(t, ), (getdiffrate,), ("difflib",)))

#結果を取得して格納
diffrates = defaultdict(lambda: defaultdict(float))
for job in jobs:
for i1, i2, rate in job():
#行列状に格納
diffrates[i1][i2] = rate
diffrates[i2][i1] = rate
return diffrates

def diffmatrix(srcs):
"""
文一覧を与えて、各文同士の間のdiff割合行列を返す。(比較のための通常処理バージョン)

引数
srcs 文一覧(文字列の配列)

返り値
行列データ(dict型データで、res[1][2]のようにdiff割合を格納しています。)
ただし、対角線成分上には、同じ文同士のdiff割合を格納すべきですが、計算を省略するため、
0.0を格納しています。
"""

#全タスクのジェネレーター
def gen():
for i1, s1 in enumerate(srcs):
osrc = list(islice(enumerate(srcs), i1))
yield (i1, s1, osrc)
#処理しながら、結果を行列に格納
diffrates = defaultdict(lambda: defaultdict(float))
for i1, i2, rate in diffrow(gen()):
diffrates[i1][i2] = rate
diffrates[i2][i1] = rate
return diffrates

if __name__ == "__main__":
ppservers = ("192.168.1.21","192.168.1.22","192.168.1.23")

if len(sys.argv) > 1:
ncpus = int(sys.argv[1])
job_server = pp.Server(ncpus, ppservers=ppservers)
else:
job_server = pp.Server(ppservers=ppservers)

print "Starting pp with", job_server.get_ncpus(), "workers"

src = file("samplepatent.txt").readlines()
res = d_diffmatrix(job_server, [x.split(" ") for x in src])

#比較用
#res = diffmatrix([x.split(" ") for x in src])