steamship.invocable package#

Subpackages#

Submodules#

steamship.invocable.config module#

class steamship.invocable.config.Config[source]#

Bases: CamelModel

Base class for Steamship Package and Plugin configuration objects.

extend_with_dict(d: dict, overwrite: bool = False)[source]#

Sets the attributes on this object with provided keys and values.

extend_with_json_file(path: Path, overwrite: bool = False, fail_on_missing_file: bool = True)[source]#

Extends this config object’s values with a JSON file from disk.

This is useful for applying late-bound defaults, such as API keys added to a deployment bundle.
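The late-bound defaults idea can be sketched with a plain stand-in class rather than the real Config. The name SketchConfig, its two fields, and the rule that overwrite=False only fills values that are still None are assumptions made for illustration; the text above does not spell out the overwrite semantics.

```python
import json
import tempfile
from pathlib import Path
from typing import Optional


class SketchConfig:
    """Hypothetical stand-in for a Config subclass (not the Steamship class)."""

    def __init__(self, api_key: Optional[str] = None, model: Optional[str] = "small"):
        self.api_key = api_key
        self.model = model

    def extend_with_dict(self, d: dict, overwrite: bool = False) -> None:
        # Assumed rule: only known attributes are set, and existing non-None
        # values are kept unless overwrite=True.
        for key, value in (d or {}).items():
            if hasattr(self, key) and (overwrite or getattr(self, key) is None):
                setattr(self, key, value)

    def extend_with_json_file(self, path: Path, overwrite: bool = False,
                              fail_on_missing_file: bool = True) -> None:
        if not path.exists():
            if fail_on_missing_file:
                raise FileNotFoundError(path)
            return  # a missing file is tolerated when the caller asks for that
        self.extend_with_dict(json.loads(path.read_text()), overwrite=overwrite)


def demo() -> SketchConfig:
    """Apply defaults from a JSON file, as a deployment bundle might provide them."""
    with tempfile.TemporaryDirectory() as tmp:
        defaults = Path(tmp) / "defaults.json"
        defaults.write_text(json.dumps({"api_key": "from-bundle", "model": "large"}))
        config = SketchConfig()
        config.extend_with_json_file(defaults)
        return config
```

Under these assumptions the unset api_key is filled from the file, while the already-set model value is left alone unless overwrite=True is passed.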

classmethod get_config_parameters() Dict[str, ConfigParameter][source]#
static strip_enum(default_value)[source]#

steamship.invocable.dev_logging_handler module#

class steamship.invocable.dev_logging_handler.DevelopmentLoggingHandler(log_level: any = 30, log_level_for_messages: any = 20, file_log_level: any = 20)[source]#

Bases: StreamHandler

A logging handler for developing Steamship Agents, Tools, Packages, and Plugins locally.

emit(record)[source]#

Emit the record, printing it to console out.

We rely on TWO logging levels for the mechanics of this LoggingHandler:

  • One for standard logging

  • One for specific system Agent-related events, flagged with metadata

This permits INFO-level logging of key Agent/Tool actions without forcing the user to see all INFO-level logging globally.

A future implementation may use a cascade of loggers attached to the AgentContext to do this more cleanly.
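The two-threshold mechanism described above can be sketched with the standard logging module alone. This is a stand-in, not the DevelopmentLoggingHandler itself: the class name TwoLevelHandler and the metadata key is_agent_event are hypothetical, and only the default levels (30 and 20) are taken from the signature above.

```python
import io
import logging

# Hypothetical metadata key; the real handler's flag name is not given in the source.
AGENT_EVENT_FLAG = "is_agent_event"


class TwoLevelHandler(logging.StreamHandler):
    """Stand-in handler: one threshold for ordinary records, one for flagged ones."""

    def __init__(self, stream=None, log_level=logging.WARNING,
                 log_level_for_messages=logging.INFO):
        super().__init__(stream)
        self.log_level = log_level
        self.log_level_for_messages = log_level_for_messages

    def emit(self, record: logging.LogRecord) -> None:
        # Flagged records are checked against the more permissive threshold.
        flagged = getattr(record, AGENT_EVENT_FLAG, False)
        threshold = self.log_level_for_messages if flagged else self.log_level
        if record.levelno >= threshold:
            super().emit(record)


def demo() -> str:
    buffer = io.StringIO()
    logger = logging.getLogger("two_level_sketch")
    logger.handlers.clear()
    logger.propagate = False
    logger.setLevel(logging.DEBUG)
    logger.addHandler(TwoLevelHandler(buffer))
    logger.info("ordinary info")                                  # dropped: below WARNING
    logger.info("tool invoked", extra={AGENT_EVENT_FLAG: True})   # kept: flagged INFO
    logger.warning("ordinary warning")                            # kept
    return buffer.getvalue()
```

Passing the flag through the extra argument keeps call sites on the normal logging API, which is one plausible way to attach the metadata mentioned above.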

file_handler: FileHandler | None#
file_log_level: any#
static init_and_take_root(log_level: any = 20) DevelopmentLoggingHandler[source]#
log_filename: str#
log_level: any#
log_level_with_message_type: any#

steamship.invocable.entrypoint module#

This module is necessary to satisfy the entrypoint requirements of both Localstack and AWS.

If we set the entrypoint directly to steamship.invocable.safe_handler (imported in the init from lambda_handler), AWS is happy, but Localstack is not, because it tries to read steamship.invocable as a py file, not a module.

If we set the entrypoint to steamship.invocable.lambda_handler.safe_handler, Localstack is happy, but AWS is not, because it tries to read lambda_handler first, which imports things from steamship.invocable, which imports things from lambda_handler.

By adding this file, which simply passes safe_handler through as steamship.invocable.entrypoint.safe_handler, both are happy.

steamship.invocable.invocable module#

Please see https://docs.steamship.com/ for information about building a Steamship Package

class steamship.invocable.invocable.Invocable(client: Steamship = None, config: Dict[str, Any] = None, context: InvocationContext = None)[source]#

Bases: ABC

A Steamship microservice.

This model.py class:

  1. Provides a pre-authenticated instance of the Steamship client

  2. Provides a Lambda handler that routes to registered functions

  3. Provides useful methods for connecting functions to the router.

add_api_route(method_spec: MethodSpec, permit_overwrite_of_existing: bool = False)[source]#

Add an API route to this Invocable instance.

config: Config#
classmethod config_cls() Type[Config][source]#

Returns the configuration object for the Invocable.

By default, Steamship packages and plugins will not take any configuration. Steamship packages and plugins may declare a configuration object which extends from Config, if needed, as follows:

    class MyPackageOrPlugin:
        class MyConfig(Config):
            ...

        @classmethod
        def config_cls(cls):
            return MyPackageOrPlugin.MyConfig

context: InvocationContext#
classmethod get_config_parameters() Dict[str, ConfigParameter][source]#
instance_init()[source]#

The instance init method will be called ONCE by the engine when a new instance of a package or plugin has been created. By default, this does nothing.

invocable_instance_init() InvocableResponse[source]#
class steamship.invocable.invocable.RouteMethod(*, attribute: Any = None, verb: Verb | None = None, path: str, config: Dict[str, Any])[source]#

Bases: BaseModel

attribute: Any#
config: Dict[str, Any]#
path: str#
verb: Verb | None#
steamship.invocable.invocable.endpoint(verb: str = None, path: str = None, **kwargs)[source]#

By using kwargs we can tag the function with any parameters.

steamship.invocable.invocable.find_route_methods(on_class: Type) List[RouteMethod][source]#
steamship.invocable.invocable.get(path: str, **kwargs)[source]#
steamship.invocable.invocable.make_registering_decorator(decorator)[source]#

Returns a copy of the given decorator (foreignDecorator) that is identical in every way(*), except that it also attaches a .decorator property to the callable it returns.

(*) We can be somewhat “hygienic”, but the new decorator (newDecorator) still isn’t signature-preserving, i.e. you will not be able to get a runtime list of parameters. For that, you need hackish libraries, but in this case the only argument is func, so it’s not a big issue.
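A minimal sketch of the registering-decorator pattern described above, written from the docstring rather than from the library source. The demonstration decorator shout, the class Greeter, and the helper methods_decorated_with are hypothetical names introduced only to show how the .decorator tag can be used for discovery.

```python
import functools


def make_registering_decorator(foreign_decorator):
    """Wrap a decorator so every callable it returns remembers which decorator made it."""

    @functools.wraps(foreign_decorator)
    def new_decorator(func):
        decorated = foreign_decorator(func)
        decorated.decorator = new_decorator  # the tag that discovery code looks for
        return decorated

    return new_decorator


@make_registering_decorator
def shout(func):
    # Hypothetical decorator used only for this demonstration.
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs).upper()
    return wrapper


class Greeter:
    @shout
    def hello(self):
        return "hello"

    def plain(self):
        return "plain"


def methods_decorated_with(cls, decorator):
    """List method names defined on cls whose .decorator tag is the given decorator."""
    return sorted(
        name for name, member in vars(cls).items()
        if getattr(member, "decorator", None) is decorator
    )
```

Here methods_decorated_with(Greeter, shout) finds only hello, which is the kind of lookup a route scanner could perform over a class.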

steamship.invocable.invocable.merge_routes_respecting_override(base_routes: List[RouteMethod], this_class_routes: List[RouteMethod]) List[RouteMethod][source]#

Merge routes from base classes into the routes from this class. If this class already has a given verb/path combination, the one from the superclass is ignored, since it has been overridden.
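The override rule can be sketched as follows. Route is a hypothetical stand-in for RouteMethod, reduced to a verb, a path, and an owner field that exists only to make the demonstration readable; the ordering of the merged list is also an assumption.

```python
from dataclasses import dataclass
from typing import List


@dataclass(frozen=True)
class Route:
    """Hypothetical stand-in for RouteMethod, reduced to the fields needed here."""
    verb: str
    path: str
    owner: str  # which class defined the route (for the demo only)


def merge_routes(base_routes: List[Route], this_class_routes: List[Route]) -> List[Route]:
    """Keep every route from this class; add base routes whose verb/path is not overridden."""
    merged = list(this_class_routes)
    taken = {(r.verb, r.path) for r in this_class_routes}
    for route in base_routes:
        if (route.verb, route.path) not in taken:
            merged.append(route)
    return merged
```

A subclass that redefines POST /greet therefore keeps its own handler, while an untouched GET /status is inherited from the base class.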

steamship.invocable.invocable.post(path: str, **kwargs)[source]#
steamship.invocable.invocable.route_list_contains(route_method: RouteMethod, routes: List[RouteMethod])[source]#

steamship.invocable.invocable_localhost module#

steamship.invocable.invocable_request module#

class steamship.invocable.invocable_request.InvocableRequest(*, clientConfig: Configuration = None, invocation: Invocation = None, loggingConfig: LoggingConfig = None, invocationContext: InvocationContext = None)[source]#

Bases: CamelModel

A request as the Steamship Hosting Framework receives it from the Engine.

This class is different from the other Request class:
  • steamship.base.request represents a request from the Steamship Client

  • this class represents a request from the Steamship Engine to a Steamship-hosted App/Plugin

It contains both a package/plugin invocation and also the client configuration in which that invocation is intended to execute.

client_config: Configuration#
invocation: Invocation#
invocation_context: InvocationContext#
logging_config: LoggingConfig#
class steamship.invocable.invocable_request.Invocation(*, httpVerb: str = None, invocationPath: str = None, arguments: Dict[str, Any] = None, config: Dict[str, Any] = None)[source]#

Bases: CamelModel

arguments: Dict[str, Any]#
config: Dict[str, Any]#
http_verb: str#
invocation_path: str#
class steamship.invocable.invocable_request.InvocationContext(*, tenantId: str = None, userId: str = None, workspaceId: str = None, invocableHandle: str = None, invocableVersionHandle: str = None, invocableInstanceHandle: str = None, invocableType: str = None, invocableOwnerId: str = None, invocableURL: str = None, invocableOwnerHandle: str | None = None, workspaceHandle: str | None = None, headers: Dict[str, str] = None)[source]#

Bases: CamelModel

headers: Dict[str, str]#
invocable_handle: str#
invocable_instance_handle: str#
invocable_owner_handle: str | None#
invocable_owner_id: str#
invocable_type: str#
invocable_url: str#
invocable_version_handle: str#
tenant_id: str#
user_id: str#
workspace_handle: str | None#
workspace_id: str#
class steamship.invocable.invocable_request.LoggingConfig(*, loggingHost: str = None, loggingPort: str = None)[source]#

Bases: CamelModel

logging_host: str#
logging_port: str#

steamship.invocable.invocable_response module#

class steamship.invocable.invocable_response.Http(*, status: int = None, base64Wrapped: bool = None, headers: Dict[str, str] = None)[source]#

Bases: CamelModel

base64_wrapped: bool#
headers: Dict[str, str]#
status: int#
class steamship.invocable.invocable_response.InvocableResponse(status: Task = None, error: SteamshipError = None, http: Http = None, data: Any = None, string: str = None, json: Any = None, _bytes: bytes | io.BytesIO = None, mime_type=None)[source]#

Bases: GenericModel, Generic[T]

Mirrors the Response object in the Steamship server.

data: T#
static error(code: int, message: str | None = None, error: SteamshipError | None = None, exception: Exception | None = None, prefix: str | None = None) InvocableResponse[source]#

Merges a number of error channels into one unified Response object.

Aggregates all possible messages into a single “ | ”-delimited error message.

If the final resulting error message is non-null, it is prefixed with the provided prefix.
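The aggregation rule can be sketched as a small free function. This is an illustration of the documented behavior, not the method's implementation: the function name merge_error_messages, the order in which the channels are joined, and the use of str(exception) are assumptions.

```python
from typing import Optional


def merge_error_messages(message: Optional[str] = None,
                         error_message: Optional[str] = None,
                         exception: Optional[Exception] = None,
                         prefix: Optional[str] = None) -> Optional[str]:
    """Join whichever messages are present with ' | ' and apply the prefix if any remain."""
    parts = [message, error_message, str(exception) if exception is not None else None]
    parts = [p for p in parts if p]  # drop missing or empty channels
    if not parts:
        return None  # nothing to report, so the prefix is not applied
    merged = " | ".join(parts)
    return f"{prefix}{merged}" if prefix else merged
```

For example, a message of "Bad input" combined with ValueError("x must be > 0") and the prefix "[ERROR] " yields "[ERROR] Bad input | x must be > 0".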

static from_obj(obj: Any) InvocableResponse[source]#
http: Http#
post_update(client: Client)[source]#

Pushes this response object to the corresponding Task on the Steamship Engine.

Typically apps and plugins return their results to the Engine synchronously via HTTP. But sometimes that’s not practical – for example:

  • Microsoft’s OCR endpoint returns a Job Token that can be exchanged for updates, and eventually a result.

  • Google’s AutoML can take 20-30 minutes to train.

  • Fine-tuning BERT on ECS can take an arbitrarily long amount of time.

In these cases, it can be useful for the package/plugin to occasionally post updates to the Engine outside of the Engine’s initial synchronous request-response conversation.

set_data(data: Any = None, string: str = None, json: Any = None, _bytes: bytes | BytesIO = None, mime_type=None)[source]#
status: Task#
class steamship.invocable.invocable_response.StreamingResponse(*, task: Task, file: File)[source]#

Bases: CamelModel

StreamingResponse holds the basic information for an asynchronous request to Steamship that returns a stream.

It consists of two parts:
  1. A Task that represents the scheduled background work. This Task SHOULD be dependent on any other resultant child Tasks that are scheduled by a package. The returned Task will have a requestId that can be used to reference the originating request (and can be used to filter streams).

  2. A File object that can be used to stream back Blocks added by a package in response to the request (when completing the Task).

Streams in Steamship are File-centric. The streams consist of Server-Side Events for Block creation events. These events represent work performed, as observed by recording new data to a common File. For example, AgentService operations log all work to a ChatHistory-managed File. This File will contain all intermediate work for a request and be used for streaming interactions with LLMs, etc.

To consume a Steamship stream, clients should use the File::id and the Task::requestId fields to request the data via: https://steamship.run/api/v1/file/{file_id}/stream?requestId={req_id}&timeoutSeconds=30
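Building that stream URL from the two identifiers can be sketched with the standard library. The helper name build_stream_url and its parameter names are hypothetical; the URL shape and the query parameters requestId and timeoutSeconds come from the text above.

```python
from urllib.parse import quote, urlencode

STREAM_URL_TEMPLATE = "https://steamship.run/api/v1/file/{file_id}/stream"


def build_stream_url(file_id: str, request_id: str, timeout_seconds: int = 30) -> str:
    """Return the stream URL for a File id and the Task's requestId."""
    base = STREAM_URL_TEMPLATE.format(file_id=quote(file_id, safe=""))
    query = urlencode({"requestId": request_id, "timeoutSeconds": timeout_seconds})
    return f"{base}?{query}"
```

A client would then open this URL with a Server-Sent Events reader of its choice; authentication details are outside the scope of this sketch.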

StreamingResponse differs from InvocableResponse in that it contains both Task and File simultaneously (rather than a status OR data field).

file: File#

Provides the stream of events, as a series of block creation events (sent as SSEs).

task: Task#

Represents background work that will produce a stream of events for a given request.

steamship.invocable.lambda_handler module#

steamship.invocable.lambda_handler.create_custom_format(invocation_context: InvocationContext, event: Dict) Callable[[LogRecord], Dict][source]#
steamship.invocable.lambda_handler.create_handler(invocable_cls: Type[Invocable])[source]#

Deprecated wrapper function for a Steamship invocable within an AWS Lambda function. Called by code within a plugin or package.

steamship.invocable.lambda_handler.create_safe_handler(known_invocable_for_testing: Type[Invocable] = None)[source]#
steamship.invocable.lambda_handler.encode_exception(obj)[source]#

When logging an exception (e.g. logging.exception(some_error)), the exception must be turned into a string so that it is accepted by Elasticsearch.

steamship.invocable.lambda_handler.get_class_from_module(module) Type[Invocable][source]#
steamship.invocable.lambda_handler.handler(bound_internal_handler, event: Dict, _: Dict = None, running_locally: bool = False) dict[source]#
steamship.invocable.lambda_handler.internal_handler(invocable_cls_func: Callable[[], Type[Invocable]], event: Dict, client: Steamship, invocation_context: InvocationContext, call_instance_init: bool = False) InvocableResponse[source]#
steamship.invocable.lambda_handler.safe_handler(event, context=None)#
steamship.invocable.lambda_handler.safely_find_invocable_class() Type[Invocable][source]#

Safely find the invocable class within invocable code.

steamship.invocable.package_mixin module#

class steamship.invocable.package_mixin.PackageMixin[source]#

Bases: ABC

instance_init()[source]#

The instance init method will be called ONCE by the engine when a new instance of a package or plugin has been created. By default, this does nothing.

steamship.invocable.package_service module#

class steamship.invocable.package_service.PackageService(*args, **kwargs)[source]#

Bases: Invocable

The Abstract Base Class of a Steamship Package.

Packages may implement whatever methods they like. To expose these methods as invocable HTTP routes, annotate the method with @get or @post and the route name.

Package implementations are effectively stateless, though they will have stateful

USED_MIXIN_CLASSES: List[Type[PackageMixin]] = []#
add_mixin(mixin: PackageMixin, permit_overwrite_of_existing_methods: bool = False)[source]#
instance_init()[source]#

The instance init method will be called ONCE by the engine when a new instance of a package or plugin has been created. By default, this calls instance_init on mixins, in order.

invoke_later(method: str, verb: Verb = Verb.POST, wait_on_tasks: List[Task] = None, arguments: Dict[str, Any] = None, delay_ms: int | None = None) Task[Any][source]#

Schedule a method for future invocation.

Parameters:
  • method (str) – The method to invoke, as registered with Steamship in the @get or @post decorator.

  • verb (Verb) – The HTTP Verb to use. Default is POST.

  • wait_on_tasks (List[Task]) – A list of Task objects (or task IDs) that should be waited upon before invocation.

  • arguments (Dict[str, Any]) – The keyword arguments of the invoked method

  • delay_ms (Optional[int]) – A delay, in milliseconds, before the invocation should execute.

Returns:

A Task representing the future work

Return type:

Task[Any]

mixins: List[PackageMixin]#
classmethod scan_mixin(package_spec: PackageSpec, mixin_class: Type[PackageMixin], mixin_instance: PackageMixin | None = None, permit_overwrite_of_existing_methods: bool = False)[source]#

steamship.invocable.paramater_types module#

class steamship.invocable.paramater_types.fileurl[source]#

Bases: str

Type alias that, if used in a package method argument, will cause a file upload widget to appear.

class steamship.invocable.paramater_types.longstr[source]#

Bases: str

Long string functions mostly as a type annotation for the web.
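How such str subclasses can act as UI hints may be sketched as below. The two classes are local stand-ins that mirror the documented aliases, and the widget names plus the widgets_for helper are hypothetical; how the real web interface maps annotations to widgets is not described here.

```python
import inspect


class fileurl(str):
    """Stand-in str subclass mirroring the documented alias."""


class longstr(str):
    """Stand-in str subclass mirroring the documented alias."""


# Hypothetical widget names; the real mapping used by the web UI is not in the source.
WIDGETS = {fileurl: "file-upload", longstr: "textarea", str: "text-input"}


def summarize(document: fileurl, instructions: longstr, title: str) -> str:
    """Example package-style method whose annotations carry the UI hints."""
    return f"{title}: {instructions} ({document})"


def widgets_for(func) -> dict:
    """Map each parameter name to a widget based on its annotation."""
    params = inspect.signature(func).parameters
    return {name: WIDGETS.get(p.annotation, "text-input") for name, p in params.items()}
```

Because both types subclass str, values passed at runtime behave as ordinary strings; only tooling that inspects the annotations treats them differently.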

steamship.invocable.plugin_service module#

class steamship.invocable.plugin_service.PluginService(client: Steamship = None, config: Dict[str, Any] = None, context: InvocationContext = None)[source]#

Bases: Invocable, Generic[IN, OUT], ABC

The Abstract Base Class of a Steamship Plugin.

All Steamship Plugins implement the operation:

  • run(PluginRequest[T]) -> Response[U]

Many plugins are effectively stateless. This run operation defines their entire capability. Examples of such stateless plugins are:

  • File Import Plugin

  • Export Plugin

Other plugins have state, but in a very controlled way:

  • they can be trained,

  • this training process produces a “model”,

  • that model acts as the state on which the run method is conditioned.

This model is stored in the Steamship Workspace that owns the Plugin Instance, and access to it is provided by the hosting environment that runs the model.

  • TODO(ted) Document this process.

These stateful plugins are called “Trainable Plugins,” and they must implement the following additional methods:

  • get_training_parameters(PluginRequest[TrainingParameterInput]) -> Response[TrainingParameterOutput]

  • train(PluginRequest[TrainPluginInput]) -> Response[TrainPluginOutput]

abstract run(request: PluginRequest[IN]) OUT | InvocableResponse[OUT][source]#

Runs the core operation implemented by this plugin: import, export, blockify, tag, etc.

This is the method that a Steamship Plugin implements to perform its main work.

class steamship.invocable.plugin_service.TrainablePluginService(client: Steamship = None, config: Dict[str, Any] = None, context: InvocationContext = None)[source]#

Bases: PluginService, Generic[IN, OUT], ABC

abstract get_training_parameters(request: PluginRequest[TrainingParameterPluginInput]) InvocableResponse[TrainingParameterPluginOutput][source]#

Produces the trainable parameters for this plugin.

This method is run by the Steamship Engine prior to training to fetch hyperparameters.

  • The user themselves can provide hyperparameters on the TrainingParameterPluginInput object.

  • This method then transforms those into the TrainingParameterPluginOutput object, altering the user’s values if desired.

  • The Engine then takes those TrainingParameterPluginOutput and presents them on the TrainPluginInput

abstract model_cls() Type[TrainableModel][source]#

Returns the constructor of the TrainableModel this TrainablePluginService uses.

This is required so the run method below can load the model and provide it to the subclass implementor.

run(request: PluginRequest[IN]) OUT | InvocableResponse[OUT][source]#

Loads the trainable model before passing the request to the run_with_model handler on the subclass.

abstract run_with_model(request: PluginRequest[IN], model: TrainableModel) OUT | InvocableResponse[OUT][source]#

Rather than implementing run(request), a TrainablePluginService implements run_with_model(request, model)
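The hand-off from run to run_with_model is a template-method arrangement, which can be sketched with stand-in classes. SketchModel, SketchTrainablePlugin, the load_model hook, and KeywordScorer are all hypothetical; the real base class locates its model through model_cls() and the hosting environment, which this sketch does not reproduce.

```python
from abc import ABC, abstractmethod
from typing import Any, Dict


class SketchModel:
    """Hypothetical stand-in for a TrainableModel."""

    def __init__(self, weights: Dict[str, float]):
        self.weights = weights


class SketchTrainablePlugin(ABC):
    """Stand-in showing the run -> run_with_model hand-off."""

    @abstractmethod
    def load_model(self) -> SketchModel:
        """Assumed hook; the real class resolves the model via model_cls()."""

    @abstractmethod
    def run_with_model(self, request: Any, model: SketchModel) -> Any:
        """Subclasses implement their work here, with the model already loaded."""

    def run(self, request: Any) -> Any:
        # The base class owns model loading, so subclasses never override run().
        model = self.load_model()
        return self.run_with_model(request, model)


class KeywordScorer(SketchTrainablePlugin):
    def load_model(self) -> SketchModel:
        return SketchModel({"steamship": 2.0, "plugin": 1.0})

    def run_with_model(self, request: str, model: SketchModel) -> float:
        # Sum the learned weight of every known word in the request.
        return sum(model.weights.get(word, 0.0) for word in request.lower().split())
```

Callers still invoke run(request); the subclass only sees a request paired with a ready model.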

abstract train(request: PluginRequest[TrainPluginInput], model: TrainableModel) InvocableResponse[TrainPluginOutput][source]#

Train the model.

abstract train_status(request: PluginRequest[TrainPluginInput], model: TrainableModel) InvocableResponse[TrainPluginOutput][source]#

Train the model.

Module contents#

class steamship.invocable.Config[source]#

Bases: CamelModel

Base class for Steamship Package and Plugin configuration objects.

extend_with_dict(d: dict, overwrite: bool = False)[source]#

Sets the attributes on this object with provided keys and values.

extend_with_json_file(path: Path, overwrite: bool = False, fail_on_missing_file: bool = True)[source]#

Extends this config object’s values with a JSON file from disk.

This is useful for applying late-bound defaults, such as API keys added to a deployment bundle.

classmethod get_config_parameters() Dict[str, ConfigParameter][source]#
static strip_enum(default_value)[source]#
class steamship.invocable.Invocable(client: Steamship = None, config: Dict[str, Any] = None, context: InvocationContext = None)[source]#

Bases: ABC

A Steamship microservice.

This model.py class:

  1. Provides a pre-authenticated instance of the Steamship client

  2. Provides a Lambda handler that routes to registered functions

  3. Provides useful methods for connecting functions to the router.

add_api_route(method_spec: MethodSpec, permit_overwrite_of_existing: bool = False)[source]#

Add an API route to this Invocable instance.

config: Config#
classmethod config_cls() Type[Config][source]#

Returns the configuration object for the Invocable.

By default, Steamship packages and plugins will not take any configuration. Steamship packages and plugins may declare a configuration object which extends from Config, if needed, as follows:

    class MyPackageOrPlugin:
        class MyConfig(Config):
            ...

        @classmethod
        def config_cls(cls):
            return MyPackageOrPlugin.MyConfig

context: InvocationContext#
classmethod get_config_parameters() Dict[str, ConfigParameter][source]#
instance_init()[source]#

The instance init method will be called ONCE by the engine when a new instance of a package or plugin has been created. By default, this does nothing.

invocable_instance_init() InvocableResponse[source]#
class steamship.invocable.InvocableRequest(*, clientConfig: Configuration = None, invocation: Invocation = None, loggingConfig: LoggingConfig = None, invocationContext: InvocationContext = None)[source]#

Bases: CamelModel

A request as the Steamship Hosting Framework receives it from the Engine.

This class is different from the other Request class:
  • steamship.base.request represents a request from the Steamship Client

  • this class represents a request from the Steamship Engine to a Steamship-hosted App/Plugin

It contains both a package/plugin invocation and also the client configuration in which that invocation is intended to execute.

client_config: Configuration#
invocation: Invocation#
invocation_context: InvocationContext#
logging_config: LoggingConfig#
class steamship.invocable.InvocableResponse(status: Task = None, error: SteamshipError = None, http: Http = None, data: Any = None, string: str = None, json: Any = None, _bytes: bytes | io.BytesIO = None, mime_type=None)[source]#

Bases: GenericModel, Generic[T]

Mirrors the Response object in the Steamship server.

data: T#
static error(code: int, message: str | None = None, error: SteamshipError | None = None, exception: Exception | None = None, prefix: str | None = None) InvocableResponse[source]#

Merges a number of error channels into one unified Response object.

Aggregates all possible messages into a single “ | ”-delimited error message.

If the final resulting error message is non-null, it is prefixed with the provided prefix.

static from_obj(obj: Any) InvocableResponse[source]#
http: Http#
post_update(client: Client)[source]#

Pushes this response object to the corresponding Task on the Steamship Engine.

Typically apps and plugins return their results to the Engine synchronously via HTTP. But sometimes that’s not practical – for example:

  • Microsoft’s OCR endpoint returns a Job Token that can be exchanged for updates, and eventually a result.

  • Google’s AutoML can take 20-30 minutes to train.

  • Fine-tuning BERT on ECS can take an arbitrarily long amount of time.

In these cases, it can be useful for the package/plugin to occasionally post updates to the Engine outside of the Engine’s initial synchronous request-response conversation.

set_data(data: Any = None, string: str = None, json: Any = None, _bytes: bytes | BytesIO = None, mime_type=None)[source]#
status: Task#
class steamship.invocable.Invocation(*, httpVerb: str = None, invocationPath: str = None, arguments: Dict[str, Any] = None, config: Dict[str, Any] = None)[source]#

Bases: CamelModel

arguments: Dict[str, Any]#
config: Dict[str, Any]#
http_verb: str#
invocation_path: str#
class steamship.invocable.InvocationContext(*, tenantId: str = None, userId: str = None, workspaceId: str = None, invocableHandle: str = None, invocableVersionHandle: str = None, invocableInstanceHandle: str = None, invocableType: str = None, invocableOwnerId: str = None, invocableURL: str = None, invocableOwnerHandle: str | None = None, workspaceHandle: str | None = None, headers: Dict[str, str] = None)[source]#

Bases: CamelModel

headers: Dict[str, str]#
invocable_handle: str#
invocable_instance_handle: str#
invocable_owner_handle: str | None#
invocable_owner_id: str#
invocable_type: str#
invocable_url: str#
invocable_version_handle: str#
tenant_id: str#
user_id: str#
workspace_handle: str | None#
workspace_id: str#
class steamship.invocable.LoggingConfig(*, loggingHost: str = None, loggingPort: str = None)[source]#

Bases: CamelModel

logging_host: str#
logging_port: str#
class steamship.invocable.PackageService(*args, **kwargs)[source]#

Bases: Invocable

The Abstract Base Class of a Steamship Package.

Packages may implement whatever methods they like. To expose these methods as invocable HTTP routes, annotate the method with @get or @post and the route name.

Package implementations are effectively stateless, though they will have stateful

USED_MIXIN_CLASSES: List[Type[PackageMixin]] = []#
add_mixin(mixin: PackageMixin, permit_overwrite_of_existing_methods: bool = False)[source]#
config: Config#
context: InvocationContext#
instance_init()[source]#

The instance init method will be called ONCE by the engine when a new instance of a package or plugin has been created. By default, this calls instance_init on mixins, in order.

invoke_later(method: str, verb: Verb = Verb.POST, wait_on_tasks: List[Task] = None, arguments: Dict[str, Any] = None, delay_ms: int | None = None) Task[Any][source]#

Schedule a method for future invocation.

Parameters:
  • method (str) – The method to invoke, as registered with Steamship in the @get or @post decorator.

  • verb (Verb) – The HTTP Verb to use. Default is POST.

  • wait_on_tasks (List[Task]) – A list of Task objects (or task IDs) that should be waited upon before invocation.

  • arguments (Dict[str, Any]) – The keyword arguments of the invoked method

  • delay_ms (Optional[int]) – A delay, in milliseconds, before the invocation should execute.

Returns:

A Task representing the future work

Return type:

Task[Any]

mixins: List[PackageMixin]#
classmethod scan_mixin(package_spec: PackageSpec, mixin_class: Type[PackageMixin], mixin_instance: PackageMixin | None = None, permit_overwrite_of_existing_methods: bool = False)[source]#
steamship.invocable.create_handler(invocable_cls: Type[Invocable])[source]#

Deprecated wrapper function for a Steamship invocable within an AWS Lambda function. Called by code within a plugin or package.

class steamship.invocable.fileurl[source]#

Bases: str

Type alias that, if used in a package method argument, will cause a file upload widget to appear.

steamship.invocable.get(path: str, **kwargs)[source]#
class steamship.invocable.longstr[source]#

Bases: str

Long string functions mostly as a type annotation for the web.

steamship.invocable.post(path: str, **kwargs)[source]#
steamship.invocable.safe_handler(event, context=None)#