API Reference 📚
This page provides an auto-generated API reference for the emr-serverless-pyspark-uv-rap-template project.
EMR Deployment CLI ⚙️
The main command-line interface for deploying the application.
CLI for building and deploying a PySpark package to EMR Serverless.
deploy(entry_point: str, package_name: str, pyproject: str, dry_run: bool, deployment_note: Optional[str], enable_cw: bool, config_s3: Optional[str], build_image: bool, create_app: bool, package: bool, submit: bool, cleanup: bool) -> None
Build, package, and deploy an EMR Serverless Spark job via CLI.
Orchestrates the common deployment steps: building/pushing a Docker image, creating/updating an EMR Serverless application, packaging the Python code and uploading it to S3, submitting a job, and optionally cleaning up the application. If no action flags are provided, all steps except cleanup are executed in sequence. A dry-run mode logs the computed payload (and manifest) without uploading or submitting.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `entry_point` | `str` | Path to the driver script (e.g., `main.py`). | required |
| `package_name` | `str` | Logical application/package name used to derive S3 prefixes and metadata. | required |
| `pyproject` | `str` | Path to the project's `pyproject.toml`. | required |
| `dry_run` | `bool` | If True, print/log the StartJobRun payload and manifest and exit without uploading or submitting. | required |
| `deployment_note` | `str or None` | Optional note stored in the deployment manifest (e.g., change summary). | required |
| `enable_cw` | `bool` | Whether to enable CloudWatch logging on the EMR job submission payload. | required |
| `config_s3` | `str or None` | S3 URI to a configuration file for the job; propagated to driver args and as the `CONFIG_S3_URI` environment variable. | required |
| `build_image` | `bool` | Build and push the container image referenced by the EMR Serverless app. | required |
| `create_app` | `bool` | Create (or update) the EMR Serverless application. | required |
| `package` | `bool` | Package the code, upload artifacts (ZIP and entry script) to S3, and write manifest files. | required |
| `submit` | `bool` | Submit the Spark job to EMR Serverless using the packaged artifacts. | required |
| `cleanup` | `bool` | Stop and delete the EMR Serverless application and remove the local `.emr_app_id` file. | required |
Returns:
| Type | Description |
|---|---|
| `None` | |
Environment Variables
The following are read depending on the selected actions:
- Core: `REGION` (AWS region) and `DEPLOY_ENV` (environment label for S3 prefixes; default: "dev").
- Build image (when `--build-image`): `IMAGE_URI`.
- Create/update app (when `--create-app`): `IMAGE_URI`, `APP_NAME`, `RELEASE_LABEL`.
- Package/submit (when `--package` or `--submit`): `S3_BUCKET`, `EMR_APP_ID`, `EMR_EXECUTION_ROLE`.
- Optional Iceberg integration (auto-detected): `ICEBERG_CATALOG_NAME`, `ICEBERG_GLUE_DB`, `ICEBERG_S3_BUCKET`, `ICEBERG_WAREHOUSE_PATH`.
- Optional config: `CONFIG_S3_URI` (overridden by `--config-s3`).
- Logging: `ENABLE_CLOUDWATCH_LOGGING` (overridden by `--enable-cw`/`--no-enable-cw`).
Raises:
| Type | Description |
|---|---|
| `SystemExit` | If required environment variables are missing for a chosen step, or when cleanup cannot determine the application ID (neither `EMR_APP_ID` nor a local `.emr_app_id` file is available). |
Notes
- When no action flags are provided, the tool runs: build image → create/update app → package → submit.
- A successful `--create-app` writes the application ID to `.emr_app_id` and exports it to `EMR_APP_ID` for subsequent steps.
- `--dry-run` prints the manifest and the EMR StartJobRun payload but does not upload artifacts or call the API.
Examples:
Run all steps (build, create/update app, package, submit):
>>> deploy() # invoked via CLI with no flags
Dry-run a job submission (no uploads or API calls):

    $ python -m my_module.deploy \
        --package-name my_pipeline \
        --entry-point main.py \
        --dry-run

Build image and (re)create app only:

    $ python -m my_module.deploy --build-image --create-app

Package and submit using an external config:

    $ python -m my_module.deploy --package --submit --config-s3 s3://bucket/path/config.toml

Clean up (stop & delete the EMR app):

    $ python -m my_module.deploy --cleanup
EMR Deployment Utilities 🛠️
Core functions for packaging, uploading, and submitting EMR Serverless jobs.
Utilities for packaging, uploading, and submitting EMR Serverless Spark jobs.
build_and_push_docker_image(image_uri: str, region: str, arch: str = 'linux/amd64', root_path: str = '.') -> None
Build and push a Docker image to AWS ECR.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `image_uri` | `str` | The URI of the ECR image repository (e.g., "123456789012.dkr.ecr.eu-west-2.amazonaws.com/my-image:latest"). | required |
| `region` | `str` | The AWS region where the ECR repository is located (e.g., "eu-west-2"). | required |
| `arch` | `str` | The target architecture for the Docker image build. | `'linux/amd64'` |
| `root_path` | `str` | The root path for the Docker build context. | `'.'` |

Returns:
| Type | Description |
|---|---|
| `None` | |
Examples:
>>> build_and_push_docker_image(
... image_uri="123456789012.dkr.ecr.us-west-2.amazonaws.com/my-image:latest",
... region="eu-west-2"
... )
>>> build_and_push_docker_image(
... image_uri="123456789012.dkr.ecr.us-west-2.amazonaws.com/my-image:latest",
... region="eu-west-2",
... arch="linux/arm64",
... root_path="./docker"
... )
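For orientation, the steps behind this helper can be approximated with `subprocess` driving the standard ECR login flow and `docker buildx`; a minimal sketch, assuming a `Dockerfile` at `root_path` (the function name and error handling are illustrative, not the template's actual implementation):

```python
import subprocess


def build_and_push_sketch(image_uri: str, region: str,
                          arch: str = "linux/amd64", root_path: str = ".") -> None:
    """Illustrative only: ECR login, then a single-platform buildx build-and-push."""
    registry = image_uri.split("/")[0]  # e.g. 123456789012.dkr.ecr.eu-west-2.amazonaws.com
    # Fetch an ECR auth token and pipe it into `docker login`.
    token = subprocess.run(
        ["aws", "ecr", "get-login-password", "--region", region],
        check=True, capture_output=True, text=True,
    ).stdout
    subprocess.run(
        ["docker", "login", "--username", "AWS", "--password-stdin", registry],
        input=token, check=True, text=True,
    )
    # Build for the requested platform and push in one step.
    subprocess.run(
        ["docker", "buildx", "build", "--platform", arch, "-t", image_uri,
         "--push", root_path],
        check=True,
    )
```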
build_package(package_name: str, build_dir: Path) -> Path
Build and zip the application source code for EMR Serverless.
This function copies the contents of the src/ directory into a
staging directory, applying ignore patterns to exclude build artifacts
(e.g., *.egg-info and __pycache__). The staging directory is then
zipped into a versioned archive suitable for upload to EMR Serverless.
Dependencies are not packaged and are expected to be provided by the
runtime environment (e.g., via the container image).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `package_name` | `str` | Logical name for the staged source directory inside the build directory. | required |
| `build_dir` | `Path` | Path to the build directory. If it already exists, it is deleted and recreated before staging. | required |

Returns:
| Type | Description |
|---|---|
| `Path` | Absolute path to the zipped application package. |

Raises:
| Type | Description |
|---|---|
| `FileNotFoundError` | If the `src/` directory does not exist. |
Notes
- This function is designed for packaging PySpark or Python code that will run on EMR Serverless. It assumes dependencies are managed externally.
- The generated archive is named with a timestamp to avoid collisions.
Examples:
>>> from pathlib import Path
>>> build_dir = Path("build")
>>> package_zip = build_package("my_app", build_dir)
>>> package_zip.exists()
True
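The staging-and-zip behaviour described above can be condensed as follows; a sketch under the docstring's stated assumptions (a `src/` directory, ignore patterns for `*.egg-info` and `__pycache__`, and a timestamped archive name):

```python
import shutil
import time
from pathlib import Path


def build_package_sketch(package_name: str, build_dir: Path) -> Path:
    """Illustrative only: stage src/ minus build artifacts, then zip the staging dir."""
    src = Path("src")
    if not src.exists():
        raise FileNotFoundError(f"{src}/ directory not found")
    if build_dir.exists():
        shutil.rmtree(build_dir)  # recreate the build directory from scratch
    staging = build_dir / package_name
    shutil.copytree(src, staging,
                    ignore=shutil.ignore_patterns("*.egg-info", "__pycache__"))
    # A timestamped archive name avoids collisions between successive builds.
    stem = build_dir / f"{package_name}-{time.strftime('%Y%m%d%H%M%S')}"
    return Path(shutil.make_archive(str(stem), "zip", staging)).resolve()
```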
create_or_update_emr_app(app_name: str, image_uri: str, release_label: str, region: str) -> str
Create or update an EMR Serverless application.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `app_name` | `str` | The name of the EMR Serverless application. | required |
| `image_uri` | `str` | The URI of the ECR image to use for the application. | required |
| `release_label` | `str` | The EMR release label (e.g., "emr-7.9.0"). | required |
| `region` | `str` | The AWS region in which to create or update the application. | required |

Returns:
| Type | Description |
|---|---|
| `str` | The application ID of the created or existing EMR Serverless application. |
Examples:
>>> app_id = create_or_update_emr_app(
... app_name="my-emr-app",
... image_uri="123456789012.dkr.ecr.eu-west-2.amazonaws.com/my-image:latest",
... release_label="emr-7.9.0",
... region="eu-west-2"
... )
>>> print(app_id)
'00f1abcd1234efgh'
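For reference, the boto3 calls behind "create or update" might look like the sketch below; the reuse-if-exists logic and the lack of pagination handling are simplifying assumptions:

```python
import boto3


def create_or_update_emr_app_sketch(app_name: str, image_uri: str,
                                    release_label: str, region: str) -> str:
    """Illustrative only: return an existing app's ID, else create a custom-image app."""
    client = boto3.client("emr-serverless", region_name=region)
    # Reuse an application with the same name if one already exists
    # (a real implementation would paginate and update the image configuration).
    for app in client.list_applications()["applications"]:
        if app["name"] == app_name:
            return app["id"]
    resp = client.create_application(
        name=app_name,
        releaseLabel=release_label,
        type="SPARK",
        imageConfiguration={"imageUri": image_uri},
    )
    return resp["applicationId"]
```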
delete_emr_app(app_id: str, region: str) -> None
Stop and delete an EMR Serverless application.
This function first attempts to stop the specified EMR Serverless
application, waiting until its state becomes STOPPED. It then
deletes the application and waits for it to reach the TERMINATED
state. If the application is already stopped or deleted, warnings are
logged instead of raising an error.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `app_id` | `str` | The unique identifier of the EMR Serverless application to delete. | required |
| `region` | `str` | The AWS region where the EMR Serverless application is deployed. | required |

Raises:
| Type | Description |
|---|---|
| `ClientError` | If an unexpected AWS client error occurs while stopping or deleting the application. |
| `TimeoutError` | If the application does not reach the expected target state (`STOPPED` or `TERMINATED`) within the polling timeout. |
Examples:
>>> delete_emr_app(app_id="00f1abcd1234wxyz", region="eu-west-2")
Application 00f1abcd1234wxyz stopped.
Successfully deleted application 00f1abcd1234wxyz.
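A sketch of the stop-then-delete sequence with simple polling; the timeout and poll interval are illustrative, and the wait for `TERMINATED` after deletion is omitted for brevity:

```python
import time

import boto3


def delete_emr_app_sketch(app_id: str, region: str, timeout: int = 300) -> None:
    """Illustrative only: stop the application, wait for STOPPED, then delete it."""
    client = boto3.client("emr-serverless", region_name=region)
    client.stop_application(applicationId=app_id)
    deadline = time.time() + timeout
    while time.time() < deadline:
        state = client.get_application(applicationId=app_id)["application"]["state"]
        if state == "STOPPED":
            break
        time.sleep(10)
    else:
        raise TimeoutError(f"{app_id} did not reach STOPPED within {timeout}s")
    client.delete_application(applicationId=app_id)
```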
submit_emr_job(code_s3_uri: str, entry_s3_uri: str, spark_submit_parameters: Optional[str], emr_app_id: str, execution_role: str, region: str, log_uri: str, *, enable_cloudwatch_logging: bool = True, application_arguments: Optional[List[str]] = None, dry_run_payload: Optional[Dict] = None) -> str
Submit a job to an EMR Serverless application.
This function constructs a Spark submit payload and launches a job on an EMR Serverless application. It optionally supports a dry run mode, which logs the payload instead of submitting it.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `code_s3_uri` | `str` | S3 URI of the Python ZIP archive to add to the job's Python files. | required |
| `entry_s3_uri` | `str` | S3 URI of the main entry script for the Spark job. | required |
| `spark_submit_parameters` | `Optional[str]` | Additional parameters passed to Spark via `sparkSubmitParameters`. | required |
| `emr_app_id` | `str` | The EMR Serverless application ID. | required |
| `execution_role` | `str` | IAM role ARN used as the execution role for the EMR Serverless job. | required |
| `region` | `str` | The AWS region in which the EMR Serverless application is running. | required |
| `log_uri` | `str` | S3 URI where EMR Serverless job logs should be written. | required |
| `enable_cloudwatch_logging` | `bool` | Whether to enable CloudWatch logging in the job configuration. | `True` |
| `application_arguments` | `Optional[List[str]]` | Arguments passed to the job entry point, available to the entry script as command-line arguments. | `None` |
| `dry_run_payload` | `Optional[Dict]` | If provided, the function does not call the EMR Serverless API; instead, it logs the payload and returns a dummy job ID. | `None` |

Returns:
| Type | Description |
|---|---|
| `str` | The EMR Serverless job run ID; a dummy ID is returned in dry-run mode. |

Raises:
| Type | Description |
|---|---|
| `ClientError` | If the EMR Serverless API request fails (when not in dry run mode). |
Examples:
>>> from my_module import submit_emr_job
>>> job_id = submit_emr_job(
... code_s3_uri="s3://my-bucket/code/app.zip",
... entry_s3_uri="s3://my-bucket/code/main.py",
... spark_submit_parameters=None,
... emr_app_id="00f1abcd1234wxyz",
... execution_role="arn:aws:iam::123456789012:role/EMRServerlessExecutionRole",
... region="eu-west-2",
... log_uri="s3://my-bucket/logs/",
... application_arguments=["--config", "s3://my-bucket/config.toml"],
... )
>>> print(job_id)
'jr-1234567890abcdef'
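The payload this function constructs follows the standard EMR Serverless StartJobRun shape; a trimmed sketch (wiring the ZIP through `spark.submit.pyFiles`, and the omission of the CloudWatch and dry-run branches, are simplifications of what the real function does):

```python
import boto3


def submit_emr_job_sketch(code_s3_uri: str, entry_s3_uri: str, emr_app_id: str,
                          execution_role: str, region: str, log_uri: str,
                          application_arguments=None) -> str:
    """Illustrative only: start a Spark job run and return its run ID."""
    client = boto3.client("emr-serverless", region_name=region)
    resp = client.start_job_run(
        applicationId=emr_app_id,
        executionRoleArn=execution_role,
        jobDriver={
            "sparkSubmit": {
                "entryPoint": entry_s3_uri,
                "entryPointArguments": application_arguments or [],
                # Ship the packaged ZIP alongside the driver script.
                "sparkSubmitParameters": f"--conf spark.submit.pyFiles={code_s3_uri}",
            }
        },
        configurationOverrides={
            "monitoringConfiguration": {
                "s3MonitoringConfiguration": {"logUri": log_uri},
            }
        },
    )
    return resp["jobRunId"]
```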
Spark Job Logic ✨
The main PySpark job logic.
Minimal PySpark job that writes to an Iceberg table.
get_catalog_and_db(spark: SparkSession) -> Tuple[str, str]
Retrieve the Iceberg catalog name and Glue database from Spark configuration.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `spark` | `SparkSession` | The Spark session from which to retrieve configuration values. | required |

Returns:
| Type | Description |
|---|---|
| `Tuple[str, str]` | The Iceberg catalog name and Glue database name. |

Raises:
| Type | Description |
|---|---|
| `RuntimeError` | If the Glue database configuration ("spark.emr_dummy.ICEBERG_GLUE_DB") is missing. |
Examples:
>>> catalog, db = get_catalog_and_db(spark)
>>> print(catalog, db)
glue_catalog my_glue_db
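Given the configuration key named under Raises, the lookup is presumably a thin wrapper around `spark.conf.get`; a sketch in which the catalog key name and its fallback default are assumptions:

```python
from typing import Tuple

from pyspark.sql import SparkSession


def get_catalog_and_db_sketch(spark: SparkSession) -> Tuple[str, str]:
    """Illustrative only: read Iceberg settings injected into the Spark conf."""
    # The catalog key name and its default are assumed, not documented above.
    catalog = spark.conf.get("spark.emr_dummy.ICEBERG_CATALOG_NAME", "glue_catalog")
    db = spark.conf.get("spark.emr_dummy.ICEBERG_GLUE_DB", None)
    if not db:
        raise RuntimeError("spark.emr_dummy.ICEBERG_GLUE_DB is not set")
    return catalog, db
```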
get_full_table_name_df(catalog: str, database: str, table: str) -> str
Construct the full table name from catalog, database, and table strings.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `catalog` | `str` | The catalog name. | required |
| `database` | `str` | The database name. | required |
| `table` | `str` | The table name. | required |

Returns:
| Type | Description |
|---|---|
| `str` | The full table name in the format 'catalog.database.table'. |
Examples:
>>> get_full_table_name_df("awsdatacatalog", "mydb", "mytable")
'awsdatacatalog.mydb.mytable'
get_full_table_name_sql(catalog: str, database: str, table: str) -> str
Construct a full SQL table name from catalog, database, and table components.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `catalog` | `str` | The catalog name. | required |
| `database` | `str` | The database name. | required |
| `table` | `str` | The table name. | required |

Returns:
| Type | Description |
|---|---|
| `str` | The fully qualified SQL table name. |
Examples:
>>> get_full_table_name_sql("awsdatacatalog", "mydb", "users")
'awsdatacatalog.`mydb`.users'
main() -> None
Run the dummy EMR job.
Parses arguments, loads configuration (from S3 or packaged file), retrieves Iceberg catalog and database from Spark configuration, creates the target Iceberg table if needed, and appends data.
Notes
- Writes 1,000 rows with schema `(id BIGINT, double BIGINT)`.
- Creates the table if it does not exist.
- Stops the Spark session at the end.
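As a sketch of what these notes describe, the create-if-needed write might look like the following; the table name, session setup, and use of the DataFrameWriterV2 API are assumptions rather than the job's actual code:

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("emr_dummy").getOrCreate()
catalog, db = "glue_catalog", "my_glue_db"  # normally resolved via get_catalog_and_db(spark)
table = f"{catalog}.`{db}`.dummy_table"     # shape produced by get_full_table_name_sql

# Create the target Iceberg table if needed, then append 1,000 rows.
spark.sql(f"CREATE TABLE IF NOT EXISTS {table} (id BIGINT, `double` BIGINT) USING iceberg")
df = spark.range(1000).selectExpr("id", "id * 2 AS `double`")
df.writeTo(table).append()
spark.stop()
```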
S3 Configuration Loader 📄
Utilities for loading TOML configuration files from S3.
Utilities for working with Amazon S3 URIs and objects.
read_toml_from_s3(client: boto3.client, s3_uri: str, encoding: str = 'utf-8') -> Tuple[Dict[str, Any], Dict[str, str]]
Read a TOML file from Amazon S3 using a provided client.
Downloads an object from S3, decodes it, parses it as TOML, and returns both the parsed configuration and audit metadata.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `client` | `boto3.client` | An initialised boto3 S3 client. | required |
| `s3_uri` | `str` | S3 URI to the TOML file. | required |
| `encoding` | `str` | The encoding to use for decoding the file. | `'utf-8'` |

Returns:
| Type | Description |
|---|---|
| `Tuple[Dict[str, Any], Dict[str, str]]` | A tuple of the parsed TOML dictionary and audit metadata containing "sha256", "etag", and "version_id"; the checksum is calculated with SHA-256. |

Raises:
| Type | Description |
|---|---|
| `ClientError` | If the S3 get_object request fails. |
| `ValueError` | If `s3_uri` is not a valid S3 URI. |
Examples:
>>> import boto3
>>> client = boto3.client("s3")
>>> config, meta = read_toml_from_s3(client, "s3://my-bucket/config.toml")
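A sketch of the described behaviour, assuming Python 3.11+ `tomllib` (older runtimes would need the `tomli` backport); the URI parsing and metadata key handling are illustrative:

```python
import hashlib
import tomllib
from typing import Any, Dict, Tuple


def read_toml_from_s3_sketch(client, s3_uri: str,
                             encoding: str = "utf-8") -> Tuple[Dict[str, Any], Dict[str, str]]:
    """Illustrative only: fetch an S3 object, hash it, and parse it as TOML."""
    if not s3_uri.startswith("s3://"):
        raise ValueError(f"Not an S3 URI: {s3_uri}")
    bucket, _, key = s3_uri[len("s3://"):].partition("/")
    resp = client.get_object(Bucket=bucket, Key=key)
    body = resp["Body"].read()
    meta = {
        "sha256": hashlib.sha256(body).hexdigest(),
        "etag": resp.get("ETag", "").strip('"'),
        "version_id": resp.get("VersionId", ""),
    }
    return tomllib.loads(body.decode(encoding)), meta
```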
upload_file_to_s3_key(client: boto3.client, file_path: Union[str, Path], bucket: str, key: str) -> Tuple[bool, Optional[str]]
Upload a file to an exact S3 key, returning its URI.
This function provides a clear interface for uploading to a precise S3
location by calling the general upload_file utility.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `client` | `boto3.client` | An initialised boto3 S3 client. | required |
| `file_path` | `Union[str, Path]` | The local path of the file to upload. | required |
| `bucket` | `str` | The name of the target S3 bucket. | required |
| `key` | `str` | The exact S3 key (path within the bucket) for the uploaded file. | required |

Returns:
| Type | Description |
|---|---|
| `Tuple[bool, Optional[str]]` | A tuple of a boolean indicating whether the upload succeeded and the S3 URI of the uploaded object (None on failure). |
Examples:
>>> # from pathlib import Path
>>> # client = boto3.client("s3")
>>> # Path("local.txt").touch()
>>> # success, uri = upload_file_to_s3_key(
... # client, "local.txt", "my-bucket", "archive/report.txt"
... # )
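The wrapper reduces to a single managed transfer plus URI formatting; a sketch in which mapping failures to `(False, None)` is an assumption about the utility's error handling:

```python
from pathlib import Path
from typing import Optional, Tuple, Union

from botocore.exceptions import BotoCoreError, ClientError


def upload_file_to_s3_key_sketch(client, file_path: Union[str, Path],
                                 bucket: str, key: str) -> Tuple[bool, Optional[str]]:
    """Illustrative only: upload to an exact key and report the resulting URI."""
    try:
        client.upload_file(str(file_path), bucket, key)
        return True, f"s3://{bucket}/{key}"
    except (BotoCoreError, ClientError):
        return False, None
```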
upload_json(client: boto3.client, bucket: str, key: str, data: dict, indent: Optional[int] = 2, encoding: str = 'utf-8') -> Tuple[bool, Optional[str]]
Upload a JSON object to Amazon S3.
The input dictionary is serialised into a JSON string and uploaded to the specified S3 bucket and key.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| `client` | `boto3.client` | An initialised boto3 S3 client. | required |
| `bucket` | `str` | The name of the target S3 bucket. | required |
| `key` | `str` | The exact S3 key (path within the bucket) where the JSON object should be stored. | required |
| `data` | `dict` | The dictionary to serialise and upload as JSON. | required |
| `indent` | `Optional[int]` | The indentation level for pretty-printing the JSON. | `2` |
| `encoding` | `str` | The encoding to use for the uploaded file. | `'utf-8'` |

Returns:
| Type | Description |
|---|---|
| `Tuple[bool, Optional[str]]` | A tuple of a boolean indicating whether the upload succeeded and the S3 URI of the uploaded object (None on failure). |
Examples:
>>> # client = boto3.client("s3")
>>> # payload = {"name": "Alice", "age": 30}
>>> # success, uri = upload_json(client, "my-bucket", "users/alice.json", payload)
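A sketch of the serialise-and-put behaviour; setting `ContentType` and mapping failures to `(False, None)` are assumptions consistent with the other upload helpers on this page:

```python
import json
from typing import Optional, Tuple

from botocore.exceptions import BotoCoreError, ClientError


def upload_json_sketch(client, bucket: str, key: str, data: dict,
                       indent: Optional[int] = 2,
                       encoding: str = "utf-8") -> Tuple[bool, Optional[str]]:
    """Illustrative only: serialise a dict to JSON and put it at the exact key."""
    try:
        client.put_object(
            Bucket=bucket,
            Key=key,
            Body=json.dumps(data, indent=indent).encode(encoding),
            ContentType="application/json",
        )
        return True, f"s3://{bucket}/{key}"
    except (BotoCoreError, ClientError):
        return False, None
```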