Spaces:
Running
Running
import json | |
import logging | |
from typing import Optional | |
from fastapi import APIRouter, Depends, HTTPException, Request, status, BackgroundTasks | |
from pydantic import BaseModel | |
from open_webui.socket.main import sio, get_user_ids_from_room | |
from open_webui.models.users import Users, UserNameResponse | |
from open_webui.models.channels import Channels, ChannelModel, ChannelForm | |
from open_webui.models.messages import ( | |
Messages, | |
MessageModel, | |
MessageResponse, | |
MessageForm, | |
) | |
from open_webui.config import ENABLE_ADMIN_CHAT_ACCESS, ENABLE_ADMIN_EXPORT | |
from open_webui.constants import ERROR_MESSAGES | |
from open_webui.env import SRC_LOG_LEVELS | |
from open_webui.utils.auth import get_admin_user, get_verified_user | |
from open_webui.utils.access_control import has_access, get_users_with_access | |
from open_webui.utils.webhook import post_webhook | |
log = logging.getLogger(__name__) | |
log.setLevel(SRC_LOG_LEVELS["MODELS"]) | |
router = APIRouter() | |
############################ | |
# GetChatList | |
############################ | |
async def get_channels(user=Depends(get_verified_user)): | |
if user.role == "admin": | |
return Channels.get_channels() | |
else: | |
return Channels.get_channels_by_user_id(user.id) | |
############################ | |
# CreateNewChannel | |
############################ | |
async def create_new_channel(form_data: ChannelForm, user=Depends(get_admin_user)): | |
try: | |
channel = Channels.insert_new_channel(None, form_data, user.id) | |
return ChannelModel(**channel.model_dump()) | |
except Exception as e: | |
log.exception(e) | |
raise HTTPException( | |
status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT() | |
) | |
############################ | |
# GetChannelById | |
############################ | |
async def get_channel_by_id(id: str, user=Depends(get_verified_user)): | |
channel = Channels.get_channel_by_id(id) | |
if not channel: | |
raise HTTPException( | |
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND | |
) | |
if user.role != "admin" and not has_access( | |
user.id, type="read", access_control=channel.access_control | |
): | |
raise HTTPException( | |
status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT() | |
) | |
return ChannelModel(**channel.model_dump()) | |
############################ | |
# UpdateChannelById | |
############################ | |
async def update_channel_by_id( | |
id: str, form_data: ChannelForm, user=Depends(get_admin_user) | |
): | |
channel = Channels.get_channel_by_id(id) | |
if not channel: | |
raise HTTPException( | |
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND | |
) | |
try: | |
channel = Channels.update_channel_by_id(id, form_data) | |
return ChannelModel(**channel.model_dump()) | |
except Exception as e: | |
log.exception(e) | |
raise HTTPException( | |
status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT() | |
) | |
############################ | |
# DeleteChannelById | |
############################ | |
async def delete_channel_by_id(id: str, user=Depends(get_admin_user)): | |
channel = Channels.get_channel_by_id(id) | |
if not channel: | |
raise HTTPException( | |
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND | |
) | |
try: | |
Channels.delete_channel_by_id(id) | |
return True | |
except Exception as e: | |
log.exception(e) | |
raise HTTPException( | |
status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT() | |
) | |
############################ | |
# GetChannelMessages | |
############################ | |
class MessageUserResponse(MessageResponse): | |
user: UserNameResponse | |
async def get_channel_messages( | |
id: str, skip: int = 0, limit: int = 50, user=Depends(get_verified_user) | |
): | |
channel = Channels.get_channel_by_id(id) | |
if not channel: | |
raise HTTPException( | |
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND | |
) | |
if user.role != "admin" and not has_access( | |
user.id, type="read", access_control=channel.access_control | |
): | |
raise HTTPException( | |
status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT() | |
) | |
message_list = Messages.get_messages_by_channel_id(id, skip, limit) | |
users = {} | |
messages = [] | |
for message in message_list: | |
if message.user_id not in users: | |
user = Users.get_user_by_id(message.user_id) | |
users[message.user_id] = user | |
replies = Messages.get_replies_by_message_id(message.id) | |
latest_reply_at = replies[0].created_at if replies else None | |
messages.append( | |
MessageUserResponse( | |
**{ | |
**message.model_dump(), | |
"reply_count": len(replies), | |
"latest_reply_at": latest_reply_at, | |
"reactions": Messages.get_reactions_by_message_id(message.id), | |
"user": UserNameResponse(**users[message.user_id].model_dump()), | |
} | |
) | |
) | |
return messages | |
############################ | |
# PostNewMessage | |
############################ | |
async def send_notification(webui_url, channel, message, active_user_ids): | |
users = get_users_with_access("read", channel.access_control) | |
for user in users: | |
if user.id in active_user_ids: | |
continue | |
else: | |
if user.settings: | |
webhook_url = user.settings.ui.get("notifications", {}).get( | |
"webhook_url", None | |
) | |
if webhook_url: | |
post_webhook( | |
webhook_url, | |
f"#{channel.name} - {webui_url}/channels/{channel.id}\n\n{message.content}", | |
{ | |
"action": "channel", | |
"message": message.content, | |
"title": channel.name, | |
"url": f"{webui_url}/channels/{channel.id}", | |
}, | |
) | |
async def post_new_message( | |
request: Request, | |
id: str, | |
form_data: MessageForm, | |
background_tasks: BackgroundTasks, | |
user=Depends(get_verified_user), | |
): | |
channel = Channels.get_channel_by_id(id) | |
if not channel: | |
raise HTTPException( | |
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND | |
) | |
if user.role != "admin" and not has_access( | |
user.id, type="read", access_control=channel.access_control | |
): | |
raise HTTPException( | |
status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT() | |
) | |
try: | |
message = Messages.insert_new_message(form_data, channel.id, user.id) | |
if message: | |
event_data = { | |
"channel_id": channel.id, | |
"message_id": message.id, | |
"data": { | |
"type": "message", | |
"data": MessageUserResponse( | |
**{ | |
**message.model_dump(), | |
"reply_count": 0, | |
"latest_reply_at": None, | |
"reactions": Messages.get_reactions_by_message_id( | |
message.id | |
), | |
"user": UserNameResponse(**user.model_dump()), | |
} | |
).model_dump(), | |
}, | |
"user": UserNameResponse(**user.model_dump()).model_dump(), | |
"channel": channel.model_dump(), | |
} | |
await sio.emit( | |
"channel-events", | |
event_data, | |
to=f"channel:{channel.id}", | |
) | |
if message.parent_id: | |
# If this message is a reply, emit to the parent message as well | |
parent_message = Messages.get_message_by_id(message.parent_id) | |
if parent_message: | |
await sio.emit( | |
"channel-events", | |
{ | |
"channel_id": channel.id, | |
"message_id": parent_message.id, | |
"data": { | |
"type": "message:reply", | |
"data": MessageUserResponse( | |
**{ | |
**parent_message.model_dump(), | |
"user": UserNameResponse( | |
**Users.get_user_by_id( | |
parent_message.user_id | |
).model_dump() | |
), | |
} | |
).model_dump(), | |
}, | |
"user": UserNameResponse(**user.model_dump()).model_dump(), | |
"channel": channel.model_dump(), | |
}, | |
to=f"channel:{channel.id}", | |
) | |
active_user_ids = get_user_ids_from_room(f"channel:{channel.id}") | |
background_tasks.add_task( | |
send_notification, | |
request.app.state.config.WEBUI_URL, | |
channel, | |
message, | |
active_user_ids, | |
) | |
return MessageModel(**message.model_dump()) | |
except Exception as e: | |
log.exception(e) | |
raise HTTPException( | |
status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT() | |
) | |
############################ | |
# GetChannelMessage | |
############################ | |
async def get_channel_message( | |
id: str, message_id: str, user=Depends(get_verified_user) | |
): | |
channel = Channels.get_channel_by_id(id) | |
if not channel: | |
raise HTTPException( | |
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND | |
) | |
if user.role != "admin" and not has_access( | |
user.id, type="read", access_control=channel.access_control | |
): | |
raise HTTPException( | |
status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT() | |
) | |
message = Messages.get_message_by_id(message_id) | |
if not message: | |
raise HTTPException( | |
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND | |
) | |
if message.channel_id != id: | |
raise HTTPException( | |
status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT() | |
) | |
return MessageUserResponse( | |
**{ | |
**message.model_dump(), | |
"user": UserNameResponse( | |
**Users.get_user_by_id(message.user_id).model_dump() | |
), | |
} | |
) | |
############################ | |
# GetChannelThreadMessages | |
############################ | |
async def get_channel_thread_messages( | |
id: str, | |
message_id: str, | |
skip: int = 0, | |
limit: int = 50, | |
user=Depends(get_verified_user), | |
): | |
channel = Channels.get_channel_by_id(id) | |
if not channel: | |
raise HTTPException( | |
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND | |
) | |
if user.role != "admin" and not has_access( | |
user.id, type="read", access_control=channel.access_control | |
): | |
raise HTTPException( | |
status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT() | |
) | |
message_list = Messages.get_messages_by_parent_id(id, message_id, skip, limit) | |
users = {} | |
messages = [] | |
for message in message_list: | |
if message.user_id not in users: | |
user = Users.get_user_by_id(message.user_id) | |
users[message.user_id] = user | |
messages.append( | |
MessageUserResponse( | |
**{ | |
**message.model_dump(), | |
"reply_count": 0, | |
"latest_reply_at": None, | |
"reactions": Messages.get_reactions_by_message_id(message.id), | |
"user": UserNameResponse(**users[message.user_id].model_dump()), | |
} | |
) | |
) | |
return messages | |
############################ | |
# UpdateMessageById | |
############################ | |
async def update_message_by_id( | |
id: str, message_id: str, form_data: MessageForm, user=Depends(get_verified_user) | |
): | |
channel = Channels.get_channel_by_id(id) | |
if not channel: | |
raise HTTPException( | |
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND | |
) | |
if user.role != "admin" and not has_access( | |
user.id, type="read", access_control=channel.access_control | |
): | |
raise HTTPException( | |
status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT() | |
) | |
message = Messages.get_message_by_id(message_id) | |
if not message: | |
raise HTTPException( | |
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND | |
) | |
if message.channel_id != id: | |
raise HTTPException( | |
status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT() | |
) | |
try: | |
message = Messages.update_message_by_id(message_id, form_data) | |
message = Messages.get_message_by_id(message_id) | |
if message: | |
await sio.emit( | |
"channel-events", | |
{ | |
"channel_id": channel.id, | |
"message_id": message.id, | |
"data": { | |
"type": "message:update", | |
"data": MessageUserResponse( | |
**{ | |
**message.model_dump(), | |
"user": UserNameResponse( | |
**user.model_dump() | |
).model_dump(), | |
} | |
).model_dump(), | |
}, | |
"user": UserNameResponse(**user.model_dump()).model_dump(), | |
"channel": channel.model_dump(), | |
}, | |
to=f"channel:{channel.id}", | |
) | |
return MessageModel(**message.model_dump()) | |
except Exception as e: | |
log.exception(e) | |
raise HTTPException( | |
status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT() | |
) | |
############################ | |
# AddReactionToMessage | |
############################ | |
class ReactionForm(BaseModel): | |
name: str | |
async def add_reaction_to_message( | |
id: str, message_id: str, form_data: ReactionForm, user=Depends(get_verified_user) | |
): | |
channel = Channels.get_channel_by_id(id) | |
if not channel: | |
raise HTTPException( | |
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND | |
) | |
if user.role != "admin" and not has_access( | |
user.id, type="read", access_control=channel.access_control | |
): | |
raise HTTPException( | |
status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT() | |
) | |
message = Messages.get_message_by_id(message_id) | |
if not message: | |
raise HTTPException( | |
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND | |
) | |
if message.channel_id != id: | |
raise HTTPException( | |
status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT() | |
) | |
try: | |
Messages.add_reaction_to_message(message_id, user.id, form_data.name) | |
message = Messages.get_message_by_id(message_id) | |
await sio.emit( | |
"channel-events", | |
{ | |
"channel_id": channel.id, | |
"message_id": message.id, | |
"data": { | |
"type": "message:reaction:add", | |
"data": { | |
**message.model_dump(), | |
"user": UserNameResponse( | |
**Users.get_user_by_id(message.user_id).model_dump() | |
).model_dump(), | |
"name": form_data.name, | |
}, | |
}, | |
"user": UserNameResponse(**user.model_dump()).model_dump(), | |
"channel": channel.model_dump(), | |
}, | |
to=f"channel:{channel.id}", | |
) | |
return True | |
except Exception as e: | |
log.exception(e) | |
raise HTTPException( | |
status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT() | |
) | |
############################ | |
# RemoveReactionById | |
############################ | |
async def remove_reaction_by_id_and_user_id_and_name( | |
id: str, message_id: str, form_data: ReactionForm, user=Depends(get_verified_user) | |
): | |
channel = Channels.get_channel_by_id(id) | |
if not channel: | |
raise HTTPException( | |
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND | |
) | |
if user.role != "admin" and not has_access( | |
user.id, type="read", access_control=channel.access_control | |
): | |
raise HTTPException( | |
status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT() | |
) | |
message = Messages.get_message_by_id(message_id) | |
if not message: | |
raise HTTPException( | |
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND | |
) | |
if message.channel_id != id: | |
raise HTTPException( | |
status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT() | |
) | |
try: | |
Messages.remove_reaction_by_id_and_user_id_and_name( | |
message_id, user.id, form_data.name | |
) | |
message = Messages.get_message_by_id(message_id) | |
await sio.emit( | |
"channel-events", | |
{ | |
"channel_id": channel.id, | |
"message_id": message.id, | |
"data": { | |
"type": "message:reaction:remove", | |
"data": { | |
**message.model_dump(), | |
"user": UserNameResponse( | |
**Users.get_user_by_id(message.user_id).model_dump() | |
).model_dump(), | |
"name": form_data.name, | |
}, | |
}, | |
"user": UserNameResponse(**user.model_dump()).model_dump(), | |
"channel": channel.model_dump(), | |
}, | |
to=f"channel:{channel.id}", | |
) | |
return True | |
except Exception as e: | |
log.exception(e) | |
raise HTTPException( | |
status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT() | |
) | |
############################ | |
# DeleteMessageById | |
############################ | |
async def delete_message_by_id( | |
id: str, message_id: str, user=Depends(get_verified_user) | |
): | |
channel = Channels.get_channel_by_id(id) | |
if not channel: | |
raise HTTPException( | |
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND | |
) | |
if user.role != "admin" and not has_access( | |
user.id, type="read", access_control=channel.access_control | |
): | |
raise HTTPException( | |
status_code=status.HTTP_403_FORBIDDEN, detail=ERROR_MESSAGES.DEFAULT() | |
) | |
message = Messages.get_message_by_id(message_id) | |
if not message: | |
raise HTTPException( | |
status_code=status.HTTP_404_NOT_FOUND, detail=ERROR_MESSAGES.NOT_FOUND | |
) | |
if message.channel_id != id: | |
raise HTTPException( | |
status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT() | |
) | |
try: | |
Messages.delete_message_by_id(message_id) | |
await sio.emit( | |
"channel-events", | |
{ | |
"channel_id": channel.id, | |
"message_id": message.id, | |
"data": { | |
"type": "message:delete", | |
"data": { | |
**message.model_dump(), | |
"user": UserNameResponse(**user.model_dump()).model_dump(), | |
}, | |
}, | |
"user": UserNameResponse(**user.model_dump()).model_dump(), | |
"channel": channel.model_dump(), | |
}, | |
to=f"channel:{channel.id}", | |
) | |
if message.parent_id: | |
# If this message is a reply, emit to the parent message as well | |
parent_message = Messages.get_message_by_id(message.parent_id) | |
if parent_message: | |
await sio.emit( | |
"channel-events", | |
{ | |
"channel_id": channel.id, | |
"message_id": parent_message.id, | |
"data": { | |
"type": "message:reply", | |
"data": MessageUserResponse( | |
**{ | |
**parent_message.model_dump(), | |
"user": UserNameResponse( | |
**Users.get_user_by_id( | |
parent_message.user_id | |
).model_dump() | |
), | |
} | |
).model_dump(), | |
}, | |
"user": UserNameResponse(**user.model_dump()).model_dump(), | |
"channel": channel.model_dump(), | |
}, | |
to=f"channel:{channel.id}", | |
) | |
return True | |
except Exception as e: | |
log.exception(e) | |
raise HTTPException( | |
status_code=status.HTTP_400_BAD_REQUEST, detail=ERROR_MESSAGES.DEFAULT() | |
) | |