Write to Iceberg Catalog with Daft/PyIceberg

This tutorial walks through exchanging a Dremio Personal Access Token (PAT) for a short-lived token that can be used to interact with the Iceberg REST Catalog underlying a STACKIT Dremio instance. In this example, data is written to the catalog and to Object Storage using Daft and PyIceberg.

  • You have a STACKIT Dremio instance: Create a Dremio Instance
  • You have enabled Personal Access Tokens in your STACKIT Dremio instance
  • Python 3 is installed
  • The default catalog is configured and contains at least one folder
  • Your Dremio user (or the user you got the PAT from) is allowed to create tables

Create a convenience .env file, which can be exported before proceeding to the next steps.

.env
export DREMIO_HOST="https://<dremio-instance-url>" # Can be retrieved from STACKIT portal/sdk
export DREMIO_AUTH_ENDPOINT="https://<dremio-instance-url>/oauth/token"
export DREMIO_CATALOG_ENDPOINT="https://catalog.<dremio-instance-url>" # Notice the 'catalog' subdomain
export DREMIO_CATALOG_NAME="c" # Must match the existing default Catalog name in Dremio
export DREMIO_CATALOG_FOLDER="f" # Must match an existing Catalog folder in Dremio
export DREMIO_CATALOG_TABLE="example_table_name" # This table will be overwritten if it exists
export DREMIO_PAT="<create PAT from STACKIT Dremio Instance>"
export BUCKET_NAME="dremio-<dremio-instance-id>"
Terminal window
pip3 install daft pyarrow pyiceberg requests python-dotenv
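
Because the script in the following sections reads these variables from the environment, it can help to confirm they are all set before running anything. A minimal sketch using only the standard library; the helper name `missing_env_vars` is hypothetical and not part of the tutorial script:

```python
import os

# Names of the variables that daft_create_table.py reads (taken from the .env file above).
REQUIRED_VARS = [
    "DREMIO_AUTH_ENDPOINT",
    "DREMIO_CATALOG_ENDPOINT",
    "DREMIO_CATALOG_NAME",
    "DREMIO_CATALOG_FOLDER",
    "DREMIO_CATALOG_TABLE",
    "DREMIO_PAT",
]


def missing_env_vars(env=None):
    """Return the required variable names that are unset or empty."""
    env = os.environ if env is None else env
    return [name for name in REQUIRED_VARS if not env.get(name)]
```

Calling `missing_env_vars()` after the .env file has been exported (or loaded with `load_dotenv()`) returns an empty list when every variable is present.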

Exchange Dremio personal access token (PAT) with a Catalog token

Start by defining a function to exchange the PAT with a short-lived token, valid for requests to the catalog. This function is used in the next section.

daft_create_table.py
import requests


def get_oauth_token(url: str, password: str) -> str:
    payload = {
        "subject_token": password,
        "subject_token_type": "urn:ietf:params:oauth:token-type:dremio:personal-access-token",
        "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
        "scope": "dremio.all",
    }
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    response = requests.post(url, data=payload, headers=headers)
    response.raise_for_status()
    return response.json()["access_token"]
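
To make the request concrete, the sketch below builds the same form fields and prints the URL-encoded body that would be sent to the token endpoint, without contacting the server. The helper name `build_token_exchange_form` and the placeholder PAT are illustrative assumptions; the field values are copied from the function above.

```python
from urllib.parse import urlencode


def build_token_exchange_form(pat: str) -> dict:
    """Form fields for exchanging a Dremio PAT (values copied from get_oauth_token)."""
    return {
        "subject_token": pat,
        "subject_token_type": "urn:ietf:params:oauth:token-type:dremio:personal-access-token",
        "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
        "scope": "dremio.all",
    }


# Placeholder value, not a real token.
form = build_token_exchange_form("example-pat")
# A dict passed to requests as data= is sent in this form-urlencoded style.
encoded_body = urlencode(form)
print(encoded_body)
```

Printing the body this way is only a debugging aid; never log a real PAT.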

The following snippet uses PyIceberg to connect to the remote catalog with the short-lived token, creates a basic table partitioned by country, and writes sample data to it with Daft.

daft_create_table.py
# ...
# previous code
# ...
import os
import requests
from dotenv import load_dotenv
import daft
import pyarrow as pa
from pyiceberg.catalog import load_catalog
load_dotenv()
DREMIO_AUTH_ENDPOINT = os.environ.get("DREMIO_AUTH_ENDPOINT")
DREMIO_CATALOG_ENDPOINT = os.environ.get("DREMIO_CATALOG_ENDPOINT")
DREMIO_CATALOG_NAME = os.environ.get("DREMIO_CATALOG_NAME")
DREMIO_CATALOG_FOLDER = os.environ.get("DREMIO_CATALOG_FOLDER")
DREMIO_CATALOG_TABLE = os.environ.get("DREMIO_CATALOG_TABLE")
DREMIO_PAT = os.environ.get("DREMIO_PAT")


def main():
    # Connect to the Iceberg REST catalog using the short-lived token.
    catalog = load_catalog(
        "dremio_rest",
        **{
            "type": "rest",
            "uri": DREMIO_CATALOG_ENDPOINT,
            "token": get_oauth_token(DREMIO_AUTH_ENDPOINT, DREMIO_PAT),
            "warehouse": "default"
        }
    )

    arrow_schema = pa.schema([
        pa.field("id", pa.int64(), nullable=True),
        pa.field("user_id", pa.int32(), nullable=True),
        pa.field("metric_value", pa.float64(), nullable=True),
        pa.field("country", pa.string(), nullable=True)
    ])

    # The Dremio folder acts as the Iceberg namespace of the table.
    table_identifier = f"{DREMIO_CATALOG_FOLDER}.{DREMIO_CATALOG_TABLE}"

    # Drop any existing table so the script starts from a clean state.
    if catalog.table_exists(table_identifier):
        catalog.drop_table(table_identifier)

    iceberg_table = catalog.create_table(
        identifier=table_identifier,
        schema=arrow_schema
    )

    # Partition the table by the value of the country column.
    with iceberg_table.update_spec() as update_spec:
        update_spec.add_identity("country")

    mock_data = {
        "id": [1, 2, 3],
        "user_id": [1001, 1002, 1003],
        "metric_value": [45.25, 12.80, 99.99],
        "country": ["de", "pt", "es"]
    }
    pa_table = pa.Table.from_pydict(mock_data, schema=arrow_schema)

    # Write the sample rows with Daft and print the write summary.
    df = daft.from_arrow(pa_table)
    write_summary = df.write_iceberg(iceberg_table, mode="append")
    write_summary.show()


if __name__ == "__main__":
    main()
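
The identity partition on country means rows are grouped by the exact value of that column, so each distinct country ends up in its own partition. The following is a plain-Python illustration of that grouping using the sample data above; it does not call PyIceberg, and the helper name `group_by_identity` is hypothetical.

```python
from collections import defaultdict

# Same sample data as in the script above.
mock_data = {
    "id": [1, 2, 3],
    "user_id": [1001, 1002, 1003],
    "metric_value": [45.25, 12.80, 99.99],
    "country": ["de", "pt", "es"],
}


def group_by_identity(columns: dict, partition_column: str) -> dict:
    """Group row dicts by the unmodified value of partition_column."""
    names = list(columns)
    partitions = defaultdict(list)
    for values in zip(*(columns[name] for name in names)):
        row = dict(zip(names, values))
        partitions[row[partition_column]].append(row)
    return dict(partitions)


partitions = group_by_identity(mock_data, "country")
for country, rows in partitions.items():
    print(country, [row["id"] for row in rows])
```

With the three sample rows, this yields three partitions of one row each.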

Assuming all the code was saved in a file called daft_create_table.py, the script can be run:

Terminal window
python daft_create_table.py

After running the script, the results can be verified directly in the Dremio UI, using the catalog, folder, and table names set earlier.

DESCRIBE TABLE <DREMIO_CATALOG_NAME>.<DREMIO_CATALOG_FOLDER>.<DREMIO_CATALOG_TABLE>;

You can run a basic select to see all the data (3 rows after a successful run, because the script drops and recreates the table each time):

SELECT *
FROM <DREMIO_CATALOG_NAME>.<DREMIO_CATALOG_FOLDER>.<DREMIO_CATALOG_TABLE>;
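
The same check can be made from Python. The sketch below assumes a PyIceberg catalog object like the one created in the script; `count_rows` is a hypothetical helper, and it relies on PyIceberg's `load_table`, `scan`, and `to_arrow` calls.

```python
def count_rows(catalog, table_identifier: str) -> int:
    """Load the table through the catalog and count the rows of a full scan.

    `catalog` is assumed to be a PyIceberg catalog, for example the object
    returned by load_catalog(...) in daft_create_table.py.
    """
    table = catalog.load_table(table_identifier)
    # scan() without a filter reads every row; to_arrow() returns a pyarrow.Table.
    return table.scan().to_arrow().num_rows
```

For example, `count_rows(catalog, f"{DREMIO_CATALOG_FOLDER}.{DREMIO_CATALOG_TABLE}")` should match the number of rows returned by the SELECT statement.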

Since both Daft/PyIceberg and Dremio are using the same catalog, we can also use Dremio to change the underlying data.

DELETE FROM <DREMIO_CATALOG_NAME>.<DREMIO_CATALOG_FOLDER>.<DREMIO_CATALOG_TABLE>
WHERE id = 1;
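
The statements above share one table path made of the catalog, folder, and table names from the .env file. A small sketch that fills in the placeholders so the statements can be pasted into the Dremio SQL editor; `qualified_table_name` is a hypothetical helper, and it assumes the names need no quoting (plain letters, digits, and underscores).

```python
import os


def qualified_table_name(env=None) -> str:
    """Join catalog, folder, and table names into the dotted path used in the SQL above."""
    env = os.environ if env is None else env
    parts = [
        env["DREMIO_CATALOG_NAME"],
        env["DREMIO_CATALOG_FOLDER"],
        env["DREMIO_CATALOG_TABLE"],
    ]
    return ".".join(parts)


# Example values copied from the .env file above.
example_env = {
    "DREMIO_CATALOG_NAME": "c",
    "DREMIO_CATALOG_FOLDER": "f",
    "DREMIO_CATALOG_TABLE": "example_table_name",
}
path = qualified_table_name(example_env)
print(f"DESCRIBE TABLE {path};")
print(f"SELECT * FROM {path};")
```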