Spaces:
Running
Running
from typing import Optional | |
from open_webui.models.prompts import ( | |
PromptForm, | |
PromptUserResponse, | |
PromptModel, | |
Prompts, | |
) | |
from open_webui.constants import ERROR_MESSAGES | |
from fastapi import APIRouter, Depends, HTTPException, status, Request | |
from open_webui.utils.auth import get_admin_user, get_verified_user | |
from open_webui.utils.access_control import has_access, has_permission | |
router = APIRouter() | |
############################ | |
# GetPrompts | |
############################ | |
async def get_prompts(user=Depends(get_verified_user)):
    """Return the prompts the calling user is allowed to read.

    Admins receive the result of ``Prompts.get_prompts()``. Every other
    verified user receives whatever ``Prompts.get_prompts_by_user_id``
    returns for their id with the ``"read"`` permission argument.
    """
    # Admins bypass the per-user filter entirely.
    if user.role == "admin":
        return Prompts.get_prompts()

    return Prompts.get_prompts_by_user_id(user.id, "read")
async def get_prompt_list(user=Depends(get_verified_user)):
    """Return the prompts the calling user is allowed to modify.

    Admins receive the result of ``Prompts.get_prompts()``. Every other
    verified user receives whatever ``Prompts.get_prompts_by_user_id``
    returns for their id with the ``"write"`` permission argument.
    """
    is_admin = user.role == "admin"
    return (
        Prompts.get_prompts()
        if is_admin
        else Prompts.get_prompts_by_user_id(user.id, "write")
    )
############################ | |
# CreateNewPrompt | |
############################ | |
async def create_new_prompt(
    request: Request, form_data: PromptForm, user=Depends(get_verified_user)
):
    """Create a prompt owned by the calling user.

    Args:
        request: Incoming request; its app state supplies the configured
            ``USER_PERMISSIONS`` used for the permission check.
        form_data: Prompt payload; ``form_data.command`` must not already
            be registered.
        user: The verified caller.

    Returns:
        The newly inserted prompt.

    Raises:
        HTTPException: 401 ``UNAUTHORIZED`` when a non-admin caller lacks the
            ``"workspace.prompts"`` permission; 400 ``COMMAND_TAKEN`` when a
            prompt with the same command exists; 400 ``DEFAULT()`` when the
            insert yields nothing.
    """
    # Admins skip the permission lookup; `or` keeps the original
    # short-circuit so has_permission is only called for non-admins.
    allowed = user.role == "admin" or has_permission(
        user.id, "workspace.prompts", request.app.state.config.USER_PERMISSIONS
    )
    if not allowed:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.UNAUTHORIZED,
        )

    existing = Prompts.get_prompt_by_command(form_data.command)
    if existing is not None:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.COMMAND_TAKEN,
        )

    created = Prompts.insert_new_prompt(user.id, form_data)
    if not created:
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail=ERROR_MESSAGES.DEFAULT(),
        )
    return created
############################ | |
# GetPromptByCommand | |
############################ | |
async def get_prompt_by_command(command: str, user=Depends(get_verified_user)):
    """Fetch a single prompt by its command name.

    The lookup key is ``command`` prefixed with ``"/"``, so callers pass the
    command without the leading slash.

    Args:
        command: Command name without the leading slash.
        user: The verified caller.

    Returns:
        The prompt, when the caller is an admin, owns the prompt, or
        ``has_access`` grants ``"read"`` on the prompt's ``access_control``.

    Raises:
        HTTPException: 401 ``NOT_FOUND`` when no prompt matches the command;
            401 ``ACCESS_PROHIBITED`` when the prompt exists but the caller
            may not read it.
    """
    prompt = Prompts.get_prompt_by_command(f"/{command}")
    if not prompt:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    can_read = (
        user.role == "admin"
        or prompt.user_id == user.id
        or has_access(user.id, "read", prompt.access_control)
    )
    if not can_read:
        # Without this branch the handler fell off the end and answered a
        # denied caller with an implicit None instead of an error. Reject
        # explicitly, using the same status/detail as the update and delete
        # handlers use for a caller who is not permitted.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
        )

    return prompt
############################ | |
# UpdatePromptByCommand | |
############################ | |
async def update_prompt_by_command(
    command: str,
    form_data: PromptForm,
    user=Depends(get_verified_user),
):
    """Update the prompt registered under ``/<command>``.

    Args:
        command: Command name without the leading slash.
        form_data: New prompt contents.
        user: The verified caller.

    Returns:
        The updated prompt.

    Raises:
        HTTPException: 401 ``NOT_FOUND`` when no prompt matches; 401
            ``ACCESS_PROHIBITED`` when the caller is neither the prompt's
            owner nor an admin, or when the update itself yields nothing.
    """
    slash_command = f"/{command}"

    existing = Prompts.get_prompt_by_command(slash_command)
    if not existing:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    # Only the owner or an admin may modify the prompt.
    if not (existing.user_id == user.id or user.role == "admin"):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
        )

    updated = Prompts.update_prompt_by_command(slash_command, form_data)
    if not updated:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
        )
    return updated
############################ | |
# DeletePromptByCommand | |
############################ | |
async def delete_prompt_by_command(command: str, user=Depends(get_verified_user)):
    """Delete the prompt registered under ``/<command>``.

    Args:
        command: Command name without the leading slash.
        user: The verified caller.

    Returns:
        Whatever ``Prompts.delete_prompt_by_command`` returns for the
        slash-prefixed command.

    Raises:
        HTTPException: 401 ``NOT_FOUND`` when no prompt matches; 401
            ``ACCESS_PROHIBITED`` when the caller is neither the prompt's
            owner nor an admin.
    """
    slash_command = f"/{command}"

    target = Prompts.get_prompt_by_command(slash_command)
    if not target:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.NOT_FOUND,
        )

    # Only the owner or an admin may remove the prompt.
    if not (target.user_id == user.id or user.role == "admin"):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail=ERROR_MESSAGES.ACCESS_PROHIBITED,
        )

    return Prompts.delete_prompt_by_command(slash_command)