| import json |
| from typing import Optional, Union |
| from pydantic import BaseModel |
| from fastapi import APIRouter, Depends, HTTPException, status |
| import logging |
|
|
| from open_webui.apps.webui.models.knowledge import ( |
| Knowledges, |
| KnowledgeUpdateForm, |
| KnowledgeForm, |
| KnowledgeResponse, |
| ) |
| from open_webui.apps.webui.models.files import Files, FileModel |
| from open_webui.apps.retrieval.vector.connector import VECTOR_DB_CLIENT |
| from open_webui.apps.retrieval.main import process_file, ProcessFileForm |
|
|
|
|
| from open_webui.constants import ERROR_MESSAGES |
| from open_webui.utils.utils import get_admin_user, get_verified_user |
| from open_webui.env import SRC_LOG_LEVELS |
|
|
|
|
| log = logging.getLogger(__name__) |
| log.setLevel(SRC_LOG_LEVELS["MODELS"]) |
|
|
| router = APIRouter() |
|
|
| |
| |
| |
|
|
|
|
| @router.get( |
| "/", response_model=Optional[Union[list[KnowledgeResponse], KnowledgeResponse]] |
| ) |
| async def get_knowledge_items( |
| id: Optional[str] = None, user=Depends(get_verified_user) |
| ): |
| if id: |
| knowledge = Knowledges.get_knowledge_by_id(id=id) |
|
|
| if knowledge: |
| return knowledge |
| else: |
| raise HTTPException( |
| status_code=status.HTTP_401_UNAUTHORIZED, |
| detail=ERROR_MESSAGES.NOT_FOUND, |
| ) |
| else: |
| return [ |
| KnowledgeResponse(**knowledge.model_dump()) |
| for knowledge in Knowledges.get_knowledge_items() |
| ] |
|
|
|
|
| |
| |
| |
|
|
|
|
| @router.post("/create", response_model=Optional[KnowledgeResponse]) |
| async def create_new_knowledge(form_data: KnowledgeForm, user=Depends(get_admin_user)): |
| knowledge = Knowledges.insert_new_knowledge(user.id, form_data) |
|
|
| if knowledge: |
| return knowledge |
| else: |
| raise HTTPException( |
| status_code=status.HTTP_400_BAD_REQUEST, |
| detail=ERROR_MESSAGES.FILE_EXISTS, |
| ) |
|
|
|
|
| |
| |
| |
|
|
|
|
| class KnowledgeFilesResponse(KnowledgeResponse): |
| files: list[FileModel] |
|
|
|
|
| @router.get("/{id}", response_model=Optional[KnowledgeFilesResponse]) |
| async def get_knowledge_by_id(id: str, user=Depends(get_verified_user)): |
| knowledge = Knowledges.get_knowledge_by_id(id=id) |
|
|
| if knowledge: |
| file_ids = knowledge.data.get("file_ids", []) if knowledge.data else [] |
| files = Files.get_files_by_ids(file_ids) |
|
|
| return KnowledgeFilesResponse( |
| **knowledge.model_dump(), |
| files=files, |
| ) |
| else: |
| raise HTTPException( |
| status_code=status.HTTP_401_UNAUTHORIZED, |
| detail=ERROR_MESSAGES.NOT_FOUND, |
| ) |
|
|
|
|
| |
| |
| |
|
|
|
|
| @router.post("/{id}/update", response_model=Optional[KnowledgeFilesResponse]) |
| async def update_knowledge_by_id( |
| id: str, |
| form_data: KnowledgeUpdateForm, |
| user=Depends(get_admin_user), |
| ): |
| knowledge = Knowledges.update_knowledge_by_id(id=id, form_data=form_data) |
|
|
| if knowledge: |
| file_ids = knowledge.data.get("file_ids", []) if knowledge.data else [] |
| files = Files.get_files_by_ids(file_ids) |
|
|
| return KnowledgeFilesResponse( |
| **knowledge.model_dump(), |
| files=files, |
| ) |
| else: |
| raise HTTPException( |
| status_code=status.HTTP_400_BAD_REQUEST, |
| detail=ERROR_MESSAGES.ID_TAKEN, |
| ) |
|
|
|
|
| |
| |
| |
|
|
|
|
| class KnowledgeFileIdForm(BaseModel): |
| file_id: str |
|
|
|
|
| @router.post("/{id}/file/add", response_model=Optional[KnowledgeFilesResponse]) |
| def add_file_to_knowledge_by_id( |
| id: str, |
| form_data: KnowledgeFileIdForm, |
| user=Depends(get_admin_user), |
| ): |
| knowledge = Knowledges.get_knowledge_by_id(id=id) |
| file = Files.get_file_by_id(form_data.file_id) |
| if not file: |
| raise HTTPException( |
| status_code=status.HTTP_400_BAD_REQUEST, |
| detail=ERROR_MESSAGES.NOT_FOUND, |
| ) |
| if not file.data: |
| raise HTTPException( |
| status_code=status.HTTP_400_BAD_REQUEST, |
| detail=ERROR_MESSAGES.FILE_NOT_PROCESSED, |
| ) |
|
|
| |
| try: |
| process_file(ProcessFileForm(file_id=form_data.file_id, collection_name=id)) |
| except Exception as e: |
| log.debug(e) |
| raise HTTPException( |
| status_code=status.HTTP_400_BAD_REQUEST, |
| detail=str(e), |
| ) |
|
|
| if knowledge: |
| data = knowledge.data or {} |
| file_ids = data.get("file_ids", []) |
|
|
| if form_data.file_id not in file_ids: |
| file_ids.append(form_data.file_id) |
| data["file_ids"] = file_ids |
|
|
| knowledge = Knowledges.update_knowledge_by_id( |
| id=id, form_data=KnowledgeUpdateForm(data=data) |
| ) |
|
|
| if knowledge: |
| files = Files.get_files_by_ids(file_ids) |
|
|
| return KnowledgeFilesResponse( |
| **knowledge.model_dump(), |
| files=files, |
| ) |
| else: |
| raise HTTPException( |
| status_code=status.HTTP_400_BAD_REQUEST, |
| detail=ERROR_MESSAGES.DEFAULT("knowledge"), |
| ) |
| else: |
| raise HTTPException( |
| status_code=status.HTTP_400_BAD_REQUEST, |
| detail=ERROR_MESSAGES.DEFAULT("file_id"), |
| ) |
| else: |
| raise HTTPException( |
| status_code=status.HTTP_400_BAD_REQUEST, |
| detail=ERROR_MESSAGES.NOT_FOUND, |
| ) |
|
|
|
|
| @router.post("/{id}/file/update", response_model=Optional[KnowledgeFilesResponse]) |
| def update_file_from_knowledge_by_id( |
| id: str, |
| form_data: KnowledgeFileIdForm, |
| user=Depends(get_admin_user), |
| ): |
| knowledge = Knowledges.get_knowledge_by_id(id=id) |
| file = Files.get_file_by_id(form_data.file_id) |
| if not file: |
| raise HTTPException( |
| status_code=status.HTTP_400_BAD_REQUEST, |
| detail=ERROR_MESSAGES.NOT_FOUND, |
| ) |
|
|
| |
| VECTOR_DB_CLIENT.delete( |
| collection_name=knowledge.id, filter={"file_id": form_data.file_id} |
| ) |
|
|
| |
| try: |
| process_file(ProcessFileForm(file_id=form_data.file_id, collection_name=id)) |
| except Exception as e: |
| raise HTTPException( |
| status_code=status.HTTP_400_BAD_REQUEST, |
| detail=str(e), |
| ) |
|
|
| if knowledge: |
| data = knowledge.data or {} |
| file_ids = data.get("file_ids", []) |
|
|
| files = Files.get_files_by_ids(file_ids) |
|
|
| return KnowledgeFilesResponse( |
| **knowledge.model_dump(), |
| files=files, |
| ) |
| else: |
| raise HTTPException( |
| status_code=status.HTTP_400_BAD_REQUEST, |
| detail=ERROR_MESSAGES.NOT_FOUND, |
| ) |
|
|
|
|
| |
| |
| |
|
|
|
|
| @router.post("/{id}/file/remove", response_model=Optional[KnowledgeFilesResponse]) |
| def remove_file_from_knowledge_by_id( |
| id: str, |
| form_data: KnowledgeFileIdForm, |
| user=Depends(get_admin_user), |
| ): |
| knowledge = Knowledges.get_knowledge_by_id(id=id) |
| file = Files.get_file_by_id(form_data.file_id) |
| if not file: |
| raise HTTPException( |
| status_code=status.HTTP_400_BAD_REQUEST, |
| detail=ERROR_MESSAGES.NOT_FOUND, |
| ) |
|
|
| |
| VECTOR_DB_CLIENT.delete( |
| collection_name=knowledge.id, filter={"file_id": form_data.file_id} |
| ) |
|
|
| result = VECTOR_DB_CLIENT.query( |
| collection_name=knowledge.id, |
| filter={"file_id": form_data.file_id}, |
| ) |
|
|
| Files.delete_file_by_id(form_data.file_id) |
|
|
| if knowledge: |
| data = knowledge.data or {} |
| file_ids = data.get("file_ids", []) |
|
|
| if form_data.file_id in file_ids: |
| file_ids.remove(form_data.file_id) |
| data["file_ids"] = file_ids |
|
|
| knowledge = Knowledges.update_knowledge_by_id( |
| id=id, form_data=KnowledgeUpdateForm(data=data) |
| ) |
|
|
| if knowledge: |
| files = Files.get_files_by_ids(file_ids) |
|
|
| return KnowledgeFilesResponse( |
| **knowledge.model_dump(), |
| files=files, |
| ) |
| else: |
| raise HTTPException( |
| status_code=status.HTTP_400_BAD_REQUEST, |
| detail=ERROR_MESSAGES.DEFAULT("knowledge"), |
| ) |
| else: |
| raise HTTPException( |
| status_code=status.HTTP_400_BAD_REQUEST, |
| detail=ERROR_MESSAGES.DEFAULT("file_id"), |
| ) |
| else: |
| raise HTTPException( |
| status_code=status.HTTP_400_BAD_REQUEST, |
| detail=ERROR_MESSAGES.NOT_FOUND, |
| ) |
|
|
|
|
| |
| |
| |
|
|
|
|
| @router.post("/{id}/reset", response_model=Optional[KnowledgeResponse]) |
| async def reset_knowledge_by_id(id: str, user=Depends(get_admin_user)): |
| try: |
| VECTOR_DB_CLIENT.delete_collection(collection_name=id) |
| except Exception as e: |
| log.debug(e) |
| pass |
|
|
| knowledge = Knowledges.update_knowledge_by_id( |
| id=id, form_data=KnowledgeUpdateForm(data={"file_ids": []}) |
| ) |
| return knowledge |
|
|
|
|
| |
| |
| |
|
|
|
|
| @router.delete("/{id}/delete", response_model=bool) |
| async def delete_knowledge_by_id(id: str, user=Depends(get_admin_user)): |
| try: |
| VECTOR_DB_CLIENT.delete_collection(collection_name=id) |
| except Exception as e: |
| log.debug(e) |
| pass |
| result = Knowledges.delete_knowledge_by_id(id=id) |
| return result |
|
|