import json
import re
from typing import Optional

import requests

from src.utils.config import settings


def filter_response(response: str) -> str:
    """Removes markdown formatting markers and non-BMP characters (e.g. emoji) from a string.

    Args:
        response (str): The string to filter.

    Returns:
        str: The filtered string.
    """
    # Drop bold/italic/strikethrough/inline-code markers.
    response = re.sub(r"\*\*|__|~~|`", "", response)
    # Drop characters outside the Basic Multilingual Plane (emoji, rare symbols).
    response = re.sub(r"[\U00010000-\U0010ffff]", "", response, flags=re.UNICODE)
    return response


def warmup_llm(session: requests.Session, llm_model: str, llm_url: str):
    """Sends a small warmup request so the model is loaded before the first real call.

    Args:
        session (requests.Session): The requests session to use.
        llm_model (str): The name of the LLM model.
        llm_url (str): The URL of the LLM server.
    """
    try:
        # Health check against the default local Ollama port before warming up.
        health = session.get("http://localhost:11434", timeout=3)
        if health.status_code != 200:
            print("Ollama not running! Start it first.")
            return
        session.post(
            llm_url,
            json={
                "model": llm_model,
                "messages": [{"role": "user", "content": "."}],
                "context": [],
                "options": {"num_ctx": 64},
            },
            timeout=5,
        )
    except requests.RequestException as e:
        print(f"Warmup failed: {str(e)}")
        return


def get_ai_response(
    session: requests.Session,
    messages: list,
    llm_model: str,
    llm_url: str,
    max_tokens: int,
    temperature: float = 0.7,
    stream: bool = False,
):
    """Sends a chat request to the LLM and returns an iterator over the raw response bytes.

    Args:
        session (requests.Session): The requests session to use.
        messages (list): The list of messages to send to the LLM.
        llm_model (str): The name of the LLM model.
        llm_url (str): The URL of the LLM server.
        max_tokens (int): The maximum number of tokens to generate.
        temperature (float, optional): The temperature to use for generation. Defaults to 0.7.
        stream (bool, optional): Whether to stream the response. Defaults to False.

    Returns:
        iterator | None: An iterator over response chunks, or None if the request fails.
    """
    try:
        response = session.post(
            llm_url,
            json={
                "model": llm_model,
                "messages": messages,
                "options": {
                    "num_ctx": settings.MAX_TOKENS * 2,
                    "num_thread": settings.NUM_THREADS,
                    # Forward the documented generation arguments using Ollama's
                    # option names; previously they were accepted but never sent.
                    "num_predict": max_tokens,
                    "temperature": temperature,
                },
                "stream": stream,
            },
            timeout=3600,
            stream=stream,
        )
        response.raise_for_status()

        def streaming_iterator():
            """Yields raw response chunks, emitting a null-byte sentinel on empty chunks or errors."""
            try:
                for chunk in response.iter_content(chunk_size=512):
                    if chunk:
                        yield chunk
                    else:
                        # Keep the consumer fed while the server sends empty keep-alive chunks.
                        yield b"\x00\x00"
            except Exception as e:
                print(f"\nError: {str(e)}")
                yield b"\x00\x00"

        return streaming_iterator()
    except Exception as e:
        print(f"\nError: {str(e)}")


def parse_stream_chunk(chunk: bytes) -> Optional[dict]:
    """Parses a chunk of data from the LLM stream.

    Handles both Ollama-style ("message") and OpenAI-compatible ("choices", SSE
    "data: " prefix) payloads.

    Args:
        chunk (bytes): The chunk of data to parse.

    Returns:
        Optional[dict]: A dictionary with the parsed delta, a keep-alive marker for
        empty chunks, or None if the chunk carries no usable content.
    """
    if not chunk:
        return {"keep_alive": True}
    try:
        text = chunk.decode("utf-8").strip()
        # Strip the SSE "data: " prefix used by OpenAI-compatible servers.
        if text.startswith("data: "):
            text = text[6:]
        if text == "[DONE]":
            return {"choices": [{"finish_reason": "stop", "delta": {}}]}
        if text.startswith("{"):
            data = json.loads(text)
            content = ""
            if "message" in data:
                # Ollama chat format.
                content = data["message"].get("content", "")
            elif "choices" in data and data["choices"]:
                # OpenAI-compatible format: streaming delta or full message.
                choice = data["choices"][0]
                content = choice.get("delta", {}).get("content", "") or choice.get(
                    "message", {}
                ).get("content", "")
            if content:
                return {"choices": [{"delta": {"content": filter_response(content)}}]}
        return None
    except Exception as e:
        # This specific parse error is expected noise from chunks split mid-JSON;
        # only log anything unexpected.
        if str(e) != "Expecting value: line 1 column 2 (char 1)":
            print(f"Error parsing stream chunk: {str(e)}")
        return None
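

# ---------------------------------------------------------------------------
# Usage sketch: how the helpers above chain together against a local Ollama
# server. The endpoint and model name below are illustrative assumptions, not
# values defined elsewhere in this project; adjust them to your setup.
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    llm_url = "http://localhost:11434/api/chat"  # assumed Ollama chat endpoint
    llm_model = "llama3"  # assumed locally pulled model name

    with requests.Session() as session:
        # Load the model up front so the first real request avoids the cold start.
        warmup_llm(session, llm_model, llm_url)

        stream = get_ai_response(
            session,
            messages=[{"role": "user", "content": "Say hello in one sentence."}],
            llm_model=llm_model,
            llm_url=llm_url,
            max_tokens=settings.MAX_TOKENS,
            stream=True,
        )

        if stream is not None:
            for raw_chunk in stream:
                parsed = parse_stream_chunk(raw_chunk)
                if not parsed or "choices" not in parsed:
                    continue  # keep-alive marker or an unparsable partial chunk
                choice = parsed["choices"][0]
                if choice.get("finish_reason") == "stop":
                    break
                print(choice.get("delta", {}).get("content", ""), end="", flush=True)
            print()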