Postgres-Änderungen an Dremio streamen
Überblick
Abschnitt betitelt „Überblick“Diese Anleitung erklärt, wie Sie Debezium konfigurieren, um eine lokale PostgreSQL-Datenbank zu überwachen, alle Änderungen auf Zeilenebene (INSERTs, UPDATEs, DELETEs) zu erfassen und diese Ereignisse direkt an STACKIT Intake zu streamen. Intake führt diese automatisch in eine Dremio-Iceberg-Tabelle für Echtzeitanalysen zusammen.
Voraussetzungen
Abschnitt betitelt „Voraussetzungen“Bevor Sie mit der Einrichtung der Pipeline beginnen, stellen Sie sicher, dass folgende Voraussetzungen erfüllt sind:
- Sie verfügen über ein STACKIT-Projekt und die erforderlichen Berechtigungen (
Intake.Admin,Intake.EditoroderIntake.Owner). - Sie haben die STACKIT CLI installiert und konfiguriert: Benutzerhandbuch
- Sie haben Zugriff auf eine Dremio-Instanz und besitzen ein Personal Access Token (PAT).
- Sie haben Docker und Docker Compose auf Ihrem lokalen Rechner installiert.
Schritt-für-Schritt-Konfiguration
Abschnitt betitelt „Schritt-für-Schritt-Konfiguration“Richten Sie Ihre STACKIT Intake-Umgebung ein
Abschnitt betitelt „Richten Sie Ihre STACKIT Intake-Umgebung ein“Sie müssen die Infrastruktur bereitstellen, die den Datenstrom empfangen wird. Dies umfasst das Erstellen eines Runners, eines Intakes und eines Intake-Benutzers.
-
Einen Intake Runner erstellen
Definieren Sie zunächst die Engine für Ihre Datenerfassung.
Inhalt folgt in Kürze.
Flag Beschreibung Standard Optional display-nameEin lesbarer Name für den Intake Runner. max-message-size-kibDie maximale Größe einer einzelnen Nachricht in Kibibyte. max-messages-per-hourDie maximale Anzahl an Nachrichten, die der Runner pro Stunde verarbeiten kann. project-idDie ID Ihres STACKIT-Projekts. regionDie Region für den Runner (z. B. eu01).Erstellungsbefehl ausführen
Abschnitt betitelt „Erstellungsbefehl ausführen“Um Ihren Intake Runner zu erstellen, führen Sie den Befehl mit den entsprechenden Flags aus:
Terminal-Fenster stackit beta intake runner create --display-name <DISPLAY_NAME> --max-message-size-kib <SIZE> --max-messages-per-hour <RATE> --project-id <PROJECT_ID> --region eu01Bestätigen Sie nach dem Ausführen des Befehls mit y. Wenn die Instanz erfolgreich erstellt wurde, gibt die CLI die Instanz-ID zurück.
Beispiel
Abschnitt betitelt „Beispiel“Terminal-Fenster stackit beta intake runner create \--display-name "debezium-pg-runner" \--max-message-size-kib 1000 \--max-messages-per-hour 1000 \--project-id "1234-5678-90ab-cdef" \--region eu01Ausgabe:
Created Intake Runner for project "...". Runner ID: af1b6d5b-9dc5-4dee-ab48-e944a7a17a2bBootstrap Server: af1b6d5b.intake.eu01.onstackit.cloud:9094 -
Einen Intake erstellen
Der Intake fungiert als Datenleitung, die den Runner mit Ihrer Dremio-Tabelle verbindet.
Inhalt folgt in Kürze.
Flag Beschreibung runner-idDie ID des im vorherigen Schritt erstellten Intake Runners. catalog-uriDer Dremio Catalog URI. catalog-warehouseDer Name des Dremio-Warehouses. catalog-table-nameDer Name der Ziel-Tabelle in Dremio. dremio-patIhr Dremio Personal Access Token. dremio-token-endpointDer Dremio-Authentifizierungsendpunkt. Erstellungsbefehl ausführen
Abschnitt betitelt „Erstellungsbefehl ausführen“Um Ihren Intake zu erstellen, führen Sie den folgenden Befehl aus:
Terminal-Fenster stackit beta intake create --runner-id <RUNNER_ID> --display-name <NAME> --catalog-uri <URI> --catalog-table-name <TABLE> --dremio-pat <PAT> ...Beispiel
Abschnitt betitelt „Beispiel“Terminal-Fenster stackit beta intake create \--display-name "debezium-intake" \--runner-id "af1b6d5b-9dc5-4dee-ab48-e944a7a17a2b" \--catalog-uri "[https://catalog.dremio](https://catalog.dremio)..." \--catalog-warehouse "catalog-s3" \--catalog-table-name "debezium_pg_users" \--catalog-auth-type "dremio" \--dremio-token-endpoint "[https://oauth.dremio](https://oauth.dremio)..." \--dremio-pat "MY_SECRET_PAT" \--project-id "1234-5678-90ab-cdef" \--region eu01Ausgabe:
Created Intake for Runner "...". Intake ID: 17802315-32c2-48ce-a33d-3043d8aec89bTopic: intake-17802315-32c2-48ce-a33d-3043d8aec89b -
Einen Intake-Benutzer erstellen
Sie müssen einen dedizierten Benutzer erstellen, damit Debezium sich gegenüber dem Intake authentifizieren kann.
Inhalt folgt in Kürze.
Flag Beschreibung intake-idDie ID des im vorherigen Schritt erstellten Intakes. display-nameEin Name für den Benutzer. passwordEin starkes Passwort (mind. 12 Zeichen, Groß- und Kleinschreibung, Zahlen, Sonderzeichen). typeMuss auf intakegesetzt sein.Erstellungsbefehl ausführen
Abschnitt betitelt „Erstellungsbefehl ausführen“Terminal-Fenster stackit beta intake user create --intake-id <INTAKE_ID> --display-name <NAME> --password <PASSWORD> --type intakeBeispiel
Abschnitt betitelt „Beispiel“Terminal-Fenster stackit beta intake user create \--intake-id "17802315-32c2-48ce-a33d-3043d8aec89b" \--display-name "debezium-connector-user" \--password "SuperSaf3Password!" \--type "intake" \--project-id "1234-5678-90ab-cdef" \--region eu01Ausgabe:
Created Intake user... User ID: e9e65d48...Username: intake-user-e9e65d48...
Definieren Sie Ihre Dremio-Ziel-Tabelle (Empfohlen)
Abschnitt betitelt „Definieren Sie Ihre Dremio-Ziel-Tabelle (Empfohlen)“Um einen reibungslosen Erfassungsprozess zu gewährleisten, empfehlen wir, die Ziel-Tabelle in Dremio vorab zu erstellen. In diesem Tutorial wird Debezium so konfiguriert, dass eine vereinfachte JSON-Payload gesendet wird, die diesem Schema entspricht.
-
Öffnen Sie Ihre Dremio-Benutzeroberfläche.
-
Navigieren Sie zu Ihrer Quelle (z. B.
catalog-s3). -
Führen Sie folgendes SQL aus, um die Tabelle zu erstellen. Stellen Sie sicher, dass der Tabellenname mit dem in Schritt 1 verwendeten
catalog-table-nameübereinstimmt.CREATE TABLE "catalog-s3"."intake"."debezium_pg_users" (id INT,name VARCHAR,email VARCHAR)
Lokale Postgres- und Kafka Connect-Dateien einrichten
Abschnitt betitelt „Lokale Postgres- und Kafka Connect-Dateien einrichten“Sie erstellen nun die lokalen Konfigurationsdateien, die erforderlich sind, um den Connector im Standalone-Modus auszuführen.
Dateistruktur
- intake-debezium-tutorial/ - connect-data/ - postgres-init/ - init.sql - connect-log4j.properties - connect-standalone.properties - docker-compose.yml - pg-connector.properties-
Projektverzeichnisse erstellen:
Terminal-Fenster mkdir -p intake-debezium-tutorial/postgres-init intake-debezium-tutorial/connect-datacd intake-debezium-tutorial -
postgres-init/init.sqlerstellen: Dieses Skript initialisiert die Datenbank und aktiviert die logische Replikation.CREATE TABLE public.users (id SERIAL PRIMARY KEY,name VARCHAR(100),email VARCHAR(100));INSERT INTO public.users (name, email) VALUES ('Alice', 'alice@example.com');ALTER TABLE public.users REPLICA IDENTITY FULL; -
connect-log4j.propertieserstellen:log4j.rootLogger=INFO, stdoutlog4j.appender.stdout=org.apache.log4j.ConsoleAppenderlog4j.appender.stdout.layout=org.apache.log4j.PatternLayoutlog4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n -
docker-compose.ymlerstellen:services:postgres:image: debezium/postgres:15-alpineplatform: linux/amd64container_name: postgres_cdcports:- "5432:5432"environment:- POSTGRES_USER=postgres- POSTGRES_PASSWORD=postgres- POSTGRES_DB=testdbvolumes:- ./postgres-init:/docker-entrypoint-initdb.d- pg-data:/var/lib/postgresql/datanetworks:- cdc-networkconnect:image: debezium/connect:3.0.0.Finalplatform: linux/amd64container_name: connect_cdcdepends_on:- postgrescommand: >/kafka/bin/connect-standalone.sh/kafka/config/connect-standalone.properties/connect/pg-connector.propertiesenvironment:- CONNECT_REST_ADVERTISED_HOST_NAME=localhost- CONNECT_REST_ADVERTISED_PORT=8083volumes:- ./connect-data:/connect- ./connect-log4j.properties:/kafka/config/connect-log4j.properties- ./connect-standalone.properties:/kafka/config/connect-standalone.properties- ./pg-connector.properties:/connect/pg-connector.propertiesnetworks:- cdc-networkvolumes:pg-data:networks:cdc-network:driver: bridge -
connect-standalone.propertieserstellen:Ersetzen Sie
<BOOTSTRAP_SERVER>durch den URI aus Schritt 1 in Richten Sie Ihre STACKIT Intake-Umgebung ein und<USERNAME>/<PASSWORD>durch die Zugangsdaten aus Schritt 3 in Richten Sie Ihre STACKIT Intake-Umgebung ein.bootstrap.servers=<BOOTSTRAP_SERVER>plugin.path=/kafka/connectkey.converter=org.apache.kafka.connect.storage.StringConvertervalue.converter=org.apache.kafka.connect.json.JsonConverterinternal.key.converter=org.apache.kafka.connect.json.JsonConverterinternal.value.converter=org.apache.kafka.connect.json.JsonConverterconfig.storage.class=org.apache.kafka.connect.storage.FileConfigBackingStoreoffset.storage.class=org.apache.kafka.connect.storage.FileOffsetBackingStorestatus.storage.class=org.apache.kafka.connect.storage.FileStatusBackingStoreconfig.storage.file.filename=/connect/standalone-configs.datoffset.storage.file.filename=/connect/standalone-offsets.datstatus.storage.file.filename=/connect/standalone-status.datoffset.flush.interval.ms=10000security.protocol=SASL_SSLsasl.mechanism=SCRAM-SHA-512sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="<USERNAME>" password="<PASSWORD>";producer.security.protocol=SASL_SSLproducer.sasl.mechanism=SCRAM-SHA-512producer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="<USERNAME>" password="<PASSWORD>"; -
pg-connector.propertieserstellen:Ersetzen Sie
<INTAKE_TOPIC>durch den Topic-Namen aus Schritt 2 in Richten Sie Ihre STACKIT Intake-Umgebung ein.name=postgres-source-connectorconnector.class=io.debezium.connector.postgresql.PostgresConnectordatabase.hostname=postgresdatabase.port=5432database.user=postgresdatabase.password=postgresdatabase.dbname=testdbdatabase.server.name=my_postgres_servertopic.prefix=postgres_serverplugin.name=pgoutputtable.include.list=public.userskey.converter=org.apache.kafka.connect.storage.StringConvertervalue.converter=org.apache.kafka.connect.json.JsonConvertervalue.converter.schemas.enable=falsetransforms=extractKey, unwrap, routetransforms.extractKey.type=org.apache.kafka.connect.transforms.ExtractField$Keytransforms.extractKey.field=idtransforms.unwrap.type=io.debezium.transforms.ExtractNewRecordStatetransforms.unwrap.drop.tombstones=falsetransforms.route.type=org.apache.kafka.connect.transforms.RegexRoutertransforms.route.regex=.*transforms.route.replacement=<INTAKE_TOPIC>
Pipeline starten
Abschnitt betitelt „Pipeline starten“Starten Sie die Container aus Ihrem Verzeichnis intake-debezium-tutorial:
docker-compose up -dÜberwachen Sie die Logs, um sicherzustellen, dass die Verbindung erfolgreich hergestellt wurde:
docker-compose logs -f connect_cdcEnd-to-End-Datenfluss überprüfen
Abschnitt betitelt „End-to-End-Datenfluss überprüfen“- Dremio prüfen: Fragen Sie Ihre Tabelle in Dremio ab. Sie sollten den ersten Datensatz „Alice“ sehen.
- Postgres aktualisieren: Öffnen Sie eine Shell im Datenbank-Container:
Führen Sie einige Änderungen aus:
Terminal-Fenster docker exec -it postgres_cdc psql -U postgres -d testdbINSERT INTO public.users (name, email) VALUES ('Bob', 'bob@example.com');UPDATE public.users SET email = 'alice.new@example.com' WHERE name = 'Alice'; - Verifizieren: Prüfen Sie Dremio erneut. Die Änderungen sollten in der Iceberg-Tabelle reflektiert werden.