Starten eines Spark-Clusters
STACKIT Workflows lässt sich nahtlos in Spark auf Kubernetes integrieren. Mit dem STACKIT Spark-Operator können Sie flüchtige Spark-Cluster direkt aus Airflow-Tasks heraus starten. In diesem Tutorial lernen Sie, wie Sie den STACKIT Spark-Operator verwenden, um einen Spark-Cluster zu konfigurieren und zu starten.
-
Erstellen Sie ein Spark-Skript in Ihrem Projekt.
dags/tasks/my_spark_job.pydags/tasks/my_spark_job.py import stackit_sparkimport timeimport pandas as pdspark = stackit_spark.get_spark()data = pd.DataFrame({"number": [10]})df = spark.createDataFrame(data)time.sleep(30)df.show() -
Erstellen Sie einen DAG in Ihrem Projekt und fügen Sie den folgenden Code-Ausschnitt in Ihre DAG-Datei ein. Er zeigt alle Konfigurationsoptionen des
STACKITITSparkScriptOperator. Die gleichen Parameter können über den@stackit.spark_kubernetes_task-Decorator verwendet werden.dags/my_spark_dag.pyimport pendulumfrom airflow.decorators import dagfrom stackit_workflows.airflow_plugin.operators import STACKITSparkScriptOperatorfrom stackit_workflows import config@dag(schedule=None,start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),catchup=False,tags=["demo"],dag_id="05_spark_full_configuration",)def spark_full_configuration():# Die gleichen Argumente werden vom Decorator unterstützt.STACKITSparkScriptOperator(task_id="spark_full_configuration",# image : str, optional# Image des gestarteten Containers.# Falls nicht angegeben, wird ein aktuelles, von STACKIT gewartetes Spark-Image verwendet.image="schwarzit-xx-sit-dp-customer-artifactory-docker-local.jfrog.io/stackit-spark:spark3.5.3-0.1.2",# cpu : float# CPU-Request des Pods. Wenn Spark ohne Executor verwendet wird,# wird dies auch zur Bestimmung der Anzahl der Prozesse verwendet (z. B. local[2]).# Wenn Executoren verwendet werden, ist dies der CPU-Request des Drivers.cpu=4,# env_vars : dict, list of K8S.V1EnvVar, optional# Umgebungsvariablen, die an den Pod übergeben werden. Das Dictionary enthält Umgebungsvariablen als Schlüssel und# deren Werte als Werte (z. B. {'MY_ENV_VAR': 42}). Bitte beachten Sie, dass# alle numerischen Umgebungsvariablen vor der Erstellung des Kubernetes# Pod-Templates in Strings umgewandelt werden.env_vars={"MY_ENV_VAR": "my_value"},# executor_cpu : int or float, optional# CPUs, die von Spark für jeden Executor verwendet werden sollen. Dezimalwerte sind zulässig.executor_cpu=3.5,# executor_memory_gb : int, optional# Angeforderter Speicher der Executoren in GB. Dezimalwerte sind zulässig.# Falls nicht angegeben, wird er aus :paramref:`executor_cpu` wie folgt berechnet:# ``executor_cpu * config.DEFAULT_MEMORY_PER_CPU_RATIO`` (Standardfaktor ist 6).executor_memory_gb=10,# executors: int, optional# Anzahl der Kubernetes-Executoren. Falls angegeben, wird `spark_mode` standardmäßig auf `k8s`# statt auf `local` gesetzt.executors=2,# get_logs : bool, optional# Stdout lesen, um das Stdout des Containers als Logs der Tasks zu erhalten. Standard: Trueget_logs=True,# git_sync_branch : str, optional# Branch im angegebenen :paramref:`git_repo`, aus dem das Skript geklont wird.# Falls nicht angegeben, wird derselbe Branch wie beim DAG verwendet.git_sync_ref="main",# git_sync_repo : str, optional# SSH-Pfad des Repositorys, aus dem die Skripte abgerufen werden sollen.# Falls nicht angegeben, wird das Repository des DAGs verwendet.git_sync_repo=config.GIT_SYNC_DAG_REPO,# git_sync_target_path : str, optional# Pfad, in den der ``git-sync``-Init-Container die Skripte zieht.# Standard: `/app/git-sync/data`git_sync_target_path="/app/git-sync/data",# git_sync_secret_name : str, optional# Der Name des Secrets, das den SSH-Schlüssel zum Ziehen des Repositorys enthält.# Falls nicht angegeben, wird der Standard des DAG-Repositorys verwendet.lakehouse_connections=["lakehouse-rest"],# memory_gb : int or float, optional# Speicheranforderung in GB für den (Driver-)Pod. Dezimalwerte sind zulässig.# Falls nicht angegeben, wird sie aus :paramref:`cpu` wie folgt berechnet:# ``cpu * config.DEFAULT_MEMORY_PER_CPU_RATIO`` (Standardfaktor ist 6).memory_gb=10,# name : str, optional# Name des für diesen Task erzeugten Pods.# Wir empfehlen, diesen Wert nicht zu ändern.# Standard: ``"task-<dag_id>-<task_id>-<hash>"``name="ensure_this_is_unique",# provide_context : bool, optional# Airflow-Kontext als Umgebungsvariablen bereitstellen. Standard: True# Weitere Informationen finden Sie in Beispiel 03.provide_context=True,# container_resources : K8S.V1ResourceRequirements, optional# Wird derzeit nicht unterstützt. Verwenden Sie stattdessen :paramref:`cpu` und# :paramref:`memory_gb`.# script : str# Pfad des auszuführenden Skripts. Wenn ein relativer Pfad angegeben wird,# wird dieser relativ zum git-sync-Ordner interpretiert.# Wenn ein absoluter Pfad angegeben wird, wird er nicht weiter angepasst.script="Demo/scripts/my_spark_job.py",# Alle anderen vom `KubernetesPodOperator` unterstützten Argumente können hier verwendet werden.labels={"foo": "bar"},)spark_full_configuration() -
Pushen Sie die Änderungen an das Git-Remote, das mit Ihrer STACKIT Workflows-Umgebung synchronisiert ist, und starten Sie den DAG. Workflows erstellt automatisch einen temporären (flüchtig) Spark-Cluster auf Kubernetes. Nachdem der Task abgeschlossen ist, verschwinden alle Spark-Pods aus dem Cluster, aber die Ausführungsprotokolle bleiben im Log-Speicher von Workflows erhalten.