from typing import Optional
from steamship import File, Steamship, Task
from steamship.invocable import PackageService, post
from steamship.invocable.mixins.blockifier_mixin import BlockifierMixin
from steamship.invocable.mixins.file_importer_mixin import FileImporterMixin
from steamship.invocable.mixins.indexer_mixin import IndexerMixin
from steamship.invocable.package_mixin import PackageMixin
from steamship.utils.file_tags import update_file_status
[docs]
class IndexerPipelineMixin(PackageMixin):
    """Endpoints and asynchronous workflow for Document Question Answering.

    This mixin is an async orchestrator of three other mixins, each of which
    it instantiates and registers on the host invocable:

    - FileImporterMixin: imports files, e.g. YouTube videos or PDF URLs.
    - BlockifierMixin: converts files to Blocks (speech-to-text, PDF parsing, etc.).
    - IndexerMixin: embeds Steamship Files into an index.
    """

    client: Steamship
    invocable: PackageService
    blockifier_mixin: BlockifierMixin
    importer_mixin: FileImporterMixin
    indexer_mixin: IndexerMixin

    def __init__(self, client: Steamship, invocable: PackageService):
        """Create the three sub-mixins and register each one on ``invocable``.

        Args:
            client: Steamship client handed to every sub-mixin.
            invocable: The package service that hosts this mixin; each
                sub-mixin is added to it right after being constructed.
        """
        self.client = client
        self.invocable = invocable

        # Each sub-mixin is built and registered before the next one is
        # created: importer, then blockifier, then indexer.
        importer = FileImporterMixin(client)
        self.importer_mixin = importer
        invocable.add_mixin(importer)

        blockifier = BlockifierMixin(client)
        self.blockifier_mixin = blockifier
        invocable.add_mixin(blockifier)

        indexer = IndexerMixin(client)
        self.indexer_mixin = indexer
        invocable.add_mixin(indexer)

    @post("/set_file_status")
    def set_file_status(self, file_id: str, status: str) -> bool:
        """Set the status of a file; intended to be scheduled after import.

        Args:
            file_id: ID of the File to look up via ``File.get``.
            status: Status value passed to ``update_file_status``.

        Returns:
            Always ``True`` once the update call has returned.
        """
        target_file = File.get(self.client, _id=file_id)
        update_file_status(self.client, target_file, status)
        return True

    @post("/index_url")
    def index_url(
        self,
        url: str,
        metadata: Optional[dict] = None,
        index_handle: Optional[str] = None,
        mime_type: Optional[str] = None,
    ) -> Task:
        """Load a URL into an embedding index.

        URL types supported:

        - PDF (Text)
        - TXT and Markdown
        - YouTube (though failure rate is high)

        Args:
            url: The URL to import. It is also stored under the ``"url"``
                metadata key unless ``metadata`` supplies its own value.
            metadata: Optional extra metadata, returned on embedding results
                for source attribution. Its entries override the default
                ``{"url": url}`` entry on key collision.
            index_handle: Optional index handle (uses your default index if
                blank).
            mime_type: Optional MIME type forwarded to the blockifier.
                NOTE(review): presumably only needed when it cannot be guessed
                from the Content-Type header or the URL -- confirm.

        Returns:
            The Task of the scheduled ``index_file`` invocation. The trailing
            ``set_file_status`` invocation is scheduled but not returned.
        """
        # Step 1: Import the URL.
        file, import_task = self.importer_mixin.import_url_to_file_and_task(url)

        # Step 2: Blockify the File, chained after the import task when the
        # importer produced one with a usable task id.
        import_task_id = (import_task.task_id or None) if import_task else None
        blockify_task = self.blockifier_mixin.blockify(
            file_id=file.id,
            mime_type=mime_type,
            after_task_id=import_task_id,
        )

        # Step 3: Index the File once blockification has finished.
        index_metadata = {"url": url}
        index_metadata.update(metadata if metadata is not None else {})
        index_arguments = {
            "file_id": file.id,
            "index_handle": index_handle,
            "metadata": index_metadata,
        }
        index_task = self.invocable.invoke_later(
            method="index_file",
            wait_on_tasks=[blockify_task],
            arguments=index_arguments,
        )

        # Step 4: Mark the File as 'Indexed' once indexing has finished.
        status_arguments = {"file_id": file.id, "status": "Indexed"}
        self.invocable.invoke_later(
            method="set_file_status",
            wait_on_tasks=[index_task],
            arguments=status_arguments,
        )

        # Return the index task rather than the status task just to save the
        # caller a few seconds of waiting.
        return index_task