This tutorial is focused on using Dagster Pipes to launch & monitor general PySpark jobs. The Spark integration page provides more information on using Pipes with specific Spark providers, such as AWS EMR or Databricks.
Spark is often used with object stores such as Amazon S3. We are going to simulate this setup locally with MinIO, which has an API compatible with AWS S3. All communication between Dagster and the Spark job (metadata and logs collection) will happen through a MinIO bucket.
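For example, a boto3 client can be pointed at MinIO by overriding the S3 endpoint. Below is a minimal sketch using the credentials and endpoint from the docker-compose setup shown later in this guide:

```python
import boto3

# every S3 API call made through this client goes to the local MinIO instance, not AWS
s3_client = boto3.client(
    "s3",
    endpoint_url="http://minio:9000",  # MinIO's S3-compatible API
    aws_access_key_id="minio",
    aws_secret_access_key="minio123",
)
print(s3_client.list_buckets()["Buckets"])
```

In the asset code below, the endpoint is instead supplied via the AWS_ENDPOINT_URL environment variable set in docker-compose, so a plain boto3.client("s3") is sufficient.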
Make sure the hadoop-aws JAR and AWS Java SDK versions are compatible with your Spark/Hadoop build.
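If your Spark installation does not already ship these JARs, one option is to let Spark resolve them at startup. The following is a minimal sketch; the version shown is illustrative only and must match the Hadoop version bundled with your Spark build:

```python
from pyspark.sql import SparkSession

# illustrative only: hadoop-aws must match the Hadoop version bundled with your Spark
# distribution; it pulls in a compatible aws-java-sdk-bundle transitively
spark = (
    SparkSession.builder.appName("s3a-compat-check")
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.4")  # hypothetical version
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .getOrCreate()
)
```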
We are going to upload the PySpark script to the S3 bucket directly inside the Dagster asset. In production, consider doing this via CI/CD instead.
Step 1: Writing the Dagster orchestration code (dagster_code.py)
We will set up a few non-default Pipes components to streamline the otherwise challenging problem of orchestrating Spark jobs.
Let's start by creating the asset and opening a Pipes session. We will be using S3 to pass Pipes messages from the Spark job to Dagster, so we will create PipesS3MessageReader and PipesS3ContextInjector objects. (Technically, it's not strictly required to use S3 for passing the Dagster context, but storing it there keeps the CLI arguments small.)
```python
import os
import subprocess
from collections.abc import Mapping, Sequence
from pathlib import Path

import boto3
from dagster_aws.pipes import PipesS3ContextInjector, PipesS3MessageReader

import dagster as dg

LOCAL_SCRIPT_PATH = Path(__file__).parent / "script.py"


@dg.asset
def pipes_spark_asset(context: dg.AssetExecutionContext):
    s3_client = boto3.client("s3")

    bucket = os.environ["DAGSTER_PIPES_BUCKET"]

    # upload the script to S3
    # ideally, this should be done via CI/CD processes and not in the asset body
    # but for the sake of this example we are doing it here
    s3_script_path = f"{context.dagster_run.run_id}/pyspark_script.py"
    s3_client.upload_file(LOCAL_SCRIPT_PATH, bucket, s3_script_path)

    context_injector = PipesS3ContextInjector(
        client=s3_client,
        bucket=bucket,
    )

    message_reader = PipesS3MessageReader(
        client=s3_client,
        bucket=bucket,
        # the following setting will configure the Spark job to collect logs from the driver
        # and send them to Dagster via Pipes
        include_stdio_in_messages=True,
    )
```
Notice how PipesS3MessageReader has include_stdio_in_messages=True. This setting will configure the Pipes message writer in the Spark job to collect logs from the Spark driver and send them to Dagster via Pipes messages.
We will be using CLI arguments to pass the bootstrap information from Dagster to the Spark job. These arguments are fetched from the session.get_bootstrap_cli_arguments method and passed to spark-submit along with a few other settings.
```python
    # pipes_spark_asset body continues below
    with dg.open_pipes_session(
        context=context,
        message_reader=message_reader,
        context_injector=context_injector,
    ) as session:
        dagster_pipes_args = " ".join(
            # prepare Pipes bootstrap CLI arguments
            [
                f"{key} {value}"
                for key, value in session.get_bootstrap_cli_arguments().items()
            ]
        )

        cmd = " ".join(
            [
                "spark-submit",
                # change --master and --deploy-mode according to specific Spark setup
                "--master",
                "local[*]",
                "--conf",
                "spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem",
                # custom S3 endpoint for MinIO
                "--conf",
                "spark.hadoop.fs.s3a.endpoint=http://minio:9000",
                "--conf",
                "spark.hadoop.fs.s3a.path.style.access=true",
                f"s3a://{bucket}/{s3_script_path}",
                dagster_pipes_args,
            ]
        )

        subprocess.run(
            # we do not forward stdio on purpose to demonstrate how Pipes collects logs from the driver
            cmd,
            shell=True,
            check=True,
        )

    return session.get_results()
```
In other Pipes workflows, passing the bootstrap information from Dagster to the remote Pipes session is typically done via environment variables, but setting environment variables for Spark jobs can be complicated (the exact way of doing this depends on the Spark deployment) or not possible at all. CLI arguments are a convenient alternative.
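For reference, when environment variables are viable (for example, a plain local subprocess), the same bootstrap payload can be passed through the environment instead. The following is a minimal sketch, reusing the session and cmd variables from the asset body above; in cluster deployments these variables may not reach the Spark driver, which is exactly why this tutorial uses CLI arguments:

```python
import os
import subprocess

# sketch of the environment-variable alternative: merge the Pipes bootstrap payload
# into the subprocess environment instead of appending CLI arguments
env = {**os.environ, **session.get_bootstrap_env_vars()}
subprocess.run(cmd, shell=True, check=True, env=env)

# on the external side, the default dagster_pipes.PipesEnvVarParamsLoader would then
# be used in place of PipesCliArgsParamsLoader
```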
Step 2: Writing the PySpark job code (script.py)
First, create a new file named script.py, then add the following code to create a context that can be used to send messages to Dagster:
```python
import boto3
from dagster_pipes import (
    PipesCliArgsParamsLoader,
    PipesS3ContextLoader,
    PipesS3MessageWriter,
    open_dagster_pipes,
)
from pyspark.sql import SparkSession


def main():
    with open_dagster_pipes(
        message_writer=PipesS3MessageWriter(client=boto3.client("s3")),
        context_loader=PipesS3ContextLoader(client=boto3.client("s3")),
        params_loader=PipesCliArgsParamsLoader(),
    ) as pipes:
        print("Hello from the Spark driver!")

        pipes.log.info("I am logging a Dagster message from the Spark driver!")

        spark = SparkSession.builder.appName("HelloWorld").getOrCreate()

        df = spark.createDataFrame(
            [
                (1, "Alice", 34),
                (2, "Bob", 45),
                (3, "Charlie", 56),
            ],
            ["id", "name", "age"],
        )

        # calculate a really important statistic
        avg_age = float(df.agg({"age": "avg"}).collect()[0][0])

        # attach it to the asset materialization in Dagster
        pipes.report_asset_materialization(
            metadata={"average_age": {"raw_value": avg_age, "type": "float"}},
            data_version="alpha",
        )

        spark.stop()


if __name__ == "__main__":
    main()
```
Note how PipesCliArgsParamsLoader is used to load the CLI arguments passed by Dagster. This information will be used to automatically configure PipesS3MessageWriter and PipesS3ContextLoader.
Because the Dagster code has include_stdio_in_messages=True, the message writer will collect logs from the driver and send them to Dagster via Pipes messages.
Step 3: Running the pipeline with Docker
We will use Docker Compose to run the required services locally. Create a docker-compose.yml file with the following contents:

```yaml
# this docker compose file creates a mini Spark cluster with 1 master and 2 workers
# to simulate a distributed environment

volumes:
  spark-logs:
  spark-data:
  minio-data:
  dagster_home:

networks:
  spark:

services:
  minio:
    image: bitnami/minio
    ports:
      - "9000:9000"
      - "9001:9001"
    environment:
      MINIO_ROOT_USER: minio
      MINIO_ROOT_PASSWORD: minio123
      MINIO_DEFAULT_BUCKETS: "dagster-pipes:public"
    volumes:
      - minio-data:/data
    networks:
      - spark

  dagster-dev:
    develop:
      watch:
        - action: sync
          path: .
          target: /src
    build:
      context: .
      dockerfile: Dockerfile
    command:
      - "dagster"
      - "dev"
      - "-f"
      - "/src/dagster_code.py"
      - "--host"
      - "0.0.0.0"
      - "--port"
      - "3000"
    ports:
      - "3000:3000"
    volumes:
      - spark-logs:/spark/logs
      - spark-data:/spark/data
      - dagster_home:/dagster_home
    environment:
      AWS_ACCESS_KEY_ID: minio
      AWS_SECRET_ACCESS_KEY: minio123
      AWS_ENDPOINT_URL: http://minio:9000
      DAGSTER_PIPES_BUCKET: dagster-pipes
    depends_on:
      - minio
    networks:
      - spark
```
Start the Dagster dev instance inside Docker:
docker compose up --build --watch
The `--watch` flag automatically syncs changes in the local filesystem to the Docker container, which is useful for live code updates without volume mounts.
Navigate to http://localhost:3000 to open the Dagster UI and materialize the asset. Metadata and stdio logs emitted in the PySpark job will become available in Dagster.
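You can also trigger the asset outside the UI, for example from a test or a script. The following is a minimal sketch using dagster.materialize, assuming the asset is importable from dagster_code.py and that the MinIO credentials, endpoint, and bucket variables are set in the local environment:

```python
# sketch: materialize the asset in-process instead of through the UI.
# assumes AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY, AWS_ENDPOINT_URL, and
# DAGSTER_PIPES_BUCKET are set, and that spark-submit is available locally
import dagster as dg

from dagster_code import pipes_spark_asset

result = dg.materialize([pipes_spark_asset])
assert result.success
```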