2008-05-15

Ward法さらにキャッシュを効かせて高速に

さらに昨日の続き。

行列の各行の最小距離成分をキャッシュするようにしてみた。
この改造によって、昨日O(N**2)でネックとなっていた、距離表からの最小値の探索がほぼO(N)になったことになる。
さらにパフォーマンスは向上し、以下のようになった。

件数   処理時間(s)    そのうち初期行列作成時間
--------------------------------------------
196 0.677 0.334
392 2.158 1.356
588 4.636 3.073
1176 18.071 12.088
2352 79.09 49.507

昨日、おとといに比べると、だいぶ速くなった。例えば昨日は588件の処理に19秒かかっていたのが、今日は4.6秒!
しかし、計算は速くなったものの、今度は実は最初の行列作成に大半の時間を使われていることがわかる。

ここが速くなれば、あるいは、行列の作成時間を気にしなくていい状況とかであれば、十分使い物になるパフォーマンスになってきた。
C/C++にはとうていかなわないでしょうけど、Pythonでも工夫次第でいろいろがんばれるものだ。

以下、今日の改造版のソース。
#!/usr/local/python25/bin/python
# _*_ coding: utf-8 _*_
# Copyright (C) 2007 Ayukawa Hiroshi
import sys
import time
from operator import itemgetter
from collections import defaultdict
from itertools import repeat, groupby
from math import sqrt

def distmatrix_by_dict(vectors):
"""
上三角部分のみの距離表を作ります(ベクトルがdict型のsparseベクトルの場合)
返り値 (距離, ベクトル番号1, ベクトル番号2) の列
"""
mat = defaultdict(lambda: defaultdict(float))
for i in range(len(vectors)):
dist = []
for j in range(i):
d = 0.0
for k in set(vectors[i].keys()) & set(vectors[j].keys()): #ユークリッド距離
d += (vectors[i].get(k, 0.0) - vectors[j].get(k, 0.0))**2
dist.append((j, sqrt(d)))
mat[i] = dafaultdict(float, dist)
return mat

def distmatrix_by_list(vectors):
"""
上三角部分のみの距離表を作ります(ベクトルが通常のリストの場合)
返り値 (距離, ベクトル番号1, ベクトル番号2) の列
"""
mat = defaultdict(lambda: defaultdict(float))
for i in range(1,len(vectors)):
v = vectors[i]
dist = []
for j in range(i):
dist.append((j, sqrt(sum([(x[0]-x[1])**2 for x in zip(v, vectors[j])])))) #ユークリッド距離
mat[i] = defaultdict(float, dist)
return mat

def _initcache(mat):
"""
距離表の各行ごとの最小値をキャッシュする配列を初期化します。
"""
cache = []
for i in sorted(mat.keys()):
cache.append((i, min(((v, i, j) for j,v in mat[i].iteritems()))))
return dict(cache)

class _Rap:
"""
パフォーマンス測定用クラス
"""
def __init__(self):
self.t = 0
def b(self):
self.start = time.time()
def e(self):
self.t += time.time() - self.start

def cluster(vectors, distfunc=distmatrix_by_list):
"""
クラスタリング(ウォード法)
返り値 クラスタ結合履歴配列
"""
rap0 = _Rap()
rap00 = _Rap()

#距離表の用意
rap0.b()
mat = distfunc(vectors)
rap0.e()

#距離表の最小値キャッシュを初期化する
rap00.b()
cache = _initcache(mat)
rap00.e()

size = len(vectors) #全体のサイズ
sizes = list(repeat(1, size)) # 各クラスタのサイズ

#ノードを結合した履歴を保存する配列(この列を最終的に返します)
hist = [dict(
joined=None, #joined: この回で結合したインデックス
dist=0.0, #dist: この回で結合したインデックス間の距離
newid=None, #newid: この回で新設されたインデックス
)]
rap1 = _Rap()
rap2 = _Rap()
nextnodenum = size #次に使うクラスタ番号
kieta = set() #消えた行番号の保存用

#size-1回繰り返して結合していきます。
while len(hist) < size:
#最小距離のものを選ぶ
rap1.b()
dist, q, p = min(cache.values()) # p < q
rap1.e()

#消えるもの集合を更新
kieta.add(p)
kieta.add(q)

#高速化のための事前計算
pairsize = sizes[p]+sizes[q]
r = mat[q][p]
obj = mat[nextnodenum]
sp = sizes[p]
sq = sizes[q]

#距離の計算しなおし
rap2.b()
#キャッシュから、今回消えるものをあらかじめ排除
if p in cache: cache.pop(p)
if q in cache: cache.pop(q)
tmpdist = [] #新ノードからの最小距離を求めるための一時記憶場所
for i in xrange(nextnodenum):
if i in kieta: continue #すでに消えているなら何もしない

#距離の計算 Ward法
if i < p:
obj[i] = ((sp+sizes[i])*mat[p][i]+(sq+sizes[i])*mat[q][i]-sizes[i]*r)/float(pairsize+sizes[i])
elif p < i and i < q:
obj[i] = ((sp+sizes[i])*mat[i][p]+(sq+sizes[i])*mat[q][i]-sizes[i]*r)/float(pairsize+sizes[i])
mat[i].pop(p) #今後使わなくなる成分の消去
else:
obj[i] = ((sp+sizes[i])*mat[i][p]+(sq+sizes[i])*mat[i][q]-sizes[i]*r)/float(pairsize+sizes[i])
mat[i].pop(p) #今後使わなくなる成分の消去
mat[i].pop(q) #今後使わなくなる成分の消去

#キャッシュ更新
if i in cache:
if obj[i] < cache[i][0]: #新ノードとi番目の距離がより近い場合はi行目のキャッシュを置き換えます。
cache[i] = (obj[i], nextnodenum, i)
elif mat[i] and (p in cache[i][1:] or q in cache[i][1:]):
cache[i] = min(((v, i, j) for j,v in mat[i].iteritems()))
if not mat[i]: #i行が空になってたら、iのキャッシュは不要。matからもけずる
cache.pop(i)
mat.pop(i)
elif not mat[i]: mat.pop(i)
tmpdist.append((obj[i], i))
if p in mat: mat.pop(p) #今後使わなくなる成分の消去
if q in mat: mat.pop(q) #今後使わなくなる成分の消去

#新ノードからの最小距離をキャッシュに記録
if tmpdist:
mindist = min(tmpdist)
cache[nextnodenum] = (mindist[0], nextnodenum, mindist[1])

sizes.append(pairsize) #新クラスタのサイズデータを格納
rap2.e()
#履歴情報に今回の結合結果を格納
hist.append(dict(
joined=(p,q),
dist=dist,
newid=nextnodenum,
))
nextnodenum += 1 #次のクラスタID

print "rap0", rap0.t
print "rap00", rap00.t
print "rap1", rap1.t
print "rap2", rap2.t

return hist

def main(nottest=False):
"""
引数 nottest パフォーマンステスト用フラグ Falseの場合はテストのために、結果出力しません。
"""
ids = [] # データラベル格納用
vectors = [] # ベクトル格納用
for line in sys.stdin: #標準入力からベクトルを読む
data = line.strip().split("\t")
ids.append(data[0])
vectors.append(map(float, data[1:]))

#クラスタリング実行
hist = cluster(vectors)

if nottest: #出力処理
for c, h in enumerate(hist):
j = h["joined"]
if not j: continue
i = "(%s+%s)" % (ids[j[0]], ids[j[1]])
ids.append(i)
print c, ":", h["dist"], ":", i

if __name__ == "__main__":
import optparse

parser = optparse.OptionParser(u"""
Ward法によるクラスタリング。
""")
parser.add_option("-t", "--test", dest="test", help=u"パフォーマンステスト実行", default=False, action="store_true")
(options, args) = parser.parse_args()

if options.test:
#パフォーマンステスト
import hotshot, hotshot.stats
prof = hotshot.Profile("cluster.prof")
prof.runcall(main)
prof.close()
stats = hotshot.stats.load("cluster.prof")
stats.strip_dirs()
stats.sort_stats('time', 'calls')
stats.print_stats(20)
else:
#通常実行
main(True)