Build a Python chat application with MongoDB Flex
In this tutorial, you implement a real-world scenario with MongoDB Flex. After finishing all the steps, you have a Python FastAPI application that stores chat messages in a MongoDB Flex database. You learn how to connect a Python application to MongoDB Flex and how to build a real-time chat system using WebSockets. With this knowledge, you can build modern real-time applications backed by a solid and flexible database.
Overview
This tutorial is split into the following sections:
- Create a MongoDB Flex instance with a user, proper rights and ACL settings.
- Initialize an environment to execute Python with the needed packages.
- Write a small Python application which tests the connection to the database.
- Create a real-time chat application which is backed up by MongoDB Flex.
Prerequisites
- You have a STACKIT customer account: Create a customer account
- You have a STACKIT user account: Create a user account
- You have a STACKIT project: Create a project
- On your development system, you have access to a shell with Python 3.10 or later available.
- You can reach the STACKIT cloud from your development machine.
- You know the external IPv4 address of your client system.
Create a MongoDB Flex instance with a user, proper rights and ACL settings
Initialize an environment to execute Python with the needed packages
Check which Python version is installed:

    python --version

The output should be something like this:

    Python 3.12.3

The installed version should be 3.10.0 or later. If you have an older version installed, please consult the manual of your system to install a more recent version. After checking this prerequisite, create a folder for this tutorial, set up a virtual environment in it, and install the needed packages:

    mkdir python-chat-with-mongodb-flex-tutorial
    cd python-chat-with-mongodb-flex-tutorial
    python -m venv venv
    source venv/bin/activate  # On Windows: venv\Scripts\activate
    pip install fastapi uvicorn pymongo python-dotenv websockets

The output should be something like this:

    Successfully installed fastapi-0.104.1 uvicorn-0.24.0 pymongo-4.6.0 python-dotenv-1.0.0 websockets-12.0

Now you are ready to write your small Python test application.
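If you want to double-check the environment before continuing, a small script along the following lines can help. It is only a sketch: the helper names `python_is_recent_enough` and `missing_modules` are made up for this example, and the minimum version is taken from the check described in this step.

```python
import importlib.util
import sys

# Minimum interpreter version named in this step of the tutorial.
MIN_PYTHON = (3, 10)

# Import names of the packages installed with pip in this step.
# Note that the python-dotenv distribution is imported as "dotenv".
REQUIRED_MODULES = ["fastapi", "uvicorn", "pymongo", "dotenv", "websockets"]


def python_is_recent_enough(version_info=sys.version_info, minimum=MIN_PYTHON):
    """Return True if the (major, minor) version meets the minimum."""
    return tuple(version_info[:2]) >= minimum


def missing_modules(names=REQUIRED_MODULES):
    """Return the module names that cannot be found in this environment."""
    return [name for name in names if importlib.util.find_spec(name) is None]


if __name__ == "__main__":
    print(f"Python {sys.version.split()[0]} recent enough: {python_is_recent_enough()}")
    missing = missing_modules()
    print("Missing modules:", ", ".join(missing) if missing else "none")
```

Run the script inside the activated virtual environment. If it reports missing modules, repeat the pip install command from this step.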
Write a small Python application which tests the connection to the database
Open your code editor of choice and create a file called .env with the following content (insert the connection string which you saved before):

    MONGODB_URI=mongodb://USERNAME:YOUR-SECRET-PASSWORD@s-f47ac10b-58cc-4372-a567-0e02b2c3d479-0.mongodb.eu01.onstackit.cloud:27017/default?authSource=default&tls=true&authMechanism=SCRAM-SHA-256

Create another file called test_connection.py:

    from pymongo import MongoClient
    from dotenv import load_dotenv
    import os

    load_dotenv()

    uri = os.getenv('MONGODB_URI')


    def test_connection():
        if not uri:
            print('MONGODB_URI not set!')
            return

        client = MongoClient(uri)

        try:
            print('Connecting to MongoDB Flex...')

            # Test connection with ping
            client.admin.command('ping')

            print('Successfully connected!')

            # Get database name
            db_name = client.get_default_database().name
            print(f'Connected to: {db_name}')

        except Exception as error:
            print(f'Error while connecting to MongoDB Flex: {error}')
        finally:
            client.close()
            print('Connection closed.')


    if __name__ == '__main__':
        test_connection()

Now execute the mini application with your Python interpreter:

    python test_connection.py

You should see something like:

    Connecting to MongoDB Flex...
    Successfully connected!
    Connected to: default
    Connection closed.

Create a real-time chat application which is backed up by MongoDB Flex
Now that you have set up everything correctly, you can begin implementing the actual application. To keep this tutorial focused on the core concepts, we deliberately skip setting up a best-practice Python folder structure. We recommend following such a structure when creating a project for production use. Create a file called main.py:

    from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException
    from fastapi.staticfiles import StaticFiles
    from fastapi.responses import HTMLResponse
    from pymongo import MongoClient, ASCENDING
    from dotenv import load_dotenv
    import os
    import json
    from datetime import datetime
    from typing import Dict, List

    # Load environment variables
    load_dotenv()

    app = FastAPI(title="MongoDB Flex Chat API")
    uri = os.getenv('MONGODB_URI')

    if not uri:
        raise ValueError("MONGODB_URI environment variable is required")

    # MongoDB connection
    client = MongoClient(uri)
    db = client.get_default_database()
    messages_collection = db.messages
    rooms_collection = db.rooms

    # Create indexes for better performance
    messages_collection.create_index([("room_id", ASCENDING), ("timestamp", ASCENDING)])

    # WebSocket Connection Manager
    class ConnectionManager:
        def __init__(self):
            # Maps each room ID to the WebSockets currently connected to it
            self.active_connections: Dict[str, List[WebSocket]] = {}

        async def connect(self, websocket: WebSocket, room_id: str):
            await websocket.accept()
            if room_id not in self.active_connections:
                self.active_connections[room_id] = []
            self.active_connections[room_id].append(websocket)

        def disconnect(self, websocket: WebSocket, room_id: str):
            connections = self.active_connections.get(room_id, [])
            # The connection may already have been removed by broadcast_to_room
            if websocket in connections:
                connections.remove(websocket)
            # Forget rooms that have no connections left
            if room_id in self.active_connections and not connections:
                del self.active_connections[room_id]

        async def broadcast_to_room(self, message: dict, room_id: str):
            # Iterate over a copy because the list can change while awaiting
            for connection in list(self.active_connections.get(room_id, [])):
                try:
                    await connection.send_text(json.dumps(message))
                except Exception:
                    # Remove connections that can no longer be written to
                    self.disconnect(connection, room_id)


    manager = ConnectionManager()

    # Initialize default rooms
    def init_default_rooms():
        if rooms_collection.count_documents({}) == 0:
            default_rooms = [
                {"name": "general", "description": "General discussion"},
                {"name": "tech", "description": "Technical discussions"},
                {"name": "random", "description": "Random conversations"}
            ]
            rooms_collection.insert_many(default_rooms)
            print("Created default chat rooms")


    @app.on_event("startup")
    async def startup_event():
        init_default_rooms()
        print("🚀 Chat application started")
        print(f"📊 Connected to MongoDB: {db.name}")

    # WebSocket endpoint for real-time chat
    @app.websocket("/ws/{room_id}/{username}")
    async def websocket_endpoint(websocket: WebSocket, room_id: str, username: str):
        await manager.connect(websocket, room_id)

        # Send join notification
        join_message = {
            "type": "system",
            "content": f"{username} joined the room",
            "timestamp": datetime.utcnow().isoformat(),
            "room_id": room_id
        }
        await manager.broadcast_to_room(join_message, room_id)

        try:
            while True:
                data = await websocket.receive_text()
                message_data = json.loads(data)

                # Create message document
                message = {
                    "room_id": room_id,
                    "username": username,
                    "content": message_data["content"],
                    "type": "message",
                    "timestamp": datetime.utcnow()
                }

                # Store in MongoDB
                result = messages_collection.insert_one(message)

                # Prepare message for broadcast
                broadcast_message = {
                    "_id": str(result.inserted_id),
                    "room_id": room_id,
                    "username": username,
                    "content": message_data["content"],
                    "type": "message",
                    "timestamp": message["timestamp"].isoformat()
                }

                # Broadcast to all clients in room
                await manager.broadcast_to_room(broadcast_message, room_id)

        except WebSocketDisconnect:
            manager.disconnect(websocket, room_id)

            # Send leave notification
            leave_message = {
                "type": "system",
                "content": f"{username} left the room",
                "timestamp": datetime.utcnow().isoformat(),
                "room_id": room_id
            }
            await manager.broadcast_to_room(leave_message, room_id)

    # REST API endpoints
    @app.get("/api/rooms")
    async def get_rooms():
        """Get all available chat rooms"""
        rooms = list(rooms_collection.find({}, {"_id": 0}))
        return {
            "success": True,
            "count": len(rooms),
            "data": rooms
        }


    @app.get("/api/messages/{room_id}")
    async def get_messages(room_id: str, limit: int = 50):
        """Get recent messages from a specific room"""
        try:
            messages = list(
                messages_collection.find({"room_id": room_id})
                .sort("timestamp", -1)
                .limit(limit)
            )

            # Convert ObjectId and datetime for JSON serialization
            for msg in messages:
                msg["_id"] = str(msg["_id"])
                msg["timestamp"] = msg["timestamp"].isoformat()

            # Return in chronological order
            messages.reverse()

            return {
                "success": True,
                "count": len(messages),
                "room_id": room_id,
                "data": messages
            }
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Error fetching messages: {str(e)}")


    @app.get("/api/stats/{room_id}")
    async def get_room_stats(room_id: str):
        """Get statistics for a specific room"""
        try:
            pipeline = [
                {"$match": {"room_id": room_id}},
                {"$group": {
                    "_id": "$username",
                    "message_count": {"$sum": 1},
                    "last_message": {"$max": "$timestamp"}
                }},
                {"$sort": {"message_count": -1}}
            ]

            user_stats = list(messages_collection.aggregate(pipeline))
            total_messages = messages_collection.count_documents({"room_id": room_id})

            return {
                "success": True,
                "room_id": room_id,
                "total_messages": total_messages,
                "user_stats": user_stats
            }
        except Exception as e:
            raise HTTPException(status_code=500, detail=f"Error fetching stats: {str(e)}")


    # Health check endpoint
    @app.get("/health")
    async def health_check():
        try:
            # Test database connection
            client.admin.command('ping')
            return {
                "success": True,
                "message": "API is running",
                "database": "connected",
                "timestamp": datetime.utcnow().isoformat()
            }
        except Exception as e:
            return {
                "success": False,
                "message": "Database connection failed",
                "error": str(e),
                "timestamp": datetime.utcnow().isoformat()
            }

    # Serve static files and HTML
    # StaticFiles raises an error at startup if the directory does not exist
    os.makedirs("static", exist_ok=True)
    app.mount("/static", StaticFiles(directory="static"), name="static")


    @app.get("/", response_class=HTMLResponse)
    async def get_chat_interface():
        html_content = """
        <!DOCTYPE html>
        <html>
        <head>
            <title>MongoDB Flex Chat</title>
            <style>
                body { font-family: Arial, sans-serif; margin: 20px; background: #f5f5f5; }
                .container { max-width: 800px; margin: 0 auto; background: white; padding: 20px; border-radius: 8px; box-shadow: 0 2px 10px rgba(0,0,0,0.1); }
                .header { text-align: center; margin-bottom: 20px; color: #333; }
                .room-info { background: #e9ecef; padding: 10px; border-radius: 4px; margin-bottom: 20px; }
                #messages { height: 400px; overflow-y: scroll; border: 1px solid #ddd; padding: 15px; margin-bottom: 15px; background: #fafafa; }
                .message { margin: 8px 0; padding: 8px; border-radius: 4px; }
                .message.user { background: #e3f2fd; }
                .message.system { background: #fff3e0; font-style: italic; color: #666; }
                .username { font-weight: bold; color: #1976d2; }
                .timestamp { font-size: 0.8em; color: #666; margin-left: 10px; }
                .input-area { display: flex; gap: 10px; }
                #messageInput { flex: 1; padding: 10px; border: 1px solid #ddd; border-radius: 4px; }
                button { padding: 10px 20px; background: #1976d2; color: white; border: none; border-radius: 4px; cursor: pointer; }
                button:hover { background: #1565c0; }
                .status { text-align: center; padding: 10px; margin-bottom: 10px; border-radius: 4px; }
                .connected { background: #d4edda; color: #155724; }
                .disconnected { background: #f8d7da; color: #721c24; }
            </style>
        </head>
        <body>
            <div class="container">
                <div class="header">
                    <h1>🚀 MongoDB Flex Chat</h1>
                    <div id="status" class="status disconnected">Disconnected</div>
                </div>

                <div class="room-info">
                    <strong>Room:</strong> <span id="roomName"></span> |
                    <strong>User:</strong> <span id="userName"></span>
                </div>

                <div id="messages"></div>

                <div class="input-area">
                    <input type="text" id="messageInput" placeholder="Type your message..." maxlength="500">
                    <button onclick="sendMessage()">Send</button>
                </div>
            </div>

            <script>
                // Get URL parameters
                const urlParams = new URLSearchParams(window.location.search);
                const room = urlParams.get('room') || 'general';
                const username = urlParams.get('username') || 'Anonymous_' + Math.floor(Math.random() * 1000);

                document.getElementById('roomName').textContent = room;
                document.getElementById('userName').textContent = username;

                // WebSocket connection (room and username are encoded for use in the URL path)
                const wsProtocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
                const ws = new WebSocket(`${wsProtocol}//${window.location.host}/ws/${encodeURIComponent(room)}/${encodeURIComponent(username)}`);
                const messages = document.getElementById('messages');
                const statusDiv = document.getElementById('status');

                ws.onopen = function() {
                    statusDiv.textContent = 'Connected';
                    statusDiv.className = 'status connected';
                };

                ws.onclose = function() {
                    statusDiv.textContent = 'Disconnected';
                    statusDiv.className = 'status disconnected';
                };

                ws.onmessage = function(event) {
                    const message = JSON.parse(event.data);
                    addMessage(message);
                };

                // Append a <span> whose text is set via textContent, so that
                // user input is never interpreted as HTML
                function addSpan(parent, className, text) {
                    const span = document.createElement('span');
                    if (className) {
                        span.className = className;
                    }
                    span.textContent = text;
                    parent.appendChild(span);
                }

                function addMessage(message) {
                    const div = document.createElement('div');
                    div.className = `message ${message.type}`;

                    if (message.type !== 'system') {
                        addSpan(div, 'username', `${message.username}: `);
                    }
                    addSpan(div, '', message.content);
                    addSpan(div, 'timestamp', formatTime(message.timestamp));

                    messages.appendChild(div);
                    messages.scrollTop = messages.scrollHeight;
                }

                function formatTime(timestamp) {
                    return new Date(timestamp).toLocaleTimeString();
                }

                function sendMessage() {
                    const input = document.getElementById('messageInput');
                    const content = input.value.trim();

                    if (content && ws.readyState === WebSocket.OPEN) {
                        ws.send(JSON.stringify({content: content}));
                        input.value = '';
                    }
                }

                document.getElementById('messageInput').addEventListener('keypress', function(e) {
                    if (e.key === 'Enter') {
                        sendMessage();
                    }
                });

                // Load previous messages
                fetch(`/api/messages/${encodeURIComponent(room)}`)
                    .then(response => response.json())
                    .then(result => {
                        if (result.success) {
                            result.data.forEach(addMessage);
                        }
                    })
                    .catch(error => console.error('Error loading messages:', error));
            </script>
        </body>
        </html>
        """
        return html_content

    # Exception handlers must return a Response object, not a plain dict
    from fastapi.responses import JSONResponse


    # 404 Handler
    @app.exception_handler(404)
    async def not_found_handler(request, exc):
        return JSONResponse(
            status_code=404,
            content={"success": False, "message": "Endpoint not found"}
        )


    # Error Handler
    @app.exception_handler(500)
    async def internal_error_handler(request, exc):
        return JSONResponse(
            status_code=500,
            content={"success": False, "message": "Internal server error"}
        )


    if __name__ == "__main__":
        import uvicorn
        uvicorn.run(app, host="0.0.0.0", port=8000)

Now you can start your chat application:

    python main.py

You should see something like:

    INFO:     Started server process [12345]
    INFO:     Waiting for application startup.
    Created default chat rooms
    🚀 Chat application started
    📊 Connected to MongoDB: default
    INFO:     Application startup complete.
    INFO:     Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)

Open your browser and navigate to http://localhost:8000?room=general&username=TestUser. You can open multiple browser windows with different usernames to test the real-time chat functionality.
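Besides the browser, you can also exercise the WebSocket endpoint from a script. The following is a minimal sketch, assuming the server runs locally on port 8000 and using the websockets package installed earlier. The helper names `build_ws_url`, `encode_message` and `send_one_message` are made up for this example.

```python
import asyncio
import json
from urllib.parse import quote


def build_ws_url(room, username, host="localhost:8000"):
    """Build the URL for the /ws/{room_id}/{username} route of main.py."""
    return f"ws://{host}/ws/{quote(room, safe='')}/{quote(username, safe='')}"


def encode_message(content):
    """Serialize a chat message in the JSON shape the server reads."""
    return json.dumps({"content": content})


async def send_one_message(room, username, content):
    """Connect, send one message and print the next two broadcasts."""
    # Imported here so the helpers above work without the package installed.
    import websockets

    async with websockets.connect(build_ws_url(room, username)) as ws:
        await ws.send(encode_message(content))
        for _ in range(2):
            print(json.loads(await ws.recv()))
```

With the chat application running, calling `asyncio.run(send_one_message("general", "ScriptUser", "Hello from Python"))` should print the join notification followed by the message itself, provided nobody else is writing in the room at the same time. The message should also appear in any browser window that has the same room open.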
Your chat application is now running and connected to MongoDB Flex! You can:
- Send and receive messages in real time
- Switch between different chat rooms by changing the room parameter in the URL
- View message history
- See when users join and leave rooms
- Check API health and room statistics
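The REST endpoints behind the last item can be queried from any HTTP client. As a rough sketch using only the Python standard library, and assuming the server is reachable at http://localhost:8000, the URLs can be built and fetched like this. The names `BASE_URL`, `messages_url`, `stats_url`, `health_url` and `fetch_json` are made up for this example.

```python
import json
from urllib.parse import quote, urlencode
from urllib.request import urlopen

# Assumption: the chat application runs locally on port 8000.
BASE_URL = "http://localhost:8000"


def messages_url(room, limit=50, base_url=BASE_URL):
    """URL of the message history endpoint, with the optional limit parameter."""
    return f"{base_url}/api/messages/{quote(room, safe='')}?{urlencode({'limit': limit})}"


def stats_url(room, base_url=BASE_URL):
    """URL of the per-room statistics endpoint."""
    return f"{base_url}/api/stats/{quote(room, safe='')}"


def health_url(base_url=BASE_URL):
    """URL of the health check endpoint."""
    return f"{base_url}/health"


def fetch_json(url, timeout=5):
    """Send a GET request and decode the JSON response body."""
    with urlopen(url, timeout=timeout) as response:
        return json.load(response)
```

With the application running, `fetch_json(health_url())` should return the health document, and `fetch_json(stats_url("general"))` the statistics for the general room.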
The application demonstrates how MongoDB Flex can serve as the backend for a real-time application: FastAPI manages the WebSocket connections, while MongoDB stores the chat messages and computes the per-room statistics with its aggregation framework.
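To make the aggregation pipeline behind /api/stats/{room_id} easier to follow, the sketch below reproduces its three stages in plain Python on a list of message dictionaries. It only illustrates what $match, $group and $sort compute. The function `room_stats` is made up for this example and is not part of the application. Ties in message_count are not covered: this sketch keeps such users in first-seen order, while the pipeline does not define an order for them.

```python
from datetime import datetime


def room_stats(messages, room_id):
    """Mimic the $match, $group and $sort stages of the statistics pipeline."""
    stats = {}
    for msg in messages:
        # $match: keep only the messages of the requested room
        if msg["room_id"] != room_id:
            continue
        # $group: one entry per username
        entry = stats.setdefault(
            msg["username"],
            {"_id": msg["username"], "message_count": 0, "last_message": msg["timestamp"]},
        )
        entry["message_count"] += 1  # {"$sum": 1}
        entry["last_message"] = max(entry["last_message"], msg["timestamp"])  # {"$max": "$timestamp"}
    # $sort: most active users first
    return sorted(stats.values(), key=lambda e: e["message_count"], reverse=True)


if __name__ == "__main__":
    sample = [
        {"room_id": "general", "username": "alice", "timestamp": datetime(2024, 1, 1, 10, 0)},
        {"room_id": "general", "username": "bob", "timestamp": datetime(2024, 1, 1, 10, 5)},
        {"room_id": "general", "username": "alice", "timestamp": datetime(2024, 1, 1, 10, 10)},
    ]
    for row in room_stats(sample, "general"):
        print(row)
```

In the application itself this work is done by MongoDB, so only the grouped result is transferred to the Python process.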