import gradio as gr from langchain_community.llms import LlamaCpp import os import json import torch import logging from typing import Optional, List, Dict, Any from fastapi import FastAPI, HTTPException, Request from fastapi.responses import JSONResponse from pydantic import BaseModel import uvicorn import time from threading import Lock from pathlib import Path from huggingface_hub import hf_hub_download, list_repo_files from contextlib import asynccontextmanager # Configure logging logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) class ChatCompletionRequest(BaseModel): model: str messages: List[Dict[str, str]] temperature: Optional[float] = 0.7 max_tokens: Optional[int] = 2048 stream: Optional[bool] = False def get_model_filename(): """Get the correct model filename from the repository.""" try: logger.info("Listing repository files...") files = list_repo_files("G17c21ds/Qwen2.5-14B-Instruct-Uncensored-Q8_0-GGUF") gguf_files = [f for f in files if f.endswith('.gguf')] if not gguf_files: raise ValueError("No GGUF model files found in repository") logger.info(f"Found model files: {gguf_files}") return gguf_files[0] except Exception as e: logger.error(f"Error listing repository files: {str(e)}") raise def download_model_from_hf(): """Download the model file from Hugging Face.""" try: logger.info("Downloading model from Hugging Face Hub...") model_dir = Path("models") model_dir.mkdir(exist_ok=True) model_filename = get_model_filename() logger.info(f"Using model file: {model_filename}") local_path = hf_hub_download( repo_id="G17c21ds/Qwen2.5-14B-Instruct-Uncensored-Q8_0-GGUF", filename=model_filename, local_dir=model_dir, local_dir_use_symlinks=False ) return Path(local_path) except Exception as e: logger.error(f"Error downloading model: {str(e)}") raise class QwenModel: def __init__(self): """Initialize the Qwen model with automatic device detection.""" try: self.has_gpu = torch.cuda.is_available() self.device_count = torch.cuda.device_count() if self.has_gpu else 0 logger.info(f"GPU available: {self.has_gpu}, Device count: {self.device_count}") model_path = download_model_from_hf() logger.info(f"Model path: {model_path}") n_gpu_layers = 40 if self.has_gpu else 0 logger.info(f"Using {'GPU' if self.has_gpu else 'CPU'} for inference") n_batch = 512 if self.has_gpu else 64 n_ctx = 2048 if not self.has_gpu else 4096 self.llm = LlamaCpp( model_path=str(model_path), n_gpu_layers=n_gpu_layers, n_ctx=n_ctx, n_batch=n_batch, verbose=True, temperature=0.7, max_tokens=2048, top_p=0.95, top_k=50, f16_kv=self.has_gpu, use_mlock=True, use_mmap=True, seed=42, repeat_penalty=1.1, rope_scaling={"type": "linear", "factor": 1.0}, ) self.lock = Lock() except Exception as e: logger.error(f"Failed to initialize model: {str(e)}") raise def generate_cot_prompt(self, messages: List[Dict[str, str]]) -> str: """Generate a chain-of-thought prompt from message history.""" conversation = [] for msg in messages: role = msg.get("role", "") content = msg.get("content", "") if role == "system": conversation.append(f"System: {content}") elif role == "user": conversation.append(f"Human: {content}") elif role == "assistant": conversation.append(f"Assistant: {content}") last_user_msg = next((msg["content"] for msg in reversed(messages) if msg["role"] == "user"), None) if not last_user_msg: raise ValueError("No user message found in the conversation") cot_template = f"""Previous conversation: {chr(10).join(conversation)} Let's approach the latest question step-by-step: 1. Understanding the question: {last_user_msg} 2. Breaking down components: - Key elements to consider - Specific information requested - Relevant constraints 3. Reasoning process: - Systematic approach - Applicable knowledge - Potential challenges 4. Step-by-step solution: """ return cot_template def process_response(self, response: str) -> str: """Process and format the model's response.""" try: response = response.strip() if not response.startswith("Step"): response = "Step-by-step solution:\n" + response return response except Exception as e: logger.error(f"Error processing response: {str(e)}") return "Error processing response" def generate_response(self, messages: List[Dict[str, str]], temperature: float = 0.7, max_tokens: int = 2048) -> Dict[str, Any]: """Generate a response using chain-of-thought reasoning.""" try: with self.lock: full_prompt = self.generate_cot_prompt(messages) start_time = time.time() response = self.llm( full_prompt, temperature=temperature, max_tokens=max_tokens ) end_time = time.time() processed_response = self.process_response(response) return { "id": f"chatcmpl-{int(time.time()*1000)}", "object": "chat.completion", "created": int(time.time()), "model": "qwen-2.5-14b", "choices": [{ "index": 0, "message": { "role": "assistant", "content": processed_response }, "finish_reason": "stop" }], "usage": { "prompt_tokens": len(full_prompt.split()), "completion_tokens": len(processed_response.split()), "total_tokens": len(full_prompt.split()) + len(processed_response.split()) }, "system_info": { "device": "gpu" if self.has_gpu else "cpu", "processing_time": round(end_time - start_time, 2) } } except Exception as e: logger.error(f"Error generating response: {str(e)}") raise HTTPException(status_code=500, detail=str(e)) def create_gradio_interface(model: QwenModel): """Create and configure the Gradio interface.""" def predict(message: str, temperature: float, max_tokens: int) -> str: messages = [{"role": "user", "content": message}] response = model.generate_response( messages, temperature=temperature, max_tokens=max_tokens ) return response["choices"][0]["message"]["content"] iface = gr.Interface( fn=predict, inputs=[ gr.Textbox( label="Input", placeholder="Enter your question or task here...", lines=5 ), gr.Slider( minimum=0.1, maximum=1.0, value=0.7, label="Temperature", info="Higher values make the output more random" ), gr.Slider( minimum=64, maximum=4096, value=2048, step=64, label="Max Tokens", info="Maximum length of the generated response" ) ], outputs=gr.Textbox(label="Response", lines=10), title="Qwen 2.5 14B Instruct Model", description="""This is a Qwen 2.5 14B model interface with chain-of-thought prompting. The model will break down complex problems and solve them step by step.""", examples=[ ["Explain how photosynthesis works", 0.7, 2048], ["Solve the quadratic equation: x² + 5x + 6 = 0", 0.7, 1024], ["What are the implications of Moore's Law for future computing?", 0.8, 2048] ] ) return iface # Initialize FastAPI with lifespan app = FastAPI(title="Qwen 2.5 API") # Global model instance model = None @asynccontextmanager async def lifespan(app: FastAPI): """Lifespan context manager for FastAPI startup and shutdown events.""" global model try: model = QwenModel() logger.info("Model initialized successfully") yield finally: pass app = FastAPI(lifespan=lifespan) @app.post("/v1/chat/completions") async def create_chat_completion(request: ChatCompletionRequest): """OpenAI-compatible chat completions endpoint.""" try: response = model.generate_response( request.messages, temperature=request.temperature, max_tokens=request.max_tokens ) return JSONResponse(content=response) except Exception as e: raise HTTPException(status_code=500, detail=str(e)) def main(): """Main function to initialize and launch the application.""" try: global model if model is None: model = QwenModel() interface = create_gradio_interface(model) app.mount("/", interface.app) uvicorn.run( app, host="0.0.0.0", port=7860, log_level="info" ) except Exception as e: logger.error(f"Application failed to start: {str(e)}") raise if __name__ == "__main__": main()