Launching a Spark Cluster

STACKIT Workflows integrates with Spark on Kubernetes. You can launch ephemeral Spark clusters directly from Airflow tasks using the STACKIT Spark Operator. In this tutorial you'll learn how to use the STACKIT Spark Operator to configure and start a Spark cluster.

  1. Create a Spark script in your project.

    dags/tasks/my_spark_job.py

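    import time

    import pandas as pd
    import stackit_spark

    # Obtain the Spark entry point from the STACKIT helper module.
    spark = stackit_spark.get_spark()

    # Build a small pandas DataFrame and convert it to a Spark DataFrame.
    data = pd.DataFrame({"number": [10]})
    df = spark.createDataFrame(data)

    # Pause for 30 seconds, then print the DataFrame to stdout.
    time.sleep(30)
    df.show()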
  2. Create a DAG in your project and paste the following code snippet into your DAG file. It shows the configuration options of the STACKITSparkScriptOperator. The same parameters can be used through the @stackit.spark_kubernetes_task decorator.

    dags/my_spark_dag.py

    import pendulum
    from airflow.decorators import dag

    from stackit_workflows import config
    from stackit_workflows.airflow_plugin.operators import STACKITSparkScriptOperator


    @dag(
        schedule=None,
        start_date=pendulum.datetime(2021, 1, 1, tz="UTC"),
        catchup=False,
        tags=["demo"],
        dag_id="05_spark_full_configuration",
    )
    def spark_full_configuration():
        # The same arguments are supported by the decorator.
        STACKITSparkScriptOperator(
            task_id="spark_full_configuration",
            # image : str, optional
            # Image of the launched container.
            # If not specified, a current Spark image maintained by STACKIT is used.
            image="schwarzit-xx-sit-dp-customer-artifactory-docker-local.jfrog.io/stackit-spark:spark3.5.3-0.1.2",
            # cpu : float
            # CPU request of the pod. If Spark is used with no executors,
            # this is also used to determine the number of processes (e.g. local[2]).
            # If executors are used, this is the CPU request of the driver.
            cpu=4,
            # env_vars : dict or list of K8S.V1EnvVar, optional
            # Env vars to pass to the pod. The dictionary contains env var names as keys
            # and their values as values (e.g. {'MY_ENV_VAR': 42}). Please note that
            # all numeric env vars are cast to string before creating the Kubernetes
            # Pod Template.
            env_vars={"MY_ENV_VAR": "my_value"},
            # executor_cpu : int or float, optional
            # CPUs to be used by Spark for each executor. Decimal values are allowed.
            executor_cpu=3.5,
            # executor_memory_gb : int or float, optional
            # Requested memory of the executors in GB. Decimal values are allowed.
            # If not specified, it is calculated from :paramref:`executor_cpu` as:
            # ``executor_cpu * config.DEFAULT_MEMORY_PER_CPU_RATIO`` (Default is a factor of 6).
            executor_memory_gb=10,
            # executors : int, optional
            # Number of Kubernetes executors. If specified, `spark_mode` defaults to `k8s`
            # instead of `local`.
            executors=2,
            # get_logs : bool, optional
            # Read the stdout of the container as logs of the task. Default: True
            get_logs=True,
            # git_sync_ref : str, optional
            # Branch in the specified :paramref:`git_sync_repo` from which the script is cloned.
            # If not specified, the same branch as the DAG is used.
            git_sync_ref="main",
            # git_sync_repo : str, optional
            # SSH path of the repository from which the scripts should be fetched.
            # If not specified, the DAGs repo is used.
            git_sync_repo=config.GIT_SYNC_DAG_REPO,
            # git_sync_target_path : str, optional
            # Path to which the ``git-sync`` init container pulls the scripts.
            # Default: `/app/git-sync/data`
            git_sync_target_path="/app/git-sync/data",
            # git_sync_secret_name : str, optional
            # Name of the secret which contains the SSH key to pull the repository.
            # If not specified, the default of the DAGs repo is used.

            lakehouse_connections=["lakehouse-rest"],
            # memory_gb : int or float, optional
            # Memory request in GB for the (driver) pod. Decimal values are allowed.
            # If not specified, it is calculated from :paramref:`cpu` as:
            # ``cpu * config.DEFAULT_MEMORY_PER_CPU_RATIO`` (Default is a factor of 6).
            memory_gb=10,
            # name : str, optional
            # Name of the pod spawned for this task.
            # We recommend not changing this value.
            # Default: ``"task-<dag_id>-<task_id>-<hash>"``
            name="ensure_this_is_unique",
            # provide_context : bool, optional
            # Provide Airflow context as environment variables. Default: True
            # Check example 03 for more information.
            provide_context=True,
            # container_resources : K8S.V1ResourceRequirements, optional
            # Currently not supported. Use :paramref:`cpu` and
            # :paramref:`memory_gb` instead.

            # script : str
            # Path of the script to execute. If a relative path is specified, it
            # is interpreted relative to the git-sync folder.
            # If an absolute path is specified, it is not adapted further.
            script="Demo/scripts/my_spark_job.py",
            # Any other arguments supported by the `KubernetesPodOperator` can be used here.
            labels={"foo": "bar"},
        )


    spark_full_configuration()
  3. Push to the Git remote that is synced to your STACKIT Workflows environment and start the DAG. Workflows automatically creates an ephemeral Spark cluster on Kubernetes. After the task has finished, all Spark pods disappear from the cluster, but the execution logs remain in the Workflows log storage.
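
The comments in the DAG above describe several defaults: memory is derived from the CPU request, the Spark mode depends on whether executors are requested, and numeric env vars are cast to strings. The sketch below restates those rules in plain Python to make the arithmetic concrete. The helper names are hypothetical and not part of `stackit_workflows`. The ratio of 6 is taken from the comment in the snippet, and the truncation of `cpu` to an integer for `local[n]` is an assumption. The operator's actual implementation may differ.

```python
# Illustrative only: these helpers are hypothetical and NOT part of stackit_workflows.
# Assumed value, taken from the "Default is a factor of 6" comment in the DAG snippet.
DEFAULT_MEMORY_PER_CPU_RATIO = 6


def default_memory_gb(cpu, memory_gb=None, ratio=DEFAULT_MEMORY_PER_CPU_RATIO):
    """Return the explicit memory request, or cpu * ratio when none is given."""
    return memory_gb if memory_gb is not None else cpu * ratio


def default_spark_mode(cpu, executors=None):
    """Return 'k8s' when executors are requested, otherwise a local[n] mode.

    Assumption: n is the CPU request truncated to an integer, with a minimum of 1.
    """
    if executors:
        return "k8s"
    return f"local[{max(1, int(cpu))}]"


def stringify_env_vars(env_vars):
    """Cast every env var value to str, mirroring the documented numeric cast."""
    return {key: str(value) for key, value in env_vars.items()}
```

With the values from the DAG above, omitting `memory_gb` for `cpu=4` would yield a 24 GB request under these assumptions, and omitting `executor_memory_gb` for `executor_cpu=3.5` would yield 21 GB. The example overrides both with 10.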