Source code for steamship.agents.tools.base_tools
import json
from abc import abstractmethod
from typing import Any, List, Optional, Union
import requests
from steamship import Block, File, MimeTypes, Task
from steamship.agents.schema import AgentContext, Tool
[docs]
class GeneratorTool(Tool):
"""
A base class for tools that wrap Steamship Generator plugins. Subclass this and implement
the `accept_output_block` method.
"""
generator_plugin_handle: str
generator_plugin_instance_handle: Optional[str] = None
generator_plugin_config: dict = {}
merge_blocks: bool = False
make_output_public: bool = False
[docs]
@abstractmethod
def accept_output_block(self, block: Block) -> bool:
raise NotImplementedError()
[docs]
def run(self, tool_input: List[Block], context: AgentContext) -> Union[List[Block], Task[Any]]:
generator = context.client.use_plugin(
plugin_handle=self.generator_plugin_handle,
instance_handle=self.generator_plugin_instance_handle,
config=self.generator_plugin_config,
)
tasks = []
if self.merge_blocks:
block = tool_input[0]
for extra_block in tool_input[1:]:
block.text = f"{block.text}\n\n{extra_block.text}"
tool_input = [block]
for block in tool_input:
if isinstance(block, str):
print(block)
if not block.is_text():
continue
prompt = block.text
task = generator.generate(
text=prompt, append_output_to_file=True, make_output_public=self.make_output_public
)
tasks.append(task)
# TODO / REMOVE Synchronous execution is a temporary simplification while we merge code.
output_blocks = []
# NB: In an async framework, we need to contend with the fact that we have some Monad-style trickery
# to contend with. Namely that the below code has taken us from List[Block] as the universal type to
# List[List[Block]] as the async result that we will have to process.
#
# It's unclear to me (ted) if this is something we leave as an exercise for implementors or build in
# as universally handled.
for task in tasks:
task.wait()
task_blocks = self.post_process(task, context)
for block in task_blocks:
output_blocks.append(block)
# END TODO / REMOVE
return output_blocks
[docs]
def post_process(self, task: Task, context: AgentContext) -> List[Block]:
"""In this case, the Generator returns a GeneratorResponse that has a .blocks method on it"""
try:
return task.output.blocks
except BaseException:
return json.loads(task.output).get("blocks")
[docs]
class ImageGeneratorTool(GeneratorTool):
"""
A base class for tools that wrap Steamship Image Generator plugins.
"""
# So that generated image blocks can be accessed from chat clients
make_output_public: bool = True
[docs]
class VideoGeneratorTool(GeneratorTool):
"""
A base class for tools that wrap Steamship Video Generator plugins.
"""
# So that generated video blocks can be accessed from chat clients
make_output_public: bool = True
[docs]
class AudioGeneratorTool(GeneratorTool):
"""
A base class for tools that wrap Steamship Audio Generator plugins.
"""
# So that generated audio blocks can be accessed from chat clients
make_output_public: bool = True
[docs]
class ScrapeAndBlockifyTool(Tool):
"""
A base class for tools that wrap Steamship Blockifier plugin which transforms bytes to a set of blocks.
"""
blockifier_plugin_handle: str
blockifier_plugin_instance_handle: Optional[str] = None
blockifier_plugin_config: dict = {}
def _scrape(self, url: str, context: AgentContext) -> File:
response = requests.get(url)
file = File.create(context.client, content=response.content, mime_type=self.get_mime_type())
return file
[docs]
def run(self, tool_input: List[Block], context: AgentContext) -> Union[List[Block], Task[Any]]:
tasks = []
blockifier = context.client.use_plugin(
plugin_handle=self.blockifier_plugin_handle,
instance_handle=self.blockifier_plugin_instance_handle,
config=self.blockifier_plugin_config,
)
for input_block in tool_input:
if not input_block.is_text():
continue
url = input_block.text
file = self._scrape(url, context)
task = file.blockify(blockifier.handle)
tasks.append(task)
# TODO / REMOVE Synchronous execution is a temporary simplification while we merge code.
output_blocks = []
# NB: In an async framework, we need to contend with the fact that we have some Monad-style trickery
# to contend with. Namely that the below code has taken us from List[Block] as the universal type to
# List[List[Block]] as the async result that we will have to process.
#
# It's unclear to me (ted) if this is something we leave as an exercise for implementors or build in
# as universally handled.
for task in tasks:
task.wait()
task_blocks = self.post_process(task, context)
for block in task_blocks:
output_blocks.append(block)
# END TODO / REMOVE
return output_blocks
[docs]
def post_process(self, task: Task, context: AgentContext) -> List[Block]:
"""In this case, the Blockifier returns a BlockAndTagResponse that has a .file.blocks method on it"""
try:
return task.output.file.blocks
except BaseException:
return json.loads(task.output).get("file", {}).get("blocks")
[docs]
class ImageBlockifierTool(ScrapeAndBlockifyTool):
"""
A base class for tools that wrap Steamship Image Blockifier plugins.
"""