Source code for steamship.invocable.invocable

"""Please see https://docs.steamship.com/ for information about building a Steamship Package"""
import inspect
import logging
import pathlib
import time
from abc import ABC
from functools import wraps
from http import HTTPStatus
from typing import Any, Dict, List, Optional, Type, Union

import toml
from pydantic import BaseModel

from steamship import SteamshipError
from steamship.base.package_spec import MethodSpec, PackageSpec
from steamship.client.steamship import Steamship
from steamship.invocable import Config
from steamship.invocable.config import ConfigParameter
from steamship.invocable.invocable_request import InvocableRequest, InvocationContext
from steamship.invocable.invocable_response import InvocableResponse
from steamship.utils.url import Verb


def make_registering_decorator(decorator):
    """Return a variant of ``decorator`` whose results remember what decorated them.

    The returned decorator applies ``decorator`` unchanged and then stores
    itself on the result as a ``.decorator`` attribute. It takes over the
    ``__name__`` and ``__doc__`` of ``decorator`` and is flagged with
    ``__is_endpoint__ = True``.

    The returned decorator is not signature-preserving, i.e. the parameter
    list of ``decorator`` cannot be recovered from it at runtime. Because the
    only argument is the function being decorated, this is not a big issue.
    """

    def registering(func):
        # Delegate to the wrapped decorator exactly as a direct call would.
        decorated = decorator(func)
        # Record which decorator produced this callable.
        decorated.decorator = registering
        return decorated

    for attr_name in ("__name__", "__doc__"):
        setattr(registering, attr_name, getattr(decorator, attr_name))
    registering.__is_endpoint__ = True
    return registering
# https://stackoverflow.com/questions/2366713/can-a-decorator-of-an-instance-method-access-the-class


# noinspection PyUnusedLocal
def endpoint(verb: str = None, path: str = None, **kwargs):
    """Create a decorator that tags a method with endpoint routing metadata.

    By using ``kwargs`` we can tag the function with Any parameters.

    The decorated method is wrapped (keeping its name via ``functools.wraps``)
    and the wrapper receives three attributes: ``__path__``, ``__verb__`` and
    ``__endpoint_config__``. The latter holds only those ``kwargs`` whose
    values are ``str``, ``bool``, ``int`` or ``float``; everything else is
    dropped.
    """  # noqa: RST210

    def decorator(function):
        # This is used in conjunction with the __init_subclass__ code!
        # Without functools.wraps the wrapper's __name__ would not match the
        # decorated method's name.
        # noinspection PyShadowingNames
        @wraps(function)
        def handler(self, *args, **kwargs):
            return function(self, *args, **kwargs)

        # Build a dictionary of String->Primitive Types to pass back with the endpoint.
        # This enables the Engine to add support for features like public=True, etc,
        # without the Client changing.
        primitive_config: Dict[str, Union[str, bool, int, float]] = {
            key: value
            for key, value in kwargs.items()
            if isinstance(value, (str, bool, int, float))
        }

        handler.__path__ = path
        handler.__verb__ = verb
        handler.__endpoint_config__ = primitive_config
        return handler

    return make_registering_decorator(decorator)
def get(path: str, **kwargs):
    """Shorthand for ``endpoint`` with the verb fixed to ``Verb.GET``.

    ``path`` and any extra keyword arguments are forwarded unchanged.
    """
    get_decorator = endpoint(verb=Verb.GET, path=path, **kwargs)
    return get_decorator
def post(path: str, **kwargs):
    """Shorthand for ``endpoint`` with the verb fixed to ``Verb.POST``.

    ``path`` and any extra keyword arguments are forwarded unchanged.
    """
    post_decorator = endpoint(verb=Verb.POST, path=path, **kwargs)
    return post_decorator
class RouteMethod(BaseModel):
    """An endpoint-decorated class attribute together with its routing metadata."""

    # The decorated attribute itself, as found in a class ``__dict__``.
    attribute: Any
    # HTTP verb taken from the attribute's ``__verb__``; may be None.
    verb: Optional[Verb]
    # Route path taken from the attribute's ``__path__``.
    # NOTE(review): find_route_methods falls back to None when ``__path__`` is
    # missing, and ``endpoint`` defaults ``path`` to None, while this field is
    # declared as a plain ``str`` -- confirm whether None must be accepted.
    path: str
    # Extra endpoint settings taken from the attribute's ``__endpoint_config__``.
    config: Dict[str, Any]
def find_route_methods(on_class: Type) -> List[RouteMethod]:
    """Collect the endpoint-decorated attributes of ``on_class`` and its base classes.

    Base classes are searched recursively first. An attribute of ``on_class``
    counts as a route when it carries a ``decorator`` attribute that is
    flagged with ``__is_endpoint__``. Routes found on ``on_class`` itself take
    precedence over inherited routes with the same verb and path.
    """
    inherited_routes: List[RouteMethod] = []
    for parent in on_class.__bases__:
        inherited_routes.extend(find_route_methods(parent))

    own_routes: List[RouteMethod] = []
    # Iterate over a snapshot of the values in the class namespace.
    for member in list(on_class.__dict__.values()):
        registrar = getattr(member, "decorator", None)
        if not registrar:
            continue
        if not getattr(registrar, "__is_endpoint__", False):
            continue

        route_path = getattr(member, "__path__", None)
        route_verb = getattr(member, "__verb__", None)
        route_config = getattr(member, "__endpoint_config__", {})
        own_routes.append(
            RouteMethod(attribute=member, verb=route_verb, path=route_path, config=route_config)
        )

    return merge_routes_respecting_override(inherited_routes, own_routes)
def merge_routes_respecting_override(
    base_routes: List[RouteMethod], this_class_routes: List[RouteMethod]
) -> List[RouteMethod]:
    """Merge routes from base classes into the routes from this class.

    If this class already has a verb/path combo, the one from the superclass
    is ignored, since it has now been overridden.

    ``this_class_routes`` is extended in place and is also the return value.
    Each base route is checked against the list as it grows, so a verb/path
    combo that occurs several times in ``base_routes`` is added only once.
    """
    for inherited in base_routes:
        overridden = route_list_contains(inherited, this_class_routes)
        if overridden:
            continue
        this_class_routes.append(inherited)
    return this_class_routes
def route_list_contains(route_method: RouteMethod, routes: List[RouteMethod]):
    """Return True if ``routes`` holds an entry with the same path and verb as ``route_method``."""
    wanted_path = route_method.path
    wanted_verb = route_method.verb
    return any(
        candidate.path == wanted_path and candidate.verb == wanted_verb for candidate in routes
    )
class Invocable(ABC):
    """A Steamship microservice.

    This model.py class:

      1. Provides a pre-authenticated instance of the Steamship client
      2. Provides a Lambda handler that routes to registered functions
      3. Provides useful methods connecting functions to the router.
    """

    _package_spec: PackageSpec
    config: Config
    context: InvocationContext

    def __init__(
        self,
        client: Steamship = None,
        config: Dict[str, Any] = None,
        context: InvocationContext = None,
    ):
        """Set up the per-instance package spec, context, configuration and client.

        Args:
            client: Steamship client stored on the instance as ``self.client``.
            config: Configuration values; entries whose value is ``""`` are ignored.
            context: Invocation context stored on the instance as ``self.context``.
        """
        # Create an instance-level clone of the PackageSpec so that any route registrations do not impact other
        # instances that may exist.
        #
        # The class-level spec is only created by __init_subclass__, so it is absent when Invocable itself is
        # instantiated; look it up defensively instead of raising AttributeError.
        class_spec = getattr(type(self), "_package_spec", None)
        if class_spec:
            self._package_spec = class_spec.clone()
        elif class_spec is None:
            # Makes the "no spec" state explicit; add_api_route and __call__ both check for None.
            self._package_spec = None

        self.context = context

        try:
            secret_kwargs = toml.load(".steamship/secrets.toml")
        except OSError:  # Support local secret loading
            try:
                local_secrets_file = (
                    pathlib.Path(inspect.getfile(type(self))).parent / ".steamship" / "secrets.toml"
                )
                secret_kwargs = toml.load(str(local_secrets_file))
            except (TypeError, OSError):  # Support collab usage
                secret_kwargs = {}

        # The configuration for the Invocable is the union of:
        #
        # 1) The `secret_kwargs` dict, read in from .steamship/secrets.toml, if it exists, and
        # 2) The `config` dict, provided upon instantiation.
        #
        # When invoked from within Steamship, the `config` dict is frozen, at the instance level, upon instance
        # creation. All subsequent method invocations reuse that frozen config.
        config = {
            **secret_kwargs,
            **{k: v for k, v in (config or {}).items() if v != ""},
        }

        # Finally, we set the config object to an instance of the class returned by `self.config_cls`.
        # Unpacking an empty dict is the same as calling the class without arguments.
        self.config = self.config_cls()(**config)
        self.client = client

    def __init_subclass__(cls, **kwargs):
        """Build the subclass's PackageSpec and register its routes.

        Runs once per subclass definition: creates a fresh spec, imports the
        parent's methods into it, registers every endpoint-decorated method
        and finally the built-in ``/__dir__`` and ``/__instance_init__`` routes.
        """
        super().__init_subclass__(**kwargs)

        start_time = time.time()

        # The subclass takes care to extend what the superclass may have set here by copying it.
        spec = PackageSpec(name=cls.__name__, doc=cls.__doc__, methods=[])
        if hasattr(cls, "_package_spec"):
            parent_spec = cls._package_spec
            spec.import_parent_methods(parent_spec)
            # Copy the list so that later changes to this spec's mixins cannot leak into the parent's spec.
            # NOTE(review): assumes `used_mixins` is a list (or None) -- confirm against PackageSpec.
            spec.used_mixins = list(parent_spec.used_mixins or [])

        # The subclass always overrides whatever the superclass set here, having cloned its data.
        cls._package_spec = spec

        for route_method in find_route_methods(cls):
            cls._register_mapping(
                name=route_method.attribute.__name__,
                verb=route_method.verb,
                path=route_method.path,
                config=route_method.config,
            )

        # Add the HTTP GET /__dir__ method which returns a serialization of the PackageSpec.
        # Wired up to both GET and POST for convenience (POST is Steamship default; GET is browser friendly)
        cls._register_mapping(name="__steamship_dir__", verb=Verb.GET, path="/__dir__")
        cls._register_mapping(name="__steamship_dir__", verb=Verb.POST, path="/__dir__")
        cls._register_mapping(
            name="invocable_instance_init", verb=Verb.POST, path="/__instance_init__", config={}
        )

        end_time = time.time()
        logging.info(f"Registered package functions in {end_time - start_time} seconds.")

    def __steamship_dir__(self) -> dict:
        """Return this Invocable's PackageSpec for remote inspection -- e.g. documentation or OpenAPI generation."""
        return self._package_spec.dict(by_alias=True)

    def invocable_instance_init(self) -> InvocableResponse:
        # Handler behind POST /__instance_init__ (registered in __init_subclass__): runs the overridable
        # `instance_init` hook and reports success.
        self.instance_init()
        return InvocableResponse(data=True)

    def add_api_route(self, method_spec: MethodSpec, permit_overwrite_of_existing: bool = False):
        """Add an API route to this Invocable instance.

        Args:
            method_spec: The method specification to add to this instance's package spec.
            permit_overwrite_of_existing: Forwarded to ``PackageSpec.add_method``.

        Raises:
            SteamshipError: If this instance has no package spec.
        """
        if self._package_spec is None:
            raise SteamshipError(
                message=f"Unable to add API route {method_spec}. Reason: _package_spec on Invocable was None."
            )
        self._package_spec.add_method(
            method_spec, permit_overwrite_of_existing=permit_overwrite_of_existing
        )

    def instance_init(self):
        """The instance init method will be called ONCE by the engine when a new instance of a package or plugin has been created. By default, this does nothing."""

    @classmethod
    def config_cls(cls) -> Type[Config]:
        """Returns the configuration object for the Invocable.

        By default, Steamship packages and plugins will not take any configuration. Steamship packages and plugins may
        declare a configuration object which extends from Config, if needed, as follows:

        class MyPackageOrPlugin:
            class MyConfig(Config):
                ...

            @classmethod
            def config_cls(cls):
                return MyPackageOrPlugin.MyConfig
        """  # noqa: RST301
        return Config

    @classmethod
    def _register_mapping(
        cls,
        name: str,
        verb: Optional[Verb] = None,
        path: str = "",
        config: Dict[str, Union[int, float, bool, str]] = None,
    ) -> MethodSpec:
        """Register a mapping that permits a method to be invoked via HTTP.

        An existing entry in the class-level package spec is overwritten.

        Returns:
            The MethodSpec that was added to the class-level package spec.
        """
        method_spec = MethodSpec.from_class(
            cls, name, path=path, verb=verb, config=config, func_binding=name
        )
        cls._package_spec.add_method(method_spec, permit_overwrite_of_existing=True)
        return method_spec

    def __call__(self, request: InvocableRequest, context: Any = None) -> InvocableResponse:
        """Invokes a method call if it is registered.

        Args:
            request: The incoming request; its ``invocation`` supplies verb, path and arguments.
            context: Not used by this method.

        Returns:
            Whatever the bound handler returns, or an error response when the request has no
            invocation (404), no handler matches (404), or the handler is not callable (500).
        """
        invocation = request.invocation
        if invocation is None:
            logging.error("__call__: No invocation on request.")
            return InvocableResponse.error(
                code=HTTPStatus.NOT_FOUND, message="No invocation was found."
            )

        verb = invocation.http_verb
        path = invocation.invocation_path

        logging.info(f"REQUEST [{verb}] {path}")

        # An instance without a package spec has no handlers, so it is treated as "not found".
        spec = self._package_spec
        method_spec = spec.get_method(verb, path) if spec is not None else None
        if method_spec is None:
            return InvocableResponse.error(
                code=HTTPStatus.NOT_FOUND,
                message=f"No handler for {verb} {path} available.",
            )

        bound_function = method_spec.get_bound_function(self)
        if not bound_function:
            logging.error(
                f"__call__: Method not found or not callable for '{path}' in method_mappings[{verb}]."
            )
            return InvocableResponse.error(
                code=HTTPStatus.INTERNAL_SERVER_ERROR,
                message=f"Handler for {verb} {path} not callable.",
            )

        arguments = invocation.arguments
        if arguments is None:
            return bound_function()
        return bound_function(**arguments)

    @classmethod
    def get_config_parameters(cls) -> Dict[str, ConfigParameter]:
        """Return the config parameters declared by the class that ``config_cls`` returns."""
        return cls.config_cls().get_config_parameters()