Zum Inhalt springen

Mit Daft/PyIceberg in den Iceberg Catalog schreiben

Zuletzt aktualisiert am

Dieses Tutorial führt Sie durch den Austausch eines Dremio Personal Access Tokens (PAT) gegen einen kurzlebigen Token, welcher die Interaktion mit dem zugrunde liegenden Iceberg REST Catalog einer STACKIT Dremio-Instanz verwendet werden kann. In diesem Beispiel werden Daten mit Daft und PyIceberg in den Catalog und den Object Storage geschrieben.

  • Sie verfügen über eine STACKIT Dremio-Instanz: Dremio-Instanz erstellen
  • Sie haben in ihrer STACKIT Dremio-Instanz Personal Access Tokens aktiviert
  • Python3
  • Der Standard-Catalog ist konfiguriert und enthält mindestens einen Ordner
  • Ihr Dremio-Benutzer (oder der Benutzer, von dem Sie den PAT erhalten haben) ist berechtigt, Tabellen zu erstellen

Erstellen Sie eine .env-Datei, die exportiert werden kann, bevor Sie mit den nächsten Schritten fortfahren.

.env
DREMIO_HOST="https://<dremio-instance-url>" # Kann aus dem STACKIT Portal/SDK abgerufen werden
DREMIO_AUTH_ENDPOINT="https://<dremio-instance-url>/oauth/token"
DREMIO_CATALOG_ENDPOINT="https://catalog.<dremio-instance-url>" # Beachten Sie die Subdomain 'catalog'
DREMIO_CATALOG_NAME="c" # Muss mit dem Namen des vorhandenen Standard-Catalogs in Dremio übereinstimmen
DREMIO_CATALOG_FOLDER="f" # Muss mit einem vorhandenen Catalog-Ordner in Dremio übereinstimmen
DREMIO_CATALOG_TABLE="example_table_name" # Diese Tabelle wird überschrieben, falls sie bereits existiert
DREMIO_PAT="<PAT aus der STACKIT Dremio-Instanz erstellen>"
BUCKET_NAME="dremio-<dremio-instance-id>"
Terminal-Fenster
pip3 install daft pyarrow pyiceberg requests python-dotenv

Dremio Personal Access Token (PAT) gegen einen Catalog-Token austauschen

Abschnitt betitelt „Dremio Personal Access Token (PAT) gegen einen Catalog-Token austauschen“

Definieren Sie zunächst eine Funktion, um den PAT gegen ein kurzlebiges Token auszutauschen, der für Anfragen an den Catalog gültig ist. Diese Funktion wird im nächsten Abschnitt verwendet.

daft_create_table.py
import requests
def get_oauth_token(url: str, password: str) -> str:
payload = {
"subject_token": password,
"subject_token_type": "urn:ietf:params:oauth:token-type:dremio:personal-access-token",
"grant_type": "urn:ietf:params:oauth:grant-type:token-exchange",
"scope": "dremio.all",
}
headers = {"Content-Type": "application/x-www-form-urlencoded"}
response = requests.post(url, data=payload, headers=headers)
response.raise_for_status()
return response.json()["access_token"]

Das folgende Skript verwendet pyiceberg, um sich über den kurzlebigen Token mit dem Remote-Catalog zu verbinden und erstellt eine einfache Tabelle mit Beispieldaten, einschließlich einer Partition.

daft_create_table.py
# ...
# previous code
# ...
import os
import requests
from dotenv import load_dotenv
import daft
import pyarrow as pa
from pyiceberg.catalog import load_catalog
load_dotenv()
DREMIO_AUTH_ENDPOINT = os.environ.get("DREMIO_AUTH_ENDPOINT")
DREMIO_CATALOG_ENDPOINT = os.environ.get("DREMIO_CATALOG_ENDPOINT")
DREMIO_CATALOG_NAME = os.environ.get("DREMIO_CATALOG_NAME")
DREMIO_CATALOG_FOLDER = os.environ.get("DREMIO_CATALOG_FOLDER")
DREMIO_CATALOG_TABLE = os.environ.get("DREMIO_CATALOG_FOLDER")
DREMIO_PAT = os.environ.get("DREMIO_PAT")
def main():
catalog = load_catalog(
"dremio_rest",
**{
"type": "rest",
"uri": DREMIO_CATALOG_ENDPOINT,
"token": get_oauth_token(DREMIO_AUTH_ENDPOINT, DREMIO_PAT),
"warehouse": "default"
}
)
arrow_schema = pa.schema([
pa.field("id", pa.int64(), nullable=True),
pa.field("user_id", pa.int32(), nullable=True),
pa.field("metric_value", pa.float64(), nullable=True),
pa.field("country", pa.string(), nullable=True)
])
table_identifier = f"{DREMIO_CATALOG_FOLDER}.{DREMIO_CATALOG_TABLE}"
if catalog.table_exists(table_identifier):
catalog.drop_table(table_identifier)
iceberg_table = catalog.create_table(
identifier=table_identifier,
schema=arrow_schema
)
with iceberg_table.update_spec() as update_spec:
update_spec.add_identity("country")
mock_data = {
"id": [1, 2, 3],
"user_id": [1001, 1002, 1003],
"metric_value": [45.25, 12.80, 99.99],
"country": ["de", "pt", "es"]
}
pa_table = pa.Table.from_pydict(mock_data, schema=arrow_schema)
df = daft.from_arrow(pa_table)
write_summary = df.write_iceberg(iceberg_table, mode="append")
write_summary.show()
if __name__ == "__main__":
main()

Angenommen, der gesamte Code wurde in einer Datei namens daft_create_table.py gespeichert, kann das Skript wie folgt ausgeführt werden:

Terminal-Fenster
python daft_create_table.py

Nach der Ausführung des Skripts können die Ergebnisse direkt in der Dremio-Benutzeroberfläche anhand der ursprünglich festgelegten Verweise auf Catalog, Ordner und Tabelle überprüft werden.

DESCRIBE TABLE <DREMIO_CATALOG_NAME>.<DREMIO_CATALOG_FOLDER>.<DREMIO_CATALOG_TABLE>;

Sie können ein einfaches SELECT ausführen, um alle Daten anzuzeigen (3 Zeilen, wenn Sie das Skript einmal erfolgreich ausgeführt haben):

SELECT *
FROM <DREMIO_CATALOG_NAME>.<DREMIO_CATALOG_FOLDER>.<DREMIO_CATALOG_TABLE>;

Da sowohl Daft/PyIceberg als auch Dremio denselben Catalog verwenden, können wir die zugrunde liegenden Daten auch direkt über Dremio ändern.

DELETE FROM <DREMIO_CATALOG_NAME>.<DREMIO_CATALOG_FOLDER>.<DREMIO_CATALOG_TABLE>
WHERE id = 1;