steamship package#

Subpackages#

Module contents#

class steamship.Block(*, client: Client = None, id: str = None, fileId: str = None, text: str = None, tags: List[Tag] | None = [], index: int | None = None, mimeType: MimeTypes | None = None, publicData: bool = False, streamState: StreamState | None = None, requestId: str | None = None, url: str | None = None, contentURL: str | None = None, uploadType: BlockUploadType | None = None, uploadBytes: bytes | None = None)[source]#

Bases: CamelModel

A Block is a chunk of content within a File. It can be plain text content, image content, video content, etc. If the content is not text, the text value may be the empty string for backwards compatibility.

class ListRequest(*, fileId: str = None)[source]#

Bases: Request

file_id: str#
class ListResponse(*, blocks: List[Block] = [])[source]#

Bases: Response

blocks: List[Block]#
abort_stream()[source]#
append_stream(bytes: bytes)[source]#
as_llm_input(exclude_block_wrapper: bool | None = False) str[source]#
property chat_id: str#
property chat_role: RoleTag | None#
client: Client#
content_url: str | None#
static create(client: Client, file_id: str, text: str = None, tags: List[Tag] = None, content: str | bytes = None, url: str | None = None, mime_type: MimeTypes | None = None, public_data: bool = False, streaming: bool | None = None) Block[source]#

Create a new Block within a File specified by file_id.

You can create a Block in several ways:

  • Providing raw text as the text parameter;

  • Providing the content of the block as string or bytes;

  • Providing a publicly accessible URL where the content is stored.

delete() Block[source]#
file_id: str#
finish_stream()[source]#
static get(client: Client, _id: str = None) Block[source]#
id: str#
index(embedding_plugin_instance: Any = None)[source]#

Index this block.

index_in_file: int | None#
is_audio()[source]#

Return whether this is an audio Block.

is_image()[source]#

Return whether this is an image Block.

is_steamship_internal()[source]#
is_text() bool[source]#

Return whether this is a text Block.

is_video()[source]#

Return whether this is a video Block.

property message_id: str#
mime_type: MimeTypes | None#
classmethod parse_obj(obj: Any) BaseModel[source]#
public_data: bool#
static query(client: Client, tag_filter_query: str) BlockQueryResponse[source]#
raw()[source]#
property raw_data_url: str | None#

Return a URL at which the data content of this Block can be accessed. If public_data is True, this content can be accessed without an API key.

request_id: str | None#

The Steamship request ID associated with the creation of this block.

set_chat_id(chat_id: str)[source]#
set_chat_role(role: RoleTag)[source]#
set_message_id(message_id: str)[source]#
set_public_data(public_data: bool)[source]#

Set the public_data flag on this Block. If this object already exists server-side, update the flag.

set_request_id(request_id: str | None)[source]#
set_thread_id(thread_id: str) None[source]#
set_user_id(user_id: str) None[source]#
stream_state: StreamState | None#
tags: List[Tag] | None#
text: str#
property thread_id: str | None#
to_public_url() str[source]#

Return a public URL to access the Block’s data.

upload_bytes: bytes | None#
upload_type: BlockUploadType | None#
url: str | None#
property user_id: str | None#
class steamship.Configuration(config_file: Path | None = None, *, apiKey: SecretStr, apiBase: AnyHttpUrl = 'https://api.steamship.com/api/v1/', appBase: AnyHttpUrl = 'https://steamship.run/', webBase: AnyHttpUrl = 'https://steamship.com/', workspaceId: str = None, workspaceHandle: str = None, profile: str | None = None, requestId: str | None = None)[source]#

Bases: CamelModel

api_base: AnyHttpUrl#
api_key: SecretStr#
app_base: AnyHttpUrl#
static default_config_file_has_api_key() bool[source]#
profile: str | None#
static remove_api_key_from_default_config()[source]#
request_id: str | None#
web_base: AnyHttpUrl#
workspace_handle: str#
workspace_id: str#
class steamship.DocTag(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]#

Bases: str, Enum

A set of name constants for Tags with a kind of TagKind.doc; appropriate for HTML and Markdown ideas.

ARTICLE = 'article'#
BLOCK_CODE = 'block-code'#
BLOCK_QUOTE = 'block-quote'#
CAPTION = 'caption'#
CHAPTER = 'chapter'#
CHAT = 'chat'#
DIV = 'div'#
DOCUMENT = 'document'#
EMPHASIZED = 'emphasized'#
FOOTER = 'footer'#
H1 = 'h1'#
H2 = 'h2'#
H3 = 'h3'#
H4 = 'h4'#
H5 = 'h5'#
HEADER = 'header'#
IMAGE = 'image'#
LINE = 'line'#
LIST_ITEM = 'list-item'#
MAIN = 'main'#
METADATA = 'metadata'#
ORDERED_LIST = 'ordered-list'#
PAGE = 'page'#
PARAGRAPH = 'paragraph'#
PRE = 'pre'#
REGION = 'region'#
SENTENCE = 'sentence'#
SOURCE = 'source'#
SPAN = 'span'#
STRONG = 'strong'#
SUBTITLE = 'subtitle'#
TELETYPE = 'teletype'#
TEXT = 'text'#
TITLE = 'title'#
TOKEN = 'token'#
UNDERLINED = 'underlined'#
UNKNOWN = 'unknown'#
UNORDERED_LIST = 'unordered-list'#
static from_html_tag(tagname: str | None) DocTag | None[source]#
class steamship.EmbeddingIndex(*, client: Client = None, id: str = None, handle: str = None, name: str = None, plugin: str = None, externalId: str = None, externalType: str = None, metadata: str = None)[source]#

Bases: CamelModel

A persistent, read-optimized index over embeddings.

client: Client#
static create(client: Client, handle: str = None, name: str = None, embedder_plugin_instance_handle: str = None, fetch_if_exists: bool = True, external_id: str = None, external_type: str = None, metadata: Any = None) EmbeddingIndex[source]#
delete() EmbeddingIndex[source]#
embed() Task[IndexEmbedResponse][source]#
external_id: str#
external_type: str#
handle: str#
id: str#
insert(value: str, external_id: str = None, external_type: str = None, metadata: int | float | bool | str | List | Dict = None, reindex: bool = True, allow_long_records=False) IndexInsertResponse[source]#
insert_file(file_id: str, block_type: str = None, external_id: str = None, external_type: str = None, metadata: int | float | bool | str | List | Dict = None, reindex: bool = True) IndexInsertResponse[source]#
insert_many(items: List[EmbeddedItem | str], reindex: bool = True, allow_long_records=False) IndexInsertResponse[source]#
list_items(file_id: str = None, block_id: str = None, span_id: str = None, page_size: int | None = None, page_token: str | None = None, sort_order: SortOrder | None = SortOrder.DESC) ListItemsResponse[source]#
metadata: str#
name: str#
classmethod parse_obj(obj: Any) BaseModel[source]#
plugin: str#
search(query: str | List[str], k: int = 1, include_metadata: bool = False) Task[QueryResults][source]#
class steamship.File(*, client: Client = None, id: str = None, handle: str = None, mimeType: MimeTypes = None, workspaceId: str = None, blocks: List[Block] = [], tags: List[Tag] = [], filename: str = None, publicData: bool = False)[source]#

Bases: CamelModel

A file.

class CreateResponse(data: Any = None, string: str = None, _bytes: bytes | io.BytesIO = None, json: io.BytesIO = None, mime_type: str = None)[source]#

Bases: Response

data_: Any#
mime_type: str#
classmethod parse_obj(obj: Any) Response[source]#
append_block(text: str = None, tags: List[Tag] = None, content: str | bytes = None, url: str | None = None, mime_type: MimeTypes | None = None, public_data: bool = False) Block[source]#

Append a new block to this File. This is a convenience wrapper around Block.create(). You should provide only one of text, content, or url.

This is a server-side operation, saving the new Block to the file. The new block is appended to this client-side File as well for convenience.

blockify(plugin_instance: str = None, wait_on_tasks: List[Task] = None) Task[source]#
blocks: List[Block]#
client: Client#
static create(client: Client, content: str | bytes = None, mime_type: MimeTypes = None, handle: str = None, blocks: List[Block] = None, tags: List[Tag] = None, public_data: bool = False) File[source]#
static create_with_plugin(client: Client, plugin_instance: str, url: str = None, mime_type: str = None) Task[File][source]#
delete() File[source]#
filename: str#
static from_local(client: Client, file_path: str, mime_type: MimeTypes = None, handle: str = None, tags: List[Tag] = None, public_data: bool = False) Any[source]#

Loads a local file into a Steamship File.

NOTE: the file_path should be relative to where the call to from_local is happening.

Loaded files will automatically be tagged with a provenance tag.

Parameters:
  • client – Steamship client for the workspace

  • file_path – Location of the file to upload relative to the current directory of the client

  • mime_type – Optional specification of a particular mime type. If not provided, a guess will be made.

  • handle – Intended handle (for lookups, etc.) for Steamship File

  • tags – Metadata to add to the Steamship File

  • public_data – Whether to make the Steamship File publicly-accessible

generate(plugin_instance_handle: str, start_block_index: int = None, end_block_index: int | None = None, block_index_list: List[int] | None = None, append_output_to_file: bool = True, options: dict | None = None, wait_on_tasks: List[Task] = None, make_output_public: bool = False, streaming: bool | None = False) Task[GenerateResponse][source]#

Generate new content from this file. Assumes this file as context for input and output. May specify start and end blocks.

static get(client: Client, _id: str = None, handle: str = None) File[source]#
handle: str#
id: str#
import_with_plugin(plugin_instance: str, url: str = None, mime_type: str = None) Task[File][source]#

Run an import operation on an (empty) file object that has already been created.

index(plugin_instance: Any = None) EmbeddingIndex[source]#

Index every block in the file.

TODO(ted): Enable indexing the results of a tag query.

TODO(ted): It’s hard to load the EmbeddingIndexPluginInstance with just a handle because of the chain of things that need to be created for it to function.

static list(client: Client, page_size: int | None = None, page_token: str | None = None, sort_order: SortOrder | None = SortOrder.DESC) ListFileResponse[source]#
mime_type: MimeTypes#
classmethod parse_obj(obj: Any) BaseModel[source]#
public_data: bool#
static query(client: Client, tag_filter_query: str) FileQueryResponse[source]#
raw()[source]#
property raw_data_url: str | None#

Return a URL at which the data content of this File can be accessed. If public_data is True, this content can be accessed without an API key.

refresh() File[source]#
set_public_data(public_data: bool)[source]#

Set the public_data flag on this File. If this object already exists server-side, update the flag.

tag(plugin_instance: str = None, wait_on_tasks: List[Task] = None) Task[TagResponse][source]#
tags: List[Tag]#
workspace_id: str#
class steamship.MimeTypes(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]#

Bases: str, Enum

BINARY = 'application/octet-stream'#
DOC = 'application/msword'#
DOCX = 'application/vnd.openxmlformats-officedocument.wordprocessingml.document'#
EPUB = 'application/epub+zip'#
FILE_JSON = 'fileJson'#
GIF = 'image/gif'#
HTML = 'text/html'#
JPG = 'image/jpeg'#
JSON = 'application/json'#
MKD = 'text/markdown'#
MP3 = 'audio/mp3'#
MP4_AUDIO = 'audio/mp4'#
MP4_VIDEO = 'video/mp4'#
OGG_AUDIO = 'audio/ogg'#
OGG_VIDEO = 'video/ogg'#
PDF = 'application/pdf'#
PNG = 'image/png'#
PPT = 'applicatino/ms-powerpoint'#
PPTX = 'application/vnd.openxmlformats-officedocument.presentationml.presentation'#
RTF = 'application/rtf'#
STEAMSHIP_BLOCK_JSON = 'application/vnd.steamship-block.json.v1'#
STEAMSHIP_PLUGIN_CAPABILITIES_REQUEST = 'application/vnd.steamship-block.plugin-capabilities-request+json'#
STEAMSHIP_PLUGIN_CAPABILITIES_RESPONSE = 'application/vnd.steamship-block.plugin-capabilities-response+json'#
STEAMSHIP_PLUGIN_FUNCTION_CALL_INVOCATION = 'application/vnd.steamship-block.function-calling-support-invocation+json'#
STEAMSHIP_PLUGIN_FUNCTION_CALL_RESULT = 'application/vnd.steamship-block.function-calling-support-result+json'#
TIFF = 'image/tiff'#
TXT = 'text/plain'#
UNKNOWN = 'unknown'#
WAV = 'audio/wav'#
WEBM_AUDIO = 'audio/webm'#
WEBM_VIDEO = 'video/webm'#
classmethod has_value(value)[source]#
classmethod is_binary(value)[source]#

Returns whether the mime type is likely a binary file.

class steamship.Package(*, client: Client = None, id: str = None, handle: str = None, userId: str = None, profile: Manifest | None = None, description: str | None = None, readme: str | None = None, isPublic: bool = False)[source]#

Bases: CamelModel

client: Client#
static create(client: Client, handle: str = None, profile: Manifest = None, is_public=False, fetch_if_exists=False) Package[source]#
delete() Package[source]#

Delete this package. If this package is public and another user has created an instance of it, this method will throw an error and the package will not be deleted. Deleting the package will delete any versions and instances of it that you have created.

description: str | None#
static get(client: Client, handle: str) Package[source]#
handle: str#
id: str#
is_public: bool#
classmethod parse_obj(obj: Any) BaseModel[source]#
profile: Manifest | None#
readme: str | None#
update(client: Client) Package[source]#
user_id: str#
class steamship.PackageInstance(*, client: Client = None, id: str = None, handle: str = None, packageId: str = None, packageHandle: str | None = None, userHandle: str = None, packageVersionId: str = None, packageVersionHandle: str | None = None, userId: str = None, invocationURL: str = None, config: Dict[str, Any] = None, workspaceId: str = None, workspaceHandle: str = None, initStatus: InvocableInitStatus | None = None, createdAt: datetime | None = None)[source]#

Bases: CamelModel

blocks_from_invoke(path: str, verb: Verb = Verb.POST, timeout_s: float | None = None, **kwargs) List[Block][source]#
client: Client#
config: Dict[str, Any]#
static create(client: Client, package_id: str = None, package_handle: str = None, package_version_id: str = None, package_version_handle: str = None, handle: str = None, fetch_if_exists: bool = None, config: Dict[str, Any] = None) PackageInstance[source]#
static create_local_development_instance(client: Client, local_development_url: str, package_id: str = None, package_handle: str = None, handle: str = None, fetch_if_exists: bool = True, config: Dict[str, Any] = None) PackageInstance[source]#
created_at: datetime | None#
delete() PackageInstance[source]#
full_url_for(path: str)[source]#
static get(client: Client, handle: str) PackageInstance[source]#
handle: str#
id: str#
init_status: InvocableInitStatus | None#
invocation_url: str#
invoke(path: str, verb: Verb = Verb.POST, timeout_s: float | None = None, **kwargs)[source]#
static list(client: Client, package_id: str | None = None, package_version_id: str | None = None, include_workspace: bool | None = None, across_workspaces: bool | None = None, page_size: int | None = None, page_token: str | None = None, sort_order: SortOrder | None = SortOrder.DESC) ListPackageInstancesResponse[source]#
load_missing_workspace_handle()[source]#
package_handle: str | None#
package_id: str#
package_version_handle: str | None#
package_version_id: str#
classmethod parse_obj(obj: Any) BaseModel[source]#
refresh_init_status()[source]#
task_from_invoke(path: str, verb: Verb = Verb.POST, timeout_s: float | None = None, **kwargs) Task[source]#
user_handle: str#
user_id: str#
wait_for_init(max_timeout_s: float = 180, retry_delay_s: float = 1)[source]#

Polls and blocks until the init has succeeded or failed (or timeout reached).

Parameters:
  • max_timeout_s (float) – Max timeout in seconds. Default: 180s. After this timeout, an exception will be thrown.

  • retry_delay_s (float) – Delay between status checks. Default: 1s.

workspace_handle: str#
workspace_id: str#
class steamship.PackageVersion(*, client: Client = None, id: str = None, packageId: str = None, handle: str = None, configTemplate: Dict[str, Any] = None)[source]#

Bases: CamelModel

client: Client#
config_template: Dict[str, Any]#
static create(client: Client, package_id: str | None = None, handle: str | None = None, filename: str | None = None, filebytes: bytes | None = None, config_template: Dict[str, Any] | None = None, hosting_handler: str | None = None) PackageVersion[source]#
delete() PackageVersion[source]#

Delete this package version. If this package is public and another user has created an instance of this version, this method will throw an error and the PackageVersion will not be deleted. Deleting the PackageVersion will delete any instances of it that you have created.

handle: str#
id: str#
package_id: str#
classmethod parse_obj(obj: Any) BaseModel[source]#
class steamship.PluginInstance(*, client: Client = None, id: str = None, handle: str = None, pluginId: str = None, pluginVersionId: str = None, pluginHandle: str | None = None, pluginVersionHandle: str | None = None, workspaceId: str | None = None, userId: str = None, config: Dict[str, Any] = None, hostingType: HostingType | None = None, hostingCpu: HostingCpu | None = None, hostingMemory: HostingMemory | None = None, hostingTimeout: HostingTimeout | None = None, hostingEnvironment: HostingEnvironment | None = None, initStatus: InvocableInitStatus | None = None)[source]#

Bases: CamelModel

client: Client#
config: Dict[str, Any]#
static create(client: Client, plugin_id: str = None, plugin_handle: str = None, plugin_version_id: str = None, plugin_version_handle: str = None, handle: str = None, fetch_if_exists: bool = True, config: Dict[str, Any] = None) PluginInstance[source]#

Create a plugin instance

When handle is empty, the engine will automatically assign one. fetch_if_exists controls whether we want to re-use an existing plugin instance or not.

delete() PluginInstance[source]#
generate(input_file_id: str = None, input_file_start_block_index: int = None, input_file_end_block_index: int | None = None, input_file_block_index_list: List[int] | None = None, text: str | None = None, block_query: str | None = None, append_output_to_file: bool = False, output_file_id: str | None = None, make_output_public: bool | None = None, options: dict | None = None, streaming: bool | None = None, tags: List[Tag] | None = None) Task[GenerateResponse][source]#

See GenerateRequest for description of parameter options

static get(client: Client, handle: str) PluginInstance[source]#
get_training_parameters(training_request: TrainingParameterPluginInput) TrainingParameterPluginOutput[source]#
handle: str#
hosting_cpu: HostingCpu | None#
hosting_environment: HostingEnvironment | None#
hosting_memory: HostingMemory | None#
hosting_timeout: HostingTimeout | None#
hosting_type: HostingType | None#
id: str#
init_status: InvocableInitStatus | None#
classmethod parse_obj(obj: Any) BaseModel[source]#
plugin_handle: str | None#
plugin_id: str#
plugin_version_handle: str | None#
plugin_version_id: str#
refresh_init_status()[source]#
tag(doc: str | File) Task[TagResponse][source]#
train(training_request: TrainingParameterPluginInput = None, training_epochs: int | None = None, export_query: str | None = None, testing_holdout_percent: float | None = None, test_split_seed: int | None = None, training_params: Dict | None = None, inference_params: Dict | None = None) Task[TrainPluginOutput][source]#

Train a plugin instance. Please provide either training_request OR the other parameters; passing training_request ignores all other parameters, but is kept for backwards compatibility.

user_id: str#
wait_for_init(max_timeout_s: float = 180, retry_delay_s: float = 1)[source]#

Polls and blocks until the init has succeeded or failed (or timeout reached).

Parameters:
  • max_timeout_s (float) – Max timeout in seconds. Default: 180s. After this timeout, an exception will be thrown.

  • retry_delay_s (float) – Delay between status checks. Default: 1s.

workspace_id: str | None#
class steamship.PluginVersion(*, client: Client = None, id: str = None, pluginId: str = None, handle: str = None, hostingMemory: HostingMemory | None = None, hostingTimeout: HostingTimeout | None = None, hostingHandler: str = None, isPublic: bool = None, isDefault: bool = None, configTemplate: Dict[str, Any] = None)[source]#

Bases: CamelModel

client: Client#
config_template: Dict[str, Any]#
static create(client: Client, handle: str, plugin_id: str | None = None, filename: str | None = None, filebytes: bytes | None = None, hosting_memory: HostingMemory | None = None, hosting_timeout: HostingTimeout | None = None, hosting_handler: str | None = None, is_public: bool | None = None, is_default: bool | None = None, config_template: Dict[str, Any] | None = None, streaming: bool | None = None) PluginVersion[source]#
delete() PluginVersion[source]#

Delete this plugin version. If this plugin is public and another user has created an instance of this version, this method will throw an error and the PluginVersion will not be deleted. Deleting the PluginVersion will delete any instances of it that you have created.

handle: str#
hosting_handler: str#
hosting_memory: HostingMemory | None#
hosting_timeout: HostingTimeout | None#
id: str#
is_default: bool#
is_public: bool#
static list(client: Client, plugin_id: str = None, handle: str = None, public: bool = True) ListPluginVersionsResponse[source]#
classmethod parse_obj(obj: Any) BaseModel[source]#
plugin_id: str#
class steamship.RuntimeEnvironments(value, names=None, *, module=None, qualname=None, type=None, start=1, boundary=None)[source]#

Bases: str, Enum

LOCALHOST = 'localhost'#
REPLIT = 'replit'#
class steamship.Steamship(api_key: str = None, api_base: str = None, app_base: str = None, web_base: str = None, workspace: str = None, fail_if_workspace_exists: bool = False, profile: str = None, config_file: str = None, config: Configuration = None, trust_workspace_config: bool = False)[source]#

Bases: Client

Steamship Python Client.

get_workspace() Workspace[source]#
static temporary_workspace(**kwargs) Generator[Steamship, None, None][source]#

Create a client rooted in a temporary workspace that will be deleted after use.

static use(package_handle: str, instance_handle: str | None = None, config: Dict[str, Any] | None = None, version: str | None = None, fetch_if_exists: bool = True, workspace_handle: str | None = None, wait_for_init: bool = True, **kwargs) PackageInstance[source]#

Creates/loads an instance of package package_handle.

The instance is named instance_handle and located in the Workspace named instance_handle. If no instance_handle is provided, the default is package_handle.

For example, one may write the following to always get back the same package instance, no matter how many times you run it, scoped into its own workspace:

```python
instance = Steamship.use('package-handle', 'instance-handle')
```

One may also write:

```python
instance = Steamship.use('package-handle')  # Instance will also be named `package-handle`
```

If you wish to override the usage of a workspace named instance_handle, you can provide the workspace_handle parameter.

static use_plugin(plugin_handle: str, instance_handle: str | None = None, config: Dict[str, Any] | None = None, version: str | None = None, fetch_if_exists: bool = True, workspace_handle: str | None = None, wait_for_init: bool = True, **kwargs) PluginInstance[source]#

Creates/loads an instance of plugin plugin_handle.

The instance is named instance_handle and located in the Workspace named instance_handle. If no instance_handle is provided, the default is plugin_handle.

For example, one may write the following to always get back the same plugin instance, no matter how many times you run it, scoped into its own workspace:

```python
instance = Steamship.use_plugin('plugin-handle', 'instance-handle')
```

One may also write:

```python
instance = Steamship.use_plugin('plugin-handle')  # Instance will also be named `plugin-handle`
```

use_skill(skill: Skill, provider: Vendor | None = None, instance_handle: str | None = None, fetch_if_exists: bool | None = True) PluginInstance[source]#
exception steamship.SteamshipError(message: str = 'Undefined remote error', internal_message: str = None, suggestion: str = None, code: str = None, error: Exception | str = None)[source]#

Bases: Exception

code: str = None#
error: str = None#
static from_dict(d: Any) SteamshipError[source]#

Last resort if subclass doesn’t override: pass through.

internal_message: str = None#
log()[source]#
message: str = None#
suggestion: str = None#
to_dict() dict[source]#
class steamship.Tag(*, client: Client = None, id: str = None, fileId: str = None, blockId: str | None = None, kind: TagKind | str = None, name: str | None = None, value: Dict[TagValueKey | str, Any] | None = None, startIdx: int | None = None, endIdx: int | None = None, text: str | None = None)[source]#

Bases: CamelModel

class DeleteRequest(*, id: str = None, fileId: str = None, blockId: str = None)[source]#

Bases: Request

block_id: str#
file_id: str#
id: str#
class ListRequest(*, fileId: str = None, blockId: str = None)[source]#

Bases: Request

block_id: str#
file_id: str#
class ListResponse(*, tags: List[Tag] = None)[source]#

Bases: Response

tags: List[Tag]#
block_id: str | None#
client: Client#
static create(client: Client, file_id: str = None, block_id: str = None, kind: str = None, name: str = None, start_idx: int = None, end_idx: int = None, value: Dict[str, Any] = None) Tag[source]#
delete() Tag[source]#
end_idx: int | None#
file_id: str#
id: str#
index(plugin_instance: Any = None)[source]#

Index this tag.

kind: TagKind | str#
name: str | None#
classmethod parse_obj(obj: Any) BaseModel[source]#
static query(client: Client, tag_filter_query: str) TagQueryResponse[source]#
start_idx: int | None#
text: str | None#
value: Dict[TagValueKey | str, Any] | None#
class steamship.Task(*, client: Client = None, taskId: str = None, userId: str = None, workspaceId: str = None, requestId: str | None = None, expect: Type = None, input: str = None, output: T = None, state: str = None, statusMessage: str = None, statusSuggestion: str = None, statusCode: str = None, statusCreatedOn: str = None, taskType: str = None, taskExecutor: str = None, taskCreatedOn: str = None, taskLastModifiedOn: str = None, remoteStatusInput: Dict | None = None, remoteStatusOutput: Dict | None = None, remoteStatusMessage: str = None, assignedWorker: str = None, startedAt: str = None, maxRetries: int = None, retries: int = None)[source]#

Bases: GenericCamelModel, Generic[T]

Encapsulates a unit of asynchronously performed work.

add_comment(external_id: str = None, external_type: str = None, external_group: str = None, metadata: Any = None) TaskComment[source]#
as_error() SteamshipError[source]#
assigned_worker: str#
client: Client#
expect: Type#
static get(client, _id: str = None, handle: str = None) Task[source]#
input: str#
max_retries: int#
output: T#
classmethod parse_obj(obj: Any) Task[source]#
post_update(fields: Set[str] = None) Task[source]#

Updates this task in the Steamship Engine.

refresh()[source]#
remote_status_input: Dict | None#
remote_status_message: str#
remote_status_output: Dict | None#
request_id: str | None#
retries: int#
started_at: str#
state: str#
status_code: str#
status_created_on: str#
status_message: str#
status_suggestion: str#
task_created_on: str#
task_executor: str#
task_id: str#
task_last_modified_on: str#
task_type: str#
update(other: Task | None = None)[source]#

Incorporates a Task into this object.

user_id: str#
wait(max_timeout_s: float = 180, retry_delay_s: float = 1, on_each_refresh: Callable[[int, float, Task], None] | None = None)[source]#

Polls and blocks until the task has succeeded or failed (or timeout reached).

Parameters:
  • max_timeout_s (float) – Max timeout in seconds. Default: 180s. After this timeout, an exception will be thrown. A timeout of -1 is equivalent to no timeout.

  • retry_delay_s (float) – Delay between status checks. Default: 1s.

  • on_each_refresh (Optional[Callable[[int, float, Task], None]]) –

    Optional callback invoked after each refresh is made, including success state refreshes. It is called with the arguments: (refresh #, total elapsed time, task)

    WARNING: Do not pass a long-running function to this variable. It will block the update polling.

wait_until_completed(retry_delay_s: float = 1, on_each_refresh: Callable[[int, float, Task], None] | None = None)[source]#

Polls and blocks until the task has succeeded or failed. No timeout on waiting is applied.

Parameters:
  • retry_delay_s (float) – Delay between status checks. Default: 1s.

  • on_each_refresh (Optional[Callable[[int, float, Task], None]]) –

    Optional callback invoked after each refresh is made, including success state refreshes. It is called with the arguments: (refresh #, total elapsed time, task)

    WARNING: Do not pass a long-running function to this variable. It will block the update polling.

workspace_id: str#
class steamship.TaskState[source]#

Bases: object

failed = 'failed'#
running = 'running'#
succeeded = 'succeeded'#
waiting = 'waiting'#
class steamship.Workspace(*, client: Client = None, id: str = None, handle: str = None)[source]#

Bases: CamelModel

class CreateRequest(*, id: str | None = None, handle: str | None = None, fetchIfExists: bool | None = None, externalId: str | None = None, externalType: str | None = None, metadata: str | None = None)[source]#

Bases: Request

external_id: str | None#
external_type: str | None#
fetch_if_exists: bool | None#
handle: str | None#
id: str | None#
metadata: str | None#
client: Client#
static create(client: Client, handle: str | None = None, external_id: str | None = None, external_type: str | None = None, metadata: Any = None, fetch_if_exists: bool = True) Workspace[source]#
create_signed_url(request: Request) Response[source]#
delete() Workspace[source]#
static get(client: Client, id_: str = None, handle: str = None, fetch_if_exists: bool = None) Workspace[source]#
handle: str#
id: str#
static list(client: Client, t: str = None, page_size: int | None = None, page_token: str | None = None, sort_order: SortOrder | None = SortOrder.DESC) ListWorkspacesResponse[source]#
classmethod parse_obj(obj: Any) BaseModel[source]#
steamship.check_environment(env: RuntimeEnvironments, interactively_set_key: bool = True)[source]#