Source code for steamship.data.plugin.plugin_instance

from __future__ import annotations

import time
from typing import Any, Dict, List, Optional, Type, Union

from pydantic import BaseModel, Field

from steamship import SteamshipError
from steamship.base import Task
from steamship.base.client import Client
from steamship.base.model import CamelModel
from steamship.base.request import DeleteRequest, IdentifierRequest, Request
from steamship.data.block import Block
from steamship.data.file import File
from steamship.data.invocable_init_status import InvocableInitStatus
from steamship.data.operations.generator import GenerateRequest, GenerateResponse
from steamship.data.operations.tagger import TagRequest, TagResponse
from steamship.data.plugin import (
    HostingCpu,
    HostingEnvironment,
    HostingMemory,
    HostingTimeout,
    HostingType,
)
from steamship.data.tags.tag import Tag
from steamship.plugin.inputs.export_plugin_input import ExportPluginInput
from steamship.plugin.inputs.training_parameter_plugin_input import TrainingParameterPluginInput
from steamship.plugin.outputs.train_plugin_output import TrainPluginOutput
from steamship.plugin.outputs.training_parameter_plugin_output import TrainingParameterPluginOutput


[docs] class CreatePluginInstanceRequest(Request): id: str = None plugin_id: str = None plugin_handle: str = None plugin_version_id: str = None plugin_version_handle: str = None handle: str = None fetch_if_exists: bool = None config: Dict[str, Any] = None
SIGNED_URL_EXPORTER_INSTANCE_HANDLE = "signed-url-exporter-1.0"
[docs] class PluginInstance(CamelModel): client: Client = Field(None, exclude=True) id: str = None handle: str = None plugin_id: str = None plugin_version_id: str = None plugin_handle: Optional[str] = None plugin_version_handle: Optional[str] = None workspace_id: Optional[str] = None user_id: str = None config: Dict[str, Any] = None hosting_type: Optional[HostingType] = None hosting_cpu: Optional[HostingCpu] = None hosting_memory: Optional[HostingMemory] = None hosting_timeout: Optional[HostingTimeout] = None hosting_environment: Optional[HostingEnvironment] = None init_status: Optional[InvocableInitStatus] = None
[docs] @classmethod def parse_obj(cls: Type[BaseModel], obj: Any) -> BaseModel: # TODO (enias): This needs to be solved at the engine side obj = obj["pluginInstance"] if "pluginInstance" in obj else obj return super().parse_obj(obj)
[docs] @staticmethod def create( client: Client, plugin_id: str = None, plugin_handle: str = None, plugin_version_id: str = None, plugin_version_handle: str = None, handle: str = None, fetch_if_exists: bool = True, config: Dict[str, Any] = None, ) -> PluginInstance: """Create a plugin instance When handle is empty the engine will automatically assign one fetch_if_exists controls whether we want to re-use an existing plugin instance or not.""" req = CreatePluginInstanceRequest( handle=handle, plugin_id=plugin_id, plugin_handle=plugin_handle, plugin_version_id=plugin_version_id, plugin_version_handle=plugin_version_handle, fetch_if_exists=fetch_if_exists, config=config, ) return client.post("plugin/instance/create", payload=req, expect=PluginInstance)
[docs] @staticmethod def get(client: Client, handle: str) -> PluginInstance: return client.post( "plugin/instance/get", IdentifierRequest(handle=handle), expect=PluginInstance )
[docs] def tag( self, doc: Union[str, File], ) -> Task[ TagResponse ]: # TODO (enias): Should we remove this helper function in favor of always working with files? req = TagRequest( type="inline", file=File(blocks=[Block(text=doc)]) if isinstance(doc, str) else doc, plugin_instance=self.handle, ) return self.client.post( "plugin/instance/tag", req, expect=TagResponse, )
[docs] def generate( self, input_file_id: str = None, input_file_start_block_index: int = None, input_file_end_block_index: Optional[int] = None, input_file_block_index_list: Optional[List[int]] = None, text: Optional[str] = None, # bytes: Optional[bytes] = None, [Not yet implemented] block_query: Optional[str] = None, # url: Optional[str] = None, [Not yet implemented] append_output_to_file: bool = False, output_file_id: Optional[str] = None, make_output_public: Optional[bool] = None, options: Optional[dict] = None, streaming: Optional[bool] = None, tags: Optional[List[Tag]] = None, ) -> Task[GenerateResponse]: """See GenerateRequest for description of parameter options""" req = GenerateRequest( plugin_instance=self.handle, input_file_id=input_file_id, input_file_start_block_index=input_file_start_block_index, input_file_end_block_index=input_file_end_block_index, input_file_block_index_list=input_file_block_index_list, text=text, # bytes=bytes, block_query=block_query, # url=url, append_output_to_file=append_output_to_file, output_file_id=output_file_id, make_output_public=make_output_public, options=options, streaming=streaming, tags=tags, ) return self.client.post("plugin/instance/generate", req, expect=GenerateResponse)
[docs] def delete(self) -> PluginInstance: req = DeleteRequest(id=self.id) return self.client.post("plugin/instance/delete", payload=req, expect=PluginInstance)
[docs] def train( self, training_request: TrainingParameterPluginInput = None, training_epochs: Optional[int] = None, export_query: Optional[str] = None, testing_holdout_percent: Optional[float] = None, test_split_seed: Optional[int] = None, training_params: Optional[Dict] = None, inference_params: Optional[Dict] = None, ) -> Task[TrainPluginOutput]: """Train a plugin instance. Please provide either training_request OR the other parameters; passing training_request ignores all other parameters, but is kept for backwards compatibility. """ input_params = training_request or TrainingParameterPluginInput( plugin_instance=self.handle, training_epochs=training_epochs, export_plugin_input=ExportPluginInput( plugin_instance=SIGNED_URL_EXPORTER_INSTANCE_HANDLE, type="file", query=export_query ), testing_holdout_percent=testing_holdout_percent, test_split_seed=test_split_seed, training_params=training_params, inference_params=inference_params, ) return self.client.post( "plugin/instance/train", payload=input_params, expect=TrainPluginOutput, )
[docs] def get_training_parameters( self, training_request: TrainingParameterPluginInput ) -> TrainingParameterPluginOutput: return self.client.post( "plugin/instance/getTrainingParameters", payload=training_request, expect=TrainingParameterPluginOutput, )
[docs] def refresh_init_status(self): new_self = PluginInstance.get(self.client, handle=self.handle) self.init_status = new_self.init_status
[docs] def wait_for_init( self, max_timeout_s: float = 180, retry_delay_s: float = 1, ): """Polls and blocks until the init has succeeded or failed (or timeout reached). Parameters ---------- max_timeout_s : int Max timeout in seconds. Default: 180s. After this timeout, an exception will be thrown. retry_delay_s : float Delay between status checks. Default: 1s. """ t0 = time.perf_counter() refresh_count = 0 while ( time.perf_counter() - t0 < max_timeout_s and self.init_status == InvocableInitStatus.INITIALIZING ): time.sleep(retry_delay_s) self.refresh_init_status() refresh_count += 1 # If the task did not complete within the timeout, throw an error if self.init_status == InvocableInitStatus.INITIALIZING: raise SteamshipError( message=f"Plugin Instance {self.id} did not complete within requested timeout of {max_timeout_s}s. The init is still running on the server. You can retrieve its status via PluginInstance.get() or try waiting again with wait_for_init()." )