Source code for steamship.agents.schema.cache

import hashlib
import json
import logging
from typing import Dict, List, Optional

from steamship import Block, MimeTypes, Steamship
from steamship.agents.schema.action import Action, FinishAction
from steamship.utils.kv_store import KeyValueStore


def _blocks_to_cache_dict(value: List[Block]) -> Dict[str, any]:
    """Attempts to convert a list of blocks to key-store-safe dictionary.

    Convention: {'blocks':[{'id': <block_id>}, {'text': <some_text>}]}
    """

    if not value:
        return {}
    if not isinstance(value, list):
        return {}
    blocks = []
    for block in value:
        if block.id:
            blocks.append({"id": block.id})
        else:
            # TODO(dougreid): safe assumption about temporary blocks?
            blocks.append({"text": block.text})
    return {"blocks": blocks}


def _blocks_from_cache_dict(client: Steamship, value: Dict[str, any]) -> List[Block]:
    """Attempts to convert a key-store-safe dictionary to a list of blocks.

    Convention: {'blocks':[{'id': <block_id>}, {'text': <some_text>}]}
    """
    if block_list := value.get("blocks"):
        return_blocks = []
        for b in block_list:
            if block_id := b.get("id"):
                return_blocks.append(Block.get(client, _id=block_id))
            else:
                return_blocks.append(Block(text=b.get("text"), mime_type=MimeTypes.TXT))
        return return_blocks

    return []


[docs] class ActionCache: """Provide persistent cache layer for AgentContext that allows lookups of output blocks from Actions. Use this cache to eliminate calls to Tools. NOTE: EXPERIMENTAL. """ client: Steamship key_value_store: KeyValueStore def __init__(self, client: Steamship, key_value_store: KeyValueStore): self.client = client self.key_value_store = key_value_store
[docs] @staticmethod def get_or_create(client: Steamship, context_keys: Dict[str, str]): cache_handle = ( f"actioncache-{hashlib.sha256(json.dumps(context_keys).encode('utf-8')).hexdigest()}" ) return ActionCache( client=client, key_value_store=KeyValueStore(client=client, store_identifier=cache_handle), )
@staticmethod def _cache_key_for(action: Action) -> str: if isinstance(action, Action): key = f"{action.tool}" for block in action.input: if block.is_text(): key = f"{key}-{block.text}" else: key = f"{key}-{block.id}" else: key = f"unknown-{json.dumps(action)}" return f"sk-{hashlib.sha256(key.encode('utf-8')).hexdigest()}"
[docs] def lookup(self, key: Action) -> Optional[List[Block]]: cache_key = ActionCache._cache_key_for(key) value = self.key_value_store.get(key=cache_key) or None if value: logging.debug(f"cache hit for {cache_key}") return _blocks_from_cache_dict(self.client, value) logging.debug(f"cache miss for {cache_key}") return None
[docs] def clear(self) -> None: self.key_value_store.reset()
[docs] def update(self, key: Action, value: List[Block]): # TODO: should this be synchronous and wait? self.key_value_store.set( key=ActionCache._cache_key_for(key), value=_blocks_to_cache_dict(value) ) return
[docs] def delete(self, key: Action) -> bool: return self.key_value_store.delete(key=ActionCache._cache_key_for(key))
[docs] class LLMCache: """Provide persistent cache layer for AgentContext that allows lookups of Actions from LLM prompts. Use this cache to eliminate calls to LLMs for Tool selection and direct responses. NOTE: EXPERIMENTAL. """ client: Steamship key_value_store: KeyValueStore def __init__(self, client: Steamship, key_value_store: KeyValueStore): self.client = client self.key_value_store = key_value_store
[docs] @staticmethod def get_or_create(client: Steamship, context_keys: Dict[str, str]): cache_handle = ( f"llmcache-{hashlib.sha256(json.dumps(context_keys).encode('utf-8')).hexdigest()}" ) return LLMCache( client=client, key_value_store=KeyValueStore(client=client, store_identifier=cache_handle), )
@staticmethod def _cache_key_for(inputs: List[Block]) -> str: if isinstance(inputs, list): key = "llm" for block in inputs: if block.is_text(): key = f"{key}-{block.text}" else: key = f"{key}-{block.id}" else: key = f"unknown-{json.dumps(inputs)}" return f"sk-{hashlib.sha256(key.encode('utf-8')).hexdigest()}" def _action_from_value(self, value: dict) -> Action: input_blocks = [] if in_blocks := value.get("input"): input_blocks = _blocks_from_cache_dict(self.client, in_blocks) output_blocks = [] if out_blocks := value.get("output"): output_blocks = _blocks_from_cache_dict(self.client, out_blocks) if value.get("tool") == "Agent-FinishAction": return FinishAction(input=input_blocks, output=output_blocks) return Action(tool=value.get("tool"), input=input_blocks, output=output_blocks)
[docs] def lookup(self, key: List[Block]) -> Optional[Action]: cache_key = LLMCache._cache_key_for(key) value = self.key_value_store.get(key=cache_key) or None if value: logging.debug(f"cache hit for {cache_key}") return self._action_from_value(value) logging.debug(f"cache miss for {cache_key}") return None
[docs] def clear(self) -> None: self.key_value_store.reset()
[docs] def update(self, key: List[Block], value: Action): # TODO: should this be synchronous and wait? action_dict = { "tool": value.tool, "input": _blocks_to_cache_dict(value.input), "output": _blocks_to_cache_dict(value.output), } self.key_value_store.set(key=LLMCache._cache_key_for(key), value=action_dict) return
[docs] def delete(self, key: List[Block]) -> bool: return self.key_value_store.delete(key=LLMCache._cache_key_for(key))