Zum Inhalt springen

Eine Python-Chat-Anwendung mit MongoDB Flex erstellen

In diesem Tutorial implementieren Sie ein reales Szenario mit STACKIT MongoDB Flex. Nach Abschluss aller Schritte verfügen Sie über eine Python-FastAPI-Anwendung, die Chat-Nachrichten in einer MongoDB Flex-Datenbank speichert. Sie lernen, wie Sie eine Python-Anwendung mit MongoDB Flex verbinden und ein Echtzeit-Chat-System unter Verwendung von WebSockets erstellen. Mit dem erworbenen Wissen können Sie moderne Echtzeit-Anwendungen entwickeln, die auf einer soliden und flexiblen Datenbank basieren.

Dieses Tutorial ist in die folgenden Abschnitte unterteilt:

  1. Erstellen Sie eine MongoDB Flex-Instanz mit einem Benutzer, den entsprechenden Rechten und ACL-Einstellungen.

  2. Initialisieren Sie eine Umgebung zum Ausführen von Python mit den benötigten Paketen.

  3. Schreiben Sie eine kleine Python-Anwendung, welche die Verbindung zur Datenbank testet.

  4. Erstellen Sie eine Echtzeit-Chat-Anwendung, die von MongoDB Flex unterstützt wird.

  • Sie haben ein STACKIT-Kundenkonto: Kundenkonto erstellen

  • Sie haben ein STACKIT-Benutzerkonto: Benutzerkonto erstellen

  • Sie haben ein STACKIT-Projekt: Projekt erstellen

  • Auf dem Entwicklungssystem haben Sie Zugriff auf eine Shell mit Python 3.8+.

  • Sie können von Ihrem Entwicklungsrechner aus die STACKIT Cloud erreichen.

  • Sie kennen die externe IPv4-Adresse Ihres Client-Systems.

Erstellen Sie eine MongoDB Flex-Instanz mit einem Benutzer, den entsprechenden Rechten und ACL-Einstellungen

Abschnitt betitelt „Erstellen Sie eine MongoDB Flex-Instanz mit einem Benutzer, den entsprechenden Rechten und ACL-Einstellungen“

Initialisieren Sie eine Umgebung zum Ausführen von Python mit den benötigten Paketen

Abschnitt betitelt „Initialisieren Sie eine Umgebung zum Ausführen von Python mit den benötigten Paketen“
Terminal-Fenster
python --version

Die Ausgabe sollte etwa so aussehen:

Python 3.12.3

Die installierte Version sollte mindestens 3.10.0 entsprechen oder höher sein. Wenn Sie eine ältere Version installiert haben, konsultieren Sie bitte das Handbuch Ihres Systems, um eine neuere Version zu installieren. Nachdem Sie diese Voraussetzung geprüft haben, erstellen Sie einen Ordner für dieses Tutorial:

mkdir python-chat-with-mongodb-flex-tutorial
cd python-chat-with-mongodb-flex-tutorial
python -m venv venv
source venv/bin/activate # Unter Windows: venv\Scripts\activate
pip install fastapi uvicorn pymongo python-dotenv websockets

Die Ausgabe sollte etwa so aussehen:

Successfully installed fastapi-0.104.1 uvicorn-0.24.0 pymongo-4.6.0 python-dotenv-1.0.0 websockets-12.0

Jetzt sind Sie bereit, Ihre kleine Python-Testanwendung zu schreiben.

Schreiben Sie eine kleine Python-Anwendung, welche die Verbindung zur Datenbank testet

Abschnitt betitelt „Schreiben Sie eine kleine Python-Anwendung, welche die Verbindung zur Datenbank testet“

Öffnen Sie einen Code-Editor Ihrer Wahl und erstellen Sie eine Datei namens .env mit folgendem Inhalt (fügen Sie die Verbindungszeichenfolge ein, die Sie zuvor gespeichert haben):

MONGODB_URI=mongodb://USERNAME:YOUR-SECRET-PASSWORD@s-f47ac10b-58cc-4372-a567-0e02b2c3d479-0.mongodb.eu01.onstackit.cloud:27017/default?authSource=default&tls=true&authMechanism=SCRAM-SHA-256

Erstellen Sie eine weitere Datei namens test_connection.py:

from pymongo import MongoClient
from dotenv import load_dotenv
import os
load_dotenv()
uri = os.getenv('MONGODB_URI')
def test_connection():
if not uri:
print('MONGODB_URI not set!')
return
client = MongoClient(uri)
try:
print('Connecting to MongoDB Flex...')
# Test connection with ping
client.admin.command('ping')
print('Successfully connected!')
# Get database name
db_name = client.get_default_database().name
print(f'Connected to: {db_name}')
except Exception as error:
print(f'Error while connecting to MongoDB Flex: {error}')
finally:
client.close()
print('Connection closed.')
if __name__ == '__main__':
test_connection()

Führen Sie nun die Mini-Anwendung mit Ihrem Python-Interpreter aus:

Terminal-Fenster
python test_connection.py

Sie sollten Folgendes sehen:

Connecting to MongoDB Flex...
Successfully connected!
Connected to: default
Connection closed.

Erstellen Sie eine Echtzeit-Chat-Anwendung, die von MongoDB Flex unterstützt wird

Abschnitt betitelt „Erstellen Sie eine Echtzeit-Chat-Anwendung, die von MongoDB Flex unterstützt wird“

Nachdem Sie alles korrekt eingerichtet haben, können Sie mit der Implementierung der eigentlichen Anwendung beginnen. Um dieses Tutorial auf die Kernkonzepte zu konzentrieren, verzichten wir bewusst auf die Einrichtung der Best-Practice-Ordnerstruktur für Python. Wir empfehlen, diese Best-Practice-Struktur einzuhalten, wenn Sie ein Projekt für den Produktionseinsatz erstellen. Erstellen Sie eine Datei namens main.py:

from fastapi import FastAPI, WebSocket, WebSocketDisconnect, HTTPException
from fastapi.staticfiles import StaticFiles
from fastapi.responses import HTMLResponse
from pymongo import MongoClient, ASCENDING
from dotenv import load_dotenv
import os
import json
from datetime import datetime
from typing import Dict, List
# Load environment variables
load_dotenv()
app = FastAPI(title="MongoDB Flex Chat API")
uri = os.getenv('MONGODB_URI')
if not uri:
raise ValueError("MONGODB_URI environment variable is required")
# MongoDB connection
client = MongoClient(uri)
db = client.get_default_database()
messages_collection = db.messages
rooms_collection = db.rooms
# Create indexes for better performance
messages_collection.create_index([("room_id", ASCENDING), ("timestamp", ASCENDING)])
# WebSocket Connection Manager
class ConnectionManager:
def __init__(self):
self.active_connections: Dict[str, List[WebSocket]] = {}
async def connect(self, websocket: WebSocket, room_id: str):
await websocket.accept()
if room_id not in self.active_connections:
self.active_connections[room_id] = []
self.active_connections[room_id].append(websocket)
def disconnect(self, websocket: WebSocket, room_id: str):
if room_id in self.active_connections:
self.active_connections[room_id].remove(websocket)
if not self.active_connections[room_id]:
del self.active_connections[room_id]
async def broadcast_to_room(self, message: dict, room_id: str):
if room_id in self.active_connections:
disconnected = []
for connection in self.active_connections[room_id]:
try:
await connection.send_text(json.dumps(message))
except:
disconnected.append(connection)
# Remove disconnected connections
for connection in disconnected:
self.active_connections[room_id].remove(connection)
manager = ConnectionManager()
# Initialize default rooms
def init_default_rooms():
if rooms_collection.count_documents({}) == 0:
default_rooms = [
{"name": "general", "description": "General discussion"},
{"name": "tech", "description": "Technical discussions"},
{"name": "random", "description": "Random conversations"}
]
rooms_collection.insert_many(default_rooms)
print("Created default chat rooms")
@app.on_event("startup")
async def startup_event():
init_default_rooms()
print("🚀 Chat application started")
print(f"📊 Connected to MongoDB: {db.name}")
# WebSocket endpoint for real-time chat
@app.websocket("/ws/{room_id}/{username}")
async def websocket_endpoint(websocket: WebSocket, room_id: str, username: str):
await manager.connect(websocket, room_id)
# Send join notification
join_message = {
"type": "system",
"content": f"{username} joined the room",
"timestamp": datetime.utcnow().isoformat(),
"room_id": room_id
}
await manager.broadcast_to_room(join_message, room_id)
try:
while True:
data = await websocket.receive_text()
message_data = json.loads(data)
# Create message document
message = {
"room_id": room_id,
"username": username,
"content": message_data["content"],
"type": "message",
"timestamp": datetime.utcnow()
}
# Store in MongoDB
result = messages_collection.insert_one(message)
# Prepare message for broadcast
broadcast_message = {
"_id": str(result.inserted_id),
"room_id": room_id,
"username": username,
"content": message_data["content"],
"type": "message",
"timestamp": message["timestamp"].isoformat()
}
# Broadcast to all clients in room
await manager.broadcast_to_room(broadcast_message, room_id)
except WebSocketDisconnect:
manager.disconnect(websocket, room_id)
# Send leave notification
leave_message = {
"type": "system",
"content": f"{username} left the room",
"timestamp": datetime.utcnow().isoformat(),
"room_id": room_id
}
await manager.broadcast_to_room(leave_message, room_id)
# REST API endpoints
@app.get("/api/rooms")
async def get_rooms():
"""Get all available chat rooms"""
rooms = list(rooms_collection.find({}, {"_id": 0}))
return {
"success": True,
"count": len(rooms),
"data": rooms
}
@app.get("/api/messages/{room_id}")
async def get_messages(room_id: str, limit: int = 50):
"""Get recent messages from a specific room"""
try:
messages = list(
messages_collection.find({"room_id": room_id})
.sort("timestamp", -1)
.limit(limit)
)
# Convert ObjectId and datetime for JSON serialization
for msg in messages:
msg["_id"] = str(msg["_id"])
msg["timestamp"] = msg["timestamp"].isoformat()
# Return in chronological order
messages.reverse()
return {
"success": True,
"count": len(messages),
"room_id": room_id,
"data": messages
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error fetching messages: {str(e)}")
@app.get("/api/stats/{room_id}")
async def get_room_stats(room_id: str):
"""Get statistics for a specific room"""
try:
pipeline = [
{"$match": {"room_id": room_id}},
{"$group": {
"_id": "$username",
"message_count": {"$sum": 1},
"last_message": {"$max": "$timestamp"}
}},
{"$sort": {"message_count": -1}}
]
user_stats = list(messages_collection.aggregate(pipeline))
total_messages = messages_collection.count_documents({"room_id": room_id})
return {
"success": True,
"room_id": room_id,
"total_messages": total_messages,
"user_stats": user_stats
}
except Exception as e:
raise HTTPException(status_code=500, detail=f"Error fetching stats: {str(e)}")
# Health check endpoint
@app.get("/health")
async def health_check():
try:
# Test database connection
client.admin.command('ping')
return {
"success": True,
"message": "API is running",
"database": "connected",
"timestamp": datetime.utcnow().isoformat()
}
except Exception as e:
return {
"success": False,
"message": "Database connection failed",
"error": str(e),
"timestamp": datetime.utcnow().isoformat()
}
# Serve static files and HTML
app.mount("/static", StaticFiles(directory="static"), name="static")
@app.get("/", response_class=HTMLResponse)
async def get_chat_interface():
html_content = """
<!DOCTYPE html>
<html>
<head>
<title>MongoDB Flex Chat</title>
<style>
body { font-family: Arial, sans-serif; margin: 20px; background: #f5f5f5; }
.container { max-width: 800px; margin: 0 auto; background: white; padding: 20px; border-radius: 8px; box-shadow: 0 2px 10px rgba(0,0,0,0.1); }
.header { text-align: center; margin-bottom: 20px; color: #333; }
.room-info { background: #e9ecef; padding: 10px; border-radius: 4px; margin-bottom: 20px; }
#messages { height: 400px; overflow-y: scroll; border: 1px solid #ddd; padding: 15px; margin-bottom: 15px; background: #fafafa; }
.message { margin: 8px 0; padding: 8px; border-radius: 4px; }
.message.user { background: #e3f2fd; }
.message.system { background: #fff3e0; font-style: italic; color: #666; }
.username { font-weight: bold; color: #1976d2; }
.timestamp { font-size: 0.8em; color: #666; margin-left: 10px; }
.input-area { display: flex; gap: 10px; }
#messageInput { flex: 1; padding: 10px; border: 1px solid #ddd; border-radius: 4px; }
button { padding: 10px 20px; background: #1976d2; color: white; border: none; border-radius: 4px; cursor: pointer; }
button:hover { background: #1565c0; }
.status { text-align: center; padding: 10px; margin-bottom: 10px; border-radius: 4px; }
.connected { background: #d4edda; color: #155724; }
.disconnected { background: #f8d7da; color: #721c24; }
</style>
</head>
<body>
<div class="container">
<div class="header">
<h1>🚀 MongoDB Flex Chat</h1>
<div id="status" class="status disconnected">Disconnected</div>
</div>
<div class="room-info">
<strong>Room:</strong> <span id="roomName"></span> |
<strong>User:</strong> <span id="userName"></span>
</div>
<div id="messages"></div>
<div class="input-area">
<input type="text" id="messageInput" placeholder="Type your message..." maxlength="500">
<button onclick="sendMessage()">Send</button>
</div>
</div>
<script>
// Get URL parameters
const urlParams = new URLSearchParams(window.location.search);
const room = urlParams.get('room') || 'general';
const username = urlParams.get('username') || 'Anonymous_' + Math.floor(Math.random() * 1000);
document.getElementById('roomName').textContent = room;
document.getElementById('userName').textContent = username;
// WebSocket connection
const wsProtocol = window.location.protocol === 'https:' ? 'wss:' : 'ws:';
const ws = new WebSocket(`${wsProtocol}//${window.location.host}/ws/${room}/${username}`);
const messages = document.getElementById('messages');
const statusDiv = document.getElementById('status');
ws.onopen = function() {
statusDiv.textContent = 'Connected';
statusDiv.className = 'status connected';
};
ws.onclose = function() {
statusDiv.textContent = 'Disconnected';
statusDiv.className = 'status disconnected';
};
ws.onmessage = function(event) {
const message = JSON.parse(event.data);
addMessage(message);
};
function addMessage(message) {
const div = document.createElement('div');
div.className = `message ${message.type}`;
if (message.type === 'system') {
div.innerHTML = `
<span>${message.content}</span>
<span class="timestamp">${formatTime(message.timestamp)}</span>
`;
} else {
div.innerHTML = `
<span class="username">${message.username}:</span>
<span>${message.content}</span>
<span class="timestamp">${formatTime(message.timestamp)}</span>
`;
}
messages.appendChild(div);
messages.scrollTop = messages.scrollHeight;
}
function formatTime(timestamp) {
return new Date(timestamp).toLocaleTimeString();
}
function sendMessage() {
const input = document.getElementById('messageInput');
const content = input.value.trim();
if (content && ws.readyState === WebSocket.OPEN) {
ws.send(JSON.stringify({content: content}));
input.value = '';
}
}
document.getElementById('messageInput').addEventListener('keypress', function(e) {
if (e.key === 'Enter') {
sendMessage();
}
});
// Load previous messages
fetch(`/api/messages/${room}`)
.then(response => response.json())
.then(result => {
if (result.success) {
result.data.forEach(addMessage);
}
})
.catch(error => console.error('Error loading messages:', error));
</script>
</body>
</html>
"""
return html_content
# 404 Handler
@app.exception_handler(404)
async def not_found_handler(request, exc):
return {
"success": False,
"message": "Endpoint not found"
}
# Error Handler
@app.exception_handler(500)
async def internal_error_handler(request, exc):
return {
"success": False,
"message": "Internal server error"
}
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)

Jetzt können Sie Ihre Chat-Anwendung starten:

Terminal-Fenster
python main.py

Sie sollten Folgendes sehen:

Created default chat rooms
Chat application started
Connected to MongoDB: default
INFO: Started server process [12345]
INFO: Waiting for application startup.
INFO: Application startup complete.
INFO: Uvicorn running on [http://0.0.0.0:8000](http://0.0.0.0:8000) (Press CTRL+C to quit)

Öffnen Sie Ihren Browser und navigieren Sie zu http://localhost:8000?room=general&username=TestUser. Sie können mehrere Browserfenster mit verschiedenen Benutzernamen öffnen, um die Echtzeit-Chat-Funktionalität zu testen.

Ihre Chat-Anwendung läuft nun und ist mit MongoDB Flex verbunden! Sie können:

  • Nachrichten in Echtzeit senden und empfangen
  • Zwischen verschiedenen Chat-Räumen wechseln
  • Den Nachrichtenverlauf einsehen
  • Sehen, wann Benutzer Räume betreten und verlassen
  • Den API-Status und die Raumstatistiken überprüfen

Die Anwendung demonstriert die Flexibilität von MongoDB bei der Verarbeitung von Echtzeitdaten, WebSocket-Verbindungen und das leistungsstarke Aggregation-Framework für Statistiken.