import json
import re

import requests

from src.utils.config import settings


def filter_response(response: str) -> str:
    """Strips Markdown emphasis markers and supplementary-plane Unicode
    characters (such as emoji) from a string.

    Args:
        response (str): The string to filter.

    Returns:
        str: The filtered string.
    """
    # Remove bold, underline, strikethrough, and inline-code markers.
    response = re.sub(r"\*\*|__|~~|`", "", response)
    # Remove characters above U+FFFF (emoji and other supplementary-plane symbols).
    response = re.sub(r"[\U00010000-\U0010FFFF]", "", response)
    return response
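
# Illustrative example: Markdown markers and emoji are stripped, leaving the
# surrounding text (including any leftover spaces) intact:
#   >>> filter_response("**Hello** `world` 👋")
#   'Hello world '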


def warmup_llm(session: requests.Session, llm_model: str, llm_url: str):
    """Sends a warmup request to the LLM server so the model is loaded
    before the first real query.

    Args:
        session (requests.Session): The requests session to use.
        llm_model (str): The name of the LLM model.
        llm_url (str): The URL of the LLM server.
    """
    try:
        # Health check against the default local Ollama port.
        health = session.get("http://localhost:11434", timeout=3)
        if health.status_code != 200:
            print("Ollama is not running! Start it first.")
            return
        # A minimal one-token prompt with a tiny context forces the model
        # into memory without doing real work.
        session.post(
            llm_url,
            json={
                "model": llm_model,
                "messages": [{"role": "user", "content": "."}],
                "options": {"num_ctx": 64},
            },
            timeout=5,
        )
    except requests.RequestException as e:
        print(f"Warmup failed: {e}")
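
# Illustrative usage, assuming a local Ollama server on its default port and a
# hypothetical model name:
#   session = requests.Session()
#   warmup_llm(session, "llama3", "http://localhost:11434/api/chat")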


def get_ai_response(
    session: requests.Session,
    messages: list,
    llm_model: str,
    llm_url: str,
    max_tokens: int,
    temperature: float = 0.7,
    stream: bool = False,
):
    """Sends a chat request to the LLM and returns an iterator over the
    response body (streamed incrementally when ``stream`` is True).

    Args:
        session (requests.Session): The requests session to use.
        messages (list): The list of messages to send to the LLM.
        llm_model (str): The name of the LLM model.
        llm_url (str): The URL of the LLM server.
        max_tokens (int): The maximum number of tokens to generate.
        temperature (float, optional): The sampling temperature. Defaults to 0.7.
        stream (bool, optional): Whether to stream the response. Defaults to False.

    Returns:
        iterator: An iterator over the response chunks, or None if the
            request failed.
    """
    try:
        response = session.post(
            llm_url,
            json={
                "model": llm_model,
                "messages": messages,
                # Caller-supplied limits are passed through as Ollama
                # generation options.
                "options": {
                    "num_ctx": settings.MAX_TOKENS * 2,
                    "num_thread": settings.NUM_THREADS,
                    "num_predict": max_tokens,
                    "temperature": temperature,
                },
                "stream": stream,
            },
            timeout=3600,
            stream=stream,
        )
        response.raise_for_status()

        def streaming_iterator():
            """Yields raw chunks from the streaming response, substituting a
            two-byte sentinel for empty keep-alive chunks."""
            try:
                for chunk in response.iter_content(chunk_size=512):
                    if chunk:
                        yield chunk
                    else:
                        yield b"\x00\x00"
            except Exception as e:
                print(f"\nError: {e}")
                yield b"\x00\x00"

        return streaming_iterator()
    except Exception as e:
        print(f"\nError: {e}")
        return None


def parse_stream_chunk(chunk: bytes) -> dict | None:
    """Parses a chunk of data from the LLM stream.

    Handles both Ollama-style JSON lines ("message" objects) and
    OpenAI-style SSE lines ("data: " prefix, "choices", "[DONE]").

    Args:
        chunk (bytes): The chunk of data to parse.

    Returns:
        dict: A dictionary containing the parsed data, or None if the chunk
            carries no usable content.
    """
    if not chunk:
        # An empty chunk is treated as a keep-alive signal.
        return {"keep_alive": True}
    try:
        text = chunk.decode("utf-8").strip()
        if text.startswith("data: "):
            text = text[6:]
        if text == "[DONE]":
            return {"choices": [{"finish_reason": "stop", "delta": {}}]}
        if text.startswith("{"):
            data = json.loads(text)
            content = ""
            if "message" in data:
                # Ollama chat format: {"message": {"content": ...}}
                content = data["message"].get("content", "")
            elif "choices" in data and data["choices"]:
                # OpenAI-compatible format: streaming deltas or full messages.
                choice = data["choices"][0]
                content = choice.get("delta", {}).get("content", "") or choice.get(
                    "message", {}
                ).get("content", "")
            if content:
                return {"choices": [{"delta": {"content": filter_response(content)}}]}
        return None
    except json.JSONDecodeError:
        # Partial JSON fragments are expected when a chunk splits a line; skip them.
        return None
    except Exception as e:
        print(f"Error parsing stream chunk: {e}")
        return None
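

# Minimal end-to-end sketch, assuming a local Ollama server on its default
# port and a hypothetical model name ("llama3"); adjust both for your setup.
if __name__ == "__main__":
    demo_session = requests.Session()
    demo_model = "llama3"  # hypothetical; any locally pulled model works
    demo_url = "http://localhost:11434/api/chat"  # Ollama chat endpoint

    warmup_llm(demo_session, demo_model, demo_url)
    chunks = get_ai_response(
        demo_session,
        messages=[{"role": "user", "content": "Say hello."}],
        llm_model=demo_model,
        llm_url=demo_url,
        max_tokens=128,
        stream=True,
    )
    for raw in chunks or []:
        parsed = parse_stream_chunk(raw)
        if not parsed or parsed.get("keep_alive"):
            continue
        choice = parsed["choices"][0]
        if choice.get("finish_reason") == "stop":
            break
        print(choice["delta"].get("content", ""), end="", flush=True)
    print()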