from __future__ import annotations
import time
from datetime import datetime
from typing import Any, Dict, List, Optional, Type
from pydantic import BaseModel, Field
from steamship import SteamshipError, Task
from steamship.base.client import Client
from steamship.base.model import CamelModel
from steamship.base.request import DeleteRequest, IdentifierRequest, ListRequest, Request, SortOrder
from steamship.base.response import ListResponse
from steamship.data.block import Block
from steamship.data.invocable_init_status import InvocableInitStatus
from steamship.data.workspace import Workspace
from steamship.utils.url import Verb
# Special version handle for a locally running development instance.
LOCAL_DEVELOPMENT_VERSION_HANDLE = "local-development!"
class ListPackageInstancesRequest(ListRequest):
    """Request payload sent to ``package/instance/list``.

    Extends ``ListRequest`` with optional filters; every filter defaults to
    ``None``.
    """

    # Optional filter on the owning package.
    package_id: Optional[str] = None
    # Optional filter on a specific package version.
    package_version_id: Optional[str] = None
    # NOTE(review): presumably asks the engine to include workspace details in
    # each result -- confirm against the engine API.
    include_workspace: Optional[bool] = None
    # NOTE(review): presumably widens the listing beyond the client's current
    # workspace -- confirm against the engine API.
    across_workspaces: Optional[bool] = None
class ListPackageInstancesResponse(ListResponse):
    """Response returned by ``package/instance/list``."""

    # ``PackageInstance`` is declared further down in this module, so this
    # annotation is a forward reference that must be resolved after that class
    # has been defined.
    package_instances: List[PackageInstance]
class CreatePackageInstanceRequest(Request):
    """Request payload sent to ``package/instance/create``.

    Every field defaults to ``None``; the annotations are explicitly
    ``Optional`` so that the declared types match those defaults.
    """

    id: Optional[str] = None
    package_id: Optional[str] = None
    package_handle: Optional[str] = None
    package_version_id: Optional[str] = None
    package_version_handle: Optional[str] = None
    handle: Optional[str] = None
    fetch_if_exists: Optional[bool] = None
    config: Optional[Dict[str, Any]] = None
    workspace_id: Optional[str] = None

    local_development_url: Optional[str] = None
    """Special argument only intended for creating a PackageInstance bound to a local development server.
    If used, the package_version_handle should be set to LOCAL_DEVELOPMENT_VERSION_HANDLE above.
    """
class PackageInstance(CamelModel):
    """Client-side model of a package instance.

    Instances are obtained from the ``package/instance/*`` endpoints (see
    ``create``, ``get`` and ``list``) and expose helpers to invoke the
    instance's endpoints and to poll its initialization status.
    """

    # Excluded from serialization (``exclude=True``); used by the instance
    # methods below to talk to the API.
    client: Optional[Client] = Field(None, exclude=True)
    id: Optional[str] = None
    handle: Optional[str] = None
    package_id: Optional[str] = None
    package_handle: Optional[str] = None
    user_handle: Optional[str] = None
    package_version_id: Optional[str] = None
    package_version_handle: Optional[str] = None
    user_id: Optional[str] = None
    invocation_url: Optional[str] = None
    config: Optional[Dict[str, Any]] = None
    workspace_id: Optional[str] = None
    workspace_handle: Optional[str] = None
    init_status: Optional[InvocableInitStatus] = None
    created_at: Optional[datetime] = None

    @classmethod
    def parse_obj(cls: Type[BaseModel], obj: Any) -> BaseModel:
        """Parse ``obj`` into a model, unwrapping a ``packageInstance`` envelope if present."""
        # TODO (enias): This needs to be solved at the engine side
        obj = obj["packageInstance"] if "packageInstance" in obj else obj
        return super().parse_obj(obj)

    @staticmethod
    def create(
        client: Client,
        package_id: Optional[str] = None,
        package_handle: Optional[str] = None,
        package_version_id: Optional[str] = None,
        package_version_handle: Optional[str] = None,
        handle: Optional[str] = None,
        fetch_if_exists: Optional[bool] = None,
        config: Optional[Dict[str, Any]] = None,
    ) -> PackageInstance:
        """Create a PackageInstance by posting to ``package/instance/create``.

        All keyword arguments are forwarded unchanged into a
        ``CreatePackageInstanceRequest``.
        """
        req = CreatePackageInstanceRequest(
            handle=handle,
            package_id=package_id,
            package_handle=package_handle,
            package_version_id=package_version_id,
            package_version_handle=package_version_handle,
            fetch_if_exists=fetch_if_exists,
            config=config,
        )
        return client.post("package/instance/create", payload=req, expect=PackageInstance)

    @staticmethod
    def create_local_development_instance(
        client: Client,
        local_development_url: str,
        package_id: Optional[str] = None,
        package_handle: Optional[str] = None,
        handle: Optional[str] = None,
        fetch_if_exists: bool = True,
        config: Optional[Dict[str, Any]] = None,
    ) -> PackageInstance:
        """Create a PackageInstance bound to a local development server.

        The request always uses ``LOCAL_DEVELOPMENT_VERSION_HANDLE`` as the
        package version handle and carries ``local_development_url``.
        """
        req = CreatePackageInstanceRequest(
            handle=handle,
            package_id=package_id,
            package_handle=package_handle,
            package_version_handle=LOCAL_DEVELOPMENT_VERSION_HANDLE,
            fetch_if_exists=fetch_if_exists,
            config=config,
            local_development_url=local_development_url,
        )
        return client.post("package/instance/create", payload=req, expect=PackageInstance)

    def delete(self) -> PackageInstance:
        """Delete this instance by id via ``package/instance/delete``."""
        req = DeleteRequest(id=self.id)
        return self.client.post("package/instance/delete", payload=req, expect=PackageInstance)

    def load_missing_workspace_handle(self):
        """Fill in ``workspace_handle`` from ``workspace_id`` when it is not set.

        Does nothing unless a client is attached, the handle is missing and
        the workspace id is known.
        """
        if (
            self.client is not None
            and self.workspace_handle is None
            and self.workspace_id is not None
        ):
            # Get the workspaceHandle
            workspace = Workspace.get(self.client, id_=self.workspace_id)
            if workspace:
                self.workspace_handle = workspace.handle

    @staticmethod
    def get(client: Client, handle: str) -> PackageInstance:
        """Fetch a PackageInstance by handle via ``package/instance/get``."""
        return client.post(
            "package/instance/get", IdentifierRequest(handle=handle), expect=PackageInstance
        )

    def invoke(
        self, path: str, verb: Verb = Verb.POST, timeout_s: Optional[float] = None, **kwargs
    ):
        """Call an endpoint of this package instance.

        Parameters
        ----------
        path : str
            Endpoint path; a single leading ``/`` is stripped.
        verb : Verb
            HTTP verb to use. Default: ``Verb.POST``.
        timeout_s : Optional[float]
            Forwarded unchanged to ``client.call``.
        **kwargs
            Sent as the call payload.
        """
        self.load_missing_workspace_handle()
        # startswith() instead of path[0] so an empty path cannot raise IndexError.
        if path.startswith("/"):
            path = path[1:]

        return self.client.call(
            verb=verb,
            # Missing workspace/instance handles are replaced by "_".
            operation=f"/{self.workspace_handle or '_'}/{self.handle or '_'}/{path}",
            payload=kwargs,
            is_package_call=True,
            package_owner=self.user_handle,
            package_id=self.package_id,
            package_instance_id=self.id,
            as_background_task=False,
            timeout_s=timeout_s,
        )

    def blocks_from_invoke(
        self, path: str, verb: Verb = Verb.POST, timeout_s: Optional[float] = None, **kwargs
    ) -> List[Block]:
        """Invoke ``path`` and convert the result into a list of ``Block`` objects.

        A list result yields one ``Block`` per element; any other result is
        treated as the keyword arguments of a single ``Block``.

        Raises
        ------
        SteamshipError
            If the result cannot be converted; the original exception is
            attached as the cause.
        """
        potential_blocks = self.invoke(path=path, verb=verb, timeout_s=timeout_s, **kwargs)
        try:
            if isinstance(potential_blocks, list):
                return [Block(client=self.client, **raw) for raw in potential_blocks]
            else:
                return [Block(client=self.client, **potential_blocks)]
        except Exception as e:
            raise SteamshipError(f"Could not convert to blocks: {e}") from e

    def task_from_invoke(
        self, path: str, verb: Verb = Verb.POST, timeout_s: Optional[float] = None, **kwargs
    ) -> Task:
        """Invoke ``path`` and convert the result into a ``Task``.

        Raises
        ------
        SteamshipError
            If the result cannot be converted; the original exception is
            attached as the cause.
        """
        task_dict = self.invoke(path=path, verb=verb, timeout_s=timeout_s, **kwargs)
        try:
            return Task(client=self.client, **task_dict)
        except Exception as e:
            raise SteamshipError(f"Could not convert to task: {e}") from e

    def full_url_for(self, path: str):
        """Return ``invocation_url`` with ``path`` appended verbatim (no separator is added)."""
        return f"{self.invocation_url}{path}"

    def refresh_init_status(self):
        """Re-fetch this instance by handle and copy over its ``init_status``."""
        new_self = PackageInstance.get(self.client, handle=self.handle)
        self.init_status = new_self.init_status

    def wait_for_init(
        self,
        max_timeout_s: float = 180,
        retry_delay_s: float = 1,
    ):
        """Polls and blocks until the init has succeeded or failed (or timeout reached).

        Parameters
        ----------
        max_timeout_s : float
            Max timeout in seconds. Default: 180s. After this timeout, an exception will be thrown.
        retry_delay_s : float
            Delay between status checks. Default: 1s.

        Raises
        ------
        SteamshipError
            If the instance is still initializing once polling stops.
        """
        t0 = time.perf_counter()
        while (
            time.perf_counter() - t0 < max_timeout_s
            and self.init_status == InvocableInitStatus.INITIALIZING
        ):
            time.sleep(retry_delay_s)
            self.refresh_init_status()

        # If the task did not complete within the timeout, throw an error
        if self.init_status == InvocableInitStatus.INITIALIZING:
            raise SteamshipError(
                message=f"Package Instance {self.id} did not complete within requested timeout of {max_timeout_s}s. The init is still running on the server. You can retrieve its status via PackageInstance.get() or try waiting again with wait_for_init()."
            )

    @staticmethod
    def list(
        client: Client,
        package_id: Optional[str] = None,
        package_version_id: Optional[str] = None,
        include_workspace: Optional[bool] = None,
        across_workspaces: Optional[bool] = None,
        page_size: Optional[int] = None,
        page_token: Optional[str] = None,
        sort_order: Optional[SortOrder] = SortOrder.DESC,
    ) -> ListPackageInstancesResponse:
        """List package instances via ``package/instance/list``.

        All keyword arguments are forwarded unchanged into a
        ``ListPackageInstancesRequest``; ``sort_order`` defaults to
        ``SortOrder.DESC``.
        """
        return client.post(
            "package/instance/list",
            ListPackageInstancesRequest(
                package_id=package_id,
                package_version_id=package_version_id,
                include_workspace=include_workspace,
                across_workspaces=across_workspaces,
                page_size=page_size,
                page_token=page_token,
                sort_order=sort_order,
            ),
            expect=ListPackageInstancesResponse,
        )
# ListPackageInstancesResponse refers to PackageInstance, which is defined after
# it in this module; resolve that forward reference now that both classes exist.
ListPackageInstancesResponse.update_forward_refs()