Zum Inhalt springen

Starten eines Spark-Clusters

STACKIT Workflows lässt sich nahtlos in Spark auf Kubernetes integrieren. Mit dem STACKIT Spark-Operator können Sie flüchtige Spark-Cluster direkt aus Airflow-Tasks heraus starten. In diesem Tutorial lernen Sie, wie Sie den STACKIT Spark-Operator verwenden, um einen Spark-Cluster zu konfigurieren und zu starten.

  1. Erstellen Sie ein Spark-Skript in Ihrem Projekt.

    dags/tasks/my_spark_job.py

    dags/tasks/my_spark_job.py
    import stackit_spark
    import time
    import pandas as pd
    spark = stackit_spark.get_spark()
    data = pd.DataFrame({"number": [10]})
    df = spark.createDataFrame(data)
    time.sleep(30)
    df.show()
  2. Erstellen Sie einen DAG in Ihrem Projekt und fügen Sie den folgenden Code-Ausschnitt in Ihre DAG-Datei ein. Er zeigt alle Konfigurationsoptionen des STACKITITSparkScriptOperator. Die gleichen Parameter können über den @stackit.spark_kubernetes_task-Decorator verwendet werden.

    dags/my_spark_dag.py

    import pendulum
    from airflow.decorators import dag
    from stackit_workflows.airflow_plugin.operators import STACKITSparkScriptOperator
    from stackit_workflows import config
    @dag(
    schedule=None,
    start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
    catchup=False,
    tags=["demo"],
    dag_id="05_spark_full_configuration",
    )
    def spark_full_configuration():
    # Die gleichen Argumente werden vom Decorator unterstützt.
    STACKITSparkScriptOperator(
    task_id="spark_full_configuration",
    # image : str, optional
    # Image des gestarteten Containers.
    # Falls nicht angegeben, wird ein aktuelles, von STACKIT gewartetes Spark-Image verwendet.
    image="schwarzit-xx-sit-dp-customer-artifactory-docker-local.jfrog.io/stackit-spark:spark3.5.3-0.1.2",
    # cpu : float
    # CPU-Request des Pods. Wenn Spark ohne Executor verwendet wird,
    # wird dies auch zur Bestimmung der Anzahl der Prozesse verwendet (z. B. local[2]).
    # Wenn Executoren verwendet werden, ist dies der CPU-Request des Drivers.
    cpu=4,
    # env_vars : dict, list of K8S.V1EnvVar, optional
    # Umgebungsvariablen, die an den Pod übergeben werden. Das Dictionary enthält Umgebungsvariablen als Schlüssel und
    # deren Werte als Werte (z. B. {'MY_ENV_VAR': 42}). Bitte beachten Sie, dass
    # alle numerischen Umgebungsvariablen vor der Erstellung des Kubernetes
    # Pod-Templates in Strings umgewandelt werden.
    env_vars={"MY_ENV_VAR": "my_value"},
    # executor_cpu : int or float, optional
    # CPUs, die von Spark für jeden Executor verwendet werden sollen. Dezimalwerte sind zulässig.
    executor_cpu=3.5,
    # executor_memory_gb : int, optional
    # Angeforderter Speicher der Executoren in GB. Dezimalwerte sind zulässig.
    # Falls nicht angegeben, wird er aus :paramref:`executor_cpu` wie folgt berechnet:
    # ``executor_cpu * config.DEFAULT_MEMORY_PER_CPU_RATIO`` (Standardfaktor ist 6).
    executor_memory_gb=10,
    # executors: int, optional
    # Anzahl der Kubernetes-Executoren. Falls angegeben, wird `spark_mode` standardmäßig auf `k8s`
    # statt auf `local` gesetzt.
    executors=2,
    # get_logs : bool, optional
    # Stdout lesen, um das Stdout des Containers als Logs der Tasks zu erhalten. Standard: True
    get_logs=True,
    # git_sync_branch : str, optional
    # Branch im angegebenen :paramref:`git_repo`, aus dem das Skript geklont wird.
    # Falls nicht angegeben, wird derselbe Branch wie beim DAG verwendet.
    git_sync_ref="main",
    # git_sync_repo : str, optional
    # SSH-Pfad des Repositorys, aus dem die Skripte abgerufen werden sollen.
    # Falls nicht angegeben, wird das Repository des DAGs verwendet.
    git_sync_repo=config.GIT_SYNC_DAG_REPO,
    # git_sync_target_path : str, optional
    # Pfad, in den der ``git-sync``-Init-Container die Skripte zieht.
    # Standard: `/app/git-sync/data`
    git_sync_target_path="/app/git-sync/data",
    # git_sync_secret_name : str, optional
    # Der Name des Secrets, das den SSH-Schlüssel zum Ziehen des Repositorys enthält.
    # Falls nicht angegeben, wird der Standard des DAG-Repositorys verwendet.
    lakehouse_connections=["lakehouse-rest"],
    # memory_gb : int or float, optional
    # Speicheranforderung in GB für den (Driver-)Pod. Dezimalwerte sind zulässig.
    # Falls nicht angegeben, wird sie aus :paramref:`cpu` wie folgt berechnet:
    # ``cpu * config.DEFAULT_MEMORY_PER_CPU_RATIO`` (Standardfaktor ist 6).
    memory_gb=10,
    # name : str, optional
    # Name des für diesen Task erzeugten Pods.
    # Wir empfehlen, diesen Wert nicht zu ändern.
    # Standard: ``"task-<dag_id>-<task_id>-<hash>"``
    name="ensure_this_is_unique",
    # provide_context : bool, optional
    # Airflow-Kontext als Umgebungsvariablen bereitstellen. Standard: True
    # Weitere Informationen finden Sie in Beispiel 03.
    provide_context=True,
    # container_resources : K8S.V1ResourceRequirements, optional
    # Wird derzeit nicht unterstützt. Verwenden Sie stattdessen :paramref:`cpu` und
    # :paramref:`memory_gb`.
    # script : str
    # Pfad des auszuführenden Skripts. Wenn ein relativer Pfad angegeben wird,
    # wird dieser relativ zum git-sync-Ordner interpretiert.
    # Wenn ein absoluter Pfad angegeben wird, wird er nicht weiter angepasst.
    script="Demo/scripts/my_spark_job.py",
    # Alle anderen vom `KubernetesPodOperator` unterstützten Argumente können hier verwendet werden.
    labels={"foo": "bar"},
    )
    spark_full_configuration()
  3. Pushen Sie die Änderungen an das Git-Remote, das mit Ihrer STACKIT Workflows-Umgebung synchronisiert ist, und starten Sie den DAG. Workflows erstellt automatisch einen temporären (flüchtig) Spark-Cluster auf Kubernetes. Nachdem der Task abgeschlossen ist, verschwinden alle Spark-Pods aus dem Cluster, aber die Ausführungsprotokolle bleiben im Log-Speicher von Workflows erhalten.