Eine Python-Chat-Anwendung mit MongoDB Flex erstellen
In diesem Tutorial implementieren Sie ein reales Szenario mit STACKIT MongoDB Flex. Nach Abschluss aller Schritte verfügen Sie über eine Python-FastAPI-Anwendung, die Chat-Nachrichten in einer MongoDB Flex-Datenbank speichert. Sie lernen, wie Sie eine Python-Anwendung mit MongoDB Flex verbinden und ein Echtzeit-Chat-System unter Verwendung von WebSockets erstellen. Mit dem erworbenen Wissen können Sie moderne Echtzeit-Anwendungen entwickeln, die auf einer soliden und flexiblen Datenbank basieren.
Übersicht
Abschnitt betitelt „Übersicht“Dieses Tutorial ist in die folgenden Abschnitte unterteilt:
-
Erstellen Sie eine MongoDB Flex-Instanz mit einem Benutzer, den entsprechenden Rechten und ACL-Einstellungen.
-
Initialisieren Sie eine Umgebung zum Ausführen von Python mit den benötigten Paketen.
-
Schreiben Sie eine kleine Python-Anwendung, welche die Verbindung zur Datenbank testet.
-
Erstellen Sie eine Echtzeit-Chat-Anwendung, die von MongoDB Flex unterstützt wird.
Voraussetzungen
Abschnitt betitelt „Voraussetzungen“-
Sie haben ein STACKIT-Kundenkonto: Kundenkonto erstellen
-
Sie haben ein STACKIT-Benutzerkonto: Benutzerkonto erstellen
-
Sie haben ein STACKIT-Projekt: Projekt erstellen
-
Auf dem Entwicklungssystem haben Sie Zugriff auf eine Shell mit Python 3.8+.
-
Sie können von Ihrem Entwicklungsrechner aus die STACKIT Cloud erreichen.
-
Sie kennen die externe IPv4-Adresse Ihres Client-Systems.
Erstellen Sie eine MongoDB Flex-Instanz mit einem Benutzer, den entsprechenden Rechten und ACL-Einstellungen
Abschnitt betitelt „Erstellen Sie eine MongoDB Flex-Instanz mit einem Benutzer, den entsprechenden Rechten und ACL-Einstellungen“Initialisieren Sie eine Umgebung zum Ausführen von Python mit den benötigten Paketen
Abschnitt betitelt „Initialisieren Sie eine Umgebung zum Ausführen von Python mit den benötigten Paketen“python --versionDie Ausgabe sollte etwa so aussehen:
Python 3.12.3Die installierte Version sollte mindestens 3.10.0 entsprechen oder höher sein. Wenn Sie eine ältere Version installiert haben, konsultieren Sie bitte das Handbuch Ihres Systems, um eine neuere Version zu installieren. Nachdem Sie diese Voraussetzung geprüft haben, erstellen Sie einen Ordner für dieses Tutorial:
mkdir python-chat-with-mongodb-flex-tutorialcd python-chat-with-mongodb-flex-tutorialpython -m venv venvsource venv/bin/activate # Unter Windows: venv\Scripts\activatepip install fastapi uvicorn pymongo python-dotenv websocketsDie Ausgabe sollte etwa so aussehen:
Successfully installed fastapi-0.104.1 uvicorn-0.24.0 pymongo-4.6.0 python-dotenv-1.0.0 websockets-12.0Jetzt sind Sie bereit, Ihre kleine Python-Testanwendung zu schreiben.
Schreiben Sie eine kleine Python-Anwendung, welche die Verbindung zur Datenbank testet
Abschnitt betitelt „Schreiben Sie eine kleine Python-Anwendung, welche die Verbindung zur Datenbank testet“Öffnen Sie einen Code-Editor Ihrer Wahl und erstellen Sie eine Datei namens .env mit folgendem Inhalt (fügen Sie die Verbindungszeichenfolge ein, die Sie zuvor gespeichert haben):
MONGODB_URI=mongodb://USERNAME:YOUR-SECRET-PASSWORD@s-f47ac10b-58cc-4372-a567-0e02b2c3d479-0.mongodb.eu01.onstackit.cloud:27017/default?authSource=default&tls=true&authMechanism=SCRAM-SHA-256Erstellen Sie eine weitere Datei namens test_connection.py:
from pymongo import MongoClientfrom dotenv import load_dotenvimport os
load_dotenv()
uri = os.getenv('MONGODB_URI')
def test_connection(): if not uri: print('MONGODB_URI not set!') return
client = MongoClient(uri)
try: print('Connecting to MongoDB Flex...')
# Test connection with ping client.admin.command('ping')
print('Successfully connected!')
# Get database name db_name = client.get_default_database().name print(f'Connected to: {db_name}')
except Exception as error: print(f'Error while connecting to MongoDB Flex: {error}') finally: client.close() print('Connection closed.')
if __name__ == '__main__': test_connection()Führen Sie nun die Mini-Anwendung mit Ihrem Python-Interpreter aus:
python test_connection.pySie sollten Folgendes sehen:
Connecting to MongoDB Flex...Successfully connected!Connected to: defaultConnection closed.Erstellen Sie eine Echtzeit-Chat-Anwendung, die von MongoDB Flex unterstützt wird
Abschnitt betitelt „Erstellen Sie eine Echtzeit-Chat-Anwendung, die von MongoDB Flex unterstützt wird“Nachdem Sie alles korrekt eingerichtet haben, können Sie mit der Implementierung der eigentlichen Anwendung beginnen. Um dieses Tutorial auf die Kernkonzepte zu konzentrieren, verzichten wir bewusst auf die Einrichtung der Best-Practice-Ordnerstruktur für Python. Wir empfehlen, diese Best-Practice-Struktur einzuhalten, wenn Sie ein Projekt für den Produktionseinsatz erstellen. Erstellen Sie eine Datei namens main.py:
from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPExceptionfrom fastapi.staticfiles import StaticFilesfrom fastapi.responses import HTMLResponsefrom pymongo import MongoClient, ASCENDINGfrom dotenv import load_dotenvimport osimport jsonfrom datetime import datetimefrom typing import Dict, List
# Load environment variablesload_dotenv()
app = FastAPI(title="MongoDB Flex Chat API")uri = os.getenv('MONGODB_URI')
if not uri: raise ValueError("MONGODB_URI environment variable is required")
# MongoDB connectionclient = MongoClient(uri)db = client.get_default_database()messages_collection = db.messagesrooms_collection = db.rooms
# Create indexes for better performancemessages_collection.create_index([("room_id", ASCENDING), ("timestamp", ASCENDING)])
# WebSocket Connection Managerclass ConnectionManager: def __init__(self): self.active_connections: Dict[str, List[WebSocket]] = {}
async def connect(self, websocket: WebSocket, room_id: str): await websocket.accept() if room_id not in self.active_connections: self.active_connections[room_id] = [] self.active_connections[room_id].append(websocket)
def disconnect(self, websocket: WebSocket, room_id: str): if room_id in self.active_connections: self.active_connections[room_id].remove(websocket) if not self.active_connections[room_id]: del self.active_connections[room_id]
async def broadcast_to_room(self, message: dict, room_id: str): if room_id in self.active_connections: disconnected = [] for connection in self.active_connections[room_id]: try: await connection.send_text(json.dumps(message)) except: disconnected.append(connection)
# Remove disconnected connections for connection in disconnected: self.active_connections[room_id].remove(connection)
manager = ConnectionManager()
# Initialize default roomsdef init_default_rooms(): if rooms_collection.count_documents({}) == 0: default_rooms = [ {"name": "general", "description": "General discussion"}, {"name": "tech", "description": "Technical discussions"}, {"name": "random", "description": "Random conversations"} ] rooms_collection.insert_many(default_rooms) print("Created default chat rooms")
@app.on_event("startup")async def startup_event(): init_default_rooms() print("🚀 Chat application started") print(f"📊 Connected to MongoDB: {db.name}")
# WebSocket endpoint for real-time chat@app.websocket("/ws/{room_id}/{username}")async def websocket_endpoint(websocket: WebSocket, room_id: str, username: str): await manager.connect(websocket, room_id)
# Send join notification join_message = { "type": "system", "content": f"{username} joined the room", "timestamp": datetime.utcnow().isoformat(), "room_id": room_id } await manager.broadcast_to_room(join_message, room_id)
try: while True: data = await websocket.receive_text() message_data = json.loads(data)
# Create message document message = { "room_id": room_id, "username": username, "content": message_data["content"], "type": "message", "timestamp": datetime.utcnow() }
# Store in MongoDB result = messages_collection.insert_one(message)
# Prepare message for broadcast broadcast_message = { "_id": str(result.inserted_id), "room_id": room_id, "username": username, "content": message_data["content"], "type": "message", "timestamp": message["timestamp"].isoformat() }
# Broadcast to all clients in room await manager.broadcast_to_room(broadcast_message, room_id)
except WebSocketDisconnect: manager.disconnect(websocket, room_id)
# Send leave notification leave_message = { "type": "system", "content": f"{username} left the room", "timestamp": datetime.utcnow().isoformat(), "room_id": room_id } await manager.broadcast_to_room(leave_message, room_id)
# REST API endpoints@app.get("/api/rooms")async def get_rooms(): """Get all available chat rooms""" rooms = list(rooms_collection.find({}, {"_id": 0})) return { "success": True, "count": len(rooms), "data": rooms }
@app.get("/api/messages/{room_id}")async def get_messages(room_id: str, limit: int = 50): """Get recent messages from a specific room""" try: messages = list( messages_collection.find({"room_id": room_id}) .sort("timestamp", -1) .limit(limit) )
# Convert ObjectId and datetime for JSON serialization for msg in messages: msg["_id"] = str(msg["_id"]) msg["timestamp"] = msg["timestamp"].isoformat()
# Return in chronological order messages.reverse()
return { "success": True, "count": len(messages), "room_id": room_id, "data": messages } except Exception as e: raise HTTPException(status_code=500, detail=f"Error fetching messages: {str(e)}")
@app.get("/api/stats/{room_id}")async def get_room_stats(room_id: str): """Get statistics for a specific room""" try: pipeline = [ {"$match": {"room_id": room_id}}, {"$group": { "_id": "$username", "message_count": {"$sum": 1}, "last_message": {"$max": "$timestamp"} }}, {"$sort": {"message_count": -1}} ]
user_stats = list(messages_collection.aggregate(pipeline)) total_messages = messages_collection.count_documents({"room_id": room_id})
return { "success": True, "room_id": room_id, "total_messages": total_messages, "user_stats": user_stats } except Exception as e: raise HTTPException(status_code=500, detail=f"Error fetching stats: {str(e)}")
# Health check endpoint@app.get("/health")async def health_check(): try: # Test database connection client.admin.command('ping') return { "success": True, "message": "API is running", "database": "connected", "timestamp": datetime.utcnow().isoformat() } except Exception as e: return { "success": False, "message": "Database connection failed", "error": str(e), "timestamp": datetime.utcnow().isoformat() }
# Serve static files and HTMLapp.mount("/static", StaticFiles(directory="static"), name="static")
@app.get("/", response_class=HTMLResponse)async def get_chat_interface(): html_content = """ <!DOCTYPE html> <html> <head> <title>MongoDB Flex Chat</title> <style> body { font-family: Arial, sans-serif; margin: 20px; background: #f5f5f5; } .container { max-width: 800px; margin: 0 auto; background: white; padding: 20px; border-radius: 8px; box-shadow: 0 2px 10px rgba(0,0,0,0.1); } .header { text-align: center; margin-bottom: 20px; color: #333; } .room-info { background: #e9ecef; padding: 10px; border-radius: 4px; margin-bottom: 20px; } #messages { height: 400px; overflow-y: scroll; border: 1px solid #ddd; padding: 15px; margin-bottom: 15px; background: #fafafa; } .message { margin: 8px 0; padding: 8px; border-radius: 4px; } .message.user { background: #e3f2fd; } .message.system { background: #fff3e0; font-style: italic; color: #666; } .username { font-weight: bold; color: #1976d2; } .timestamp { font-size: 0.8em; color: #666; margin-left: 10px; } .input-area { display: flex; gap: 10px; } #messageInput { flex: 1; padding: 10px; border: 1px solid #ddd; border-radius: 4px; } button { padding: 10px 20px; background: #1976d2; color: white; border: none; border-radius: 4px; cursor: pointer; } button:hover { background: #1565c0; } .status { text-align: center; padding: 10px; margin-bottom: 10px; border-radius: 4px; } .connected { background: #d4edda; color: #155724; } .disconnected { background: #f8d7da; color: #721c24; } </style> </head> <body> <div class="container"> <div class="header"> <h1>🚀 MongoDB Flex Chat</h1> <div id="status" class="status disconnected">Disconnected</div> </div>
<div class="room-info"> <strong>Room:</strong> <span id="roomName"></span> | <strong>User:</strong> <span id="userName"></span> </div>
<div id="messages"></div>
<div class="input-area"> <input type="text" id="messageInput" placeholder="Type your message..." maxlength="500"> <button onclick="sendMessage()">Send</button> </div> </div>
<script> // Get URL parameters const urlParams = new URLSearchParams(window.location.search); const room = urlParams.get('room') || 'general'; const username = urlParams.get('username') || 'Anonymous_' + Math.floor(Math.random() * 1000);
document.getElementById('roomName').textContent = room; document.getElementById('userName').textContent = username;
// WebSocket connection const wsProtocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:'; const ws = new WebSocket(`${wsProtocol}//${window.location.host}/ws/${room}/${username}`); const messages = document.getElementById('messages'); const statusDiv = document.getElementById('status');
ws.onopen = function() { statusDiv.textContent = 'Connected'; statusDiv.className = 'status connected'; };
ws.onclose = function() { statusDiv.textContent = 'Disconnected'; statusDiv.className = 'status disconnected'; };
ws.onmessage = function(event) { const message = JSON.parse(event.data); addMessage(message); };
function addMessage(message) { const div = document.createElement('div'); div.className = `message ${message.type}`;
if (message.type === 'system') { div.innerHTML = ` <span>${message.content}</span> <span class="timestamp">${formatTime(message.timestamp)}</span> `; } else { div.innerHTML = ` <span class="username">${message.username}:</span> <span>${message.content}</span> <span class="timestamp">${formatTime(message.timestamp)}</span> `; }
messages.appendChild(div); messages.scrollTop = messages.scrollHeight; }
function formatTime(timestamp) { return new Date(timestamp).toLocaleTimeString(); }
function sendMessage() { const input = document.getElementById('messageInput'); const content = input.value.trim();
if (content && ws.readyState === WebSocket.OPEN) { ws.send(JSON.stringify({content: content})); input.value = ''; } }
document.getElementById('messageInput').addEventListener('keypress', function(e) { if (e.key === 'Enter') { sendMessage(); } });
// Load previous messages fetch(`/api/messages/${room}`) .then(response => response.json()) .then(result => { if (result.success) { result.data.forEach(addMessage); } }) .catch(error => console.error('Error loading messages:', error)); </script> </body> </html> """ return html_content
# 404 Handler@app.exception_handler(404)async def not_found_handler(request, exc): return { "success": False, "message": "Endpoint not found" }
# Error Handler@app.exception_handler(500)async def internal_error_handler(request, exc): return { "success": False, "message": "Internal server error" }
if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000)Jetzt können Sie Ihre Chat-Anwendung starten:
python main.pySie sollten Folgendes sehen:
Created default chat roomsChat application startedConnected to MongoDB: defaultINFO: Started server process [12345]INFO: Waiting for application startup.INFO: Application startup complete.INFO: Uvicorn running on [http://0.0.0.0:8000](http://0.0.0.0:8000) (Press CTRL+C to quit)Öffnen Sie Ihren Browser und navigieren Sie zu http://localhost:8000?room=general&username=TestUser. Sie können mehrere Browserfenster mit verschiedenen Benutzernamen öffnen, um die Echtzeit-Chat-Funktionalität zu testen.
Ihre Chat-Anwendung läuft nun und ist mit MongoDB Flex verbunden! Sie können:
- Nachrichten in Echtzeit senden und empfangen
- Zwischen verschiedenen Chat-Räumen wechseln
- Den Nachrichtenverlauf einsehen
- Sehen, wann Benutzer Räume betreten und verlassen
- Den API-Status und die Raumstatistiken überprüfen
Die Anwendung demonstriert die Flexibilität von MongoDB bei der Verarbeitung von Echtzeitdaten, WebSocket-Verbindungen und das leistungsstarke Aggregation-Framework für Statistiken.