Qdrant Comprehensive Primer

The Ultimate Guide to Vector Similarity Search & AI Applications


Table of Contents

  1. Introduction & Philosophy
  2. Installation & Setup
  3. Core Concepts
  4. Collection Management
  5. Vector Operations
  6. Advanced Search Features
  7. Filtering & Querying
  8. Performance Optimization
  9. Distributed Deployment
  10. Client Libraries
  11. Integration Patterns
  12. Monitoring & Observability
  13. Security Best Practices
  14. Real-World Applications
  15. Troubleshooting
  16. Best Practices

Introduction & Philosophy

What is Qdrant?

Qdrant (pronounced "Quadrant") is an open-source vector database and similarity search engine designed specifically for AI applications. Written in Rust for performance and reliability, Qdrant provides production-ready vector similarity search with advanced filtering capabilities.

Core Philosophy

"Accuracy, Speed, and Simplicity" - Qdrant embodies the principle that vector search should be both powerful and accessible. It's designed to:

  • Deliver accurate results through advanced quantization and indexing techniques
  • Maintain high performance even with billions of vectors and complex filters
  • Provide developer-friendly APIs that integrate seamlessly with AI workflows
  • Scale horizontally across multiple nodes without sacrificing search quality
  • Support rich metadata alongside vectors for context-aware applications

Key Differentiators

  1. Filtered Search: Advanced filtering capabilities without sacrificing performance
  2. Quantization Support: Multiple quantization options for memory efficiency
  3. On-disk Storage: Efficient memory usage with optional RAM acceleration
  4. Distributed Architecture: Built-in sharding and replication
  5. Rich Client SDKs: Support for Python, TypeScript/JavaScript, Go, Rust, and more

Architecture Overview

Qdrant uses a sophisticated architecture that combines:

  • HNSW (Hierarchical Navigable Small World) indices for fast approximate nearest neighbor search
  • Payload Storage: Flexible JSON document storage alongside vectors
  • Quantization: Multiple compression options (Scalar, Product, Binary)
  • Distributed System: Automatic sharding with consensus-based replication
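
Each of these pieces shows up directly in the client API. A minimal sketch (values are illustrative, and each setting is covered in detail in later sections):

from qdrant_client import QdrantClient
from qdrant_client.models import Distance, VectorParams, HnswConfigDiff, PointStruct

client = QdrantClient(host="localhost", port=6333)

# HNSW index parameters and on-disk payload storage are per-collection settings
client.create_collection(
    collection_name="demo",
    vectors_config=VectorParams(size=4, distance=Distance.COSINE),
    hnsw_config=HnswConfigDiff(m=16, ef_construct=100),
    on_disk_payload=True
)

# A JSON payload is stored alongside each vector
client.upsert(
    collection_name="demo",
    points=[PointStruct(id=1, vector=[0.1, 0.2, 0.3, 0.4], payload={"lang": "en"})]
)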

Installation & Setup

Local Installation

# Pull the latest Qdrant image
docker pull qdrant/qdrant

# Run Qdrant server
docker run -p 6333:6333 -p 6334:6334 \
-v $(pwd)/qdrant_storage:/qdrant/storage \
qdrant/qdrant

# Or with custom configuration
docker run -p 6333:6333 -p 6334:6334 \
-v $(pwd)/config.yaml:/qdrant/config/production.yaml \
-v $(pwd)/qdrant_storage:/qdrant/storage \
qdrant/qdrant

Python Client

# Install the Python client
pip install qdrant-client

# Quick start with in-memory storage
from qdrant_client import QdrantClient

# In-memory for testing
client = QdrantClient(":memory:")

# Connect to local server
client = QdrantClient(host="localhost", port=6333)

Binary Installation

# Download binary (Linux x86_64; see the releases page for other platforms)
curl -L https://github.com/qdrant/qdrant/releases/latest/download/qdrant-linux-x86_64 -o qdrant
chmod +x qdrant

# Run Qdrant
./qdrant

# Or as a service
sudo ./qdrant service --config-path ./config/config.yaml

Cloud Installation

Qdrant Cloud

# Install Qdrant Cloud CLI
pip install qdrant-cloud

# Log in to Qdrant Cloud
qdrant-cloud login

# Create a new cluster
qdrant-cloud cluster create my-cluster --plan starter

Kubernetes

# qdrant-deployment.yaml
apiVersion: apps/v1
kind: Deployment
metadata:
  name: qdrant
spec:
  replicas: 1
  selector:
    matchLabels:
      app: qdrant
  template:
    metadata:
      labels:
        app: qdrant
    spec:
      containers:
        - name: qdrant
          image: qdrant/qdrant:latest
          ports:
            - containerPort: 6333
            - containerPort: 6334
          volumeMounts:
            - name: storage
              mountPath: /qdrant/storage
      volumes:
        - name: storage
          persistentVolumeClaim:
            claimName: qdrant-storage

Configuration

Basic Configuration (config.yaml)

# Qdrant configuration file
service:
  host: '0.0.0.0' # Listen on all interfaces
  http_port: 6333
  grpc_port: 6334
  max_request_size_mb: 32
  enable_cors: true

# Storage configuration
storage:
  storage_path: './storage'
  # Snapshots configuration
  snapshots_path: './snapshots'

# Performance tuning
performance:
  max_search_threads: 0 # Auto-detect
  max_workers: 0 # Equal to CPU cores

# Cluster configuration (for distributed mode)
cluster:
  enabled: false
  p2p:
    port: 6335
  consensus:
    tick_period_ms: 100

Environment Variables

# Set configuration via environment variables
export QDRANT_SERVICE__HTTP_PORT=8080
export QDRANT_SERVICE__ENABLE_CORS=true
export QDRANT_STORAGE__STORAGE_PATH=/data/qdrant

# Run with environment overrides
qdrant

Core Concepts

Vectors & Embeddings

from qdrant_client import QdrantClient
import numpy as np

# Create sample vectors (embeddings)
dimensions = 384 # Common for text embeddings
vectors = [
np.random.rand(dimensions).astype(np.float32).tolist()
for _ in range(100)
]

# Vectors can be dense (float arrays) or sparse (key-value pairs)
dense_vector = [0.1, 0.2, 0.3, ..., 0.384]
sparse_vector = {
"indices": [0, 5, 10, 15],
"values": [0.8, 0.6, 0.9, 0.7]
}

Collections

from qdrant_client.models import Distance, VectorParams, CollectionInfo

# Define collection configuration
vector_params = VectorParams(
size=384, # Vector dimensions
distance=Distance.COSINE # Distance metric
)

# Create collection
client.create_collection(
collection_name="documents",
vectors_config=vector_params
)

# Get collection info
collection_info = client.get_collection("documents")
print(collection_info)

Distance Metrics

from qdrant_client.models import Distance

# Available distance metrics
distances = {
Distance.COSINE: "Cosine similarity",
Distance.EUCLID: "Euclidean distance",
Distance.DOT: "Dot product",
Distance.MANHATTAN: "Manhattan distance"
}

# Choose appropriate distance based on your use case
# - COSINE: Text embeddings, normalized vectors
# - EUCLID: General purpose, unnormalized vectors
# - DOT: Normalized embeddings (same as COSINE for unit vectors)
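
A quick numeric check of that last point (plain NumPy, nothing Qdrant-specific): for unit-length vectors, dot product and cosine similarity produce identical scores.

import numpy as np

a = np.random.rand(384).astype(np.float32)
b = np.random.rand(384).astype(np.float32)

# Cosine similarity of the raw vectors
cosine = np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))

# Dot product of the normalized (unit-length) vectors
dot_of_units = np.dot(a / np.linalg.norm(a), b / np.linalg.norm(b))

print(abs(cosine - dot_of_units) < 1e-6) # True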

Payloads

# Payloads are JSON documents stored with vectors
payload = [
{
"id": "doc1",
"title": "Introduction to Vector Databases",
"content": "Vector databases are specialized for...",
"category": "database",
"tags": ["vector", "embedding", "similarity"],
"timestamp": "2024-01-01T00:00:00Z",
"metadata": {
"author": "John Doe",
"word_count": 1500
}
},
{
"id": "doc2",
"title": "Machine Learning Basics",
"content": "Machine learning algorithms learn...",
"category": "ml",
"tags": ["ml", "ai", "algorithms"],
"timestamp": "2024-01-02T00:00:00Z"
}
]

# Payloads can include any JSON-serializable data
# - Text fields
# - Numbers
# - Booleans
# - Arrays
# - Nested objects
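
If you filter on a payload field frequently, it is worth indexing that field so filtered searches stay fast. A minimal sketch, assuming the keyword schema suits the field:

from qdrant_client.models import PayloadSchemaType

# Index the "category" field used in filters
client.create_payload_index(
    collection_name="documents",
    field_name="category",
    field_schema=PayloadSchemaType.KEYWORD
)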

Points

from qdrant_client.models import PointStruct

# A point combines vector, payload, and ID
points = [
PointStruct(
id=1,
vector=[0.1, 0.2, 0.3, ...],
payload={
"title": "Document 1",
"category": "tech"
}
),
PointStruct(
id=2,
vector=[0.4, 0.5, 0.6, ...],
payload={
"title": "Document 2",
"category": "science"
}
)
]

# IDs can be integers or UUID strings
point_with_uuid_id = PointStruct(
id="550e8400-e29b-41d4-a716-446655440000",
vector=[0.7, 0.8, 0.9, ...],
payload={"title": "UUID Document"}
)

Collection Management

Creating Collections

from qdrant_client.models import VectorParams, Distance, CollectionConfig

# Basic collection creation
client.create_collection(
collection_name="documents",
vectors_config=VectorParams(size=384, distance=Distance.COSINE)
)

# Advanced configuration with multiple vector types

# Multiple named vectors (multi-modal) are declared as a mapping of
# vector name -> VectorParams
vectors_config = {
    "text": VectorParams(size=384, distance=Distance.COSINE),
    "image": VectorParams(size=512, distance=Distance.EUCLID),
    "metadata": VectorParams(size=64, distance=Distance.DOT)
}

client.create_collection(
collection_name="multi_modal",
vectors_config=vectors_config
)

# With on-disk storage and quantization
from qdrant_client.models import QuantizationConfig, ScalarQuantization

quantization_config = QuantizationConfig(
scalar=ScalarQuantization(
type="scalar",
quantile=0.99, # Quantile for bucket boundaries
always_ram=False # Keep quantized data on disk
)
)

client.create_collection(
collection_name="optimized",
vectors_config=VectorParams(size=384, distance=Distance.COSINE),
quantization_config=quantization_config,
on_disk=True # Store vectors on disk
)

Collection Configuration

# Update collection settings
from qdrant_client.models import OptimizersConfig

client.update_collection(
collection_name="documents",
optimizer_config=OptimizersConfig(
indexing_threshold=10000, # Build index after this many points
flush_interval_sec=5, # Flush to disk every 5 seconds
max_segment_size=100000, # Maximum segment size
memmap_threshold=50000 # Use memory mapping for segments larger than this
)
)

# Configure HNSW parameters
from qdrant_client.models import HnswConfigDiff

client.update_collection(
collection_name="documents",
hnsw_config=HnswConfigDiff(
m=16, # Number of connections per node
ef_construct=100, # Size of dynamic list during construction
full_scan_threshold=10000 # Use brute force for smaller collections
)
)

Collection Operations

from qdrant_client.models import Filter

# List all collections
collections = client.get_collections()
print(collections)

# Get collection details
collection_info = client.get_collection("documents")
print(f"Vectors count: {collection_info.points_count}")
print(f"Status: {collection_info.status}")
print(f"Vectors configuration: {collection_info.config.params.vectors}")

# Delete collection
client.delete_collection("documents")

# Collection aliases
client.create_collection(
collection_name="documents_v2",
vectors_config=VectorParams(size=384, distance=Distance.COSINE)
)

# Create alias
client.update_collection_aliases(
change_aliases_operations=[
{
"create_alias": {
"collection_name": "documents_v2",
"alias_name": "documents"
}
}
]
)

# Now you can use "documents" alias
# to refer to "documents_v2" collection
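
# For example, searches issued against the alias now transparently hit the
# new collection:
hits = client.search(
    collection_name="documents", # the alias
    query_vector=[0.1] * 384,
    limit=5
)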

Snapshots & Backups

# Create snapshot
snapshot_info = client.create_snapshot("documents")
print(f"Snapshot created: {snapshot_info.name}")

# List snapshots
snapshots = client.list_snapshots("documents")
for snapshot in snapshots:
print(f"Snapshot: {snapshot.name}, Size: {snapshot.size} bytes")

# Restore from snapshot
client.restore_snapshot("documents", snapshot_path)

# Create full snapshot (all collections)
full_snapshot = client.create_full_snapshot()
print(f"Full snapshot created: {full_snapshot.name}")

# Schedule automatic snapshots
# This is typically done through configuration or cron job
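
# A minimal in-process scheduler sketch (a cron job calling the snapshot API
# works just as well):
import time
from qdrant_client import QdrantClient

client = QdrantClient(host="localhost", port=6333)

# Take a snapshot of "documents" once a day
while True:
    snapshot = client.create_snapshot(collection_name="documents")
    print(f"Created snapshot: {snapshot.name}")
    time.sleep(24 * 60 * 60)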

Vector Operations

Inserting Vectors

from qdrant_client.models import PointStruct
import datetime
import uuid

# Insert single point
point = PointStruct(
    id=str(uuid.uuid4()), # Use UUID for better distribution
    vector=[0.1, 0.2, 0.3, ...], # Your embedding
    payload={
        "title": "New Document",
        "content": "This is a new document...",
        "category": "technology",
        "timestamp": datetime.datetime.utcnow().isoformat()
    }
)

client.upsert(
collection_name="documents",
points=[point]
)

# Batch insert for better performance
points = []
for i in range(1000):
points.append(PointStruct(
id=str(uuid.uuid4()),
vector=np.random.rand(384).astype(np.float32).tolist(),
payload={
"id": f"doc_{i}",
"title": f"Document {i}",
"category": np.random.choice(["tech", "science", "business"]),
"score": np.random.random()
}
))

# Insert in batches
batch_size = 100
for i in range(0, len(points), batch_size):
batch = points[i:i + batch_size]
client.upsert(
collection_name="documents",
points=batch
)

Updating Vectors

# Update existing point
client.upsert(
collection_name="documents",
points=[PointStruct(
id="existing_point_id",
vector=[0.9, 0.8, 0.7, ...], # New vector
payload={
"title": "Updated Document",
"version": 2
}
)]
)

# Update only payload (keep vector unchanged)
client.set_payload(
collection_name="documents",
payload={
"last_updated": datetime.datetime.utcnow().isoformat(),
"status": "processed"
},
points=["doc_1", "doc_2", "doc_3"]
)

# Update specific payload fields
client.set_payload(
collection_name="documents",
payload={
"metadata.author": "New Author" # Nested update
},
points=["doc_1"]
)

# Delete payload fields
client.delete_payload(
collection_name="documents",
keys=["temp_field", "old_metadata"],
points=["doc_1", "doc_2"]
)

Deleting Vectors

# Delete by IDs
client.delete(
collection_name="documents",
points_selector=["doc_1", "doc_2", "doc_3"]
)

# Delete by filter
from qdrant_client.models import Filter

client.delete(
collection_name="documents",
points_selector=Filter(
must=[
{
"key": "category",
"match": {"value": "deprecated"}
}
]
)
)

# Clear entire collection
# Note: a filter only removes the points it matches; the simplest way to
# remove everything is to drop and recreate the collection
client.delete_collection("documents")
client.create_collection(
    collection_name="documents",
    vectors_config=VectorParams(size=384, distance=Distance.COSINE)
)

Retrieving Vectors

# Retrieve specific points
points = client.retrieve(
collection_name="documents",
ids=["doc_1", "doc_2", "doc_3"],
with_payload=True,
with_vectors=False # Don't need vectors, just metadata
)

for point in points:
print(f"ID: {point.id}")
print(f"Title: {point.payload.get('title')}")
print(f"Category: {point.payload.get('category')}")

# Retrieve with specific payload fields
points = client.retrieve(
collection_name="documents",
ids=["doc_1"],
with_payload=["title", "category"], # Only these fields
with_vectors=True
)

# Scroll through collection (pagination)
from qdrant_client.models import ScrollRequest

scroll_result = client.scroll(
collection_name="documents",
scroll_request=ScrollRequest(
limit=10,
with_payload=True,
with_vectors=False,
filter=Filter(
must=[
{
"key": "category",
"match": {"value": "technology"}
}
]
)
)
)

points = scroll_result[0]
next_page_offset = scroll_result[1]

print(f"Retrieved {len(points)} points")

Advanced Search Features

from qdrant_client.models import SearchRequest

# Simple similarity search
query_vector = [0.1, 0.2, 0.3, ...] # Your query embedding

search_result = client.search(
collection_name="documents",
query_vector=query_vector,
limit=10, # Return top 10 results
with_payload=True,
with_vectors=False
)

for hit in search_result:
print(f"ID: {hit.id}")
print(f"Score: {hit.score}")
print(f"Title: {hit.payload.get('title')}")
print(f"Category: {hit.payload.get('category')}")
print("---")

Multi-Vector Search

# Search with multiple query vectors
query_vectors = {
"text": text_embedding,
"image": image_embedding
}

# Search with weights for different vector types
search_result = client.search(
collection_name="multi_modal",
query_vector=query_vectors,
limit=10,
with_payload=True
)

# Or with explicit weights
from qdrant_client.models import QueryVector

search_result = client.search(
collection_name="multi_modal",
query_vector=[
QueryVector(
vector=text_embedding,
name="text",
weight=0.7 # 70% weight for text
),
QueryVector(
vector=image_embedding,
name="image",
weight=0.3 # 30% weight for image
)
],
limit=10
)

Hybrid Search

# Combine vector similarity with keyword search
from qdrant_client.models import Filter, SearchParams

# Search with filter
search_result = client.search(
collection_name="documents",
query_vector=query_vector,
query_filter=Filter(
must=[
{
"key": "category",
"match": {"value": "technology"}
},
{
"key": "timestamp",
"range": {
"gte": "2024-01-01T00:00:00Z",
"lte": "2024-12-31T23:59:59Z"
}
}
]
),
limit=10,
search_params=SearchParams(
hnsw_ef=128, # Search accuracy vs speed
exact=False, # Use approximate search
quantization=None # Don't use quantization for this search
)
)

# Search with scoring based on payload
search_result = client.search(
collection_name="documents",
query_vector=query_vector,
limit=10,
with_payload=True,
score_threshold=0.5, # Minimum similarity score
query_filter=Filter(
should=[
{
"key": "tags",
"match": {"value": "featured"}
},
{
"key": "score",
"range": {"gte": 0.8}
}
]
)
)

Recommendations & Discovery

# Find similar items to a set of positive examples
positive_ids = ["doc_1", "doc_5", "doc_10"]
negative_ids = ["doc_2"] # Optional negative examples

recommend_result = client.recommend(
collection_name="documents",
positive=positive_ids,
negative=negative_ids,
limit=10,
with_payload=True,
strategy="average_vector" # or "best_score"
)

# Find similar to a specific target, steered by context example pairs
from qdrant_client.models import ContextExamplePair

context_result = client.discover(
collection_name="documents",
target=target_embedding, # What to find similar to
context=[
ContextExamplePair(
positive=context_positive_embedding,
negative=context_negative_embedding
)
],
limit=10
)

Grouping Results

# Group results by a field
from qdrant_client.models import GroupRequest

groups = client.search_groups(
collection_name="documents",
query_vector=query_vector,
group_by="category", # Group by category
limit=10, # Total groups to return
group_size=3, # Items per group
with_payload=True
)

for group in groups:
print(f"Group: {group.id}")
print(f"Hits: {len(group.hits)}")
for hit in group.hits:
print(f" - {hit.payload.get('title')}")
print()

Filtering & Querying

Basic Filters

from qdrant_client.models import Filter

# Exact match
filter = Filter(
must=[
{
"key": "category",
"match": {"value": "technology"}
}
]
)

# Multiple conditions (AND)
filter = Filter(
must=[
{
"key": "category",
"match": {"value": "technology"}
},
{
"key": "published",
"match": {"value": True}
}
]
)

# OR conditions
filter = Filter(
should=[
{
"key": "category",
"match": {"value": "technology"}
},
{
"key": "category",
"match": {"value": "science"}
}
]
)

# NOT conditions
filter = Filter(
must_not=[
{
"key": "status",
"match": {"value": "archived"}
}
]
)

Advanced Filters

# Range filters
filter = Filter(
must=[
{
"key": "score",
"range": {
"gte": 0.5,
"lte": 1.0
}
},
{
"key": "timestamp",
"range": {
"gte": "2024-01-01T00:00:00Z",
"lte": "2024-12-31T23:59:59Z"
}
}
]
)

# Nested field filters
filter = Filter(
must=[
{
"key": "metadata.author",
"match": {"value": "John Doe"}
},
{
"key": "metadata.word_count",
"range": {"gte": 1000}
}
]
)

# Array operations
filter = Filter(
must=[
{
"key": "tags",
"match": {"value": "featured"} # Value exists in array
},
{
"key": "tags",
"match": {"value": "technology"}
}
]
)

# Array contains all values
filter = Filter(
must=[
{
"key": "required_skills",
"match": {"value": "python"}
},
{
"key": "required_skills",
"match": {"value": "machine_learning"}
}
]
)

# Nested array filters
filter = Filter(
must=[
{
"key": "reviews.rating",
"range": {"gte": 4}
},
{
"key": "reviews.text",
"match": {"value": "excellent"}
}
]
)

Full-Text Filters

# Text match with Qdrant's full-text search
filter = Filter(
must=[
{
"key": "content",
"match": {"text": "vector database"} # Full-text search
}
]
)

# Text search with fuzziness
filter = Filter(
must=[
{
"key": "title",
"match": {
"text": "vectr databse", # Typos allowed
"fuzziness": "auto" # Auto-detect fuzziness level
}
}
]
)

# Combine with boosting
from qdrant_client.models import QueryVectorBoost

search_result = client.search(
collection_name="documents",
query_vector=query_vector,
query_filter=Filter(
must=[
{
"key": "content",
"match": {"text": "machine learning"}
}
]
),
limit=10,
query_vector_boosts=[
QueryVectorBoost(
vector=text_embedding,
name="text",
weight=0.7
),
QueryVectorBoost(
vector=keyword_boost_embedding,
name="keyword",
weight=0.3
)
]
)

Geo Filters

# Point within radius
filter = Filter(
must=[
{
"key": "location",
"geo_radius": {
"center": {
"lon": -73.9874,
"lat": 40.7589
},
"radius": 1000 # meters
}
}
]
)

# Point within bounding box
filter = Filter(
must=[
{
"key": "location",
"geo_bounding_box": {
"top_right": {"lon": -73.95, "lat": 40.78},
"bottom_left": {"lon": -74.02, "lat": 40.73}
}
}
]
)

# Complex geo filters
filter = Filter(
must=[
{
"key": "location",
"geo_polygon": {
"exterior": [
{"lon": -73.95, "lat": 40.78},
{"lon": -73.95, "lat": 40.73},
{"lon": -74.02, "lat": 40.73},
{"lon": -74.02, "lat": 40.78},
{"lon": -73.95, "lat": 40.78}
]
}
}
]
)

Performance Optimization

Quantization

from qdrant_client.models import QuantizationConfig, ScalarQuantization, ProductQuantization, BinaryQuantization

# Scalar quantization (uniform)
scalar_config = QuantizationConfig(
scalar=ScalarQuantization(
type="scalar",
quantile=0.99, # Use 99th percentile for quantization
always_ram=False # Store quantized data on disk
)
)

# Product quantization (better compression)
product_config = QuantizationConfig(
product=ProductQuantization(
type="product",
compression="x32", # 32x compression
always_ram=False
)
)

# Binary quantization (fastest)
binary_config = QuantizationConfig(
binary=BinaryQuantization(
type="binary",
always_ram=False
)
)

# Apply quantization to collection
client.update_collection(
collection_name="documents",
quantization_config=scalar_config
)

# Create collection with quantization
client.create_collection(
collection_name="optimized_docs",
vectors_config=VectorParams(size=384, distance=Distance.COSINE),
quantization_config=scalar_config,
on_disk=True
)
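
Quantization also has a query-time side: you can decide per search whether to use the compressed vectors and whether to rescore candidates with the originals. A sketch, assuming the QuantizationSearchParams model available in recent clients:

from qdrant_client.models import SearchParams, QuantizationSearchParams

query_vector = [0.1] * 384 # your query embedding

results = client.search(
    collection_name="optimized_docs",
    query_vector=query_vector,
    limit=10,
    search_params=SearchParams(
        quantization=QuantizationSearchParams(
            ignore=False, # search over the quantized index
            rescore=True, # re-score candidates with the original vectors
            oversampling=2.0 # fetch extra candidates before rescoring
        )
    )
)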

Indexing Strategies

from qdrant_client.models import HnswConfigDiff, OptimizersConfig

# Configure HNSW for optimal performance
hnsw_config = HnswConfigDiff(
m=16, # Number of connections per node (16-64)
ef_construct=100, # Size of dynamic list during construction
full_scan_threshold=10000, # Use brute force for smaller collections
max_indexing_threads=0, # Use all available threads
on_disk=False # Keep index in RAM for faster search
)

optimizer_config = OptimizersConfig(
indexing_threshold=10000, # Build index after this many points
flush_interval_sec=5, # Flush to disk every 5 seconds
max_segment_size=100000, # Maximum segment size
memmap_threshold=50000 # Use memory mapping for larger segments
)

client.update_collection(
collection_name="documents",
hnsw_config=hnsw_config,
optimizer_config=optimizer_config
)

Memory Management

# Configure memory limits
# This is typically done in config.yaml
"""
storage:
storage_path: "./storage"
max_segment_size: 1000000 # 1M points per segment
vectors_data_threshold: 100000 # Use disk for larger segments
cache_vector_size: 1000 # Cache size for recently accessed vectors
"""

# Use on-disk storage for large collections
client.create_collection(
collection_name="large_collection",
vectors_config=VectorParams(size=1536, distance=Distance.COSINE),
on_disk=True, # Store vectors on disk
quantization_config=QuantizationConfig(
scalar=ScalarQuantization(type="scalar", always_ram=False)
)
)

# Optimize payload storage
# Store only necessary fields in payload
# Use external storage for large content
"""
payload_example = {
"id": "doc_1",
"title": "Document Title",
"summary": "Brief summary...", # Keep summary in payload
"content_url": "s3://bucket/documents/doc_1.txt", # Store large content externally
"metadata": {
"size": 1024,
"format": "txt"
}
}
"""

Query Optimization

from qdrant_client.models import SearchParams, SamplingStrategy

# Use appropriate search parameters
search_params = SearchParams(
hnsw_ef=128, # Balance between accuracy and speed
exact=False, # Use approximate search
quantization=None, # Don't use quantization for critical searches
indexing_threshold=10000
)

# Use sampling for very large collections
sampling = SamplingStrategy(
strategy="random",
rate=0.1 # Sample 10% of collection
)

# Optimized search with early stopping
search_result = client.search(
collection_name="documents",
query_vector=query_vector,
limit=10,
search_params=search_params,
timeout=5.0, # 5 second timeout
score_threshold=0.3, # Skip low-scoring results
with_payload=False # Don't return payload if not needed
)

# Batch searches for better throughput
batch_queries = [query1, query2, query3, ...]
batch_results = client.search_batch(
collection_name="documents",
requests=[
SearchRequest(
vector=q,
limit=10,
with_payload=True
)
for q in batch_queries
]
)

Distributed Deployment

Cluster Setup

# config.yaml for cluster mode
service:
  host: '0.0.0.0'
  http_port: 6333
  grpc_port: 6334

cluster:
  enabled: true
  p2p:
    port: 6335
  consensus:
    raft_tick_period_ms: 100
  peers:
    - "qdrant1:6335"
    - "qdrant2:6335"
    - "qdrant3:6335"

storage:
  storage_path: "/qdrant/storage"

# Connect to cluster
from qdrant_client import QdrantClient

# Connect to specific node
client = QdrantClient(host="qdrant1", port=6333)

# Or use all nodes for load balancing
client = QdrantClient(
hosts=["qdrant1", "qdrant2", "qdrant3"],
port=6333,
https=False
)

Sharding & Replication

# Create collection with sharding
from qdrant_client.models import CreateCollection, ShardingMethod

client.create_collection(
collection_name="distributed_docs",
vectors_config=VectorParams(size=384, distance=Distance.COSINE),
sharding_method=ShardingMethod.AUTO, # or CUSTOM
replication_factor=2, # Each shard has 2 replicas
write_consistency_factor=2, # Write must succeed on 2 replicas
# shard_number=4 # Number of shards (optional, auto by default)
)

# Manual sharding
client.update_collection(
collection_name="distributed_docs",
shard_key=0 # Specify shard for operation
)

# Get cluster information
cluster_info = client.get_cluster_info()
print(f"Number of peers: {len(cluster_info.peers)}")
print(f"Collection shards: {cluster_info.collection_shards}")

Load Balancing

# Configure client-side load balancing
from qdrant_client import QdrantClient

client = QdrantClient(
hosts=["node1", "node2", "node3"],
port=6333,
https=False,
timeout=10.0,
prefer_grpc=True # Use gRPC for better performance
)

# The client will automatically:
# - Retry failed requests
# - Distribute read operations
# - Handle node failures

# Health check
try:
cluster_info = client.get_cluster_info()
print("Cluster is healthy")
except Exception as e:
print(f"Cluster issue: {e}")

Scaling Strategies

# Horizontal scaling with auto-sharding
client.create_collection(
collection_name="scalable_docs",
vectors_config=VectorParams(size=384, distance=Distance.COSINE),
sharding_method=ShardingMethod.AUTO,
replication_factor=1, # Start with 1 replica
# Qdrant will automatically add shards as needed
)

# Vertical scaling within nodes
# Monitor resource usage and adjust:
"""
- Increase RAM for larger in-memory indices
- Use SSD for storage_path
- Add more CPU cores for indexing
- Adjust hnsw_ef for search quality
"""

# Hybrid approach
# Use distributed cluster with large nodes
# and proper sharding strategy

Client Libraries

Python Client

from qdrant_client import QdrantClient, models
import numpy as np

# Initialize client
client = QdrantClient(host="localhost", port=6333)

# Async client
from qdrant_client import AsyncQdrantClient

async_client = AsyncQdrantClient(host="localhost", port=6333)

# Batch operations
async def batch_upload():
points = []
for i in range(10000):
points.append(models.PointStruct(
id=i,
vector=np.random.rand(384).tolist(),
payload={"text": f"Document {i}"}
))

# Upload in batches
batch_size = 500
for i in range(0, len(points), batch_size):
await async_client.upsert(
collection_name="docs",
points=points[i:i+batch_size]
)

# Search with async
async def search_async(query_vector):
results = await async_client.search(
collection_name="docs",
query_vector=query_vector,
limit=10
)
return results
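
The async helpers above are coroutines, so they need an event loop to run. A minimal sketch:

import asyncio
import numpy as np

async def main():
    await batch_upload()
    hits = await search_async(np.random.rand(384).tolist())
    print(f"Top hits: {len(hits)}")

asyncio.run(main())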

JavaScript/TypeScript Client

import { QdrantClient } from '@qdrant/js-client-rest';

const client = new QdrantClient({
host: 'localhost',
port: 6333,
https: false
});

// Create collection
await client.createCollection('documents', {
vectors: {
size: 384,
distance: 'Cosine'
}
});

// Insert points
const points = Array.from({ length: 100 }, (_, i) => ({
id: i,
vector: Array(384).fill(0).map(() => Math.random()),
payload: {
title: `Document ${i}`,
category: ['tech', 'science', 'business'][Math.floor(Math.random() * 3)]
}
}));

await client.upsert('documents', { points });

// Search
const searchResults = await client.search('documents', {
vector: Array(384).fill(0).map(() => Math.random()),
limit: 10,
filter: {
must: [
{
key: 'category',
match: { value: 'tech' }
}
]
}
});

// gRPC client for better performance
import { QdrantGrpcClient } from '@qdrant/js-client-grpc';

const grpcClient = new QdrantGrpcClient('localhost:6334');

Go Client

package main

import (
"context"
"fmt"
"log"

"github.com/qdrant/go-client/qdrant"
)

func main() {
// Create client
client, err := qdrant.NewClient("localhost:6334")
if err != nil {
log.Fatal(err)
}

// Create collection
vectorParams := qdrant.VectorParams{
Size: 384,
Distance: qdrant.Distance_Cosine,
}

err = client.CreateCollection(context.Background(), "documents", vectorParams)
if err != nil {
log.Fatal(err)
}

// Insert points
points := make([]*qdrant.PointStruct, 100)
for i := 0; i < 100; i++ {
points[i] = &qdrant.PointStruct{
Id: qdrant.NewPointIdNum(uint64(i)),
Vectors: qdrant.NewVectors(generateVector(384)),
Payload: map[string]interface{}{
"title": fmt.Sprintf("Document %d", i),
"category": "tech",
},
}
}

err = client.Upsert(context.Background(), "documents", points)
if err != nil {
log.Fatal(err)
}

// Search
searchRequest := &qdrant.SearchRequest{
Vector: generateVector(384),
Limit: 10,
WithPayload: true,
}

results, err := client.Search(context.Background(), "documents", searchRequest)
if err != nil {
log.Fatal(err)
}

for _, hit := range results {
fmt.Printf("ID: %v, Score: %f\n", hit.Id, hit.Score)
}
}

Rust Client

use qdrant_client::Qdrant;
use qdrant_client::qdrant::{
CreateCollection, VectorParams, Distance,
PointStruct, UpsertPoints, SearchPoints,
WithPayloadSelector,
};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
// Create client
let client = Qdrant::from_url("http://localhost:6334").build()?;

// Create collection
client
.create_collection(&CreateCollection {
collection_name: "documents".to_string(),
vectors_config: Some(VectorParams::from(VectorParams {
size: 384,
distance: Distance::Cosine.into(),
..Default::default()
})),
..Default::default()
})
.await?;

// Insert points
let points: Vec<PointStruct> = (0..100)
.map(|i| PointStruct {
id: Some(i.into()),
vectors: Some(generate_vector(384).into()),
payload: serde_json::json!({
"title": format!("Document {}", i),
"category": "tech"
})
.as_object()
.unwrap()
.clone(),
})
.collect();

client
.upsert_points(UpsertPoints {
collection_name: "documents".to_string(),
points,
..Default::default()
})
.await?;

// Search
let search_result = client
.search_points(&SearchPoints {
collection_name: "documents".to_string(),
vector: generate_vector(384),
limit: 10,
with_payload: Some(WithPayloadSelector::Enable(true)),
..Default::default()
})
.await?;

for point in search_result.result {
println!("ID: {:?}, Score: {}", point.id, point.score);
}

Ok(())
}

Integration Patterns

RAG (Retrieval-Augmented Generation)

import uuid

import openai
from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct
from sentence_transformers import SentenceTransformer

class RAGSystem:
def __init__(self):
# Initialize components
self.client = QdrantClient(host="localhost", port=6333)
self.embedding_model = SentenceTransformer('all-MiniLM-L6-v2')
self.llm = openai.OpenAI()

def add_document(self, text: str, metadata: dict):
# Generate embedding
embedding = self.embedding_model.encode(text).tolist()

# Store with metadata
self.client.upsert(
collection_name="documents",
points=[PointStruct(
id=str(uuid.uuid4()),
vector=embedding,
payload={
"text": text,
**metadata
}
)]
)

def query(self, question: str, context_size: int = 5) -> str:
# Generate question embedding
question_embedding = self.embedding_model.encode(question).tolist()

# Retrieve relevant documents
results = self.client.search(
collection_name="documents",
query_vector=question_embedding,
limit=context_size,
with_payload=True
)

# Prepare context
context = "\n\n".join([
hit.payload["text"] for hit in results
])

# Generate answer
response = self.llm.chat.completions.create(
model="gpt-4",
messages=[
{
"role": "system",
"content": f"Answer the question based on this context:\n{context}"
},
{
"role": "user",
"content": question
}
]
)

return response.choices[0].message.content

Recommendation System

from collections import defaultdict
from datetime import datetime

from qdrant_client import QdrantClient
from qdrant_client.models import Filter, PointStruct, ScrollRequest

class RecommendationEngine:
def __init__(self):
self.client = QdrantClient(host="localhost", port=6333)

def add_user_interaction(self, user_id: str, item_id: str, rating: float):
# Store interaction as user-item vector
user_vector = self.get_user_embedding(user_id)
item_vector = self.get_item_embedding(item_id)

# Create combined interaction vector
interaction_vector = self.combine_vectors(user_vector, item_vector, rating)

self.client.upsert(
collection_name="interactions",
points=[PointStruct(
id=f"{user_id}_{item_id}",
vector=interaction_vector,
payload={
"user_id": user_id,
"item_id": item_id,
"rating": rating,
"timestamp": datetime.utcnow()
}
)]
)

def recommend_items(self, user_id: str, limit: int = 10):
# Get user's positive interactions
positive_interactions = self.client.scroll(
collection_name="interactions",
scroll_request=ScrollRequest(
filter=Filter(
must=[
{"key": "user_id", "match": {"value": user_id}},
{"key": "rating", "range": {"gte": 4.0}}
]
),
limit=100,
with_vectors=True
)
)[0]

if not positive_interactions:
# Return popular items for new users
return self.get_popular_items(limit)

# Extract item IDs
positive_item_ids = [hit.payload["item_id"] for hit in positive_interactions]

# Get recommendations
recommendations = self.client.recommend(
collection_name="items",
positive=positive_item_ids,
limit=limit,
with_payload=True
)

return recommendations

def collaborative_filtering(self, user_id: str):
# Find similar users
user_vector = self.get_user_embedding(user_id)

similar_users = self.client.search(
collection_name="users",
query_vector=user_vector,
limit=10,
with_payload=True
)

# Get items liked by similar users
similar_user_ids = [user.id for user in similar_users]

recommended_items = self.client.search(
collection_name="interactions",
query_filter=Filter(
must=[
{"key": "user_id", "match": {"any": similar_user_ids}},
{"key": "rating", "range": {"gte": 4.0}}
]
),
limit=20,
group_by="item_id",
group_size=1
)

return recommended_items

Semantic Search Engine

from datetime import datetime
from typing import List, Dict, Any

from qdrant_client import QdrantClient
from qdrant_client.models import Filter, PointStruct
from sentence_transformers import SentenceTransformer

class SemanticSearchEngine:
def __init__(self):
self.client = QdrantClient(host="localhost", port=6333)
self.embedding_model = SentenceTransformer('multi-qa-mpnet-base-dot-v1')

def index_documents(self, documents: List[Dict[str, Any]]):
points = []
for doc in documents:
# Generate embeddings for title and content
title_embedding = self.embedding_model.encode(doc['title']).tolist()
content_embedding = self.embedding_model.encode(doc['content'][:512]).tolist()

points.append(PointStruct(
id=doc['id'],
vector={
"title": title_embedding,
"content": content_embedding
},
payload={
**doc,
"indexed_at": datetime.utcnow().isoformat()
}
))

# Batch upload
self.client.upsert(
collection_name="search_docs",
points=points
)

def search(self, query: str, filters: Dict = None, limit: int = 10):
# Generate query embeddings
query_embedding = self.embedding_model.encode(query).tolist()

# Build filter if provided
query_filter = None
if filters:
conditions = []
for key, value in filters.items():
if isinstance(value, list):
conditions.append({"key": key, "match": {"any": value}})
else:
conditions.append({"key": key, "match": {"value": value}})

if conditions:
query_filter = Filter(must=conditions)

# Perform search
results = self.client.search(
collection_name="search_docs",
query_vector={
"title": query_embedding,
"content": query_embedding
},
query_filter=query_filter,
limit=limit,
with_payload=True,
score_threshold=0.3
)

return results

def hybrid_search(self, query: str, filters: Dict = None, limit: int = 10):
# Combine vector search with keyword matching
vector_results = self.search(query, filters, limit * 2)

# Extract keywords from query
keywords = self.extract_keywords(query)

# Boost results with keyword matches
boosted_results = []
for result in vector_results:
score_boost = 1.0

# Check for keyword matches in title
title = result.payload.get('title', '').lower()
for keyword in keywords:
if keyword in title:
score_boost += 0.2

# Apply boost
result.score *= score_boost
boosted_results.append(result)

# Sort by boosted score
boosted_results.sort(key=lambda x: x.score, reverse=True)

return boosted_results[:limit]
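
    # hybrid_search above calls self.extract_keywords, which the original
    # snippet leaves undefined; a naive placeholder (a real system might use
    # spaCy or TF-IDF keyword extraction):
    def extract_keywords(self, query: str) -> list:
        return [w for w in query.lower().split() if len(w) > 3]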

Image Search Engine

import uuid

import torch
from PIL import Image
from qdrant_client import QdrantClient
from qdrant_client.models import PointStruct
from torchvision import models, transforms

class ImageSearchEngine:
def __init__(self):
self.client = QdrantClient(host="localhost", port=6333)

# Load pre-trained model for image embeddings
self.model = models.resnet50(pretrained=True)
self.model.eval()

# Image preprocessing
self.transform = transforms.Compose([
transforms.Resize(256),
transforms.CenterCrop(224),
transforms.ToTensor(),
transforms.Normalize(
mean=[0.485, 0.456, 0.406],
std=[0.229, 0.224, 0.225]
)
])

def extract_features(self, image_path: str) -> list:
# Load and preprocess image
image = Image.open(image_path).convert('RGB')
image_tensor = self.transform(image).unsqueeze(0)

# Extract features
with torch.no_grad():
features = self.model(image_tensor)

# Normalize features
features = torch.nn.functional.normalize(features, p=2, dim=1)

return features.flatten().tolist()

def index_image(self, image_path: str, metadata: dict):
features = self.extract_features(image_path)

self.client.upsert(
collection_name="images",
points=[PointStruct(
id=metadata.get('id', str(uuid.uuid4())),
vector=features,
payload={
"image_path": image_path,
**metadata
}
)]
)

def find_similar_images(self, query_image_path: str, limit: int = 10):
query_features = self.extract_features(query_image_path)

results = self.client.search(
collection_name="images",
query_vector=query_features,
limit=limit,
with_payload=True
)

return results

def search_by_text(self, text_query: str, limit: int = 10):
# Use CLIP or similar model for text-to-image search
# This is a simplified example
text_embedding = self.text_to_image_embedding(text_query)

results = self.client.search(
collection_name="images",
query_vector=text_embedding,
limit=limit,
with_payload=True
)

return results
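
The text_to_image_embedding helper referenced above is left undefined. One way to sketch it is with a CLIP model from sentence-transformers, assuming the images were indexed with the same CLIP model (rather than the ResNet features used earlier):

from sentence_transformers import SentenceTransformer

# CLIP places text and images in a shared embedding space, so text queries can
# be matched against image vectors (hypothetical helper; model choice is illustrative)
clip_model = SentenceTransformer('clip-ViT-B-32')

def text_to_image_embedding(text_query: str) -> list:
    return clip_model.encode(text_query).tolist()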

Monitoring & Observability

Metrics Collection

# Qdrant provides built-in metrics endpoint
# Accessible at /metrics in Prometheus format

import requests
import time

from qdrant_client import QdrantClient

class QdrantMonitor:
def __init__(self, base_url="http://localhost:6333"):
self.base_url = base_url

def get_metrics(self):
"""Get Prometheus metrics"""
response = requests.get(f"{self.base_url}/metrics")
return response.text

def get_collection_stats(self, collection_name):
"""Get collection-specific statistics"""
client = QdrantClient(host="localhost", port=6333)
collection_info = client.get_collection(collection_name)

return {
"points_count": collection_info.points_count,
"vectors_count": collection_info.vectors_count,
"status": collection_info.status,
"config": collection_info.config.dict(),
"indexed_vectors_count": collection_info.indexed_vectors_count
}

def monitor_performance(self, interval=60):
"""Continuously monitor performance"""
while True:
stats = self.get_collection_stats("documents")

# Calculate performance indicators
indexing_rate = stats.get("points_count", 0) / interval
memory_usage = self.calculate_memory_usage(stats)

print(f"Points: {stats['points_count']}")
print(f"Indexing rate: {indexing_rate:.2f} points/sec")
print(f"Memory usage: {memory_usage:.2f} MB")

time.sleep(interval)

def calculate_memory_usage(self, stats):
"""Estimate memory usage based on collection stats"""
# Rough estimation
vector_size = 384 * 4 # bytes per vector (float32)
points_count = stats.get("points_count", 0)

return (points_count * vector_size) / (1024 * 1024) # MB
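
As a rough worked example of that estimate: 1,000,000 points with 384-dimensional float32 vectors occupy about 1,000,000 × 384 × 4 bytes ≈ 1.5 GB of raw vector data, before HNSW graph links, payload storage, and any quantized copies are counted.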

Logging & Debugging

import logging
import time

from qdrant_client import QdrantClient

# Configure logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)

logger = logging.getLogger('qdrant')

class MonitoredQdrantClient(QdrantClient):
def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.logger = logging.getLogger('qdrant.client')

def search(self, collection_name, *args, **kwargs):
start_time = time.time()

try:
result = super().search(collection_name, *args, **kwargs)

# Log search metrics
duration = time.time() - start_time
self.logger.info(
f"Search in {collection_name}: "
f"limit={kwargs.get('limit', 10)}, "
f"duration={duration:.3f}s, "
f"results={len(result)}"
)

return result

except Exception as e:
self.logger.error(
f"Search failed in {collection_name}: {str(e)}"
)
raise

def upsert(self, collection_name, points):
start_time = time.time()

try:
result = super().upsert(collection_name, points)

duration = time.time() - start_time
self.logger.info(
f"Upsert to {collection_name}: "
f"points={len(points)}, "
f"duration={duration:.3f}s"
)

return result

except Exception as e:
self.logger.error(
f"Upsert failed in {collection_name}: {str(e)}"
)
raise

Alerting

import smtplib
import time
from datetime import datetime
from email.mime.text import MIMEText

from prometheus_client import start_http_server, Gauge
from qdrant_client import QdrantClient

class QdrantAlertManager:
def __init__(self):
self.alerts = []

# Prometheus metrics
self.points_count = Gauge('qdrant_points_count', 'Number of points in collection')
self.search_latency = Gauge('qdrant_search_latency_seconds', 'Search latency')
self.error_rate = Gauge('qdrant_error_rate', 'Error rate')

def check_health(self, client):
"""Perform health checks"""
alerts = []

try:
# Check collection status
collections = client.get_collections()
for collection in collections.collections:
info = client.get_collection(collection.name)

# Flag very large collections (rough capacity check)
if info.points_count > 1000000: # Threshold
alerts.append({
"type": "warning",
"message": f"Collection {collection.name} has {info.points_count} points"
})

# Update metrics
self.points_count.set(info.points_count)

# Test search latency
start = time.time()
client.search("test_collection", [0] * 384, limit=1)
latency = time.time() - start
self.search_latency.set(latency)

if latency > 1.0: # 1 second threshold
alerts.append({
"type": "critical",
"message": f"High search latency: {latency:.2f}s"
})

except Exception as e:
self.error_rate.inc()
alerts.append({
"type": "critical",
"message": f"Health check failed: {str(e)}"
})

return alerts

def send_alert(self, alert):
"""Send alert via email"""
msg = MIMEText(f"""
Qdrant Alert: {alert['type'].upper()}

Message: {alert['message']}
Timestamp: {datetime.utcnow().isoformat()}
""")

msg['Subject'] = f"Qdrant Alert: {alert['type'].upper()}"
msg['From'] = "qdrant-monitor@example.com"
msg['To'] = "admin@example.com"

# Send email (configure SMTP settings)
with smtplib.SMTP('localhost') as server:
server.send_message(msg)

def start_monitoring(self, interval=60):
"""Start continuous monitoring"""
start_http_server(8000) # Expose metrics

client = QdrantClient(host="localhost", port=6333)

while True:
alerts = self.check_health(client)

for alert in alerts:
self.send_alert(alert)
self.alerts.append(alert)

time.sleep(interval)

Security Best Practices

Authentication & Authorization

# Configure API key authentication
"""
config.yaml:
service:
api_key: "your-secret-api-key"

# Or use environment variable
export QDRANT_SERVICE__API_KEY="your-secret-api-key"
"""

# Use authenticated client
from qdrant_client import QdrantClient

client = QdrantClient(
host="localhost",
port=6333,
api_key="your-secret-api-key"
)

# JWT authentication (when RBAC is enabled, the token is passed
# in place of the API key)
client = QdrantClient(
    host="localhost",
    port=6333,
    api_key="your-jwt-token"
)

Network Security

# Configure HTTPS
"""
config.yaml:
service:
enable_https: true
https_cert_path: "/path/to/cert.pem"
https_key_path: "/path/to/key.pem"
"""

# Use client with HTTPS
client = QdrantClient(
host="your-qdrant-domain.com",
port=6333,
https=True,
verify=True, # Verify SSL certificate
timeout=30
)

# Network restrictions
"""
config.yaml:
service:
host: "127.0.0.1" # Listen only on localhost
http_port: 6333

# Or use specific interface
service:
host: "10.0.0.100" # Internal network only
"""

Data Encryption

# Enable encryption at rest (enterprise feature)
"""
config.yaml:
storage:
encrypt_storage: true
encryption_key_path: "/path/to/encryption.key"
"""

# Client-side encryption for sensitive payloads
import json

from cryptography.fernet import Fernet

class EncryptedQdrantClient:
def __init__(self, encryption_key):
self.client = QdrantClient(host="localhost", port=6333)
self.cipher = Fernet(encryption_key)

def encrypt_payload(self, payload):
"""Encrypt sensitive fields"""
encrypted = payload.copy()
if 'sensitive_data' in encrypted:
data = json.dumps(encrypted['sensitive_data'])
encrypted['sensitive_data'] = self.cipher.encrypt(
data.encode()
).decode()
return encrypted

def decrypt_payload(self, payload):
"""Decrypt sensitive fields"""
decrypted = payload.copy()
if 'sensitive_data' in decrypted:
data = self.cipher.decrypt(
decrypted['sensitive_data'].encode()
).decode()
decrypted['sensitive_data'] = json.loads(data)
return decrypted

def upsert(self, collection_name, points):
"""Upsert with encryption"""
encrypted_points = []
for point in points:
point.payload = self.encrypt_payload(point.payload)
encrypted_points.append(point)

return self.client.upsert(collection_name, encrypted_points)

Access Control

# Implement RBAC (Role-Based Access Control)
class QdrantAccessManager:
def __init__(self):
self.roles = {
"admin": ["read", "write", "delete", "manage"],
"writer": ["read", "write"],
"reader": ["read"]
}
self.users = {
"user1": {"role": "admin"},
"user2": {"role": "writer"},
"user3": {"role": "reader"}
}

def check_permission(self, user, operation):
"""Check if user has permission for operation"""
role = self.users.get(user, {}).get("role")
if not role:
return False

return operation in self.roles.get(role, [])

def authorize_operation(self, user, operation, collection_name):
"""Authorize operation on collection"""
if not self.check_permission(user, operation):
raise PermissionError(f"User {user} cannot {operation} on {collection_name}")

return True

# Usage example
access_manager = QdrantAccessManager()

class SecureQdrantClient(QdrantClient):
def __init__(self, user, *args, **kwargs):
super().__init__(*args, **kwargs)
self.user = user
self.access_manager = QdrantAccessManager()

def search(self, collection_name, *args, **kwargs):
self.access_manager.authorize_operation(self.user, "read", collection_name)
return super().search(collection_name, *args, **kwargs)

def upsert(self, collection_name, points):
self.access_manager.authorize_operation(self.user, "write", collection_name)
return super().upsert(collection_name, points)
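
A quick usage sketch of the wrapper above (assuming the roles defined earlier; "user3" holds the read-only role):

reader_client = SecureQdrantClient("user3", host="localhost", port=6333)

# Reads are permitted for the reader role
hits = reader_client.search("documents", query_vector=[0.1] * 384, limit=5)

# Writes are rejected before the request ever reaches Qdrant
try:
    reader_client.upsert("documents", points=[])
except PermissionError as e:
    print(e)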

Real-World Applications

Product Search Engine

class ProductSearchEngine:
def __init__(self):
self.client = QdrantClient(host="localhost", port=6333)
self.text_encoder = SentenceTransformer('all-MiniLM-L6-v2')
self.image_encoder = self.load_image_encoder()

def index_product(self, product):
# Create multi-modal embeddings
text_embedding = self.text_encoder.encode(
f"{product['title']} {product['description']} {' '.join(product['tags'])}"
).tolist()

image_embedding = None
if product.get('image_url'):
image_embedding = self.encode_image(product['image_url'])

# Store product with rich metadata
point = PointStruct(
id=product['id'],
vector={
"text": text_embedding,
"image": image_embedding
} if image_embedding else {"text": text_embedding},
payload={
"title": product['title'],
"description": product['description'],
"price": product['price'],
"category": product['category'],
"brand": product['brand'],
"tags": product['tags'],
"in_stock": product['in_stock'],
"rating": product.get('rating', 0),
"popularity_score": product.get('popularity_score', 0)
}
)

self.client.upsert("products", [point])

def search_products(self, query, filters=None, limit=20):
# Generate query embedding
query_embedding = self.text_encoder.encode(query).tolist()

# Build filter
query_filter = None
if filters:
conditions = []

if 'category' in filters:
conditions.append({
"key": "category",
"match": {"value": filters['category']}
})

if 'price_range' in filters:
conditions.append({
"key": "price",
"range": filters['price_range']
})

if 'in_stock' in filters:
conditions.append({
"key": "in_stock",
"match": {"value": filters['in_stock']}
})

if conditions:
query_filter = Filter(must=conditions)

# Search with boosting
results = self.client.search(
collection_name="products",
query_vector={"text": query_embedding},
query_filter=query_filter,
limit=limit,
with_payload=True,
score_threshold=0.3
)

# Re-rank based on business rules
reranked_results = []
for hit in results:
# Apply business logic boosts
boost = 1.0

# Boost popular items
popularity = hit.payload.get('popularity_score', 0)
boost += popularity * 0.1

# Boost in-stock items
if hit.payload.get('in_stock', False):
boost += 0.2

# Boost highly rated items
rating = hit.payload.get('rating', 0)
if rating >= 4.0:
boost += 0.15

hit.score *= boost
reranked_results.append(hit)

# Sort by final score
reranked_results.sort(key=lambda x: x.score, reverse=True)

return reranked_results[:limit]

def get_recommendations(self, user_id, product_id, limit=10):
# Get similar products
similar = self.client.recommend(
collection_name="products",
positive=[product_id],
limit=limit * 2,
with_payload=True
)

# Filter based on user preferences
user_prefs = self.get_user_preferences(user_id)

filtered_results = []
for hit in similar:
# Apply user preference filters
if user_prefs.get('preferred_brands'):
if hit.payload.get('brand') in user_prefs['preferred_brands']:
hit.score *= 1.2

if user_prefs.get('price_range'):
price = hit.payload.get('price', 0)
if user_prefs['price_range'][0] <= price <= user_prefs['price_range'][1]:
hit.score *= 1.1

filtered_results.append(hit)

# Sort and return
filtered_results.sort(key=lambda x: x.score, reverse=True)
return filtered_results[:limit]

Document Analysis System

class DocumentAnalysisSystem:
def __init__(self):
self.client = QdrantClient(host="localhost", port=6333)
self.text_encoder = SentenceTransformer('all-mpnet-base-v2')
self.nlp = spacy.load("en_core_web_lg")

def process_document(self, file_path, metadata):
# Extract text from document
text = self.extract_text(file_path)

# Split into chunks
chunks = self.chunk_document(text)

# Process each chunk
points = []
for i, chunk in enumerate(chunks):
# Generate embedding
embedding = self.text_encoder.encode(chunk).tolist()

# Extract entities
entities = self.extract_entities(chunk)

# Generate summary
summary = self.generate_summary(chunk)

points.append(PointStruct(
id=f"{metadata['doc_id']}_chunk_{i}",
vector=embedding,
payload={
"doc_id": metadata['doc_id'],
"chunk_id": i,
"text": chunk,
"summary": summary,
"entities": entities,
"doc_type": metadata['doc_type'],
"author": metadata.get('author'),
"date": metadata.get('date'),
"source_file": file_path
}
))

# Store chunks
self.client.upsert("document_chunks", points)

# Store document metadata
self.client.upsert("documents", [PointStruct(
id=metadata['doc_id'],
vector=self.text_encoder.encode(text[:2000]).tolist(), # Document summary embedding
payload=metadata
)])

def semantic_search(self, query, doc_type=None, date_range=None, limit=20):
# Generate query embedding
query_embedding = self.text_encoder.encode(query).tolist()

# Build filter
conditions = []
if doc_type:
conditions.append({"key": "doc_type", "match": {"value": doc_type}})

if date_range:
conditions.append({
"key": "date",
"range": date_range
})

query_filter = Filter(must=conditions) if conditions else None

# Search for relevant chunks
results = self.client.search(
collection_name="document_chunks",
query_vector=query_embedding,
query_filter=query_filter,
limit=limit,
with_payload=True,
group_by="doc_id",
group_size=3
)

return results

def entity_search(self, entity_type, entity_value, limit=10):
# Search by extracted entities
results = self.client.search(
collection_name="document_chunks",
query_filter=Filter(
must=[
{
"key": "entities.type",
"match": {"value": entity_type}
},
{
"key": "entities.text",
"match": {"value": entity_value}
}
]
),
limit=limit,
with_payload=True
)

return results

def find_similar_documents(self, doc_id, limit=10):
# Find documents similar to given document
doc_info = self.client.retrieve(
collection_name="documents",
ids=[doc_id],
with_vectors=True
)[0]

if doc_info.vector:
similar = self.client.search(
collection_name="documents",
query_vector=doc_info.vector,
limit=limit,
with_payload=True,
score_threshold=0.5
)
return similar

return []

def generate_document_summary(self, doc_id):
# Retrieve all chunks
chunks = self.client.scroll(
collection_name="document_chunks",
scroll_request=ScrollRequest(
filter=Filter(
must=[{"key": "doc_id", "match": {"value": doc_id}}]
),
limit=100,
with_payload=True
)
)[0]

# Sort by chunk ID and concatenate
chunks.sort(key=lambda x: x.payload['chunk_id'])
full_text = " ".join([chunk.payload['text'] for chunk in chunks])

# Generate summary (could use LLM here)
summary = self.generate_summary(full_text, max_length=500)

return {
"doc_id": doc_id,
"summary": summary,
"chunk_count": len(chunks),
"total_length": len(full_text)
}

Fraud Detection System

class FraudDetectionSystem:
def __init__(self):
self.client = QdrantClient(host="localhost", port=6333)
self.feature_extractor = TransactionFeatureExtractor()

def record_transaction(self, transaction):
# Extract features for vector embedding
features = self.feature_extractor.extract(transaction)

# Create transaction vector
vector = self.create_transaction_vector(features, transaction)

# Store with rich metadata
point = PointStruct(
id=transaction['transaction_id'],
vector=vector,
payload={
"transaction_id": transaction['transaction_id'],
"user_id": transaction['user_id'],
"amount": transaction['amount'],
"merchant": transaction['merchant'],
"category": transaction['category'],
"timestamp": transaction['timestamp'],
"location": transaction.get('location'),
"device_id": transaction.get('device_id'),
"ip_address": transaction.get('ip_address'),
"features": features,
"is_fraud": transaction.get('is_fraud', False),
"risk_score": 0.0 # Will be updated
}
)

self.client.upsert("transactions", [point])

# Check for fraud in real-time
return self.check_fraud_risk(transaction, vector)

def check_fraud_risk(self, transaction, vector):
# Find similar transactions
similar_transactions = self.client.search(
collection_name="transactions",
query_vector=vector,
limit=10,
query_filter=Filter(
must_not=[
{"key": "transaction_id", "match": {"value": transaction['transaction_id']}}
]
),
with_payload=True
)

# Calculate risk score
risk_score = self.calculate_risk_score(transaction, similar_transactions)

# Update risk score in database
self.client.set_payload(
collection_name="transactions",
payload={"risk_score": risk_score},
points=[transaction['transaction_id']]
)

# Check thresholds
if risk_score > 0.8:
return {"risk": "high", "score": risk_score, "action": "block"}
elif risk_score > 0.6:
return {"risk": "medium", "score": risk_score, "action": "review"}
else:
return {"risk": "low", "score": risk_score, "action": "allow"}

def calculate_risk_score(self, transaction, similar_transactions):
risk_score = 0.0

# Check for unusual amounts
if similar_transactions:
avg_amount = sum(t.payload['amount'] for t in similar_transactions) / len(similar_transactions)
if transaction['amount'] > avg_amount * 3:
risk_score += 0.3

# Check for unusual locations
transaction_location = transaction.get('location')
if transaction_location:
location_distances = []
for t in similar_transactions:
if t.payload.get('location'):
distance = self.calculate_distance(
transaction_location,
t.payload['location']
)
location_distances.append(distance)

if location_distances and min(location_distances) > 1000: # 1000 km
risk_score += 0.4

# Check for unusual timing
transaction_time = datetime.fromisoformat(transaction['timestamp'])
if transaction_time.hour < 6 or transaction_time.hour >= 23:
risk_score += 0.2

# Check fraud rate in similar transactions
fraud_count = sum(1 for t in similar_transactions if t.payload.get('is_fraud'))
if similar_transactions:
fraud_rate = fraud_count / len(similar_transactions)
risk_score += fraud_rate * 0.3

# Check device/IP anomalies
if self.is_new_device_for_user(
transaction['user_id'],
transaction.get('device_id')
):
risk_score += 0.2

if self.is_new_ip_for_user(
transaction['user_id'],
transaction.get('ip_address')
):
risk_score += 0.2

return min(risk_score, 1.0)

def detect_fraud_patterns(self):
# Analyze patterns across all transactions
patterns = []

# Look for high-frequency transactions
high_freq_users = self.client.search(
collection_name="transactions",
query_vector=[0] * 128, # Dummy vector
limit=1000,
with_payload=True,
group_by="user_id",
group_size=1,
score_threshold=0
)

for group in high_freq_users:
if group.hits[0].payload.get('transaction_count', 0) > 50: # Threshold
patterns.append({
"type": "high_frequency",
"user_id": group.id,
"count": group.hits[0].payload.get('transaction_count')
})

# Look for unusual merchant patterns
unusual_merchants = self.client.scroll(
collection_name="transactions",
limit=10000,
with_payload=["merchant", "amount", "timestamp", "user_id"]
)[0] # scroll() returns (points, next_page_offset)

# Analyze merchant patterns
merchant_stats = {}
for point in unusual_merchants:
merchant = point.payload['merchant']
if merchant not in merchant_stats:
merchant_stats[merchant] = {
"count": 0,
"total_amount": 0,
"unique_users": set()
}

merchant_stats[merchant]["count"] += 1
merchant_stats[merchant]["total_amount"] += point.payload['amount']
merchant_stats[merchant]["unique_users"].add(point.payload['user_id'])

# Flag suspicious merchants
for merchant, stats in merchant_stats.items():
if stats["count"] > 100 and len(stats["unique_users"]) < 5:
patterns.append({
"type": "suspicious_merchant",
"merchant": merchant,
"transaction_count": stats["count"],
"unique_users": len(stats["unique_users"])
})

return patterns
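
calculate_risk_score relies on self.calculate_distance, self.is_new_device_for_user, and self.is_new_ip_for_user, which are not defined above. A rough sketch of how they might be implemented, written as a mixin for the fraud-detection class; the {'lat', 'lon'} location format, the haversine distance, and the count-based lookups are assumptions rather than part of the original example:

import math

from qdrant_client.models import FieldCondition, Filter, MatchValue

class FraudHelpersMixin:
    """Hypothetical helpers for the fraud-detection class above."""

    def calculate_distance(self, loc_a, loc_b):
        """Approximate great-circle distance in km between {'lat': ..., 'lon': ...} dicts."""
        lat1, lon1 = math.radians(loc_a['lat']), math.radians(loc_a['lon'])
        lat2, lon2 = math.radians(loc_b['lat']), math.radians(loc_b['lon'])
        a = (math.sin((lat2 - lat1) / 2) ** 2
             + math.cos(lat1) * math.cos(lat2) * math.sin((lon2 - lon1) / 2) ** 2)
        return 6371 * 2 * math.asin(math.sqrt(a))  # Earth radius ~6371 km

    def _count_for_user(self, user_id, field, value):
        """Approximate count of this user's transactions matching field == value."""
        result = self.client.count(
            collection_name="transactions",
            count_filter=Filter(must=[
                FieldCondition(key="user_id", match=MatchValue(value=user_id)),
                FieldCondition(key=field, match=MatchValue(value=value))
            ]),
            exact=False
        )
        return result.count

    def is_new_device_for_user(self, user_id, device_id):
        """True if this user has no prior transactions from this device.
        If the current transaction is already upserted, compare against 1 instead of 0."""
        return bool(device_id) and self._count_for_user(user_id, "device_id", device_id) == 0

    def is_new_ip_for_user(self, user_id, ip_address):
        """True if this user has no prior transactions from this IP address."""
        return bool(ip_address) and self._count_for_user(user_id, "ip_address", ip_address) == 0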

Troubleshooting

Common Issues

# Memory issues
# Problem: Out of memory errors
# Solution 1: Use on-disk storage
client.create_collection(
collection_name="large_collection",
vectors_config=VectorParams(size=1536, distance=Distance.COSINE, on_disk=True),
quantization_config=ScalarQuantization(
scalar=ScalarQuantizationConfig(type=ScalarType.INT8, always_ram=False)
)
)

# Solution 2: Optimize HNSW parameters
client.update_collection(
collection_name="large_collection",
hnsw_config=HnswConfigDiff(
m=16, # 16 is the default; lower values reduce memory at some recall cost
ef_construct=100,
max_indexing_threads=4 # Limit indexing threads
)
)

# Slow search performance
# Solution: Check and optimize configuration
collection_info = client.get_collection("my_collection")
print(f"Indexed vectors: {collection_info.indexed_vectors_count}")

# If not all vectors indexed, wait for indexing to complete
# Or increase indexing threads
client.update_collection(
collection_name="my_collection",
hnsw_config=HnswConfigDiff(
max_indexing_threads=8 # Use more threads for index building
)
)

# Connection issues
# Solution: Check network and timeouts
try:
client = QdrantClient(
host="localhost",
port=6333,
timeout=30.0, # Increase timeout
prefer_grpc=False # Try HTTP if gRPC fails
)
# Test connection
client.get_collections()
except Exception as e:
print(f"Connection error: {e}")
# Check if Qdrant is running
# Check firewall settings
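
For transient network failures, a small retry loop with exponential backoff is often enough. A minimal sketch (attempt counts and delays are arbitrary choices):

import time
from qdrant_client import QdrantClient

def connect_with_retry(host="localhost", port=6333, attempts=5, base_delay=1.0):
    """Try to connect several times, doubling the delay after each failure."""
    last_error = None
    for attempt in range(attempts):
        try:
            client = QdrantClient(host=host, port=port, timeout=30.0)
            client.get_collections()  # Cheap call to verify the connection
            return client
        except Exception as e:
            last_error = e
            time.sleep(base_delay * (2 ** attempt))
    raise ConnectionError(f"Could not reach Qdrant after {attempts} attempts: {last_error}")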

Debug Mode

# Enable debug logging
import logging
import time  # used by the DebugQdrantClient timing below

logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger('qdrant_client')

# Create client; verbosity is controlled by the logging configuration above
client = QdrantClient(
host="localhost",
port=6333
)

# Monitor operations
class DebugQdrantClient(QdrantClient):
def search(self, collection_name, *args, **kwargs):
start_time = time.time()

# Log search parameters
logger.debug(f"Searching in {collection_name}")
logger.debug(f"Limit: {kwargs.get('limit', 10)}")
logger.debug(f"Filter: {kwargs.get('query_filter')}")

try:
result = super().search(collection_name, *args, **kwargs)

duration = time.time() - start_time
logger.debug(f"Search completed in {duration:.3f}s")
logger.debug(f"Results: {len(result)}")

return result

except Exception as e:
logger.error(f"Search failed: {str(e)}")
raise

Performance Profiling

import cProfile
import pstats

def profile_qdrant_operation():
def decorator(func):
def wrapper(*args, **kwargs):
# Profile the function
profiler = cProfile.Profile()
profiler.enable()

result = func(*args, **kwargs)

profiler.disable()

# Print stats
stats = pstats.Stats(profiler)
stats.sort_stats('cumulative')
stats.print_stats(20)

return result
return wrapper
return decorator

# Usage
@profile_qdrant_operation()
def batch_upload():
points = generate_test_points(10000)
client.upsert("test_collection", points)

# Or profile specific operations
def profile_search():
query_vector = generate_test_vector(384)

with cProfile.Profile() as pr:
for _ in range(100):
client.search(
"test_collection",
query_vector,
limit=10
)

# Save stats to file
pr.dump_stats('qdrant_search_profile.prof')
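
The saved .prof file can be loaded later with pstats (or any profiler viewer) to inspect the slowest call paths:

import pstats

# Load the dump written by profile_search() and show the 10 slowest call paths
stats = pstats.Stats('qdrant_search_profile.prof')
stats.sort_stats('cumulative').print_stats(10)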

Recovery & Repair

# Recover from corrupted collection
def repair_collection(client, collection_name):
try:
# Try to get collection info
info = client.get_collection(collection_name)

# If successful, create backup
snapshot = client.create_snapshot(collection_name)
print(f"Created snapshot: {snapshot.name}")

except Exception as e:
print(f"Collection is corrupted: {e}")

# Options:
# 1. Restore from snapshot
snapshots = client.list_snapshots(collection_name)
if snapshots:
# recover_snapshot expects a URL or file path to the snapshot file;
# adjust the location to where your snapshots are stored on the server
client.recover_snapshot(
collection_name,
location=f"file:///qdrant/snapshots/{collection_name}/{snapshots[-1].name}"
)
print("Recovered from latest snapshot")

# 2. Recreate collection (if you have the data)
else:
print("No snapshots available. Collection may need to be recreated.")

Best Practices

Collection Design

# Choose appropriate vector dimensions
# Common dimensions:
# - all-MiniLM-L6-v2: 384
# - text-embedding-ada-002: 1536
# - BERT-base: 768
# - Custom models: variable

# Use descriptive collection names
# Good: user_embeddings, product_vectors, document_chunks
# Bad: vec1, test, temp

# Plan for growth
# Estimate your needs:
# - Small project (<1M vectors): Single node
# - Medium project (1M-10M): Consider sharding
# - Large project (>10M): Distributed cluster

# Set appropriate distance metrics
# - COSINE: Text embeddings; Qdrant normalizes vectors internally
# - EUCLID: General purpose, when vector magnitude matters
# - DOT: Inner product; equivalent to COSINE for pre-normalized vectors
# - MANHATTAN: L1 distance; occasionally preferred for sparse or high-dimensional data

collection_config = {
"user_profiles": {
"dimensions": 384,
"distance": Distance.COSINE,
"on_disk": False,
"quantization": None
},
"product_catalog": {
"dimensions": 512,
"distance": Distance.EUCLID,
"on_disk": True,
"quantization": QuantizationConfig(
scalar=ScalarQuantization(type="scalar")
)
}
}
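
The map above can be applied directly when provisioning collections. A minimal sketch, assuming the collection_config dict and a connected client; the helper name is illustrative:

from qdrant_client.models import VectorParams

def create_collections_from_config(client, config):
    """Create one collection per entry in the config map above."""
    for name, settings in config.items():
        client.create_collection(
            collection_name=name,
            vectors_config=VectorParams(
                size=settings["dimensions"],
                distance=settings["distance"],
                on_disk=settings["on_disk"]
            ),
            quantization_config=settings["quantization"]  # None means no quantization
        )

create_collections_from_config(client, collection_config)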

Data Management

# Use UUIDs for point IDs
import uuid

# Good
point_id = str(uuid.uuid4())

# Bad for distributed systems
point_id = incremental_number

# Version your data schema
payload_schema = {
"version": "1.0",
"fields": {
"title": {"type": "string", "required": True},
"content": {"type": "string", "required": True},
"metadata": {"type": "object", "optional": True}
}
}

# Implement data validation
def validate_payload(payload, schema):
"""Validate payload against schema"""
for field, config in schema["fields"].items():
if config.get("required") and field not in payload:
raise ValueError(f"Required field missing: {field}")

return True

# Use consistent timestamp formats
from datetime import datetime, timezone

timestamp = datetime.now(timezone.utc).isoformat()
# "2024-01-01T12:00:00+00:00"

Performance Optimization

# Batch operations
def batch_upsert(client, collection_name, points, batch_size=100):
"""Upload points in batches"""
for i in range(0, len(points), batch_size):
batch = points[i:i + batch_size]
client.upsert(collection_name, batch)

# Optional: Add delay to avoid overwhelming
# time.sleep(0.1)

# Use appropriate HNSW parameters
hnsw_configs = {
"small": { # <10K vectors
"m": 16,
"ef_construct": 100,
"ef": 64
},
"medium": { # 10K-1M vectors
"m": 32,
"ef_construct": 200,
"ef": 128
},
"large": { # >1M vectors
"m": 64,
"ef_construct": 400,
"ef": 256
}
}

# Monitor and adjust
def optimize_hnsw_parameters(client, collection_name, vector_count):
"""Adjust HNSW parameters based on collection size"""
if vector_count < 10000:
config = hnsw_configs["small"]
elif vector_count < 1000000:
config = hnsw_configs["medium"]
else:
config = hnsw_configs["large"]

# Note: "ef" is a search-time setting (SearchParams(hnsw_ef=...)), not an HnswConfigDiff field
client.update_collection(
collection_name,
hnsw_config=HnswConfigDiff(
m=config["m"],
ef_construct=config["ef_construct"]
)
)
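
For example, the helper above can be driven by the collection's current size after a large import (collection name is illustrative):

info = client.get_collection("products")
optimize_hnsw_parameters(client, "products", info.points_count or 0)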

Query Optimization

# Use filters effectively
# Good: filter on fields that have a payload index (see the index sketch below)
query_filter = Filter(
must=[
{
"key": "category",
"match": {"value": "electronics"} # Fast: keyword index lookup
}
]
)

# Bad: filter on deeply nested, unindexed fields
query_filter = Filter(
must=[
{
"key": "metadata.attributes.specifications.weight",
"range": {"gte": 1.0} # Slow: forces a scan of the payload
}
]
)

# Limit returned payload
# Only request needed fields
client.search(
collection_name="products",
query_vector=query,
limit=10,
with_payload=["title", "price", "image_url"], # Only these fields
with_vectors=False # Don't need vectors in results
)

# Use score thresholds
client.search(
collection_name="documents",
query_vector=query,
limit=10,
score_threshold=0.5 # Skip low-similarity results
)
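
Filter speed depends on payload indexes: the category filter above is only fast if that field is indexed. A minimal sketch of creating indexes for the fields used in these examples:

from qdrant_client.models import PayloadSchemaType

# Keyword index for exact-match filters on category
client.create_payload_index(
    collection_name="products",
    field_name="category",
    field_schema=PayloadSchemaType.KEYWORD
)

# Float index for range filters on price
client.create_payload_index(
    collection_name="products",
    field_name="price",
    field_schema=PayloadSchemaType.FLOAT
)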

Monitoring & Maintenance

# Regular health checks
def health_check(client):
"""Perform comprehensive health check"""
checks = {
"connection": False,
"collections": [],
"disk_usage": {},
"memory_usage": {}
}

try:
# Check connection
client.get_collections()
checks["connection"] = True

# Check each collection
collections = client.get_collections()
for collection in collections.collections:
info = client.get_collection(collection.name)
checks["collections"].append({
"name": collection.name,
"status": info.status,
"points_count": info.points_count,
"indexed_vectors": info.indexed_vectors_count
})

# Check disk usage (implementation depends on OS)
checks["disk_usage"] = get_disk_usage()

# Estimate memory usage
checks["memory_usage"] = estimate_memory_usage(checks["collections"])

except Exception as e:
print(f"Health check failed: {e}")

return checks

# Implement alerting
def setup_alerts(client):
"""Set up monitoring alerts"""
# Monitor collection size
collections = client.get_collections()

for collection in collections.collections:
info = client.get_collection(collection.name)

# Alert if collection is too large
if info.points_count > 1000000: # 1M
send_alert(
f"Collection {collection.name} has {info.points_count} points"
)

# Alert if indexing is slow
if info.status != "green":
send_alert(
f"Collection {collection.name} status: {info.status}"
)
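
The health-check and alerting code above calls get_disk_usage, estimate_memory_usage, and send_alert without defining them. A rough sketch of what they might look like; the storage path, the per-vector size estimate, and the logging-based alert channel are all assumptions:

import logging
import shutil

def get_disk_usage(path="/qdrant/storage"):
    """Disk usage for the Qdrant storage volume, in GB."""
    usage = shutil.disk_usage(path)
    return {
        "total_gb": round(usage.total / 1e9, 2),
        "used_gb": round(usage.used / 1e9, 2),
        "free_gb": round(usage.free / 1e9, 2),
    }

def estimate_memory_usage(collections, bytes_per_dim=4, dimensions=384):
    """Very rough RAM estimate: points * dimensions * 4 bytes, plus ~50% HNSW overhead."""
    estimates = {}
    for c in collections:
        vectors_bytes = (c["points_count"] or 0) * dimensions * bytes_per_dim
        estimates[c["name"]] = round(vectors_bytes * 1.5 / 1e6, 1)  # MB
    return estimates

def send_alert(message):
    """Placeholder alert channel; swap in email, Slack, PagerDuty, etc."""
    logging.getLogger("qdrant_alerts").warning(message)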

Security Best Practices

# Never commit credentials
# Use environment variables
import os

API_KEY = os.getenv("QDRANT_API_KEY")
if not API_KEY:
raise ValueError("QDRANT_API_KEY environment variable not set")

# Use HTTPS in production
client = QdrantClient(
host="your-qdrant-domain.com",
port=6333,
https=True,
api_key=API_KEY
)

# Implement rate limiting
from collections import defaultdict
import time

class RateLimiter:
    def __init__(self, max_requests=100, window_seconds=60):
        self.requests = defaultdict(list)
        self.max_requests = max_requests
        self.window = window_seconds

    def is_allowed(self, user_id):
        now = time.time()

        # Drop requests that have fallen outside the sliding window
        user_requests = [t for t in self.requests[user_id] if now - t < self.window]

        if len(user_requests) >= self.max_requests:
            self.requests[user_id] = user_requests
            return False

        # Record this request so it counts toward the limit
        user_requests.append(now)
        self.requests[user_id] = user_requests
        return True

# Validate all inputs
def validate_vector(vector, expected_dim):
"""Validate input vector"""
if not isinstance(vector, list):
raise TypeError("Vector must be a list")

if len(vector) != expected_dim:
raise ValueError(f"Vector dimension mismatch: expected {expected_dim}, got {len(vector)}")

if not all(isinstance(x, (int, float)) for x in vector):
raise TypeError("Vector must contain only numbers")

return True
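
Tying these pieces together, a thin wrapper can enforce rate limiting and input validation before any request reaches Qdrant. A minimal sketch; the limiter settings, the expected dimension, and the wrapper name are placeholders:

limiter = RateLimiter(max_requests=100, window_seconds=60)

def guarded_search(client, user_id, collection_name, vector, expected_dim=384, limit=10):
    """Search only after the caller passes rate limiting and the vector passes validation."""
    if not limiter.is_allowed(user_id):
        raise PermissionError(f"Rate limit exceeded for user {user_id}")
    validate_vector(vector, expected_dim)
    return client.search(
        collection_name=collection_name,
        query_vector=vector,
        limit=limit
    )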

Resources & Further Learning

Official Resources

Client Libraries

Integration Examples

Performance Guides

Community & Support

Tutorials & Courses


This primer covers Qdrant vector database implementation as of 2025, spanning core features, performance optimization, and real-world applications. Qdrant continues to evolve rapidly, so follow the latest releases and official documentation for the most current features and best practices.