
Large payload storage - Python SDK

Release, stability, and dependency info

External Storage is in Pre-Release. APIs and configuration may change before the stable release. Join the #large-payloads Slack channel to provide feedback or ask for help.

The Temporal Service enforces a 2 MB per-payload limit by default. This limit is configurable on self-hosted deployments. When your Workflows or Activities handle data larger than the limit, you can offload payloads to external storage, such as Amazon S3, and pass a small reference token through the Event History instead. This page shows you how to set up External Storage with Amazon S3 and how to implement a custom storage driver.

Your external storage system's lifecycle policy must ensure that payloads remain available for the entire lifetime of the Workflow plus its retention window. Refer to lifecycle management for more information.
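With Amazon S3, for example, you might express this as a bucket lifecycle rule. The sketch below builds one such rule as a plain configuration dict; the 90-day expiry is illustrative only — size it to your longest possible Workflow Execution plus the Namespace retention period:

```python
# Illustrative S3 lifecycle rule: expire offloaded payloads only after
# the longest possible Workflow lifetime plus the retention window.
PAYLOAD_EXPIRY_DAYS = 90  # illustrative; size this to your own Workflows

lifecycle_config = {
    "Rules": [
        {
            "ID": "expire-temporal-payloads",
            "Status": "Enabled",
            "Filter": {"Prefix": ""},
            "Expiration": {"Days": PAYLOAD_EXPIRY_DAYS},
        }
    ]
}

# Apply with boto3 (requires AWS credentials):
# import boto3
# boto3.client("s3").put_bucket_lifecycle_configuration(
#     Bucket="my-temporal-payloads",
#     LifecycleConfiguration=lifecycle_config,
# )
```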

For a conceptual overview of External Storage and its use cases, see External Storage.

Store and retrieve large payloads with Amazon S3

The Python SDK includes an S3 storage driver. Follow these steps to set it up:

Prerequisites

  • An Amazon S3 bucket that you have write access to. Refer to lifecycle management for the required lifecycle policy.
  • The aioboto3 library installed in your environment.

Procedure

  1. Create an S3 client using aioboto3 and pass it to the S3StorageDriver. The driver uses your standard AWS credentials from the environment (environment variables, IAM role, or AWS config file):

    import aioboto3
    from temporalio.contrib.aws.s3driver import S3StorageDriver
    from temporalio.contrib.aws.s3driver.aioboto3 import new_aioboto3_client

    session = aioboto3.Session(profile_name=AWS_PROFILE, region_name=AWS_REGION)
    async with session.client("s3") as s3_client:
        driver = S3StorageDriver(
            client=new_aioboto3_client(s3_client),
            bucket="my-temporal-payloads",
        )
  2. Configure the driver on your DataConverter and pass the converter to your Client and Worker:

    import dataclasses

    import temporalio.converter
    from temporalio.client import Client
    from temporalio.converter import ExternalStorage
    from temporalio.envconfig import ClientConfig
    from temporalio.worker import Worker

    data_converter = dataclasses.replace(
        temporalio.converter.default(),
        external_storage=ExternalStorage(
            drivers=[driver],
        ),
    )

    connect_config = ClientConfig.load_client_connect_config()

    client = await Client.connect(**connect_config, data_converter=data_converter)

    worker = Worker(
        client,
        task_queue="my-task-queue",
        workflows=[MyWorkflow],
        activities=[my_activity],
    )

    By default, payloads larger than 256 KiB are offloaded to external storage. You can adjust this with the payload_size_threshold parameter, or set it to 1 to externalize all payloads regardless of size. Refer to Configure payload size threshold for more information.

    All Workflows and Activities running on the Worker use the storage driver automatically without changes to your business logic. The driver uploads and downloads payloads concurrently and validates payload integrity on retrieve.

Implement a custom storage driver

If you need a storage backend that the built-in drivers don't support, you can implement your own storage driver. Ensure that payloads stored in your external storage system remain available for the entire lifetime of the Workflow plus its retention window. Refer to lifecycle management for more information.

The following example shows a complete custom driver implementation that uses local disk as the backing store:

import os
import uuid
from typing import Sequence

from temporalio.api.common.v1 import Payload
from temporalio.converter import (
    StorageDriver,
    StorageDriverClaim,
    StorageDriverRetrieveContext,
    StorageDriverStoreContext,
)


class LocalDiskStorageDriver(StorageDriver):
    def __init__(self, store_dir: str = "/tmp/temporal-payload-store") -> None:
        self._store_dir = store_dir

    def name(self) -> str:
        return "local-disk"

    async def store(
        self,
        context: StorageDriverStoreContext,
        payloads: Sequence[Payload],
    ) -> list[StorageDriverClaim]:
        # Scope stored payloads by namespace and Workflow ID when the
        # serialization context provides them.
        prefix = self._store_dir
        sc = context.serialization_context
        if sc is not None and hasattr(sc, "workflow_id"):
            prefix = os.path.join(self._store_dir, sc.namespace, sc.workflow_id)
        os.makedirs(prefix, exist_ok=True)

        claims = []
        for payload in payloads:
            key = f"{uuid.uuid4()}.bin"
            file_path = os.path.join(prefix, key)
            with open(file_path, "wb") as f:
                f.write(payload.SerializeToString())
            claims.append(StorageDriverClaim(claim_data={"path": file_path}))
        return claims

    async def retrieve(
        self,
        context: StorageDriverRetrieveContext,
        claims: Sequence[StorageDriverClaim],
    ) -> list[Payload]:
        payloads = []
        for claim in claims:
            file_path = claim.claim_data["path"]
            with open(file_path, "rb") as f:
                data = f.read()
            payload = Payload()
            payload.ParseFromString(data)
            payloads.append(payload)
        return payloads

1. Extend the StorageDriver class

A custom driver extends the StorageDriver abstract class and implements three methods:

  • name() returns a unique string that identifies the driver.
  • store() receives a list of payloads and returns one StorageDriverClaim per payload. A claim is a set of string key-value pairs that the driver uses to locate the payload later.
  • retrieve() receives the claims that store() produced and returns the original payloads.

2. Store payloads

In store(), convert each Payload protobuf message to bytes with payload.SerializeToString() and write the bytes to your storage system. The application data has already been serialized by the Payload Converter before it reaches the driver.

Return a StorageDriverClaim for each payload with enough information to retrieve it later. Structure your storage keys to include the namespace and Workflow ID from the serialization context so that you can identify which Workflow owns each payload. Within that scope, consider using content-addressable keys (such as a SHA-256 hash of the payload bytes) to deduplicate identical payloads.
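For instance, a content-addressed key scoped by namespace and Workflow ID could be derived as follows (the `make_payload_key` helper is hypothetical, not part of the SDK):

```python
import hashlib


def make_payload_key(namespace: str, workflow_id: str, payload_bytes: bytes) -> str:
    """Derive a storage key scoped per Workflow that deduplicates
    identical payloads via a SHA-256 content hash."""
    digest = hashlib.sha256(payload_bytes).hexdigest()
    return f"{namespace}/{workflow_id}/{digest}.bin"
```

Identical payload bytes within the same Workflow map to the same key, so storing them again overwrites the same object instead of duplicating it, while the namespace/Workflow-ID prefix keeps per-Workflow cleanup straightforward.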

3. Retrieve payloads

In retrieve(), download the bytes using the claim data, then reconstruct the Payload protobuf message with payload.ParseFromString(data). The Payload Converter handles deserializing the application data after the driver returns the payload.

4. Configure the Data Converter

Pass an ExternalStorage instance to your DataConverter and use the converter when creating your Client and Worker:

import dataclasses
import temporalio.converter
from temporalio.converter import ExternalStorage

data_converter = dataclasses.replace(
    temporalio.converter.default(),
    external_storage=ExternalStorage(
        drivers=[MyStorageDriver()],
    ),
)

Configure payload size threshold

You can configure the payload size threshold that triggers external storage. By default, payloads larger than 256 KiB are offloaded to external storage. You can adjust this with the payload_size_threshold parameter, or set it to 1 to externalize all payloads regardless of size.

import dataclasses
import temporalio.converter
from temporalio.converter import ExternalStorage

data_converter = dataclasses.replace(
    temporalio.converter.default(),
    external_storage=ExternalStorage(
        drivers=[driver],
        payload_size_threshold=1,
    ),
)

Use multiple storage drivers

When you have multiple drivers, such as for hot and cold storage tiers, pass a driver_selector function that chooses which driver handles each payload:

from temporalio.converter import ExternalStorage

hot_driver = MyStorageDriver("hot-bucket")
cold_driver = MyStorageDriver("cold-bucket")

ExternalStorage(
    drivers=[hot_driver, cold_driver],
    driver_selector=lambda context, payload: (
        cold_driver if payload.ByteSize() > 1_000_000 else hot_driver
    ),
)

Return None from the selector to keep a specific payload inline.
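As a sketch of that pattern, the tier-selection logic can live in a small standalone helper (the `select_tier` name and both thresholds are illustrative, not part of the SDK):

```python
from typing import Optional

INLINE_MAX = 256 * 1024  # at or below this size, keep the payload inline
HOT_MAX = 1_000_000      # above INLINE_MAX up to this size, use hot storage


def select_tier(size: int) -> Optional[str]:
    """Return a tier name for a payload of the given size,
    or None to keep the payload inline in Event History."""
    if size <= INLINE_MAX:
        return None
    if size <= HOT_MAX:
        return "hot"
    return "cold"
```

A `driver_selector` can then map the tier name to a driver, for example `lambda context, payload: {"hot": hot_driver, "cold": cold_driver}.get(select_tier(payload.ByteSize()))`, where the `.get` lookup falls through to None for inline payloads.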