1.
k
k
k 近邻算法:
给定一个训练数据集,对新的输入实例,在训练数据集中 找到与该实例最邻近的
k
k
k 个实例,这
k
k
k 个实例的多数属于某个类,就把该输入实例分为这个类。
1.1k 近邻法
输入: 训练数据集
T
=
{
(
x
1
,
y
1
)
,
(
x
2
,
y
2
)
,
⋯
,
(
x
N
,
y
N
)
}
T=\left\{\left(x_{1}, y_{1}\right),\left(x_{2}, y_{2}\right), \cdots,\left(x_{N}, y_{N}\right)\right\}
T={(x1,y1),(x2,y2),⋯,(xN,yN)} 其中,
x
i
∈
X
⊆
R
n
x_{i} \in \mathcal{X} \subseteq \mathbf{R}^{n}
xi∈X⊆Rn 为实例的特征向量,
y
i
∈
Y
=
{
c
1
,
c
2
,
⋯
,
c
K
}
y_{i} \in \mathcal{Y}=\left\{c_{1}, c_{2}, \cdots, c_{K}\right\}
yi∈Y={c1,c2,⋯,cK} 为实例的类 别,
i
=
1
,
2
,
⋯
,
N
;
i=1,2, \cdots, N ;
i=1,2,⋯,N; 实例特征向量
x
x
x 输出: 实例
x
x
x 所属的类
y
y
y 。 (1)根据给定的距离度量(欧式马氏距离等等),在训练集 T 中找出与 x 最邻近的
k
k
k 个点,涵盖这
k
k
k 个 点的
x
x
x 的邻域记作
N
k
(
x
)
N_{k}(x)
Nk(x); (2)在
N
k
(
x
)
N_{k}(x)
Nk(x) 中根据分类决策规则 (如多数表决) 决定
x
x
x 的类别
y
:
y:
y:
y
=
arg
max
c
j
∑
x
i
∈
N
k
(
x
)
I
(
y
i
=
c
j
)
,
i
=
1
,
2
,
⋯
,
N
;
j
=
1
,
2
,
⋯
,
K
y=\arg \max _{c_{j}} \sum_{x_{i} \in N_{k}(x)} I\left(y_{i}=c_{j}\right), \quad i=1,2, \cdots, N ; j=1,2, \cdots, K
y=argcjmaxxi∈Nk(x)∑I(yi=cj),i=1,2,⋯,N;j=1,2,⋯,K 其中,I 为指示函数, 即当
y
i
=
c
j
y_{i}=c_{j}
yi=cj 时
I
I
I 为
1
,
1,
1, 否则
I
I
I 为 0 k 近邻法的特殊情况是
k
=
1
k=1
k=1 的情形, 称为最近邻算法。对于输入的实例点 (特征 向量)x,最近邻法将训练数据集中与
x
x
x 最邻近点的类作为
x
x
x 的类。
1.2 模型
k 近邻法中,当训练集、距离度量、k 值及分类决策规则(如多数表 决)确定后,对于任何一个新的输入实例,它所属的类唯一地确定。这相当于根据上 述要素将特征空间划分为一些子空间,确定子空间里的每个点所属的类。这一事实从最近邻算法中可以看得很清楚。 各种距离度量请参考距离度量 特征空间中,对每个训练实例点
x
i
x_{i}
xi, 距离该点比其他点更近的所有点组成一个区域,叫作单元(cell)。每个训练实例点拥有一个单元,所有训练实例点的单元构成对特 征空间的一个划分。最近邻法将实例
x
i
x_{i}
xi 的类
y
i
y_{i}
yi 作为其单元中所有点的类标记(class label)。这样,每个单元的实例点的类别是确定的。
1.21 k值的选择
可以看出:
k
k
k 值的选择会对
k
k
k 近邻法的结果产生重大影响。
如果选择较小的
k
k
k 值,就相当于用较小的邻域中的训练实例进行预测,“学习”的 近似误差 (approximation error) 会减小,只有与输入实例较近的 (相似的) 训练实例 才会对预测结果起作用。但缺点是“学习”的估计误差 (estimation error) 会增大, 预 测结果会对近邻的实例点非常敏感。如果邻近的实例点恰巧是噪声,预測就会出 错。换句话说,k 值的减小就意味者整体模型变得复杂, 容易发生过拟合。
如果选择较大的 k 值,就相当于用较大邻域中的训练实例进行预测。其优点是 可以减少学习的估计误差,但缺点是学习的近似误差会增大。这时与输入实例较远 的 (不相似的) 训练实例也会对顶测起作用, 使预测发生错误。
k
k
k 值的增大就意味着整体的模型变得简单。 如果
k
=
N
k=N
k=N, 那么无论输入实例是什么,都将简单地预测它属于在训练实例中最 多的类。这时,模型过于简单, 完全忽略训练实例中的大量有用信息,是不可取的。 在应用中,
k
k
k 值一般取一个比较小的数值。通常采用交叉验证法来选取最优的
k
k
k 值。
2.
k
d
kd
kd树
实现 k 近邻法时,主要考虑的问題是如何对训练数据进行快速
k
k
k 近邻搜索。这点 在特征空间的维数大及训练数据容量大时尤其必要。 k 近邻法最简单的实现方法是线性扫描 (linear scan)。这时要计算输入实例与每 一个训练实例的距离。当训练集很大时,计算非常耗时,这种方法是不可行的。 为了提高
k
k
k 近邻搜索的效率,可以考虑使用特殊的结构存储训练数据,以减少计 算距离的次数。具体方法很多,下面介绍其中的
k
d
k d
kd 树 (
k
d
k d
kd tree
)
)
) 方法
O
.
\mathbb{O} .
O. 注意,kd 树是存储
k
k
k 维空间数据的树结构, 这里的
k
k
k 与
k
k
k 近邻法的
k
k
k 意义不同,为了与习惯一致,仍用 kd 树的名称。
2.1构造
k
d
kd
kd树
输入:
k
k
k 维空间数据集
T
=
{
x
1
,
x
2
,
⋯
,
x
N
}
,
T=\left\{x_{1}, x_{2}, \cdots, x_{N}\right\},
T={x1,x2,⋯,xN}, 其中
x
i
=
(
x
i
(
1
)
,
x
i
(
2
)
,
⋯
,
x
i
(
k
)
)
T
x_{i}=\left(x_{i}^{(1)}, x_{i}^{(2)}, \cdots, x_{i}^{(k)}\right)^{\mathrm{T}}
xi=(xi(1),xi(2),⋯,xi(k))T
i
=
1
,
2
,
⋯
,
N
i=1,2, \cdots, N
i=1,2,⋯,N 输出:
k
d
k d
kd 树。 (1)开始: 构造根结点,根结点对应于包含 T 的 k 维空间的超矩形区域。 选择
x
(
1
)
x^{(1)}
x(1) 为坐标轴,以
T
T
T 中所有实例的
x
(
1
)
x^{(1)}
x(1) 坐标的中位数为切分点,将根结点 对应的超矩形区域切分为两个子区域。切分由通过切分点并与坐标轴
x
(
1
)
x^{(1)}
x(1) 垂直的超平 面实现。 由根结点生成深度为 1 的左、右子结点: 左子结点对应坐标
x
(
1
)
x^{(1)}
x(1) 小于切分点的子 区域,右子结点对应于坐标
x
(
1
)
x^{(1)}
x(1) 大于切分点的子区域。 将落在切分超平面上的实例点保存在根结点。 (2)重便: 对渔度为
j
j
j 的结点,选择
x
(
l
)
x^{(l)}
x(l) 为切分的坐标轴,
l
=
j
(
m
o
d
k
)
+
1
,
l=j(\bmod k)+1,
l=j(modk)+1, 以 该结点的区域中所有实例的 x
(
l
)
^{(l)}
(l) 坐标的中位数为切分点,将该结点对应的超矩形区域 切分为两个子区域。切分由通过切分点并与坐标轴
x
(
l
)
x^{(l)}
x(l) 垂直的超平面实现。 由该结点生成深度为
j
+
1
j+1
j+1 的左、右子结点: 左子结点对应坐标
x
(
l
)
x^{(l)}
x(l) 小于切分点 的子区域,右子结点对应坐标
x
(
l
)
x^{(l)}
x(l) 大于切分点的子区域。 将落在切分超平面上的实例点保存在该结点。 (3)直到两个子区域没有实例存在时停止。从而形成
k
d
kd
kd树的区域划分.
2.1
k
d
kd
kd树的搜索
输入: 已构造的 kd 树,目标点
x
x
x; 输出:
x
x
x 的最近邻。 (1)在 kd 树中找出包含目标点 x 的叶结点: 从根结点出发,递归地向下访问
k
d
k d
kd 树。若目标点
x
x
x 当前维的坐标小于切分点的坐标,则移动到左子结点,否则移动到右 子结点。直到子结点为叶结点为止。 (2)以此早结点为“当前最近点”。 (3)递归地向上回退,在每个结点进行以下操作: (a)如果该结点保存的实例点比当前最近点距离目标点更近,则以该实例点 为“当前最近点”。 (b)当前最近点一定存在于该结点一个子结点对应的区域。检查该子结点的父 结点的另一子结点对应的区域是否有更近的点。具体地,检本另一子结点对应的 区域是否与以目标点为球心、以目标点与“当前最近点”间的距离为半径的超球体 相交。 如果相交,可能在另一个子结点对应的区域内存在距目标点更近的点,移动 到另一个子结点。接售,递归地进行最近邻搜索: 如果不相交,向上回遇。 (4)当回退到根结点时,搜索结束。最后的“当前最近点”即为 x 的最近邻点。 如果实例点是随机分布的,kd 树搜索的平均计算复杂度是 O(log N),这里 N 是 训练实例数。kd 树更适用于训练实例数远大于空间维数时的 k 近邻搜索。当空间维数 接近训练实例数时,它的效率会迅速下降,几乎接近线性扫描。
3.代码实现
import numpy
as np
import pandas
as pd
from sklearn
.model_selection
import KFold
from sklearn
.preprocessing
import MinMaxScaler
import math
class kd_node():
def __init__(self
):
self
.point
=None
self
.value
= None
self
.left
= None
self
.right
=None
self
.father
= None
self
.split
= None
self
.depth
=None
self
.search
= None
self
.left_range
= pd
.DataFrame
()
self
.right_range
=pd
.DataFrame
()
class Kd_tree():
def __init__(self
):
self
.root
= None
self
.k
= None
self
.aa_score
= None
self
.oa_score
= None
self
.x_label
= None
self
.y_label
= None
self
.all_label
= None
self
.range = pd
.DataFrame
()
def fit(self
, x
, y
,k
, kf
=KFold
(10, True, 10)):
data
= pd
.concat
([x
,y
],axis
=1)
self
.x_label
= list(x
.columns
)
self
.y_label
= list(y
.name
)
self
.all_label
= list(pd
.concat
([x
, y
], axis
=1).columns
)
self
.k
= k
self
.range =self
.range.append
(x
.min(), ignore_index
=True)
self
.range = self
.range.append
(x
.max(), ignore_index
=True)
self
.aa_score
= 0
self
.oa_score
= 0
for train_index
, test_index
in kf
.split
(data
):
self
.root
= self
.create_kd_tree
(x
.iloc
[train_index
], y
.iloc
[train_index
],self
.range,None)
prectdict_y
= self
.predict
(x
.loc
[test_index
])
true_y
= list(y
[test_index
])
loss
= 0
for i
in range(len(test_index
)):
if true_y
[i
] != prectdict_y
[i
]:
loss
= loss
+ 1
print(loss
/len(test_index
))
self
.oa_score
=self
.oa_score
+loss
self
.aa_score
= self
.aa_score
+ loss
/len(test_index
)
self
.oa_score
= 1 - self
.oa_score
/len(x
)
self
.aa_score
= 1 - self
.aa_score
/kf
.n_splits
self
.root
= self
.create_kd_tree
(x
, y
, self
.range, None)
def create_kd_tree(self
, x
, y
, range,father
, last_x
=[], depth
=0):
if not x
.empty
:
split_point
, split_col
, left_data
, right_data
= self
.calcu_var
(x
, y
,last_x
)
node
= kd_node
()
node
.point
= split_point
.drop
('y')
node
.value
= split_point
.loc
['y']
node
.split
= split_col
,split_point
[split_col
]
node
.depth
= depth
node
.father
= father
node
.left_range
, node
.right_range
= self
.calcu_range
(split_point
,range)
node
.left
= self
.create_kd_tree
(left_data
[self
.x_label
], left_data
[self
.y_label
],node
.left_range
,node
,split_col
, depth
+ 1)
node
.right
= self
.create_kd_tree
(right_data
[self
.x_label
], right_data
[self
.y_label
],node
.right_range
,node
,split_col
, depth
+ 1)
else:
node
= None
return node
def calcu_var(self
, x
, y
, last_x
):
all_data
= pd
.concat
([x
, y
], axis
=1)
all_label
= self
.all_label
x_label
= self
.x_label
new_label
= [i
for i
in x_label
if i
not in last_x
]
left_data
= pd
.DataFrame
(columns
=all_label
)
right_data
= pd
.DataFrame
(columns
=all_label
)
col
= np
.var
(all_data
[new_label
], axis
=0).idxmax
()
row
= len(x
) // 2
sort_data
= all_data
.sort_values
(by
=col
, ascending
=False)
for i
in range(len(x
)):
if i
== row
:
split_point
= sort_data
.iloc
[i
]
if i
< row
:
right_data
= right_data
.append
(sort_data
.iloc
[[i
]], ignore_index
=True)
if i
> row
:
left_data
= left_data
.append
(sort_data
.iloc
[[i
]], ignore_index
=True)
return split_point
, col
, left_data
, right_data
def calcu_range(self
, split_point
, range):
left_range
= pd
.DataFrame
(columns
=self
.x_label
)
right_range
= pd
.DataFrame
(columns
=self
.x_label
)
left_range
= left_range
.append
(range.iloc
[0])
left_range
= left_range
.append
(split_point
[self
.x_label
])
right_range
= right_range
.append
(split_point
[self
.x_label
])
right_range
= right_range
.append
(range.iloc
[1])
return left_range
, right_range
def predict(self
, x
):
k
= self
.k
x
.reset_index
(drop
=True, inplace
=True)
y
= []
for i
in range(len(x
)):
dt
= pd
.DataFrame
(columns
=['dis', 'label'])
for j
in range(k
):
dt
.loc
[j
] = [float('inf'), 'A']
node_temp
, dt
= self
.down_search
(self
.root
, x
.loc
[i
], k
, dt
)
while(node_temp
!= self
.root
):
dt
, node_temp
= self
.up_search
(x
.loc
[i
], node_temp
.father
,dt
)
scaler
= MinMaxScaler
()
dt_dis
= np
.array
(scaler
.fit_transform
(np
.array
(list(dt
['dis'])).reshape
((-1,1))))
dt_dis
= np
.array
([1 - i
for i
in dt_dis
])
dt_dis_2
= pd
.DataFrame
(columns
=['dis'],data
=dt_dis
)
dt
.drop
('dis', axis
=1, inplace
=True)
dt
=pd
.concat
([dt_dis_2
,dt
],axis
=1)
judge
= dict()
for index
,row
in dt
.iterrows
():
if row
['label'] not in judge
:
judge
[row
['label']] = row
['dis']
else:
judge
[row
['label']] = judge
[row
['label']]+row
['dis']
max_value
= -float('inf')
max_label
= 0
for j
in judge
.items
():
if j
[1]>max_value
:
max_value
= j
[1]
max_label
= j
[0]
y
.append
(max_label
)
return y
def down_search(self
, root
, x
, k
, dt
):
while(True):
root
.search
= 1
vect1
= list(root
.point
)
vect2
= list(x
)
new_dist
= np
.linalg
.norm
(np
.array
(vect1
) - np
.array
(vect2
))
if new_dist
< dt
['dis'].max():
dt
.iloc
[0] = [new_dist
, root
.value
]
dt
= dt
.sort_values
(by
=['dis'], ascending
=False)
if root
.left
== None and root
.right
== None:
break
if x
[root
.split
[0]] <= root
.split
[1] and root
.left
!=None:
root
= root
.left
continue
if x
[root
.split
[0]] > root
.split
[1] and root
.right
!=None:
root
= root
.right
continue
if root
.left
==None:
root
= root
.right
continue
else:
root
= root
.left
continue
return root
, dt
def up_search(self
, x
,node
,dt
):
r
= dt
.iloc
[0]['dis']
node
.search
= 1
vect1
= list(node
.point
)
vect2
= list(x
)
new_dist
= np
.linalg
.norm
(np
.array
(vect1
) - np
.array
(vect2
))
if new_dist
< dt
['dis'].max():
dt
.iloc
[0] = [new_dist
, node
.value
]
dt
= dt
.sort_values
(by
=['dis'], ascending
=False)
if node
.left
!= None:
if node
.left
.search
== None and r
< self
.calcu_dis
(x
,node
.left_range
,r
):
dt
,node
= self
.up_search
(x
, node
.left
, dt
)
if node
.right
!= None:
if node
.right
.search
==None and r
> self
.calcu_dis
(x
,node
.right_range
,r
):
dt
,node
= self
.up_search
(x
, node
.right
, dt
)
return dt
,node
def calcu_dis(self
, x
, the_range
, r
):
temp
= the_range
.values
list_near
= []
list_far
= []
for i
in range(len(self
.x_label
)):
if abs(list(x
)[i
] - temp
[:, i
][0]) <= abs(list(x
)[i
] - temp
[:, i
][1]):
list_near
.append
(temp
[:, i
][0])
list_far
.append
(temp
[:, i
][1])
else:
list_near
.append
(temp
[:, i
][1])
list_far
.append
(temp
[:, i
][0])
A
= list_near
P
= list(x
)
for i
in range(len(self
.x_label
)):
B
= list_near
B
[i
] = list_far
[i
]
AP
= np
.array
(P
) - np
.array
(A
)
AB
= np
.array
(B
) - np
.array
(A
)
AC
= ((np
.dot
(AB
, AP
) + 6e-6)/(np
.linalg
.norm
(AB
) ** 2 + 6e-10))*AB
C
= AC
+ A
BC
= C
- B
cos_seita
= (np
.dot
(AC
, BC
) + 6e-6)/ (np
.linalg
.norm
(AC
) * np
.linalg
.norm
(BC
) + 6e-6)
if cos_seita
> 0:
dis
= np
.linalg
.norm
(AP
)
else:
dis
= np
.linalg
.norm
(AC
)
if dis
< r
:
return 1
return 0