steamship.base package#

Submodules#

steamship.base.client module#

class steamship.base.client.Client(api_key: str = None, api_base: str = None, app_base: str = None, web_base: str = None, workspace: str = None, fail_if_workspace_exists: bool = False, profile: str = None, config_file: str = None, config: Configuration = None, trust_workspace_config: bool = False)[source]#

Bases: CamelModel, ABC

Base client class.

It lives in its own module primarily as a workaround to prevent circular imports.

call(verb: Verb, operation: str, payload: Request | dict | bytes = None, file: Any = None, expect: Type[T] = None, debug: bool = False, raw_response: bool = False, is_package_call: bool = False, package_owner: str = None, package_id: str = None, package_instance_id: str = None, as_background_task: bool = False, wait_on_tasks: List[str | Task] = None, timeout_s: float | None = None, task_delay_ms: int | None = None) Any | Task[source]#

Post to the Steamship API.

All responses have the format:

    {
      "data": "<actual response>",
      "error": {"reason": "<message>"}
    }

For the Python client we return the contents of the data field if present, and we raise an exception if the error field is filled in.
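The unwrapping rule above can be sketched as follows. This is a minimal illustration, not the client's actual implementation: the `unwrap` helper and the `RemoteError` exception are hypothetical names standing in for whatever the client uses internally.

```python
from typing import Any


class RemoteError(Exception):
    """Hypothetical stand-in for the exception the client raises."""


def unwrap(body: dict) -> Any:
    """Return the `data` field, or raise if the `error` field is filled in."""
    error = body.get("error")
    if error:
        # Assumption: the error object carries a human-readable "reason".
        raise RemoteError(error.get("reason", "Undefined remote error"))
    return body.get("data")
```

With this sketch, `unwrap({"data": "ok"})` returns the payload, while a body whose error field is populated raises instead of returning.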

config: Configuration#
dict(**kwargs) dict[source]#

Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.

get(operation: str, payload: Request | dict = None, file: Any = None, expect: Any = None, debug: bool = False, raw_response: bool = False, is_package_call: bool = False, package_owner: str = None, package_id: str = None, package_instance_id: str = None, as_background_task: bool = False, wait_on_tasks: List[str | Task] = None, timeout_s: float | None = None, task_delay_ms: int | None = None) Any | Task[source]#
logs(offset: int = 0, number: int = 50, invocable_handle: str | None = None, instance_handle: str | None = None, invocable_version_handle: str | None = None, path: str | None = None, field_values: Dict[str, str] | None = None) Dict[str, Any][source]#

Return generated logs for a client.

The logs will be workspace-scoped. You will only receive logs for packages and plugins that you own.

Parameters:
  • offset – The index of the first log entry to return. This can be used with number to page through logs.

  • number – The number of log entries to return. This can be used with offset to page through logs.

  • invocable_handle – Enables optional filtering based on the handle of a package or plugin. Example: my-package

  • instance_handle – Enables optional filtering based on the handle of a package instance or plugin instance. Example: my-instance

  • invocable_version_handle – Enables optional filtering based on the version handle of a package or plugin. Example: 0.0.2

  • path – Enables optional filtering based on request path. Example: /generate.

  • field_values – Enables optional filtering based on user-provided field values.

Returns:

A dictionary containing the offset and number of log entries, as well as a list of entries that match the specified filters.
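Paging with offset and number might look like the sketch below. It is written against any callable that accepts the same keyword arguments as logs, so it can be tried without a live client. The helper name `iter_log_entries` is hypothetical, and the key `"entries"` is an assumption: the documentation above says only that the returned dictionary contains a list of entries, not what the key is called.

```python
from typing import Any, Callable, Dict, Iterator


def iter_log_entries(
    fetch_logs: Callable[..., Dict[str, Any]],
    page_size: int = 50,
    **filters: Any,
) -> Iterator[Any]:
    """Yield log entries page by page using offset and number."""
    offset = 0
    while True:
        page = fetch_logs(offset=offset, number=page_size, **filters)
        # Assumption: the list of entries is stored under the key "entries".
        entries = page.get("entries") or []
        yield from entries
        if len(entries) < page_size:
            break  # a short page means there is nothing left to fetch
        offset += len(entries)
```

Under those assumptions, a call such as `iter_log_entries(client.logs, invocable_handle="my-package")` would walk every matching entry in the workspace.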

post(operation: str, payload: Request | dict | BaseModel | bytes = None, file: Any = None, expect: Any = None, debug: bool = False, raw_response: bool = False, is_package_call: bool = False, package_owner: str = None, package_id: str = None, package_instance_id: str = None, as_background_task: bool = False, wait_on_tasks: List[str | Task] = None, timeout_s: float | None = None, task_delay_ms: int | None = None) Any | Task[source]#
switch_workspace(workspace_handle: str = None, workspace_id: str = None, fail_if_workspace_exists: bool = False, trust_workspace_config: bool = False)[source]#

Switches this client to the requested workspace, possibly creating it. If all arguments are None, the client actively switches into the default workspace.

  • API calls are performed manually to avoid circular imports.

  • Note that the default workspace is technically not necessary for API usage; it will be assumed by the Engine in the absence of a Workspace ID or Handle being manually specified in request headers.

steamship.base.configuration module#

class steamship.base.configuration.Configuration(config_file: Path | None = None, *, apiKey: SecretStr, apiBase: AnyHttpUrl = 'https://api.steamship.com/api/v1/', appBase: AnyHttpUrl = 'https://steamship.run/', webBase: AnyHttpUrl = 'https://steamship.com/', workspaceId: str = None, workspaceHandle: str = None, profile: str | None = None, requestId: str | None = None)[source]#

Bases: CamelModel

api_base: AnyHttpUrl#
api_key: SecretStr#
app_base: AnyHttpUrl#
static default_config_file_has_api_key() bool[source]#
profile: str | None#
static remove_api_key_from_default_config()[source]#
request_id: str | None#
web_base: AnyHttpUrl#
workspace_handle: str#
workspace_id: str#

steamship.base.environments module#

class steamship.base.environments.RuntimeEnvironments(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]#

Bases: str, Enum

LOCALHOST = 'localhost'#
REPLIT = 'replit'#
steamship.base.environments.check_environment(env: RuntimeEnvironments, interactively_set_key: bool = True)[source]#

steamship.base.error module#

exception steamship.base.error.SteamshipError(message: str = 'Undefined remote error', internal_message: str = None, suggestion: str = None, code: str = None, error: Exception | str = None)[source]#

Bases: Exception

code: str = None#
error: str = None#
static from_dict(d: Any) SteamshipError[source]#

Last resort if subclass doesn’t override: pass through.

internal_message: str = None#
log()[source]#
message: str = None#
suggestion: str = None#
to_dict() dict[source]#

steamship.base.mime_types module#

class steamship.base.mime_types.ContentEncodings[source]#

Bases: object

BASE64 = 'base64'#
class steamship.base.mime_types.MimeTypes(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]#

Bases: str, Enum

BINARY = 'application/octet-stream'#
DOC = 'application/msword'#
DOCX = 'application/vnd.openxmlformats-officedocument.wordprocessingml.document'#
EPUB = 'application/epub+zip'#
FILE_JSON = 'fileJson'#
GIF = 'image/gif'#
HTML = 'text/html'#
JPG = 'image/jpeg'#
JSON = 'application/json'#
MKD = 'text/markdown'#
MP3 = 'audio/mp3'#
MP4_AUDIO = 'audio/mp4'#
MP4_VIDEO = 'video/mp4'#
OGG_AUDIO = 'audio/ogg'#
OGG_VIDEO = 'video/ogg'#
PDF = 'application/pdf'#
PNG = 'image/png'#
PPT = 'applicatino/ms-powerpoint'#
PPTX = 'application/vnd.openxmlformats-officedocument.presentationml.presentation'#
RTF = 'application/rtf'#
STEAMSHIP_BLOCK_JSON = 'application/vnd.steamship-block.json.v1'#
STEAMSHIP_PLUGIN_CAPABILITIES_REQUEST = 'application/vnd.steamship-block.plugin-capabilities-request+json'#
STEAMSHIP_PLUGIN_CAPABILITIES_RESPONSE = 'application/vnd.steamship-block.plugin-capabilities-response+json'#
STEAMSHIP_PLUGIN_FUNCTION_CALL_INVOCATION = 'application/vnd.steamship-block.function-calling-support-invocation+json'#
STEAMSHIP_PLUGIN_FUNCTION_CALL_RESULT = 'application/vnd.steamship-block.function-calling-support-result+json'#
TIFF = 'image/tiff'#
TXT = 'text/plain'#
UNKNOWN = 'unknown'#
WAV = 'audio/wav'#
WEBM_AUDIO = 'audio/webm'#
WEBM_VIDEO = 'video/webm'#
classmethod has_value(value)[source]#
classmethod is_binary(value)[source]#

Returns whether the mime type is likely a binary file.

steamship.base.model module#

class steamship.base.model.CamelModel[source]#

Bases: BaseModel

class Config[source]#

Bases: object

alias_generator() str#
allow_population_by_field_name = True#
use_enum_values = True#
class steamship.base.model.GenericCamelModel[source]#

Bases: CamelModel, GenericModel

steamship.base.model.to_camel(s: str) str[source]#
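The alias_generator on CamelModel.Config, together with the camel-cased constructor keywords seen elsewhere in this package (apiKey for api_key, workspaceHandle for workspace_handle), suggests that field names are aliased from snake_case to lowerCamelCase. A minimal sketch of such a conversion follows. The function name `to_camel_sketch` is hypothetical, and the body is an illustration rather than the library's implementation.

```python
def to_camel_sketch(s: str) -> str:
    """Convert snake_case to lowerCamelCase (illustrative only)."""
    head, *rest = s.split("_")
    # Keep the first word as-is and capitalize each following word.
    return head + "".join(part.capitalize() for part in rest)
```

For example, `to_camel_sketch("next_page_token")` gives `"nextPageToken"`, matching the keyword shown in the ListResponse signature.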

steamship.base.package_spec module#

Objects for recording and reporting upon the introspected interface of a Steamship Package.

class steamship.base.package_spec.ArgSpec(name: str, parameter: Parameter)[source]#

Bases: CamelModel

An argument passed to a method.

kind: str#
name: str#
pprint(name_width: int | None = None, prefix: str = '') str[source]#

Returns a pretty printable representation of this argument.

values: List[str] | None#
class steamship.base.package_spec.MethodSpec(*, path: str, verb: str, returns: str, doc: str | None = None, args: List[ArgSpec] | None = None, config: Dict | None = None, funcBinding: str | Callable[[...], Any] | None = None, className: str | None = None)[source]#

Bases: CamelModel

A method, callable remotely, on an object.

args: List[ArgSpec] | None#
class_name: str | None#
static clean_path(path: str = '') str[source]#

Ensures that the path always starts with / and is, at minimum, / itself.
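The normalization described here can be sketched in a few lines. The name `clean_path_sketch` is hypothetical, and the real method may handle additional cases (such as surrounding whitespace) that this illustration does not.

```python
def clean_path_sketch(path: str = "") -> str:
    """Return the path with a leading slash; empty input becomes "/"."""
    if not path:
        return "/"
    return path if path.startswith("/") else "/" + path
```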

clone() MethodSpec[source]#
config: Dict | None#
doc: str | None#
static from_class(cls: object, name: str, path: str = None, verb: Verb = Verb.POST, config: Dict[str, str | bool | int | float] = None, func_binding: str | Callable[[...], Any] | None = None)[source]#
func_binding: str | Callable[[...], Any] | None#
get_bound_function(service_instance: Any | None) Callable[[...], Any] | None[source]#

Get the bound method described by this spec.

If func_binding is a string, it resolves to a function on the provided Invocable. Otherwise, it is used directly as the function.

is_same_route_as(other: MethodSpec) bool[source]#

Two methods are the same route if they share a path and verb.

path: str#
pprint(name_width: int | None = None, prefix: str = '  ') str[source]#

Returns a pretty printable representation of this method.

returns: str#
verb: str#
class steamship.base.package_spec.PackageSpec(*, name: str, doc: str | None = None, sdkVersion: str = 'unknown', usedMixins: List[str] | None = None, methodMappings: Dict[str, Dict[str, MethodSpec]] = None)[source]#

Bases: CamelModel

A package, representing a remotely instantiable service.

add_method(new_method: MethodSpec, permit_overwrite_of_existing: bool = False)[source]#

Add a method to this PackageSpec. The permit_overwrite_of_existing flag controls whether a method already registered on the same route may be overwritten.

property all_methods: List[MethodSpec]#

Return a list of all methods mapped in this Package.

clone() PackageSpec[source]#

Return a copy-by-value clone of this PackageSpec.

dict(**kwargs) dict[source]#

Return the dict representation of this object.

Manually adds the computed methods field. Note that if we upgrade to Pydantic 2.x, this field can be included automatically via decorators.

doc: str | None#
get_method(http_verb: str, http_path: str) MethodSpec | None[source]#

Matches the provided HTTP Verb and Path to registered methods.

This is intended to be the single place where a provided (VERB, PATH) is mapped to a MethodSpec, such that if we eventually support path variables (/posts/:id/raw), it can be done within this function.
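A single lookup point of this kind can be sketched over a nested dictionary shaped like method_mappings. Several things here are assumptions rather than documented behavior: that the outer key is the HTTP verb and the inner key is the path, that verbs are compared in upper case, and that paths are normalized to a leading slash. The name `find_route` is hypothetical, and plain strings stand in for MethodSpec objects.

```python
from typing import Dict, Optional

# Assumption: outer key is the HTTP verb, inner key is the path, mirroring
# the Dict[str, Dict[str, MethodSpec]] shape of method_mappings.
Routes = Dict[str, Dict[str, str]]


def find_route(routes: Routes, http_verb: str, http_path: str) -> Optional[str]:
    """Map a (verb, path) pair to a registered method, or None if absent."""
    verb = http_verb.strip().upper()
    path = http_path if http_path.startswith("/") else "/" + http_path
    return routes.get(verb, {}).get(path)
```

Keeping the match in one function means that support for path variables could later replace the exact-match lookup without touching callers.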

import_parent_methods(parent: PackageSpec | None = None)[source]#
method_mappings: Dict[str, Dict[str, MethodSpec]]#
name: str#
pprint(prefix: str = '  ') str[source]#

Returns a pretty printable representation of this package.

sdk_version: str#
used_mixins: List[str] | None#
exception steamship.base.package_spec.RouteConflictError(message: str, existing_method_spec: MethodSpec)[source]#

Bases: SteamshipError

existing_method_spec: MethodSpec#

steamship.base.request module#

class steamship.base.request.CreateRequest(*, id: str = None, handle: str = None)[source]#

Bases: Request

handle: str#
id: str#
class steamship.base.request.DeleteRequest(*, id: str)[source]#

Bases: Request

id: str#
class steamship.base.request.GetRequest(*, id: str = None, handle: str = None)[source]#

Bases: Request

handle: str#
id: str#
class steamship.base.request.IdentifierRequest(*, id: str = None, handle: str = None)[source]#

Bases: Request

handle: str#
id: str#
class steamship.base.request.ListRequest(*, pageSize: int | None = None, pageToken: str | None = None, sortOrder: SortOrder | None = SortOrder.DESC)[source]#

Bases: Request

page_size: int | None#
page_token: str | None#
sort_order: SortOrder | None#
class steamship.base.request.Request[source]#

Bases: CamelModel

class steamship.base.request.SortOrder(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]#

Bases: str, Enum

ASC = 'ASC'#
DESC = 'DESC'#
class steamship.base.request.UpdateRequest(*, id: str = None, handle: str = None)[source]#

Bases: Request

handle: str#
id: str#

steamship.base.response module#

class steamship.base.response.ListResponse(*, nextPageToken: str | None = None)[source]#

Bases: Response

next_page_token: str | None#
class steamship.base.response.Response[source]#

Bases: CamelModel

steamship.base.tasks module#

class steamship.base.tasks.CreateTaskCommentRequest(*, taskId: str, externalId: str = None, externalType: str = None, externalGroup: str = None, metadata: str = None)[source]#

Bases: Request

external_group: str#
external_id: str#
external_type: str#
metadata: str#
task_id: str#
class steamship.base.tasks.ListTaskCommentRequest(*, pageSize: int | None = None, pageToken: str | None = None, sortOrder: SortOrder | None = SortOrder.DESC, taskId: str = None, externalId: str = None, externalType: str = None, externalGroup: str = None)[source]#

Bases: ListRequest

external_group: str#
external_id: str#
external_type: str#
task_id: str#
class steamship.base.tasks.Task(*, client: Client = None, taskId: str = None, userId: str = None, workspaceId: str = None, requestId: str | None = None, expect: Type = None, input: str = None, output: T = None, state: str = None, statusMessage: str = None, statusSuggestion: str = None, statusCode: str = None, statusCreatedOn: str = None, taskType: str = None, taskExecutor: str = None, taskCreatedOn: str = None, taskLastModifiedOn: str = None, remoteStatusInput: Dict | None = None, remoteStatusOutput: Dict | None = None, remoteStatusMessage: str = None, assignedWorker: str = None, startedAt: str = None, maxRetries: int = None, retries: int = None)[source]#

Bases: GenericCamelModel, Generic[T]

Encapsulates a unit of asynchronously performed work.

add_comment(external_id: str = None, external_type: str = None, external_group: str = None, metadata: Any = None) TaskComment[source]#
as_error() SteamshipError[source]#
assigned_worker: str#
client: Client#
expect: Type#
static get(client, _id: str = None, handle: str = None) Task[source]#
input: str#
max_retries: int#
output: T#
classmethod parse_obj(obj: Any) Task[source]#
post_update(fields: Set[str] = None) Task[source]#

Updates this task in the Steamship Engine.

refresh()[source]#
remote_status_input: Dict | None#
remote_status_message: str#
remote_status_output: Dict | None#
request_id: str | None#
retries: int#
started_at: str#
state: str#
status_code: str#
status_created_on: str#
status_message: str#
status_suggestion: str#
task_created_on: str#
task_executor: str#
task_id: str#
task_last_modified_on: str#
task_type: str#
update(other: Task | None = None)[source]#

Incorporates a Task into this object.

user_id: str#
wait(max_timeout_s: float = 180, retry_delay_s: float = 1, on_each_refresh: Callable[[int, float, Task], None] | None = None)[source]#

Polls and blocks until the task has succeeded or failed (or timeout reached).

Parameters:
  • max_timeout_s (float) – Max timeout in seconds. Default: 180s. After this timeout, an exception will be thrown. A timeout of -1 is equivalent to no timeout.

  • retry_delay_s (float) – Delay between status checks. Default: 1s.

  • on_each_refresh (Optional[Callable[[int, float, Task], None]]) –

    Optional callback invoked after each refresh, including refreshes that find the task in a success state. Its arguments are: (refresh number, total elapsed time, task).

    WARNING: Do not pass a long-running function to this variable. It will block the update polling.
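The polling behavior described by these parameters can be sketched as a plain loop. This is an illustration under stated assumptions, not the method's actual implementation: the name `poll_until_done` is hypothetical, a `refresh` callable returning a state string stands in for refreshing a Task, the callback receives that state rather than the task object, and `TimeoutError` stands in for whatever exception the library raises.

```python
import time
from typing import Callable, Optional


def poll_until_done(
    refresh: Callable[[], str],
    max_timeout_s: float = 180,
    retry_delay_s: float = 1,
    on_each_refresh: Optional[Callable[[int, float, str], None]] = None,
) -> str:
    """Poll `refresh` until it reports a terminal state or the timeout passes."""
    start = time.monotonic()
    refresh_count = 0
    while True:
        state = refresh()
        refresh_count += 1
        elapsed = time.monotonic() - start
        if on_each_refresh is not None:
            # Runs inline, so a slow callback delays the next status check.
            on_each_refresh(refresh_count, elapsed, state)
        if state in ("succeeded", "failed"):
            return state
        # A timeout of -1 disables the deadline, as described above.
        if max_timeout_s != -1 and elapsed >= max_timeout_s:
            raise TimeoutError(f"Task did not complete within {max_timeout_s}s")
        time.sleep(retry_delay_s)
```

Because the callback runs between status checks, the sketch also shows why a long-running callback blocks polling.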

wait_until_completed(retry_delay_s: float = 1, on_each_refresh: Callable[[int, float, Task], None] | None = None)[source]#

Polls and blocks until the task has succeeded or failed. No timeout on waiting is applied.

Parameters:
  • retry_delay_s (float) – Delay between status checks. Default: 1s.

  • on_each_refresh (Optional[Callable[[int, float, Task], None]]) –

    Optional callback invoked after each refresh, including refreshes that find the task in a success state. Its arguments are: (refresh number, total elapsed time, task).

    WARNING: Do not pass a long-running function to this variable. It will block the update polling.

workspace_id: str#
class steamship.base.tasks.TaskComment(*, client: Client = None, id: str = None, userId: str = None, taskId: str = None, externalId: str = None, externalType: str = None, externalGroup: str = None, metadata: Any = None, createdAt: str = None)[source]#

Bases: CamelModel

client: Client#
static create(client: Client, task_id: str = None, external_id: str = None, external_type: str = None, external_group: str = None, metadata: Any = None) TaskComment[source]#
created_at: str#
delete() TaskComment[source]#
external_group: str#
external_id: str#
external_type: str#
id: str#
static list(client: Client, task_id: str = None, external_id: str = None, external_type: str = None, external_group: str = None, page_size: int | None = None, page_token: str | None = None, sort_order: SortOrder | None = SortOrder.DESC) TaskCommentList[source]#
metadata: Any#
classmethod parse_obj(obj: Any) BaseModel[source]#
task_id: str#
user_id: str#
class steamship.base.tasks.TaskCommentList(*, nextPageToken: str | None = None, comments: List[TaskComment])[source]#

Bases: ListResponse

comments: List[TaskComment]#
class steamship.base.tasks.TaskRunRequest(*, taskId: str)[source]#

Bases: Request

task_id: str#
class steamship.base.tasks.TaskState[source]#

Bases: object

failed = 'failed'#
running = 'running'#
succeeded = 'succeeded'#
waiting = 'waiting'#
class steamship.base.tasks.TaskStatusRequest(*, taskId: str)[source]#

Bases: Request

task_id: str#
class steamship.base.tasks.TaskType[source]#

Bases: object

infer = 'infer'#
internal_api = 'internalApi'#
train = 'train'#

Module contents#

class steamship.base.Configuration(config_file: Path | None = None, *, apiKey: SecretStr, apiBase: AnyHttpUrl = 'https://api.steamship.com/api/v1/', appBase: AnyHttpUrl = 'https://steamship.run/', webBase: AnyHttpUrl = 'https://steamship.com/', workspaceId: str = None, workspaceHandle: str = None, profile: str | None = None, requestId: str | None = None)[source]#

Bases: CamelModel

api_base: AnyHttpUrl#
api_key: SecretStr#
app_base: AnyHttpUrl#
static default_config_file_has_api_key() bool[source]#
profile: str | None#
static remove_api_key_from_default_config()[source]#
request_id: str | None#
web_base: AnyHttpUrl#
workspace_handle: str#
workspace_id: str#
class steamship.base.MimeTypes(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]#

Bases: str, Enum

BINARY = 'application/octet-stream'#
DOC = 'application/msword'#
DOCX = 'application/vnd.openxmlformats-officedocument.wordprocessingml.document'#
EPUB = 'application/epub+zip'#
FILE_JSON = 'fileJson'#
GIF = 'image/gif'#
HTML = 'text/html'#
JPG = 'image/jpeg'#
JSON = 'application/json'#
MKD = 'text/markdown'#
MP3 = 'audio/mp3'#
MP4_AUDIO = 'audio/mp4'#
MP4_VIDEO = 'video/mp4'#
OGG_AUDIO = 'audio/ogg'#
OGG_VIDEO = 'video/ogg'#
PDF = 'application/pdf'#
PNG = 'image/png'#
PPT = 'applicatino/ms-powerpoint'#
PPTX = 'application/vnd.openxmlformats-officedocument.presentationml.presentation'#
RTF = 'application/rtf'#
STEAMSHIP_BLOCK_JSON = 'application/vnd.steamship-block.json.v1'#
STEAMSHIP_PLUGIN_CAPABILITIES_REQUEST = 'application/vnd.steamship-block.plugin-capabilities-request+json'#
STEAMSHIP_PLUGIN_CAPABILITIES_RESPONSE = 'application/vnd.steamship-block.plugin-capabilities-response+json'#
STEAMSHIP_PLUGIN_FUNCTION_CALL_INVOCATION = 'application/vnd.steamship-block.function-calling-support-invocation+json'#
STEAMSHIP_PLUGIN_FUNCTION_CALL_RESULT = 'application/vnd.steamship-block.function-calling-support-result+json'#
TIFF = 'image/tiff'#
TXT = 'text/plain'#
UNKNOWN = 'unknown'#
WAV = 'audio/wav'#
WEBM_AUDIO = 'audio/webm'#
WEBM_VIDEO = 'video/webm'#
classmethod has_value(value)[source]#
classmethod is_binary(value)[source]#

Returns whether the mime type is likely a binary file.

class steamship.base.RuntimeEnvironments(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]#

Bases: str, Enum

LOCALHOST = 'localhost'#
REPLIT = 'replit'#
exception steamship.base.SteamshipError(message: str = 'Undefined remote error', internal_message: str = None, suggestion: str = None, code: str = None, error: Exception | str = None)[source]#

Bases: Exception

code: str = None#
error: str = None#
static from_dict(d: Any) SteamshipError[source]#

Last resort if subclass doesn’t override: pass through.

internal_message: str = None#
log()[source]#
message: str = None#
suggestion: str = None#
to_dict() dict[source]#
class steamship.base.Task(*, client: Client = None, taskId: str = None, userId: str = None, workspaceId: str = None, requestId: str | None = None, expect: Type = None, input: str = None, output: T = None, state: str = None, statusMessage: str = None, statusSuggestion: str = None, statusCode: str = None, statusCreatedOn: str = None, taskType: str = None, taskExecutor: str = None, taskCreatedOn: str = None, taskLastModifiedOn: str = None, remoteStatusInput: Dict | None = None, remoteStatusOutput: Dict | None = None, remoteStatusMessage: str = None, assignedWorker: str = None, startedAt: str = None, maxRetries: int = None, retries: int = None)[source]#

Bases: GenericCamelModel, Generic[T]

Encapsulates a unit of asynchronously performed work.

add_comment(external_id: str = None, external_type: str = None, external_group: str = None, metadata: Any = None) TaskComment[source]#
as_error() SteamshipError[source]#
assigned_worker: str#
client: Client#
expect: Type#
static get(client, _id: str = None, handle: str = None) Task[source]#
input: str#
max_retries: int#
output: T#
classmethod parse_obj(obj: Any) Task[source]#
post_update(fields: Set[str] = None) Task[source]#

Updates this task in the Steamship Engine.

refresh()[source]#
remote_status_input: Dict | None#
remote_status_message: str#
remote_status_output: Dict | None#
request_id: str | None#
retries: int#
started_at: str#
state: str#
status_code: str#
status_created_on: str#
status_message: str#
status_suggestion: str#
task_created_on: str#
task_executor: str#
task_id: str#
task_last_modified_on: str#
task_type: str#
update(other: Task | None = None)[source]#

Incorporates a Task into this object.

user_id: str#
wait(max_timeout_s: float = 180, retry_delay_s: float = 1, on_each_refresh: Callable[[int, float, Task], None] | None = None)[source]#

Polls and blocks until the task has succeeded or failed (or timeout reached).

Parameters:
  • max_timeout_s (float) – Max timeout in seconds. Default: 180s. After this timeout, an exception will be thrown. A timeout of -1 is equivalent to no timeout.

  • retry_delay_s (float) – Delay between status checks. Default: 1s.

  • on_each_refresh (Optional[Callable[[int, float, Task], None]]) –

    Optional callback invoked after each refresh, including refreshes that find the task in a success state. Its arguments are: (refresh number, total elapsed time, task).

    WARNING: Do not pass a long-running function to this variable. It will block the update polling.

wait_until_completed(retry_delay_s: float = 1, on_each_refresh: Callable[[int, float, Task], None] | None = None)[source]#

Polls and blocks until the task has succeeded or failed. No timeout on waiting is applied.

Parameters:
  • retry_delay_s (float) – Delay between status checks. Default: 1s.

  • on_each_refresh (Optional[Callable[[int, float, Task], None]]) –

    Optional callback invoked after each refresh, including refreshes that find the task in a success state. Its arguments are: (refresh number, total elapsed time, task).

    WARNING: Do not pass a long-running function to this variable. It will block the update polling.

workspace_id: str#
class steamship.base.TaskState[source]#

Bases: object

failed = 'failed'#
running = 'running'#
succeeded = 'succeeded'#
waiting = 'waiting'#
steamship.base.check_environment(env: RuntimeEnvironments, interactively_set_key: bool = True)[source]#