import json
import re
from typing import Optional

import requests

from src.utils.config import settings


def filter_response(response: str) -> str:
    """Removes markdown emphasis markers and characters outside the Basic Multilingual Plane.

    Args:
        response (str): The string to filter.

    Returns:
        str: The filtered string.
    """
    # Strip markdown bold/underline/strikethrough/inline-code markers.
    response = re.sub(r"\*\*|__|~~|`", "", response)
    # Drop supplementary-plane characters such as emoji and pictographs.
    response = re.sub(r"[\U00010000-\U0010ffff]", "", response, flags=re.UNICODE)
    return response


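# Quick illustration of filter_response (hypothetical input/output, not used anywhere):
#   filter_response("**Hello** `world` \U0001F44B") -> "Hello world "
#   The emphasis markers are stripped and the waving-hand emoji (outside the BMP) is
#   removed, leaving the space that preceded it.

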
def warmup_llm(session: requests.Session, llm_model: str, llm_url: str):
    """Sends a warmup request to the LLM server so the model is loaded before real traffic.

    Args:
        session (requests.Session): The requests session to use.
        llm_model (str): The name of the LLM model.
        llm_url (str): The URL of the LLM server.
    """
    try:
        # Health check against the local Ollama instance (assumed to be on the default port).
        health = session.get("http://localhost:11434", timeout=3)
        if health.status_code != 200:
            print("Ollama not running! Start it first.")
            return

        # Minimal request: a single "." prompt with a tiny context window, just to load the model.
        session.post(
            llm_url,
            json={
                "model": llm_model,
                "messages": [{"role": "user", "content": "."}],
                "context": [],
                "options": {"num_ctx": 64},
            },
            timeout=5,
        )

    except requests.RequestException as e:
        print(f"Warmup failed: {e}")
        return


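# Typical startup call (illustrative; the endpoint path and model name are assumptions,
# not values defined by this module):
#   warmup_llm(requests.Session(), "llama3", "http://localhost:11434/api/chat")

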
def get_ai_response(
    session: requests.Session,
    messages: list,
    llm_model: str,
    llm_url: str,
    max_tokens: int,
    temperature: float = 0.7,
    stream: bool = False,
):
    """Sends a chat request to the LLM and returns an iterator over the response body.

    Args:
        session (requests.Session): The requests session to use.
        messages (list): The list of messages to send to the LLM.
        llm_model (str): The name of the LLM model.
        llm_url (str): The URL of the LLM server.
        max_tokens (int): The maximum number of tokens to generate.
        temperature (float, optional): The temperature to use for generation. Defaults to 0.7.
        stream (bool, optional): Whether to stream the response. Defaults to False.

    Returns:
        iterator: An iterator over the response content in 512-byte chunks, or None if the
        request fails.
    """
    try:
        response = session.post(
            llm_url,
            json={
                "model": llm_model,
                "messages": messages,
                "options": {
                    "num_ctx": settings.MAX_TOKENS * 2,
                    "num_thread": settings.NUM_THREADS,
                    # Limit generated tokens and set sampling temperature.
                    "num_predict": max_tokens,
                    "temperature": temperature,
                },
                "stream": stream,
            },
            timeout=3600,
            stream=stream,
        )
        response.raise_for_status()

        def streaming_iterator():
            """Yields raw response chunks, emitting a null sentinel on empty chunks or errors."""
            try:
                for chunk in response.iter_content(chunk_size=512):
                    if chunk:
                        yield chunk
                    else:
                        # Keep-alive sentinel so downstream consumers know the stream is still open.
                        yield b"\x00\x00"
            except Exception as e:
                print(f"\nError: {e}")
                yield b"\x00\x00"

        return streaming_iterator()

    except Exception as e:
        print(f"\nError: {e}")
        return None


def parse_stream_chunk(chunk: bytes) -> Optional[dict]:
    """Parses a chunk of data from the LLM stream.

    Handles both Ollama-style ({"message": {...}}) and OpenAI-style ({"choices": [...]})
    payloads, with or without an SSE "data: " prefix.

    Args:
        chunk (bytes): The chunk of data to parse.

    Returns:
        Optional[dict]: A dictionary with the parsed delta, a keep-alive marker for an empty
        chunk, or None if the chunk carries no content.
    """
    if not chunk:
        return {"keep_alive": True}

    try:
        text = chunk.decode("utf-8").strip()
        # Strip the SSE "data: " prefix if present.
        if text.startswith("data: "):
            text = text[6:]
        if text == "[DONE]":
            return {"choices": [{"finish_reason": "stop", "delta": {}}]}
        if text.startswith("{"):
            data = json.loads(text)
            content = ""
            # Ollama chat responses put the text under "message"; OpenAI-compatible
            # servers put it under "choices".
            if "message" in data:
                content = data["message"].get("content", "")
            elif "choices" in data and data["choices"]:
                choice = data["choices"][0]
                content = choice.get("delta", {}).get("content", "") or choice.get(
                    "message", {}
                ).get("content", "")

            if content:
                return {"choices": [{"delta": {"content": filter_response(content)}}]}
        return None

    except Exception as e:
        # Ignore the common json.JSONDecodeError raised for non-JSON fragments;
        # report anything else.
        if str(e) != "Expecting value: line 1 column 2 (char 1)":
            print(f"Error parsing stream chunk: {str(e)}")
        return None
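

# Minimal end-to-end sketch of how these helpers fit together. The URL, model name, and
# token budget below are illustrative assumptions, not values defined by this module.
if __name__ == "__main__":
    demo_session = requests.Session()
    demo_url = "http://localhost:11434/api/chat"  # assumed Ollama chat endpoint
    demo_model = "llama3"                         # assumed locally available model

    warmup_llm(demo_session, demo_model, demo_url)

    stream = get_ai_response(
        demo_session,
        messages=[{"role": "user", "content": "Say hello in one sentence."}],
        llm_model=demo_model,
        llm_url=demo_url,
        max_tokens=128,
        stream=True,
    )

    if stream is not None:
        for raw_chunk in stream:
            parsed = parse_stream_chunk(raw_chunk)
            # Skip keep-alive markers and chunks that do not parse into a content delta.
            if not parsed or "keep_alive" in parsed:
                continue
            choice = parsed["choices"][0]
            if choice.get("finish_reason") == "stop":
                break
            print(choice["delta"].get("content", ""), end="", flush=True)
        print()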