# 通过pymilvus sdk来操作数据库 from pymilvus import ( connections, db, FieldSchema, CollectionSchema, DataType, Collection, utility, ) def test_connection(db_name: str = "blog", collection_name: str = "articles") -> bool: try: # 建立与milvus的连接 connections.connect( "default", host="49.233.56.55", port="19530", timeout=10, db_name=db_name ) # 使用或者切换到db_name db.using_database(db_name) if utility.has_collection(collection_name): print(f"集合{collection_name}已经存在") return True # 定义id字段,类型为长整型,主键且非自增 id_field = FieldSchema( name="id", dtype=DataType.INT64, is_primary=True, auto_id=True, description="文章的ID", ) # 定义title字段,类型为变长字符串 title_field = FieldSchema( name="title", dtype=DataType.VARCHAR, max_length=1024, description="文章标题", ) # 定义title_vector字段,类型为3维浮点向量 title_vector_field = FieldSchema( name="tile_vector", dtype=DataType.FLOAT16_VECTOR, dim=3, description="标题向量", ) # 创建集合schema collectionSchema = CollectionSchema( fields=[id_field, title_field, title_vector_field], description="文章集合" ) Collection(name=collection_name, schema=collectionSchema) print(f"集合{collection_name}创建成功") except Exception as exc: print(f"连接失败,无法连接到Milvus服务器的数据库{collection_name}", exc) return False finally: connections.disconnect("default") if __name__ == "__main__": if not test_connection(): raise SystemExit(1)