import os
import logging
import asyncio
import uvicorn
import random
from transformers import AutoModelForCausalLM, AutoTokenizer
from fastapi import FastAPI, Query, HTTPException
from fastapi.responses import HTMLResponse
# Configuración de logging
logging.basicConfig(level=logging.DEBUG)
logger = logging.getLogger(__name__)
# Inicializar la aplicación FastAPI
app = FastAPI()
# Diccionario para almacenar los modelos
data_and_models_dict = {}
# Lista para almacenar el historial de mensajes
message_history = []
# Lista para almacenar los tokens
tokens_history = []
# Función para cargar modelos
async def load_models():
programming_models = [
"microsoft/CodeGPT-small-py",
"Salesforce/codegen-350M-multi",
"Salesforce/codegen-2B-multi"
]
gpt_models = ["gpt2-medium", "gpt2-large", "gpt2", "google/gemma-2-9b"] + programming_models
models = []
for model_name in gpt_models:
try:
model = AutoModelForCausalLM.from_pretrained(model_name)
tokenizer = AutoTokenizer.from_pretrained(model_name)
logger.info(f"Successfully loaded {model_name} model")
models.append((model, tokenizer, model_name))
except Exception as e:
logger.error(f"Failed to load {model_name} model: {e}")
if not models:
raise HTTPException(status_code=500, detail="Failed to load any models")
return models
# Función para descargar modelos
async def download_models():
models = await load_models()
data_and_models_dict['models'] = models
@app.get('/')
async def main():
html_code = """
ChatGPT Chatbot
"""
return HTMLResponse(content=html_code, status_code=200)
# Ruta para la generación de respuestas
@app.get('/autocomplete')
async def autocomplete(q: str = Query(...)):
global data_and_models_dict, message_history, tokens_history
# Verificar si hay modelos cargados
if 'models' not in data_and_models_dict:
await download_models()
# Obtener los modelos
models = data_and_models_dict['models']
best_response = None
best_score = float('-inf') # Para almacenar la mejor puntuación
for model, tokenizer, model_name in models:
# Generar tokens de entrada
input_ids = tokenizer.encode(q, return_tensors="pt")
tokens_history.append({"input": input_ids.tolist()}) # Guardar tokens de entrada
# Generar parámetros aleatorios
top_k = random.randint(0, 50)
top_p = random.uniform(0.8, 1.0)
temperature = random.uniform(0.7, 1.5)
# Generar una respuesta utilizando el modelo
output = model.generate(
input_ids,
max_length=50,
top_k=top_k,
top_p=top_p,
temperature=temperature,
num_return_sequences=1
)
response_text = tokenizer.decode(output[0], skip_special_tokens=True)
# Calcular una puntuación simple para determinar la mejor respuesta
score = len(response_text) # Aquí podrías usar otro criterio de puntuación
# Comparar y almacenar la mejor respuesta
if score > best_score:
best_score = score
best_response = response_text
# Generar tokens de salida
output_ids = output[0].tolist()
tokens_history.append({"output": output_ids}) # Guardar tokens de salida
# Guardar eos y pad tokens
eos_token = tokenizer.eos_token_id
pad_token = tokenizer.pad_token_id
tokens_history.append({"eos_token": eos_token, "pad_token": pad_token})
# Guardar el mensaje del usuario en el historial
message_history.append(q)
# Respuesta con la mejor respuesta generada
return {"response": best_response}
# Función para ejecutar la aplicación sin reiniciarla
def run_app():
asyncio.run(download_models())
uvicorn.run(app, host='0.0.0.0', port=7860)
# Ejecutar la aplicación
if __name__ == "__main__":
run_app()