Source code for steamship.invocable.package_service

from __future__ import annotations

import logging
from functools import partial
from typing import Any, Dict, List, Optional, Type

from steamship import SteamshipError, Task
from steamship.base.package_spec import MethodSpec, PackageSpec, RouteConflictError
from steamship.invocable import Invocable

# Note!
# =====
#
# This the files in this package are for Package Implementors.
# If you are using the Steamship Client, you probably are looking for either steamship.client or steamship.data
#
from steamship.invocable.invocable import find_route_methods
from steamship.invocable.package_mixin import PackageMixin
from steamship.utils.url import Verb


[docs] class PackageService(Invocable): """The Abstract Base Class of a Steamship Package. Packages may implement whatever methods they like. To expose these methods as invocable HTTP routes, annotate the method with @get or @post and the route name. Package *implementations* are effectively stateless, though they will have stateful """ USED_MIXIN_CLASSES: List[Type[PackageMixin]] = [] mixins: List[PackageMixin] def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.mixins = [] def __init_subclass__(cls, **kwargs): super().__init_subclass__(**kwargs) # Now must add routes for mixins for used_mixin_class in cls.USED_MIXIN_CLASSES: mixin_qualified_name = used_mixin_class.__module__ + "." + used_mixin_class.__qualname__ if mixin_qualified_name not in cls._package_spec.used_mixins: cls.scan_mixin(cls._package_spec, used_mixin_class) cls._package_spec.used_mixins.append(mixin_qualified_name)
[docs] @classmethod def scan_mixin( cls, package_spec: PackageSpec, mixin_class: Type[PackageMixin], mixin_instance: Optional[PackageMixin] = None, permit_overwrite_of_existing_methods: bool = False, ): for route_method in find_route_methods(mixin_class): if mixin_instance is not None: func_binding = partial(route_method.attribute, self=mixin_instance) else: func_binding = ( route_method.attribute ) # This binding is not truly valid. It must be overwritten during add_mixin method_spec = MethodSpec.from_class( mixin_class, route_method.attribute.__name__, path=route_method.path, verb=route_method.verb, config=route_method.config, func_binding=func_binding, ) try: allow_override_for_this_route = permit_overwrite_of_existing_methods comparison_path = ( route_method.path if route_method.path.startswith("/") else "/" + route_method.path ) existing_route = package_spec.method_mappings.get(route_method.verb, {}).get( comparison_path ) if ( existing_route is not None and existing_route.class_name == mixin_class.__name__ and mixin_instance is not None ): allow_override_for_this_route = True package_spec.add_method( method_spec, permit_overwrite_of_existing=allow_override_for_this_route, ) except RouteConflictError as conflict_error: message = f"When attempting to add mixin {mixin_class.__name__}, route {route_method.verb} {route_method.path} conflicted with already added route {route_method.verb} {route_method.path} on class {conflict_error.existing_method_spec.class_name}" raise RouteConflictError( message=message, existing_method_spec=conflict_error.existing_method_spec, )
[docs] def add_mixin(self, mixin: PackageMixin, permit_overwrite_of_existing_methods: bool = False): PackageService.scan_mixin( self._package_spec, mixin.__class__, mixin, permit_overwrite_of_existing_methods=permit_overwrite_of_existing_methods, ) self.mixins.append(mixin)
[docs] def instance_init(self): """The instance init method will be called ONCE by the engine when a new instance of a package or plugin has been created. By default, this calls instance_init on mixins, in order.""" for mixin in self.mixins: mixin.instance_init()
[docs] def invoke_later( self, method: str, verb: Verb = Verb.POST, wait_on_tasks: List[Task] = None, arguments: Dict[str, Any] = None, delay_ms: Optional[int] = None, ) -> Task[Any]: """Schedule a method for future invocation. Parameters ---------- method: str The method to invoke, as registered with Steamship in the @get or @post decorator. verb: Verb The HTTP Verb to use. Default is POST. wait_on_tasks: List[Task] A list of Task objects (or task IDs) that should be waited upon before invocation. arguments: Dict[str, Any] The keyword arguments of the invoked method delay_ms: Optional[int] A delay, in milliseconds, before the invocation should execute. Returns ------- Task[Any] A Task representing the future work """ if self.context is None: raise SteamshipError( message="Unable to call invoke_later because the InvocationContext was None" ) if self.context.invocable_instance_handle is None: raise SteamshipError( message="Unable to call invoke_later because the invocable_instance_handle on InvocationContext was None" ) payload = { "instanceHandle": self.context.invocable_instance_handle, "payload": { "httpVerb": verb.value, "invocationPath": method, "arguments": arguments or {}, }, } operation = "package/instance/invoke" logging.info( f"Scheduling {verb} {method} for future invocation on me ({self.context.invocable_handle})" ) resp = self.client.post( operation, payload, expect=Task[Task], # This operation should return a task as_background_task=True, # This operation should always be asynchronous wait_on_tasks=wait_on_tasks, # This operation might await other tasks first task_delay_ms=delay_ms, # This operation might have a required delay ) return resp