2008-05-14

Ward法再実装で少し速くなった

PythonでWard法によるクラスタリング
Ward法のパフォーマンスと分散処理のための考察

このシリーズの続き。
プログラムを書き直し、真っ当な行列的データを用意したところ、ずいぶん速くなり、588件でも19秒程度となった。(1100件でも2分程度なので、ずいぶんと自分が想定している実用化に近づいたと思う。)

しかし、真っ当な行列データにすると、今度は最小距離の探索に9割の時間を取られるようになったため、ここを改造すべきと強く思う。

ここまでくれば、常に行列の成分のソート情報をなんらか記憶しておけばいいように思うのだが、そのソート情報の維持にはまたコストがかかるというジレンマ。

今は可変長の行列として、pythonのdefaultdict(lambda: defaultdict(float))を使っている。これで定義した簡易行列クラスは非常に便利なのでそんなにシビアでない計算の目的にはよく使っている。可変長&メモリ節約というのがとくにすばらしい。

これがC++であったらどうするべきであろうか。
こうした細かい計算アルゴリズムの王道としては最初から使用するメモリをまとめて最小限確保しておき、その上でメモリを上書きしながら計算して行くべきか。
それとも今回のPythonでの実装にならってSTLを使って map<size_t, map<size_t, double>*>* などとするべきか。ああ、こういう場合shared_ptrにするべきなんだっけ、、、。ここはいつも覚えてないので参考書を読み直さなくては。
はたまたboostになんかいいのないかと見ていたら、いつの間にかboostのコンテナ系が増えている!調査しなければ。
うーん、前途多難。

以下、今日の改造版。

#!/usr/local/python25/bin/python
# _*_ coding: utf-8 _*_
# Copyright (C) 2007 Ayukawa Hiroshi
import sys
import time
from operator import itemgetter
from collections import defaultdict
from itertools import repeat, groupby
from math import sqrt

def distmatrix_by_dict(vectors):
"""
上三角部分のみの距離表を作ります(ベクトルがdict型のsparseベクトルの場合)
返り値 (距離, ベクトル番号1, ベクトル番号2) の列
"""
for i in xrange(len(vectors)):
for j in xrange(i):
dist = 0.0
for k in set(vectors[i].keys()) & set(vectors[j].keys()): #ユークリッド距離
dist += (vectors[i].get(k, 0.0) - vectors[j].get(k, 0.0))**2
yield (sqrt(dist), i, j)

def distmatrix_by_list(vectors):
"""
上三角部分のみの距離表を作ります(ベクトルが通常のリストの場合)
返り値 (距離, ベクトル番号1, ベクトル番号2) の列
"""
for i in xrange(len(vectors)):
for j in xrange(i):
dist = sum([(x[0]-x[1])**2 for x in zip(vectors[i], vectors[j])]) #ユークリッド距離
yield (sqrt(dist), i, j)

def minmatrix(mat, size):
d = min((min(((v, j) for j,v in mat[i].iteritems())), i) for i in mat.iterkeys() if mat[i])
return (d[0][0], d[1], d[0][1])

def cluster(vectors, distfunc=distmatrix_by_list):
"""
クラスタリング(ウォード法)
返り値 クラスタ結合履歴配列
"""
#距離表の用意
mat = defaultdict(lambda : defaultdict(float))
for d in distfunc(vectors):
mat[d[1]][d[2]] = d[0]

size = len(vectors) #全体のサイズ
sizes = list(repeat(1, size)) # 各クラスタのサイズ

#ノードを結合した履歴を保存する配列(この列を最終的に返します)
hist = [dict(
joined=None, #joined: この回で結合したインデックス
dist=0.0, #dist: この回で結合したインデックス間の距離
newid=None, #newid: この回で新設されたインデックス
)]

class Rap:
def __init__(self):
self.t = 0
def b(self):
self.start = time.time()
def e(self):
self.t += time.time() - self.start
rap1 = Rap()
rap2 = Rap()
nextnodenum = size #次に使うクラスタ番号
#size-1回繰り返して結合していきます。
kieta = set()
while len(hist) < size:
#最小距離のものを選ぶ
rap1.b()
dist, q, p = minmatrix(mat, nextnodenum) # p < q
rap1.e()

#消えるもの集合を更新
kieta.add(p)
kieta.add(q)

#高速化のための事前計算
pairsize = sizes[p]+sizes[q]
r = mat[q][p]
obj = mat[nextnodenum]
sp = sizes[p]
sq = sizes[q]

#距離の計算しなおし
rap2.b()
for i in xrange(nextnodenum):
if i in kieta: continue #すでに消えているなら何もしない

#距離の計算 Ward法
if i < p:
obj[i] = ((sp+sizes[i])*mat[p][i]+(sq+sizes[i])*mat[q][i]-sizes[i]*r)/float(pairsize+sizes[i])
elif p < i and i < q:
obj[i] = ((sp+sizes[i])*mat[i][p]+(sq+sizes[i])*mat[q][i]-sizes[i]*r)/float(pairsize+sizes[i])
mat[i].pop(p) #今後使わなくなる成分の消去
else:
obj[i] = ((sp+sizes[i])*mat[i][p]+(sq+sizes[i])*mat[i][q]-sizes[i]*r)/float(pairsize+sizes[i])
mat[i].pop(p) #今後使わなくなる成分の消去
mat[i].pop(q) #今後使わなくなる成分の消去
if p in mat: mat.pop(p) #今後使わなくなる成分の消去
if q in mat: mat.pop(q) #今後使わなくなる成分の消去
sizes.append(pairsize) #新クラスタのサイズデータを格納
rap2.e()
#履歴情報に今回の結合結果を格納
hist.append(dict(
joined=(p,q),
dist=dist,
newid=nextnodenum,
))
nextnodenum += 1 #次のクラスタID
print "rap1", rap1.t
print "rap2", rap2.t

return hist

def main(nottest=False):
"""
引数 nottest パフォーマンステスト用フラグ Falseの場合はテストのために、結果出力しません。
"""
ids = [] # データラベル格納用
vectors = [] # ベクトル格納用
for line in sys.stdin: #標準入力からベクトルを読む
data = line.strip().split("\t")
ids.append(data[0])
vectors.append(map(float, data[1:]))

#クラスタリング実行
hist = cluster(vectors)

if nottest: #出力処理
for c, h in enumerate(hist):
j = h["joined"]
if not j: continue
i = "(%s+%s)" % (ids[j[0]], ids[j[1]])
ids.append(i)
print c, ":", h["dist"], ":", i

if __name__ == "__main__":
import optparse

parser = optparse.OptionParser(u"""
Ward法によるクラスタリング。
""")
parser.add_option("-t", "--test", dest="test", help=u"パフォーマンステスト実行", default=False, action="store_true")
(options, args) = parser.parse_args()

if options.test:
#パフォーマンステスト
import hotshot, hotshot.stats
prof = hotshot.Profile("cluster.prof")
prof.runcall(main)
prof.close()
stats = hotshot.stats.load("cluster.prof")
stats.strip_dirs()
stats.sort_stats('time', 'calls')
stats.print_stats(20)
else:
#通常実行
main(True)