Source code for steamship.invocable.mixins.file_importer_mixin

import logging
from typing import List, Optional, Tuple

import requests

from steamship import DocTag, File, MimeTypes, Steamship, SteamshipError, Tag, Task
from steamship.invocable import post
from steamship.invocable.package_mixin import PackageMixin
from steamship.utils.file_tags import update_file_status


class FileImporterMixin(PackageMixin):
    """Provide endpoints for easy file import -- both sync and async.

    URLs recognised by ``_async_importer_for_url`` are handed to an importer plugin and
    return a ``Task`` alongside the ``File``; every other URL is downloaded in-process
    and stored directly, with no ``Task``.
    """

    # Seconds passed as ``timeout`` to ``requests.get`` when scraping a URL, so that an
    # unresponsive server cannot block the request indefinitely.
    _SCRAPE_TIMEOUT_SECONDS = 60

    client: Steamship

    def __init__(self, client: Steamship):
        """Store the Steamship client used for all file and plugin operations."""
        self.client = client

    @staticmethod
    def _title_from_url(url: str) -> str:
        """Return the last path segment of ``url`` with query string and fragment removed."""
        # Strip the fragment and query BEFORE splitting on "/": either may itself contain
        # slashes (e.g. "?next=/a/b"), which would otherwise be mistaken for path segments.
        path = url.split("#", 1)[0].split("?", 1)[0]
        return path.split("/")[-1]

    def _async_importer_for_url(self, url: str) -> Optional[str]:
        """Return the async importer plugin, if necessary.

        Returns:
            ``"youtube-transcript-importer"`` when ``url`` contains ``youtube.com`` or
            ``youtu.be``; otherwise ``None``.
        """
        if "youtube.com" in url or "youtu.be" in url:
            return "youtube-transcript-importer"
        return None

    def _import_with_async_importer(
        self,
        url: str,
        importer_handle: str,
        mime_type: Optional[str] = None,
        tags: Optional[List[Tag]] = None,
    ) -> Tuple[File, Optional[Task]]:
        """Import a URL via an async FileImporter, returning a synchronous File but async import Task.

        Args:
            url: The URL to import.
            importer_handle: Handle of the importer plugin to use.
            mime_type: MIME type for the new file; defaults to ``MimeTypes.TXT`` for
                YouTube URLs when not provided.
            tags: Tags to attach to the new file.

        Returns:
            The newly created ``File`` and the ``Task`` returned by ``import_with_plugin``.

        Raises:
            SteamshipError: If ``importer_handle`` is ``None``.
        """
        # Validate before creating anything so a bad call does not leave an orphan File.
        if importer_handle is None:
            raise SteamshipError(
                message=f"Unable to async auto-guess file importer for {url} and none was provided."
            )

        if ("youtube" in url or "youtu.be" in url) and mime_type is None:
            # NOTE(review): an earlier comment here said "mark it as audio" for
            # speech-to-text, but the value has always been TXT; presumably the
            # transcript importer yields text -- confirm.
            mime_type = MimeTypes.TXT

        file = File.create(self.client, tags=tags, mime_type=mime_type)
        file_importer = self.client.use_plugin(importer_handle)
        task = file.import_with_plugin(
            plugin_instance=file_importer.handle, url=url, mime_type=mime_type
        )
        update_file_status(self.client, file, "Importing")
        return file, task

    def _scrape_and_import_url(
        self,
        url: str,
        mime_type: Optional[str] = None,
        tags: Optional[List[Tag]] = None,
    ) -> Tuple[File, Optional[Task]]:
        """Scrape and then import the URL to a File, returning a synchronous File and no import Task.

        Args:
            url: The URL to download.
            mime_type: MIME type for the new file; defaults to ``MimeTypes.PDF`` when
                not provided and ``url`` contains ``.pdf``.
            tags: Tags to attach to the new file.

        Returns:
            The newly created ``File`` and ``None``.

        Raises:
            SteamshipError: If the HTTP response status is not OK.
        """
        if mime_type is None and ".pdf" in url:
            mime_type = MimeTypes.PDF

        response = requests.get(url, timeout=self._SCRAPE_TIMEOUT_SECONDS)

        # Check the status before touching the body; an error page is not file content.
        if not response.ok:
            msg = f"Error importing url {url}. Response was {response.text}"
            logging.error(msg)
            raise SteamshipError(message=msg)

        file = File.create(self.client, content=response.content, mime_type=mime_type, tags=tags)
        return file, None

    def import_url_to_file_and_task(self, url: str) -> Tuple[File, Optional[Task]]:
        """Import the provided URL, returning the file and optional task, if async work is required.

        The file is tagged with the URL as its source and with a title taken from the
        last path segment of the URL.
        """
        tags = [
            Tag(kind=DocTag.SOURCE, name=url),
            # Hacky way to get a title: the last path segment of the URL.
            Tag(kind=DocTag.TITLE, name=self._title_from_url(url)),
        ]

        async_importer_for_url = self._async_importer_for_url(url)
        if async_importer_for_url:
            return self._import_with_async_importer(
                url, async_importer_for_url, tags=tags, mime_type=None
            )
        return self._scrape_and_import_url(url, tags=tags)

    @post("/import_url")
    def import_url(self, url: str) -> File:
        """Import the URL to a Steamship File.

        For URLs handled by an async importer the File is returned immediately and the
        import continues in a Task that is not returned here; other URLs are fetched
        before this method returns.
        """
        file, _ = self.import_url_to_file_and_task(url)
        return file

    @post("/import_text")
    def import_text(self, text: str, mime_type: Optional[str] = None) -> File:
        """Import the text to a Steamship File.

        Args:
            text: The content of the new file.
            mime_type: Optional MIME type for the new file.

        Returns:
            The newly created ``File``, tagged with source ``"local"``.
        """
        return File.create(
            self.client,
            content=text,
            mime_type=mime_type,
            tags=[Tag(kind=DocTag.SOURCE, name="local")],
        )