Source code for steamship.cli.local_server.server

import logging
import socketserver
import threading
from socketserver import TCPServer
from typing import Optional, Type

from steamship import Steamship, SteamshipError, TaskState
from steamship.cli.local_server.handler import create_safe_handler, make_handler
from steamship.invocable import (
    Invocable,
    InvocableRequest,
    Invocation,
    InvocationContext,
    LoggingConfig,
)


class ThreadedTCPServer(socketserver.ThreadingMixIn, socketserver.TCPServer):
    """A TCP server that dispatches every incoming request to its own thread."""

    # This has to be a class attribute (or be set before the socket is bound):
    # TCPServer consults it in server_bind(), which the constructor calls by
    # default. Assigning it on an already-constructed instance is too late.
    allow_reuse_address = True
class SteamshipHTTPServer:
    """A simple HTTP Server that wraps an invocable (package or plugin).

    To use, call:

        server = SteamshipHTTPServer(invocable)
        server.start()

    To shut down, call:

        server.stop()
    """

    invocable: Type[Invocable]
    base_url: str
    port: int
    # Only set once start() has successfully bound the listening socket.
    server: TCPServer
    server_thread: threading.Thread
    # These defaults were previously written as ``(None,)`` -- one-element
    # tuples produced by stray trailing commas -- which made them truthy.
    default_api_key: Optional[str] = None
    invocable_handle: Optional[str] = None
    invocable_version_handle: Optional[str] = None
    invocable_instance_handle: Optional[str] = None
    config: Optional[dict]
    workspace: Optional[str]

    def __init__(
        self,
        invocable: Type[Invocable],
        base_url: Optional[str] = None,
        port: int = 8080,
        invocable_handle: Optional[str] = None,
        invocable_version_handle: Optional[str] = None,
        invocable_instance_handle: Optional[str] = None,
        config: Optional[dict] = None,
        workspace: Optional[str] = None,
    ):
        """Store the settings for the server; no socket is opened until start().

        Args:
            invocable: The invocable class to serve.
            base_url: URL passed to the request handler and used to build the
                invocation context. Defaults to ``http://localhost:{port}``.
            port: TCP port to listen on.
            invocable_handle: Forwarded to ``make_handler``.
            invocable_version_handle: Forwarded to ``make_handler``.
            invocable_instance_handle: Forwarded to ``make_handler``.
            config: Forwarded to ``make_handler`` and to the start-up
                ``__dir__`` invocation.
            workspace: Forwarded to ``make_handler`` and used to build the
                ``Steamship`` client for the start-up ``__dir__`` invocation.
        """
        self.invocable = invocable
        self.port = port
        self.base_url = base_url or f"http://localhost:{self.port}"
        self.invocable_handle = invocable_handle
        self.invocable_version_handle = invocable_version_handle
        self.invocable_instance_handle = invocable_instance_handle
        self.config = config
        self.workspace = workspace

    def start(self):
        """Start the server.

        Binds a threaded TCP server on ``self.port``, runs it on a daemon
        thread, and then invokes the invocable's ``__dir__`` method once.

        Raises:
            SteamshipError: If the ``__dir__`` invocation reports a failed
                state. The server thread is not shut down in that case; call
                stop() to release it.
        """
        handler = make_handler(
            self.invocable,
            base_url=self.base_url,
            invocable_handle=self.invocable_handle,
            invocable_version_handle=self.invocable_version_handle,
            invocable_instance_handle=self.invocable_instance_handle,
            config=self.config,
            workspace=self.workspace,
        )

        # Defer binding so that allow_reuse_address is in place *before* the
        # socket is bound; setting it after construction has no effect because
        # the constructor binds immediately by default.
        server = ThreadedTCPServer(("", self.port), handler, bind_and_activate=False)
        server.allow_reuse_address = True
        try:
            server.server_bind()
            server.server_activate()
        except BaseException:
            # Same cleanup the TCPServer constructor performs when binding
            # fails; the exception is always re-raised.
            server.server_close()
            raise
        self.server = server

        # Start a thread with the server -- that thread will then start one
        # more thread for each request
        self.server_thread = threading.Thread(target=self.server.serve_forever)
        # Exit the server thread when the main thread terminates
        self.server_thread.daemon = True
        self.server_thread.start()
        logging.info(f"Started local server thread on port {self.port}.")

        # Call the __dir__ method
        context = InvocationContext(invocable_url=f"{self.base_url}/")
        invocation = Invocation(
            http_verb="POST", invocation_path="__dir__", arguments={}, config=self.config
        )
        event = InvocableRequest(
            client_config=Steamship(workspace=self.workspace).config,
            invocation=invocation,
            logging_config=LoggingConfig(logging_host=None, logging_port=None),
            invocation_context=context,
        )
        safe_handler = create_safe_handler(self.invocable)
        resp = safe_handler(event.dict(by_alias=True), context)

        status = {}
        try:
            status = resp.get("status", {})
            state = status.get("state", None)
        except Exception:
            # Best effort: a response that does not look like a status dict is
            # treated as success. Deliberately not BaseException, so that
            # KeyboardInterrupt / SystemExit are no longer swallowed here.
            state = "succeeded"

        if state == TaskState.failed:
            raise SteamshipError(
                message=status.get("statusMessage", "Unable to start project")
            )

    def stop(self):
        """Stop the server and release its listening socket.

        Does nothing if the server was never successfully started.

        Note: This has to be called from a different thread or else it will deadlock.
        """
        server = getattr(self, "server", None)
        if server is None:
            return
        server.shutdown()
        # shutdown() only ends the serve_forever() loop; the socket itself
        # must be closed explicitly or the port stays occupied.
        server.server_close()