Write to Iceberg Catalog with Daft/PyIceberg
The following tutorial walks through exchanging a Dremio Personal Access Token (PAT) for a short-lived token that can be used to interact with the Iceberg REST Catalog underlying a STACKIT Dremio instance. In this example, data is written to the catalog and to Object Storage using Daft and PyIceberg.
Prerequisites
- You have a STACKIT Dremio instance (see: Create a Dremio Instance)
- You have enabled Personal Access Tokens in your STACKIT Dremio instance
- Python 3
- The default catalog is configured and contains at least one folder
- Your Dremio user (or the user who owns the PAT) is allowed to create tables
Tutorial
Gather required parameters
Create a convenience .env file. Its variables can be exported into your shell (for example with source .env) before proceeding to the next steps; the Python script also loads the file with python-dotenv.
export DREMIO_HOST="https://<dremio-instance-url>" # Can be retrieved from the STACKIT portal/SDK
export DREMIO_AUTH_ENDPOINT="https://<dremio-instance-url>/oauth/token"
export DREMIO_CATALOG_ENDPOINT="https://catalog.<dremio-instance-url>" # Notice the 'catalog' subdomain
export DREMIO_CATALOG_NAME="c" # Must match the existing default catalog name in Dremio
export DREMIO_CATALOG_FOLDER="f" # Must match an existing catalog folder in Dremio
export DREMIO_CATALOG_TABLE="example_table_name" # This table is dropped and recreated if it already exists
export DREMIO_PAT="<create PAT from STACKIT Dremio Instance>"
export BUCKET_NAME="dremio-<dremio-instance-id>"
Install Python dependencies
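Before installing, you can check which of the required packages are already importable in your environment. The sketch below uses only the standard library; missing_packages is a hypothetical helper name. Note that pip package names and import names can differ: python-dotenv is imported as dotenv, so the check maps one to the other.

```python
import importlib.util

# pip package name -> module name used in "import ..."
REQUIRED_MODULES = {
    "daft": "daft",
    "pyarrow": "pyarrow",
    "pyiceberg": "pyiceberg",
    "requests": "requests",
    "python-dotenv": "dotenv",
}


def missing_packages(required=REQUIRED_MODULES):
    """Return the pip package names whose module cannot be found."""
    # find_spec returns None for a top-level module that is not installed.
    return [pkg for pkg, module in required.items()
            if importlib.util.find_spec(module) is None]


if __name__ == "__main__":
    missing = missing_packages()
    if missing:
        print("Missing packages:", " ".join(missing))
    else:
        print("All dependencies are installed.")
```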
pip3 install daft pyarrow pyiceberg requests python-dotenv
Exchange a Dremio personal access token (PAT) for a Catalog token
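The exchange is an OAuth 2.0 Token Exchange request (RFC 8693): the PAT is sent as subject_token in a form-encoded POST to DREMIO_AUTH_ENDPOINT, and the JSON response carries the new token in access_token. The sketch below only builds and decodes that request body offline, with the same fields the function in this section sends; build_token_exchange_body is a hypothetical name, and requests form-encodes a dict payload in essentially the same way.

```python
from urllib.parse import parse_qs, urlencode

# Same form fields that get_oauth_token sends to DREMIO_AUTH_ENDPOINT.
TOKEN_EXCHANGE_GRANT = "urn:ietf:params:oauth:grant-type:token-exchange"
DREMIO_PAT_TOKEN_TYPE = "urn:ietf:params:oauth:token-type:dremio:personal-access-token"


def build_token_exchange_body(pat: str) -> str:
    """Hypothetical helper: return the application/x-www-form-urlencoded body."""
    payload = {
        "subject_token": pat,  # the Dremio PAT being exchanged
        "subject_token_type": DREMIO_PAT_TOKEN_TYPE,
        "grant_type": TOKEN_EXCHANGE_GRANT,
        "scope": "dremio.all",
    }
    return urlencode(payload)


if __name__ == "__main__":
    body = build_token_exchange_body("<your-pat>")
    print(body)  # colons in the URNs are percent-encoded as %3A
    # Decoding the body recovers the original fields.
    print(parse_qs(body)["grant_type"])
```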
Start by defining a function that exchanges the PAT for a short-lived token valid for requests to the catalog. This function is used in the next section.
import requests


def get_oauth_token(url: str, password: str) -> str:
    # Exchange a Dremio PAT for a short-lived catalog access token (OAuth 2.0 token exchange)
    payload = {
        "subject_token": password,
        "subject_token_type": "urn:ietf:params:oauth:token-type:dremio:personal-access-token",
        "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
        "scope": "dremio.all",
    }
    headers = {"Content-Type": "application/x-www-form-urlencoded"}
    response = requests.post(url, data=payload, headers=headers)
    # Fail loudly on 4xx/5xx, e.g. an invalid or expired PAT
    response.raise_for_status()
    return response.json()["access_token"]
Interact with the catalog
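The catalog token returned by get_oauth_token is short-lived, and the script in this section fetches it only once, when the catalog is created. For longer-running jobs, one option is to cache the token, request a new one shortly before it expires, and call load_catalog again with the fresh token. The class below is a hypothetical sketch of such a cache. It assumes the fetch function returns the token together with its lifetime in seconds (for example from the standard OAuth expires_in field, if Dremio's token endpoint includes it), which would require adapting get_oauth_token.

```python
import time
from typing import Callable, Optional, Tuple


class CachedToken:
    """Hypothetical token cache: re-fetches the token shortly before it expires."""

    def __init__(
        self,
        fetch: Callable[[], Tuple[str, float]],
        clock: Callable[[], float] = time.monotonic,
        refresh_margin: float = 30.0,
    ) -> None:
        # fetch() is assumed to return (access_token, lifetime_in_seconds).
        self._fetch = fetch
        self._clock = clock
        self._refresh_margin = refresh_margin
        self._token: Optional[str] = None
        self._expires_at = 0.0

    def get(self) -> str:
        now = self._clock()
        # Fetch a new token on first use or when the current one is about to expire.
        if self._token is None or now >= self._expires_at - self._refresh_margin:
            token, lifetime = self._fetch()
            self._token = token
            self._expires_at = now + lifetime
        return self._token
```

Injecting the clock keeps the expiry logic testable without waiting for real time to pass; the refresh margin avoids sending a token that expires while a request is in flight.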
The following script uses PyIceberg to connect to the remote catalog with the short-lived token, creates a basic table partitioned by country, and writes sample data to it with Daft.
# ...
# previous code (get_oauth_token)
# ...
import os

import daft
import pyarrow as pa
import requests
from dotenv import load_dotenv
from pyiceberg.catalog import load_catalog

load_dotenv()

# Connection settings from the .env file
DREMIO_AUTH_ENDPOINT = os.environ.get("DREMIO_AUTH_ENDPOINT")
DREMIO_CATALOG_ENDPOINT = os.environ.get("DREMIO_CATALOG_ENDPOINT")
DREMIO_CATALOG_NAME = os.environ.get("DREMIO_CATALOG_NAME")
DREMIO_CATALOG_FOLDER = os.environ.get("DREMIO_CATALOG_FOLDER")
DREMIO_CATALOG_TABLE = os.environ.get("DREMIO_CATALOG_TABLE")
DREMIO_PAT = os.environ.get("DREMIO_PAT")


def main():
    # Connect to the Iceberg REST catalog with the short-lived token
    catalog = load_catalog(
        "dremio_rest",
        **{
            "type": "rest",
            "uri": DREMIO_CATALOG_ENDPOINT,
            "token": get_oauth_token(DREMIO_AUTH_ENDPOINT, DREMIO_PAT),
            "warehouse": "default",
        },
    )

    arrow_schema = pa.schema([
        pa.field("id", pa.int64(), nullable=True),
        pa.field("user_id", pa.int32(), nullable=True),
        pa.field("metric_value", pa.float64(), nullable=True),
        pa.field("country", pa.string(), nullable=True),
    ])

    # Tables are addressed as <folder>.<table> within the catalog
    table_identifier = f"{DREMIO_CATALOG_FOLDER}.{DREMIO_CATALOG_TABLE}"

    # Drop the table if it exists, so every run starts from scratch
    if catalog.table_exists(table_identifier):
        catalog.drop_table(table_identifier)

    iceberg_table = catalog.create_table(
        identifier=table_identifier,
        schema=arrow_schema,
    )

    # Partition the table by the raw value of the "country" column
    with iceberg_table.update_spec() as update_spec:
        update_spec.add_identity("country")

    mock_data = {
        "id": [1, 2, 3],
        "user_id": [1001, 1002, 1003],
        "metric_value": [45.25, 12.80, 99.99],
        "country": ["de", "pt", "es"],
    }

    # Build an Arrow table with the same schema and load it into Daft
    pa_table = pa.Table.from_pydict(mock_data, schema=arrow_schema)
    df = daft.from_arrow(pa_table)

    # Append the rows to the Iceberg table and print the write summary
    write_summary = df.write_iceberg(iceberg_table, mode="append")
    write_summary.show()


if __name__ == "__main__":
    main()
Run the script
Assuming both code snippets were saved in a single file called daft_create_table.py, run the script with:
python daft_create_table.py
Inspecting the results
Section titled “Inspecting the results”After running the script, the results can be verified directly in Dremio UI, using the references to the catalog, folder and table initially set.
Describe the table
DESCRIBE TABLE <DREMIO_CATALOG_NAME>.<DREMIO_CATALOG_FOLDER>.<DREMIO_CATALOG_TABLE>;
Read data
Run a basic SELECT to see all data. Because the script drops and recreates the table on every run, the query should return the 3 sample rows, no matter how many times the script has run successfully.
SELECT *
FROM <DREMIO_CATALOG_NAME>.<DREMIO_CATALOG_FOLDER>.<DREMIO_CATALOG_TABLE>;
Delete a data point
Since Daft/PyIceberg and Dremio use the same catalog, Dremio can also be used to change the underlying data:
DELETE FROM <DREMIO_CATALOG_NAME>.<DREMIO_CATALOG_FOLDER>.<DREMIO_CATALOG_TABLE>
WHERE id = 1;
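To confirm from Python that the DELETE issued in Dremio is visible through the shared catalog, you could reload the table with PyIceberg and count its rows. This is a minimal sketch, assuming catalog is created with load_catalog exactly as in the script and that PyIceberg can read the delete files Dremio writes; count_rows is a hypothetical helper. After the DELETE, the table should hold 2 rows, and a scan filtered on id = 1 should return none.

```python
from typing import Optional


def count_rows(catalog, identifier: str, row_filter: Optional[str] = None) -> int:
    """Hypothetical helper: count rows of an Iceberg table through PyIceberg.

    `catalog` is expected to be the object returned by pyiceberg.catalog.load_catalog.
    """
    # Load the table again so the latest snapshot (including Dremio's DELETE) is used.
    table = catalog.load_table(identifier)
    # A string filter such as "id = 1" narrows the scan; without one, all rows are read.
    scan = table.scan(row_filter=row_filter) if row_filter else table.scan()
    return scan.to_arrow().num_rows


# Usage, with catalog and table_identifier set up as in main():
#   count_rows(catalog, table_identifier)
#   count_rows(catalog, table_identifier, "id = 1")
```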