Mit Daft/PyIceberg in den Iceberg Catalog schreiben
Zuletzt aktualisiert am
Dieses Tutorial führt Sie durch den Austausch eines Dremio Personal Access Tokens (PAT) gegen einen kurzlebigen Token, welcher die Interaktion mit dem zugrunde liegenden Iceberg REST Catalog einer STACKIT Dremio-Instanz verwendet werden kann. In diesem Beispiel werden Daten mit Daft und PyIceberg in den Catalog und den Object Storage geschrieben.
Voraussetzungen
Abschnitt betitelt „Voraussetzungen“- Sie verfügen über eine STACKIT Dremio-Instanz: Dremio-Instanz erstellen
- Sie haben in ihrer STACKIT Dremio-Instanz Personal Access Tokens aktiviert
- Python3
- Der Standard-Catalog ist konfiguriert und enthält mindestens einen Ordner
- Ihr Dremio-Benutzer (oder der Benutzer, von dem Sie den PAT erhalten haben) ist berechtigt, Tabellen zu erstellen
Tutorial
Abschnitt betitelt „Tutorial“Parameter für die Ausführung
Abschnitt betitelt „Parameter für die Ausführung“Erstellen Sie eine .env-Datei, die exportiert werden kann, bevor Sie mit den nächsten Schritten fortfahren.
DREMIO_HOST="https://<dremio-instance-url>" # Kann aus dem STACKIT Portal/SDK abgerufen werdenDREMIO_AUTH_ENDPOINT="https://<dremio-instance-url>/oauth/token"DREMIO_CATALOG_ENDPOINT="https://catalog.<dremio-instance-url>" # Beachten Sie die Subdomain 'catalog'DREMIO_CATALOG_NAME="c" # Muss mit dem Namen des vorhandenen Standard-Catalogs in Dremio übereinstimmenDREMIO_CATALOG_FOLDER="f" # Muss mit einem vorhandenen Catalog-Ordner in Dremio übereinstimmenDREMIO_CATALOG_TABLE="example_table_name" # Diese Tabelle wird überschrieben, falls sie bereits existiertDREMIO_PAT="<PAT aus der STACKIT Dremio-Instanz erstellen>"BUCKET_NAME="dremio-<dremio-instance-id>"Installation von Python Dependencies
Abschnitt betitelt „Installation von Python Dependencies“pip3 install daft pyarrow pyiceberg requests python-dotenvDremio Personal Access Token (PAT) gegen einen Catalog-Token austauschen
Abschnitt betitelt „Dremio Personal Access Token (PAT) gegen einen Catalog-Token austauschen“Definieren Sie zunächst eine Funktion, um den PAT gegen ein kurzlebiges Token auszutauschen, der für Anfragen an den Catalog gültig ist. Diese Funktion wird im nächsten Abschnitt verwendet.
import requests
def get_oauth_token(url: str, password: str) -> str: payload = { "subject_token": password, "subject_token_type": "urn:ietf:params:oauth:token-type:dremio:personal-access-token", "grant_type": "urn:ietf:params:oauth:grant-type:token-exchange", "scope": "dremio.all", } headers = {"Content-Type": "application/x-www-form-urlencoded"} response = requests.post(url, data=payload, headers=headers) response.raise_for_status() return response.json()["access_token"]Mit dem Catalog interagieren
Abschnitt betitelt „Mit dem Catalog interagieren“Das folgende Skript verwendet pyiceberg, um sich über den kurzlebigen Token mit dem Remote-Catalog zu verbinden und erstellt eine einfache Tabelle mit Beispieldaten, einschließlich einer Partition.
# ...# previous code# ...import osimport requestsfrom dotenv import load_dotenv
import daftimport pyarrow as pafrom pyiceberg.catalog import load_catalog
load_dotenv()
DREMIO_AUTH_ENDPOINT = os.environ.get("DREMIO_AUTH_ENDPOINT")DREMIO_CATALOG_ENDPOINT = os.environ.get("DREMIO_CATALOG_ENDPOINT")DREMIO_CATALOG_NAME = os.environ.get("DREMIO_CATALOG_NAME")DREMIO_CATALOG_FOLDER = os.environ.get("DREMIO_CATALOG_FOLDER")DREMIO_CATALOG_TABLE = os.environ.get("DREMIO_CATALOG_FOLDER")DREMIO_PAT = os.environ.get("DREMIO_PAT")
def main(): catalog = load_catalog( "dremio_rest", **{ "type": "rest", "uri": DREMIO_CATALOG_ENDPOINT, "token": get_oauth_token(DREMIO_AUTH_ENDPOINT, DREMIO_PAT), "warehouse": "default" } ) arrow_schema = pa.schema([ pa.field("id", pa.int64(), nullable=True), pa.field("user_id", pa.int32(), nullable=True), pa.field("metric_value", pa.float64(), nullable=True), pa.field("country", pa.string(), nullable=True) ])
table_identifier = f"{DREMIO_CATALOG_FOLDER}.{DREMIO_CATALOG_TABLE}"
if catalog.table_exists(table_identifier): catalog.drop_table(table_identifier)
iceberg_table = catalog.create_table( identifier=table_identifier, schema=arrow_schema )
with iceberg_table.update_spec() as update_spec: update_spec.add_identity("country")
mock_data = { "id": [1, 2, 3], "user_id": [1001, 1002, 1003], "metric_value": [45.25, 12.80, 99.99], "country": ["de", "pt", "es"] }
pa_table = pa.Table.from_pydict(mock_data, schema=arrow_schema) df = daft.from_arrow(pa_table)
write_summary = df.write_iceberg(iceberg_table, mode="append") write_summary.show()
if __name__ == "__main__": main()Das Skript ausführen
Abschnitt betitelt „Das Skript ausführen“Angenommen, der gesamte Code wurde in einer Datei namens daft_create_table.py gespeichert, kann das Skript wie folgt ausgeführt werden:
python daft_create_table.pyErgebnisse überprüfen
Abschnitt betitelt „Ergebnisse überprüfen“Nach der Ausführung des Skripts können die Ergebnisse direkt in der Dremio-Benutzeroberfläche anhand der ursprünglich festgelegten Verweise auf Catalog, Ordner und Tabelle überprüft werden.
Tabelle beschreiben
Abschnitt betitelt „Tabelle beschreiben“DESCRIBE TABLE <DREMIO_CATALOG_NAME>.<DREMIO_CATALOG_FOLDER>.<DREMIO_CATALOG_TABLE>;Daten auslesen
Abschnitt betitelt „Daten auslesen“Sie können ein einfaches SELECT ausführen, um alle Daten anzuzeigen (3 Zeilen, wenn Sie das Skript einmal erfolgreich ausgeführt haben):
SELECT *FROM <DREMIO_CATALOG_NAME>.<DREMIO_CATALOG_FOLDER>.<DREMIO_CATALOG_TABLE>;Einen Datenpunkt löschen
Abschnitt betitelt „Einen Datenpunkt löschen“Da sowohl Daft/PyIceberg als auch Dremio denselben Catalog verwenden, können wir die zugrunde liegenden Daten auch direkt über Dremio ändern.
DELETE FROM <DREMIO_CATALOG_NAME>.<DREMIO_CATALOG_FOLDER>.<DREMIO_CATALOG_TABLE>WHERE id = 1;