Zum Inhalt springen

Postgres-Änderungen an Dremio streamen

Diese Anleitung erklärt, wie Sie Debezium konfigurieren, um eine lokale PostgreSQL-Datenbank zu überwachen, alle Änderungen auf Zeilenebene (INSERTs, UPDATEs, DELETEs) zu erfassen und diese Ereignisse direkt an STACKIT Intake zu streamen. Intake führt diese automatisch in eine Dremio-Iceberg-Tabelle für Echtzeitanalysen zusammen.

Bevor Sie mit der Einrichtung der Pipeline beginnen, stellen Sie sicher, dass folgende Voraussetzungen erfüllt sind:

  • Sie verfügen über ein STACKIT-Projekt und die erforderlichen Berechtigungen (Intake.Admin, Intake.Editor oder Intake.Owner).
  • Sie haben die STACKIT CLI installiert und konfiguriert: Benutzerhandbuch
  • Sie haben Zugriff auf eine Dremio-Instanz und besitzen ein Personal Access Token (PAT).
  • Sie haben Docker und Docker Compose auf Ihrem lokalen Rechner installiert.

Sie müssen die Infrastruktur bereitstellen, die den Datenstrom empfangen wird. Dies umfasst das Erstellen eines Runners, eines Intakes und eines Intake-Benutzers.

  1. Einen Intake Runner erstellen

    Definieren Sie zunächst die Engine für Ihre Datenerfassung.

    Inhalt folgt in Kürze.

  2. Einen Intake erstellen

    Der Intake fungiert als Datenleitung, die den Runner mit Ihrer Dremio-Tabelle verbindet.

    Inhalt folgt in Kürze.

  3. Einen Intake-Benutzer erstellen

    Sie müssen einen dedizierten Benutzer erstellen, damit Debezium sich gegenüber dem Intake authentifizieren kann.

    Inhalt folgt in Kürze.

Definieren Sie Ihre Dremio-Ziel-Tabelle (Empfohlen)

Abschnitt betitelt „Definieren Sie Ihre Dremio-Ziel-Tabelle (Empfohlen)“

Um einen reibungslosen Erfassungsprozess zu gewährleisten, empfehlen wir, die Ziel-Tabelle in Dremio vorab zu erstellen. In diesem Tutorial wird Debezium so konfiguriert, dass eine vereinfachte JSON-Payload gesendet wird, die diesem Schema entspricht.

  1. Öffnen Sie Ihre Dremio-Benutzeroberfläche.

  2. Navigieren Sie zu Ihrer Quelle (z. B. catalog-s3).

  3. Führen Sie folgendes SQL aus, um die Tabelle zu erstellen. Stellen Sie sicher, dass der Tabellenname mit dem in Schritt 1 verwendeten catalog-table-name übereinstimmt.

    CREATE TABLE "catalog-s3"."intake"."debezium_pg_users" (
    id INT,
    name VARCHAR,
    email VARCHAR
    )

Lokale Postgres- und Kafka Connect-Dateien einrichten

Abschnitt betitelt „Lokale Postgres- und Kafka Connect-Dateien einrichten“

Sie erstellen nun die lokalen Konfigurationsdateien, die erforderlich sind, um den Connector im Standalone-Modus auszuführen.

Dateistruktur

- intake-debezium-tutorial/
- connect-data/
- postgres-init/
- init.sql
- connect-log4j.properties
- connect-standalone.properties
- docker-compose.yml
- pg-connector.properties
  1. Projektverzeichnisse erstellen:

    Terminal-Fenster
    mkdir -p intake-debezium-tutorial/postgres-init intake-debezium-tutorial/connect-data
    cd intake-debezium-tutorial
  2. postgres-init/init.sql erstellen: Dieses Skript initialisiert die Datenbank und aktiviert die logische Replikation.

    CREATE TABLE public.users (
    id SERIAL PRIMARY KEY,
    name VARCHAR(100),
    email VARCHAR(100)
    );
    INSERT INTO public.users (name, email) VALUES ('Alice', 'alice@example.com');
    ALTER TABLE public.users REPLICA IDENTITY FULL;
  3. connect-log4j.properties erstellen:

    log4j.rootLogger=INFO, stdout
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
  4. docker-compose.yml erstellen:

    services:
    postgres:
    image: debezium/postgres:15-alpine
    platform: linux/amd64
    container_name: postgres_cdc
    ports:
    - "5432:5432"
    environment:
    - POSTGRES_USER=postgres
    - POSTGRES_PASSWORD=postgres
    - POSTGRES_DB=testdb
    volumes:
    - ./postgres-init:/docker-entrypoint-initdb.d
    - pg-data:/var/lib/postgresql/data
    networks:
    - cdc-network
    connect:
    image: debezium/connect:3.0.0.Final
    platform: linux/amd64
    container_name: connect_cdc
    depends_on:
    - postgres
    command: >
    /kafka/bin/connect-standalone.sh
    /kafka/config/connect-standalone.properties
    /connect/pg-connector.properties
    environment:
    - CONNECT_REST_ADVERTISED_HOST_NAME=localhost
    - CONNECT_REST_ADVERTISED_PORT=8083
    volumes:
    - ./connect-data:/connect
    - ./connect-log4j.properties:/kafka/config/connect-log4j.properties
    - ./connect-standalone.properties:/kafka/config/connect-standalone.properties
    - ./pg-connector.properties:/connect/pg-connector.properties
    networks:
    - cdc-network
    volumes:
    pg-data:
    networks:
    cdc-network:
    driver: bridge
  5. connect-standalone.properties erstellen:

    Ersetzen Sie <BOOTSTRAP_SERVER> durch den URI aus Schritt 1 in Richten Sie Ihre STACKIT Intake-Umgebung ein und <USERNAME>/<PASSWORD> durch die Zugangsdaten aus Schritt 3 in Richten Sie Ihre STACKIT Intake-Umgebung ein.

    bootstrap.servers=<BOOTSTRAP_SERVER>
    plugin.path=/kafka/connect
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    internal.key.converter=org.apache.kafka.connect.json.JsonConverter
    internal.value.converter=org.apache.kafka.connect.json.JsonConverter
    config.storage.class=org.apache.kafka.connect.storage.FileConfigBackingStore
    offset.storage.class=org.apache.kafka.connect.storage.FileOffsetBackingStore
    status.storage.class=org.apache.kafka.connect.storage.FileStatusBackingStore
    config.storage.file.filename=/connect/standalone-configs.dat
    offset.storage.file.filename=/connect/standalone-offsets.dat
    status.storage.file.filename=/connect/standalone-status.dat
    offset.flush.interval.ms=10000
    security.protocol=SASL_SSL
    sasl.mechanism=SCRAM-SHA-512
    sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="<USERNAME>" password="<PASSWORD>";
    producer.security.protocol=SASL_SSL
    producer.sasl.mechanism=SCRAM-SHA-512
    producer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="<USERNAME>" password="<PASSWORD>";
  6. pg-connector.properties erstellen:

    Ersetzen Sie <INTAKE_TOPIC> durch den Topic-Namen aus Schritt 2 in Richten Sie Ihre STACKIT Intake-Umgebung ein.

    name=postgres-source-connector
    connector.class=io.debezium.connector.postgresql.PostgresConnector
    database.hostname=postgres
    database.port=5432
    database.user=postgres
    database.password=postgres
    database.dbname=testdb
    database.server.name=my_postgres_server
    topic.prefix=postgres_server
    plugin.name=pgoutput
    table.include.list=public.users
    key.converter=org.apache.kafka.connect.storage.StringConverter
    value.converter=org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable=false
    transforms=extractKey, unwrap, route
    transforms.extractKey.type=org.apache.kafka.connect.transforms.ExtractField$Key
    transforms.extractKey.field=id
    transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
    transforms.unwrap.drop.tombstones=false
    transforms.route.type=org.apache.kafka.connect.transforms.RegexRouter
    transforms.route.regex=.*
    transforms.route.replacement=<INTAKE_TOPIC>

Starten Sie die Container aus Ihrem Verzeichnis intake-debezium-tutorial:

Terminal-Fenster
docker-compose up -d

Überwachen Sie die Logs, um sicherzustellen, dass die Verbindung erfolgreich hergestellt wurde:

Terminal-Fenster
docker-compose logs -f connect_cdc
  1. Dremio prüfen: Fragen Sie Ihre Tabelle in Dremio ab. Sie sollten den ersten Datensatz „Alice“ sehen.
  2. Postgres aktualisieren: Öffnen Sie eine Shell im Datenbank-Container:
    Terminal-Fenster
    docker exec -it postgres_cdc psql -U postgres -d testdb
    Führen Sie einige Änderungen aus:
    INSERT INTO public.users (name, email) VALUES ('Bob', 'bob@example.com');
    UPDATE public.users SET email = 'alice.new@example.com' WHERE name = 'Alice';
  3. Verifizieren: Prüfen Sie Dremio erneut. Die Änderungen sollten in der Iceberg-Tabelle reflektiert werden.