Hjgugugjhuhjggg's picture
Update app.py
85c0c92 verified
import os
import gc
import io
from llama_cpp import Llama
from concurrent.futures import ThreadPoolExecutor, as_completed
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse
from tqdm import tqdm
from dotenv import load_dotenv
from pydantic import BaseModel
from huggingface_hub import hf_hub_download, login
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
import nltk
import uvicorn
import psutil
import torch
import io
# Fetch the NLTK resources this module relies on for tokenising
# (word_tokenize) and stop-word filtering (stopwords).
nltk.download('punkt')
nltk.download('stopwords')

# Load variables from a .env file (if any) before the environment is read.
load_dotenv()

app = FastAPI()

# Authenticate against the Hugging Face Hub only when a token is configured.
HUGGINGFACE_TOKEN = os.environ.get("HUGGINGFACE_TOKEN")
if HUGGINGFACE_TOKEN:
    login(token=HUGGINGFACE_TOKEN)

# Each entry is read elsewhere in this file through its 'name', 'repo_id'
# and 'filename' keys.
model_configs = [
    # ... (Your model configurations remain the same) ...
]

# Shared module-level state: the model configuration list plus an in-memory
# text buffer stored under 'training_data'.
global_data = dict(model_configs=model_configs, training_data=io.StringIO())
class ModelManager:
    """Load every configured llama.cpp model once and serve them by name.

    ``self.models`` maps a model name to its loaded ``Llama`` instance, or to
    ``None`` when loading that model failed.
    """

    def __init__(self):
        """Create the registry and eagerly load all configured models."""
        self.models = {}
        self.load_models()

    def load_models(self):
        """Download and instantiate each model in ``global_data['model_configs']``.

        Models already present in ``self.models`` are skipped. A failure for
        one model is reported and recorded as ``None`` so that the remaining
        models still get loaded.
        """
        for config in tqdm(global_data['model_configs'], desc="Loading models"):
            model_name = config['name']
            if model_name in self.models:
                continue
            try:
                # hf_hub_download returns the local filesystem path of the
                # downloaded file (a str), which is exactly what Llama's
                # model_path expects; it must not be wrapped in BytesIO.
                model_path = hf_hub_download(
                    repo_id=config['repo_id'],
                    filename=config['filename'],
                    token=HUGGINGFACE_TOKEN,
                )
                # NOTE(review): `n_gpu` is not a documented Llama argument
                # (GPU offload is controlled by `n_gpu_layers`); kept as-is
                # until the intended offload setting is confirmed.
                self.models[model_name] = Llama(model_path=model_path, n_ctx=512, n_gpu=1)
                print(f"Model '{model_name}' loaded successfully.")
            except Exception as e:
                print(f"Error loading model {model_name}: {e}")
                self.models[model_name] = None
            finally:
                # Reclaim temporaries from the (possibly failed) load before
                # moving on to the next model.
                gc.collect()

    def get_model(self, model_name: str):
        """Return the model registered under ``model_name``, or ``None``.

        ``None`` is returned both for unknown names and for models whose
        loading failed.
        """
        return self.models.get(model_name)
# Module-level registry instance; constructing it loads all configured models
# at import time (ModelManager.__init__ calls load_models).
model_manager = ModelManager()
class ChatRequest(BaseModel):
    """Request body schema carrying a single text message."""

    # The user's prompt text.
    message: str
async def generate_model_response(model, inputs: str) -> str:
    """Run ``inputs`` through ``model`` and return the stripped completion text.

    Never raises: a falsy ``model`` yields ``"Model not loaded"`` and any
    exception during generation is turned into an error string.
    """
    try:
        if not model:
            return "Model not loaded"
        # The model is called with the prompt and a 150-token cap; the text
        # of the first choice is the reply.
        completion = model(inputs, max_tokens=150)
        first_choice = completion['choices'][0]
        return first_choice['text'].strip()
    except Exception as exc:
        return f"Error: Could not generate a response. Details: {exc}"
def _select_best_response(reference_text: str, responses: dict) -> tuple:
    """Return ``(model_name, text)`` of the response closest to ``reference_text``.

    Closeness is the cosine similarity between TF-IDF vectors of the
    reference text and each response. ``responses`` must be non-empty.
    """
    names = list(responses)
    texts = [responses[name] for name in names]
    # scikit-learn documents stop_words as a list (or 'english' / None), so
    # the NLTK word list is passed as the list it already is, not as a set.
    vectorizer = TfidfVectorizer(
        tokenizer=word_tokenize,
        stop_words=stopwords.words('english'),
    )
    try:
        tfidf_matrix = vectorizer.fit_transform([reference_text] + texts)
    except ValueError:
        # Raised e.g. when every document consists only of stop words (empty
        # vocabulary); no ranking is possible, so fall back to the first
        # response instead of failing the whole request.
        return names[0], texts[0]
    similarities = cosine_similarity(tfidf_matrix[0:1], tfidf_matrix[1:])
    best_index = int(similarities.argmax())
    return names[best_index], texts[best_index]


async def process_message(message: str) -> dict:
    """Query every loaded model with ``message`` and pick the best reply.

    Args:
        message: The user's prompt; surrounding whitespace is stripped before
            it is sent to the models.

    Returns:
        A dict with ``"best_response"`` (``{"model": ..., "text": ...}``) and
        ``"all_responses"`` (model name -> response text). When no model is
        loaded, ``best_response`` carries ``model=None`` and an explanatory
        text, and ``all_responses`` is empty.
    """
    # Local import, in line with the file's other function-scope import.
    import asyncio

    inputs = message.strip()

    # Keep only the models that actually loaded, keyed by name.
    loaded = {}
    for config in global_data['model_configs']:
        model = model_manager.get_model(config['name'])
        if model:
            loaded[config['name']] = model

    # ThreadPoolExecutor rejects max_workers=0, so bail out before creating it.
    if not loaded:
        return {
            "best_response": {"model": None, "text": "No models loaded successfully."},
            "all_responses": {},
        }

    def _run_model(model):
        # generate_model_response is a coroutine function: it has to be
        # driven to completion, otherwise the worker would merely return an
        # un-awaited coroutine object. Each worker thread runs its own loop.
        return asyncio.run(generate_model_response(model, inputs))

    names = list(loaded)
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=min(len(names), 4)) as executor:
        pending = [loop.run_in_executor(executor, _run_model, loaded[name]) for name in names]
        # Awaiting (rather than blocking on the futures) keeps the event loop
        # responsive, and gather returns results in submission order, so each
        # result is paired with the right model name.
        results = await asyncio.gather(*pending, return_exceptions=True)

    responses = {}
    for model_name, result in zip(names, results):
        if isinstance(result, BaseException):
            responses[model_name] = f"Error processing {model_name}: {result}"
        else:
            responses[model_name] = result

    best_model, best_text = _select_best_response(message, responses)
    return {
        "best_response": {"model": best_model, "text": best_text},
        "all_responses": responses,
    }
@app.post("/generate_multimodel")
async def api_generate_multimodel(request: Request):
    """Handle POST /generate_multimodel.

    Expects a JSON object with a non-empty string ``"message"`` field and
    returns the result of ``process_message`` as JSON.

    Raises:
        HTTPException: 400 when the body is not valid JSON, is not a JSON
            object, or lacks a usable ``"message"``.

    Any other failure while producing the response is reported as a 500 JSON
    payload of the form ``{"error": "..."}``.
    """
    try:
        data = await request.json()
    except ValueError:
        # Covers malformed JSON and undecodable bodies: a client error, not a
        # server error.
        raise HTTPException(status_code=400, detail="Invalid JSON body") from None

    # A JSON array/number/string body has no .get(); treat it as missing.
    message = data.get("message") if isinstance(data, dict) else None
    if not message:
        raise HTTPException(status_code=400, detail="Missing message")
    if not isinstance(message, str):
        raise HTTPException(status_code=400, detail="Message must be a string")

    try:
        response = await process_message(message)
        return JSONResponse(response)
    except HTTPException:
        raise
    except Exception as e:
        return JSONResponse({"error": str(e)}, status_code=500)
async def startup():
    """Startup hook; intentionally does nothing."""
    return None
async def shutdown():
    """Shutdown hook: run one garbage-collection pass."""
    # The collector's return value (number of unreachable objects) is ignored.
    gc.collect()
    return None
# Register the lifecycle hooks on the FastAPI app under the "startup" and
# "shutdown" events.
app.add_event_handler("startup", startup)
app.add_event_handler("shutdown", shutdown)
def release_resources():
    """Best-effort release of Python garbage and cached CUDA memory.

    Never raises; any failure is printed and swallowed so that callers can
    invoke this freely.
    """
    try:
        # Collect first: memory still referenced by uncollected garbage cannot
        # be handed back by empty_cache(), so the cache is emptied afterwards.
        gc.collect()
        torch.cuda.empty_cache()
    except Exception as e:
        print(f"Failed to release resources: {e}")
def resource_manager(
    *,
    max_ram_percent: float = 20,
    max_cpu_percent: float = 20,
    max_gpu_percent: float = 20,
    max_ram_mb: float = 2048,
    poll_interval: float = 1.0,
):
    """Poll system resource usage forever and release memory when over limits.

    Intended to run in a background thread; it never returns.

    Args:
        max_ram_percent: System RAM usage (percent) above which resources are
            released.
        max_cpu_percent: CPU usage (percent) above which the CPU branch runs.
        max_gpu_percent: GPU memory usage (percent) above which resources are
            released.
        max_ram_mb: Used system RAM (MiB) above which resources are released.
        poll_interval: Seconds to sleep between polls. Without a pause the
            loop would spin continuously and itself consume a CPU core.
    """
    # Local import, in line with the file's other function-scope import.
    import time

    while True:
        try:
            virtual_mem = psutil.virtual_memory()
            current_ram_percent = virtual_mem.percent
            current_ram_mb = virtual_mem.used / (1024 * 1024)
            if current_ram_percent > max_ram_percent or current_ram_mb > max_ram_mb:
                release_resources()

            # Called without an interval, so the reading is taken without
            # blocking; the sleep below spaces the samples out.
            current_cpu_percent = psutil.cpu_percent()
            if current_cpu_percent > max_cpu_percent:
                # NOTE(review): nice() is called without a value, which
                # appears to only read the current niceness rather than lower
                # the priority — confirm the intended behaviour.
                psutil.Process(os.getpid()).nice()

            if torch.cuda.is_available():
                gpu = torch.cuda.current_device()
                # torch.cuda has no memory_percent(); derive the percentage
                # from the device's free/total byte counts instead.
                free_bytes, total_bytes = torch.cuda.mem_get_info(gpu)
                if total_bytes:
                    gpu_mem = 100.0 * (total_bytes - free_bytes) / total_bytes
                    if gpu_mem > max_gpu_percent:
                        release_resources()
        except Exception as e:
            print(f"Error in resource manager: {e}")
        # Sleep outside the try so a persistent error cannot turn the loop
        # into a tight error-printing spin either.
        time.sleep(poll_interval)
if __name__ == "__main__":
    import threading

    # Run the resource monitor in a daemon thread so it never keeps the
    # process alive once the server stops.
    monitor_thread = threading.Thread(target=resource_manager, daemon=True)
    monitor_thread.start()

    # Serve on all interfaces; the port comes from $PORT, defaulting to 7860.
    listen_port = int(os.getenv("PORT", 7860))
    uvicorn.run(app, host="0.0.0.0", port=listen_port)