2008-05-12

PythonでWard法によるクラスタリング

Pythonで、Ward法によるクラスタリング(デンドログラム作成まで)を実装してみた。
参考にしたのは、岡山大学が公開しているこの資料
(岡山大学の先生ありがとう!)

今回こんな車輪の再発明をしたのは、距離表と呼ばれる行列データを行列のイメージで保存せず、

[(ベクトル1とベクトル2の距離, ベクトル番号1, ベクトル番号2), ...]

という形式のリストにして、距離でソートしておき、そのソート結果を維持したままこのリストを分割、更新していくという方針で作ったらどうかと思ったからです。

196個の19次元の非スパースなデータのデンドログラム作成に2.5秒だからなかなかのパフォーマンスではないだろうか。
CもC++もNumpy系の数値計算ライブラリも使わないでこれだから、いいですねえ。

使い方
読み込むデータファイルをタブ区切りで用意する。
1カラム目はデータの識別用ID文字列、2カラム目以降にベクトルの数値データを記入。
あとはそのデータを標準入力から流し込むだけ。

以下のsample.txtを用意する。
A 1.0 1.1 0.2
B 1.1 0.5 0.3
C 3.1 0.5 0.2
D 4.0 1.1 1.2

以下を実行
ayu@~/work/cluster% ./cluster.py < sample.txt 
1 : (B+A)
2 : (D+C)
3 : ((D+C)+(B+A))

最初にBとAを結合し、次にDとCを結合し、最後に(D+C)と(B+A)を結合していることが分かる。

ソースcluster.py
#!/usr/local/Python25/bin/python
# _*_ coding: utf-8 _*_
# Copyright (C) 2007 Ayukawa Hiroshi
import sys
from collections import defaultdict
from itertools import repeat, groupby
from math import sqrt

def distmatrix_by_dict(vectors):
"""
上三角部分のみの距離表を作ります(ベクトルがdict型のsparseベクトルの場合)
返り値 (距離, ベクトル番号1, ベクトル番号2) の列
"""
for i in xrange(len(vectors)):
for j in xrange(i):
dist = 0.0
for k in set(vectors[i].keys()) & set(vectors[j].keys()): #ユークリッド距離
dist += (vectors[i].get(k, 0.0) - vectors[j].get(k, 0.0))**2
yield (sqrt(dist), i, j)

def distmatrix_by_list(vectors):
"""
上三角部分のみの距離表を作ります(ベクトルが通常のリストの場合)
返り値 (距離, ベクトル番号1, ベクトル番号2) の列
"""
for i in xrange(len(vectors)):
for j in xrange(i):
dist = sum([(x[0]-x[1])**2 for x in zip(vectors[i], vectors[j])]) #ユークリッド距離
yield (sqrt(dist), i, j)

def cluster(vectors, distfunc=distmatrix_by_list):
"""
クラスタリング(ウォード法)
返り値 クラスタ結合履歴配列
"""
#距離表の用意
data = list(distfunc(vectors))
data.sort()

size = len(vectors) #全体のサイズ
sizes = list(repeat(1, size)) # 各クラスタのサイズ

#ノードを結合した履歴を保存する配列(この列を最終的に返します)
hist = [dict(
data=data, #data: まだ結合されていない成分一覧(常に距離でソート済みの状態で格納)
joined=None, #joined: この回で結合したインデックス
dist=0.0 #dist: この回で結合したインデックス間の距離
)]

nextnodenum = size #次に使うクラスタ番号

#size-1回繰り返して結合していきます。
while len(hist) < size:
#最小距離のものを選ぶ
firstpair = min([h["data"][0] for h in hist if h["data"]])
#最小距離のもののインデックスペア
pairindex = set(firstpair[1:])

movedata = [] #新クラスタ形成のために計算し直し対象とするもの格納用
for h in hist: #履歴をさかのぼって、計算しなおしとそうでないものに分ける。
newdata = [] #計算しなおさない居残り組格納用
for d in h["data"]:
if d == firstpair: #今回結合対象のものは除外
continue
intersect = set(d[1:]) & pairindex
if intersect: #今回結合対象とインデックスがかぶるなら、計算しなおし対象へ
movedata.append(d)
else: #そうでないなら居残り組
newdata.append(d)
h["data"] = newdata #居残り組に更新

newdata = [] #計算しなおしデータ格納用

dpq, p, q = firstpair #改めて今回の結合ペアをp,qなどと名付ける
#計算し直しデータを点ごとにグループ化。この点ごとに新距離を計算しなおします。
movedata = groupby(sorted((((set(x[1:])-pairindex).pop(), x) for x in movedata)), key=lambda x: x[0])
for r, d in movedata:
newdist = 0.0 #新距離計算用
for w in d: #ward法計算式にしたがって計算
w = w[1]
newdist += (sizes[w[1]]+sizes[w[2]])*w[0]
newdist = (newdist-sizes[r] * dpq) / float(sizes[r]+sizes[p]+sizes[q])
newdata.append((newdist, nextnodenum, r)) #新距離を格納
newdata.sort() #距離データは常にソートして整理整頓
sizes.append(sizes[p]+sizes[q]) #新クラスタのサイズデータを格納
nextnodenum += 1 #次のクラスタID
#履歴情報に今回の結合結果を格納
hist.append(dict(
data=newdata,
joined=(p,q),
dist=firstpair[0]
))
return hist

def main(nottest=False):
"""
引数 nottest パフォーマンステスト用フラグ Falseの場合はテストのために、結果出力しません。
"""
ids = [] # データラベル格納用
vectors = [] # ベクトル格納用
for line in sys.stdin: #標準入力からベクトルを読む
data = line.strip().split("\t")
ids.append(data[0])
vectors.append(map(float, data[1:]))

#クラスタリング実行
hist = cluster(vectors)

if nottest: #出力処理
for c, h in enumerate(hist):
j = h["joined"]
if not j: continue
i = "(%s+%s)" % (ids[j[0]], ids[j[1]])
ids.append(i)
print c, ":", i


if __name__ == "__main__":
import optparse

parser = optparse.OptionParser(u"""
Ward法によるクラスタリング。
""")
parser.add_option("-t", "--test", dest="test", help=u"パフォーマンステスト実行", default=False, action="store_true")
(options, args) = parser.parse_args()

if options.test:
#パフォーマンステスト
import hotshot, hotshot.stats
prof = hotshot.Profile("cluster.prof")
prof.runcall(main)
prof.close()
stats = hotshot.stats.load("cluster.prof")
stats.strip_dirs()
stats.sort_stats('time', 'calls')
stats.print_stats(20)
else:
#通常実行
main(True)