向量数据库:AI时代的核心基础设施
向量数据库是存储和检索高维向量数据的专用数据库,是构建RAG、语义搜索、推荐系统等AI应用的关键组件。 本文将全面介绍向量数据库的技术原理和最佳实践。
向量数据库核心概念
什么是向量数据库?
向量数据库专门用于存储和检索向量嵌入(embeddings),这些向量通常由AI模型生成, 代表文本、图像或其他数据的语义特征。
核心特性
- • 高维向量存储:支持数百到数千维的向量
- • 相似度搜索:基于余弦相似度、欧氏距离等
- • 实时索引:支持动态添加和更新向量
- • 可扩展性:处理十亿级向量规模
- • 混合搜索:结合向量和标量过滤
主流向量数据库对比
| 数据库 | 类型 | 性能 | 易用性 | 成本 | 适用场景 |
|---|---|---|---|---|---|
| Pinecone | SaaS | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐⭐ | 💰💰💰 | 快速原型、中小规模 |
| Milvus | 开源 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐ | 💰 | 大规模生产环境 |
| Weaviate | 开源+云 | ⭐⭐⭐⭐ | ⭐⭐⭐⭐ | 💰💰 | 全栈AI应用 |
| Qdrant | 开源+云 | ⭐⭐⭐⭐⭐ | ⭐⭐⭐⭐ | 💰💰 | 高性能需求 |
| Chroma | 开源 | ⭐⭐⭐ | ⭐⭐⭐⭐⭐ | 💰 | 开发测试、小型项目 |
向量索引算法
高效检索的核心技术
HNSW (分层导航小世界)
- • 构建多层图结构
- • 查询时间复杂度 O(log n)
- • 高召回率 (> 95%)
- • 内存消耗较大
IVF (倒排文件索引)
- • 向量空间聚类
- • 支持GPU加速
- • 内存效率高
- • 适合大规模数据
LSH (局部敏感哈希)
- • 概率性算法
- • 极快的查询速度
- • 准确率相对较低
- • 适合近似搜索
Annoy (Spotify开发)
- • 基于随机投影树
- • 内存映射文件
- • 静态索引
- • 适合只读场景
实战:使用不同向量数据库
Pinecone示例
import pinecone
from openai import OpenAI
# 初始化
pinecone.init(api_key="your-pinecone-key")
index = pinecone.Index("my-index")
openai_client = OpenAI(api_key="your-openai-key")
# 创建向量
def create_embedding(text):
response = openai_client.embeddings.create(
model="text-embedding-ada-002",
input=text
)
return response.data[0].embedding
# 插入数据
docs = [
{"id": "doc1", "text": "Python是一种编程语言"},
{"id": "doc2", "text": "JavaScript用于Web开发"}
]
for doc in docs:
embedding = create_embedding(doc["text"])
index.upsert([(doc["id"], embedding, {"text": doc["text"]})])
# 查询
query = "什么是Python?"
query_embedding = create_embedding(query)
results = index.query(
vector=query_embedding,
top_k=5,
include_metadata=True
)
for match in results.matches:
print(f"ID: {match.id}, Score: {match.score}")
print(f"Text: {match.metadata['text']}")Milvus示例
from pymilvus import connections, Collection, FieldSchema, CollectionSchema, DataType
import numpy as np
# 连接Milvus
connections.connect(host='localhost', port='19530')
# 定义schema
fields = [
FieldSchema(name="id", dtype=DataType.INT64, is_primary=True),
FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=1536),
FieldSchema(name="text", dtype=DataType.VARCHAR, max_length=5000)
]
schema = CollectionSchema(fields, description="Document embeddings")
# 创建集合
collection = Collection(name="documents", schema=schema)
# 创建索引
index_params = {
"metric_type": "L2",
"index_type": "IVF_FLAT",
"params": {"nlist": 1024}
}
collection.create_index(field_name="embedding", index_params=index_params)
# 插入数据
entities = [
[1, 2, 3], # IDs
[np.random.rand(1536).tolist() for _ in range(3)], # embeddings
["text1", "text2", "text3"] # texts
]
collection.insert(entities)
# 加载到内存
collection.load()
# 搜索
search_params = {"metric_type": "L2", "params": {"nprobe": 10}}
results = collection.search(
data=[query_embedding],
anns_field="embedding",
param=search_params,
limit=5,
output_fields=["text"]
)
for hit in results[0]:
print(f"ID: {hit.id}, Distance: {hit.distance}")
print(f"Text: {hit.entity.get('text')}")Chroma示例(轻量级)
import chromadb
from chromadb.utils import embedding_functions
# 初始化
client = chromadb.PersistentClient(path="/path/to/db")
embedding_fn = embedding_functions.OpenAIEmbeddingFunction(
api_key="your-key",
model_name="text-embedding-ada-002"
)
# 创建或获取集合
collection = client.get_or_create_collection(
name="my_collection",
embedding_function=embedding_fn
)
# 添加文档
collection.add(
documents=["Python编程", "Java开发", "数据分析"],
metadatas=[
{"source": "doc1", "type": "programming"},
{"source": "doc2", "type": "programming"},
{"source": "doc3", "type": "data"}
],
ids=["id1", "id2", "id3"]
)
# 查询
results = collection.query(
query_texts=["编程语言"],
n_results=2,
where={"type": "programming"} # 元数据过滤
)
print(f"Documents: {results['documents']}")
print(f"Distances: {results['distances']}")向量数据库选型指南
如何选择合适的向量数据库?
场景1:快速原型开发
需求:简单易用,快速上手
推荐:Chroma(本地开发)或 Pinecone(云服务)
场景2:生产环境部署
需求:高性能,可扩展,稳定性
推荐:Milvus(自建)或 Pinecone Cloud(托管)
场景3:混合搜索需求
需求:向量搜索 + 全文搜索 + 过滤
推荐:Weaviate 或 Elasticsearch with vector search
场景4:成本敏感
需求:开源免费,资源占用少
推荐:Chroma、Qdrant 或 pgvector(PostgreSQL扩展)
性能优化技巧
提升向量检索性能
索引优化
- ✅ 选择合适的索引类型
- ✅ 调整索引参数(nlist, nprobe)
- ✅ 定期重建索引
- ✅ 使用量化技术减少内存
查询优化
- ✅ 批量查询减少开销
- ✅ 预过滤减少搜索空间
- ✅ 缓存热点查询结果
- ✅ 异步并发查询
数据优化
- ✅ 向量维度降维(PCA)
- ✅ 数据分片和分区
- ✅ 定期清理无效数据
- ✅ 使用二进制量化
系统优化
- ✅ GPU加速(如适用)
- ✅ 内存和缓存配置
- ✅ 负载均衡和副本
- ✅ 监控和告警设置
向量数据库与LLM集成
构建完整的AI应用栈
class VectorRAGSystem:
"""向量数据库 + LLM的完整RAG系统"""
def __init__(self, vector_db, llm_client):
self.vector_db = vector_db
self.llm = llm_client
def add_knowledge(self, documents):
"""添加知识到向量数据库"""
for doc in documents:
# 生成向量
embedding = self.llm.create_embedding(doc.text)
# 存储到向量数据库
self.vector_db.insert({
'id': doc.id,
'vector': embedding,
'metadata': {
'text': doc.text,
'source': doc.source,
'timestamp': doc.timestamp
}
})
def answer_question(self, question):
"""基于向量检索回答问题"""
# 1. 问题向量化
question_vector = self.llm.create_embedding(question)
# 2. 向量检索
results = self.vector_db.search(
vector=question_vector,
limit=5
)
# 3. 构建上下文
context = "\n".join([r.metadata['text'] for r in results])
# 4. LLM生成答案
prompt = f"""
基于以下信息回答问题:
信息:{context}
问题:{question}
"""
answer = self.llm.generate(prompt)
return {
'answer': answer,
'sources': [r.metadata['source'] for r in results]
}