AI conversations reset every time you close the chat window. All context lost. All discoveries forgotten. Start from zero.
MCP solves this with persistent sessions. Conversations that actually continue where you left off.
Prerequisites
New to MCP? Start with these foundational posts:
- What is the Model Context Protocol? A Beginner’s Guide - Understanding MCP basics
- Breaking Down AI Silos: How MCP Enables True AI Interoperability - MCP’s approach to AI integration
This post builds on MCP fundamentals and assumes basic familiarity with Python async programming.
The Stateless Problem
Traditional AI APIs are stateless. Each request stands alone:
# Traditional approach - no memory between calls
import openai

response1 = openai.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Analyze this code for bugs"}]
)
# Later... AI has no idea what code you're talking about
response2 = openai.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "What about the memory leak?"}]
)
# AI: "What memory leak? What code?"
Common workarounds include:
- Copying conversation histories between requests
- Maintaining context in application state
- Building prompt chains manually
- Saving conversations to files
These approaches fail at scale and break with complex multi-day interactions. Learn more about how MCP handles context exchange in our dedicated guide.
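For contrast, here is roughly what the history-copying workaround looks like (a sketch; the model name is a placeholder):

import openai

# Every request must re-send the entire accumulated history
history = [{"role": "user", "content": "Analyze this code for bugs"}]
response = openai.chat.completions.create(model="gpt-4o", messages=history)
history.append({"role": "assistant", "content": response.choices[0].message.content})

# The follow-up only works because the full transcript rides along
history.append({"role": "user", "content": "What about the memory leak?"})
response = openai.chat.completions.create(model="gpt-4o", messages=history)

# The history lives only in this process: close it and the context is gone,
# and every request re-pays tokens for the whole transcript
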
How MCP Sessions Work
An MCP server can expose session management to AI clients through a handful of simple tools. The official MCP specification defines the underlying protocol:
from mcp.server.fastmcp import FastMCP
import json
from datetime import datetime
from pathlib import Path
from typing import Any
# Initialize MCP server
mcp = FastMCP("session_server")
# Session storage directory
SESSIONS_DIR = Path("./sessions")
SESSIONS_DIR.mkdir(exist_ok=True)
# Helper function to save session
async def save_session(session_id: str, data: dict) -> None:
"""Persist session to file storage"""
file_path = SESSIONS_DIR / f"{session_id}.json"
with open(file_path, 'w') as f:
json.dump(data, f, indent=2, default=str)
# Create a persistent session
@mcp.tool()
async def create_session(session_id: str, project: str = "", metadata: dict | None = None) -> dict:
"""Create a new debugging session with persistent storage"""
session_data = {
"id": session_id,
"created_at": datetime.now().isoformat(),
"state": "active",
"project": project,
"metadata": metadata or {},
"context": {},
"conversation_history": [],
"discoveries": []
}
# Store session data
await save_session(session_id, session_data)
return {"status": "created", "session_id": session_id, "project": project}
@mcp.tool()
async def add_context(session_id: str, key: str, value: Any) -> dict:
    """Add context to an existing session"""
    # Load session, guarding against unknown session IDs
    file_path = SESSIONS_DIR / f"{session_id}.json"
    if not file_path.exists():
        return {"error": f"Session '{session_id}' not found"}
    with open(file_path, 'r') as f:
        session = json.load(f)
    # Add context
    session["context"][key] = value
    # Save updated session
    await save_session(session_id, session)
    return {"status": "context_added", "key": key}
Sessions work through three core mechanisms:
1. Session Lifecycle
Sessions in MCP follow a clear lifecycle:
| State | Description | What Happens |
|---|---|---|
| Initialize | Session created | Unique ID generated, metadata stored |
| Active | Conversation ongoing | Context accumulates, state updates |
| Suspend | Temporarily paused | State saved, resources freed |
| Resume | Reactivated | State restored, context available |
| Terminate | Permanently ended | Optional archival, cleanup |
# MCP session lifecycle implementation
from mcp.server.fastmcp import FastMCP
from datetime import datetime
from enum import Enum
import uuid

class SessionState(Enum):
    ACTIVE = "active"
    SUSPENDED = "suspended"
    TERMINATED = "terminated"

# FastMCP server with session management
mcp = FastMCP("session_manager")

# Active sessions held in memory; `storage` is any persistence backend
# with async save/load methods (a stub is shown in the demo below)
active_sessions = {}
@mcp.tool()
async def create_session(session_id: str | None = None) -> dict:
    """Create new session with automatic ID if needed"""
    if not session_id:
        session_id = f"session-{uuid.uuid4()}"
session_data = {
"id": session_id,
"created_at": datetime.now().isoformat(),
"state": SessionState.ACTIVE.value,
"context": {}
}
active_sessions[session_id] = session_data
await storage.save(session_id, session_data)
return {"session_id": session_id, "status": "created"}
@mcp.tool()
async def suspend_session(session_id: str) -> dict:
"""Save session state and free memory"""
session = active_sessions.get(session_id)
if not session:
return {"error": f"Session {session_id} not found"}
session["state"] = SessionState.SUSPENDED.value
session["suspended_at"] = datetime.now().isoformat()
# Save to persistent storage
await storage.save(session_id, session)
# Free memory
del active_sessions[session_id]
return {"session_id": session_id, "status": "suspended"}
@mcp.tool()
async def resume_session(session_id: str) -> dict:
"""Restore session from storage"""
# Load from storage
session = await storage.load(session_id)
if not session:
return {"error": f"Session {session_id} not found"}
# Update state
session["state"] = SessionState.ACTIVE.value
session["resumed_at"] = datetime.now().isoformat()
# Cache in memory
active_sessions[session_id] = session
return {
"session_id": session_id,
"status": "resumed",
"context_items": len(session.get("context", {}))
}
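A quick way to sanity-check this lifecycle is to drive the tools directly in a script. FastMCP's tool decorator registers and returns the original function, so the functions stay callable; the `_DictStorage` stub below is a stand-in for the `storage` backend the tools reference (any object with async save/load works):

import asyncio

# Minimal storage stub so the demo is self-contained
class _DictStorage:
    def __init__(self):
        self._data = {}

    async def save(self, session_id, data):
        self._data[session_id] = data

    async def load(self, session_id):
        return self._data.get(session_id)

storage = _DictStorage()

async def demo_lifecycle():
    created = await create_session()
    sid = created["session_id"]

    await suspend_session(sid)
    assert sid not in active_sessions  # memory freed after suspend

    resumed = await resume_session(sid)
    print(resumed)  # {'session_id': ..., 'status': 'resumed', 'context_items': 0}

asyncio.run(demo_lifecycle())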
2. State Persistence
MCP supports multiple storage backends for different use cases:
# Shared imports for the storage examples below
import gzip
import json
from datetime import datetime
from pathlib import Path

# In-memory storage - fast but temporary
class InMemoryStorage:
    def __init__(self):
        self.sessions = {}
        self.contexts = {}
async def save_session(self, session: Session):
self.sessions[session.id] = session
async def save_context(self, session_id: str, context: dict):
self.contexts[session_id] = context
# File-based storage - simple persistence
class FileStorage:
def __init__(self, base_path: str):
self.base_path = Path(base_path)
self.base_path.mkdir(exist_ok=True)
async def save_session(self, session: Session):
session_file = self.base_path / f"{session.id}.json"
session_data = {
"id": session.id,
"created_at": session.created_at.isoformat(),
"state": session.state.value,
"metadata": session.metadata
}
with open(session_file, 'w') as f:
json.dump(session_data, f, indent=2)
async def save_context(self, session_id: str, context: dict):
context_file = self.base_path / f"{session_id}_context.json"
# Handle large contexts with compression
if len(json.dumps(context)) > 1_000_000: # 1MB
with gzip.open(f"{context_file}.gz", 'wt') as f:
json.dump(context, f)
else:
with open(context_file, 'w') as f:
json.dump(context, f, indent=2)
# Database storage - production-ready
# (DatabaseConnection is a placeholder for your async database client)
class DatabaseStorage:
    def __init__(self, connection_string: str):
        self.db = DatabaseConnection(connection_string)
async def save_session(self, session: Session):
await self.db.execute("""
INSERT INTO mcp_sessions (id, created_at, state, metadata)
VALUES (?, ?, ?, ?)
ON CONFLICT (id) DO UPDATE SET
state = excluded.state,
metadata = excluded.metadata
""", [session.id, session.created_at, session.state.value,
json.dumps(session.metadata)])
async def save_context(self, session_id: str, context: dict):
# Store context in chunks for better performance
for key, value in context.items():
await self.db.execute("""
INSERT INTO mcp_context (session_id, key, value, updated_at)
VALUES (?, ?, ?, ?)
ON CONFLICT (session_id, key) DO UPDATE SET
value = excluded.value,
updated_at = excluded.updated_at
""", [session_id, key, json.dumps(value), datetime.now()])
3. Context Accumulation
The real power comes from how context builds over time:
from datetime import datetime
from typing import Any

class Session:
def __init__(self, id: str, **kwargs):
self.id = id
self.context = {}
self.conversation_history = []
self.discoveries = {}
self.metadata = kwargs
async def add_context(self, key: str, value: Any):
"""Add or update context information"""
self.context[key] = {
"value": value,
"added_at": datetime.now(),
"type": type(value).__name__
}
    async def add_message(self, role: str, content: str, metadata: dict | None = None):
"""Track conversation with metadata"""
message = {
"role": role,
"content": content,
"timestamp": datetime.now(),
"metadata": metadata or {}
}
self.conversation_history.append(message)
# Keep history manageable
if len(self.conversation_history) > 1000:
# Archive old messages
await self._archive_old_messages()
async def add_discovery(self, key: str, discovery: dict):
"""Track important findings"""
self.discoveries[key] = {
"discovery": discovery,
"timestamp": datetime.now(),
"relevance": discovery.get("relevance", 1.0)
}
def get_relevant_context(self, query: str = None) -> dict:
"""Retrieve context intelligently"""
if not query:
return self.context
# Simple relevance scoring
relevant = {}
query_lower = query.lower()
for key, value in self.context.items():
if query_lower in key.lower() or \
query_lower in str(value.get("value", "")).lower():
relevant[key] = value
return relevant
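Here is the accumulation in action; a small driver using the class above:

import asyncio

async def demo():
    # Build up context the way a debugging conversation would
    session = Session("debug-memory-leak", project="E-Commerce API")
    await session.add_context("runtime", "Python 3.11")
    await session.add_context("cache_size", "2.3GB")
    await session.add_message("user", "Memory grows 100MB/hour after v2.4.0")
    await session.add_discovery("cache_leak", {"summary": "No eviction policy", "relevance": 0.9})

    # Only context matching the query comes back
    print(session.get_relevant_context("cache"))  # -> {'cache_size': {...}}

asyncio.run(demo())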
Real-World Example: Multi-Day Debugging
Here’s how sessions work across multiple days:
# Example from multi_day_debug.py showing the 3-day debugging flow:
# Day 1: Initial investigation
async def day1_initial_investigation(self):
# Create new session for debugging task
self.session = {
"id": "debug-memory-leak-2025-09",
"project": "E-Commerce API",
"issue": "Memory leak in production",
"symptoms": {
"memory_growth": "100MB/hour",
"affects": "All API instances"
}
}
# Add initial context about the environment
await self.add_context("deployment_diff", {
"version": "v2.4.0",
"changes": [
"Added real-time analytics feature",
"Upgraded caching layer",
"New webhook processing system"
]
})
# Examine code and make discoveries
await self.add_discovery(
"In-memory cache has no eviction policy",
"The CacheManager stores everything in self.cache dictionary "
"but never removes old entries."
)
# Suspend at end of day
self.session["state"] = "suspended"
await self.save_session()
# Day 2: Resume and narrow down issue
async def day2_verification(self):
# Resume session - all context preserved
loaded = await self.load_session()
# Continue with new findings
await self.add_context("cache_analysis", {
"total_entries": 45000,
"memory_usage": "2.3GB",
"largest_entries": [
"user_analytics_*": "15MB each"
]
})
await self.add_discovery(
"Analytics data is being cached indefinitely",
"User analytics objects (15MB each) are cached but never expire."
)
# Day 3: Implement and verify fix
async def day3_implementation(self):
# Resume again
loaded = await self.load_session()
# Verify solution worked
await self.add_context("verification_metrics", {
"before": {
"memory_growth": "100MB/hour",
"peak_memory": "8GB before restart"
},
"after": {
"memory_growth": "0MB/hour (stable)",
"peak_memory": "1.2GB constant"
}
})
Terminal Session Example
Here’s what a real debugging session looks like in the terminal:
$ python multi_day_debug.py
=== Multi-Day Debugging Session Demo ===
This simulates a real debugging scenario across 3 days
showing how MCP sessions preserve context over time.
Starting Day 1...
============================================================
DAY 1: Tuesday, September 2, 2025 - 10:00 AM
============================================================
USER: Production API instances are consuming memory at 100MB/hour.
Started after deploying v2.4.0 last Friday. Need to find the leak.
ASSISTANT: I'll help you track down this memory leak. Let's start
systematically. First, let's check what changed in v2.4.0 deployment.
Checking database connections...
Examining cache implementation...
DISCOVERY: In-memory cache has no eviction policy
HYPOTHESIS: Memory leak caused by unbounded in-memory cache (confidence: 0.7)
End of Day 1 Summary:
Session saved: debug_sessions/debug-memory-leak-2025-09.json
Session suspended for the day
============================================================
DAY 2: Wednesday, September 3, 2025 - 9:30 AM
============================================================
Session loaded from: debug_sessions/debug-memory-leak-2025-09.json
Resuming debugging session...
Previous discoveries:
- In-memory cache has no eviction policy
- Webhook queue grows unbounded
USER: I added monitoring overnight. The cache is definitely growing -
it has 45,000 entries and using 2.3GB RAM.
ASSISTANT: That confirms our cache hypothesis. 45,000 entries in 2.3GB
means roughly 50KB per entry. Let's check what's being cached.
Session resumed with full context from yesterday!
Session Storage Format
Sessions are stored as JSON files with all context preserved:
{
"id": "debug-memory-leak-2025-09",
"created_at": "2025-09-02T10:00:00",
"state": "suspended",
"suspended_at": "2025-09-02T18:30:00",
"project": "E-Commerce API",
"issue": "Memory leak in production",
"environment": {
"platform": "AWS EC2 t3.large",
"runtime": "Python 3.11",
"framework": "FastAPI",
"database": "PostgreSQL 14"
},
"context": {
"deployment_diff": {
"value": {
"version": "v2.4.0",
"changes": [
"Added real-time analytics feature",
"Upgraded caching layer",
"New webhook processing system"
]
},
"added_at": "2025-09-02T10:15:00"
},
"cache_analysis": {
"value": {
"total_entries": 45000,
"memory_usage": "2.3GB",
"largest_entries": {
"user_analytics_*": "15MB each",
"report_data_*": "8MB each"
}
},
"added_at": "2025-09-03T10:00:00"
}
},
"discoveries": [
{
"title": "In-memory cache has no eviction policy",
"description": "The CacheManager stores everything but never removes old entries.",
"timestamp": "2025-09-02T14:30:00"
},
{
"title": "Analytics data is being cached indefinitely",
"description": "User analytics objects (15MB each) are cached but never expire.",
"timestamp": "2025-09-03T11:00:00"
}
],
"conversation_history": [
{
"role": "user",
"content": "Production API instances are consuming memory at 100MB/hour...",
"timestamp": "2025-09-02T10:00:00"
},
{
"role": "assistant",
"content": "I'll help you track down this memory leak...",
"timestamp": "2025-09-02T10:01:00"
}
]
}
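Because the format is plain JSON, resuming is just a file read. A minimal sketch (the path matches the demo above):

import json
from pathlib import Path

session_file = Path("debug_sessions/debug-memory-leak-2025-09.json")
session = json.loads(session_file.read_text())

print(f"Resuming '{session['id']}' (state: {session['state']})")
print("Previous discoveries:")
for discovery in session["discoveries"]:
    print(f"  - {discovery['title']}")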
State Persistence Patterns
Different applications need different persistence strategies:
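The three patterns below subclass a common SessionStorage interface from storage_examples.py. The full base class isn't shown in this post; a minimal sketch of what it presumably looks like:

from abc import ABC, abstractmethod
from typing import Optional

class SessionStorage(ABC):
    """Interface the storage backends below implement."""

    @abstractmethod
    async def save(self, session_id: str, data: dict) -> bool:
        """Persist a session; return True on success."""

    @abstractmethod
    async def load(self, session_id: str) -> Optional[dict]:
        """Return the session data, or None if it doesn't exist."""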
Pattern 1: Ephemeral Sessions (In-Memory)
Best for: Quick interactions, testing, demos
# From storage_examples.py - InMemoryStorage class
class InMemoryStorage(SessionStorage):
def __init__(self):
self.sessions: Dict[str, dict] = {}
async def save(self, session_id: str, data: dict) -> bool:
self.sessions[session_id] = data
return True
# Usage:
storage = InMemoryStorage()
await storage.save("session-123", session_data)
Pattern 2: File-Based Persistence
Best for: Development, small teams, simple deployments
# From storage_examples.py - FileStorage with compression
class FileStorage(SessionStorage):
    def __init__(self, storage_path: str = "./sessions"):
        self.storage_path = Path(storage_path)
        self.storage_path.mkdir(exist_ok=True)

    async def save(self, session_id: str, data: dict) -> bool:
        try:
file_path = self.storage_path / f"{session_id}.json"
json_data = json.dumps(data, indent=2, default=str)
# Use compression for large sessions
if len(json_data) > 10000: # 10KB threshold
file_path = self.storage_path / f"{session_id}.json.gz"
with gzip.open(file_path, 'wt', encoding='utf-8') as f:
f.write(json_data)
else:
with open(file_path, 'w') as f:
f.write(json_data)
return True
        except Exception:
            return False
Pattern 3: Database-Backed Sessions
Best for: Desktop apps, small to medium projects, moderate traffic
# From storage_examples.py - DatabaseStorage with SQLite
import json
import aiosqlite

class DatabaseStorage(SessionStorage):
    def __init__(self, database_path: str = "./sessions.db"):
        self.database_path = database_path
        self.initialized = False
async def init_db(self):
async with aiosqlite.connect(self.database_path) as db:
await db.execute('''
CREATE TABLE IF NOT EXISTS mcp_sessions (
session_id TEXT PRIMARY KEY,
data TEXT NOT NULL,
created_at TEXT DEFAULT CURRENT_TIMESTAMP,
updated_at TEXT DEFAULT CURRENT_TIMESTAMP
)
''')
await db.commit()
self.initialized = True
async def save(self, session_id: str, data: dict) -> bool:
async with aiosqlite.connect(self.database_path) as db:
# Check if exists
cursor = await db.execute(
'SELECT session_id FROM mcp_sessions WHERE session_id = ?',
(session_id,)
)
exists = await cursor.fetchone() is not None
if exists:
await db.execute('''
UPDATE mcp_sessions
SET data = ?, updated_at = CURRENT_TIMESTAMP
WHERE session_id = ?
''', (json.dumps(data, default=str), session_id))
else:
await db.execute('''
INSERT INTO mcp_sessions (session_id, data)
VALUES (?, ?)
''', (session_id, json.dumps(data, default=str)))
await db.commit()
return True
async def cleanup_old_sessions(self, days: int = 30):
"""Remove sessions older than specified days"""
async with aiosqlite.connect(self.database_path) as db:
cursor = await db.execute('''
DELETE FROM mcp_sessions
WHERE datetime(updated_at) < datetime('now', ? || ' days')
''', (-days,))
await db.commit()
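The excerpt shows save and cleanup; a matching load method would look roughly like this (a sketch, not taken from storage_examples.py):

from typing import Optional

async def load(self, session_id: str) -> Optional[dict]:
    """Fetch and deserialize one session, or None if it doesn't exist"""
    async with aiosqlite.connect(self.database_path) as db:
        cursor = await db.execute(
            'SELECT data FROM mcp_sessions WHERE session_id = ?',
            (session_id,)
        )
        row = await cursor.fetchone()
        return json.loads(row[0]) if row else None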
Database Schema Visualization
For production deployments with higher traffic, you might want a more robust schema:
Table: mcp_sessions

| Column | Type | Constraints | Description |
|---|---|---|---|
| session_id | VARCHAR(255) | PRIMARY KEY | Unique session identifier |
| data | JSON/TEXT | NOT NULL | Serialized session data |
| created_at | TIMESTAMP | DEFAULT NOW() | Session creation time |
| updated_at | TIMESTAMP | DEFAULT NOW() | Last update time |
| state | VARCHAR(50) | NOT NULL | active/suspended/terminated |
| user_id | VARCHAR(255) | INDEX | Optional user association |

Table: mcp_context

| Column | Type | Constraints | Description |
|---|---|---|---|
| session_id | VARCHAR(255) | FOREIGN KEY | Reference to session |
| key | VARCHAR(255) | NOT NULL | Context key name |
| value | JSON/TEXT | NOT NULL | Context value data |
| updated_at | TIMESTAMP | DEFAULT NOW() | Last update time |

Table: mcp_conversation_history

| Column | Type | Constraints | Description |
|---|---|---|---|
| id | SERIAL | PRIMARY KEY | Auto-incrementing ID |
| session_id | VARCHAR(255) | FOREIGN KEY | Reference to session |
| role | VARCHAR(50) | NOT NULL | user/assistant/system |
| content | TEXT | NOT NULL | Message content |
| timestamp | TIMESTAMP | DEFAULT NOW() | Message timestamp |
| metadata | JSON | NULLABLE | Additional message data |
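Translated into DDL, the schema above can be created in one shot. A sketch using aiosqlite, with types simplified to what SQLite supports:

import aiosqlite

SCHEMA = """
CREATE TABLE IF NOT EXISTS mcp_sessions (
    session_id TEXT PRIMARY KEY,
    data TEXT NOT NULL,
    created_at TEXT DEFAULT CURRENT_TIMESTAMP,
    updated_at TEXT DEFAULT CURRENT_TIMESTAMP,
    state TEXT NOT NULL DEFAULT 'active',
    user_id TEXT
);
CREATE INDEX IF NOT EXISTS idx_sessions_user ON mcp_sessions(user_id);

CREATE TABLE IF NOT EXISTS mcp_context (
    session_id TEXT REFERENCES mcp_sessions(session_id),
    key TEXT NOT NULL,
    value TEXT NOT NULL,
    updated_at TEXT DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (session_id, key)
);

CREATE TABLE IF NOT EXISTS mcp_conversation_history (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    session_id TEXT REFERENCES mcp_sessions(session_id),
    role TEXT NOT NULL,
    content TEXT NOT NULL,
    timestamp TEXT DEFAULT CURRENT_TIMESTAMP,
    metadata TEXT
);
"""

async def create_schema(database_path: str = "./sessions.db"):
    # executescript runs all three CREATE statements in one call
    async with aiosqlite.connect(database_path) as db:
        await db.executescript(SCHEMA)
        await db.commit()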
Common Pitfalls and Solutions
1. Context Bloat
Sessions can grow too large over time:
class SmartSession(Session):
MAX_CONTEXT_SIZE = 10_000_000 # 10MB
MAX_HISTORY_LENGTH = 1000
async def add_context(self, key: str, value: Any):
# Check size before adding
new_size = len(json.dumps(value))
current_size = sum(
len(json.dumps(v)) for v in self.context.values()
)
if current_size + new_size > self.MAX_CONTEXT_SIZE:
# Intelligently prune old context
await self._prune_context()
await super().add_context(key, value)
async def _prune_context(self):
"""Remove least recently used context"""
# Sort by last access time
sorted_context = sorted(
self.context.items(),
key=lambda x: x[1].get("accessed_at", x[1]["added_at"])
)
# Remove oldest 20%
remove_count = len(sorted_context) // 5
for key, _ in sorted_context[:remove_count]:
del self.context[key]
2. Session Recovery Failures
Handle recovery gracefully:
from typing import Optional

async def safe_resume_session(session_id: str) -> Optional[Session]:
try:
return await session_manager.resume_session(session_id)
except ValueError:
# Session not found
print(f"Session {session_id} not found, creating new one")
return await session_manager.create_session()
except json.JSONDecodeError:
# Corrupted session data
print(f"Session {session_id} corrupted, creating fresh")
await session_manager.delete_session(session_id)
return await session_manager.create_session()
except Exception as e:
# Unknown error
print(f"Failed to resume session: {e}")
return None
3. Concurrent Access
Prevent race conditions:
import asyncio
from collections import defaultdict
class ConcurrentSessionManager(SessionManager):
def __init__(self, storage_backend):
super().__init__(storage_backend)
self._locks = defaultdict(asyncio.Lock)
async def get_session(self, session_id: str) -> Session:
"""Thread-safe session access"""
async with self._locks[session_id]:
if session_id in self.active_sessions:
return self.active_sessions[session_id]
return await self.resume_session(session_id)
async def update_session(self, session_id: str, updates: dict):
"""Atomic session updates"""
async with self._locks[session_id]:
session = await self.get_session(session_id)
for key, value in updates.items():
if hasattr(session, key):
setattr(session, key, value)
await self.storage.save_session(session)
Performance Considerations
Sessions add overhead. Here’s how to minimize it:
1. Lazy Loading
Don’t load everything upfront:
class LazySession(Session):
    def __init__(self, id: str, storage: Storage):
        super().__init__(id)
        self.storage = storage
        self._context_loaded = False
        self._history_loaded = False

    # Python has no clean async properties, so expose explicit async
    # getters that hit storage only on first access
    async def get_context(self):
        if not self._context_loaded:
            self.context = await self.storage.load_context(self.id)
            self._context_loaded = True
        return self.context

    async def get_conversation_history(self):
        if not self._history_loaded:
            self.conversation_history = await self.storage.load_history(self.id)
            self._history_loaded = True
        return self.conversation_history
2. Caching Strategies
Use caching for frequently accessed sessions:
import time
class CachedSessionManager(SessionManager):
def __init__(self, storage_backend, cache_ttl: int = 3600):
super().__init__(storage_backend)
self.cache_ttl = cache_ttl
self._cache = {}
self._cache_times = {}
async def get_session(self, session_id: str) -> Session:
# Check cache first
if session_id in self._cache:
cache_time = self._cache_times.get(session_id, 0)
if time.time() - cache_time < self.cache_ttl:
return self._cache[session_id]
# Load from storage
session = await super().get_session(session_id)
# Update cache
self._cache[session_id] = session
self._cache_times[session_id] = time.time()
return session
3. Efficient Serialization
Use efficient formats for large contexts:
import msgpack
import pickle
class EfficientStorage(FileStorage):
def get_serializer(self, data_size: int):
"""Choose serializer based on data characteristics"""
if data_size < 1000: # Small data
return json
elif data_size < 100_000: # Medium data
return msgpack
        else:  # Large data - pickle handles arbitrary objects, but only use it with trusted data
            return pickle
async def save_context(self, session_id: str, context: dict):
data = json.dumps(context)
serializer = self.get_serializer(len(data))
filename = self.base_path / f"{session_id}_context.{serializer.__name__}"
if serializer == json:
with open(filename, 'w') as f:
json.dump(context, f)
else:
with open(filename, 'wb') as f:
serializer.dump(context, f)
Testing Your Implementation
Always test session persistence thoroughly:
import asyncio
import pytest
import tempfile
from pathlib import Path
@pytest.fixture
async def session_manager():
with tempfile.TemporaryDirectory() as tmpdir:
storage = FileStorage(tmpdir)
manager = SessionManager(storage)
yield manager
@pytest.mark.asyncio
async def test_session_persistence(session_manager):
# Create and populate session
session = await session_manager.create_session("test-session")
await session.add_context("test_key", "test_value")
await session.add_message("user", "Test message")
# Suspend session
await session_manager.suspend_session("test-session")
# Verify session is not in memory
assert "test-session" not in session_manager.active_sessions
# Resume session
resumed = await session_manager.resume_session("test-session")
# Verify state persisted
assert resumed.id == "test-session"
assert resumed.context["test_key"]["value"] == "test_value"
assert len(resumed.conversation_history) == 1
assert resumed.conversation_history[0]["content"] == "Test message"
@pytest.mark.asyncio
async def test_concurrent_access(session_manager):
"""Test multiple concurrent session updates"""
session = await session_manager.create_session("concurrent-test")
# Simulate concurrent updates
async def update_context(key: str, value: str):
s = await session_manager.get_session("concurrent-test")
await s.add_context(key, value)
await session_manager.storage.save_session(s)
# Run updates concurrently
await asyncio.gather(
update_context("key1", "value1"),
update_context("key2", "value2"),
update_context("key3", "value3")
)
# Verify all updates applied
final = await session_manager.get_session("concurrent-test")
assert len(final.context) == 3
Integration with MCP Servers
Here’s how to add session support to your MCP server:
from mcp.server.fastmcp import FastMCP
import json
from datetime import datetime
from pathlib import Path
from typing import Dict, Any
# Initialize FastMCP server
mcp = FastMCP("session_server")
# Session storage
SESSIONS_DIR = Path("./sessions")
SESSIONS_DIR.mkdir(exist_ok=True)
session_cache: Dict[str, dict] = {}
# Helper function to check if session exists
async def session_exists(session_id: str) -> bool:
"""Check if a session exists"""
file_path = SESSIONS_DIR / f"{session_id}.json"
return file_path.exists()
# Helper function to save session
async def save_session(session_id: str, data: dict) -> None:
"""Persist session to file storage"""
file_path = SESSIONS_DIR / f"{session_id}.json"
with open(file_path, 'w') as f:
json.dump(data, f, indent=2, default=str)
@mcp.tool()
async def create_session(
session_id: str,
project: str = "",
description: str = "",
    metadata: dict | None = None
) -> dict:
"""Create a new persistent session"""
# Check if session already exists
if await session_exists(session_id):
return {
"error": f"Session '{session_id}' already exists",
"suggestion": "Use resume_session to continue existing session"
}
# Create session structure
session_data = {
"id": session_id,
"created_at": datetime.now().isoformat(),
"state": "active",
"project": project,
"description": description,
"metadata": metadata or {},
"context": {},
"conversation_history": [],
"discoveries": []
}
# Save to storage and cache
await save_session(session_id, session_data)
session_cache[session_id] = session_data
return {"status": "created", "session_id": session_id, "project": project}
This approach gives you:
- Simple file-based storage that persists between runs
- In-memory caching for performance
- Clear session lifecycle management
- Easy debugging (sessions are JSON files)
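The create_session tool suggests resume_session when a session already exists. That companion tool isn't shown above; a minimal sketch built on the same helpers:

@mcp.tool()
async def resume_session(session_id: str) -> dict:
    """Resume an existing session from file storage (sketch)"""
    if not await session_exists(session_id):
        return {"error": f"Session '{session_id}' not found"}

    file_path = SESSIONS_DIR / f"{session_id}.json"
    with open(file_path, 'r') as f:
        session_data = json.load(f)

    session_data["state"] = "active"
    session_data["resumed_at"] = datetime.now().isoformat()
    await save_session(session_id, session_data)
    session_cache[session_id] = session_data

    return {
        "status": "resumed",
        "session_id": session_id,
        "context_items": len(session_data.get("context", {}))
    }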
Production Deployment
For production systems, consider these additional factors:
1. Session Limits
Prevent resource exhaustion:
class ProductionSessionManager(SessionManager):
MAX_SESSIONS_PER_USER = 100
MAX_TOTAL_SESSIONS = 10000
SESSION_TTL_DAYS = 90
async def create_session(self, session_id: str = None,
user_id: str = None) -> Session:
# Check user limits
if user_id:
user_sessions = await self.count_user_sessions(user_id)
if user_sessions >= self.MAX_SESSIONS_PER_USER:
# Clean up old sessions
await self.cleanup_user_sessions(user_id)
# Check total limits
total_sessions = await self.count_total_sessions()
if total_sessions >= self.MAX_TOTAL_SESSIONS:
raise Exception("Session limit reached")
session = await super().create_session(session_id)
session.metadata["user_id"] = user_id
return session
2. Security Considerations
import hmac
import hashlib
class SecureSession(Session):
def __init__(self, id: str, secret_key: str):
super().__init__(id)
self.secret_key = secret_key
def generate_token(self) -> str:
"""Generate secure session token"""
message = f"{self.id}:{self.created_at.isoformat()}"
signature = hmac.new(
self.secret_key.encode(),
message.encode(),
hashlib.sha256
).hexdigest()
return f"{self.id}:{signature}"
    @staticmethod
    def verify_token(token: str, secret_key: str) -> str:
        """Verify and extract session ID from token"""
        try:
            # rsplit so session IDs that contain ':' still parse
            session_id, signature = token.rsplit(":", 1)
            # Verify signature...
            return session_id
        except ValueError:
            raise ValueError("Invalid session token")
3. Monitoring and Metrics
import time

class MonitoredSessionManager(SessionManager):
def __init__(self, storage_backend, metrics_client):
super().__init__(storage_backend)
self.metrics = metrics_client
async def create_session(self, session_id: str = None) -> Session:
start_time = time.time()
try:
session = await super().create_session(session_id)
# Track metrics
self.metrics.increment("sessions.created")
self.metrics.histogram(
"sessions.create_time",
time.time() - start_time
)
return session
except Exception as e:
self.metrics.increment("sessions.create_errors")
raise
async def get_session_stats(self) -> dict:
"""Get session statistics for monitoring"""
return {
"active_sessions": len(self.active_sessions),
"total_sessions": await self.count_total_sessions(),
"avg_session_size": await self.get_average_session_size(),
"oldest_session": await self.get_oldest_session_age()
}
Code Examples
All the code from this post is available in our GitHub repository. To run the examples:
# Clone the examples repository
git clone https://github.com/angrysharkstudio/mcp-sessions-examples
cd mcp-sessions-examples
# Install dependencies
pip install -r requirements.txt
# Run the examples:
python session_demo.py # Simple session demonstration
python storage_examples.py # Storage backends comparison
python mcp_session_server.py # Complete MCP server
python multi_day_debug.py # 3-day debugging scenario
The examples include:
- session_demo.py: Simple demonstration of session concepts
- storage_examples.py: In-memory, file-based, and database storage
- mcp_session_server.py: Complete FastMCP server with session tools
- multi_day_debug.py: Realistic 3-day debugging scenario
- README.md: Setup instructions and detailed explanations
Additional Resources
- MCP GitHub Organization - Official MCP repositories and SDKs
- FastMCP Documentation - Python SDK for building MCP servers
- MCP Specification - Complete protocol specification
What’s Next
Sessions are just one part of MCP’s power. They enable:
- Multi-agent workflows that maintain state
- Long-running AI tasks
- Complex debugging sessions
- Personalized AI assistants
Next week, we’ll explore how MCP handles standardization versus flexibility - letting you extend the protocol while maintaining compatibility.
FAQ
Does every AI interaction need a session?
No. Use sessions for multi-turn conversations, debugging, or when context matters. Simple one-off queries work fine without them.
What’s the storage overhead?
Depends on usage. Text conversations average 1-10KB per exchange. Code contexts can be larger. Plan for 1-10MB per active session.
Can I share sessions between users?
Yes, with proper access control. Useful for team debugging or collaborative AI interactions.
How long should sessions persist?
Depends on use case. Development sessions: weeks to months. Support conversations: 30-90 days. Temporary tasks: hours to days.
What about GDPR/privacy?
Session data may contain personal information. Implement proper data retention policies and user data export/deletion capabilities.
Need Expert MCP Implementation?
Struggling with AI session management or MCP integration? Our team specializes in:
- Custom MCP Server Development - Tailored session management for your use case
- AI Integration Architecture - Multi-model systems with persistent context
- Performance Optimization - Scale to thousands of concurrent sessions
- Enterprise MCP Solutions - Production-ready implementations with monitoring
Contact us for MCP consulting →
Stay Updated with AI Development
Get weekly insights on MCP, AI integration patterns, and practical implementation tips. Join developers building the next generation of AI applications.
Subscribe to our newsletter on the blog →
Building something interesting with MCP sessions? Share your implementation - we feature innovative projects in our newsletter.
