import asyncio
import gc
import io
import os
from concurrent.futures import ThreadPoolExecutor, as_completed

import nltk
import psutil
import torch
import uvicorn
from dotenv import load_dotenv
from fastapi import FastAPI, HTTPException, Request
from fastapi.responses import JSONResponse
from huggingface_hub import hf_hub_download, login
from llama_cpp import Llama
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from pydantic import BaseModel
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity
from tqdm import tqdm

# Tokenizer and stop-word data used by the TF-IDF ranking in process_message().
# NLTK >= 3.9 tokenizes with the 'punkt_tab' resource; older releases use 'punkt'.
nltk.download('punkt')
nltk.download('punkt_tab')
nltk.download('stopwords')

load_dotenv()

app = FastAPI()

HUGGINGFACE_TOKEN = os.getenv("HUGGINGFACE_TOKEN")
if HUGGINGFACE_TOKEN:
    login(token=HUGGINGFACE_TOKEN)

# Number of layers llama.cpp offloads to the GPU (llama.cpp's `n_gpu_layers`);
# 0 keeps inference on the CPU.
N_GPU_LAYERS = int(os.getenv("N_GPU_LAYERS", "0"))
# Upper bound on concurrent model generations within a single request.
MAX_PARALLEL_GENERATIONS = 4

model_configs = [
    # ... (Your model configurations remain the same) ...
    # Each entry is read below as a dict with 'name', 'repo_id' and 'filename' keys.
]

global_data = {'model_configs': model_configs, 'training_data': io.StringIO()}


class ModelManager:
    """Download and hold one llama.cpp model per entry in global_data['model_configs'].

    A model that fails to download or load is stored as ``None`` under its
    name, so get_model() returns ``None`` for it just as for an unknown name.
    """

    def __init__(self):
        self.models = {}
        self.load_models()

    def load_models(self):
        """Load every configured model whose name is not already in self.models.

        Failures are printed and recorded as ``None`` instead of aborting, so
        one broken repo does not prevent the other models from serving.
        """
        for config in tqdm(global_data['model_configs'], desc="Loading models"):
            model_name = config['name']
            if model_name in self.models:
                continue
            try:
                # hf_hub_download returns the local *path* of the cached file
                # (not its bytes); llama.cpp loads the model from that path.
                model_path = hf_hub_download(
                    repo_id=config['repo_id'],
                    filename=config['filename'],
                    token=HUGGINGFACE_TOKEN,
                )
                model = Llama(model_path=model_path, n_ctx=512, n_gpu_layers=N_GPU_LAYERS)
                self.models[model_name] = model
                print(f"Model '{model_name}' loaded successfully.")
            except Exception as e:
                print(f"Error loading model {model_name}: {e}")
                self.models[model_name] = None
            finally:
                gc.collect()

    def get_model(self, model_name: str):
        """Return the loaded model for *model_name*, or None if missing/failed."""
        return self.models.get(model_name)


model_manager = ModelManager()


class ChatRequest(BaseModel):
    """Request body schema for chat-style endpoints."""

    message: str


def _generate_sync(model, inputs: str) -> str:
    """Run one blocking completion and return its stripped text.

    Never raises: a missing model or a generation error becomes a readable
    string so a single failing model does not hide the others' answers.
    """
    try:
        if not model:
            return "Model not loaded"
        response = model(inputs, max_tokens=150)
        return response['choices'][0]['text'].strip()
    except Exception as e:
        return f"Error: Could not generate a response. Details: {e}"


async def generate_model_response(model, inputs: str) -> str:
    """Generate a completion for *inputs* without blocking the event loop.

    The llama.cpp call is synchronous, so it runs in the loop's default
    thread-pool executor.
    """
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(None, _generate_sync, model, inputs)


def _pick_best_response(reference_text: str, responses: dict) -> dict:
    """Pick the response whose TF-IDF vector is most cosine-similar to *reference_text*.

    Returns the endpoint payload: ``{"best_response": {"model", "text"},
    "all_responses": responses}``.
    """
    response_texts = list(responses.values())
    if not response_texts:
        return {"best_response": {"model": None, "text": "No models loaded successfully."},
                "all_responses": responses}

    vectorizer = TfidfVectorizer(
        tokenizer=word_tokenize,
        # sklearn accepts only 'english', a list, or None here -- not a set.
        stop_words=sorted(set(stopwords.words('english'))),
        # token_pattern is unused when a custom tokenizer is given.
        token_pattern=None,
    )
    try:
        tfidf_matrix = vectorizer.fit_transform([reference_text] + response_texts)
    except ValueError:
        # "empty vocabulary": every document is only stop words/punctuation,
        # so there is nothing to compare; fall back to the first response.
        best_response_index = 0
    else:
        similarities = cosine_similarity(tfidf_matrix[0:1], tfidf_matrix[1:])
        best_response_index = int(similarities.argmax())

    best_response_model = list(responses.keys())[best_response_index]
    best_response_text = response_texts[best_response_index]
    return {"best_response": {"model": best_response_model, "text": best_response_text},
            "all_responses": responses}


async def process_message(message: str) -> dict:
    """Ask every loaded model to answer *message* and rank the answers.

    Generations run concurrently (at most MAX_PARALLEL_GENERATIONS at once) in
    worker threads. Each answer is stored under the name of the model that
    produced it.
    """
    inputs = message.strip()

    loaded = {}
    for config in global_data['model_configs']:
        model = model_manager.get_model(config['name'])
        if model:
            loaded[config['name']] = model

    responses = {}
    if loaded:
        semaphore = asyncio.Semaphore(MAX_PARALLEL_GENERATIONS)
        # NOTE(review): a llama_cpp.Llama instance is presumably not safe to call
        # from two threads at once; concurrent *requests* can still hit the same
        # model simultaneously. Consider a per-model lock if that matters.
        with tqdm(total=len(loaded), desc="Generating responses") as progress:

            async def run_one(model):
                async with semaphore:
                    result = await generate_model_response(model, inputs)
                progress.update(1)
                return result

            results = await asyncio.gather(
                *(run_one(model) for model in loaded.values()),
                return_exceptions=True,
            )

        # gather() preserves input order, so zip pairs each result with its model.
        for model_name, result in zip(loaded, results):
            if isinstance(result, BaseException):
                responses[model_name] = f"Error processing {model_name}: {result}"
            else:
                responses[model_name] = result

    # The unstripped message is the ranking reference.
    return _pick_best_response(message, responses)


@app.post("/generate_multimodel")
async def api_generate_multimodel(request: Request):
    """POST {"message": str}: return every model's answer plus the best-ranked one.

    Responds 400 for malformed JSON or a missing/non-string message, and 500
    with {"error": ...} for any other failure.
    """
    try:
        try:
            data = await request.json()
        except ValueError:
            raise HTTPException(status_code=400, detail="Invalid JSON body")
        message = data.get("message") if isinstance(data, dict) else None
        if not message:
            raise HTTPException(status_code=400, detail="Missing message")
        if not isinstance(message, str):
            raise HTTPException(status_code=400, detail="message must be a string")
        response = await process_message(message)
        return JSONResponse(response)
    except HTTPException:
        raise
    except Exception as e:
        return JSONResponse({"error": str(e)}, status_code=500)


async def startup():
    """Application startup hook (currently a no-op)."""
    pass


async def shutdown():
    """Application shutdown hook: collect garbage before exit."""
    gc.collect()


app.add_event_handler("startup", startup)
app.add_event_handler("shutdown", shutdown)


def release_resources():
    """Best-effort release of torch's cached CUDA memory and unreachable Python objects.

    Errors are printed rather than raised so the caller's loop keeps running.
    """
    try:
        torch.cuda.empty_cache()
        gc.collect()
    except Exception as e:
        print(f"Failed to release resources: {e}")


def _lower_process_priority(process):
    """Make *process* yield CPU to other work (Windows priority class / POSIX niceness)."""
    if hasattr(psutil, "BELOW_NORMAL_PRIORITY_CLASS"):  # only defined on Windows
        process.nice(psutil.BELOW_NORMAL_PRIORITY_CLASS)
    else:
        # Unprivileged processes may only raise niceness, never lower it, so
        # never go below the current value.
        process.nice(max(process.nice(), 10))


def resource_manager(poll_interval: float = 5.0, stop_event=None):
    """Periodically check RAM/CPU/GPU usage and react when thresholds are exceeded.

    Args:
        poll_interval: Seconds to wait between checks. Without a wait the loop
            would spin a whole CPU core on its own.
        stop_event: Optional ``threading.Event``; setting it ends the loop.
            When omitted the loop runs for the life of the process, which suits
            the daemon thread started in ``__main__``.
    """
    import threading

    MAX_RAM_PERCENT = 20
    MAX_CPU_PERCENT = 20
    MAX_GPU_PERCENT = 20
    MAX_RAM_MB = 2048

    if stop_event is None:
        stop_event = threading.Event()
    process = psutil.Process(os.getpid())
    priority_lowered = False

    while not stop_event.is_set():
        try:
            # NOTE(review): these are *system-wide* figures, not this process's
            # usage, so on most hosts they exceed the limits almost constantly.
            virtual_mem = psutil.virtual_memory()
            current_ram_percent = virtual_mem.percent
            current_ram_mb = virtual_mem.used / (1024 * 1024)
            if current_ram_percent > MAX_RAM_PERCENT or current_ram_mb > MAX_RAM_MB:
                release_resources()

            # Non-blocking: measures CPU use since the previous call, i.e. over
            # roughly one poll interval.
            current_cpu_percent = psutil.cpu_percent()
            if current_cpu_percent > MAX_CPU_PERCENT and not priority_lowered:
                priority_lowered = True  # attempt once; priority is never raised back
                _lower_process_priority(process)

            if torch.cuda.is_available():
                # Device-wide usage (includes non-torch allocations such as llama.cpp's);
                # release_resources() can only free torch's own cache.
                free_bytes, total_bytes = torch.cuda.mem_get_info()
                gpu_mem = 100.0 * (total_bytes - free_bytes) / total_bytes
                if gpu_mem > MAX_GPU_PERCENT:
                    release_resources()
        except Exception as e:
            print(f"Error in resource manager: {e}")

        stop_event.wait(poll_interval)


if __name__ == "__main__":
    import threading

    resource_thread = threading.Thread(target=resource_manager, daemon=True)
    resource_thread.start()
    port = int(os.environ.get("PORT", 7860))
    uvicorn.run(app, host="0.0.0.0", port=port)