Source code for steamship.data.package.package_instance

from __future__ import annotations

import time
from datetime import datetime
from typing import Any, Dict, List, Optional, Type

from pydantic import BaseModel, Field

from steamship import SteamshipError, Task
from steamship.base.client import Client
from steamship.base.model import CamelModel
from steamship.base.request import DeleteRequest, IdentifierRequest, ListRequest, Request, SortOrder
from steamship.base.response import ListResponse
from steamship.data.block import Block
from steamship.data.invocable_init_status import InvocableInitStatus
from steamship.data.workspace import Workspace
from steamship.utils.url import Verb

# Package version handle sent by PackageInstance.create_local_development_instance
# when the instance is bound to a local development server.
LOCAL_DEVELOPMENT_VERSION_HANDLE = (
    "local-development!"  # Special handle for a locally-running development instance.
)


class ListPackageInstancesRequest(ListRequest):
    """Request body posted to ``package/instance/list``.

    On top of whatever ``ListRequest`` provides, it carries the arguments
    accepted by ``PackageInstance.list``. Every field is optional and
    defaults to ``None``.
    """

    # Identifier of a package, as passed to PackageInstance.list.
    package_id: Optional[str] = None
    # Identifier of a package version, as passed to PackageInstance.list.
    package_version_id: Optional[str] = None
    # NOTE(review): presumably asks the engine to include workspace details
    # with each result -- confirm against the engine API.
    include_workspace: Optional[bool] = None
    # NOTE(review): presumably widens the listing beyond the current
    # workspace -- confirm against the engine API.
    across_workspaces: Optional[bool] = None
class ListPackageInstancesResponse(ListResponse):
    """Response type expected back from ``package/instance/list``."""

    # The package instances returned by the list call.
    package_instances: List[PackageInstance]
class CreatePackageInstanceRequest(Request):
    """Request body posted to ``package/instance/create``.

    Populated by ``PackageInstance.create`` and
    ``PackageInstance.create_local_development_instance``. Every field
    defaults to ``None``.
    """

    id: str = None

    # The package (by id or handle) and the version (by id or handle) to instantiate.
    package_id: str = None
    package_handle: str = None
    package_version_id: str = None
    package_version_handle: str = None

    # Handle requested for the new instance.
    handle: str = None
    # NOTE(review): presumably makes the call return an already-existing
    # instance with this handle instead of failing -- confirm with the engine.
    fetch_if_exists: bool = None
    config: Dict[str, Any] = None
    workspace_id: str = None

    local_development_url: Optional[str] = None
    """Special argument only intended for creating a PackageInstance bound to a local development server.

    If used, the package_version_handle should be set to LOCAL_DEVELOPMENT_VERSION_HANDLE above.
    """
class PackageInstance(CamelModel):
    """Model of a package instance as exchanged with the ``package/instance/*`` endpoints.

    Besides the data fields, the class offers helpers to create, fetch, list
    and delete instances, to invoke paths on an instance, and to poll its
    initialization status.
    """

    # Client used for all follow-up calls; excluded from serialization.
    client: Client = Field(None, exclude=True)
    id: str = None
    handle: str = None
    package_id: str = None
    package_handle: Optional[str] = None
    user_handle: str = None
    package_version_id: str = None
    package_version_handle: Optional[str] = None
    user_id: str = None
    invocation_url: str = None
    config: Dict[str, Any] = None
    workspace_id: str = None
    workspace_handle: str = None
    init_status: Optional[InvocableInitStatus] = None
    created_at: Optional[datetime] = None

    @classmethod
    def parse_obj(cls: Type[BaseModel], obj: Any) -> BaseModel:
        """Parse ``obj``, first unwrapping a ``"packageInstance"`` envelope if one is present."""
        # TODO (enias): This needs to be solved at the engine side
        obj = obj["packageInstance"] if "packageInstance" in obj else obj
        return super().parse_obj(obj)

    @staticmethod
    def create(
        client: Client,
        package_id: Optional[str] = None,
        package_handle: Optional[str] = None,
        package_version_id: Optional[str] = None,
        package_version_handle: Optional[str] = None,
        handle: Optional[str] = None,
        fetch_if_exists: Optional[bool] = None,
        config: Optional[Dict[str, Any]] = None,
    ) -> PackageInstance:
        """Post a ``CreatePackageInstanceRequest`` to ``package/instance/create``.

        All keyword arguments are forwarded unchanged into the request.

        Returns
        -------
        PackageInstance
            The response parsed as a ``PackageInstance``.
        """
        req = CreatePackageInstanceRequest(
            handle=handle,
            package_id=package_id,
            package_handle=package_handle,
            package_version_id=package_version_id,
            package_version_handle=package_version_handle,
            fetch_if_exists=fetch_if_exists,
            config=config,
        )
        return client.post("package/instance/create", payload=req, expect=PackageInstance)

    @staticmethod
    def create_local_development_instance(
        client: Client,
        local_development_url: str,
        package_id: Optional[str] = None,
        package_handle: Optional[str] = None,
        handle: Optional[str] = None,
        fetch_if_exists: bool = True,
        config: Optional[Dict[str, Any]] = None,
    ) -> PackageInstance:
        """Create a PackageInstance bound to a local development server.

        Works like ``create`` but always sends
        ``LOCAL_DEVELOPMENT_VERSION_HANDLE`` as the package version handle and
        additionally passes ``local_development_url``.
        """
        req = CreatePackageInstanceRequest(
            handle=handle,
            package_id=package_id,
            package_handle=package_handle,
            package_version_handle=LOCAL_DEVELOPMENT_VERSION_HANDLE,
            fetch_if_exists=fetch_if_exists,
            config=config,
            local_development_url=local_development_url,
        )
        return client.post("package/instance/create", payload=req, expect=PackageInstance)

    def delete(self) -> PackageInstance:
        """Post a delete request for this instance's ``id`` and return the parsed response."""
        req = DeleteRequest(id=self.id)
        return self.client.post("package/instance/delete", payload=req, expect=PackageInstance)

    def load_missing_workspace_handle(self) -> None:
        """Fill in ``workspace_handle`` from ``workspace_id`` when it is not set yet.

        Does nothing when there is no client, no workspace id, or the handle
        is already known.
        """
        if (
            self.client is not None
            and self.workspace_handle is None
            and self.workspace_id is not None
        ):
            # Get the workspaceHandle
            workspace = Workspace.get(self.client, id_=self.workspace_id)
            if workspace:
                self.workspace_handle = workspace.handle

    @staticmethod
    def get(client: Client, handle: str) -> PackageInstance:
        """Fetch the package instance identified by ``handle`` via ``package/instance/get``."""
        return client.post(
            "package/instance/get", IdentifierRequest(handle=handle), expect=PackageInstance
        )

    def invoke(
        self, path: str, verb: Verb = Verb.POST, timeout_s: Optional[float] = None, **kwargs
    ) -> Any:
        """Call ``path`` on this instance and return whatever the client returns.

        Parameters
        ----------
        path : str
            Path within the instance; a single leading ``/`` is dropped.
        verb : Verb
            HTTP verb to use. Default: ``Verb.POST``.
        timeout_s : Optional[float]
            Passed through to the client call.
        **kwargs
            Sent as the call payload.
        """
        self.load_missing_workspace_handle()
        # startswith() instead of path[0] so an empty path does not raise IndexError.
        if path.startswith("/"):
            path = path[1:]

        return self.client.call(
            verb=verb,
            # "_" stands in for a workspace or instance handle that is not known.
            operation=f"/{self.workspace_handle or '_'}/{self.handle or '_'}/{path}",
            payload=kwargs,
            is_package_call=True,
            package_owner=self.user_handle,
            package_id=self.package_id,
            package_instance_id=self.id,
            as_background_task=False,
            timeout_s=timeout_s,
        )

    def blocks_from_invoke(
        self, path: str, verb: Verb = Verb.POST, timeout_s: Optional[float] = None, **kwargs
    ) -> List[Block]:
        """Invoke ``path`` and convert the result into a list of ``Block`` objects.

        A list result yields one ``Block`` per element; any other result is
        treated as the keyword arguments of a single ``Block``.

        Raises
        ------
        SteamshipError
            If the result cannot be converted; the original exception is
            attached as the cause.
        """
        potential_blocks = self.invoke(path=path, verb=verb, timeout_s=timeout_s, **kwargs)
        try:
            if isinstance(potential_blocks, list):
                return [Block(client=self.client, **raw) for raw in potential_blocks]
            else:
                return [Block(client=self.client, **potential_blocks)]
        except Exception as e:
            raise SteamshipError(f"Could not convert to blocks: {e}") from e

    def task_from_invoke(
        self, path: str, verb: Verb = Verb.POST, timeout_s: Optional[float] = None, **kwargs
    ) -> Task:
        """Invoke ``path`` and convert the result into a ``Task``.

        Raises
        ------
        SteamshipError
            If the result cannot be converted; the original exception is
            attached as the cause.
        """
        task_dict = self.invoke(path=path, verb=verb, timeout_s=timeout_s, **kwargs)
        try:
            return Task(client=self.client, **task_dict)
        except Exception as e:
            raise SteamshipError(f"Could not convert to task: {e}") from e

    def full_url_for(self, path: str) -> str:
        """Return ``invocation_url`` with ``path`` appended verbatim (no separator is inserted)."""
        return f"{self.invocation_url}{path}"

    def refresh_init_status(self) -> None:
        """Re-fetch this instance by handle and copy over its ``init_status``."""
        new_self = PackageInstance.get(self.client, handle=self.handle)
        self.init_status = new_self.init_status

    def wait_for_init(
        self,
        max_timeout_s: float = 180,
        retry_delay_s: float = 1,
    ) -> None:
        """Polls and blocks until the init has succeeded or failed (or timeout reached).

        Parameters
        ----------
        max_timeout_s : float
            Max timeout in seconds. Default: 180s. After this timeout, an exception will be thrown.
        retry_delay_s : float
            Delay between status checks. Default: 1s.

        Raises
        ------
        SteamshipError
            If the instance is still initializing once the timeout has elapsed.
        """
        t0 = time.perf_counter()
        while (
            time.perf_counter() - t0 < max_timeout_s
            and self.init_status == InvocableInitStatus.INITIALIZING
        ):
            time.sleep(retry_delay_s)
            self.refresh_init_status()

        # If the task did not complete within the timeout, throw an error
        if self.init_status == InvocableInitStatus.INITIALIZING:
            raise SteamshipError(
                message=(
                    f"Package Instance {self.id} did not complete within requested timeout of {max_timeout_s}s. "
                    "The init is still running on the server. "
                    "You can retrieve its status via PackageInstance.get() or try waiting again with wait_for_init()."
                )
            )

    @staticmethod
    def list(
        client: Client,
        package_id: Optional[str] = None,
        package_version_id: Optional[str] = None,
        include_workspace: Optional[bool] = None,
        across_workspaces: Optional[bool] = None,
        page_size: Optional[int] = None,
        page_token: Optional[str] = None,
        sort_order: Optional[SortOrder] = SortOrder.DESC,
    ) -> ListPackageInstancesResponse:
        """Post a ``ListPackageInstancesRequest`` to ``package/instance/list``.

        All keyword arguments are forwarded unchanged into the request;
        ``sort_order`` defaults to ``SortOrder.DESC``.
        """
        return client.post(
            "package/instance/list",
            ListPackageInstancesRequest(
                package_id=package_id,
                package_version_id=package_version_id,
                include_workspace=include_workspace,
                across_workspaces=across_workspaces,
                page_size=page_size,
                page_token=page_token,
                sort_order=sort_order,
            ),
            expect=ListPackageInstancesResponse,
        )
# ListPackageInstancesResponse is declared before PackageInstance, so its
# List[PackageInstance] annotation is a forward reference; resolve it now
# that PackageInstance exists.
ListPackageInstancesResponse.update_forward_refs()