from __future__ import annotations
import io
import mimetypes
from enum import Enum
from pathlib import Path
from typing import TYPE_CHECKING, Any, List, Optional, Type, Union
from pydantic import BaseModel, Field
from steamship import MimeTypes, SteamshipError
from steamship.base.client import Client
from steamship.base.model import CamelModel
from steamship.base.request import GetRequest, IdentifierRequest, ListRequest, Request, SortOrder
from steamship.base.response import ListResponse, Response
from steamship.base.tasks import Task
from steamship.data.block import Block
from steamship.data.embeddings import EmbeddingIndex
from steamship.data.tags import Tag, TagKind
from steamship.data.tags.tag_constants import ProvenanceTag
from steamship.utils.binary_utils import flexi_create
if TYPE_CHECKING:
from steamship.data.operations.generator import GenerateResponse
from steamship.data.operations.tagger import TagResponse
[docs]
class FileUploadType(str, Enum):
FILE = "file" # A file uploaded as bytes or a string
FILE_IMPORTER = "fileImporter" # A fileImporter will be used to create the file
BLOCKS = "blocks" # Blocks are sent to create a file
NONE = "none" # Create an empty file
[docs]
class FileClearResponse(Response):
id: str
[docs]
class ListFileRequest(ListRequest):
pass
[docs]
class ListFileResponse(ListResponse):
files: List[File]
[docs]
class FileQueryRequest(Request):
tag_filter_query: str
[docs]
class File(CamelModel):
"""A file."""
client: Client = Field(None, exclude=True)
id: str = None
handle: str = None
mime_type: MimeTypes = None
workspace_id: str = None
blocks: List[Block] = []
tags: List[Tag] = []
filename: str = None
public_data: bool = False
[docs]
class CreateResponse(Response):
data_: Any = None
mime_type: str = None
def __init__(
self,
data: Any = None,
string: str = None,
_bytes: Union[bytes, io.BytesIO] = None,
json: io.BytesIO = None,
mime_type: str = None,
):
super().__init__()
data, mime_type, encoding = flexi_create(
data=data, string=string, json=json, _bytes=_bytes, mime_type=mime_type
)
self.data_ = data
self.mime_type = mime_type
[docs]
@classmethod
def parse_obj(cls: Type[BaseModel], obj: Any) -> Response:
obj["data"] = obj.get("data") or obj.get("data_")
if "data_" in obj:
del obj["data_"]
return super().parse_obj(obj)
[docs]
@classmethod
def parse_obj(cls: Type[BaseModel], obj: Any) -> BaseModel:
# TODO (enias): This needs to be solved at the engine side
obj = obj["file"] if "file" in obj else obj
return super().parse_obj(obj)
[docs]
def delete(self) -> File:
return self.client.post(
"file/delete",
IdentifierRequest(id=self.id),
expect=File,
)
[docs]
@staticmethod
def get(
client: Client,
_id: str = None,
handle: str = None,
) -> File:
return client.post(
"file/get",
IdentifierRequest(id=_id, handle=handle),
expect=File,
)
[docs]
@staticmethod
def create(
client: Client,
content: Union[str, bytes] = None,
mime_type: MimeTypes = None,
handle: str = None,
blocks: List[Block] = None,
tags: List[Tag] = None,
public_data: bool = False,
) -> File:
req = {
"handle": handle,
"mimeType": mime_type,
"publicData": public_data,
}
if content is None and blocks is None:
# Both none: empty file; to be imported later.
upload_type = FileUploadType.NONE
elif content is not None and blocks is not None:
# Both not none: unclear what to do; raise an exception
raise SteamshipError(
message="Please provide only `blocks` or `content` to `File.create`."
)
elif blocks is not None:
# Blocks
upload_type = FileUploadType.BLOCKS
req["blocks"] = [
block.dict(by_alias=True, exclude_unset=True, exclude_none=True)
for block in blocks or []
]
elif content is not None:
upload_type = FileUploadType.FILE
else:
raise Exception("Unable to determine upload type.")
req["type"] = upload_type
if tags:
req["tags"] = [
tag.dict(by_alias=True, exclude_unset=True, exclude_none=True) for tag in tags or []
]
file_data = (
("file-part", content, "multipart/form-data")
if upload_type == FileUploadType.FILE
else None
)
# Defaulting this here, as opposed to in the Engine, because it is processed by Vapor
return client.post(
"file/create",
payload=req,
file=file_data,
expect=File,
)
[docs]
@staticmethod
def create_with_plugin(
client: Client,
plugin_instance: str,
url: str = None,
mime_type: str = None,
) -> Task[File]:
req = {
"type": FileUploadType.FILE_IMPORTER,
"url": url,
"mimeType": mime_type,
"pluginInstance": plugin_instance,
}
return client.post("file/create", payload=req, expect=File, as_background_task=True)
[docs]
def import_with_plugin(
self,
plugin_instance: str,
url: str = None,
mime_type: str = None,
) -> Task[File]:
"""Run an import operation on an (empty) file object that has already been created."""
req = {
"type": FileUploadType.FILE_IMPORTER,
"id": self.id,
"url": url,
"mimeType": mime_type,
"pluginInstance": plugin_instance,
}
return self.client.post("file/import", payload=req, expect=File, as_background_task=True)
[docs]
def refresh(self) -> File:
refreshed = File.get(self.client, self.id)
self.__init__(**refreshed.dict())
self.client = refreshed.client
for block in self.blocks:
block.client = self.client
return self
[docs]
@staticmethod
def query(
client: Client,
tag_filter_query: str,
) -> FileQueryResponse:
req = FileQueryRequest(tag_filter_query=tag_filter_query)
res = client.post(
"file/query",
payload=req,
expect=FileQueryResponse,
)
return res
[docs]
def raw(self):
return self.client.post(
"file/raw",
payload=GetRequest(
id=self.id,
),
raw_response=True,
)
@property
def raw_data_url(self) -> Optional[str]:
"""Return a URL at which the data content of this File can be accessed. If public_data is True,
this content can be accessed without an API key.
"""
if self.client is not None:
return f"{self.client.config.api_base}file/{self.id}/raw"
else:
return None
[docs]
def blockify(self, plugin_instance: str = None, wait_on_tasks: List[Task] = None) -> Task:
from steamship.data.operations.blockifier import BlockifyRequest
from steamship.plugin.outputs.block_and_tag_plugin_output import BlockAndTagPluginOutput
req = BlockifyRequest(type="file", id=self.id, plugin_instance=plugin_instance)
return self.client.post(
"plugin/instance/blockify",
payload=req,
expect=BlockAndTagPluginOutput,
wait_on_tasks=wait_on_tasks,
)
[docs]
def tag(
self,
plugin_instance: str = None,
wait_on_tasks: List[Task] = None,
) -> Task[TagResponse]:
from steamship.data.operations.tagger import TagRequest, TagResponse
from steamship.data.plugin import PluginTargetType
req = TagRequest(type=PluginTargetType.FILE, id=self.id, plugin_instance=plugin_instance)
return self.client.post(
"plugin/instance/tag", payload=req, expect=TagResponse, wait_on_tasks=wait_on_tasks
)
[docs]
def generate(
self,
plugin_instance_handle: str,
start_block_index: int = None,
end_block_index: Optional[int] = None,
block_index_list: Optional[List[int]] = None,
append_output_to_file: bool = True,
options: Optional[dict] = None,
wait_on_tasks: List[Task] = None,
make_output_public: bool = False,
streaming: Optional[bool] = False,
) -> Task[GenerateResponse]:
"""Generate new content from this file. Assumes this file as context for input and output. May specify start and end blocks."""
from steamship.data.operations.generator import GenerateRequest, GenerateResponse
if append_output_to_file:
output_file_id = self.id
else:
output_file_id = None
req = GenerateRequest(
plugin_instance=plugin_instance_handle,
input_file_id=self.id,
input_file_start_block_index=start_block_index,
input_file_end_block_index=end_block_index,
input_file_block_index_list=block_index_list,
append_output_to_file=append_output_to_file,
output_file_id=output_file_id,
options=options,
make_output_public=make_output_public,
streaming=streaming,
)
return self.client.post(
"plugin/instance/generate", req, expect=GenerateResponse, wait_on_tasks=wait_on_tasks
)
[docs]
def index(self, plugin_instance: Any = None) -> EmbeddingIndex:
"""Index every block in the file.
TODO(ted): Enable indexing the results of a tag query.
TODO(ted): It's hard to load the EmbeddingIndexPluginInstance with just a handle because of the chain
of things that need to be created to it to function."""
# Preserve the prior behavior of embedding the full text of each block.
tags = [
Tag(text=block.text, file_id=self.id, block_id=block.id, kind="block")
for block in self.blocks or []
]
return plugin_instance.insert(tags)
[docs]
@staticmethod
def list(
client: Client,
page_size: Optional[int] = None,
page_token: Optional[str] = None,
sort_order: Optional[SortOrder] = SortOrder.DESC,
) -> ListFileResponse:
return client.post(
"file/list",
ListFileRequest(pageSize=page_size, pageToken=page_token, sortOrder=sort_order),
expect=ListFileResponse,
)
[docs]
def append_block(
self,
text: str = None,
tags: List[Tag] = None,
content: Union[str, bytes] = None,
url: Optional[str] = None,
mime_type: Optional[MimeTypes] = None,
public_data: bool = False,
) -> Block:
"""Append a new block to this File. This is a convenience wrapper around
Block.create(). You should provide only one of text, content, or url.
This is a server-side operation, saving the new Block to the file. The new block
is appended to this client-side File as well for convenience.
"""
block = Block.create(
client=self.client,
file_id=self.id,
text=text,
tags=tags,
content=content,
url=url,
mime_type=mime_type,
public_data=public_data,
)
if (
self.blocks is not None
and len(self.blocks) > 0
and block.index_in_file == self.blocks[-1].index_in_file + 1
):
self.blocks.append(block)
else:
self.refresh()
return block
[docs]
def set_public_data(self, public_data: bool):
"""Set the public_data flag on this File. If this object already exists server-side, update the flag."""
self.public_data = public_data
if self.client is not None and self.id is not None:
req = {
"id": self.id,
"publicData": self.public_data,
}
return self.client.post("file/update", payload=req, expect=File)
[docs]
@staticmethod
def from_local(
client: Client,
file_path: str,
mime_type: MimeTypes = None,
handle: str = None,
tags: List[Tag] = None,
public_data: bool = False,
) -> Any:
"""Loads a local file into a Steamship File.
NOTE: the `file_path` should be relative to where the call to `from_local` is happening.
Loaded files will automatically be tagged with a provenance tag.
Args:
client: Steamship client for the workspace
file_path: Location of the file to upload **relative** to the current directory of the client
mime_type: Optional specification of a particular mime type. If not provided, a guess will be made.
handle: Intended handle (for lookups, etc.) for Steamship File
tags: Metadata to add to the Steamship File
public_data: Whether to make the Steamship File publicly-accessible
"""
full_path = Path(file_path).resolve()
if not mime_type:
mime, _ = mimetypes.guess_type(file_path, strict=False)
if MimeTypes.has_value(mime):
mime_type = MimeTypes(mime)
_tags = [
Tag(kind=TagKind.PROVENANCE, name=ProvenanceTag.FILE, value={"file_path": file_path})
]
if tags:
_tags.extend(tags)
with full_path.open("rb") as file:
return File.create(
client=client,
content=file.read(),
mime_type=mime_type,
handle=handle,
tags=_tags,
public_data=public_data,
)
[docs]
class FileQueryResponse(Response):
files: List[File]
ListFileResponse.update_forward_refs()