
Build a Python chat application with MongoDB Flex

In this tutorial, you implement a real-world scenario with MongoDB Flex. After finishing all the steps, you have a Python FastAPI application that stores chat messages in a MongoDB Flex database. You learn how to connect a Python application to MongoDB Flex and how to build a real-time chat system using WebSockets. With this knowledge, you can build modern real-time applications backed by a solid and flexible database.

This tutorial is split into the following sections:

  1. Create a MongoDB Flex instance with a user, proper rights and ACL settings.

  2. Initialize an environment to execute Python with the needed packages.

  3. Write a small Python application which tests the connection to the database.

  4. Create a real-time chat application which is backed by MongoDB Flex.

Prerequisites

  • You have a STACKIT customer account: Create a customer account

  • You have a STACKIT user account: Create a user account

  • You have a STACKIT project: Create a project

  • On the development system, you have access to a shell with Python 3.10 or later available.

  • You can reach the STACKIT cloud from your development machine.

  • You know the external IPv4 address of your client system.

Create a MongoDB Flex instance with a user, proper rights and ACL settings


Initialize an environment to execute Python with the needed packages

First, check which Python version is installed:

python --version

The output should be something like this:

Python 3.12.3

The installed version should be 3.10.0 or later. If you have an older version installed, consult your system's documentation to install a more recent one. After checking this prerequisite, create a folder for this tutorial, set up a virtual environment in it, and install the required packages:

mkdir python-chat-with-mongodb-flex-tutorial
cd python-chat-with-mongodb-flex-tutorial
python -m venv venv
source venv/bin/activate # On Windows: venv\Scripts\activate
pip install fastapi uvicorn pymongo python-dotenv websockets

The output should be something like this:

Successfully installed fastapi-0.104.1 uvicorn-0.24.0 pymongo-4.6.0 python-dotenv-1.0.0 websockets-12.0
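
If you want to confirm from inside the virtual environment that all five packages are really available, the following minimal sketch may help. It only uses the standard library; the helper name `installed_versions` and the `REQUIRED` list are illustrative and not part of the tutorial's application.

```python
from importlib import metadata

# Distribution names as passed to "pip install" above
REQUIRED = ["fastapi", "uvicorn", "pymongo", "python-dotenv", "websockets"]


def installed_versions(packages):
    """Map each distribution name to its installed version, or None if it is missing."""
    versions = {}
    for name in packages:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            versions[name] = None
    return versions


if __name__ == "__main__":
    for name, version in installed_versions(REQUIRED).items():
        print(f"{name}: {version or 'NOT INSTALLED'}")
```

If any line reports NOT INSTALLED, check that the virtual environment is activated and run the pip command again.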

Now you are ready to write your small Python test application.

Write a small Python application which tests the connection to the database

Section titled β€œWrite a small Python application which tests the connection to the database”

Open your code editor of choice and create a file called .env with the following content (insert the connection string which you saved before):

MONGODB_URI=mongodb://USERNAME:YOUR-SECRET-PASSWORD@s-f47ac10b-58cc-4372-a567-0e02b2c3d479-0.mongodb.eu01.onstackit.cloud:27017/default?authSource=default&tls=true&authMechanism=SCRAM-SHA-256
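
PyMongo expects reserved characters in the username and password (for example `@`, `:` or `/`) to be percent-encoded inside the connection string. The sketch below shows one way to assemble such a string with the standard library. The function name `build_mongodb_uri`, its parameters, and the host `db.example.invalid` are assumptions made for illustration; the query options are simply copied from the example above.

```python
from urllib.parse import quote_plus


def build_mongodb_uri(username, password, host, database="default", port=27017,
                      options="authSource=default&tls=true&authMechanism=SCRAM-SHA-256"):
    """Assemble a single-host connection string with percent-encoded credentials."""
    user = quote_plus(username)    # encodes reserved characters such as '@' or ':'
    secret = quote_plus(password)
    return f"mongodb://{user}:{secret}@{host}:{port}/{database}?{options}"


if __name__ == "__main__":
    # Hypothetical credentials and host, for illustration only
    print(build_mongodb_uri("chat-user", "p@ss:w/rd", "db.example.invalid"))
```

Paste the resulting string as the value of MONGODB_URI in the .env file. A password without reserved characters passes through unchanged.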

Create another file called test_connection.py:

from pymongo import MongoClient
from dotenv import load_dotenv
import os

load_dotenv()
uri = os.getenv('MONGODB_URI')


def test_connection():
    if not uri:
        print('MONGODB_URI not set!')
        return
    client = MongoClient(uri)
    try:
        print('Connecting to MongoDB Flex...')
        # Test connection with ping
        client.admin.command('ping')
        print('Successfully connected!')
        # Get database name
        db_name = client.get_default_database().name
        print(f'Connected to: {db_name}')
    except Exception as error:
        print(f'Error while connecting to MongoDB Flex: {error}')
    finally:
        client.close()
        print('Connection closed.')


if __name__ == '__main__':
    test_connection()

Now execute the mini application with your Python interpreter:

python test_connection.py

You should see something like:

Connecting to MongoDB Flex...
Successfully connected!
Connected to: default
Connection closed.
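
The database name in this output comes from the path component of the connection string (the `/default` part). If the test fails, it can help to inspect which host, port, and database the string actually points to without printing the password. The following is a minimal sketch using only the standard library; the helper name `describe_uri` is hypothetical, and it assumes a single-host connection string like the one in this tutorial.

```python
from urllib.parse import urlsplit, parse_qs


def describe_uri(uri):
    """Return the parts of a single-host MongoDB URI with the password masked."""
    parts = urlsplit(uri)
    return {
        "host": parts.hostname,
        "port": parts.port,
        "username": parts.username,
        "password": "***" if parts.password else None,  # never echo the secret
        "database": parts.path.lstrip("/") or None,     # the default database name
        "options": {key: values[0] for key, values in parse_qs(parts.query).items()},
    }


if __name__ == "__main__":
    # Placeholder URI for illustration; in practice pass os.getenv('MONGODB_URI')
    example = "mongodb://USERNAME:SECRET@db.example.invalid:27017/default?authSource=default&tls=true"
    for key, value in describe_uri(example).items():
        print(f"{key}: {value}")
```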

Create a real-time chat application which is backed by MongoDB Flex


Now that everything is set up correctly, you can begin implementing the actual application. To keep this tutorial focused on the core concepts, it deliberately skips the setup of a best-practice Python folder structure. We recommend following such a structure when creating a project for production use. Create a file called main.py:

from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse, JSONResponse
from pymongo import MongoClient, ASCENDING
from dotenv import load_dotenv
import os
import json
from datetime import datetime, timezone
from typing import Dict, List

# Load environment variables
load_dotenv()

app = FastAPI(title="MongoDB Flex Chat API")

uri = os.getenv('MONGODB_URI')
if not uri:
    raise ValueError("MONGODB_URI environment variable is required")

# MongoDB connection (tz_aware=True makes PyMongo return timezone-aware UTC datetimes)
client = MongoClient(uri, tz_aware=True)
db = client.get_default_database()
messages_collection = db.messages
rooms_collection = db.rooms

# Create indexes for better performance
messages_collection.create_index([("room_id", ASCENDING), ("timestamp", ASCENDING)])


def utc_now() -> datetime:
    """Return the current time as a timezone-aware UTC datetime.

    An aware timestamp serializes with a "+00:00" suffix, so the browser
    converts it to local time correctly.
    """
    return datetime.now(timezone.utc)


# WebSocket Connection Manager
class ConnectionManager:
    def __init__(self):
        # Maps a room id to the list of WebSockets connected to that room
        self.active_connections: Dict[str, List[WebSocket]] = {}

    async def connect(self, websocket: WebSocket, room_id: str):
        await websocket.accept()
        if room_id not in self.active_connections:
            self.active_connections[room_id] = []
        self.active_connections[room_id].append(websocket)

    def disconnect(self, websocket: WebSocket, room_id: str):
        if room_id in self.active_connections:
            # The socket may already have been removed by broadcast_to_room
            if websocket in self.active_connections[room_id]:
                self.active_connections[room_id].remove(websocket)
            if not self.active_connections[room_id]:
                del self.active_connections[room_id]

    async def broadcast_to_room(self, message: dict, room_id: str):
        if room_id in self.active_connections:
            disconnected = []
            # Iterate over a copy because clients can join while we await
            for connection in list(self.active_connections[room_id]):
                try:
                    await connection.send_text(json.dumps(message))
                except Exception:
                    disconnected.append(connection)
            # Remove disconnected connections
            for connection in disconnected:
                self.disconnect(connection, room_id)


manager = ConnectionManager()


# Initialize default rooms
def init_default_rooms():
    if rooms_collection.count_documents({}) == 0:
        default_rooms = [
            {"name": "general", "description": "General discussion"},
            {"name": "tech", "description": "Technical discussions"},
            {"name": "random", "description": "Random conversations"}
        ]
        rooms_collection.insert_many(default_rooms)
        print("Created default chat rooms")


@app.on_event("startup")
async def startup_event():
    init_default_rooms()
    print("🚀 Chat application started")
    print(f"📊 Connected to MongoDB: {db.name}")


# WebSocket endpoint for real-time chat
@app.websocket("/ws/{room_id}/{username}")
async def websocket_endpoint(websocket: WebSocket, room_id: str, username: str):
    await manager.connect(websocket, room_id)
    # Send join notification
    join_message = {
        "type": "system",
        "content": f"{username} joined the room",
        "timestamp": utc_now().isoformat(),
        "room_id": room_id
    }
    await manager.broadcast_to_room(join_message, room_id)
    try:
        while True:
            data = await websocket.receive_text()
            message_data = json.loads(data)
            # Create message document
            message = {
                "room_id": room_id,
                "username": username,
                "content": message_data["content"],
                "type": "message",
                "timestamp": utc_now()
            }
            # Store in MongoDB
            result = messages_collection.insert_one(message)
            # Prepare message for broadcast
            broadcast_message = {
                "_id": str(result.inserted_id),
                "room_id": room_id,
                "username": username,
                "content": message_data["content"],
                "type": "message",
                "timestamp": message["timestamp"].isoformat()
            }
            # Broadcast to all clients in room
            await manager.broadcast_to_room(broadcast_message, room_id)
    except WebSocketDisconnect:
        manager.disconnect(websocket, room_id)
        # Send leave notification
        leave_message = {
            "type": "system",
            "content": f"{username} left the room",
            "timestamp": utc_now().isoformat(),
            "room_id": room_id
        }
        await manager.broadcast_to_room(leave_message, room_id)


# REST API endpoints
@app.get("/api/rooms")
async def get_rooms():
    """Get all available chat rooms"""
    rooms = list(rooms_collection.find({}, {"_id": 0}))
    return {
        "success": True,
        "count": len(rooms),
        "data": rooms
    }


@app.get("/api/messages/{room_id}")
async def get_messages(room_id: str, limit: int = 50):
    """Get recent messages from a specific room"""
    try:
        messages = list(
            messages_collection.find({"room_id": room_id})
            .sort("timestamp", -1)
            .limit(limit)
        )
        # Convert ObjectId and datetime for JSON serialization
        for msg in messages:
            msg["_id"] = str(msg["_id"])
            msg["timestamp"] = msg["timestamp"].isoformat()
        # Return in chronological order
        messages.reverse()
        return {
            "success": True,
            "count": len(messages),
            "room_id": room_id,
            "data": messages
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error fetching messages: {str(e)}")


@app.get("/api/stats/{room_id}")
async def get_room_stats(room_id: str):
    """Get statistics for a specific room"""
    try:
        # Count messages per user and find each user's latest message
        pipeline = [
            {"$match": {"room_id": room_id}},
            {"$group": {
                "_id": "$username",
                "message_count": {"$sum": 1},
                "last_message": {"$max": "$timestamp"}
            }},
            {"$sort": {"message_count": -1}}
        ]
        user_stats = list(messages_collection.aggregate(pipeline))
        total_messages = messages_collection.count_documents({"room_id": room_id})
        return {
            "success": True,
            "room_id": room_id,
            "total_messages": total_messages,
            "user_stats": user_stats
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Error fetching stats: {str(e)}")


# Health check endpoint
@app.get("/health")
async def health_check():
    try:
        # Test database connection
        client.admin.command('ping')
        return {
            "success": True,
            "message": "API is running",
            "database": "connected",
            "timestamp": utc_now().isoformat()
        }
    except Exception as e:
        return {
            "success": False,
            "message": "Database connection failed",
            "error": str(e),
            "timestamp": utc_now().isoformat()
        }


# Serve static files and HTML
# StaticFiles raises an error if the directory is missing, so create it first
os.makedirs("static", exist_ok=True)
app.mount("/static", StaticFiles(directory="static"), name="static")


@app.get("/", response_class=HTMLResponse)
async def get_chat_interface():
    html_content = """
<!DOCTYPE html>
<html>
<head>
    <title>MongoDB Flex Chat</title>
    <style>
        body { font-family: Arial, sans-serif; margin: 20px; background: #f5f5f5; }
        .container { max-width: 800px; margin: 0 auto; background: white; padding: 20px; border-radius: 8px; box-shadow: 0 2px 10px rgba(0,0,0,0.1); }
        .header { text-align: center; margin-bottom: 20px; color: #333; }
        .room-info { background: #e9ecef; padding: 10px; border-radius: 4px; margin-bottom: 20px; }
        #messages { height: 400px; overflow-y: scroll; border: 1px solid #ddd; padding: 15px; margin-bottom: 15px; background: #fafafa; }
        .message { margin: 8px 0; padding: 8px; border-radius: 4px; }
        .message.user { background: #e3f2fd; }
        .message.system { background: #fff3e0; font-style: italic; color: #666; }
        .username { font-weight: bold; color: #1976d2; }
        .timestamp { font-size: 0.8em; color: #666; margin-left: 10px; }
        .input-area { display: flex; gap: 10px; }
        #messageInput { flex: 1; padding: 10px; border: 1px solid #ddd; border-radius: 4px; }
        button { padding: 10px 20px; background: #1976d2; color: white; border: none; border-radius: 4px; cursor: pointer; }
        button:hover { background: #1565c0; }
        .status { text-align: center; padding: 10px; margin-bottom: 10px; border-radius: 4px; }
        .connected { background: #d4edda; color: #155724; }
        .disconnected { background: #f8d7da; color: #721c24; }
    </style>
</head>
<body>
    <div class="container">
        <div class="header">
            <h1>🚀 MongoDB Flex Chat</h1>
            <div id="status" class="status disconnected">Disconnected</div>
        </div>
        <div class="room-info">
            <strong>Room:</strong> <span id="roomName"></span> |
            <strong>User:</strong> <span id="userName"></span>
        </div>
        <div id="messages"></div>
        <div class="input-area">
            <input type="text" id="messageInput" placeholder="Type your message..." maxlength="500">
            <button onclick="sendMessage()">Send</button>
        </div>
    </div>
    <script>
        // Get URL parameters
        const urlParams = new URLSearchParams(window.location.search);
        const room = urlParams.get('room') || 'general';
        const username = urlParams.get('username') || 'Anonymous_' + Math.floor(Math.random() * 1000);
        document.getElementById('roomName').textContent = room;
        document.getElementById('userName').textContent = username;

        // WebSocket connection (encode the path segments so spaces and slashes are safe)
        const wsProtocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
        const ws = new WebSocket(`${wsProtocol}//${window.location.host}/ws/${encodeURIComponent(room)}/${encodeURIComponent(username)}`);
        const messages = document.getElementById('messages');
        const statusDiv = document.getElementById('status');

        ws.onopen = function() {
            statusDiv.textContent = 'Connected';
            statusDiv.className = 'status connected';
        };
        ws.onclose = function() {
            statusDiv.textContent = 'Disconnected';
            statusDiv.className = 'status disconnected';
        };
        ws.onmessage = function(event) {
            const message = JSON.parse(event.data);
            addMessage(message);
        };

        // Build a span with textContent so user input is never parsed as HTML
        function makeSpan(text, className) {
            const span = document.createElement('span');
            if (className) {
                span.className = className;
            }
            span.textContent = text;
            return span;
        }

        function addMessage(message) {
            const div = document.createElement('div');
            // Map the message type to the CSS classes defined above
            const kind = message.type === 'system' ? 'system' : 'user';
            div.className = `message ${kind}`;
            if (kind === 'user') {
                div.appendChild(makeSpan(`${message.username}: `, 'username'));
            }
            div.appendChild(makeSpan(message.content));
            div.appendChild(makeSpan(formatTime(message.timestamp), 'timestamp'));
            messages.appendChild(div);
            messages.scrollTop = messages.scrollHeight;
        }

        function formatTime(timestamp) {
            return new Date(timestamp).toLocaleTimeString();
        }

        function sendMessage() {
            const input = document.getElementById('messageInput');
            const content = input.value.trim();
            if (content && ws.readyState === WebSocket.OPEN) {
                ws.send(JSON.stringify({content: content}));
                input.value = '';
            }
        }

        document.getElementById('messageInput').addEventListener('keypress', function(e) {
            if (e.key === 'Enter') {
                sendMessage();
            }
        });

        // Load previous messages
        fetch(`/api/messages/${encodeURIComponent(room)}`)
            .then(response => response.json())
            .then(result => {
                if (result.success) {
                    result.data.forEach(addMessage);
                }
            })
            .catch(error => console.error('Error loading messages:', error));
    </script>
</body>
</html>
"""
    return html_content


# 404 Handler (exception handlers must return a Response object, not a dict)
@app.exception_handler(404)
async def not_found_handler(request, exc):
    return JSONResponse(
        status_code=404,
        content={
            "success": False,
            "message": "Endpoint not found"
        }
    )


# Error Handler
@app.exception_handler(500)
async def internal_error_handler(request, exc):
    return JSONResponse(
        status_code=500,
        content={
            "success": False,
            "message": "Internal server error"
        }
    )


if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

Now you can start your chat application:

python main.py

You should see something like:

Created default chat rooms
Chat application started
Connected to MongoDB: default
INFO: Started server process [12345]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on http://0.0.0.0:8000 (Press CTRL+C to quit)

Open your browser and navigate to http://localhost:8000?room=general&username=TestUser. You can open multiple browser windows with different usernames to test the real-time chat functionality.
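
What you see across the browser windows is a per-room broadcast: every message is sent to all sockets registered for the same room, and sockets that fail are dropped. The following standalone sketch illustrates that idea without FastAPI or a database. `FakeSocket` and `RoomRegistry` are hypothetical stand-ins written for this illustration, not classes from the application or from any library.

```python
import asyncio
import json


class FakeSocket:
    """Stand-in for a WebSocket: records received messages or simulates a dead peer."""

    def __init__(self, name, alive=True):
        self.name = name
        self.alive = alive
        self.received = []

    async def send_text(self, text):
        if not self.alive:
            raise ConnectionError(f"{self.name} is gone")
        self.received.append(json.loads(text))


class RoomRegistry:
    """Keeps one list of sockets per room and broadcasts to a single room."""

    def __init__(self):
        self.rooms = {}

    def join(self, room_id, socket):
        self.rooms.setdefault(room_id, []).append(socket)

    async def broadcast(self, room_id, message):
        payload = json.dumps(message)
        dead = []
        # Iterate over a copy so the list can be changed safely afterwards
        for socket in list(self.rooms.get(room_id, [])):
            try:
                await socket.send_text(payload)
            except Exception:
                dead.append(socket)
        for socket in dead:
            self.rooms[room_id].remove(socket)


async def demo():
    registry = RoomRegistry()
    alice = FakeSocket("alice")
    bob = FakeSocket("bob")
    ghost = FakeSocket("ghost", alive=False)  # simulates a closed browser tab
    carol = FakeSocket("carol")
    for socket in (alice, bob, ghost):
        registry.join("general", socket)
    registry.join("tech", carol)
    await registry.broadcast("general", {"type": "message", "content": "hi"})
    return registry, alice, bob, ghost, carol


if __name__ == "__main__":
    registry, alice, bob, ghost, carol = asyncio.run(demo())
    print("alice received:", alice.received)
    print("carol received:", carol.received)
    print("sockets left in general:", [s.name for s in registry.rooms["general"]])
```

Only the sockets in the general room receive the message, the socket in the tech room receives nothing, and the failing socket is removed from the room after the broadcast.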

Your chat application is now running and connected to MongoDB Flex! You can:

  • Send and receive messages in real-time
  • Switch between different chat rooms
  • View message history
  • See when users join and leave rooms
  • Check API health and room statistics

The application demonstrates how MongoDB Flex can back a real-time application: FastAPI handles the WebSocket connections, MongoDB stores the messages, and the aggregation framework computes the room statistics.
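
To make the statistics pipeline easier to follow, here is a pure-Python sketch of what its three stages compute: `$match` filters by room, `$group` counts messages per user and keeps the latest timestamp, and `$sort` orders users by message count. The function `room_stats` and the sample messages are made up for illustration; in the application, the database performs this work.

```python
from collections import defaultdict
from datetime import datetime


def room_stats(messages, room_id):
    """Pure-Python equivalent of the $match -> $group -> $sort pipeline."""
    groups = defaultdict(lambda: {"message_count": 0, "last_message": None})
    for msg in messages:
        if msg["room_id"] != room_id:        # $match: keep only this room
            continue
        entry = groups[msg["username"]]      # $group: one bucket per username
        entry["message_count"] += 1          # $sum: 1
        ts = msg["timestamp"]
        if entry["last_message"] is None or ts > entry["last_message"]:
            entry["last_message"] = ts       # $max: latest timestamp
    rows = [{"_id": user, **stats} for user, stats in groups.items()]
    rows.sort(key=lambda row: row["message_count"], reverse=True)  # $sort: -1
    return rows


if __name__ == "__main__":
    # Made-up sample data
    sample = [
        {"room_id": "general", "username": "alice", "timestamp": datetime(2024, 1, 1, 12, 0)},
        {"room_id": "general", "username": "bob", "timestamp": datetime(2024, 1, 1, 12, 1)},
        {"room_id": "general", "username": "alice", "timestamp": datetime(2024, 1, 1, 12, 5)},
        {"room_id": "tech", "username": "alice", "timestamp": datetime(2024, 1, 1, 13, 0)},
    ]
    for row in room_stats(sample, "general"):
        print(row)
```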