import gradio as gr
from langchain_community.llms import LlamaCpp
import os
import json
import torch
import logging
from typing import Optional, List, Dict, Any
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
from pydantic import BaseModel
import uvicorn
import time
from threading import Lock
from pathlib import Path
from huggingface_hub import hf_hub_download, list_repo_files
from contextlib import asynccontextmanager

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class ChatCompletionRequest(BaseModel):
    model: str
    messages: List[Dict[str, str]]
    temperature: Optional[float] = 0.7
    max_tokens: Optional[int] = 2048
    stream: Optional[bool] = False
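
# Illustrative request body matching ChatCompletionRequest (values are
# examples, not server-enforced defaults; `stream` is accepted by the schema
# but ignored by the endpoint defined further below):
# {
#     "model": "qwen-2.5-14b",
#     "messages": [{"role": "user", "content": "Hello"}],
#     "temperature": 0.7,
#     "max_tokens": 512
# }
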
def get_model_filename():
    """Get the correct model filename from the repository."""
    try:
        logger.info("Listing repository files...")
        files = list_repo_files("G17c21ds/Qwen2.5-14B-Instruct-Uncensored-Q8_0-GGUF")
        gguf_files = [f for f in files if f.endswith('.gguf')]
        if not gguf_files:
            raise ValueError("No GGUF model files found in repository")
        logger.info(f"Found model files: {gguf_files}")
        return gguf_files[0]
    except Exception as e:
        logger.error(f"Error listing repository files: {str(e)}")
        raise


def download_model_from_hf():
    """Download the model file from Hugging Face."""
    try:
        logger.info("Downloading model from Hugging Face Hub...")
        model_dir = Path("models")
        model_dir.mkdir(exist_ok=True)
        model_filename = get_model_filename()
        logger.info(f"Using model file: {model_filename}")
        local_path = hf_hub_download(
            repo_id="G17c21ds/Qwen2.5-14B-Instruct-Uncensored-Q8_0-GGUF",
            filename=model_filename,
            local_dir=model_dir,
            local_dir_use_symlinks=False
        )
        return Path(local_path)
    except Exception as e:
        logger.error(f"Error downloading model: {str(e)}")
        raise
class QwenModel:
    def __init__(self):
        """Initialize the Qwen model with automatic device detection."""
        try:
            self.has_gpu = torch.cuda.is_available()
            self.device_count = torch.cuda.device_count() if self.has_gpu else 0
            logger.info(f"GPU available: {self.has_gpu}, Device count: {self.device_count}")

            model_path = download_model_from_hf()
            logger.info(f"Model path: {model_path}")

            # Offload layers to the GPU when one is available; otherwise run fully on CPU
            # with a smaller batch size and context window.
            n_gpu_layers = 40 if self.has_gpu else 0
            logger.info(f"Using {'GPU' if self.has_gpu else 'CPU'} for inference")
            n_batch = 512 if self.has_gpu else 64
            n_ctx = 4096 if self.has_gpu else 2048

            self.llm = LlamaCpp(
                model_path=str(model_path),
                n_gpu_layers=n_gpu_layers,
                n_ctx=n_ctx,
                n_batch=n_batch,
                verbose=True,
                temperature=0.7,
                max_tokens=2048,
                top_p=0.95,
                top_k=50,
                f16_kv=self.has_gpu,
                use_mlock=True,
                use_mmap=True,
                seed=42,
                repeat_penalty=1.1,
                rope_scaling={"type": "linear", "factor": 1.0},
            )
            self.lock = Lock()
        except Exception as e:
            logger.error(f"Failed to initialize model: {str(e)}")
            raise
    def generate_cot_prompt(self, messages: List[Dict[str, str]]) -> str:
        """Generate a chain-of-thought prompt from message history."""
        conversation = []
        for msg in messages:
            role = msg.get("role", "")
            content = msg.get("content", "")
            if role == "system":
                conversation.append(f"System: {content}")
            elif role == "user":
                conversation.append(f"Human: {content}")
            elif role == "assistant":
                conversation.append(f"Assistant: {content}")

        last_user_msg = next((msg["content"] for msg in reversed(messages)
                              if msg["role"] == "user"), None)
        if not last_user_msg:
            raise ValueError("No user message found in the conversation")

        cot_template = f"""Previous conversation:
{chr(10).join(conversation)}
Let's approach the latest question step-by-step:
1. Understanding the question:
{last_user_msg}
2. Breaking down components:
- Key elements to consider
- Specific information requested
- Relevant constraints
3. Reasoning process:
- Systematic approach
- Applicable knowledge
- Potential challenges
4. Step-by-step solution:
"""
        return cot_template
    def process_response(self, response: str) -> str:
        """Process and format the model's response."""
        try:
            response = response.strip()
            if not response.startswith("Step"):
                response = "Step-by-step solution:\n" + response
            return response
        except Exception as e:
            logger.error(f"Error processing response: {str(e)}")
            return "Error processing response"
    def generate_response(self,
                          messages: List[Dict[str, str]],
                          temperature: float = 0.7,
                          max_tokens: int = 2048) -> Dict[str, Any]:
        """Generate a response using chain-of-thought reasoning."""
        try:
            with self.lock:
                full_prompt = self.generate_cot_prompt(messages)
                start_time = time.time()
                response = self.llm(
                    full_prompt,
                    temperature=temperature,
                    max_tokens=max_tokens
                )
                end_time = time.time()
                processed_response = self.process_response(response)

                return {
                    "id": f"chatcmpl-{int(time.time() * 1000)}",
                    "object": "chat.completion",
                    "created": int(time.time()),
                    "model": "qwen-2.5-14b",
                    "choices": [{
                        "index": 0,
                        "message": {
                            "role": "assistant",
                            "content": processed_response
                        },
                        "finish_reason": "stop"
                    }],
                    "usage": {
                        # Rough whitespace-split estimates, not true token counts
                        "prompt_tokens": len(full_prompt.split()),
                        "completion_tokens": len(processed_response.split()),
                        "total_tokens": len(full_prompt.split()) + len(processed_response.split())
                    },
                    "system_info": {
                        "device": "gpu" if self.has_gpu else "cpu",
                        "processing_time": round(end_time - start_time, 2)
                    }
                }
        except Exception as e:
            logger.error(f"Error generating response: {str(e)}")
            raise HTTPException(status_code=500, detail=str(e))
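
# Quick smoke test (illustrative sketch): running the model outside the
# FastAPI app assumes enough memory for the 14B Q8_0 GGUF (roughly 15+ GB)
# and network access to download it on first use.
#
#   m = QwenModel()
#   out = m.generate_response([{"role": "user", "content": "What is 2 + 2?"}])
#   print(out["choices"][0]["message"]["content"])
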
def create_gradio_interface(model: QwenModel):
    """Create and configure the Gradio interface."""
    def predict(message: str,
                temperature: float,
                max_tokens: int) -> str:
        messages = [{"role": "user", "content": message}]
        response = model.generate_response(
            messages,
            temperature=temperature,
            max_tokens=max_tokens
        )
        return response["choices"][0]["message"]["content"]

    iface = gr.Interface(
        fn=predict,
        inputs=[
            gr.Textbox(
                label="Input",
                placeholder="Enter your question or task here...",
                lines=5
            ),
            gr.Slider(
                minimum=0.1,
                maximum=1.0,
                value=0.7,
                label="Temperature",
                info="Higher values make the output more random"
            ),
            gr.Slider(
                minimum=64,
                maximum=4096,
                value=2048,
                step=64,
                label="Max Tokens",
                info="Maximum length of the generated response"
            )
        ],
        outputs=gr.Textbox(label="Response", lines=10),
        title="Qwen 2.5 14B Instruct Model",
        description="""This is a Qwen 2.5 14B model interface with chain-of-thought prompting.
        The model will break down complex problems and solve them step by step.""",
        examples=[
            ["Explain how photosynthesis works", 0.7, 2048],
            ["Solve the quadratic equation: x² + 5x + 6 = 0", 0.7, 1024],
            ["What are the implications of Moore's Law for future computing?", 0.8, 2048]
        ]
    )
    return iface
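
# For quick local experimentation the UI can also be served on its own,
# without the FastAPI wrapper (illustrative):
#
#   create_gradio_interface(QwenModel()).launch()
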
# Global model instance, initialized once during application startup
model = None


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Lifespan context manager for FastAPI startup and shutdown events."""
    global model
    try:
        if model is None:
            model = QwenModel()
            logger.info("Model initialized successfully")
        yield
    finally:
        pass


# Initialize FastAPI with the lifespan handler
app = FastAPI(title="Qwen 2.5 API", lifespan=lifespan)


@app.post("/v1/chat/completions")
async def create_chat_completion(request: ChatCompletionRequest):
    """OpenAI-compatible chat completions endpoint."""
    try:
        response = model.generate_response(
            request.messages,
            temperature=request.temperature,
            max_tokens=request.max_tokens
        )
        return JSONResponse(content=response)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
def main():
    """Main function to initialize and launch the application."""
    try:
        global model
        if model is None:
            model = QwenModel()
        interface = create_gradio_interface(model)
        # Mount the Gradio UI onto the FastAPI app alongside the API routes
        gr.mount_gradio_app(app, interface, path="/")
        uvicorn.run(
            app,
            host="0.0.0.0",
            port=7860,
            log_level="info"
        )
    except Exception as e:
        logger.error(f"Application failed to start: {str(e)}")
        raise


if __name__ == "__main__":
    main()
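
# Example request against the running server (illustrative; the `model` field
# is required by the request schema but does not select a different model):
#
#   curl -s http://localhost:7860/v1/chat/completions \
#     -H "Content-Type: application/json" \
#     -d '{"model": "qwen-2.5-14b", "messages": [{"role": "user", "content": "Explain how photosynthesis works"}]}'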