feat: - vector
This commit is contained in:
+149
@@ -0,0 +1,149 @@
|
||||
# 导入默认字典 defaultdict,便于自动初始化桶结构
|
||||
from collections import defaultdict
|
||||
|
||||
# 导入数学库,提供 sqrt、pow 等基础数学函数
|
||||
import math
|
||||
|
||||
# 导入随机库,支持随机抽样与随机数生成
|
||||
import random
|
||||
|
||||
|
||||
# 计算两个向量之间的欧氏距离(L2 距离)
|
||||
def l2_distance(a, b):
|
||||
# zip 将两个向量按维度配对,平方差求和后开平方
|
||||
return math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))
|
||||
|
||||
|
||||
# K-Means++ 初始化:挑选 k 个初始中心,并做一次分配+更新
|
||||
def kmeans(vectors, k):
|
||||
# 第一个中心直接从所有样本中随机挑选
|
||||
centers = [random.choice(vectors)]
|
||||
|
||||
# 解释:下面这段 while 是 K-Means++ 的“轮盘赌”抽样逻辑
|
||||
# 步骤:计算每个点到最近中心的距离平方 -> 得到概率权重 -> 依概率抽样新的中心
|
||||
|
||||
# 当中心数量不足 k 个时,持续抽样新的中心
|
||||
while len(centers) < k:
|
||||
# 第一步:每个数据点距离最近中心的平方,作为该点的权重
|
||||
dists = [min(l2_distance(v, c) ** 2 for c in centers) for v in vectors]
|
||||
# 第二步:对所有权重求和,得到轮盘赌的总长度
|
||||
total = sum(dists)
|
||||
|
||||
# 第三步:在 [0, total) 区间上随机抽一个阈值 r,相当于轮盘指针
|
||||
r = random.random() * total
|
||||
|
||||
# prefix 累加当前遍历到的权重,用于定位 r 落在哪个区间
|
||||
prefix = 0.0
|
||||
|
||||
# 遍历所有点和对应权重,寻找累积首次超过 r 的那个点
|
||||
for v, d in zip(vectors, dists):
|
||||
# 不断累加权重,等价于在“条带”上向前走
|
||||
prefix += d
|
||||
# 一旦累计值覆盖了随机阈值 r,就把该点选为新中心
|
||||
if prefix >= r:
|
||||
centers.append(v)
|
||||
break
|
||||
|
||||
# 初始化一个“簇字典”,key 是中心索引,value 是该中心下的样本列表
|
||||
clusters = {i: [] for i in range(k)}
|
||||
|
||||
# 遍历所有向量,找到距离最近的中心,将该向量归入对应簇
|
||||
for v in vectors:
|
||||
idx = min(range(k), key=lambda i: l2_distance(v, centers[i]))
|
||||
clusters[idx].append(v)
|
||||
# 对每个非空簇计算均值,作为中心的最新位置
|
||||
for i, points in clusters.items():
|
||||
if points:
|
||||
# zip(*points) 将点按维度拆开,sum(dim)/len(points) 是维度均值
|
||||
centers[i] = [sum(dim) / len(points) for dim in zip(*points)]
|
||||
# 返回完成一次“初始化+更新”的中心列表
|
||||
return centers
|
||||
|
||||
|
||||
# 构建 IVF-Flat 索引:输出中心列表和倒排桶
|
||||
def build_ivf_flat(vectors, nlist):
|
||||
# 先通过 K-Means++ 得到 nlist 个中心
|
||||
centers = kmeans(vectors, nlist)
|
||||
|
||||
# defaultdict(list) 便于向桶里 append,无需检查 key 是否存在
|
||||
buckets = defaultdict(list)
|
||||
|
||||
# 遍历所有向量,依据最近中心编号将其放入对应桶
|
||||
for vec in vectors:
|
||||
idx = min(range(nlist), key=lambda i: l2_distance(vec, centers[i]))
|
||||
buckets[idx].append(vec)
|
||||
|
||||
# 返回构建好的中心列表与倒排结构
|
||||
return centers, buckets
|
||||
|
||||
|
||||
# 在 IVF-Flat 索引上执行查询,近似返回 topk 个最近向量
|
||||
def search_ivf_flat(query, centers, buckets, nprobe, topk):
|
||||
# 先按查询向量到各中心的距离排序,得到“探查顺序”
|
||||
probe_order = sorted(
|
||||
range(len(centers)), key=lambda i: l2_distance(query, centers[i])
|
||||
)
|
||||
print(probe_order)
|
||||
print(buckets)
|
||||
|
||||
# 用列表保存候选 (距离, 向量) 对
|
||||
candidate_pairs = []
|
||||
|
||||
# 只深入距离最近的 nprobe 个中心对应的桶
|
||||
for idx in probe_order[:nprobe]:
|
||||
for vec in buckets[idx]:
|
||||
candidate_pairs.append((l2_distance(query, vec), vec))
|
||||
|
||||
# 候选列表按距离升序排列,便于截取前 topk 个
|
||||
candidate_pairs.sort(key=lambda x: x[0])
|
||||
|
||||
print(candidate_pairs)
|
||||
|
||||
# 返回最接近查询的 topk 个候选
|
||||
return candidate_pairs[:topk]
|
||||
|
||||
|
||||
# 主程序:演示 IVF-Flat 的构建与查询流程
|
||||
def main():
|
||||
# 设定随机种子,保证示例结果可复现
|
||||
random.seed(0)
|
||||
|
||||
# 构造 8 个二维向量作为“数据库”
|
||||
database = [
|
||||
[0.1, 0.2],
|
||||
[0.15, 0.18],
|
||||
[0.8, 0.85],
|
||||
[0.82, 0.79],
|
||||
[0.4, 0.4],
|
||||
[0.42, 0.45],
|
||||
[0.9, 0.1],
|
||||
[0.88, 0.15],
|
||||
]
|
||||
|
||||
# 查询向量:我们要找与它相似的数据库向量
|
||||
query = [0.12, 0.22]
|
||||
|
||||
# nlist 表示建索引时划分的中心(簇)个数
|
||||
nlist = 4
|
||||
|
||||
# nprobe 表示查询时会探查多少个最近的中心
|
||||
nprobe = 2
|
||||
|
||||
# 构建 IVF-Flat 索引,得到中心与倒排桶
|
||||
centers, buckets = build_ivf_flat(database, nlist=nlist)
|
||||
|
||||
# 执行检索,返回距离最近的 top3 个结果
|
||||
results = search_ivf_flat(query, centers, buckets, nprobe=nprobe, topk=3)
|
||||
|
||||
# 打印本次检索的参数设置
|
||||
print(f"nlist={nlist}, nprobe={nprobe}")
|
||||
|
||||
# 打印检索到的候选向量及其距离
|
||||
print("最近向量:")
|
||||
for dist, vec in results:
|
||||
print(f"向量={vec}, 距离={dist:.4f}")
|
||||
|
||||
|
||||
# Python 入口:若当前脚本以 main 方式执行,则运行示例
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user