| import json |
| import time |
| import uuid |
| from typing import Optional |
|
|
| from open_webui.apps.webui.internal.db import Base, get_db |
| from pydantic import BaseModel, ConfigDict |
| from sqlalchemy import BigInteger, Boolean, Column, String, Text, JSON |
| from sqlalchemy import or_, func, select |
|
|
|
|
| |
| |
| |
|
|
|
|
class Chat(Base):
    """ORM mapping for rows of the ``chat`` table."""

    __tablename__ = "chat"

    # Primary key and owning user, both stored as strings.
    id = Column(String, primary_key=True)
    user_id = Column(String)

    # Display title and the chat payload (stored in a JSON column).
    title = Column(Text)
    chat = Column(JSON)

    # Integer timestamps; ChatTable in this file writes int(time.time()).
    created_at = Column(BigInteger)
    updated_at = Column(BigInteger)

    # Optional, unique reference used for sharing; NULL when not shared.
    share_id = Column(Text, nullable=True, unique=True)
    # Archive flag; new rows default to not archived.
    archived = Column(Boolean, default=False)
|
|
|
|
class ChatModel(BaseModel):
    """Pydantic representation of a ``Chat`` row.

    ``from_attributes=True`` lets ``model_validate`` read values straight
    from ORM objects instead of requiring a dict.
    """

    model_config = ConfigDict(from_attributes=True)

    # Identity, ownership and content.
    id: str
    user_id: str
    title: str
    chat: dict

    # Integer timestamps.
    created_at: int
    updated_at: int

    # Sharing reference (absent when not shared) and archive flag.
    share_id: Optional[str] = None
    archived: bool = False
|
|
|
|
| |
| |
| |
|
|
|
|
class ChatForm(BaseModel):
    """Model carrying a chat payload as a plain ``dict``."""

    chat: dict
|
|
|
|
class ChatTitleForm(BaseModel):
    """Model carrying a single ``title`` string."""

    title: str
|
|
|
|
class ChatResponse(BaseModel):
    """Full chat record as a Pydantic model.

    Carries the same fields as ``ChatModel``; here ``archived`` has no
    default and must always be supplied.
    """

    id: str
    user_id: str
    title: str
    chat: dict
    updated_at: int
    created_at: int
    share_id: Optional[str] = None
    archived: bool
|
|
|
|
class ChatTitleIdResponse(BaseModel):
    """Lightweight chat summary: id, title and timestamps, without the payload."""

    id: str
    title: str
    updated_at: int
    created_at: int
|
|
|
|
class ChatTable:
    """Data-access helper for rows of the ``chat`` table.

    Every method opens its own session through ``get_db()``. Most mutating
    methods and single-row lookups follow the same convention: any exception
    is swallowed and reported as ``None`` / ``False`` rather than raised.

    A shared snapshot is stored as an ordinary ``Chat`` row whose ``user_id``
    is ``"shared-<source chat id>"``; the source chat references the snapshot
    through its ``share_id`` column.
    """

    @staticmethod
    def _chat_matches_search(title, chat, search_text: str) -> bool:
        """Return True if ``search_text`` occurs in the title or any message.

        ``search_text`` must already be lower-cased. Titles, payloads,
        messages and message contents that are missing or not of the expected
        type (``None`` title, non-dict payload, non-string content such as a
        list) are skipped instead of raising.
        """
        if isinstance(title, str) and search_text in title.lower():
            return True

        messages = chat.get("messages") if isinstance(chat, dict) else None
        if not isinstance(messages, (list, tuple)):
            return False

        for message in messages:
            if not isinstance(message, dict):
                continue
            content = message.get("content")
            if isinstance(content, str) and search_text in content.lower():
                return True
        return False

    def insert_new_chat(self, user_id: str, form_data: ChatForm) -> Optional[ChatModel]:
        """Create and persist a new chat owned by ``user_id``.

        The title is taken from ``form_data.chat["title"]`` when present,
        otherwise ``"New Chat"``. Returns the stored chat.
        """
        # Single timestamp so created_at and updated_at are guaranteed equal.
        now = int(time.time())
        chat = ChatModel(
            id=str(uuid.uuid4()),
            user_id=user_id,
            title=form_data.chat.get("title", "New Chat"),
            chat=form_data.chat,
            created_at=now,
            updated_at=now,
        )

        with get_db() as db:
            result = Chat(**chat.model_dump())
            db.add(result)
            db.commit()
            db.refresh(result)
            return ChatModel.model_validate(result)

    def update_chat_by_id(self, id: str, chat: dict) -> Optional[ChatModel]:
        """Replace the payload of chat ``id`` and refresh its title/timestamp.

        Returns the updated chat, or ``None`` if the chat does not exist or
        any error occurs.
        """
        try:
            with get_db() as db:
                chat_item = db.get(Chat, id)
                if chat_item is None:
                    return None

                chat_item.chat = chat
                chat_item.title = chat.get("title", "New Chat")
                chat_item.updated_at = int(time.time())
                db.commit()
                db.refresh(chat_item)
                return ChatModel.model_validate(chat_item)
        except Exception:
            return None

    def insert_shared_chat_by_chat_id(self, chat_id: str) -> Optional[ChatModel]:
        """Create a shared snapshot of chat ``chat_id``.

        If the chat already has a ``share_id``, the existing snapshot is
        returned instead. Returns ``None`` when the source chat does not
        exist. Database errors propagate to the caller.
        """
        with get_db() as db:
            chat = db.get(Chat, chat_id)
            if chat is None:
                return None

            if chat.share_id:
                # Snapshots are stored under user_id "shared-<chat_id>", so
                # the lookup must use that same value to find the row.
                return self.get_chat_by_id_and_user_id(
                    chat.share_id, f"shared-{chat_id}"
                )

            shared_chat = ChatModel(
                id=str(uuid.uuid4()),
                user_id=f"shared-{chat_id}",
                title=chat.title,
                chat=chat.chat,
                created_at=chat.created_at,
                updated_at=int(time.time()),
            )
            db.add(Chat(**shared_chat.model_dump()))
            # Link source -> snapshot in the same transaction so a failure
            # cannot leave a snapshot row without a referencing share_id.
            chat.share_id = shared_chat.id
            db.commit()
            return shared_chat

    def update_shared_chat_by_chat_id(self, chat_id: str) -> Optional[ChatModel]:
        """Copy the current title and payload of ``chat_id`` into its snapshot.

        Returns the refreshed snapshot, or ``None`` if the chat does not
        exist, is not shared, its snapshot row is missing, or an error occurs.
        """
        try:
            with get_db() as db:
                chat = db.get(Chat, chat_id)
                if chat is None or not chat.share_id:
                    return None

                # share_id holds the primary key of the snapshot row.
                shared_chat = db.get(Chat, chat.share_id)
                if shared_chat is None:
                    return None

                shared_chat.title = chat.title
                shared_chat.chat = chat.chat
                shared_chat.updated_at = int(time.time())
                db.commit()
                db.refresh(shared_chat)
                return ChatModel.model_validate(shared_chat)
        except Exception:
            return None

    def delete_shared_chat_by_chat_id(self, chat_id: str) -> bool:
        """Delete the snapshot row(s) stored for source chat ``chat_id``.

        Returns ``True`` unless an error occurs (also when nothing matched).
        """
        try:
            with get_db() as db:
                db.query(Chat).filter_by(user_id=f"shared-{chat_id}").delete()
                db.commit()
            return True
        except Exception:
            return False

    def update_chat_share_id_by_id(
        self, id: str, share_id: Optional[str]
    ) -> Optional[ChatModel]:
        """Set (or clear, with ``None``) the ``share_id`` of chat ``id``.

        Returns the updated chat, or ``None`` if it does not exist or an
        error occurs.
        """
        try:
            with get_db() as db:
                chat = db.get(Chat, id)
                if chat is None:
                    return None

                chat.share_id = share_id
                db.commit()
                db.refresh(chat)
                return ChatModel.model_validate(chat)
        except Exception:
            return None

    def toggle_chat_archive_by_id(self, id: str) -> Optional[ChatModel]:
        """Flip the ``archived`` flag of chat ``id``.

        Returns the updated chat, or ``None`` if it does not exist or an
        error occurs.
        """
        try:
            with get_db() as db:
                chat = db.get(Chat, id)
                if chat is None:
                    return None

                chat.archived = not chat.archived
                db.commit()
                db.refresh(chat)
                return ChatModel.model_validate(chat)
        except Exception:
            return None

    def archive_all_chats_by_user_id(self, user_id: str) -> bool:
        """Mark every chat of ``user_id`` as archived; ``False`` on error."""
        try:
            with get_db() as db:
                db.query(Chat).filter_by(user_id=user_id).update({"archived": True})
                db.commit()
            return True
        except Exception:
            return False

    def get_archived_chat_list_by_user_id(
        self, user_id: str, skip: int = 0, limit: int = 50
    ) -> list[ChatModel]:
        """Return all archived chats of ``user_id``, most recently updated first.

        NOTE: ``skip`` and ``limit`` are accepted but not applied; the full
        list is returned.
        """
        with get_db() as db:
            all_chats = (
                db.query(Chat)
                .filter_by(user_id=user_id, archived=True)
                .order_by(Chat.updated_at.desc())
                .all()
            )
            return [ChatModel.model_validate(chat) for chat in all_chats]

    def get_chat_list_by_user_id(
        self,
        user_id: str,
        include_archived: bool = False,
        skip: int = 0,
        limit: int = 50,
    ) -> list[ChatModel]:
        """Return the chats of ``user_id``, most recently updated first.

        Archived chats are excluded unless ``include_archived`` is true.

        NOTE: ``skip`` and ``limit`` are accepted but not applied; the full
        list is returned.
        """
        with get_db() as db:
            query = db.query(Chat).filter_by(user_id=user_id)
            if not include_archived:
                query = query.filter_by(archived=False)

            all_chats = query.order_by(Chat.updated_at.desc()).all()
            return [ChatModel.model_validate(chat) for chat in all_chats]

    def get_chat_title_id_list_by_user_id(
        self,
        user_id: str,
        include_archived: bool = False,
        skip: Optional[int] = None,
        limit: Optional[int] = None,
    ) -> list[ChatTitleIdResponse]:
        """Return id/title/timestamp summaries of the chats of ``user_id``.

        Only the four summary columns are selected, so chat payloads are not
        loaded. Results are ordered by ``updated_at`` descending. ``skip`` and
        ``limit`` are applied only when truthy (``0``/``None`` mean "no
        offset" / "no limit").
        """
        with get_db() as db:
            query = db.query(Chat).filter_by(user_id=user_id)
            if not include_archived:
                query = query.filter_by(archived=False)

            query = query.order_by(Chat.updated_at.desc()).with_entities(
                Chat.id, Chat.title, Chat.updated_at, Chat.created_at
            )

            if skip:
                query = query.offset(skip)
            if limit:
                query = query.limit(limit)

            # Each row is a tuple in the order given to with_entities above.
            return [
                ChatTitleIdResponse.model_validate(
                    {
                        "id": chat_id,
                        "title": title,
                        "updated_at": updated_at,
                        "created_at": created_at,
                    }
                )
                for chat_id, title, updated_at, created_at in query.all()
            ]

    def get_chat_list_by_chat_ids(
        self, chat_ids: list[str], skip: int = 0, limit: int = 50
    ) -> list[ChatModel]:
        """Return the non-archived chats whose id is in ``chat_ids``.

        Results are ordered by ``updated_at`` descending.

        NOTE: ``skip`` and ``limit`` are accepted but not applied.
        """
        with get_db() as db:
            all_chats = (
                db.query(Chat)
                .filter(Chat.id.in_(chat_ids))
                .filter_by(archived=False)
                .order_by(Chat.updated_at.desc())
                .all()
            )
            return [ChatModel.model_validate(chat) for chat in all_chats]

    def get_chat_by_id(self, id: str) -> Optional[ChatModel]:
        """Return chat ``id``, or ``None`` if it is missing or an error occurs."""
        try:
            with get_db() as db:
                chat = db.get(Chat, id)
                if chat is None:
                    return None
                return ChatModel.model_validate(chat)
        except Exception:
            return None

    def get_chat_by_share_id(self, id: str) -> Optional[ChatModel]:
        """Return the chat row whose primary key is the share id ``id``.

        The row is only returned while some chat still references ``id``
        through its ``share_id`` column; otherwise (or on error) ``None``.
        """
        try:
            with get_db() as db:
                source = db.query(Chat).filter_by(share_id=id).first()
                if source is None:
                    return None
                return self.get_chat_by_id(id)
        except Exception:
            return None

    def get_chat_by_id_and_user_id(self, id: str, user_id: str) -> Optional[ChatModel]:
        """Return chat ``id`` only if it belongs to ``user_id``.

        Returns ``None`` when no such row exists or an error occurs.
        """
        try:
            with get_db() as db:
                chat = db.query(Chat).filter_by(id=id, user_id=user_id).first()
                if chat is None:
                    return None
                return ChatModel.model_validate(chat)
        except Exception:
            return None

    def get_chats(self, skip: int = 0, limit: int = 50) -> list[ChatModel]:
        """Return every chat row, most recently updated first.

        NOTE: ``skip`` and ``limit`` are accepted but not applied.
        """
        with get_db() as db:
            all_chats = db.query(Chat).order_by(Chat.updated_at.desc()).all()
            return [ChatModel.model_validate(chat) for chat in all_chats]

    def get_chats_by_user_id(self, user_id: str) -> list[ChatModel]:
        """Return all chats of ``user_id`` (archived or not), newest update first."""
        with get_db() as db:
            all_chats = (
                db.query(Chat)
                .filter_by(user_id=user_id)
                .order_by(Chat.updated_at.desc())
                .all()
            )
            return [ChatModel.model_validate(chat) for chat in all_chats]

    def get_archived_chats_by_user_id(self, user_id: str) -> list[ChatModel]:
        """Return all archived chats of ``user_id``, newest update first."""
        with get_db() as db:
            all_chats = (
                db.query(Chat)
                .filter_by(user_id=user_id, archived=True)
                .order_by(Chat.updated_at.desc())
                .all()
            )
            return [ChatModel.model_validate(chat) for chat in all_chats]

    def get_chats_by_user_id_and_search_text(
        self,
        user_id: str,
        search_text: str,
        include_archived: bool = False,
        skip: int = 0,
        limit: int = 60,
    ) -> list[ChatModel]:
        """Return the chats of ``user_id`` whose title or messages match.

        Matching is a case-insensitive substring test performed in Python on
        the chat title and on the string ``content`` of each entry in the
        payload's ``"messages"`` list. ``skip``/``limit`` paginate the
        filtered result. An empty (after stripping) ``search_text`` delegates
        to ``get_chat_list_by_user_id``.
        """
        search_text = search_text.lower().strip()
        if not search_text:
            return self.get_chat_list_by_user_id(user_id, include_archived, skip, limit)

        with get_db() as db:
            query = db.query(Chat).filter_by(user_id=user_id)
            if not include_archived:
                query = query.filter_by(archived=False)

            # Order explicitly so skip/limit slices are stable between calls
            # and agree with the ordering of the non-search listing.
            candidates = query.order_by(Chat.updated_at.desc()).all()

            matching = [
                chat
                for chat in candidates
                if self._chat_matches_search(chat.title, chat.chat, search_text)
            ]
            page = matching[skip : skip + limit]
            return [ChatModel.model_validate(chat) for chat in page]

    def delete_chat_by_id(self, id: str) -> bool:
        """Delete chat ``id`` and then its shared snapshot, if any.

        Returns the result of the snapshot deletion, or ``False`` on error.
        """
        try:
            with get_db() as db:
                db.query(Chat).filter_by(id=id).delete()
                db.commit()
            return self.delete_shared_chat_by_chat_id(id)
        except Exception:
            return False

    def delete_chat_by_id_and_user_id(self, id: str, user_id: str) -> bool:
        """Delete chat ``id`` if it belongs to ``user_id``, plus its snapshot.

        The shared snapshot is removed only when the owner-scoped delete
        actually removed the chat, so passing an id owned by someone else
        cannot delete that chat's snapshot. Returns ``True`` when nothing
        matched (the call stays idempotent), ``False`` on error.
        """
        try:
            with get_db() as db:
                deleted = db.query(Chat).filter_by(id=id, user_id=user_id).delete()
                db.commit()

            if not deleted:
                return True
            return self.delete_shared_chat_by_chat_id(id)
        except Exception:
            return False

    def delete_chats_by_user_id(self, user_id: str) -> bool:
        """Delete every chat of ``user_id`` together with their snapshots.

        Snapshot removal is best-effort: its result is not checked. Returns
        ``False`` only if deleting the user's own chats fails.
        """
        try:
            # Must run first: it needs the user's chat ids to find snapshots.
            self.delete_shared_chats_by_user_id(user_id)

            with get_db() as db:
                db.query(Chat).filter_by(user_id=user_id).delete()
                db.commit()
            return True
        except Exception:
            return False

    def delete_shared_chats_by_user_id(self, user_id: str) -> bool:
        """Delete the shared snapshots of all chats owned by ``user_id``.

        Returns ``True`` unless an error occurs.
        """
        try:
            with get_db() as db:
                # Select only the id column; the chat payloads are not needed.
                id_rows = db.query(Chat.id).filter(Chat.user_id == user_id).all()
                shared_user_ids = [f"shared-{chat_id}" for (chat_id,) in id_rows]

                if shared_user_ids:
                    db.query(Chat).filter(Chat.user_id.in_(shared_user_ids)).delete()
                    db.commit()
            return True
        except Exception:
            return False
|
|
|
|
# Module-level ChatTable instance, created once at import time.
Chats = ChatTable()
|
|