Qdrant Comprehensive Primer
The Ultimate Guide to Vector Similarity Search & AI Applications
Table of Contents
- Introduction & Philosophy
- Installation & Setup
- Core Concepts
- Collection Management
- Vector Operations
- Advanced Search Features
- Filtering & Querying
- Performance Optimization
- Distributed Deployment
- Client Libraries
- Integration Patterns
- Monitoring & Observability
- Security Best Practices
- Real-World Applications
- Troubleshooting
- Best Practices
Introduction & Philosophy
What is Qdrant?
Qdrant (pronounced "Quadrant") is an open-source vector database and similarity search engine designed specifically for AI applications. Written in Rust for performance and reliability, Qdrant provides production-ready vector similarity search with advanced filtering capabilities.
Core Philosophy
"Accuracy, Speed, and Simplicity" - Qdrant embodies the principle that vector search should be both powerful and accessible. It's designed to:
- Deliver accurate results through advanced quantization and indexing techniques
- Maintain high performance even with billions of vectors and complex filters
- Provide developer-friendly APIs that integrate seamlessly with AI workflows
- Scale horizontally across multiple nodes without sacrificing search quality
- Support rich metadata alongside vectors for context-aware applications
Key Differentiators
- Filtered Search: Advanced filtering capabilities without sacrificing performance
- Quantization Support: Multiple quantization options for memory efficiency
- On-disk Storage: Efficient memory usage with optional RAM acceleration
- Distributed Architecture: Built-in sharding and replication
- Rich Client SDKs: Support for Python, TypeScript/JavaScript, Go, Rust, and more
Architecture Overview
Qdrant uses a sophisticated architecture that combines:
- HNSW (Hierarchical Navigable Small World) indices for fast approximate nearest neighbor search
- Payload Storage: Flexible JSON document storage alongside vectors
- Quantization: Multiple compression options (Scalar, Product, Binary)
- Distributed System: Automatic sharding with consensus-based replication
Installation & Setup
Local Installation
Docker (Recommended)
# Pull the latest Qdrant image
docker pull qdrant/qdrant
# Run Qdrant server
docker run -p 6333:6333 -p 6334:6334 \
-v $(pwd)/qdrant_storage:/qdrant/storage \
qdrant/qdrant
# Or with custom configuration
docker run -p 6333:6333 -p 6334:6334 \
-v $(pwd)/config.yaml:/qdrant/config/production.yaml \
-v $(pwd)/qdrant_storage:/qdrant/storage \
qdrant/qdrant
Python Client
# Install the Python client
pip install qdrant-client
# Quick start with in-memory storage
from qdrant_client import QdrantClient
# In-memory for testing
client = QdrantClient(":memory:")
# Connect to local server
client = QdrantClient(host="localhost", port=6333)
Binary Installation
# Download binary (Linux/macOS)
curl -L https://github.com/qdrant/qdrant/releases/latest/download/qdrant-linux-x86_64 -o qdrant
chmod +x qdrant
# Run Qdrant
./qdrant
# Or as a service
sudo ./qdrant service --config-path ./config/config.yaml
Cloud Installation
Qdrant Cloud
# Install Qdrant Cloud CLI
pip install qdrant-cloud
# Log in to Qdrant Cloud
qdrant-cloud login
# Create a new cluster
qdrant-cloud cluster create my-cluster --plan starter
Kubernetes
# qdrant-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
name: qdrant
spec:
replicas: 1
selector:
matchLabels:
app: qdrant
template:
metadata:
labels:
app: qdrant
spec:
containers:
- name: qdrant
image: qdrant/qdrant:latest
ports:
- containerPort: 6333
- containerPort: 6334
volumeMounts:
- name: storage
mountPath: /qdrant/storage
volumes:
- name: storage
persistentVolumeClaim:
claimName: qdrant-storage
Configuration
Basic Configuration (config.yaml)
# Qdrant configuration file
service:
host: '0.0.0.0' # Listen on all interfaces
http_port: 6333
grpc_port: 6334
max_request_size_mb: 32
enable_cors: true
# Storage configuration
storage:
storage_path: './storage'
# Snapshots configuration
snapshots_path: './snapshots'
# Performance tuning
performance:
max_search_threads: 0 # Auto-detect
max_workers: 0 # Equal to CPU cores
# Cluster configuration (for distributed mode)
cluster:
enabled: false
p2p:
port: 6335
consensus:
tick_period_ms: 100
Environment Variables
# Set configuration via environment variables
export QDRANT_SERVICE__HTTP_PORT=8080
export QDRANT_SERVICE__ENABLE_CORS=true
export QDRANT_STORAGE__STORAGE_PATH=/data/qdrant
# Run with environment overrides
qdrant
Core Concepts
Vectors & Embeddings
from qdrant_client import QdrantClient
import numpy as np
# Create sample vectors (embeddings)
dimensions = 384 # Common for text embeddings
vectors = [
np.random.rand(dimensions).astype(np.float32).tolist()
for _ in range(100)
]
# Vectors can be dense (float arrays) or sparse (key-value pairs)
dense_vector = [0.1, 0.2, 0.3, ...]  # a list of 384 floats
sparse_vector = {
"indices": [0, 5, 10, 15],
"values": [0.8, 0.6, 0.9, 0.7]
}
Collections
from qdrant_client.models import Distance, VectorParams, CollectionInfo
# Define collection configuration
vector_params = VectorParams(
size=384, # Vector dimensions
distance=Distance.COSINE # Distance metric
)
# Create collection
client.create_collection(
collection_name="documents",
vectors_config=vector_params
)
# Get collection info
collection_info = client.get_collection("documents")
print(collection_info)
Distance Metrics
from qdrant_client.models import Distance
# Available distance metrics
distances = {
Distance.COSINE: "Cosine similarity",
Distance.EUCLID: "Euclidean distance",
Distance.DOT: "Dot product",
Distance.MANHATTAN: "Manhattan distance"
}
# Choose appropriate distance based on your use case
# - COSINE: Text embeddings, normalized vectors
# - EUCLID: General purpose, unnormalized vectors
# - DOT: Normalized embeddings (same as COSINE for unit vectors)
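As a quick check of the last point: for unit-length vectors, dot product and cosine similarity coincide, so either metric gives the same ranking. A minimal numpy sketch (illustrative only):
import numpy as np

a = np.random.rand(384).astype(np.float32)
b = np.random.rand(384).astype(np.float32)

# Normalize both vectors to unit length
a /= np.linalg.norm(a)
b /= np.linalg.norm(b)

cosine = float(np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b)))
dot = float(np.dot(a, b))
print(abs(cosine - dot) < 1e-6)  # True for unit vectors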
Payloads
# Payloads are JSON documents stored with vectors
payload = [
{
"id": "doc1",
"title": "Introduction to Vector Databases",
"content": "Vector databases are specialized for...",
"category": "database",
"tags": ["vector", "embedding", "similarity"],
"timestamp": "2024-01-01T00:00:00Z",
"metadata": {
"author": "John Doe",
"word_count": 1500
}
},
{
"id": "doc2",
"title": "Machine Learning Basics",
"content": "Machine learning algorithms learn...",
"category": "ml",
"tags": ["ml", "ai", "algorithms"],
"timestamp": "2024-01-02T00:00:00Z"
}
]
# Payloads can include any JSON-serializable data
# - Text fields
# - Numbers
# - Booleans
# - Arrays
# - Nested objects
Points
from qdrant_client.models import PointStruct
# A point combines vector, payload, and ID
points = [
PointStruct(
id=1,
vector=[0.1, 0.2, 0.3, ...],
payload={
"title": "Document 1",
"category": "tech"
}
),
PointStruct(
id=2,
vector=[0.4, 0.5, 0.6, ...],
payload={
"title": "Document 2",
"category": "science"
}
)
]
# IDs can be integers or UUID strings
point_with_uuid_id = PointStruct(
id="550e8400-e29b-41d4-a716-446655440000",
vector=[0.7, 0.8, 0.9, ...],
payload={"title": "UUID Document"}
)
Collection Management
Creating Collections
from qdrant_client.models import VectorParams, Distance, CollectionConfig
# Basic collection creation
client.create_collection(
collection_name="documents",
vectors_config=VectorParams(size=384, distance=Distance.COSINE)
)
# Advanced configuration with multiple vector types
from qdrant_client.models import VectorParams

# Multiple named vectors (multi-modal) are configured as a mapping
# from vector name to VectorParams
vectors_config = {
    "text": VectorParams(size=384, distance=Distance.COSINE),
    "image": VectorParams(size=512, distance=Distance.EUCLID),
    "metadata": VectorParams(size=64, distance=Distance.DOT)
}
client.create_collection(
collection_name="multi_modal",
vectors_config=vectors_config
)
# With on-disk storage and quantization
from qdrant_client.models import QuantizationConfig, ScalarQuantization
quantization_config = QuantizationConfig(
scalar=ScalarQuantization(
type="scalar",
quantile=0.99, # Quantile for bucket boundaries
always_ram=False # Keep quantized data on disk
)
)
client.create_collection(
collection_name="optimized",
vectors_config=VectorParams(size=384, distance=Distance.COSINE),
quantization_config=quantization_config,
on_disk=True # Store vectors on disk
)
Collection Configuration
# Update collection settings
from qdrant_client.models import OptimizersConfig
client.update_collection(
collection_name="documents",
optimizer_config=OptimizersConfig(
indexing_threshold=10000, # Build index after this many points
flush_interval_sec=5, # Flush to disk every 5 seconds
max_segment_size=100000, # Maximum segment size
memmap_threshold=50000 # Use memory mapping for segments larger than this
)
)
# Configure HNSW parameters
from qdrant_client.models import HnswConfigDiff
client.update_collection(
collection_name="documents",
hnsw_config=HnswConfigDiff(
m=16, # Number of connections per node
ef_construct=100, # Size of dynamic list during construction
full_scan_threshold=10000 # Use brute force for smaller collections
)
)
Collection Operations
from qdrant_client.models import Filter
# List all collections
collections = client.get_collections()
print(collections)
# Get collection details
collection_info = client.get_collection("documents")
print(f"Vectors count: {collection_info.points_count}")
print(f"Status: {collection_info.status}")
print(f"Vectors configuration: {collection_info.config.params.vectors}")
# Delete collection
client.delete_collection("documents")
# Collection aliases
client.create_collection(
collection_name="documents_v2",
vectors_config=VectorParams(size=384, distance=Distance.COSINE)
)
# Create alias
client.update_collection_aliases(
change_aliases_operations=[
{
"create_alias": {
"collection_name": "documents_v2",
"alias_name": "documents"
}
}
]
)
# Now you can use "documents" alias
# to refer to "documents_v2" collection
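A search issued against the alias is served by the aliased collection, which is what makes zero-downtime reindexing possible. A minimal sketch (assumes "documents_v2" already holds 384-dimensional vectors):
results = client.search(
    collection_name="documents",  # alias resolves to "documents_v2"
    query_vector=[0.1] * 384,
    limit=5
)
print(len(results))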
Snapshots & Backups
# Create snapshot
snapshot_info = client.create_snapshot("documents")
print(f"Snapshot created: {snapshot_info.name}")
# List snapshots
snapshots = client.list_snapshots("documents")
for snapshot in snapshots:
print(f"Snapshot: {snapshot.name}, Size: {snapshot.size} bytes")
# Restore from snapshot
client.restore_snapshot("documents", snapshot_path)
# Create a full snapshot covering all collections
full_snapshot = client.create_full_snapshot()
print(f"Full snapshot created: {full_snapshot.name}")
# Schedule automatic snapshots
# This is typically done through configuration or cron job
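For example, a minimal Python sketch of a scheduled snapshot job (the interval and collection name are illustrative; a cron job or your orchestrator's scheduler is usually the better home for this):
import time

from qdrant_client import QdrantClient

client = QdrantClient(host="localhost", port=6333)

while True:
    snapshot = client.create_snapshot("documents")
    print(f"Created snapshot: {snapshot.name}")
    time.sleep(24 * 60 * 60)  # once per day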
Vector Operations
Inserting Vectors
import datetime
import uuid

import numpy as np
from qdrant_client.models import PointStruct
# Insert single point
point = PointStruct(
id=str(uuid.uuid4()), # Use UUID for better distribution
vector=[0.1, 0.2, 0.3, ...], # Your embedding
payload={
"title": "New Document",
"content": "This is a new document...",
"category": "technology",
"timestamp": datetime.datetime.utcnow().isoformat()
}
)
client.upsert(
collection_name="documents",
points=[point]
)
# Batch insert for better performance
points = []
for i in range(1000):
points.append(PointStruct(
id=str(uuid.uuid4()),
vector=np.random.rand(384).astype(np.float32).tolist(),
payload={
"id": f"doc_{i}",
"title": f"Document {i}",
"category": np.random.choice(["tech", "science", "business"]),
"score": np.random.random()
}
))
# Insert in batches
batch_size = 100
for i in range(0, len(points), batch_size):
batch = points[i:i + batch_size]
client.upsert(
collection_name="documents",
points=batch
)
Updating Vectors
# Update existing point
client.upsert(
collection_name="documents",
points=[PointStruct(
id="existing_point_id",
vector=[0.9, 0.8, 0.7, ...], # New vector
payload={
"title": "Updated Document",
"version": 2
}
)]
)
# Update only payload (keep vector unchanged)
client.set_payload(
collection_name="documents",
payload={
"last_updated": datetime.datetime.utcnow().isoformat(),
"status": "processed"
},
points=["doc_1", "doc_2", "doc_3"]
)
# Update specific payload fields
client.set_payload(
collection_name="documents",
payload={
"metadata.author": "New Author" # Nested update
},
points=["doc_1"]
)
# Delete payload fields
client.delete_payload(
collection_name="documents",
keys=["temp_field", "old_metadata"],
points=["doc_1", "doc_2"]
)
Deleting Vectors
# Delete by IDs
client.delete(
collection_name="documents",
points_selector=["doc_1", "doc_2", "doc_3"]
)
# Delete by filter
from qdrant_client.models import Filter
client.delete(
collection_name="documents",
points_selector=Filter(
must=[
{
"key": "category",
"match": {"value": "deprecated"}
}
]
)
)
# Clear an entire collection: the simplest approach is to delete and recreate it
client.delete_collection("documents")
client.create_collection(
    collection_name="documents",
    vectors_config=VectorParams(size=384, distance=Distance.COSINE)
)
Retrieving Vectors
# Retrieve specific points
points = client.retrieve(
collection_name="documents",
ids=["doc_1", "doc_2", "doc_3"],
with_payload=True,
with_vectors=False # Don't need vectors, just metadata
)
for point in points:
print(f"ID: {point.id}")
print(f"Title: {point.payload.get('title')}")
print(f"Category: {point.payload.get('category')}")
# Retrieve with specific payload fields
points = client.retrieve(
collection_name="documents",
ids=["doc_1"],
with_payload=["title", "category"], # Only these fields
with_vectors=True
)
# Scroll through collection (pagination)
scroll_result = client.scroll(
    collection_name="documents",
    limit=10,
    with_payload=True,
    with_vectors=False,
    scroll_filter=Filter(
        must=[
            {
                "key": "category",
                "match": {"value": "technology"}
            }
        ]
    )
)
points = scroll_result[0]
next_page_offset = scroll_result[1]
print(f"Retrieved {len(points)} points")
Advanced Search Features
Basic Similarity Search
from qdrant_client.models import SearchRequest
# Simple similarity search
query_vector = [0.1, 0.2, 0.3, ...] # Your query embedding
search_result = client.search(
collection_name="documents",
query_vector=query_vector,
limit=10, # Return top 10 results
with_payload=True,
with_vectors=False
)
for hit in search_result:
print(f"ID: {hit.id}")
print(f"Score: {hit.score}")
print(f"Title: {hit.payload.get('title')}")
print(f"Category: {hit.payload.get('category')}")
print("---")
Multi-Vector Search
# Search with multiple query vectors
query_vectors = {
"text": text_embedding,
"image": image_embedding
}
# Search with weights for different vector types
search_result = client.search(
collection_name="multi_modal",
query_vector=query_vectors,
limit=10,
with_payload=True
)
# Or with explicit weights
from qdrant_client.models import QueryVector
search_result = client.search(
collection_name="multi_modal",
query_vector=[
QueryVector(
vector=text_embedding,
name="text",
weight=0.7 # 70% weight for text
),
QueryVector(
vector=image_embedding,
name="image",
weight=0.3 # 30% weight for image
)
],
limit=10
)
Hybrid Search
# Combine vector similarity with keyword search
from qdrant_client.models import Filter, SearchParams
# Search with filter
search_result = client.search(
collection_name="documents",
query_vector=query_vector,
query_filter=Filter(
must=[
{
"key": "category",
"match": {"value": "technology"}
},
{
"key": "timestamp",
"range": {
"gte": "2024-01-01T00:00:00Z",
"lte": "2024-12-31T23:59:59Z"
}
}
]
),
limit=10,
search_params=SearchParams(
hnsw_ef=128, # Search accuracy vs speed
exact=False, # Use approximate search
quantization=None # Don't use quantization for this search
)
)
# Search with a minimum score threshold and payload-based conditions
search_result = client.search(
collection_name="documents",
query_vector=query_vector,
limit=10,
with_payload=True,
score_threshold=0.5, # Minimum similarity score
query_filter=Filter(
should=[
{
"key": "tags",
"match": {"value": "featured"}
},
{
"key": "score",
"range": {"gte": 0.8}
}
]
)
)
Recommendation Search
# Find similar items to a set of positive examples
positive_ids = ["doc_1", "doc_5", "doc_10"]
negative_ids = ["doc_2"] # Optional negative examples
recommend_result = client.recommend(
collection_name="documents",
positive=positive_ids,
negative=negative_ids,
limit=10,
with_payload=True,
strategy="average_vector" # or "best_score"
)
# Find similar to a specific target with positive/negative context pairs
from qdrant_client.models import ContextExamplePair

context_result = client.discover(
collection_name="documents",
target=target_embedding, # What to find similar to
context=[
ContextExamplePair(
positive=context_positive_embedding,
negative=context_negative_embedding
)
],
limit=10
)
Grouping Results
# Group results by a field
from qdrant_client.models import GroupRequest
groups = client.search_groups(
collection_name="documents",
query_vector=query_vector,
group_by="category", # Group by category
limit=10, # Total groups to return
group_size=3, # Items per group
with_payload=True
)
for group in groups.groups:
print(f"Group: {group.id}")
print(f"Hits: {len(group.hits)}")
for hit in group.hits:
print(f" - {hit.payload.get('title')}")
print()
Filtering & Querying
Basic Filters
from qdrant_client.models import Filter
# Exact match
filter = Filter(
must=[
{
"key": "category",
"match": {"value": "technology"}
}
]
)
# Multiple conditions (AND)
filter = Filter(
must=[
{
"key": "category",
"match": {"value": "technology"}
},
{
"key": "published",
"match": {"value": True}
}
]
)
# OR conditions
filter = Filter(
should=[
{
"key": "category",
"match": {"value": "technology"}
},
{
"key": "category",
"match": {"value": "science"}
}
]
)
# NOT conditions
filter = Filter(
must_not=[
{
"key": "status",
"match": {"value": "archived"}
}
]
)
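Filters like the ones above run fastest when the filtered payload fields are indexed. A minimal sketch using the client's create_payload_index call (field names are illustrative):
from qdrant_client.models import PayloadSchemaType

client.create_payload_index(
    collection_name="documents",
    field_name="category",
    field_schema=PayloadSchemaType.KEYWORD
)

client.create_payload_index(
    collection_name="documents",
    field_name="score",
    field_schema=PayloadSchemaType.FLOAT
)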
Advanced Filters
# Range filters
filter = Filter(
must=[
{
"key": "score",
"range": {
"gte": 0.5,
"lte": 1.0
}
},
{
"key": "timestamp",
"range": {
"gte": "2024-01-01T00:00:00Z",
"lte": "2024-12-31T23:59:59Z"
}
}
]
)
# Nested field filters
filter = Filter(
must=[
{
"key": "metadata.author",
"match": {"value": "John Doe"}
},
{
"key": "metadata.word_count",
"range": {"gte": 1000}
}
]
)
# Array operations
filter = Filter(
must=[
{
"key": "tags",
"match": {"value": "featured"} # Value exists in array
},
{
"key": "tags",
"match": {"value": "technology"}
}
]
)
# Array contains all values
filter = Filter(
must=[
{
"key": "required_skills",
"match": {"value": "python"}
},
{
"key": "required_skills",
"match": {"value": "machine_learning"}
}
]
)
# Nested array filters
filter = Filter(
must=[
{
"key": "reviews.rating",
"range": {"gte": 4}
},
{
"key": "reviews.text",
"match": {"value": "excellent"}
}
]
)
Full-Text Search
# Text match with Qdrant's full-text search
filter = Filter(
must=[
{
"key": "content",
"match": {"text": "vector database"} # Full-text search
}
]
)
# Text search with fuzziness
filter = Filter(
must=[
{
"key": "title",
"match": {
"text": "vectr databse", # Typos allowed
"fuzziness": "auto" # Auto-detect fuzziness level
}
}
]
)
# Combine with boosting
from qdrant_client.models import QueryVectorBoost
search_result = client.search(
collection_name="documents",
query_vector=query_vector,
query_filter=Filter(
must=[
{
"key": "content",
"match": {"text": "machine learning"}
}
]
),
limit=10,
query_vector_boosts=[
QueryVectorBoost(
vector=text_embedding,
name="text",
weight=0.7
),
QueryVectorBoost(
vector=keyword_boost_embedding,
name="keyword",
weight=0.3
)
]
)
Geo Filters
# Point within radius
filter = Filter(
must=[
{
"key": "location",
"geo_radius": {
"center": {
"lon": -73.9874,
"lat": 40.7589
},
"radius": 1000 # meters
}
}
]
)
# Point within bounding box
filter = Filter(
must=[
{
"key": "location",
"geo_bounding_box": {
"top_right": {"lon": -73.95, "lat": 40.78},
"bottom_left": {"lon": -74.02, "lat": 40.73}
}
}
]
)
# Complex geo filters
filter = Filter(
must=[
{
"key": "location",
"geo_polygon": {
"exterior": [
{"lon": -73.95, "lat": 40.78},
{"lon": -73.95, "lat": 40.73},
{"lon": -74.02, "lat": 40.73},
{"lon": -74.02, "lat": 40.78},
{"lon": -73.95, "lat": 40.78}
]
}
}
]
)
Performance Optimization
Quantization
from qdrant_client.models import QuantizationConfig, ScalarQuantization, ProductQuantization, BinaryQuantization
# Scalar quantization (uniform)
scalar_config = QuantizationConfig(
scalar=ScalarQuantization(
type="scalar",
quantile=0.99, # Use 99th percentile for quantization
always_ram=False # Store quantized data on disk
)
)
# Product quantization (better compression)
product_config = QuantizationConfig(
product=ProductQuantization(
type="product",
compression="x32", # 32x compression
always_ram=False
)
)
# Binary quantization (fastest)
binary_config = QuantizationConfig(
binary=BinaryQuantization(
type="binary",
always_ram=False
)
)
# Apply quantization to collection
client.update_collection(
collection_name="documents",
quantization_config=scalar_config
)
# Create collection with quantization
client.create_collection(
collection_name="optimized_docs",
vectors_config=VectorParams(size=384, distance=Distance.COSINE),
quantization_config=scalar_config,
on_disk=True
)
Indexing Strategies
from qdrant_client.models import HnswConfigDiff, OptimizersConfig
# Configure HNSW for optimal performance
hnsw_config = HnswConfigDiff(
m=16, # Number of connections per node (16-64)
ef_construct=100, # Size of dynamic list during construction
full_scan_threshold=10000, # Use brute force for smaller collections
max_indexing_threads=0, # Use all available threads
on_disk=False # Keep index in RAM for faster search
)
optimizer_config = OptimizersConfig(
indexing_threshold=10000, # Build index after this many points
flush_interval_sec=5, # Flush to disk every 5 seconds
max_segment_size=100000, # Maximum segment size
memmap_threshold=50000 # Use memory mapping for larger segments
)
client.update_collection(
collection_name="documents",
hnsw_config=hnsw_config,
optimizer_config=optimizer_config
)
Memory Management
# Configure memory limits
# This is typically done in config.yaml
"""
storage:
storage_path: "./storage"
max_segment_size: 1000000 # 1M points per segment
vectors_data_threshold: 100000 # Use disk for larger segments
cache_vector_size: 1000 # Cache size for recently accessed vectors
"""
# Use on-disk storage for large collections
client.create_collection(
collection_name="large_collection",
vectors_config=VectorParams(size=1536, distance=Distance.COSINE),
on_disk=True, # Store vectors on disk
quantization_config=QuantizationConfig(
scalar=ScalarQuantization(type="scalar", always_ram=False)
)
)
# Optimize payload storage
# Store only necessary fields in payload
# Use external storage for large content
"""
payload_example = {
"id": "doc_1",
"title": "Document Title",
"summary": "Brief summary...", # Keep summary in payload
"content_url": "s3://bucket/documents/doc_1.txt", # Store large content externally
"metadata": {
"size": 1024,
"format": "txt"
}
}
"""
Query Optimization
from qdrant_client.models import SearchParams, SamplingStrategy
# Use appropriate search parameters
search_params = SearchParams(
hnsw_ef=128, # Balance between accuracy and speed
exact=False, # Use approximate search
quantization=None, # Don't use quantization for critical searches
indexing_threshold=10000
)
# Use sampling for very large collections
sampling = SamplingStrategy(
strategy="random",
rate=0.1 # Sample 10% of collection
)
# Optimized search with early stopping
search_result = client.search(
collection_name="documents",
query_vector=query_vector,
limit=10,
search_params=search_params,
timeout=5.0, # 5 second timeout
score_threshold=0.3, # Skip low-scoring results
with_payload=False # Don't return payload if not needed
)
# Batch searches for better throughput
batch_queries = [query1, query2, query3, ...]
batch_results = client.search_batch(
collection_name="documents",
requests=[
SearchRequest(
vector=q,
limit=10,
with_payload=True
)
for q in batch_queries
]
)
Distributed Deployment
Cluster Setup
# config.yaml for cluster mode
service:
host: '0.0.0.0'
http_port: 6333
grpc_port: 6334
cluster:
enabled: true
p2p:
port: 6335
consensus:
raft_tick_period_ms: 100
peers:
- "qdrant1:6335"
- "qdrant2:6335"
- "qdrant3:6335"
storage:
storage_path: "/qdrant/storage"
# Connect to cluster
from qdrant_client import QdrantClient
# Connect to specific node
client = QdrantClient(host="qdrant1", port=6333)
# Or use all nodes for load balancing
client = QdrantClient(
hosts=["qdrant1", "qdrant2", "qdrant3"],
port=6333,
https=False
)
Sharding & Replication
# Create collection with sharding
from qdrant_client.models import CreateCollection, ShardingMethod
client.create_collection(
collection_name="distributed_docs",
vectors_config=VectorParams(size=384, distance=Distance.COSINE),
sharding_method=ShardingMethod.AUTO, # or CUSTOM
replication_factor=2, # Each shard has 2 replicas
write_consistency_factor=2, # Write must succeed on 2 replicas
# shard_number=4 # Number of shards (optional, auto by default)
)
# Manual sharding
client.update_collection(
collection_name="distributed_docs",
shard_key=0 # Specify shard for operation
)
# Get cluster information
cluster_info = client.get_cluster_info()
print(f"Number of peers: {len(cluster_info.peers)}")
print(f"Collection shards: {cluster_info.collection_shards}")
Load Balancing
# Configure client-side load balancing
from qdrant_client import QdrantClient
client = QdrantClient(
hosts=["node1", "node2", "node3"],
port=6333,
https=False,
timeout=10.0,
prefer_grpc=True # Use gRPC for better performance
)
# The client will automatically:
# - Retry failed requests
# - Distribute read operations
# - Handle node failures
# Health check
try:
cluster_info = client.get_cluster_info()
print("Cluster is healthy")
except Exception as e:
print(f"Cluster issue: {e}")
Scaling Strategies
# Horizontal scaling with auto-sharding
client.create_collection(
collection_name="scalable_docs",
vectors_config=VectorParams(size=384, distance=Distance.COSINE),
sharding_method=ShardingMethod.AUTO,
replication_factor=1, # Start with 1 replica
# Qdrant will automatically add shards as needed
)
# Vertical scaling within nodes
# Monitor resource usage and adjust:
"""
- Increase RAM for larger in-memory indices
- Use SSD for storage_path
- Add more CPU cores for indexing
- Adjust hnsw_ef for search quality
"""
# Hybrid approach
# Use distributed cluster with large nodes
# and proper sharding strategy
Client Libraries
Python Client
from qdrant_client import QdrantClient, models
import numpy as np
# Initialize client
client = QdrantClient(host="localhost", port=6333)
# Async client
from qdrant_client import AsyncQdrantClient
async_client = AsyncQdrantClient(host="localhost", port=6333)
# Batch operations
async def batch_upload():
points = []
for i in range(10000):
points.append(models.PointStruct(
id=i,
vector=np.random.rand(384).tolist(),
payload={"text": f"Document {i}"}
))
# Upload in batches
batch_size = 500
for i in range(0, len(points), batch_size):
await async_client.upsert(
collection_name="docs",
points=points[i:i+batch_size]
)
# Search with async
async def search_async(query_vector):
results = await async_client.search(
collection_name="docs",
query_vector=query_vector,
limit=10
)
return results
JavaScript/TypeScript Client
import { QdrantClient } from '@qdrant/js-client-rest';
const client = new QdrantClient({
host: 'localhost',
port: 6333,
https: false
});
// Create collection
await client.createCollection('documents', {
vectors: {
size: 384,
distance: 'Cosine'
}
});
// Insert points
const points = Array.from({ length: 100 }, (_, i) => ({
id: i,
vector: Array(384).fill(0).map(() => Math.random()),
payload: {
title: `Document ${i}`,
category: ['tech', 'science', 'business'][Math.floor(Math.random() * 3)]
}
}));
await client.upsert('documents', { points });
// Search
const searchResults = await client.search('documents', {
vector: Array(384).fill(0).map(() => Math.random()),
limit: 10,
filter: {
must: [
{
key: 'category',
match: { value: 'tech' }
}
]
}
});
// gRPC client for better performance
import { QdrantGrpcClient } from '@qdrant/js-client-grpc';
const grpcClient = new QdrantGrpcClient('localhost:6334');
Go Client
package main
import (
"context"
"fmt"
"log"
"github.com/qdrant/go-client/qdrant"
)
func main() {
// Create client
client, err := qdrant.NewClient("localhost:6334")
if err != nil {
log.Fatal(err)
}
// Create collection
vectorParams := qdrant.VectorParams{
Size: 384,
Distance: qdrant.Distance_Cosine,
}
err = client.CreateCollection(context.Background(), "documents", vectorParams)
if err != nil {
log.Fatal(err)
}
// Insert points
points := make([]*qdrant.PointStruct, 100)
for i := 0; i < 100; i++ {
points[i] = &qdrant.PointStruct{
Id: qdrant.NewPointIdNum(uint64(i)),
Vectors: qdrant.NewVectors(generateVector(384)),
Payload: map[string]interface{}{
"title": fmt.Sprintf("Document %d", i),
"category": "tech",
},
}
}
err = client.Upsert(context.Background(), "documents", points)
if err != nil {
log.Fatal(err)
}
// Search
searchRequest := &qdrant.SearchRequest{
Vector: generateVector(384),
Limit: 10,
WithPayload: true,
}
results, err := client.Search(context.Background(), "documents", searchRequest)
if err != nil {
log.Fatal(err)
}
for _, hit := range results {
fmt.Printf("ID: %v, Score: %f\n", hit.Id, hit.Score)
}
}
Rust Client
use qdrant_client::Qdrant;
use qdrant_client::qdrant::{
CreateCollection, VectorParams, Distance,
PointStruct, UpsertPoints, SearchPoints,
WithPayloadSelector,
};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create client
let client = Qdrant::from_url("http://localhost:6334").build()?;
// Create collection
client
.create_collection(&CreateCollection {
collection_name: "documents".to_string(),
vectors_config: Some(VectorParams::from(VectorParams {
size: 384,
distance: Distance::Cosine.into(),
..Default::default()
})),
..Default::default()
})
.await?;
// Insert points
let points: Vec<PointStruct> = (0..100)
.map(|i| PointStruct {
id: Some(i.into()),
vectors: Some(generate_vector(384).into()),
payload: serde_json::json!({
"title": format!("Document {}", i),
"category": "tech"
})
.as_object()
.unwrap()
.clone(),
})
.collect();
client
.upsert_points(UpsertPoints {
collection_name: "documents".to_string(),
points,
..Default::default()
})
.await?;
// Search
let search_result = client
.search_points(&SearchPoints {
collection_name: "documents".to_string(),
vector: generate_vector(384),
limit: 10,
with_payload: Some(WithPayloadSelector::Enable(true)),
..Default::default()
})
.await?;
for point in search_result.result {
println!("ID: {:?}, Score: {}", point.id, point.score);
}
Ok(())
}
Integration Patterns
RAG (Retrieval-Augmented Generation)
import uuid

import openai
from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct
from sentence_transformers import SentenceTransformer
class RAGSystem:
def __init__(self):
# Initialize components
self.client = QdrantClient(host="localhost", port=6333)
self.embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
self.llm = openai.OpenAI()
def add_document(self, text: str, metadata: dict):
# Generate embedding
embedding = self.embedding_model.encode(text).tolist()
# Store with metadata
self.client.upsert(
collection_name="documents",
points=[PointStruct(
id=str(uuid.uuid4()),
vector=embedding,
payload={
"text": text,
**metadata
}
)]
)
def query(self, question: str, context_size: int = 5) -> str:
# Generate question embedding
question_embedding = self.embedding_model.encode(question).tolist()
# Retrieve relevant documents
results = self.client.search(
collection_name="documents",
query_vector=question_embedding,
limit=context_size,
with_payload=True
)
# Prepare context
context = "\n\n".join([
hit.payload["text"] for hit in results
])
# Generate answer
response = self.llm.chat.completions.create(
model="gpt-4",
messages=[
{
"role": "system",
"content": f"Answer the question based on this context:\n{context}"
},
{
"role": "user",
"content": question
}
]
)
return response.choices[0].message.content
Recommendation System
from datetime import datetime

from qdrant_client import QdrantClient
from qdrant_client.models import Filter, PointStruct
class RecommendationEngine:
def __init__(self):
self.client = QdrantClient(host="localhost", port=6333)
def add_user_interaction(self, user_id: str, item_id: str, rating: float):
# Store interaction as user-item vector
user_vector = self.get_user_embedding(user_id)
item_vector = self.get_item_embedding(item_id)
# Create combined interaction vector
interaction_vector = self.combine_vectors(user_vector, item_vector, rating)
self.client.upsert(
collection_name="interactions",
points=[PointStruct(
id=f"{user_id}_{item_id}",
vector=interaction_vector,
                payload={
                    "user_id": user_id,
                    "item_id": item_id,
                    "rating": rating,
                    "timestamp": datetime.utcnow().isoformat()
                }
)]
)
def recommend_items(self, user_id: str, limit: int = 10):
        # Get user's positive interactions
        positive_interactions = self.client.scroll(
            collection_name="interactions",
            scroll_filter=Filter(
                must=[
                    {"key": "user_id", "match": {"value": user_id}},
                    {"key": "rating", "range": {"gte": 4.0}}
                ]
            ),
            limit=100,
            with_vectors=True
        )[0]
if not positive_interactions:
# Return popular items for new users
return self.get_popular_items(limit)
# Extract item IDs
positive_item_ids = [hit.payload["item_id"] for hit in positive_interactions]
# Get recommendations
recommendations = self.client.recommend(
collection_name="items",
positive=positive_item_ids,
limit=limit,
with_payload=True
)
return recommendations
def collaborative_filtering(self, user_id: str):
# Find similar users
user_vector = self.get_user_embedding(user_id)
similar_users = self.client.search(
collection_name="users",
query_vector=user_vector,
limit=10,
with_payload=True
)
# Get items liked by similar users
similar_user_ids = [user.id for user in similar_users]
recommended_items = self.client.search(
collection_name="interactions",
query_filter=Filter(
must=[
{"key": "user_id", "match": {"any": similar_user_ids}},
{"key": "rating", "range": {"gte": 4.0}}
]
),
limit=20,
group_by="item_id",
group_size=1
)
return recommended_items
Semantic Search Engine
from datetime import datetime
from typing import Any, Dict, List

from qdrant_client import QdrantClient
from qdrant_client.models import Filter, PointStruct
from sentence_transformers import SentenceTransformer
class SemanticSearchEngine:
def __init__(self):
self.client = QdrantClient(host="localhost", port=6333)
self.embedding_model = SentenceTransformer('multi-qa-mpnet-base-dot-v1')
def index_documents(self, documents: List[Dict[str, Any]]):
points = []
for doc in documents:
# Generate embeddings for title and content
title_embedding = self.embedding_model.encode(doc['title']).tolist()
content_embedding = self.embedding_model.encode(doc['content'][:512]).tolist()
points.append(PointStruct(
id=doc['id'],
vector={
"title": title_embedding,
"content": content_embedding
},
payload={
**doc,
"indexed_at": datetime.utcnow().isoformat()
}
))
# Batch upload
self.client.upsert(
collection_name="search_docs",
points=points
)
def search(self, query: str, filters: Dict = None, limit: int = 10):
# Generate query embeddings
query_embedding = self.embedding_model.encode(query).tolist()
# Build filter if provided
query_filter = None
if filters:
conditions = []
for key, value in filters.items():
if isinstance(value, list):
conditions.append({"key": key, "match": {"any": value}})
else:
conditions.append({"key": key, "match": {"value": value}})
if conditions:
query_filter = Filter(must=conditions)
# Perform search
results = self.client.search(
collection_name="search_docs",
query_vector={
"title": query_embedding,
"content": query_embedding
},
query_filter=query_filter,
limit=limit,
with_payload=True,
score_threshold=0.3
)
return results
def hybrid_search(self, query: str, filters: Dict = None, limit: int = 10):
# Combine vector search with keyword matching
vector_results = self.search(query, filters, limit * 2)
# Extract keywords from query
keywords = self.extract_keywords(query)
# Boost results with keyword matches
boosted_results = []
for result in vector_results:
score_boost = 1.0
# Check for keyword matches in title
title = result.payload.get('title', '').lower()
for keyword in keywords:
if keyword in title:
score_boost += 0.2
# Apply boost
result.score *= score_boost
boosted_results.append(result)
# Sort by boosted score
boosted_results.sort(key=lambda x: x.score, reverse=True)
return boosted_results[:limit]
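The extract_keywords helper used above is not defined; a minimal stand-in (purely illustrative, no NLP dependencies) could look like this:
    def extract_keywords(self, query: str) -> list:
        # Naive keyword extraction: lower-case, strip punctuation, drop stop words
        stop_words = {"the", "a", "an", "and", "or", "of", "for", "to", "in", "on"}
        tokens = [t.strip(".,!?").lower() for t in query.split()]
        return [t for t in tokens if len(t) > 2 and t not in stop_words]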
Image Similarity Search
import uuid

import torch
from PIL import Image
from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct
from torchvision import models, transforms
class ImageSearchEngine:
def __init__(self):
self.client = QdrantClient(host="localhost", port=6333)
# Load pre-trained model for image embeddings
self.model = models.resnet50(pretrained=True)
self.model.eval()
# Image preprocessing
self.transform = transforms.Compose([
transforms.Resize(256),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Normalize(
mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225]
)
])
def extract_features(self, image_path: str) -> list:
# Load and preprocess image
image = Image.open(image_path).convert('RGB')
image_tensor = self.transform(image).unsqueeze(0)
# Extract features
with torch.no_grad():
features = self.model(image_tensor)
# Normalize features
features = torch.nn.functional.normalize(features, p=2, dim=1)
return features.flatten().tolist()
def index_image(self, image_path: str, metadata: dict):
features = self.extract_features(image_path)
self.client.upsert(
collection_name="images",
points=[PointStruct(
id=metadata.get('id', str(uuid.uuid4())),
vector=features,
payload={
"image_path": image_path,
**metadata
}
)]
)
def find_similar_images(self, query_image_path: str, limit: int = 10):
query_features = self.extract_features(query_image_path)
results = self.client.search(
collection_name="images",
query_vector=query_features,
limit=limit,
with_payload=True
)
return results
def search_by_text(self, text_query: str, limit: int = 10):
# Use CLIP or similar model for text-to-image search
# This is a simplified example
text_embedding = self.text_to_image_embedding(text_query)
results = self.client.search(
collection_name="images",
query_vector=text_embedding,
limit=limit,
with_payload=True
)
return results
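The text_to_image_embedding helper referenced above is left undefined. One way to implement it is with a CLIP model from sentence-transformers, which embeds text and images into a shared space; note this assumes the images collection was also indexed with the same CLIP model rather than the ResNet features shown earlier (a sketch under that assumption):
from sentence_transformers import SentenceTransformer

clip_model = SentenceTransformer('clip-ViT-B-32')

def text_to_image_embedding(text_query: str) -> list:
    # CLIP's text encoder produces vectors comparable to its image vectors
    return clip_model.encode(text_query).tolist()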
Monitoring & Observability
Metrics Collection
# Qdrant provides built-in metrics endpoint
# Accessible at /metrics in Prometheus format
import time

import requests
from qdrant_client import QdrantClient
class QdrantMonitor:
def __init__(self, base_url="http://localhost:6333"):
self.base_url = base_url
def get_metrics(self):
"""Get Prometheus metrics"""
response = requests.get(f"{self.base_url}/metrics")
return response.text
def get_collection_stats(self, collection_name):
"""Get collection-specific statistics"""
client = QdrantClient(host="localhost", port=6333)
collection_info = client.get_collection(collection_name)
return {
"points_count": collection_info.points_count,
"vectors_count": collection_info.vectors_count,
"status": collection_info.status,
"config": collection_info.config.dict(),
"indexed_vectors_count": collection_info.indexed_vectors_count
}
def monitor_performance(self, interval=60):
"""Continuously monitor performance"""
while True:
stats = self.get_collection_stats("documents")
# Calculate performance indicators
indexing_rate = stats.get("points_count", 0) / interval
memory_usage = self.calculate_memory_usage(stats)
print(f"Points: {stats['points_count']}")
print(f"Indexing rate: {indexing_rate:.2f} points/sec")
print(f"Memory usage: {memory_usage:.2f} MB")
time.sleep(interval)
def calculate_memory_usage(self, stats):
"""Estimate memory usage based on collection stats"""
# Rough estimation
vector_size = 384 * 4 # bytes per vector (float32)
points_count = stats.get("points_count", 0)
return (points_count * vector_size) / (1024 * 1024) # MB
Logging & Debugging
import logging
import time

from qdrant_client import QdrantClient
# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger('qdrant')
class MonitoredQdrantClient(QdrantClient):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.logger = logging.getLogger('qdrant.client')
def search(self, collection_name, *args, **kwargs):
start_time = time.time()
try:
result = super().search(collection_name, *args, **kwargs)
# Log search metrics
duration = time.time() - start_time
self.logger.info(
f"Search in {collection_name}: "
f"limit={kwargs.get('limit', 10)}, "
f"duration={duration:.3f}s, "
f"results={len(result)}"
)
return result
except Exception as e:
self.logger.error(
f"Search failed in {collection_name}: {str(e)}"
)
raise
def upsert(self, collection_name, points):
start_time = time.time()
try:
result = super().upsert(collection_name, points)
duration = time.time() - start_time
self.logger.info(
f"Upsert to {collection_name}: "
f"points={len(points)}, "
f"duration={duration:.3f}s"
)
return result
except Exception as e:
self.logger.error(
f"Upsert failed in {collection_name}: {str(e)}"
)
raise
Alerting
import smtplib
import time
from datetime import datetime
from email.mime.text import MIMEText

from prometheus_client import Gauge, start_http_server
from qdrant_client import QdrantClient
class QdrantAlertManager:
def __init__(self):
self.alerts = []
# Prometheus metrics
self.points_count = Gauge('qdrant_points_count', 'Number of points in collection')
self.search_latency = Gauge('qdrant_search_latency_seconds', 'Search latency')
self.error_rate = Gauge('qdrant_error_rate', 'Error rate')
def check_health(self, client):
"""Perform health checks"""
alerts = []
try:
# Check collection status
collections = client.get_collections()
for collection in collections.collections:
info = client.get_collection(collection.name)
# Check for low disk space
if info.points_count > 1000000: # Threshold
alerts.append({
"type": "warning",
"message": f"Collection {collection.name} has {info.points_count} points"
})
# Update metrics
self.points_count.set(info.points_count)
# Test search latency
start = time.time()
client.search("test_collection", [0] * 384, limit=1)
latency = time.time() - start
self.search_latency.set(latency)
if latency > 1.0: # 1 second threshold
alerts.append({
"type": "critical",
"message": f"High search latency: {latency:.2f}s"
})
except Exception as e:
self.error_rate.inc()
alerts.append({
"type": "critical",
"message": f"Health check failed: {str(e)}"
})
return alerts
def send_alert(self, alert):
"""Send alert via email"""
msg = MIMEText(f"""
Qdrant Alert: {alert['type'].upper()}
Message: {alert['message']}
Timestamp: {datetime.utcnow().isoformat()}
""")
msg['Subject'] = f"Qdrant Alert: {alert['type'].upper()}"
msg['From'] = "qdrant-monitor@example.com"
msg['To'] = "admin@example.com"
# Send email (configure SMTP settings)
with smtplib.SMTP('localhost') as server:
server.send_message(msg)
def start_monitoring(self, interval=60):
"""Start continuous monitoring"""
start_http_server(8000) # Expose metrics
client = QdrantClient(host="localhost", port=6333)
while True:
alerts = self.check_health(client)
for alert in alerts:
self.send_alert(alert)
self.alerts.append(alert)
time.sleep(interval)
Security Best Practices
Authentication & Authorization
# Configure API key authentication
"""
config.yaml:
service:
api_key: "your-secret-api-key"
# Or use environment variable
export QDRANT_SERVICE__API_KEY="your-secret-api-key"
"""
# Use authenticated client
from qdrant_client import QdrantClient
client = QdrantClient(
host="localhost",
port=6333,
api_key="your-secret-api-key"
)
# JWT authentication (for enterprise)
client = QdrantClient(
host="localhost",
port=6333,
jwt_token="your-jwt-token"
)
Network Security
# Configure HTTPS
"""
config.yaml:
service:
enable_https: true
https_cert_path: "/path/to/cert.pem"
https_key_path: "/path/to/key.pem"
"""
# Use client with HTTPS
client = QdrantClient(
host="your-qdrant-domain.com",
port=6333,
https=True,
verify=True, # Verify SSL certificate
timeout=30
)
# Network restrictions
"""
config.yaml:
service:
host: "127.0.0.1" # Listen only on localhost
http_port: 6333
# Or use specific interface
service:
host: "10.0.0.100" # Internal network only
"""
Data Encryption
# Enable encryption at rest (enterprise feature)
"""
config.yaml:
storage:
encrypt_storage: true
encryption_key_path: "/path/to/encryption.key"
"""
# Client-side encryption for sensitive payloads
import json

from cryptography.fernet import Fernet
from qdrant_client import QdrantClient
class EncryptedQdrantClient:
def __init__(self, encryption_key):
self.client = QdrantClient(host="localhost", port=6333)
self.cipher = Fernet(encryption_key)
def encrypt_payload(self, payload):
"""Encrypt sensitive fields"""
encrypted = payload.copy()
if 'sensitive_data' in encrypted:
data = json.dumps(encrypted['sensitive_data'])
encrypted['sensitive_data'] = self.cipher.encrypt(
data.encode()
).decode()
return encrypted
def decrypt_payload(self, payload):
"""Decrypt sensitive fields"""
decrypted = payload.copy()
if 'sensitive_data' in decrypted:
data = self.cipher.decrypt(
decrypted['sensitive_data'].encode()
).decode()
decrypted['sensitive_data'] = json.loads(data)
return decrypted
def upsert(self, collection_name, points):
"""Upsert with encryption"""
encrypted_points = []
for point in points:
point.payload = self.encrypt_payload(point.payload)
encrypted_points.append(point)
return self.client.upsert(collection_name, encrypted_points)
Access Control
# Implement RBAC (Role-Based Access Control)
class QdrantAccessManager:
def __init__(self):
self.roles = {
"admin": ["read", "write", "delete", "manage"],
"writer": ["read", "write"],
"reader": ["read"]
}
self.users = {
"user1": {"role": "admin"},
"user2": {"role": "writer"},
"user3": {"role": "reader"}
}
def check_permission(self, user, operation):
"""Check if user has permission for operation"""
role = self.users.get(user, {}).get("role")
if not role:
return False
return operation in self.roles.get(role, [])
def authorize_operation(self, user, operation, collection_name):
"""Authorize operation on collection"""
if not self.check_permission(user, operation):
raise PermissionError(f"User {user} cannot {operation} on {collection_name}")
return True
# Usage example
access_manager = QdrantAccessManager()
class SecureQdrantClient(QdrantClient):
def __init__(self, user, *args, **kwargs):
super().__init__(*args, **kwargs)
self.user = user
self.access_manager = QdrantAccessManager()
def search(self, collection_name, *args, **kwargs):
self.access_manager.authorize_operation(self.user, "read", collection_name)
return super().search(collection_name, *args, **kwargs)
def upsert(self, collection_name, points):
self.access_manager.authorize_operation(self.user, "write", collection_name)
return super().upsert(collection_name, points)
Real-World Applications
E-commerce Product Search
from qdrant_client import QdrantClient
from qdrant_client.models import Filter, PointStruct
from sentence_transformers import SentenceTransformer

class ProductSearchEngine:
def __init__(self):
self.client = QdrantClient(host="localhost", port=6333)
self.text_encoder = SentenceTransformer('all-MiniLM-L6-v2')
self.image_encoder = self.load_image_encoder()
def index_product(self, product):
# Create multi-modal embeddings
text_embedding = self.text_encoder.encode(
f"{product['title']} {product['description']} {' '.join(product['tags'])}"
).tolist()
image_embedding = None
if product.get('image_url'):
image_embedding = self.encode_image(product['image_url'])
# Store product with rich metadata
point = PointStruct(
id=product['id'],
vector={
"text": text_embedding,
"image": image_embedding
} if image_embedding else {"text": text_embedding},
payload={
"title": product['title'],
"description": product['description'],
"price": product['price'],
"category": product['category'],
"brand": product['brand'],
"tags": product['tags'],
"in_stock": product['in_stock'],
"rating": product.get('rating', 0),
"popularity_score": product.get('popularity_score', 0)
}
)
self.client.upsert("products", [point])
def search_products(self, query, filters=None, limit=20):
# Generate query embedding
query_embedding = self.text_encoder.encode(query).tolist()
# Build filter
query_filter = None
if filters:
conditions = []
if 'category' in filters:
conditions.append({
"key": "category",
"match": {"value": filters['category']}
})
if 'price_range' in filters:
conditions.append({
"key": "price",
"range": filters['price_range']
})
if 'in_stock' in filters:
conditions.append({
"key": "in_stock",
"match": {"value": filters['in_stock']}
})
if conditions:
query_filter = Filter(must=conditions)
# Search with boosting
results = self.client.search(
collection_name="products",
query_vector={"text": query_embedding},
query_filter=query_filter,
limit=limit,
with_payload=True,
score_threshold=0.3
)
# Re-rank based on business rules
reranked_results = []
for hit in results:
# Apply business logic boosts
boost = 1.0
# Boost popular items
popularity = hit.payload.get('popularity_score', 0)
boost += popularity * 0.1
# Boost in-stock items
if hit.payload.get('in_stock', False):
boost += 0.2
# Boost highly rated items
rating = hit.payload.get('rating', 0)
if rating >= 4.0:
boost += 0.15
hit.score *= boost
reranked_results.append(hit)
# Sort by final score
reranked_results.sort(key=lambda x: x.score, reverse=True)
return reranked_results[:limit]
def get_recommendations(self, user_id, product_id, limit=10):
# Get similar products
similar = self.client.recommend(
collection_name="products",
positive=[product_id],
limit=limit * 2,
with_payload=True
)
# Filter based on user preferences
user_prefs = self.get_user_preferences(user_id)
filtered_results = []
for hit in similar:
# Apply user preference filters
if user_prefs.get('preferred_brands'):
if hit.payload.get('brand') in user_prefs['preferred_brands']:
hit.score *= 1.2
if user_prefs.get('price_range'):
price = hit.payload.get('price', 0)
if user_prefs['price_range'][0] <= price <= user_prefs['price_range'][1]:
hit.score *= 1.1
filtered_results.append(hit)
# Sort and return
filtered_results.sort(key=lambda x: x.score, reverse=True)
return filtered_results[:limit]
Document Analysis System
import spacy
from qdrant_client import QdrantClient
from qdrant_client.models import Filter, PointStruct, ScrollRequest
from sentence_transformers import SentenceTransformer

class DocumentAnalysisSystem:
def __init__(self):
self.client = QdrantClient(host="localhost", port=6333)
self.text_encoder = SentenceTransformer('all-mpnet-base-v2')
self.nlp = spacy.load("en_core_web_lg")
def process_document(self, file_path, metadata):
# Extract text from document
text = self.extract_text(file_path)
# Split into chunks
chunks = self.chunk_document(text)
# Process each chunk
points = []
for i, chunk in enumerate(chunks):
# Generate embedding
embedding = self.text_encoder.encode(chunk).tolist()
# Extract entities
entities = self.extract_entities(chunk)
# Generate summary
summary = self.generate_summary(chunk)
points.append(PointStruct(
id=f"{metadata['doc_id']}_chunk_{i}",
vector=embedding,
payload={
"doc_id": metadata['doc_id'],
"chunk_id": i,
"text": chunk,
"summary": summary,
"entities": entities,
"doc_type": metadata['doc_type'],
"author": metadata.get('author'),
"date": metadata.get('date'),
"source_file": file_path
}
))
# Store chunks
self.client.upsert("document_chunks", points)
# Store document metadata
self.client.upsert("documents", [PointStruct(
id=metadata['doc_id'],
vector=self.text_encoder.encode(text[:2000]).tolist(), # Document summary embedding
payload=metadata
)])
def semantic_search(self, query, doc_type=None, date_range=None, limit=20):
# Generate query embedding
query_embedding = self.text_encoder.encode(query).tolist()
# Build filter
conditions = []
if doc_type:
conditions.append({"key": "doc_type", "match": {"value": doc_type}})
if date_range:
conditions.append({
"key": "date",
"range": date_range
})
query_filter = Filter(must=conditions) if conditions else None
# Search for relevant chunks
results = self.client.search(
collection_name="document_chunks",
query_vector=query_embedding,
query_filter=query_filter,
limit=limit,
with_payload=True,
group_by="doc_id",
group_size=3
)
return results
def entity_search(self, entity_type, entity_value, limit=10):
# Search by extracted entities
results = self.client.search(
collection_name="document_chunks",
query_filter=Filter(
must=[
{
"key": "entities.type",
"match": {"value": entity_type}
},
{
"key": "entities.text",
"match": {"value": entity_value}
}
]
),
limit=limit,
with_payload=True
)
return results
def find_similar_documents(self, doc_id, limit=10):
# Find documents similar to given document
doc_info = self.client.retrieve(
collection_name="documents",
ids=[doc_id],
with_vectors=True
)[0]
if doc_info.vector:
similar = self.client.search(
collection_name="documents",
query_vector=doc_info.vector,
limit=limit,
with_payload=True,
score_threshold=0.5
)
return similar
return []
def generate_document_summary(self, doc_id):
# Retrieve all chunks
chunks = self.client.scroll(
collection_name="document_chunks",
scroll_request=ScrollRequest(
filter=Filter(
must=[{"key": "doc_id", "match": {"value": doc_id}}]
),
limit=100,
with_payload=True
)
)[0]
# Sort by chunk ID and concatenate
chunks.sort(key=lambda x: x.payload['chunk_id'])
full_text = " ".join([chunk.payload['text'] for chunk in chunks])
# Generate summary (could use LLM here)
summary = self.generate_summary(full_text, max_length=500)
return {
"doc_id": doc_id,
"summary": summary,
"chunk_count": len(chunks),
"total_length": len(full_text)
}
Fraud Detection System
from datetime import datetime

from qdrant_client import QdrantClient
from qdrant_client.models import Filter, PointStruct, ScrollRequest

class FraudDetectionSystem:
def __init__(self):
self.client = QdrantClient(host="localhost", port=6333)
self.feature_extractor = TransactionFeatureExtractor()
def record_transaction(self, transaction):
# Extract features for vector embedding
features = self.feature_extractor.extract(transaction)
# Create transaction vector
vector = self.create_transaction_vector(features, transaction)
# Store with rich metadata
point = PointStruct(
id=transaction['transaction_id'],
vector=vector,
payload={
"transaction_id": transaction['transaction_id'],
"user_id": transaction['user_id'],
"amount": transaction['amount'],
"merchant": transaction['merchant'],
"category": transaction['category'],
"timestamp": transaction['timestamp'],
"location": transaction.get('location'),
"device_id": transaction.get('device_id'),
"ip_address": transaction.get('ip_address'),
"features": features,
"is_fraud": transaction.get('is_fraud', False),
"risk_score": 0.0 # Will be updated
}
)
self.client.upsert("transactions", [point])
# Check for fraud in real-time
return self.check_fraud_risk(transaction, vector)
def check_fraud_risk(self, transaction, vector):
# Find similar transactions
similar_transactions = self.client.search(
collection_name="transactions",
query_vector=vector,
limit=10,
query_filter=Filter(
must_not=[
{"key": "transaction_id", "match": {"value": transaction['transaction_id']}}
]
),
with_payload=True
)
# Calculate risk score
risk_score = self.calculate_risk_score(transaction, similar_transactions)
# Update risk score in database
self.client.set_payload(
collection_name="transactions",
payload={"risk_score": risk_score},
points=[transaction['transaction_id']]
)
# Check thresholds
if risk_score > 0.8:
return {"risk": "high", "score": risk_score, "action": "block"}
elif risk_score > 0.6:
return {"risk": "medium", "score": risk_score, "action": "review"}
else:
return {"risk": "low", "score": risk_score, "action": "allow"}
def calculate_risk_score(self, transaction, similar_transactions):
risk_score = 0.0
# Check for unusual amounts
if similar_transactions:
avg_amount = sum(t.payload['amount'] for t in similar_transactions) / len(similar_transactions)
if transaction['amount'] > avg_amount * 3:
risk_score += 0.3
# Check for unusual locations
transaction_location = transaction.get('location')
if transaction_location:
location_distances = []
for t in similar_transactions:
if t.payload.get('location'):
distance = self.calculate_distance(
transaction_location,
t.payload['location']
)
location_distances.append(distance)
if location_distances and min(location_distances) > 1000: # 1000 km
risk_score += 0.4
# Check for unusual timing
transaction_time = datetime.fromisoformat(transaction['timestamp'])
if transaction_time.hour < 6 or transaction_time.hour > 23:
risk_score += 0.2
# Check fraud rate in similar transactions
fraud_count = sum(1 for t in similar_transactions if t.payload.get('is_fraud'))
if similar_transactions:
fraud_rate = fraud_count / len(similar_transactions)
risk_score += fraud_rate * 0.3
# Check device/IP anomalies
if self.is_new_device_for_user(
transaction['user_id'],
transaction.get('device_id')
):
risk_score += 0.2
if self.is_new_ip_for_user(
transaction['user_id'],
transaction.get('ip_address')
):
risk_score += 0.2
return min(risk_score, 1.0)
def detect_fraud_patterns(self):
# Analyze patterns across all transactions
patterns = []
# Look for high-frequency transactions
high_freq_users = self.client.search(
collection_name="transactions",
query_vector=[0] * 128, # Dummy vector
limit=1000,
with_payload=True,
group_by="user_id",
group_size=1,
score_threshold=0
)
for group in high_freq_users:
if group.hits[0].payload.get('transaction_count', 0) > 50: # Threshold
patterns.append({
"type": "high_frequency",
"user_id": group.id,
"count": group.hits[0].payload.get('transaction_count')
})
# Look for unusual merchant patterns
unusual_merchants = self.client.scroll(
collection_name="transactions",
scroll_request=ScrollRequest(
limit=10000,
with_payload=["merchant", "amount", "timestamp", "user_id"]  # user_id is needed below
)
)[0]
# Analyze merchant patterns
merchant_stats = {}
for point in unusual_merchants:
merchant = point.payload['merchant']
if merchant not in merchant_stats:
merchant_stats[merchant] = {
"count": 0,
"total_amount": 0,
"unique_users": set()
}
merchant_stats[merchant]["count"] += 1
merchant_stats[merchant]["total_amount"] += point.payload['amount']
merchant_stats[merchant]["unique_users"].add(point.payload['user_id'])
# Flag suspicious merchants
for merchant, stats in merchant_stats.items():
if stats["count"] > 100 and len(stats["unique_users"]) < 5:
patterns.append({
"type": "suspicious_merchant",
"merchant": merchant,
"transaction_count": stats["count"],
"unique_users": len(stats["unique_users"])
})
return patterns
Troubleshooting
Common Issues
# Memory issues
# Problem: Out of memory errors
# Solution 1: Use on-disk storage
# Requires: from qdrant_client.models import ScalarQuantization, ScalarQuantizationConfig, ScalarType
client.create_collection(
collection_name="large_collection",
vectors_config=VectorParams(size=1536, distance=Distance.COSINE, on_disk=True),
quantization_config=ScalarQuantization(
scalar=ScalarQuantizationConfig(type=ScalarType.INT8, always_ram=False)
)
)
# Solution 2: Optimize HNSW parameters
client.update_collection(
collection_name="large_collection",
hnsw_config=HnswConfigDiff(
m=16, # Qdrant's default; lower values reduce memory at the cost of recall
ef_construct=100,
max_indexing_threads=4 # Limit threads
)
)
# Slow search performance
# Solution: Check and optimize configuration
collection_info = client.get_collection("my_collection")
print(f"Indexed vectors: {collection_info.indexed_vectors_count}")
# If not all vectors indexed, wait for indexing to complete
# Or increase indexing threads
client.update_collection(
collection_name="my_collection",
hnsw_config=HnswConfigDiff(
max_indexing_threads=8 # Use more threads for HNSW index construction
)
)
# Connection issues
# Solution: Check network and timeouts
try:
client = QdrantClient(
host="localhost",
port=6333,
timeout=30.0, # Increase timeout
prefer_grpc=False # Try HTTP if gRPC fails
)
# Test connection
client.get_collections()
except Exception as e:
print(f"Connection error: {e}")
# Check if Qdrant is running
# Check firewall settings
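If the server is reachable but connections fail intermittently, a small retry loop with backoff can separate transient hiccups from a real outage. A minimal sketch, with arbitrary retry counts and delays:
import time
from qdrant_client import QdrantClient

def connect_with_retry(host="localhost", port=6333, attempts=5):
    """Try to reach Qdrant a few times before giving up."""
    for attempt in range(1, attempts + 1):
        try:
            client = QdrantClient(host=host, port=port, timeout=30)
            client.get_collections()  # Cheap call that proves the server responds
            return client
        except Exception as e:
            print(f"Attempt {attempt}/{attempts} failed: {e}")
            time.sleep(2 ** attempt)  # Exponential backoff: 2s, 4s, 8s, ...
    raise ConnectionError("Qdrant unreachable after retries")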
Debug Mode
# Enable debug logging
import logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('qdrant_client')
# Create the client; the DEBUG level configured above also applies to the client's internal logger
client = QdrantClient(
host="localhost",
port=6333
)
# Monitor operations by wrapping search calls with timing and logging
import time
class DebugQdrantClient(QdrantClient):
def search(self, collection_name, *args, **kwargs):
start_time = time.time()
# Log search parameters
logger.debug(f"Searching in {collection_name}")
logger.debug(f"Limit: {kwargs.get('limit', 10)}")
logger.debug(f"Filter: {kwargs.get('query_filter')}")
try:
result = super().search(collection_name, *args, **kwargs)
duration = time.time() - start_time
logger.debug(f"Search completed in {duration:.3f}s")
logger.debug(f"Results: {len(result)}")
return result
except Exception as e:
logger.error(f"Search failed: {str(e)}")
raise
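A quick usage sketch for the debug wrapper above; the collection name and query vector are placeholders:
debug_client = DebugQdrantClient(host="localhost", port=6333)
results = debug_client.search(
    collection_name="my_collection",
    query_vector=[0.1] * 384,  # Placeholder query vector
    limit=5,
)
# The DEBUG log now shows the filter, limit, result count, and elapsed time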
Performance Profiling
import cProfile
import pstats
def profile_qdrant_operation():
def decorator(func):
def wrapper(*args, **kwargs):
# Profile the function
profiler = cProfile.Profile()
profiler.enable()
result = func(*args, **kwargs)
profiler.disable()
# Print stats
stats = pstats.Stats(profiler)
stats.sort_stats('cumulative')
stats.print_stats(20)
return result
return wrapper
return decorator
# Usage
@profile_qdrant_operation()
def batch_upload():
points = generate_test_points(10000)
client.upsert("test_collection", points)
# Or profile specific operations
def profile_search():
query_vector = generate_test_vector(384)
with cProfile.Profile() as pr:
for _ in range(100):
client.search(
"test_collection",
query_vector,
limit=10
)
# Save stats to file
pr.dump_stats('qdrant_search_profile.prof')
Recovery & Repair
# Recover from corrupted collection
def repair_collection(client, collection_name):
try:
# Try to get collection info
info = client.get_collection(collection_name)
# If successful, create backup
snapshot = client.create_snapshot(collection_name)
print(f"Created snapshot: {snapshot.name}")
except Exception as e:
print(f"Collection is corrupted: {e}")
# Options:
# 1. Restore from a snapshot (recover_snapshot takes the snapshot's URL or a path on the server)
snapshots = client.list_snapshots(collection_name)
if snapshots:
snapshot_url = f"http://localhost:6333/collections/{collection_name}/snapshots/{snapshots[-1].name}"
client.recover_snapshot(collection_name, location=snapshot_url)
print("Restored from latest snapshot")
# 2. Recreate collection (if you have the data)
else:
print("No snapshots available. Collection may need to be recreated.")
Best Practices
Collection Design
# Choose appropriate vector dimensions
# Common dimensions:
# - all-MiniLM-L6-v2: 384
# - text-embedding-ada-002: 1536
# - BERT-base: 768
# - Custom models: variable
# Use descriptive collection names
# Good: user_embeddings, product_vectors, document_chunks
# Bad: vec1, test, temp
# Plan for growth
# Estimate your needs:
# - Small project (<1M vectors): Single node
# - Medium project (1M-10M): Consider sharding
# - Large project (>10M): Distributed cluster
# Set appropriate distance metrics
# - COSINE: Text embeddings, normalized vectors
# - EUCLID: General purpose, unnormalized
# - DOT: Inner product; equivalent to COSINE when vectors are pre-normalized
# - MANHATTAN: High-dimensional spaces
collection_config = {
"user_profiles": {
"dimensions": 384,
"distance": Distance.COSINE,
"on_disk": False,
"quantization": None
},
"product_catalog": {
"dimensions": 512,
"distance": Distance.EUCLID,
"on_disk": True,
"quantization": QuantizationConfig(
scalar=ScalarQuantization(type="scalar")
)
}
}
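To apply this guidance, the per-collection settings can be passed straight through to create_collection. A short sketch that iterates over the mapping defined above:
from qdrant_client.models import VectorParams

for name, cfg in collection_config.items():
    client.create_collection(
        collection_name=name,
        vectors_config=VectorParams(
            size=cfg["dimensions"],
            distance=cfg["distance"],
            on_disk=cfg["on_disk"],  # Keep large collections on disk
        ),
        quantization_config=cfg["quantization"],  # None disables quantization
    )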
Data Management
# Use UUIDs for point IDs
import uuid
# Good
point_id = str(uuid.uuid4())
# Bad for distributed systems
point_id = incremental_number
# Version your data schema
payload_schema = {
"version": "1.0",
"fields": {
"title": {"type": "string", "required": True},
"content": {"type": "string", "required": True},
"metadata": {"type": "object", "optional": True}
}
}
# Implement data validation
def validate_payload(payload, schema):
"""Validate payload against schema"""
for field, config in schema["fields"].items():
if config.get("required") and field not in payload:
raise ValueError(f"Required field missing: {field}")
return True
# Use consistent timestamp formats
from datetime import datetime, timezone
timestamp = datetime.now(timezone.utc).isoformat()
# "2024-01-01T12:00:00+00:00"
Performance Optimization
# Batch operations
def batch_upsert(client, collection_name, points, batch_size=100):
"""Upload points in batches"""
for i in range(0, len(points), batch_size):
batch = points[i:i + batch_size]
client.upsert(collection_name, batch)
# Optional: Add delay to avoid overwhelming
# time.sleep(0.1)
# Use appropriate HNSW parameters
hnsw_configs = {
"small": { # <10K vectors
"m": 16,
"ef_construct": 100,
"ef": 64
},
"medium": { # 10K-1M vectors
"m": 32,
"ef_construct": 200,
"ef": 128
},
"large": { # >1M vectors
"m": 64,
"ef_construct": 400,
"ef": 256
}
}
# Monitor and adjust
def optimize_hnsw_parameters(client, collection_name, vector_count):
"""Adjust HNSW parameters based on collection size"""
if vector_count < 10000:
config = hnsw_configs["small"]
elif vector_count < 1000000:
config = hnsw_configs["medium"]
else:
config = hnsw_configs["large"]
# Note: "ef" is a query-time setting (SearchParams(hnsw_ef=...), see the sketch below), not an index field
client.update_collection(
collection_name,
hnsw_config=HnswConfigDiff(m=config["m"], ef_construct=config["ef_construct"])
)
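The "ef" values in the table above are applied per query rather than stored in the index. A short sketch using SearchParams; the collection name and query vector are placeholders:
from qdrant_client.models import SearchParams

results = client.search(
    collection_name="my_collection",
    query_vector=[0.1] * 384,  # Placeholder query vector
    limit=10,
    search_params=SearchParams(hnsw_ef=hnsw_configs["medium"]["ef"]),  # Higher ef: better recall, slower search
)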
Query Optimization
# Use filters effectively
# Good: Use indexed fields for filtering
query_filter = Filter(  # Avoid shadowing the built-in filter()
must=[
{
"key": "category",
"match": {"value": "electronics"} # Fast
}
]
)
# Bad: Filter on nested complex objects
query_filter = Filter(
must=[
{
"key": "metadata.attributes.specifications.weight",
"range": {"gte": 1.0} # Slow
}
]
)
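Filtering on a field is only fast when that field has a payload index; without one Qdrant has to scan payloads. A minimal sketch that indexes the "category" field used in the good example above:
from qdrant_client.models import PayloadSchemaType

client.create_payload_index(
    collection_name="products",
    field_name="category",
    field_schema=PayloadSchemaType.KEYWORD,  # Keyword index for exact-match filters
)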
# Limit returned payload
# Only request needed fields
client.search(
collection_name="products",
query_vector=query,
limit=10,
with_payload=["title", "price", "image_url"], # Only these fields
with_vectors=False # Don't need vectors in results
)
# Use score thresholds
client.search(
collection_name="documents",
query_vector=query,
limit=10,
score_threshold=0.5 # Skip low-similarity results
)
Monitoring & Maintenance
# Regular health checks
def health_check(client):
"""Perform comprehensive health check"""
checks = {
"connection": False,
"collections": [],
"disk_usage": {},
"memory_usage": {}
}
try:
# Check connection and fetch the collection list in one call
collections = client.get_collections()
checks["connection"] = True
# Check each collection
for collection in collections.collections:
info = client.get_collection(collection.name)
checks["collections"].append({
"name": collection.name,
"status": info.status,
"points_count": info.points_count,
"indexed_vectors": info.indexed_vectors_count
})
# Check disk usage (implementation depends on OS)
checks["disk_usage"] = get_disk_usage()
# Estimate memory usage
checks["memory_usage"] = estimate_memory_usage(checks["collections"])
except Exception as e:
print(f"Health check failed: {e}")
return checks
# Implement alerting
def setup_alerts(client):
"""Set up monitoring alerts"""
# Monitor collection size
collections = client.get_collections()
for collection in collections.collections:
info = client.get_collection(collection.name)
# Alert if collection is too large
if info.points_count > 1000000: # 1M
send_alert(
f"Collection {collection.name} has {info.points_count} points"
)
# Alert if indexing is slow
if info.status != "green":
send_alert(
f"Collection {collection.name} status: {info.status}"
)
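Qdrant also exposes a Prometheus-compatible /metrics endpoint on its HTTP port, which complements these client-side checks. A minimal sketch using the requests library; host and port assume a default local deployment:
import requests

def scrape_qdrant_metrics(host="localhost", port=6333):
    """Fetch raw Prometheus metrics from the Qdrant HTTP API."""
    response = requests.get(f"http://{host}:{port}/metrics", timeout=10)
    response.raise_for_status()
    return response.text  # One metric sample per line, ready for a Prometheus scraper

print(scrape_qdrant_metrics()[:500])  # Peek at the first few metrics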
Security Best Practices
# Never commit credentials
# Use environment variables
import os
API_KEY = os.getenv("QDRANT_API_KEY")
if not API_KEY:
raise ValueError("QDRANT_API_KEY environment variable not set")
# Use HTTPS in production
client = QdrantClient(
host="your-qdrant-domain.com",
port=6333,
https=True,
api_key=API_KEY
)
# Implement rate limiting
from collections import defaultdict
import time
class RateLimiter:
def __init__(self, max_requests=100, window_seconds=60):
self.requests = defaultdict(list)
self.max_requests = max_requests
self.window = window_seconds
def is_allowed(self, user_id):
now = time.time()
# Drop requests that fall outside the sliding window
user_requests = [t for t in self.requests[user_id] if now - t < self.window]
if len(user_requests) < self.max_requests:
user_requests.append(now) # Record this request against the user's quota
self.requests[user_id] = user_requests
return True
self.requests[user_id] = user_requests
return False
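A short usage sketch gating searches with the limiter above; the collection name is a placeholder:
limiter = RateLimiter(max_requests=100, window_seconds=60)

def rate_limited_search(user_id, query_vector):
    if not limiter.is_allowed(user_id):
        raise RuntimeError(f"Rate limit exceeded for user {user_id}")
    return client.search(
        collection_name="documents",
        query_vector=query_vector,
        limit=10,
    )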
# Validate all inputs
def validate_vector(vector, expected_dim):
"""Validate input vector"""
if not isinstance(vector, list):
raise TypeError("Vector must be a list")
if len(vector) != expected_dim:
raise ValueError(f"Vector dimension mismatch: expected {expected_dim}, got {len(vector)}")
if not all(isinstance(x, (int, float)) for x in vector):
raise TypeError("Vector must contain only numbers")
return True
Resources & Further Learning
Official Resources
- Qdrant Documentation: https://qdrant.tech/documentation/
- Qdrant GitHub Repository: https://github.com/qdrant/qdrant
- Python Client: https://github.com/qdrant/qdrant-client
Related Technologies
- HNSW Algorithm Paper
- Sentence Transformers
- FAISS (Facebook AI Similarity Search)
- Weaviate Documentation
- Pinecone Documentation
Tutorials & Courses
- Building RAG Applications with Qdrant
- Semantic Search Tutorial
- Recommendation Systems with Qdrant
- Multi-modal Search Guide
This primer covers Qdrant vector database implementation as of 2025, spanning core features, performance optimization, and real-world applications. Qdrant continues to evolve rapidly, so consult the latest releases and official documentation for the most current features and best practices.