Source code for steamship.cli.cli

import json
import logging
import os
import platform
import signal
import subprocess  # noqa: S404
import sys
import time
from os import getenv, path
from typing import Optional

import click

import steamship
from steamship import PackageInstance, Steamship, SteamshipError, Workspace
from steamship.base.configuration import DEFAULT_WEB_BASE, Configuration
from steamship.cli.create_instance import (
    config_str_to_dict,
    create_instance,
    load_manifest,
    set_unset_params,
)
from steamship.cli.deploy import (
    PackageDeployer,
    PluginDeployer,
    bundle_deployable,
    update_config_template,
)
from steamship.cli.local_server.server import SteamshipHTTPServer
from steamship.cli.manifest_init_wizard import manifest_init_wizard
from steamship.cli.requirements_init_wizard import requirements_init_wizard
from steamship.cli.ship_spinner import ship_spinner
from steamship.cli.utils import find_api_py, get_api_module
from steamship.data.manifest import DeployableType, Manifest
from steamship.data.package.package_instance import LOCAL_DEVELOPMENT_VERSION_HANDLE
from steamship.data.user import User
from steamship.invocable.dev_logging_handler import DevelopmentLoggingHandler
from steamship.invocable.lambda_handler import get_class_from_module
from steamship.utils.repl import HttpREPL


@click.group()
def cli():
    if not steamship.is_supported_python_version():
        click.echo("⚠️ ⚠️ ⚠️️️️️️️️️️️")
        click.echo(f"Running on unsupported Python version: {sys.version}")
        click.echo(
            "Steamship runtime supports only the following versions; you may see irregular behavior otherwise:"
        )
        for supported_name in steamship.SUPPORTED_PYTHON_VERSION_NAMES:
            click.echo(f"  {supported_name}")
        click.echo("Please develop against supported Python versions.")
        click.echo("⚠️ ⚠️ ⚠️️️️️️️️️️️")


[docs] def initialize(suppress_message: bool = False): logging.root.setLevel(logging.FATAL) if not suppress_message: click.echo(f"Steamship Python CLI version {steamship.__version__}")
[docs] def initialize_and_get_client_and_prep_project(): initialize() client = None try: client = Steamship() except SteamshipError as e: click.secho(e.message, fg="red") click.get_current_context().abort() user = User.current(client) if path.exists("steamship.json"): manifest = Manifest.load_manifest() else: manifest = manifest_init_wizard(client) manifest.save() if not path.exists("requirements.txt"): requirements_init_wizard() update_config_template(manifest) return client, user, manifest
@click.command() def login(): """Log in to Steamship, creating ~/.steamship.json""" initialize() click.echo("Logging into Steamship.") if sys.argv[1] == "login": if Configuration.default_config_file_has_api_key(): overwrite = click.confirm( text="You already have an API key in your .steamship.json file. Do you want to remove it and login?", default=False, ) if not overwrite: sys.exit(0) Configuration.remove_api_key_from_default_config() # Carry on with login client = Steamship() user = User.current(client) click.secho(f"🚢🚢🚢 Hooray! You're logged in with user handle: {user.handle} 🚢🚢🚢", fg="green") @click.command() def ships(): """Ship some ships""" initialize() click.secho("Here are some ships:", fg="cyan") with ship_spinner(): time.sleep(5) click.secho() def _run_ngrok(local_port: int) -> str: """Create an NGROK URL directed at `local_port`.""" try: from pyngrok import ngrok except BaseException: click.secho("⚠️ Public API: Unable to start ngrok. Please either:") click.secho(" - Install pyngrok via `pip install pyngrok`") click.secho(" - Use the --no-ngrok flag") click.secho(" NGROK is only necessary if you wish to debug Telegram or Slack locally.") exit(1) try: http_tunnel = ngrok.connect(local_port, bind_tls=True) except BaseException: click.secho(f"⚠️ Public API: Unable to bind ngrok to port {local_port}") click.secho(" - Try running with a different port via the --port flag.") exit(1) ngrok_api_url = http_tunnel.public_url return ngrok_api_url def _run_local_server( local_port: int, instance_handle: Optional[str] = None, config: Optional[str] = None, workspace: Optional[str] = None, base_url: Optional[ str ] = None, # If provided, will override the default calculation, eg for ngrok ) -> str: """Start the local API Server.""" logging.info( f"Starting local server. port={local_port}, instance_handle={instance_handle}, workspace={workspace}" ) path = find_api_py() api_module = get_api_module(path) invocable_class = get_class_from_module(api_module) # Use the provided base url (e.g. from NGROK) or default to localhost _base_url = base_url or "http://localhost" logging.info(f"Local server base URL (for PackageService configuration) is {_base_url}") if not invocable_class: logging.error("Local server startup unable to find Steamship service.") click.secho("⚠️ Local API: Unable to find Steamship service. Please:") click.secho( " - Check to see that you have an api.py file containing an AgentService or PackageService " ) exit(1) manifest = load_manifest() if not manifest: logging.error("Local server startup unable to find Steamship manifest.") click.secho("⚠️ Local API: Unable to find your steamship.json file") exit(1) invocable_config, is_file = config_str_to_dict(config) set_unset_params(config, invocable_config, is_file, manifest) server = SteamshipHTTPServer( invocable_class, base_url=_base_url, port=local_port, invocable_handle=manifest.handle, invocable_version_handle=manifest.version, invocable_instance_handle=instance_handle, config=invocable_config, workspace=workspace, ) try: server.start() except SteamshipError as e: click.secho(f"⚠️ Local API: {e.message}") if "validation error" in e.message: click.secho( "💡 Suggestion: Create a configuration file called config.json containing the required " ) click.secho( " configuration fields in your agent's configuration. Then run again with" ) click.secho(" ship run local -c config.json") exit(-1) except BaseException as e: click.secho(f"⚠️ Local API: {e}") exit(-1) logging.info("Local server running.") def on_exit(signum, frame): logging.info("Local server shutting down.") click.secho("Shutting down server.") server.stop() click.secho("Shut down.") logging.info("Local server shut down complete.") exit(1) signal.signal(signal.SIGINT, on_exit) local_api_url = f"http://localhost:{server.port}" logging.info(f"Local server address: {local_api_url}") return local_api_url def _run_web_interface(base_url: str, workspace_handle: str, instance_handle: str) -> str: web_base = DEFAULT_WEB_BASE logging.info(f"Starting web interface. web_base={web_base} and base_url={base_url}") try: config = Configuration() web_base = config.web_base except BaseException: click.secho("⚠️ Web UI: Unable to find Steamship Configuration. Please:") click.secho( " - Run `ship login` to make sure you have Steamship credentials in your environment." ) # Guarantee one and only one trailing / if not web_base.endswith("/"): web_base = f"{web_base}/" web_url = f"{web_base}dashboard/agents/workspaces/{workspace_handle}/packages/{instance_handle}" logging.info(f"Web interface url is: {web_url}") return web_url
[docs] def register_locally_running_package_with_engine( client: Steamship, ngrok_api_url: str, package_handle: str, manifest: Manifest, config: Optional[str] = None, ) -> PackageInstance: """Registers the locally running package with the Steamship Engine.""" # Register the Instance in the Engine invocable_config, is_file = config_str_to_dict(config) set_unset_params(config, invocable_config, is_file, manifest) package_instance = PackageInstance.create_local_development_instance( client, local_development_url=ngrok_api_url, package_handle=package_handle, config=invocable_config, ) return package_instance
[docs] def serve_local( # noqa: C901 port: int = 8443, no_ngrok: Optional[bool] = False, no_repl: Optional[bool] = False, config: Optional[str] = None, workspace: Optional[str] = None, ): """Serve the invocable on localhost. Useful for debugging locally.""" dev_logging_handler = DevelopmentLoggingHandler.init_and_take_root() click.secho("Running your project...\n") client, user, manifest = initialize_and_get_client_and_prep_project() if workspace: # Fetch/create a workspace if one was specified. workspace_obj = Workspace.create(client, handle=workspace, fetch_if_exists=True) else: # Create a new workspace if none was specified. # Otherwise multiple runs co-mingle data in the `default` workspace. workspace_obj = Workspace.create(client) workspace = workspace_obj.handle # Now switch the workspace to the one just created. client.switch_workspace(workspace) # Make sure we're running a package. if manifest.type != DeployableType.PACKAGE: click.secho( f"⚠️ Must run `ship serve local` in a folder with a Steamship Package. Found: {manifest.type}" ) exit(-1) # Make sure we have a package name -- this allows us to register the running copy with the engine. deployer = PackageDeployer() deployable = deployer.create_or_fetch_deployable(client, user, manifest) # Report the workspace we're running in click.secho(f"🗃️ Workspace: {workspace}") # Report the logs output file. click.secho(f"📝 Log file: {dev_logging_handler.log_filename}") # Start the NGROK connection ngrok_api_url = None public_api_url = None # Hard coded instance handle to represent "a local instance that isn't connected to the engine" local_instance_handle = "local-dev-instance-not-connected-to-engine" if not no_ngrok: ngrok_api_url = _run_ngrok(port) # It requires a trailing slash if ngrok_api_url[-1] != "/": ngrok_api_url = ngrok_api_url + "/" registered_instance = register_locally_running_package_with_engine( client=client, ngrok_api_url=ngrok_api_url, package_handle=deployable.handle, manifest=manifest, config=config, ) # Replace the local instance handle with the instance just registered in the engine. local_instance_handle = registered_instance.handle # Notes: # 1. registered_instance.invocation_url is the NGROK URL, not the Steamship Proxy URL. # 2. The public_api_url should still be NGROK, not the Proxy. The local server emulates the Proxy and # the Proxy blocks this kind of development traffic. public_api_url = ( f"https://{user.handle}.steamship.run/{workspace}/{registered_instance.handle}/" ) click.secho(f"🌎 Public API: {public_api_url}") # Start the local API Server. This has to happen after NGROK because the port & url need to be plummed. try: local_api_url = _run_local_server( local_port=port, instance_handle=local_instance_handle, config=config, workspace=workspace, base_url=public_api_url, ) except BaseException as e: click.secho("⚠️ Local API: Unable to start local server.") click.secho(e) exit(-1) if local_api_url[-1] != "/": local_api_url = local_api_url + "/" if local_api_url: click.secho(f"🌎 Local API: {local_api_url}") else: click.secho("⚠️ Local API: Unable to start local server.") exit(-1) # Start the web UI if public_api_url: web_url = _run_web_interface(public_api_url, workspace, local_instance_handle) if web_url: click.secho(f"🌎 Web UI: {web_url}") if no_repl: while True: # We need to make sure the thread doesn't exit. time.sleep(1) else: click.secho("\n💬 Interactive REPL below. Type to interact.\n") prompt_url = f"{local_api_url or public_api_url}prompt" repl = HttpREPL(prompt_url=prompt_url, dev_logging_handler=dev_logging_handler) repl.run()
@click.command() @click.option( "--port", "-p", type=int, default=8443, help="Port to host the server on.", ) @click.option( "--instance_handle", "-h", type=str, default=None, help="Handle of the package or plugin instance being hosted.", ) @click.option( "--no-ngrok", is_flag=True, default=False, help="Don't attempt to attach to ngrok.", ) @click.option( "--no-repl", is_flag=True, default=False, help="Don't start a console REPL.", ) @click.option( "--config", "-c", type=str, required=False, help="Instance configuration. May be inline JSON or a path to a file. If not specified, " "an empty configuration dictionary will be passed to the instance.", ) @click.option( "--workspace", "-w", required=False, type=str, help="Workspace handle. The new instance will be created in this workspace. If not specified, " "the default workspace will be used.", ) @click.argument( "environment", required=True, type=click.Choice(["local", "remote"], case_sensitive=False) ) @click.pass_context def run( ctx, environment: str, port: int = 8080, instance_handle: Optional[str] = None, no_ngrok: Optional[bool] = False, no_repl: Optional[bool] = False, config: Optional[str] = None, workspace: Optional[str] = None, ): """Serve your invocable locally or on prod""" if environment == "local": serve_local( port=port, no_ngrok=no_ngrok, no_repl=no_repl, config=config, workspace=workspace, ) else: if click.confirm("Do you want to deploy a new version first?", default=False): ctx.invoke(deploy) ctx.invoke( create_instance, workspace=workspace, instance_handle=instance_handle, config=config ) @click.command() def deploy(): """Deploy the package or plugin in this directory""" client, user, manifest = initialize_and_get_client_and_prep_project() deployable_type = manifest.type deployer = None if deployable_type == DeployableType.PACKAGE: deployer = PackageDeployer() elif deployable_type == DeployableType.PLUGIN: deployer = PluginDeployer() else: click.secho("Deployable must be of type package or plugin.", fg="red") click.get_current_context().abort() deployable = deployer.create_or_fetch_deployable(client, user, manifest) click.echo("Bundling content... ", nl=False) bundle_deployable(manifest) click.echo("Done. 📦") _ = deployer.create_version(client, manifest, deployable.id) thing_url = f"{client.config.web_base}{deployable_type.value}s/{manifest.handle}" click.echo( f"Deployment was successful. View and share your new {deployable_type.value} here:\n\n{thing_url}\n" ) # Common error conditions: # - Package/plugin handle already taken. [handled; asks user for new] # - Version handle already deployed. [handled; asks user for new] # - Bad parameter configuration. [mitigated by deriving template from Config object] # - Package content fails health check (ex. bad import) [Error caught while checking config object] @click.command() def info(): """Displays information about the current Steamship user. This is useful to help users (in a support or hackathon context) test whether they have configured their Steamship environment correctly. """ initialize() click.echo("\nSteamship Client Info\n=====================\n") if Configuration.default_config_file_has_api_key() or getenv("STEAMSHIP_API_KEY", None): # User is logged in! client = None try: client = Steamship() except BaseException: click.secho("Incorrect API key or network error.\n", fg="red") click.secho( "Your Steamship API Key is set, but we were unable to use it to fetch your account information.\n" ) click.secho("- If you are on your own computer, run `ship login` to login.") click.secho( "- If you are in Replit, add the STEAMSHIP_API_KEY secret, then close and re-open this shell.\n" ) return try: user = User.current(client) click.echo(f"User handle: {user.handle}") click.echo(f"User ID: {user.id}") click.echo(f"Profile: {client.config.profile}") click.echo("\nReady to ship! 🚢🚢🚢\n") except BaseException: click.secho("Incorrect API key or network error.\n", fg="red") click.secho( "Your Steamship API Key is set, but we were unable to use it to fetch your account information.\n" ) click.secho("- If you are on your own computer, run `ship login` to login.") click.secho( "- If you are in Replit, add the STEAMSHIP_API_KEY secret, then close and re-open this shell.\n" ) else: click.secho("You are not logged in.\n") click.secho("- If you are on your own computer, run `ship login` to login.") click.secho( "- If you are in Replit, add the STEAMSHIP_API_KEY secret, then close and re-open this shell.\n" ) @click.command() @click.option( "--workspace", "-w", required=True, type=str, help="Workspace handle used for scoping logs request. All requests MUST be scoped by workspace.", ) @click.option( "--offset", "-o", default=0, type=int, help="Starting index from sorted logs to return a chunk. Used for paging. Defaults to 0.", ) @click.option( "--number", "-n", default=50, type=int, help="Number of logs to return in a single batch. Defaults to 50.", ) @click.option( "--package", "-p", type=str, help="Package handle. Used to filter logs returend to a specific package (across all instances).", ) @click.option( "--instance", "-i", type=str, help="Instance handle. Used to filter logs returned to a specific instance of a package.", ) @click.option( "--version", "-v", type=str, help="Version handle. Used to filter logs returned to a specific version of a package.", ) @click.option( "--path", "request_path", type=str, help="Path invoked by a client operation. Used to filter logs returned to a specific invocation path.", ) @click.option( "--with-fields", "-f", "field_values", type=str, help="Dictionary of log field values (format: key1=value1,key2=value2,...). Used to filter logs returned.", ) def logs( workspace: str, offset: int, number: int, package: Optional[str] = None, instance: Optional[str] = None, version: Optional[str] = None, request_path: Optional[str] = None, field_values: Optional[str] = None, ): """Retrieve logs within a workspace.""" initialize(suppress_message=True) client = None try: client = Steamship(workspace=workspace) except SteamshipError as e: raise click.UsageError(message=e.message) value_dict = {} if field_values: try: for item in field_values.split(","): key, value = item.split("=") value_dict[key] = value except ValueError: raise click.UsageError( message="Invalid dictionary format for fields. Please provide a dictionary in the " "format: key1=value1,key2=value2,..." ) click.echo( json.dumps( client.logs(offset, number, package, instance, version, request_path, value_dict) ) ) def _exit_if_not_proceed(skip: bool, prompt: str): if skip: return click.secho("⚠️ Deletion is a destructive operation. It is not recoverable. ⚠️", fg="red") click.secho(prompt, fg="red") confirm = click.confirm("Proceed?", default=False) if not confirm: exit(0) @click.command() @click.option( "--workspace", "-w", required=True, type=str, help="Workspace handle.", ) @click.option( "--instance", "-i", type=str, help="Instance handle.", ) @click.option("--yes", is_flag=True) def delete( workspace: str, instance: Optional[str] = None, yes: Optional[bool] = False, ): """Deletes Steamship workspaces and instances, based on provided fields. NOTE: If `instance` is not specified, the `workspace` will be deleted. """ initialize(suppress_message=True) client = None try: client = Steamship(workspace=workspace) except SteamshipError as e: raise click.UsageError(message=e.message) if not instance: # delete workspace _exit_if_not_proceed( skip=yes, prompt=f"This will delete workspace '{workspace}' and all data contained within.", ) wspace = client.get_workspace() click.secho(f"Deleting workspace '{workspace}'... ", nl=False, fg="green") wspace.delete() click.secho("Done.", fg="green") return if instance: # delete instance _exit_if_not_proceed( skip=yes, prompt=f"This will delete instance '{instance}' in workspace '{workspace}'" ) try: pkg_inst = PackageInstance.get(client=client, handle=instance) click.secho( f"Deleting instance '{instance}' in workspace '{workspace}'... ", nl=False, fg="green", ) pkg_inst.delete() click.secho("Done.", fg="green") return except SteamshipError as e: click.secho( f"⚠️ Failed to delete instance '{instance}' in workspace '{workspace}': {e.message}", fg="red", ) exit(1) @click.command() def support_info(): """Displays detailed information needed for getting technical support""" initialize() click.echo("\nSteamship User Info\n=====================") if Configuration.default_config_file_has_api_key() or getenv("STEAMSHIP_API_KEY", None): # User is logged in! client = None try: client = Steamship() user = User.current(client) click.echo(f"User ID: {user.id}") except BaseException: click.secho("User not logged in.") else: click.secho("User not logged in.") click.echo("\n\nDeployable Info\n=====================") if path.exists("steamship.json"): manifest = Manifest.load_manifest() click.echo(f"Deployable type: {manifest.type.value}") if manifest.type.value == DeployableType.PLUGIN: click.echo(f"Plugin type: {manifest.plugin.type}") click.echo(f"Deployable handle: {manifest.handle}") click.echo(f"Deployable version: {manifest.version}") else: click.echo("No deployable manifest.") click.echo("\n\nDependency Info\n=====================") click.echo(f"Running Python CLI version: {steamship.__version__}") click.echo("\nEnv packages:") click.echo("\n-------------") subprocess.run(["pip", "list"]) # noqa: S607, S603 click.echo("\nRequirements.txt:") click.echo("\n-----------------") if path.exists("requirements.txt"): with open("requirements.txt") as requirements: lines = requirements.readlines() if len(lines) == 0: click.secho("Empty requirements.txt") else: for line in lines: click.secho(line, nl=False) click.secho("") else: click.echo("FILE NOT PRESENT") click.echo("\n\nEnvironment/OS Info\n=====================") click.secho(f"OS type: {os.name}") click.secho(f"OS Name: {platform.system()}") click.secho(f"OS Version: {platform.release()}") click.secho(f"Python version: {sys.version}") click.secho(f"Shell: {os.environ.get('SHELL')}") click.secho(f"In virtual env: {sys.prefix != sys.base_prefix}") @click.command() @click.option( "--all", is_flag=True, default=False, help="Delete all local development instances without prompting", ) @click.argument("environment", required=True, type=click.Choice(["local"], case_sensitive=False)) def clean(environment: str, all: Optional[bool] = False): client, _, manifest = initialize_and_get_client_and_prep_project() if manifest.type != DeployableType.PACKAGE: click.secho("May only delete development instances of Packages.", fg="red") return click.secho( f"Deleting LOCAL DEVELOPMENT instances of package [{manifest.handle}] across workspaces." ) for instance in PackageInstance.list( client, include_workspace=True, across_workspaces=True ).package_instances: # Only consider deletion for LOCAL DEVELOPMENT instances of THIS PACKAGE. if ( instance.package_version_handle == LOCAL_DEVELOPMENT_VERSION_HANDLE and instance.package_handle == manifest.handle ): workspace = Workspace.get(client, instance.workspace_id) result = all or click.confirm( f"Delete instance [{instance.handle}] in workspace [{workspace.handle}] created {instance.created_at} ?" ) if result: try: instance.delete() click.secho( f"Deleted instance [{instance.handle}] in workspace [{workspace.handle}]", fg="red", ) except Exception as e: click.secho( f"Could NOT delete instance [{instance.handle}] in workspace [{workspace.handle}]. Error: {str(e)}", fg="red", ) cli.add_command(login) cli.add_command(deploy) cli.add_command(info) cli.add_command(deploy, name="it") cli.add_command(ships) cli.add_command(logs) cli.add_command(run) cli.add_command(create_instance, name="use") cli.add_command(delete) cli.add_command(support_info, name="support-info") cli.add_command(clean)