Stream Postgres changes to Dremio
Overview

This guide explains how to configure Debezium to monitor a local PostgreSQL database, capture all row-level changes (INSERTs, UPDATEs, DELETEs), and stream those events directly into STACKIT Intake. Intake automatically ingests them into a Dremio Iceberg table for real-time analytics.
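At a glance, the data path looks like this:

```
PostgreSQL (WAL) → Debezium (Kafka Connect) → STACKIT Intake topic → Dremio Iceberg table
```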
Prerequisites
Before you begin setting up the pipeline, ensure you have the following in place:

- You have a STACKIT project and the necessary permissions (`Intake.Admin`, `Intake.Editor`, or `Intake.Owner`).
- You have installed and configured the STACKIT CLI (see the User Guide).
- You have access to a Dremio instance and possess a Personal Access Token (PAT).
- You have Docker and Docker Compose installed on your local machine.
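If you want to double-check the local tooling first, a quick sanity check along these lines helps (standard CLI entry points; exact output varies by setup):

```sh
# Sanity-check the local tooling before starting.
stackit auth login        # authenticate the STACKIT CLI (opens a browser flow)
docker --version          # confirm Docker Engine is installed
docker compose version    # Compose v2 also works; adjust the docker-compose calls accordingly
```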
Step-by-Step configuration

Step 1: Set up your STACKIT Intake environment

You need to provision the infrastructure that will receive the data stream. This involves creating a Runner, an Intake, and an Intake User.
Step 1.1: Create an Intake Runner

First, define the engine for your data ingestion.

| Flag | Description |
| --- | --- |
| `display-name` | A human-readable name for the Intake Runner. |
| `max-message-size-kib` | The maximum size, in kibibytes, for a single message. |
| `max-messages-per-hour` | The maximum number of messages the runner can process per hour. |
| `project-id` | The ID of your STACKIT project. |
| `region` | The region for the runner (e.g., `eu01`). |

Execute the creation command
To create your Intake Runner, execute the command providing the relevant flags:

```sh
stackit beta intake runner create --display-name <DISPLAY_NAME> --max-message-size-kib <SIZE> --max-messages-per-hour <RATE> --project-id <PROJECT_ID> --region eu01
```

After executing the command, confirm with `y`. When the instance is created successfully, the CLI returns the instance ID.
Example

```sh
stackit beta intake runner create \
  --display-name "debezium-pg-runner" \
  --max-message-size-kib 1000 \
  --max-messages-per-hour 1000 \
  --project-id "1234-5678-90ab-cdef" \
  --region eu01
```

Output:

```
Created Intake Runner for project "...".
Runner ID: af1b6d5b-9dc5-4dee-ab48-e944a7a17a2b
Bootstrap Server: af1b6d5b.intake.eu01.onstackit.cloud:9094
```
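You will need both values again later: the Runner ID in Step 1.2 and the Bootstrap Server in Step 3. If you work in a shell, it can help to park them in variables (the values below are from the example output above):

```sh
# Convenience only: stash the returned values for the later steps.
RUNNER_ID="af1b6d5b-9dc5-4dee-ab48-e944a7a17a2b"
BOOTSTRAP_SERVER="af1b6d5b.intake.eu01.onstackit.cloud:9094"
```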
Step 1.2: Create an Intake

The Intake acts as the data pipe connecting the Runner to your Dremio table.

| Flag | Description |
| --- | --- |
| `runner-id` | The ID of the Intake Runner created in the previous step. |
| `catalog-uri` | The Dremio Catalog URI. |
| `catalog-warehouse` | The Dremio warehouse name. |
| `catalog-table-name` | The target table name in Dremio. |
| `dremio-pat` | Your Dremio Personal Access Token. |
| `dremio-token-endpoint` | The Dremio authentication endpoint. |

Execute the creation command
To create your Intake, execute the following command:

```sh
stackit beta intake create --runner-id <RUNNER_ID> --display-name <NAME> --catalog-uri <URI> --catalog-table-name <TABLE> --dremio-pat <PAT> ...
```

Example

```sh
stackit beta intake create \
  --display-name "debezium-intake" \
  --runner-id "af1b6d5b-9dc5-4dee-ab48-e944a7a17a2b" \
  --catalog-uri "https://catalog.dremio..." \
  --catalog-warehouse "catalog-s3" \
  --catalog-table-name "debezium_pg_users" \
  --catalog-auth-type "dremio" \
  --dremio-token-endpoint "https://oauth.dremio..." \
  --dremio-pat "MY_SECRET_PAT" \
  --project-id "1234-5678-90ab-cdef" \
  --region eu01
```

Output:

```
Created Intake for Runner "...".
Intake ID: 17802315-32c2-48ce-a33d-3043d8aec89b
Topic: intake-17802315-32c2-48ce-a33d-3043d8aec89b
```
Step 1.3: Create an Intake User

You must create a dedicated user for Debezium to authenticate against the Intake.

| Flag | Description |
| --- | --- |
| `intake-id` | The ID of the Intake created in the previous step. |
| `display-name` | A name for the user. |
| `password` | A strong password (min. 12 characters, mixed case, numbers, special characters). |
| `type` | Must be set to `intake`. |

Execute the creation command
```sh
stackit beta intake user create --intake-id <INTAKE_ID> --display-name <NAME> --password <PASSWORD> --type intake
```

Example

```sh
stackit beta intake user create \
  --intake-id "17802315-32c2-48ce-a33d-3043d8aec89b" \
  --display-name "debezium-connector-user" \
  --password "SuperSaf3Password!" \
  --type "intake" \
  --project-id "1234-5678-90ab-cdef" \
  --region eu01
```

Output:

```
Created Intake user...
User ID: e9e65d48...
Username: intake-user-e9e65d48...
```

Note that the generated Username (not the display name) is what you will use, together with the password, as the SASL credentials in Step 3.
Step 2: Define your Dremio target table (Recommended)

To ensure a smooth ingestion process, we recommend pre-creating the target table in Dremio. This tutorial configures Debezium to send a simplified JSON payload matching this schema.
1. Open your Dremio UI.
2. Navigate to your source (e.g., `catalog-s3`).
3. Execute the following SQL to create the table. Ensure the table name matches the `catalog-table-name` used in Step 1.2:

```sql
CREATE TABLE "catalog-s3"."intake"."debezium_pg_users" (
  id INT,
  name VARCHAR,
  email VARCHAR
)
```
Step 3: Set up local Postgres and Kafka Connect files

You will now create the local configuration files required to run the connector in standalone mode.
File Structure

```
intake-debezium-tutorial/
├── connect-data/
│   └── …
├── postgres-init/
│   └── init.sql
├── connect-log4j.properties
├── connect-standalone.properties
├── docker-compose.yml
└── pg-connector.properties
```
Create the project directories:

```sh
mkdir -p intake-debezium-tutorial/postgres-init intake-debezium-tutorial/connect-data
cd intake-debezium-tutorial
```
Create `postgres-init/init.sql`. This script creates the sample table, seeds an initial row, and sets `REPLICA IDENTITY FULL` so that UPDATE and DELETE events carry the complete row data (the `debezium/postgres` image already ships with `wal_level=logical`, so no further replication setup is needed):

```sql
CREATE TABLE public.users (
  id SERIAL PRIMARY KEY,
  name VARCHAR(100),
  email VARCHAR(100)
);

INSERT INTO public.users (name, email) VALUES ('Alice', 'alice@example.com');

ALTER TABLE public.users REPLICA IDENTITY FULL;
```
Create `connect-log4j.properties`:

```properties
log4j.rootLogger=INFO, stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c:%L)%n
```
Create `docker-compose.yml`:

```yaml
services:
  postgres:
    image: debezium/postgres:15-alpine
    platform: linux/amd64
    container_name: postgres_cdc
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_USER=postgres
      - POSTGRES_PASSWORD=postgres
      - POSTGRES_DB=testdb
    volumes:
      - ./postgres-init:/docker-entrypoint-initdb.d
      - pg-data:/var/lib/postgresql/data
    networks:
      - cdc-network

  connect:
    image: debezium/connect:3.0.0.Final
    platform: linux/amd64
    container_name: connect_cdc
    depends_on:
      - postgres
    command: >
      /kafka/bin/connect-standalone.sh
      /kafka/config/connect-standalone.properties
      /connect/pg-connector.properties
    environment:
      - CONNECT_REST_ADVERTISED_HOST_NAME=localhost
      - CONNECT_REST_ADVERTISED_PORT=8083
    volumes:
      - ./connect-data:/connect
      - ./connect-log4j.properties:/kafka/config/connect-log4j.properties
      - ./connect-standalone.properties:/kafka/config/connect-standalone.properties
      - ./pg-connector.properties:/connect/pg-connector.properties
    networks:
      - cdc-network

volumes:
  pg-data:

networks:
  cdc-network:
    driver: bridge
```
Create `connect-standalone.properties`. Replace `<BOOTSTRAP_SERVER>` with the Bootstrap Server from Step 1.1 and `<USERNAME>`/`<PASSWORD>` with the credentials from Step 1.3:

```properties
bootstrap.servers=<BOOTSTRAP_SERVER>
plugin.path=/kafka/connect

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter

config.storage.class=org.apache.kafka.connect.storage.FileConfigBackingStore
offset.storage.class=org.apache.kafka.connect.storage.FileOffsetBackingStore
status.storage.class=org.apache.kafka.connect.storage.FileStatusBackingStore
config.storage.file.filename=/connect/standalone-configs.dat
offset.storage.file.filename=/connect/standalone-offsets.dat
status.storage.file.filename=/connect/standalone-status.dat
offset.flush.interval.ms=10000

security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-512
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="<USERNAME>" password="<PASSWORD>";
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=SCRAM-SHA-512
producer.sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="<USERNAME>" password="<PASSWORD>";
```
Create `pg-connector.properties`. Replace `<INTAKE_TOPIC>` with the topic name from Step 1.2:

```properties
name=postgres-source-connector
connector.class=io.debezium.connector.postgresql.PostgresConnector

database.hostname=postgres
database.port=5432
database.user=postgres
database.password=postgres
database.dbname=testdb
database.server.name=my_postgres_server
topic.prefix=postgres_server
plugin.name=pgoutput
table.include.list=public.users

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false

transforms=extractKey, unwrap, route
transforms.extractKey.type=org.apache.kafka.connect.transforms.ExtractField$Key
transforms.extractKey.field=id
transforms.unwrap.type=io.debezium.transforms.ExtractNewRecordState
transforms.unwrap.drop.tombstones=false
transforms.route.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.route.regex=.*
transforms.route.replacement=<INTAKE_TOPIC>
```
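A quick orientation on the transform chain above: `unwrap` (`ExtractNewRecordState`) strips Debezium's change-event envelope down to the row itself, `extractKey` reduces the Kafka record key to the `id` value, and `route` (`RegexRouter`) rewrites every topic name to your Intake topic. With `value.converter.schemas.enable=false`, the message value that reaches Intake for the seeded row is therefore a flat JSON object along these lines (illustrative, not captured output):

```
{"id": 1, "name": "Alice", "email": "alice@example.com"}
```

This is exactly the simplified payload the Dremio table from Step 2 expects.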
Step 4: Launch the pipeline

From your `intake-debezium-tutorial` directory, start the containers:

```sh
docker-compose up -d
```

Monitor the logs to confirm the connection is successful (the service is named `connect`; its container is `connect_cdc`):

```sh
docker-compose logs -f connect
```
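Beyond the logs, the Kafka Connect REST API reports connector health. A minimal sketch, assuming `curl` is available inside the Connect container (port 8083 is not published to the host in this compose file):

```sh
# Ask the Connect worker for the connector's status from inside the container.
docker exec connect_cdc curl -s http://localhost:8083/connectors/postgres-source-connector/status
# A healthy pipeline reports "state": "RUNNING" for the connector and its task.
```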
Step 5: Verify the end-to-end flow

1. Check Dremio: Query your table in Dremio. You should see the initial record "Alice".
2. Update Postgres: Open a shell in the database container:

```sh
docker exec -it postgres_cdc psql -U postgres -d testdb
```

Run some changes:

```sql
INSERT INTO public.users (name, email) VALUES ('Bob', 'bob@example.com');
UPDATE public.users SET email = 'alice.new@example.com' WHERE name = 'Alice';
```

3. Verify: Check Dremio again. The changes should be reflected in the Iceberg table.
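For the two Dremio checks above, a query along these lines does the job (using the table path created in Step 2):

```sql
SELECT id, name, email
FROM "catalog-s3"."intake"."debezium_pg_users"
ORDER BY id;
```

After the UPDATE has flowed through, Alice's row should show the new e-mail address alongside the new "Bob" record.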