Stream Postgres changes to Dremio

This guide explains how to configure Debezium to monitor a local PostgreSQL database, capture all row-level changes (INSERTs, UPDATEs, DELETEs), and stream those events directly into STACKIT Intake. Intake automatically ingests them into a Dremio Iceberg table for real-time analytics.

Before you begin setting up the pipeline, ensure you have the following in place:

  • You have a STACKIT project and the necessary permissions (Intake.Admin, Intake.Editor, or Intake.Owner).
  • You have installed and configured the STACKIT CLI: User Guide
  • You have access to a Dremio instance and possess a Personal Access Token (PAT).
  • You have Docker and Docker Compose installed on your local machine.
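
If you want a quick sanity check of the local tooling, both of the following commands should print a version number:

Terminal window
docker --version
docker-compose --version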
  1. Set up your STACKIT Intake environment

    You need to provision the infrastructure that will receive the data stream. This involves creating a Runner, an Intake, and an Intake User.

    1. Create an Intake Runner

      First, define the engine for your data ingestion.

      Content coming soon.

    2. Create an Intake

      The Intake acts as the data pipe connecting the Runner to your Dremio table.

      Content coming soon.

    3. Create an Intake User

      You must create a dedicated user for Debezium to authenticate against the Intake.

      Content coming soon.

  2. Define your Dremio target table (Recommended)

    To ensure a smooth ingestion process, we recommend pre-creating the target table in Dremio. This tutorial configures Debezium to send a simplified JSON payload matching this schema.

    1. Open your Dremio UI.

    2. Navigate to your source (e.g., catalog-s3).

    3. Execute the following SQL to create the table. Ensure the table name matches the catalog-table-name used in Step 1.

      CREATE TABLE "catalog-s3"."intake"."debezium_pg_users" (
        id INT,
        name VARCHAR,
        email VARCHAR
      )
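
    The statement above only defines an empty table. If you want to confirm the path before wiring up the connector, a simple query using the same catalog, folder, and table names should return no rows yet:

      SELECT * FROM "catalog-s3"."intake"."debezium_pg_users"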
  3. Set up local Postgres and Kafka Connect files

    You will now create the local configuration files required to run the connector in standalone mode.

    File Structure

    • intake-debezium-tutorial/
      • connect-data/
      • postgres-init/
        • init.sql
      • connect-log4j.properties
      • connect-standalone.properties
      • docker-compose.yml
      • pg-connector.properties
    1. Create the project directories:

      Terminal window
      mkdir -p intake-debezium-tutorial/postgres-init intake-debezium-tutorial/connect-data
      cd intake-debezium-tutorial
    2. Create postgres-init/init.sql: This script creates a sample table, seeds it with one row, and sets the replica identity so that Debezium change events carry the full previous row state. (The debezium/postgres image already ships with logical replication enabled via wal_level=logical.)

      -- Sample table that Debezium will monitor
      CREATE TABLE public.users (
        id SERIAL PRIMARY KEY,
        name VARCHAR(100),
        email VARCHAR(100)
      );
      -- Seed one row so the initial snapshot has data
      INSERT INTO public.users (name, email) VALUES ('Alice', 'alice@example.com');
      -- Include the complete old row in UPDATE/DELETE events
      ALTER TABLE public.users REPLICA IDENTITY FULL;
    3. Create connect-log4j.properties:

      log4j.rootLogger=INFO, stdout
      log4j.appender.stdout=org.apache.log4j.ConsoleAppender
      log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
      log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
    4. Create docker-compose.yml:

      services:
        postgres:
          image: debezium/postgres:15-alpine
          platform: linux/amd64
          container_name: postgres_cdc
          ports:
            - "5432:5432"
          environment:
            - POSTGRES_USER=postgres
            - POSTGRES_PASSWORD=postgres
            - POSTGRES_DB=testdb
          volumes:
            - ./postgres-init:/docker-entrypoint-initdb.d
            - pg-data:/var/lib/postgresql/data
          networks:
            - cdc-network
        connect:
          image: debezium/connect:3.0.0.Final
          platform: linux/amd64
          container_name: connect_cdc
          depends_on:
            - postgres
          command: >
            /kafka/bin/connect-standalone.sh
            /kafka/config/connect-standalone.properties
            /connect/pg-connector.properties
          environment:
            - CONNECT_REST_ADVERTISED_HOST_NAME=localhost
            - CONNECT_REST_ADVERTISED_PORT=8083
          volumes:
            - ./connect-data:/connect
            - ./connect-log4j.properties:/kafka/config/connect-log4j.properties
            - ./connect-standalone.properties:/kafka/config/connect-standalone.properties
            - ./pg-connector.properties:/connect/pg-connector.properties
          networks:
            - cdc-network
      volumes:
        pg-data:
      networks:
        cdc-network:
          driver: bridge
    5. Create connect-standalone.properties:

      Replace <BOOTSTRAP_SERVER> with the URI from Step 1.1 and <USERNAME>/<PASSWORD> with the credentials from Step 1.3.

      bootstrap.servers=<BOOTSTRAP_SERVER>
      plugin.path=/kafka/connect
      key.converter=org.apache.kafka.connect.storage.StringConverter
      value.converter=org.apache.kafka.connect.json.JsonConverter
      internal.key.converter=org.apache.kafka.connect.json.JsonConverter
      internal.value.converter=org.apache.kafka.connect.json.JsonConverter
      config.storage.class=org.apache.kafka.connect.storage.FileConfigBackingStore
      offset.storage.class=org.apache.kafka.connect.storage.FileOffsetBackingStore
      status.storage.class=org.apache.kafka.connect.storage.FileStatusBackingStore
      config.storage.file.filename=/connect/standalone-configs.dat
      offset.storage.file.filename=/connect/standalone-offsets.dat
      status.storage.file.filename=/connect/standalone-status.dat
      offset.flush.interval.ms=10000
      security.protocol=SASL_SSL
      sasl.mechanism=SCRAM-SHA-512
      sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="<USERNAME>" password="<PASSWORD>";
      producer.security.protocol=SASL_SSL
      producer.sasl.mechanism=SCRAM-SHA-512
      producer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="<USERNAME>" password="<PASSWORD>";
    6. Create pg-connector.properties:

      Replace <INTAKE_TOPIC> with the topic name from Step 1.2.

      name=postgres-source-connector
      connector.class=io.debezium.connector.postgresql.PostgresConnector
      database.hostname=postgres
      database.port=5432
      database.user=postgres
      database.password=postgres
      database.dbname=testdb
      database.server.name=my_postgres_server
      topic.prefix=postgres_server
      plugin.name=pgoutput
      table.include.list=public.users
      key.converter=org.apache.kafka.connect.storage.StringConverter
      value.converter=org.apache.kafka.connect.json.JsonConverter
      value.converter.schemas.enable=false
      transforms=extractKey, unwrap, route
      transforms.extractKey.type=org.apache.kafka.connect.transforms.ExtractField$Key
      transforms.extractKey.field=id
      transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
      transforms.unwrap.drop.tombstones=false
      transforms.route.type=org.apache.kafka.connect.transforms.RegexRouter
      transforms.route.regex=.*
      transforms.route.replacement=<INTAKE_TOPIC>
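
    Taken together, the transform chain shapes each change event before it reaches the Intake topic: unwrap (ExtractNewRecordState) strips the Debezium envelope down to the new row state, extractKey reduces the record key to the id column, and route redirects every record to <INTAKE_TOPIC>. An insert into public.users should therefore arrive as a flat JSON document matching the Dremio schema from Step 2, roughly like this (illustrative values):

      {"id": 2, "name": "Bob", "email": "bob@example.com"}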
  4. Launch the pipeline

    From your intake-debezium-tutorial directory, start the containers:

    Terminal window
    docker-compose up -d

    Monitor the logs to confirm the connection is successful:

    Terminal window
    docker-compose logs -f connect
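
    Optionally, you can also confirm that the connector has created its logical replication slot in Postgres (the Debezium Postgres connector names the slot debezium by default); the plugin column should show pgoutput:

    Terminal window
    docker exec -it postgres_cdc psql -U postgres -d testdb -c "SELECT slot_name, plugin, active FROM pg_replication_slots;"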
  5. Verify the end-to-end flow

    1. Check Dremio: Query your table in Dremio. You should see the initial record “Alice”.
    2. Change data in Postgres: Open a psql shell in the database container:
      Terminal window
      docker exec -it postgres_cdc psql -U postgres -d testdb
      Run the following statements:
      INSERT INTO public.users (name, email) VALUES ('Bob', 'bob@example.com');
      UPDATE public.users SET email = 'alice.new@example.com' WHERE name = 'Alice';
    3. Verify: Check Dremio again. The changes should be reflected in the Iceberg table.
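
    When you are finished, you can stop the local containers and remove the Postgres data volume created by this tutorial:

    Terminal window
    docker-compose down -v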