
Sessions, States, and Continuity: MCP's Secret Sauce for Persistent AI Conversations

Angry Shark Studio
16 min read
Tags: MCP, AI Sessions, State Management, Context Persistence, Tutorial, Development, Model Context Protocol, AI Continuity

AI conversations reset every time you close the chat window. All context lost. All discoveries forgotten. Start from zero.

MCP solves this with persistent sessions. Conversations that actually continue where you left off.

[Figure: persistent AI conversation threads maintaining context across sessions vs. disconnected stateless conversations]

Prerequisites

New to MCP? Start with our foundational MCP posts first.

This post builds on MCP fundamentals and assumes basic familiarity with Python async programming.

The Stateless Problem

Traditional AI APIs are stateless. Each request stands alone:

# Traditional approach - no memory between calls
from openai import OpenAI

client = OpenAI()

response1 = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Analyze this code for bugs"}]
)

# Later... AI has no idea what code you're talking about
response2 = client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "What about the memory leak?"}]
)
# AI: "What memory leak? What code?"

Common workarounds include:

  • Copying conversation histories between requests
  • Maintaining context in application state
  • Building prompt chains manually
  • Saving conversations to files

These approaches fail at scale and break with complex multi-day interactions. Learn more about how MCP handles context exchange in our dedicated guide.
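For comparison, here is roughly what the first workaround looks like in practice (a minimal sketch, assuming the openai v1 SDK; the model name and `ask` helper are illustrative):

# History-copying workaround: re-send the whole conversation every time
from openai import OpenAI

client = OpenAI()
history = []

def ask(question: str) -> str:
    history.append({"role": "user", "content": question})
    response = client.chat.completions.create(
        model="gpt-4o",  # illustrative model choice
        messages=history,  # the entire conversation, re-sent on every call
    )
    answer = response.choices[0].message.content
    history.append({"role": "assistant", "content": answer})
    return answer

The history list grows without bound, lives only in this process, and vanishes on restart - exactly the gap sessions fill.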

How MCP Sessions Work

MCP provides session management for AI interactions through a simple API. The official MCP specification defines the complete protocol:

from mcp.server.fastmcp import FastMCP
import json
from datetime import datetime
from pathlib import Path
from typing import Any

# Initialize MCP server
mcp = FastMCP("session_server")

# Session storage directory
SESSIONS_DIR = Path("./sessions")
SESSIONS_DIR.mkdir(exist_ok=True)

# Helper function to save session
async def save_session(session_id: str, data: dict) -> None:
    """Persist session to file storage"""
    file_path = SESSIONS_DIR / f"{session_id}.json"
    with open(file_path, 'w') as f:
        json.dump(data, f, indent=2, default=str)

# Create a persistent session
@mcp.tool()
async def create_session(session_id: str, project: str = "", metadata: dict | None = None) -> dict:
    """Create a new debugging session with persistent storage"""
    session_data = {
        "id": session_id,
        "created_at": datetime.now().isoformat(),
        "state": "active",
        "project": project,
        "metadata": metadata or {},
        "context": {},
        "conversation_history": [],
        "discoveries": []
    }
    
    # Store session data
    await save_session(session_id, session_data)
    
    return {"status": "created", "session_id": session_id, "project": project}

@mcp.tool()
async def add_context(session_id: str, key: str, value: Any) -> dict:
    """Add context to an existing session"""
    # Load session, guarding against unknown IDs
    file_path = SESSIONS_DIR / f"{session_id}.json"
    if not file_path.exists():
        return {"error": f"Session '{session_id}' not found"}
    
    with open(file_path, 'r') as f:
        session = json.load(f)
    
    # Add context
    session["context"][key] = value
    
    # Save updated session
    await save_session(session_id, session)
    
    return {"status": "context_added", "key": key}

Sessions work through three core mechanisms:

1. Session Lifecycle

Sessions in MCP follow a clear lifecycle:

| State | Description | What Happens |
| --- | --- | --- |
| Initialize | Session created | Unique ID generated, metadata stored |
| Active | Conversation ongoing | Context accumulates, state updates |
| Suspend | Temporarily paused | State saved, resources freed |
| Resume | Reactivated | State restored, context available |
| Terminate | Permanently ended | Optional archival, cleanup |

[Figure: MCP session state diagram showing the flow between Initialize, Active, Suspend, Resume, and Terminate]

# MCP session lifecycle implementation
from mcp.server.fastmcp import FastMCP
from datetime import datetime
from enum import Enum
import uuid

class SessionState(Enum):
    ACTIVE = "active"
    SUSPENDED = "suspended"
    TERMINATED = "terminated"

# FastMCP server with session management
mcp = FastMCP("session_manager")

# Active sessions in memory
active_sessions = {}

# `storage` is assumed to be any persistence backend exposing async
# save(session_id, data) and load(session_id) methods (see the
# State Persistence section below)

@mcp.tool()
async def create_session(session_id: str | None = None) -> dict:
    """Create new session with automatic ID if needed"""
    if not session_id:
        session_id = f"session-{uuid.uuid4()}"
    
    session_data = {
        "id": session_id,
        "created_at": datetime.now().isoformat(),
        "state": SessionState.ACTIVE.value,
        "context": {}
    }
    
    active_sessions[session_id] = session_data
    await storage.save(session_id, session_data)
    
    return {"session_id": session_id, "status": "created"}
    
@mcp.tool()
async def suspend_session(session_id: str) -> dict:
    """Save session state and free memory"""
    session = active_sessions.get(session_id)
    if not session:
        return {"error": f"Session {session_id} not found"}
    
    session["state"] = SessionState.SUSPENDED.value
    session["suspended_at"] = datetime.now().isoformat()
    
    # Save to persistent storage
    await storage.save(session_id, session)
    
    # Free memory
    del active_sessions[session_id]
    
    return {"session_id": session_id, "status": "suspended"}

@mcp.tool()
async def resume_session(session_id: str) -> dict:
    """Restore session from storage"""
    # Load from storage
    session = await storage.load(session_id)
    if not session:
        return {"error": f"Session {session_id} not found"}
    
    # Update state
    session["state"] = SessionState.ACTIVE.value
    session["resumed_at"] = datetime.now().isoformat()
    
    # Cache in memory
    active_sessions[session_id] = session
    
    return {
        "session_id": session_id,
        "status": "resumed",
        "context_items": len(session.get("context", {}))
    }

2. State Persistence

MCP supports multiple storage backends for different use cases:

import gzip
import json
from datetime import datetime
from pathlib import Path

# In-memory storage - fast but temporary
class InMemoryStorage:
    def __init__(self):
        self.sessions = {}
        self.contexts = {}
    
    async def save_session(self, session: "Session"):
        self.sessions[session.id] = session
    
    async def save_context(self, session_id: str, context: dict):
        self.contexts[session_id] = context

# File-based storage - simple persistence
class FileStorage:
    def __init__(self, base_path: str):
        self.base_path = Path(base_path)
        self.base_path.mkdir(exist_ok=True)
    
    async def save_session(self, session: "Session"):
        session_file = self.base_path / f"{session.id}.json"
        
        session_data = {
            "id": session.id,
            "created_at": session.created_at.isoformat(),
            "state": session.state.value,
            "metadata": session.metadata
        }
        
        with open(session_file, 'w') as f:
            json.dump(session_data, f, indent=2)
    
    async def save_context(self, session_id: str, context: dict):
        context_file = self.base_path / f"{session_id}_context.json"
        
        # Handle large contexts with compression
        if len(json.dumps(context)) > 1_000_000:  # 1MB
            with gzip.open(f"{context_file}.gz", 'wt') as f:
                json.dump(context, f)
        else:
            with open(context_file, 'w') as f:
                json.dump(context, f, indent=2)

# Database storage - production-ready
class DatabaseStorage:
    def __init__(self, connection_string: str):
        # DatabaseConnection stands in for your async DB driver
        # of choice (asyncpg, aiosqlite, etc.)
        self.db = DatabaseConnection(connection_string)
    
    async def save_session(self, session: "Session"):
        await self.db.execute("""
            INSERT INTO mcp_sessions (id, created_at, state, metadata)
            VALUES (?, ?, ?, ?)
            ON CONFLICT (id) DO UPDATE SET
                state = excluded.state,
                metadata = excluded.metadata
        """, [session.id, session.created_at, session.state.value, 
              json.dumps(session.metadata)])
    
    async def save_context(self, session_id: str, context: dict):
        # Store context in chunks for better performance
        for key, value in context.items():
            await self.db.execute("""
                INSERT INTO mcp_context (session_id, key, value, updated_at)
                VALUES (?, ?, ?, ?)
                ON CONFLICT (session_id, key) DO UPDATE SET
                    value = excluded.value,
                    updated_at = excluded.updated_at
            """, [session_id, key, json.dumps(value), datetime.now()])

3. Context Accumulation

The real power comes from how context builds over time:

from datetime import datetime
from typing import Any

class Session:
    def __init__(self, id: str, **kwargs):
        self.id = id
        self.context = {}
        self.conversation_history = []
        self.discoveries = {}
        self.metadata = kwargs
    
    async def add_context(self, key: str, value: Any):
        """Add or update context information"""
        self.context[key] = {
            "value": value,
            "added_at": datetime.now(),
            "type": type(value).__name__
        }
    
    async def add_message(self, role: str, content: str, metadata: dict | None = None):
        """Track conversation with metadata"""
        message = {
            "role": role,
            "content": content,
            "timestamp": datetime.now(),
            "metadata": metadata or {}
        }
        
        self.conversation_history.append(message)
        
        # Keep history manageable
        if len(self.conversation_history) > 1000:
            # Archive old messages
            await self._archive_old_messages()
    
    async def _archive_old_messages(self):
        """Trim in-memory history to the most recent 100 messages
        (a stub - persist the older messages in a real implementation)"""
        self.conversation_history = self.conversation_history[-100:]
    
    async def add_discovery(self, key: str, discovery: dict):
        """Track important findings"""
        self.discoveries[key] = {
            "discovery": discovery,
            "timestamp": datetime.now(),
            "relevance": discovery.get("relevance", 1.0)
        }
    
    def get_relevant_context(self, query: str = None) -> dict:
        """Retrieve context intelligently"""
        if not query:
            return self.context
        
        # Simple relevance scoring
        relevant = {}
        query_lower = query.lower()
        
        for key, value in self.context.items():
            if query_lower in key.lower() or \
               query_lower in str(value.get("value", "")).lower():
                relevant[key] = value
        
        return relevant
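Here is a quick usage sketch tying these methods together (the session ID and values are illustrative):

import asyncio

async def demo():
    session = Session("debug-session-1", project="E-Commerce API")
    await session.add_context("suspect_module", "CacheManager")
    await session.add_message("user", "Analyze CacheManager for leaks")
    await session.add_discovery("cache_leak", {
        "summary": "No eviction policy on the in-memory cache",
        "relevance": 0.9,
    })
    
    # Only context matching the query comes back
    relevant = session.get_relevant_context("cache")
    print(list(relevant))  # ['suspect_module'] - matched via its value

asyncio.run(demo())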

Real-World Example: Multi-Day Debugging

Here’s how sessions work across multiple days:

# Example from multi_day_debug.py showing the 3-day debugging flow:

# Day 1: Initial investigation
async def day1_initial_investigation(self):
    # Create new session for debugging task
    self.session = {
        "id": "debug-memory-leak-2025-09",
        "project": "E-Commerce API",
        "issue": "Memory leak in production",
        "symptoms": {
            "memory_growth": "100MB/hour",
            "affects": "All API instances"
        }
    }
    
    # Add initial context about the environment
    await self.add_context("deployment_diff", {
        "version": "v2.4.0",
        "changes": [
            "Added real-time analytics feature",
            "Upgraded caching layer",
            "New webhook processing system"
        ]
    })
    
    # Examine code and make discoveries
    await self.add_discovery(
        "In-memory cache has no eviction policy",
        "The CacheManager stores everything in self.cache dictionary "
        "but never removes old entries."
    )
    
    # Suspend at end of day
    self.session["state"] = "suspended"
    await self.save_session()

# Day 2: Resume and narrow down issue  
async def day2_verification(self):
    # Resume session - all context preserved
    loaded = await self.load_session()
    
    # Continue with new findings
    await self.add_context("cache_analysis", {
        "total_entries": 45000,
        "memory_usage": "2.3GB",
        "largest_entries": [
            "user_analytics_*": "15MB each"
        ]
    })
    
    await self.add_discovery(
        "Analytics data is being cached indefinitely",
        "User analytics objects (15MB each) are cached but never expire."
    )

# Day 3: Implement and verify fix
async def day3_implementation(self):
    # Resume again
    loaded = await self.load_session()
    
    # Verify solution worked
    await self.add_context("verification_metrics", {
        "before": {
            "memory_growth": "100MB/hour",
            "peak_memory": "8GB before restart"
        },
        "after": {
            "memory_growth": "0MB/hour (stable)",
            "peak_memory": "1.2GB constant"
        }
    })

Terminal Session Example

Here’s what a real debugging session looks like in the terminal:

$ python multi_day_debug.py
=== Multi-Day Debugging Session Demo ===
This simulates a real debugging scenario across 3 days
showing how MCP sessions preserve context over time.

Starting Day 1...

============================================================
DAY 1: Tuesday, September 2, 2025 - 10:00 AM
============================================================

USER: Production API instances are consuming memory at 100MB/hour. 
      Started after deploying v2.4.0 last Friday. Need to find the leak.

ASSISTANT: I'll help you track down this memory leak. Let's start 
           systematically. First, let's check what changed in v2.4.0 deployment.

Checking database connections...
Examining cache implementation...

DISCOVERY: In-memory cache has no eviction policy

HYPOTHESIS: Memory leak caused by unbounded in-memory cache (confidence: 0.7)

End of Day 1 Summary:
Session saved: debug_sessions/debug-memory-leak-2025-09.json

Session suspended for the day

============================================================
DAY 2: Wednesday, September 3, 2025 - 9:30 AM
============================================================
Session loaded from: debug_sessions/debug-memory-leak-2025-09.json

Resuming debugging session...
Previous discoveries:
  - In-memory cache has no eviction policy
  - Webhook queue grows unbounded

USER: I added monitoring overnight. The cache is definitely growing - 
      it has 45,000 entries and using 2.3GB RAM.

ASSISTANT: That confirms our cache hypothesis. 45,000 entries in 2.3GB 
           means roughly 50KB per entry. Let's check what's being cached.

Session resumed with full context from yesterday!

Session Storage Format

Sessions are stored as JSON files with all context preserved:

{
  "id": "debug-memory-leak-2025-09",
  "created_at": "2025-09-02T10:00:00",
  "state": "suspended",
  "suspended_at": "2025-09-02T18:30:00",
  "project": "E-Commerce API",
  "issue": "Memory leak in production",
  "environment": {
    "platform": "AWS EC2 t3.large",
    "runtime": "Python 3.11",
    "framework": "FastAPI",
    "database": "PostgreSQL 14"
  },
  "context": {
    "deployment_diff": {
      "value": {
        "version": "v2.4.0",
        "changes": [
          "Added real-time analytics feature",
          "Upgraded caching layer",
          "New webhook processing system"
        ]
      },
      "added_at": "2025-09-02T10:15:00"
    },
    "cache_analysis": {
      "value": {
        "total_entries": 45000,
        "memory_usage": "2.3GB",
        "largest_entries": {
          "user_analytics_*": "15MB each",
          "report_data_*": "8MB each"
        }
      },
      "added_at": "2025-09-03T10:00:00"
    }
  },
  "discoveries": [
    {
      "title": "In-memory cache has no eviction policy",
      "description": "The CacheManager stores everything but never removes old entries.",
      "timestamp": "2025-09-02T14:30:00"
    },
    {
      "title": "Analytics data is being cached indefinitely",
      "description": "User analytics objects (15MB each) are cached but never expire.",
      "timestamp": "2025-09-03T11:00:00"
    }
  ],
  "conversation_history": [
    {
      "role": "user",
      "content": "Production API instances are consuming memory at 100MB/hour...",
      "timestamp": "2025-09-02T10:00:00"
    },
    {
      "role": "assistant", 
      "content": "I'll help you track down this memory leak...",
      "timestamp": "2025-09-02T10:01:00"
    }
  ]
}
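Because sessions are plain JSON, you can inspect or reload them with nothing but the standard library. A small sketch, using the file path from the terminal example above:

import json
from pathlib import Path

session_file = Path("debug_sessions/debug-memory-leak-2025-09.json")
session = json.loads(session_file.read_text())

print(session["state"])              # "suspended"
for discovery in session["discoveries"]:
    print(f'- {discovery["title"]} ({discovery["timestamp"]})')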

State Persistence Patterns

Different applications need different persistence strategies:

Pattern 1: Ephemeral Sessions (In-Memory)

Best for: Quick interactions, testing, demos

# From storage_examples.py - InMemoryStorage class
# (SessionStorage is the abstract base class defined in that file)
from typing import Dict

class InMemoryStorage(SessionStorage):
    def __init__(self):
        self.sessions: Dict[str, dict] = {}
    
    async def save(self, session_id: str, data: dict) -> bool:
        self.sessions[session_id] = data
        return True

# Usage:
storage = InMemoryStorage()
await storage.save("session-123", session_data)

Pattern 2: File-Based Persistence

Best for: Development, small teams, simple deployments

# From storage_examples.py - FileStorage with compression
import gzip
import json
from pathlib import Path

class FileStorage(SessionStorage):
    def __init__(self, storage_path: str = "./sessions"):
        self.storage_path = Path(storage_path)
        self.storage_path.mkdir(exist_ok=True)
    
    async def save(self, session_id: str, data: dict) -> bool:
        try:
            file_path = self.storage_path / f"{session_id}.json"
            json_data = json.dumps(data, indent=2, default=str)
            
            # Use compression for large sessions
            if len(json_data) > 10000:  # 10KB threshold
                file_path = self.storage_path / f"{session_id}.json.gz"
                with gzip.open(file_path, 'wt', encoding='utf-8') as f:
                    f.write(json_data)
            else:
                with open(file_path, 'w') as f:
                    f.write(json_data)
            return True
        except Exception:
            # In production, log the failure before returning
            return False

Pattern 3: Database-Backed Sessions

Best for: Desktop apps, small to medium projects, moderate traffic

# From storage_examples.py - DatabaseStorage with SQLite
import json
import aiosqlite

class DatabaseStorage(SessionStorage):
    def __init__(self, database_path: str = "./sessions.db"):
        self.database_path = database_path
        self.initialized = False
    
    async def init_db(self):
        async with aiosqlite.connect(self.database_path) as db:
            await db.execute('''
                CREATE TABLE IF NOT EXISTS mcp_sessions (
                    session_id TEXT PRIMARY KEY,
                    data TEXT NOT NULL,
                    created_at TEXT DEFAULT CURRENT_TIMESTAMP,
                    updated_at TEXT DEFAULT CURRENT_TIMESTAMP
                )
            ''')
            await db.commit()
        self.initialized = True
    
    async def save(self, session_id: str, data: dict) -> bool:
        async with aiosqlite.connect(self.database_path) as db:
            # Check if exists
            cursor = await db.execute(
                'SELECT session_id FROM mcp_sessions WHERE session_id = ?',
                (session_id,)
            )
            exists = await cursor.fetchone() is not None
            
            if exists:
                await db.execute('''
                    UPDATE mcp_sessions 
                    SET data = ?, updated_at = CURRENT_TIMESTAMP
                    WHERE session_id = ?
                ''', (json.dumps(data, default=str), session_id))
            else:
                await db.execute('''
                    INSERT INTO mcp_sessions (session_id, data)
                    VALUES (?, ?)
                ''', (session_id, json.dumps(data, default=str)))
            
            await db.commit()
        return True
    
    async def cleanup_old_sessions(self, days: int = 30):
        """Remove sessions older than specified days"""
        async with aiosqlite.connect(self.database_path) as db:
            await db.execute('''
                DELETE FROM mcp_sessions 
                WHERE datetime(updated_at) < datetime('now', ?)
            ''', (f'-{days} days',))
            await db.commit()
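A usage sketch for this SQLite backend (assumes aiosqlite is installed; the session ID and payload are illustrative):

import asyncio

async def main():
    storage = DatabaseStorage("./sessions.db")
    await storage.init_db()
    
    await storage.save("debug-memory-leak-2025-09", {
        "state": "suspended",
        "context": {"cache_entries": 45000},
    })
    
    # Periodic maintenance: drop sessions idle for 30+ days
    await storage.cleanup_old_sessions(days=30)

asyncio.run(main())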

Database Schema Visualization

For production deployments with higher traffic, you might want a more robust schema:

Table: mcp_sessions

| Column | Type | Constraints | Description |
| --- | --- | --- | --- |
| session_id | VARCHAR(255) | PRIMARY KEY | Unique session identifier |
| data | JSON/TEXT | NOT NULL | Serialized session data |
| created_at | TIMESTAMP | DEFAULT NOW() | Session creation time |
| updated_at | TIMESTAMP | DEFAULT NOW() | Last update time |
| state | VARCHAR(50) | NOT NULL | active/suspended/terminated |
| user_id | VARCHAR(255) | INDEX | Optional user association |

Table: mcp_context

| Column | Type | Constraints | Description |
| --- | --- | --- | --- |
| session_id | VARCHAR(255) | FOREIGN KEY | Reference to session |
| key | VARCHAR(255) | NOT NULL | Context key name |
| value | JSON/TEXT | NOT NULL | Context value data |
| updated_at | TIMESTAMP | DEFAULT NOW() | Last update time |

Table: mcp_conversation_history

| Column | Type | Constraints | Description |
| --- | --- | --- | --- |
| id | SERIAL | PRIMARY KEY | Auto-incrementing ID |
| session_id | VARCHAR(255) | FOREIGN KEY | Reference to session |
| role | VARCHAR(50) | NOT NULL | user/assistant/system |
| content | TEXT | NOT NULL | Message content |
| timestamp | TIMESTAMP | DEFAULT NOW() | Message timestamp |
| metadata | JSON | NULLABLE | Additional message data |
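Here is one way to express those tables as DDL, kept in a module-level constant the way this post embeds SQL elsewhere (a PostgreSQL-flavored sketch; adjust types and index syntax for your database):

# PostgreSQL-flavored DDL matching the tables above (a sketch)
SCHEMA_DDL = """
CREATE TABLE IF NOT EXISTS mcp_sessions (
    session_id  VARCHAR(255) PRIMARY KEY,
    data        JSONB        NOT NULL,
    created_at  TIMESTAMP    DEFAULT NOW(),
    updated_at  TIMESTAMP    DEFAULT NOW(),
    state       VARCHAR(50)  NOT NULL,
    user_id     VARCHAR(255)
);
CREATE INDEX IF NOT EXISTS idx_sessions_user ON mcp_sessions (user_id);

CREATE TABLE IF NOT EXISTS mcp_context (
    session_id  VARCHAR(255) REFERENCES mcp_sessions (session_id),
    key         VARCHAR(255) NOT NULL,
    value       JSONB        NOT NULL,
    updated_at  TIMESTAMP    DEFAULT NOW(),
    PRIMARY KEY (session_id, key)
);

CREATE TABLE IF NOT EXISTS mcp_conversation_history (
    id          SERIAL       PRIMARY KEY,
    session_id  VARCHAR(255) REFERENCES mcp_sessions (session_id),
    role        VARCHAR(50)  NOT NULL,
    content     TEXT         NOT NULL,
    timestamp   TIMESTAMP    DEFAULT NOW(),
    metadata    JSONB
);
"""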

Common Pitfalls and Solutions

1. Context Bloat

Sessions can grow too large over time:

import json

class SmartSession(Session):
    MAX_CONTEXT_SIZE = 10_000_000  # 10MB
    MAX_HISTORY_LENGTH = 1000
    
    async def add_context(self, key: str, value: Any):
        # Check size before adding
        new_size = len(json.dumps(value))
        current_size = sum(
            len(json.dumps(v)) for v in self.context.values()
        )
        
        if current_size + new_size > self.MAX_CONTEXT_SIZE:
            # Intelligently prune old context
            await self._prune_context()
        
        await super().add_context(key, value)
    
    async def _prune_context(self):
        """Remove least recently used context"""
        # Sort by last access time
        sorted_context = sorted(
            self.context.items(),
            key=lambda x: x[1].get("accessed_at", x[1]["added_at"])
        )
        
        # Remove oldest 20%
        remove_count = len(sorted_context) // 5
        for key, _ in sorted_context[:remove_count]:
            del self.context[key]

2. Session Recovery Failures

Handle recovery gracefully:

from typing import Optional

async def safe_resume_session(session_id: str) -> Optional[Session]:
    try:
        return await session_manager.resume_session(session_id)
    except ValueError:
        # Session not found
        print(f"Session {session_id} not found, creating new one")
        return await session_manager.create_session()
    except json.JSONDecodeError:
        # Corrupted session data
        print(f"Session {session_id} corrupted, creating fresh")
        await session_manager.delete_session(session_id)
        return await session_manager.create_session()
    except Exception as e:
        # Unknown error
        print(f"Failed to resume session: {e}")
        return None

3. Concurrent Access

Prevent race conditions:

import asyncio
from collections import defaultdict

class ConcurrentSessionManager(SessionManager):
    def __init__(self, storage_backend):
        super().__init__(storage_backend)
        self._locks = defaultdict(asyncio.Lock)
    
    async def get_session(self, session_id: str) -> Session:
        """Thread-safe session access"""
        async with self._locks[session_id]:
            if session_id in self.active_sessions:
                return self.active_sessions[session_id]
            
            return await self.resume_session(session_id)
    
    async def update_session(self, session_id: str, updates: dict):
        """Atomic session updates"""
        async with self._locks[session_id]:
            session = await self.get_session(session_id)
            
            for key, value in updates.items():
                if hasattr(session, key):
                    setattr(session, key, value)
            
            await self.storage.save_session(session)

Performance Considerations

Sessions add overhead. Here’s how to minimize it:

1. Lazy Loading

Don’t load everything upfront:

class LazySession(Session):
    def __init__(self, id: str, storage: "Storage"):
        super().__init__(id)
        self.storage = storage
        self._context_loaded = False
        self._history_loaded = False
    
    async def get_context(self) -> dict:
        """Load context from storage on first access, then reuse it.
        (Async properties are awkward in Python, so explicit getter
        methods are used instead.)"""
        if not self._context_loaded:
            self.context = await self.storage.load_context(self.id)
            self._context_loaded = True
        return self.context
    
    async def get_history(self) -> list:
        """Load conversation history from storage on first access"""
        if not self._history_loaded:
            self.conversation_history = await self.storage.load_history(self.id)
            self._history_loaded = True
        return self.conversation_history

2. Caching Strategies

Use caching for frequently accessed sessions:

import time

class CachedSessionManager(SessionManager):
    def __init__(self, storage_backend, cache_ttl: int = 3600):
        super().__init__(storage_backend)
        self.cache_ttl = cache_ttl
        self._cache = {}
        self._cache_times = {}
    
    async def get_session(self, session_id: str) -> Session:
        # Check cache first
        if session_id in self._cache:
            cache_time = self._cache_times.get(session_id, 0)
            if time.time() - cache_time < self.cache_ttl:
                return self._cache[session_id]
        
        # Load from storage
        session = await super().get_session(session_id)
        
        # Update cache
        self._cache[session_id] = session
        self._cache_times[session_id] = time.time()
        
        return session

3. Efficient Serialization

Use efficient formats for large contexts:

import json
import msgpack
import pickle

class EfficientStorage(FileStorage):
    def get_serializer(self, data_size: int):
        """Choose serializer based on data characteristics"""
        if data_size < 1000:  # Small data
            return json
        elif data_size < 100_000:  # Medium data
            return msgpack
        else:  # Large data - only unpickle data you trust
            return pickle
    
    async def save_context(self, session_id: str, context: dict):
        data = json.dumps(context)
        serializer = self.get_serializer(len(data))
        
        filename = self.base_path / f"{session_id}_context.{serializer.__name__}"
        
        if serializer == json:
            with open(filename, 'w') as f:
                json.dump(context, f)
        else:
            with open(filename, 'wb') as f:
                serializer.dump(context, f)

Testing Your Implementation

Always test session persistence thoroughly:

import asyncio
import tempfile

import pytest
import pytest_asyncio

@pytest_asyncio.fixture
async def session_manager():
    with tempfile.TemporaryDirectory() as tmpdir:
        storage = FileStorage(tmpdir)
        manager = SessionManager(storage)
        yield manager

@pytest.mark.asyncio
async def test_session_persistence(session_manager):
    # Create and populate session
    session = await session_manager.create_session("test-session")
    await session.add_context("test_key", "test_value")
    await session.add_message("user", "Test message")
    
    # Suspend session
    await session_manager.suspend_session("test-session")
    
    # Verify session is not in memory
    assert "test-session" not in session_manager.active_sessions
    
    # Resume session
    resumed = await session_manager.resume_session("test-session")
    
    # Verify state persisted
    assert resumed.id == "test-session"
    assert resumed.context["test_key"]["value"] == "test_value"
    assert len(resumed.conversation_history) == 1
    assert resumed.conversation_history[0]["content"] == "Test message"

@pytest.mark.asyncio
async def test_concurrent_access(session_manager):
    """Test multiple concurrent session updates"""
    session = await session_manager.create_session("concurrent-test")
    
    # Simulate concurrent updates
    async def update_context(key: str, value: str):
        s = await session_manager.get_session("concurrent-test")
        await s.add_context(key, value)
        await session_manager.storage.save_session(s)
    
    # Run updates concurrently
    await asyncio.gather(
        update_context("key1", "value1"),
        update_context("key2", "value2"),
        update_context("key3", "value3")
    )
    
    # Verify all updates applied
    final = await session_manager.get_session("concurrent-test")
    assert len(final.context) == 3

Integration with MCP Servers

Here’s how to add session support to your MCP server:

from mcp.server.fastmcp import FastMCP
import json
from datetime import datetime
from pathlib import Path
from typing import Dict, Any

# Initialize FastMCP server
mcp = FastMCP("session_server")

# Session storage
SESSIONS_DIR = Path("./sessions")
SESSIONS_DIR.mkdir(exist_ok=True)
session_cache: Dict[str, dict] = {}

# Helper function to check if session exists
async def session_exists(session_id: str) -> bool:
    """Check if a session exists"""
    file_path = SESSIONS_DIR / f"{session_id}.json"
    return file_path.exists()

# Helper function to save session
async def save_session(session_id: str, data: dict) -> None:
    """Persist session to file storage"""
    file_path = SESSIONS_DIR / f"{session_id}.json"
    with open(file_path, 'w') as f:
        json.dump(data, f, indent=2, default=str)

@mcp.tool()
async def create_session(
    session_id: str,
    project: str = "",
    description: str = "",
    metadata: dict | None = None
) -> dict:
    """Create a new persistent session"""
    # Check if session already exists
    if await session_exists(session_id):
        return {
            "error": f"Session '{session_id}' already exists",
            "suggestion": "Use resume_session to continue existing session"
        }
    
    # Create session structure
    session_data = {
        "id": session_id,
        "created_at": datetime.now().isoformat(),
        "state": "active",
        "project": project,
        "description": description,
        "metadata": metadata or {},
        "context": {},
        "conversation_history": [],
        "discoveries": []
    }
    
    # Save to storage and cache
    await save_session(session_id, session_data)
    session_cache[session_id] = session_data
    
    return {"status": "created", "session_id": session_id, "project": project}

This approach gives you:

  • Simple file-based storage that persists between runs
  • In-memory caching for performance
  • Clear session lifecycle management
  • Easy debugging (sessions are JSON files)
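To expose these tools to a client, run the file like any other FastMCP server (stdio is FastMCP's default transport):

# Run the session server over stdio so an MCP client
# (Claude Desktop, for example) can call create_session and friends
if __name__ == "__main__":
    mcp.run()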

Production Deployment

For production systems, consider these additional factors:

1. Session Limits

Prevent resource exhaustion:

class ProductionSessionManager(SessionManager):
    MAX_SESSIONS_PER_USER = 100
    MAX_TOTAL_SESSIONS = 10000
    SESSION_TTL_DAYS = 90
    
    async def create_session(self, session_id: str | None = None,
                             user_id: str | None = None) -> Session:
        # Check user limits
        if user_id:
            user_sessions = await self.count_user_sessions(user_id)
            if user_sessions >= self.MAX_SESSIONS_PER_USER:
                # Clean up old sessions
                await self.cleanup_user_sessions(user_id)
        
        # Check total limits
        total_sessions = await self.count_total_sessions()
        if total_sessions >= self.MAX_TOTAL_SESSIONS:
            raise Exception("Session limit reached")
        
        session = await super().create_session(session_id)
        session.metadata["user_id"] = user_id
        
        return session

2. Security Considerations

import hmac
import hashlib

class SecureSession(Session):
    def __init__(self, id: str, secret_key: str):
        super().__init__(id)
        self.secret_key = secret_key
    
    def generate_token(self) -> str:
        """Generate secure session token"""
        message = f"{self.id}:{self.created_at.isoformat()}"
        signature = hmac.new(
            self.secret_key.encode(),
            message.encode(),
            hashlib.sha256
        ).hexdigest()
        
        return f"{self.id}:{signature}"
    
    @staticmethod
    def verify_token(token: str, secret_key: str, created_at: str) -> str:
        """Verify a token and return the session ID it names.
        `created_at` is read from the stored session record."""
        try:
            session_id, signature = token.split(":")
        except ValueError:
            raise ValueError("Malformed session token")
        
        expected = hmac.new(
            secret_key.encode(),
            f"{session_id}:{created_at}".encode(),
            hashlib.sha256
        ).hexdigest()
        if not hmac.compare_digest(signature, expected):
            raise ValueError("Invalid session token")
        return session_id

3. Monitoring and Metrics

import time

class MonitoredSessionManager(SessionManager):
    def __init__(self, storage_backend, metrics_client):
        super().__init__(storage_backend)
        self.metrics = metrics_client
    
    async def create_session(self, session_id: str = None) -> Session:
        start_time = time.time()
        
        try:
            session = await super().create_session(session_id)
            
            # Track metrics
            self.metrics.increment("sessions.created")
            self.metrics.histogram(
                "sessions.create_time",
                time.time() - start_time
            )
            
            return session
        except Exception as e:
            self.metrics.increment("sessions.create_errors")
            raise
    
    async def get_session_stats(self) -> dict:
        """Get session statistics for monitoring"""
        return {
            "active_sessions": len(self.active_sessions),
            "total_sessions": await self.count_total_sessions(),
            "avg_session_size": await self.get_average_session_size(),
            "oldest_session": await self.get_oldest_session_age()
        }

[Figure: MCP session performance monitoring dashboard with an active-sessions gauge, session creation rate chart, and key metrics]

Code Examples

The complete code examples from this post are available on our GitHub repository. To run them:

# Clone the examples repository
git clone https://github.com/angrysharkstudio/mcp-sessions-examples
cd mcp-sessions-examples

# Install dependencies
pip install -r requirements.txt

# Run the examples:
python session_demo.py         # Simple session demonstration
python storage_examples.py     # Storage backends comparison
python mcp_session_server.py   # Complete MCP server
python multi_day_debug.py      # 3-day debugging scenario

The examples include:

  • session_demo.py: Simple demonstration of session concepts
  • storage_examples.py: In-memory, file-based, and database storage
  • mcp_session_server.py: Complete FastMCP server with session tools
  • multi_day_debug.py: Realistic 3-day debugging scenario
  • README.md: Setup instructions and detailed explanations


What’s Next

Sessions are just one part of MCP’s power. They enable:

  • Multi-agent workflows that maintain state
  • Long-running AI tasks
  • Complex debugging sessions
  • Personalized AI assistants

Next week, we’ll explore how MCP handles standardization versus flexibility - letting you extend the protocol while maintaining compatibility.

FAQ

Does every AI interaction need a session?
No. Use sessions for multi-turn conversations, debugging, or when context matters. Simple one-off queries work fine without them.

What’s the storage overhead?
Depends on usage. Text conversations average 1-10KB per exchange. Code contexts can be larger. Plan for 1-10MB per active session.

Can I share sessions between users?
Yes, with proper access control. Useful for team debugging or collaborative AI interactions.

How long should sessions persist?
Depends on use case. Development sessions: weeks to months. Support conversations: 30-90 days. Temporary tasks: hours to days.

What about GDPR/privacy?
Session data may contain personal information. Implement proper data retention policies and user data export/deletion capabilities.


Need Expert MCP Implementation?

Struggling with AI session management or MCP integration? Our team specializes in:

  • Custom MCP Server Development - Tailored session management for your use case
  • AI Integration Architecture - Multi-model systems with persistent context
  • Performance Optimization - Scale to thousands of concurrent sessions
  • Enterprise MCP Solutions - Production-ready implementations with monitoring

Contact us for MCP consulting →


Stay Updated with AI Development

Get weekly insights on MCP, AI integration patterns, and practical implementation tips. Join developers building the next generation of AI applications.

Subscribe to our newsletter on the blog →


Building something interesting with MCP sessions? Share your implementation - we feature innovative projects in our newsletter.
