API Reference 📚

This page provides an auto-generated API reference for the emr-serverless-pyspark-uv-rap-template project.


EMR Deployment CLI ⚙️

The main command-line interface for deploying the application.

CLI for building and deploying a PySpark package to EMR Serverless.

deploy(entry_point: str, package_name: str, pyproject: str, dry_run: bool, deployment_note: Optional[str], enable_cw: bool, config_s3: Optional[str], build_image: bool, create_app: bool, package: bool, submit: bool, cleanup: bool) -> None

Build, package, and deploy an EMR Serverless Spark job via CLI.

Orchestrates the common deployment steps: building and pushing a Docker image, creating or updating an EMR Serverless application, packaging the Python code and uploading it to S3, submitting a job, and optionally cleaning up the application. If no action flags are provided, all steps except cleanup are executed in sequence. A dry-run mode logs the computed payload (and manifest) without uploading or submitting.

Parameters:

Name Type Description Default
entry_point str

Path to the driver script (e.g., "main.py"). Used when packaging.

required
package_name str

Logical application/package name used to derive S3 prefixes and metadata.

required
pyproject str

Path to pyproject.toml (used to extract name/version/requirements for the deployment manifest).

required
dry_run bool

If True, print/log the StartJobRun payload and manifest and exit without uploading or submitting.

required
deployment_note str or None

Optional note stored in the deployment manifest (e.g., change summary).

required
enable_cw bool

Whether to enable CloudWatch logging on the EMR job submission payload.

required
config_s3 str or None

S3 URI to a configuration file for the job; propagated to driver args and as spark.driverEnv.CONFIG_S3_URI.

required
build_image bool

Build and push the container image referenced by the EMR Serverless app.

required
create_app bool

Create (or update) the EMR Serverless application.

required
package bool

Package the code, upload artifacts (ZIP and entry script) to S3, and write manifest files.

required
submit bool

Submit the Spark job to EMR Serverless using the packaged artifacts.

required
cleanup bool

Stop and delete the EMR Serverless application and remove .emr_app_id.

required

Returns:

Type Description
None
Environment Variables

The following are read depending on selected actions:

Core
  REGION : AWS Region.
  DEPLOY_ENV : Environment label for S3 prefixes (default: "dev").

Build Image (when --build-image)
  IMAGE_URI

Create/Update App (when --create-app)
  IMAGE_URI, APP_NAME, RELEASE_LABEL

Package/Submit (when --package or --submit)
  S3_BUCKET, EMR_APP_ID, EMR_EXECUTION_ROLE

Optional Iceberg Integration (auto-detected)
  ICEBERG_CATALOG_NAME, ICEBERG_GLUE_DB, ICEBERG_S3_BUCKET, ICEBERG_WAREHOUSE_PATH

Optional Config
  CONFIG_S3_URI (overridden by --config-s3)

Logging
  ENABLE_CLOUDWATCH_LOGGING (overridden by --enable-cw/--no-enable-cw)
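
For a --package or --submit run, these variables can be set before invoking the CLI. A minimal sketch in Python, assuming placeholder values for the bucket, application ID, and execution role (all hypothetical):

>>> import os
>>> os.environ.update({
...     "REGION": "eu-west-2",
...     "DEPLOY_ENV": "dev",
...     "S3_BUCKET": "my-artifact-bucket",
...     "EMR_APP_ID": "00f1abcd1234efgh",
...     "EMR_EXECUTION_ROLE": "arn:aws:iam::123456789012:role/EMRServerlessExecutionRole",
... })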

Raises:

Type Description
SystemExit

If required environment variables are missing for a chosen step, or if cleanup cannot determine the application ID (neither EMR_APP_ID is set nor a .emr_app_id file is present).

Notes
  • When no action flags are provided, the tool runs: build image → create/update app → package → submit.
  • A successful --create-app writes the application ID to .emr_app_id and exports it to EMR_APP_ID for subsequent steps.
  • --dry-run prints the manifest and the EMR StartJobRun payload but does not upload artifacts or call the API.

Examples:

Run all steps (build, create/update app, package, submit):

>>> deploy()  # invoked via CLI with no flags

Dry-run a job submission (no uploads or API calls):

$ python -m my_module.deploy \
    --package-name my_pipeline \
    --entry-point main.py \
    --dry-run

Build image and (re)create app only:

$ python -m my_module.deploy --build-image --create-app

Package and submit using an external config:

$ python -m my_module.deploy --package --submit --config-s3 s3://bucket/path/config.toml

Clean up (stop and delete the EMR app):

$ python -m my_module.deploy --cleanup


EMR Deployment Utilities 🛠️

Core functions for packaging, uploading, and submitting EMR Serverless jobs.

Utilities for packaging, uploading, and submitting EMR Serverless Spark jobs.

build_and_push_docker_image(image_uri: str, region: str, arch: str = 'linux/amd64', root_path: str = '.') -> None

Build and push a Docker image to AWS ECR.

Parameters:

Name Type Description Default
image_uri str

The URI of the ECR image repository (e.g., "123456789012.dkr.ecr.eu-west-2.amazonaws.com/my-image:latest").

required
region str

The AWS region where the ECR repository is located (e.g., "eu-west-2").

required
arch str

The target architecture for the Docker image build. Default is "linux/amd64".

'linux/amd64'
root_path str

The root path for the Docker build context. Default is ".".

'.'

Returns:

Type Description
None

Examples:

>>> build_and_push_docker_image(
...     image_uri="123456789012.dkr.ecr.eu-west-2.amazonaws.com/my-image:latest",
...     region="eu-west-2"
... )
>>> build_and_push_docker_image(
...     image_uri="123456789012.dkr.ecr.eu-west-2.amazonaws.com/my-image:latest",
...     region="eu-west-2",
...     arch="linux/arm64",
...     root_path="./docker"
... )

build_package(package_name: str, build_dir: Path) -> Path

Build and zip the application source code for EMR Serverless.

This function copies the contents of the src/ directory into a staging directory, applying ignore patterns to exclude build artifacts (e.g., *.egg-info and __pycache__). The staging directory is then zipped into a versioned archive suitable for upload to EMR Serverless. Dependencies are not packaged and are expected to be provided by the runtime environment (e.g., via the container image).

Parameters:

Name Type Description Default
package_name str

Logical name for the staged source directory inside build_dir.

required
build_dir Path

Path to the build directory. If it already exists, it will be deleted and recreated before staging.

required

Returns:

Type Description
Path

Absolute path to the zipped application package.

Raises:

Type Description
FileNotFoundError

If the src/ directory does not exist at the project root.

Notes
  • This function is designed for packaging PySpark or Python code that will run on EMR Serverless. It assumes dependencies are managed externally.
  • The generated archive is named with a timestamp to avoid collisions.

Examples:

>>> from pathlib import Path
>>> build_dir = Path("build")
>>> package_zip = build_package("my_app", build_dir)
>>> package_zip.exists()
True
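
The returned archive path can then be handed to the S3 upload helper documented below. A minimal sketch, assuming a hypothetical bucket name and key prefix:

>>> import boto3
>>> client = boto3.client("s3")
>>> success, uri = upload_file_to_s3_key(
...     client, package_zip, "my-artifact-bucket", f"artifacts/{package_zip.name}"
... )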

create_or_update_emr_app(app_name: str, image_uri: str, release_label: str, region: str) -> str

Create or update an EMR Serverless application.

Parameters:

Name Type Description Default
app_name str

The name of the EMR Serverless application.

required
image_uri str

The URI of the ECR image to use for the application.

required
release_label str

The EMR release label (e.g., "emr-7.9.0").

required
region str

The AWS region in which to create or update the application.

required

Returns:

Type Description
str

The application ID of the created or existing EMR Serverless application.

Examples:

>>> app_id = create_or_update_emr_app(
...     app_name="my-emr-app",
...     image_uri="123456789012.dkr.ecr.eu-west-2.amazonaws.com/my-image:latest",
...     release_label="emr-7.9.0",
...     region="eu-west-2"
... )
>>> print(app_id)
00f1abcd1234efgh

delete_emr_app(app_id: str, region: str) -> None

Stop and delete an EMR Serverless application.

This function first attempts to stop the specified EMR Serverless application, waiting until its state becomes STOPPED. It then deletes the application and waits for it to reach the TERMINATED state. If the application is already stopped or deleted, warnings are logged instead of raising an error.

Parameters:

Name Type Description Default
app_id str

The unique identifier of the EMR Serverless application to delete.

required
region str

The AWS region where the EMR Serverless application is deployed.

required

Raises:

Type Description
ClientError

If an unexpected AWS client error occurs while stopping or deleting the application.

TimeoutError

If the application does not reach the expected target state (STOPPED or TERMINATED) within the configured timeout.

Examples:

>>> delete_emr_app(app_id="00f1abcd1234wxyz", region="eu-west-2")
Application 00f1abcd1234wxyz stopped.
Successfully deleted application 00f1abcd1234wxyz.

submit_emr_job(code_s3_uri: str, entry_s3_uri: str, spark_submit_parameters: Optional[str], emr_app_id: str, execution_role: str, region: str, log_uri: str, *, enable_cloudwatch_logging: bool = True, application_arguments: Optional[List[str]] = None, dry_run_payload: Optional[Dict] = None) -> str

Submit a job to an EMR Serverless application.

This function constructs a Spark submit payload and launches a job on an EMR Serverless application. It optionally supports a dry run mode, which logs the payload instead of submitting it.

Parameters:

Name Type Description Default
code_s3_uri str

S3 URI of the Python ZIP archive to add to PYTHONPATH via --py-files.

required
entry_s3_uri str

S3 URI of the main entry script for the Spark job.

required
spark_submit_parameters Optional[str]

Additional parameters passed to Spark via sparkSubmitParameters (e.g., "--py-files ... --conf ..."). If None, defaults to "--py-files {code_s3_uri}".

required
emr_app_id str

The EMR Serverless application ID.

required
execution_role str

IAM role ARN used as the execution role for the EMR Serverless job.

required
region str

The AWS region in which the EMR Serverless application is running.

required
log_uri str

S3 URI where EMR Serverless job logs should be written.

required
enable_cloudwatch_logging bool

Whether to enable CloudWatch logging in the job configuration. Default is True.

True
application_arguments Optional[List[str]]

Arguments passed to the job entry point, available as sys.argv within the script.

None
dry_run_payload Optional[Dict]

If provided, the function will not call the EMR Serverless API. Instead, it logs the payload and returns a dummy job ID ("dry-run-job-id").

None

Returns:

Type Description
str

The EMR Serverless job run ID. Returns "dry-run-job-id" if dry_run_payload is set.

Raises:

Type Description
ClientError

If the EMR Serverless API request fails (when not in dry run mode).

Examples:

>>> from my_module import submit_emr_job
>>> job_id = submit_emr_job(
...     code_s3_uri="s3://my-bucket/code/app.zip",
...     entry_s3_uri="s3://my-bucket/code/main.py",
...     spark_submit_parameters=None,
...     emr_app_id="00f1abcd1234wxyz",
...     execution_role="arn:aws:iam::123456789012:role/EMRServerlessExecutionRole",
...     region="eu-west-2",
...     log_uri="s3://my-bucket/logs/",
...     application_arguments=["--config", "s3://my-bucket/config.toml"],
... )
>>> print(job_id)
jr-1234567890abcdef
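
Custom Spark settings and the dry-run path follow the same shape. A minimal sketch, assuming an illustrative --conf value and a non-empty placeholder dict for dry_run_payload:

>>> job_id = submit_emr_job(
...     code_s3_uri="s3://my-bucket/code/app.zip",
...     entry_s3_uri="s3://my-bucket/code/main.py",
...     spark_submit_parameters=(
...         "--py-files s3://my-bucket/code/app.zip "
...         "--conf spark.executor.memory=4g"
...     ),
...     emr_app_id="00f1abcd1234wxyz",
...     execution_role="arn:aws:iam::123456789012:role/EMRServerlessExecutionRole",
...     region="eu-west-2",
...     log_uri="s3://my-bucket/logs/",
...     dry_run_payload={"note": "log the payload instead of calling the API"},
... )
>>> job_id
'dry-run-job-id'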

Spark Job Logic ✨

The main PySpark job logic.

Minimal PySpark job that writes to an Iceberg table.

get_catalog_and_db(spark: SparkSession) -> Tuple[str, str]

Retrieve the Iceberg catalog name and Glue database from Spark configuration.

Parameters:

Name Type Description Default
spark SparkSession

The Spark session from which to retrieve configuration values.

required

Returns:

Type Description
Tuple[str, str]

The Iceberg catalog name and Glue database name.

Raises:

Type Description
RuntimeError

If the Glue database configuration ("spark.emr_dummy.ICEBERG_GLUE_DB") is missing.

Examples:

>>> catalog, db = get_catalog_and_db(spark)
>>> print(catalog, db)
glue_catalog my_glue_db

get_full_table_name_df(catalog: str, database: str, table: str) -> str

Construct the full table name from catalog, database, and table strings.

Parameters:

Name Type Description Default
catalog str

The catalog name.

required
database str

The database name.

required
table str

The table name.

required

Returns:

Type Description
str

The full table name in the format 'catalog.database.table'.

Examples:

>>> get_full_table_name_df("awsdatacatalog", "mydb", "mytable")
'awsdatacatalog.mydb.mytable'

get_full_table_name_sql(catalog: str, database: str, table: str) -> str

Construct a full SQL table name from catalog, database, and table components.

Parameters:

Name Type Description Default
catalog str

The catalog name.

required
database str

The database name.

required
table str

The table name.

required

Returns:

Type Description
str

The fully qualified SQL table name.

Examples:

>>> get_full_table_name_sql("awsdatacatalog", "mydb", "users")
'awsdatacatalog.`mydb`.users'

main() -> None

Run the dummy EMR job.

Parses arguments, loads configuration (from S3 or packaged file), retrieves Iceberg catalog and database from Spark configuration, creates the target Iceberg table if needed, and appends data.

Notes
  • Writes 1,000 rows with schema (id BIGINT, double BIGINT).
  • Creates the table if it does not exist.
  • Stops the Spark session at the end.
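
The write pattern described above corresponds roughly to the following sketch; the table name and the derivation of the double column are hypothetical and only illustrate the documented schema:

>>> from pyspark.sql import functions as F
>>> catalog, db = get_catalog_and_db(spark)
>>> sql_name = get_full_table_name_sql(catalog, db, "dummy_table")
>>> _ = spark.sql(
...     f"CREATE TABLE IF NOT EXISTS {sql_name} (id BIGINT, double BIGINT) USING iceberg"
... )
>>> df = spark.range(1000).withColumn("double", F.col("id") * 2)
>>> df.writeTo(get_full_table_name_df(catalog, db, "dummy_table")).append()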

S3 Configuration Loader 📄

Utilities for loading TOML configuration files from S3.

Utilities for working with Amazon S3 URIs and objects.

read_toml_from_s3(client: boto3.client, s3_uri: str, encoding: str = 'utf-8') -> Tuple[Dict[str, Any], Dict[str, str]]

Read a TOML file from Amazon S3 using a provided client.

Downloads an object from S3, decodes it, parses it as TOML, and returns both the parsed configuration and audit metadata.

Parameters:

Name Type Description Default
client client

An initialised boto3 S3 client.

required
s3_uri str

S3 URI to the TOML file.

required
encoding str

The encoding to use for decoding the file. Defaults to "utf-8".

'utf-8'

Returns:

Type Description
Tuple[Dict[str, Any], Dict[str, str]]

A tuple containing:
  • The parsed TOML as a dictionary.
  • Audit metadata including "sha256", "etag", and "version_id". The checksum is calculated using the SHA-256 algorithm.

Raises:

Type Description
ClientError

If the S3 get_object request fails.

ValueError

If s3_uri is not a valid S3 URI.

Examples:

>>> import boto3
>>> client = boto3.client("s3")
>>> config, meta = read_toml_from_s3(client, "s3://my-bucket/config.toml")
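
The audit metadata can be recorded alongside the run; the keys below are the ones named in the return description above:

>>> print(meta["sha256"], meta["etag"], meta["version_id"])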

upload_file_to_s3_key(client: boto3.client, file_path: Union[str, Path], bucket: str, key: str) -> Tuple[bool, Optional[str]]

Upload a file to an exact S3 key, returning its URI.

This function provides a clear interface for uploading to a precise S3 location by calling the general upload_file utility.

Parameters:

Name Type Description Default
client client

An initialised boto3 S3 client.

required
file_path Union[str, Path]

The local path of the file to upload.

required
bucket str

The name of the target S3 bucket.

required
key str

The exact S3 key (path within the bucket) for the uploaded file.

required

Returns:

Type Description
Tuple[bool, Optional[str]]

A tuple containing:
  • A boolean indicating if the upload was successful.
  • The S3 URI of the uploaded object, or None on failure.

Examples:

>>> # from pathlib import Path
>>> # client = boto3.client("s3")
>>> # Path("local.txt").touch()
>>> # success, uri = upload_file_to_s3_key(
>>> #     client, "local.txt", "my-bucket", "archive/report.txt"
>>> # )

upload_json(client: boto3.client, bucket: str, key: str, data: dict, indent: Optional[int] = 2, encoding: str = 'utf-8') -> Tuple[bool, Optional[str]]

Upload a JSON object to Amazon S3.

The input dictionary is serialised into a JSON string and uploaded to the specified S3 bucket and key.

Parameters:

Name Type Description Default
client client

An initialised boto3 S3 client.

required
bucket str

The name of the target S3 bucket.

required
key str

The exact S3 key (path within the bucket) where the JSON object should be stored.

required
data dict

The dictionary to serialise and upload as JSON.

required
indent Optional[int]

The indentation level for pretty-printing the JSON. Defaults to 2.

2
encoding str

The encoding to use for the uploaded file. Defaults to "utf-8".

'utf-8'

Returns:

Type Description
Tuple[bool, Optional[str]]

A tuple containing:
  • A boolean indicating if the upload was successful.
  • The S3 URI of the uploaded object, or None on failure.

Examples:

>>> # client = boto3.client("s3")
>>> # payload = {"name": "Alice", "age": 30}
>>> # success, uri = upload_json(client, "my-bucket", "users/alice.json", payload)
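
A hedged follow-up showing one way to handle the result tuple (the error message is illustrative, not part of the API):

>>> # if not success:
>>> #     raise RuntimeError("Failed to upload users/alice.json to my-bucket")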