import os import re import tempfile import requests import gradio as gr print(f"Gradio version: {gr.__version__}") from PyPDF2 import PdfReader import fitz # pymupdf import logging import webbrowser from huggingface_hub import InferenceClient from typing import Dict, List, Optional, Tuple from functools import wraps import threading import time from groq import Groq # Import the Groq client # Set up logging logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') # Constants CONTEXT_SIZES = { "4K": 4096, "8K": 8192, "32K": 32768, "64K": 65536, "128K": 131072 } MODEL_CONTEXT_SIZES = { "Clipboard only": 4096, "OpenAI ChatGPT": { "gpt-3.5-turbo": 16385, "gpt-3.5-turbo-0125": 16385, "gpt-3.5-turbo-1106": 16385, "gpt-3.5-turbo-instruct": 4096, "gpt-4": 8192, "gpt-4-0314": 8192, "gpt-4-0613": 8192, "gpt-4-turbo": 128000, "gpt-4-turbo-2024-04-09": 128000, "gpt-4-turbo-preview": 128000, "gpt-4-0125-preview": 128000, "gpt-4-1106-preview": 128000, "gpt-4o": 128000, "gpt-4o-2024-11-20": 128000, "gpt-4o-2024-08-06": 128000, "gpt-4o-2024-05-13": 128000, "chatgpt-4o-latest": 128000, "gpt-4o-mini": 128000, "gpt-4o-mini-2024-07-18": 128000, "gpt-4o-realtime-preview": 128000, "gpt-4o-realtime-preview-2024-10-01": 128000, "gpt-4o-audio-preview": 128000, "gpt-4o-audio-preview-2024-10-01": 128000, "o1-preview": 128000, "o1-preview-2024-09-12": 128000, "o1-mini": 128000, "o1-mini-2024-09-12": 128000, }, "HuggingFace Inference": { "microsoft/phi-3-mini-4k-instruct": 4096, "microsoft/Phi-3-mini-128k-instruct": 131072, # Added Phi-3 128k "HuggingFaceH4/zephyr-7b-beta": 8192, "deepseek-ai/DeepSeek-Coder-V2-Instruct": 8192, "mistralai/Mistral-7B-Instruct-v0.3": 32768, "NousResearch/Nous-Hermes-2-Mixtral-8x7B-DPO": 32768, "microsoft/Phi-3.5-mini-instruct": 4096, "HuggingFaceTB/SmolLM2-1.7B-Instruct": 2048, "google/gemma-2-2b-it": 2048, "openai-community/gpt2": 1024, "microsoft/phi-2": 2048, "TinyLlama/TinyLlama-1.1B-Chat-v1.0": 2048, "VAGOsolutions/Llama-3-SauerkrautLM-8b-Instruct": 2048, "VAGOsolutions/Llama-3.1-SauerkrautLM-8b-Instruct": 4096, "VAGOsolutions/SauerkrautLM-Nemo-12b-Instruct": 4096, "openGPT-X/Teuken-7B-instruct-research-v0.4": 4096, "Qwen/Qwen2.5-7B-Instruct": 131072, "tiiuae/falcon-7b-instruct": 8192, "Qwen/QwQ-32B-preview": 32768, # Add QwQ model }, "Groq API": { "gemma2-9b-it": 8192, "gemma-7b-it": 8192, "llama-3.3-70b-versatile": 131072, "llama-3.1-70b-versatile": 131072, # Deprecated "llama-3.1-8b-instant": 131072, "llama-guard-3-8b": 8192, "llama3-70b-8192": 8192, "llama3-8b-8192": 8192, "mixtral-8x7b-32768": 32768, "llama3-groq-70b-8192-tool-use-preview": 8192, "llama3-groq-8b-8192-tool-use-preview": 8192, "llama-3.3-70b-specdec": 131072, "llama-3.1-70b-specdec": 131072, "llama-3.2-1b-preview": 131072, "llama-3.2-3b-preview": 131072, }, "Cohere API": { "command-r-plus-08-2024": 131072, # 128k "command-r-plus-04-2024": 131072, "command-r-plus": 131072, "command-r-08-2024": 131072, "command-r-03-2024": 131072, "command-r": 131072, "command": 4096, "command-nightly": 131072, "command-light": 4096, "command-light-nightly": 4096, "c4ai-aya-expanse-8b": 8192, "c4ai-aya-expanse-32b": 131072, }, "GLHF API": { "mistralai/Mixtral-8x7B-Instruct-v0.1": 32768, # "NousResearch/Nous-Hermes-2-Solar-10.7B": 32768, "01-ai/Yi-34B-Chat": 32768, "mistralai/Mistral-7B-Instruct-v0.3": 32768, "microsoft/phi-3-mini-4k-instruct": 4096, "microsoft/Phi-3.5-mini-instruct": 4096, "microsoft/Phi-3-mini-128k-instruct": 131072, "HuggingFaceH4/zephyr-7b-beta": 8192, "NousResearch/Nous-Hermes-2-Mixtral-8x7B-DPO": 32768, "google/gemma-2-2b-it": 2048, "microsoft/phi-2": 2048, } } class RateLimit: def __init__(self, calls_per_min): self.calls_per_min = calls_per_min self.calls = [] self.lock = threading.Lock() def __call__(self, func): @wraps(func) def wrapped(*args, **kwargs): with self.lock: now = time.time() # Remove old calls self.calls = [call for call in self.calls if call > now - 60] if len(self.calls) >= self.calls_per_min: sleep_time = self.calls[0] - (now - 60) if sleep_time > 0: time.sleep(sleep_time) self.calls.append(now) return func(*args, **kwargs) return wrapped class ModelRegistry: def __init__(self): # HuggingFace Models self.hf_models = { "Mistral 7B": "mistralai/Mistral-7B-Instruct-v0.3", # works well "Nous-Hermes": "NousResearch/Nous-Hermes-2-Mixtral-8x7B-DPO", # works well "Zephyr 7B": "HuggingFaceH4/zephyr-7b-beta", # works "Phi-3.5 Mini": "microsoft/Phi-3.5-mini-instruct", # works but poor results "Phi-3 Mini 4K": "microsoft/phi-3-mini-4k-instruct", # good for small context "Phi-3 Mini 128K": "microsoft/Phi-3-mini-128k-instruct", # good for large context "Gemma 2 2B": "google/gemma-2-2b-it", # works but often busy "GPT2": "openai-community/gpt2", # works with token limits "Phi-2": "microsoft/phi-2", # works with token limits "TinyLlama 1.1B": "TinyLlama/TinyLlama-1.1B-Chat-v1.0", # works with token limits "DeepSeek Coder V2": "deepseek-ai/DeepSeek-Coder-V2-Instruct", # good for code "Falcon-7B": "tiiuae/falcon-7b-instruct", # reliable "Qwen 2.5 7B": "Qwen/Qwen2.5-7B-Instruct", # good performance "QwQ 32B Preview": "Qwen/QwQ-32B-preview", # special handling # Models requiring API key "DeepSeek Coder V2 (Pro)": "deepseek-ai/DeepSeek-Coder-V2-Instruct", # needs API key "Meta Llama 3.1 70B (Pro)": "meta-llama/Meta-Llama-3.1-70B-Instruct", # needs API key "Aya 23-35B (Pro)": "CohereForAI/aya-23-35B", # needs API key "Custom Model": "" } # Default Groq Models self.default_groq_models = { # Keep defaults in case fetching fails "gemma2-9b-it": "gemma2-9b-it", "gemma-7b-it": "gemma-7b-it", "llama-3.3-70b-versatile": "llama-3.3-70b-versatile", "llama-3.1-70b-versatile": "llama-3.1-70b-versatile", # Deprecated "llama-3.1-8b-instant": "llama-3.1-8b-instant", "llama-guard-3-8b": "llama-guard-3-8b", "llama3-70b-8192": "llama3-70b-8192", "llama3-8b-8192": "llama3-8b-8192", "mixtral-8x7b-32768": "mixtral-8x7b-32768", "llama3-groq-70b-8192-tool-use-preview": "llama3-groq-70b-8192-tool-use-preview", "llama3-groq-8b-8192-tool-use-preview": "llama3-groq-8b-8192-tool-use-preview", "llama-3.3-70b-specdec": "llama-3.3-70b-specdec", "llama-3.1-70b-specdec": "llama-3.1-70b-specdec", "llama-3.2-1b-preview": "llama-3.2-1b-preview", "llama-3.2-3b-preview": "llama-3.2-3b-preview", } self.groq_models = self._fetch_groq_models() def _fetch_groq_models(self) -> Dict[str, str]: """Fetch available Groq models with proper error handling""" try: groq_api_key = os.getenv('GROQ_API_KEY') if not groq_api_key: logging.warning("No GROQ_API_KEY found in environment") return self.default_groq_models headers = { "Authorization": f"Bearer {groq_api_key}", "Content-Type": "application/json" } response = requests.get( "https://api.groq.com/openai/v1/models", headers=headers, timeout=10 ) if response.status_code == 200: models = response.json().get("data", []) model_dict = {model["id"]: model["id"] for model in models} # Merge with defaults to ensure all models are available return {**self.default_groq_models, **model_dict} else: logging.error(f"Failed to fetch Groq models: {response.status_code}") return self.default_groq_models except requests.exceptions.Timeout: logging.error("Timeout while fetching Groq models") return self.default_groq_models except Exception as e: logging.error(f"Error fetching Groq models: {e}") return self.default_groq_models def _get_default_groq_models(self) -> Dict[str, str]: """Return default Groq models""" return self.default_groq_models def refresh_groq_models(self) -> Dict[str, str]: """Refresh the list of available Groq models""" self.groq_models = self._fetch_groq_models() return self.groq_models def apply_rate_limit(func, calls_per_min, *args, **kwargs): """Apply rate limiting only when needed.""" rate_decorator = RateLimit(calls_per_min) wrapped_func = rate_decorator(func) return wrapped_func(*args, **kwargs) class PDFProcessor: """Handles PDF conversion to text and markdown using different methods""" @staticmethod def txt_convert(pdf_path: str) -> str: """Basic text extraction using PyPDF2""" try: reader = PdfReader(pdf_path) text = "" for page_num, page in enumerate(reader.pages, start=1): page_text = page.extract_text() if page_text: text += page_text + "\n" else: logging.warning(f"No text found on page {page_num}.") return text except Exception as e: logging.error(f"Error in txt conversion: {e}") return f"Error: {str(e)}" @staticmethod def md_convert_with_pymupdf(pdf_path: str) -> str: """Convert PDF to Markdown using pymupdf""" try: doc = fitz.open(pdf_path) markdown_text = [] for page in doc: blocks = page.get_text("dict")["blocks"] for block in blocks: if "lines" in block: for line in block["lines"]: for span in line["spans"]: font_size = span["size"] content = span["text"] font_flags = span["flags"] # Contains bold, italic info # Handle headers based on font size if font_size > 20: markdown_text.append(f"# {content}\n") elif font_size > 16: markdown_text.append(f"## {content}\n") elif font_size > 14: markdown_text.append(f"### {content}\n") else: # Handle bold and italic if font_flags & 2**4: # Bold content = f"**{content}**" if font_flags & 2**1: # Italic content = f"*{content}*" markdown_text.append(content) markdown_text.append(" ") # Space between spans markdown_text.append("\n") # Newline between lines # Add extra newline between blocks for paragraphs markdown_text.append("\n") doc.close() return "".join(markdown_text) except Exception as e: logging.error(f"Error in pymupdf conversion: {e}") return f"Error: {str(e)}" # Initialize model registry model_registry = ModelRegistry() def extract_text_from_pdf(pdf_path: str, format_type: str = "txt") -> str: """ Extract and format text from PDF using different processors based on format. Args: pdf_path: Path to PDF file format_type: Either 'txt' or 'md' Returns: Formatted text content """ processor = PDFProcessor() try: if format_type == "txt": return processor.txt_convert(pdf_path) elif format_type == "md": return processor.md_convert_with_pymupdf(pdf_path) else: return f"Error: Unsupported format type: {format_type}" except Exception as e: logging.error(f"Error in PDF conversion: {e}") return f"Error: {str(e)}" def format_content(text: str, format_type: str) -> str: """Format extracted text according to specified format.""" if format_type == 'txt': return text elif format_type == 'md': paragraphs = text.split('\n\n') return '\n\n'.join(paragraphs) elif format_type == 'html': paragraphs = text.split('\n\n') return ''.join([f'

{para.strip()}

' for para in paragraphs if para.strip()]) else: logging.error(f"Unsupported format: {format_type}") return f"Unsupported format: {format_type}" def split_into_snippets(text: str, context_size: int) -> List[str]: """Split text into manageable snippets based on context size.""" sentences = re.split(r'(?<=[.!?]) +', text) snippets = [] current_snippet = "" for sentence in sentences: if len(current_snippet) + len(sentence) + 1 > context_size: if current_snippet: snippets.append(current_snippet.strip()) current_snippet = sentence + " " else: snippets.append(sentence.strip()) current_snippet = "" else: current_snippet += sentence + " " if current_snippet.strip(): snippets.append(current_snippet.strip()) return snippets def build_prompts(snippets: List[str], prompt_instruction: str, custom_prompt: Optional[str], snippet_num: Optional[int] = None) -> str: """Build formatted prompts from text snippets.""" if snippet_num is not None: if 1 <= snippet_num <= len(snippets): selected_snippets = [snippets[snippet_num - 1]] else: return f"Error: Invalid snippet number. Please choose between 1 and {len(snippets)}." else: selected_snippets = snippets prompts = [] base_prompt = custom_prompt if custom_prompt else prompt_instruction for idx, snippet in enumerate(selected_snippets, start=1): if len(selected_snippets) > 1: prompt_header = f"{base_prompt} Part {idx} of {len(selected_snippets)}: ---\n" else: prompt_header = f"{base_prompt} ---\n" framed_prompt = f"{prompt_header}{snippet}\n---" prompts.append(framed_prompt) return "\n\n".join(prompts) def send_to_model(prompt, model_selection, hf_model_choice, hf_custom_model, hf_api_key, groq_model_choice, groq_api_key, openai_api_key, openai_model_choice, cohere_api_key=None, cohere_model=None, glhf_api_key=None, glhf_model=None, glhf_custom_model=None): """Primary wrapper for model interactions with error handling.""" logging.info("send to model starting...") if not prompt or not prompt.strip(): return gr.HTML(""), "Error: No prompt provided", None try: logging.info("sending to model preparation.") # Basic input validation valid_selections = ["Clipboard only", "HuggingFace Inference", "Groq API", "OpenAI ChatGPT", "Cohere API", "GLHF API"] if model_selection not in valid_selections: return gr.HTML(""), "Error: Invalid model selection", None # Check environment API keys env_api_keys = { "GROQ_API_KEY": os.getenv('GROQ_API_KEY'), "OPENAI_API_KEY": os.getenv('OPENAI_API_KEY'), "COHERE_API_KEY": os.getenv('COHERE_API_KEY'), "GLHF_API_KEY": os.getenv('GLHF_API_KEY') } for key_name, key_value in env_api_keys.items(): if not key_value: logging.warning(f"No {key_name} found in environment") # Model-specific validation - check only required keys if model_selection == "Groq API" and not groq_api_key: groq_api_key = env_api_keys.get("GROQ_API_KEY") if not groq_api_key: return gr.HTML(""), "Error: Groq API key required", None elif model_selection == "OpenAI ChatGPT" and not openai_api_key: openai_api_key = env_api_keys.get("OPENAI_API_KEY") if not openai_api_key: return gr.HTML(""), "Error: OpenAI API key required", None elif model_selection == "GLHF API" and not glhf_api_key: glhf_api_key = env_api_keys.get("GLHF_API_KEY") if not glhf_api_key: return gr.HTML(""), "Error: GLHF API key required", None # Call the implementation function clipboard_status, summary, download_file = send_to_model_impl( prompt=prompt.strip(), model_selection=model_selection, hf_model_choice=hf_model_choice, hf_custom_model=hf_custom_model, hf_api_key=hf_api_key, groq_model_choice=groq_model_choice, groq_api_key=groq_api_key, openai_api_key=openai_api_key, openai_model_choice=openai_model_choice, cohere_api_key=cohere_api_key or env_api_keys.get("COHERE_API_KEY"), cohere_model=cohere_model, glhf_api_key=glhf_api_key, glhf_model=glhf_model, glhf_custom_model=glhf_custom_model, use_rate_limits=False # Adjust based on your needs ) return clipboard_status, summary, download_file except Exception as e: error_msg = str(e) or "Unknown error occurred" logging.error(f"Error in send_to_model: {error_msg}") return gr.HTML(f"Error: {error_msg}"), f"Error: {error_msg}", None finally: logging.info("send to model completed.") def send_to_model_impl(prompt, model_selection, hf_model_choice, hf_custom_model, hf_api_key, groq_model_choice, groq_api_key, openai_api_key, openai_model_choice, cohere_api_key=None, cohere_model=None, glhf_api_key=None, glhf_model=None, glhf_custom_model=None, use_rate_limits=False): """Implementation of model sending with all providers.""" logging.info("send to model impl commencing...") try: if model_selection == "Clipboard only": # Escape the prompt for JavaScript escaped_prompt = prompt.replace('"', '\\"').replace("'", "\\'").replace('\n', '\\n') # Create temporary file for download with tempfile.NamedTemporaryFile(delete=False, mode='w', suffix='.txt') as f: f.write(prompt) download_file = f.name # Create HTML with JavaScript using fallback methods html_template = f''' ''' # Return all three expected outputs: # 1. HTML component for clipboard action # 2. A success message for summary output # 3. The download file return gr.HTML(html_template), "Use Copy Text to Clipboard button below, then paste where you like.", download_file # Get the summary based on model selection if model_selection == "HuggingFace Inference": # Use the selected model ID directly model_id = hf_custom_model if hf_model_choice == "Custom Model" else hf_model_choice # Always try without API key first summary = send_to_hf_inference(prompt, model_id) if summary.startswith("Error: This model requires authentication") and hf_api_key: # Only try with API key if the model specifically requires it summary = send_to_hf_inference(prompt, model_id, hf_api_key, use_rate_limits) elif model_selection == "Groq API": if not groq_api_key: return gr.HTML(""), "Error: Groq API key required", None summary = send_to_groq(prompt, groq_model_choice, groq_api_key, use_rate_limits) elif model_selection == "OpenAI ChatGPT": if not openai_api_key: return "Error: OpenAI API key required", None summary = send_to_openai(prompt, openai_api_key, model=openai_model_choice, use_rate_limit=use_rate_limits) elif model_selection == "Cohere API": summary = send_to_cohere(prompt, cohere_api_key, cohere_model, use_rate_limits) elif model_selection == "GLHF API": if not glhf_api_key: return "Error: GLHF API key required", None # Handle model selection if glhf_model == "Custom Model": model_id = f"hf:{glhf_custom_model}" else: model_id = f"hf:{glhf_model}" summary = send_to_glhf(prompt, glhf_api_key, model_id, use_rate_limits) else: return "Error: Invalid model selection", None # Validate response if not summary: return gr.HTML(""), "Error: No response from model", None if not isinstance(summary, str): return gr.HTML(""), "Error: Invalid response type from model", None # Create download file for valid responses if not summary.startswith("Error"): with tempfile.NamedTemporaryFile(delete=False, mode='w', suffix='.txt') as f: f.write(summary) return gr.HTML(""), summary, f.name return gr.HTML(""), summary, None except Exception as e: error_msg = str(e) if not error_msg: error_msg = "Unknown error occurred" logging.error(f"Error in send_to_model_impl: {error_msg}") # FIX: Return all three values even in error case return gr.HTML(""), f"Error: {error_msg}", None def send_to_qwq(prompt: str): """Send prompt to QwQ API.""" try: from gradio_client import Client client = Client("Qwen/QwQ-32B-preview") # Call the add_text endpoint result = client.predict( _input={"files":[], "text": prompt}, _chatbot=[], api_name="/add_text" ) # Call the agent_run endpoint response = client.predict( _chatbot=result[1], # This is correct api_name="/agent_run" ) if isinstance(response, list) and len(response) > 0: # Extract text from first message in chat history if isinstance(response[0], list) and len(response[0]) > 0: if isinstance(response[0][1], dict): return response[0][1].get('text', 'No response text from QwQ') elif isinstance(response[0][1], str): return response[0][1] return 'No valid response from QwQ' return 'No response from QwQ' except Exception as e: logging.error(f"QwQ API error: {e}") return f"Error with QwQ API: {str(e)}" def send_to_hf_inference(prompt: str, model_name: str, api_key: str = None, use_rate_limit: bool = False) -> str: """Send prompt to HuggingFace Inference API.""" # Special handling for QwQ if model_name == "Qwen/QwQ-32B-preview": return send_to_qwq(prompt) def _send(): # Check token limits first is_within_limits, error_msg = check_token_limits(prompt, model_name) if not is_within_limits: return error_msg try: client = InferenceClient(token=api_key) if api_key else InferenceClient() response = client.text_generation( prompt, model=model_name, max_new_tokens=500, temperature=0.7, top_p=0.95, repetition_penalty=1.1 ) return str(response) except Exception as e: logging.error(f"HuggingFace inference error: {e}") return f"Error with HuggingFace inference: {str(e)}" return apply_rate_limit(_send, 16) if use_rate_limit else _send() def send_to_glhf(prompt: str, api_key: str, model_id: str, use_rate_limit: bool = False) -> str: """Send prompt to GLHF API.""" def _send(): try: import openai client = openai.OpenAI( api_key=api_key, base_url="https://glhf.chat/api/openai/v1", ) # For GLHF, always use streaming for reliability completion = client.chat.completions.create( stream=True, model=model_id, messages=[ {"role": "system", "content": "You are a helpful assistant."}, {"role": "user", "content": prompt} ], ) response_text = [] for chunk in completion: if chunk.choices[0].delta.content is not None: response_text.append(chunk.choices[0].delta.content) return "".join(response_text) except Exception as e: logging.error(f"GLHF API error: {e}") return f"Error with GLHF API: {str(e)}" return apply_rate_limit(_send, 384) if use_rate_limit else _send() def send_to_openai(prompt: str, api_key: str, model: str = "gpt-3.5-turbo", use_rate_limit: bool = False) -> str: """Send prompt to OpenAI API.""" def _send(): try: from openai import OpenAI client = OpenAI(api_key=api_key) response = client.chat.completions.create( model=model, messages=[ {"role": "system", "content": "You are a helpful assistant that provides detailed responses."}, {"role": "user", "content": prompt} ], temperature=0.7, max_tokens=500, top_p=0.95 ) if response.choices and len(response.choices) > 0: return response.choices[0].message.content return "Error: No response generated" except ImportError: return "Error: Please install the latest version of openai package" except Exception as e: logging.error(f"OpenAI API error: {e}") return f"Error with OpenAI API: {str(e)}" return apply_rate_limit(_send, 3000/60) if use_rate_limit else _send() def send_to_cohere(prompt: str, api_key: str = None, model: str = None, use_rate_limit: bool = False) -> str: """Send prompt to Cohere API with V2 and V1 fallback.""" def _send(): try: import cohere # Try V2 first try: client = cohere.ClientV2(api_key) if api_key else cohere.ClientV2() response = client.chat( model=model or "command-r-plus-08-2024", messages=[{ "role": "user", "content": prompt }], temperature=0.7, ) return response.message.content[0].text except Exception as v2_error: logging.warning(f"Cohere V2 failed, trying V1: {v2_error}") # Fallback to V1 client = cohere.Client(api_key) if api_key else cohere.Client() response = client.chat( message=prompt, model=model or "command-r-plus-08-2024", temperature=0.7, max_tokens=500, ) return response.text except Exception as e: logging.error(f"Cohere API error: {e}") return f"Error with Cohere API: {str(e)}" return apply_rate_limit(_send, 16) if use_rate_limit else _send() def send_to_groq(prompt: str, model_name: str, api_key: str, use_rate_limit: bool = False) -> str: """Send prompt to Groq API.""" def _send(): try: client = Groq(api_key=api_key) response = client.chat.completions.create( model=model_name, messages=[{ "role": "user", "content": prompt }], temperature=0.7, max_tokens=500, top_p=0.95 ) return response.choices[0].message.content except Exception as e: logging.error(f"Groq API error: {e}") return f"Error with Groq API: {str(e)}" return apply_rate_limit(_send, 4) if use_rate_limit else _send() def estimate_tokens(text: str) -> int: """Rough token estimation: ~4 characters per token on average""" return len(text) // 4 def check_token_limits(prompt: str, model_name: str) -> tuple[bool, str]: """Check if prompt might exceed model's token limits.""" token_limited_models = { "openai-community/gpt2": 1500, # 2048 - buffer "microsoft/phi-2": 1500, "TinyLlama/TinyLlama-1.1B-Chat-v1.0": 1500 } if model_name in token_limited_models: estimated_tokens = estimate_tokens(prompt) max_tokens = token_limited_models[model_name] if estimated_tokens > max_tokens: return False, f"Prompt too long (estimated {estimated_tokens} tokens). This model supports max {max_tokens} tokens." return True, "" def copy_to_clipboard(text): return gr.HTML(f""" """) def handle_model_selection(choice): """Handle model selection and update UI""" ctx_size = MODEL_CONTEXT_SIZES.get(choice, {}) if isinstance(ctx_size, dict): first_model = list(ctx_size.keys())[0] ctx_size = ctx_size[first_model] if choice == "OpenAI ChatGPT": model_choices = list(MODEL_CONTEXT_SIZES["OpenAI ChatGPT"].keys()) return [ gr.update(visible=False), # hf_options gr.update(visible=False), # groq_options gr.update(visible=True), # openai_options gr.update(visible=False), # cohere_options gr.update(visible=False), # glhf_options gr.update(value=ctx_size), # context_size gr.update(interactive=True), # send_model_btn gr.Dropdown(choices=model_choices, value=first_model), # openai_model gr.update(visible=False) # hf_custom_model visibility ] elif choice == "HuggingFace Inference": model_choices = list(MODEL_CONTEXT_SIZES["HuggingFace Inference"].keys()) return [ gr.update(visible=True), # hf_options gr.update(visible=False), # groq_options gr.update(visible=False), # openai_options gr.update(visible=False), # cohere_options gr.update(visible=False), # glhf_options gr.update(value=ctx_size), # context_size gr.update(interactive=True), # send_model_btn gr.Dropdown(choices=model_choices, value="mistralai/Mistral-7B-Instruct-v0.3"), gr.update(visible=False) # hf_custom_model initially hidden ] elif choice == "Groq API": model_choices = list(model_registry.groq_models.keys()) return [ gr.update(visible=False), # hf_options gr.update(visible=True), # groq_options gr.update(visible=False), # openai_options gr.update(visible=False), # cohere_options gr.update(visible=False), # glhf_options gr.update(value=ctx_size), # context_size gr.update(interactive=True), # send_model_btn gr.Dropdown(choices=model_choices, value=model_choices[0] if model_choices else None), gr.update(visible=False) # hf_custom_model visibility ] elif choice == "Cohere API": return [ gr.update(visible=False), # hf_options gr.update(visible=False), # groq_options gr.update(visible=False), # openai_options gr.update(visible=True), # cohere_options gr.update(visible=False), # glhf_options gr.update(value=ctx_size), # context_size gr.update(interactive=True), # send_model_btn gr.Dropdown(choices=[]), # not used gr.update(visible=False) # hf_custom_model visibility ] elif choice == "GLHF API": model_choices = list(MODEL_CONTEXT_SIZES["GLHF API"].keys()) return [ gr.update(visible=False), # hf_options gr.update(visible=False), # groq_options gr.update(visible=False), # openai_options gr.update(visible=False), # cohere_options gr.update(visible=True), # glhf_options gr.update(value=ctx_size), # context_size gr.update(interactive=True), # send_model_btn gr.Dropdown(choices=[]), # not used gr.update(visible=False) # hf_custom_model visibility ] # Default return for "Clipboard only" or other options return [ gr.update(visible=False), # hf_options gr.update(visible=False), # groq_options gr.update(visible=False), # openai_options gr.update(visible=False), # cohere_options gr.update(visible=False), # glhf_options gr.update(value=4096), # context_size gr.update(interactive=False), # send_model_btn gr.Dropdown(choices=[]), # not used gr.update(visible=False) # hf_custom_model visibility ] def copy_text_js(element_id: str) -> str: return f"""function() {{ let textarea = document.getElementById('{element_id}'); if (!textarea) return 'Element not found'; textarea.select(); try {{ document.execCommand('copy'); return 'Copied to clipboard!'; }} catch(err) {{ return 'Failed to copy: ' + err; }} }}""" def process_pdf(pdf, fmt, ctx_size): """Process PDF and return text and snippets""" try: if not pdf: return "Please upload a PDF file.", "", [], None # Extract text text = extract_text_from_pdf(pdf.name) if text.startswith("Error"): return text, "", [], None # Format content formatted_text = format_content(text, fmt) # Split into snippets snippets = split_into_snippets(formatted_text, ctx_size) # Save full text for download with tempfile.NamedTemporaryFile(delete=False, mode='w', suffix='.txt') as text_file: text_file.write(formatted_text) snippet_choices = [f"Snippet {i+1} of {len(snippets)}" for i in range(len(snippets))] return ( "PDF processed successfully!", formatted_text, snippets, snippet_choices, [text_file.name] ) except Exception as e: logging.error(f"Error processing PDF: {e}") return f"Error processing PDF: {str(e)}", "", [], None def generate_prompt(text, template, snippet_idx=None): """Generate prompt from text or selected snippet""" try: if not text: return "No text available.", "", None default_prompt = "Summarize the following text:" prompt_template = template if template else default_prompt if isinstance(text, list): # If text is list of snippets if snippet_idx is not None: if 0 <= snippet_idx < len(text): content = text[snippet_idx] else: return "Invalid snippet index.", "", None else: content = "\n\n".join(text) else: content = text prompt = f"{prompt_template}\n---\n{content}\n---" # Save prompt for download with tempfile.NamedTemporaryFile(delete=False, mode='w', suffix='.txt') as prompt_file: prompt_file.write(prompt) return "Prompt generated!", prompt, [prompt_file.name] except Exception as e: logging.error(f"Error generating prompt: {e}") return f"Error generating prompt: {str(e)}", "", None # Main Interface with gr.Blocks(css=""" .gradio-container {max-width: 90%; margin: 0 auto;} @media (max-width: 768px) {.gradio-container {max-width: 98%; padding: 10px;} .gr-row {flex-direction: column;} .gr-col {width: 100%; margin-bottom: 10px;}} """) as demo: # State variables pdf_content = gr.State("") snippets = gr.State([]) # Header gr.Markdown("# 📄 Smart PDF Summarizer") gr.Markdown("Upload a PDF document and get AI-powered summaries using various AI models.") with gr.Tabs() as tabs: # Tab 1: PDF Processing with gr.Tab("1️⃣ PDF Processing"): with gr.Row(): with gr.Column(scale=1): pdf_input = gr.File( label="📁 Upload PDF", file_types=[".pdf"] ) format_type = gr.Radio( choices=["txt", "md"], value="txt", label="📝 Output Format" ) context_size = gr.Slider( minimum=1000, maximum=200000, step=1000, value=4096, label="Context Size" ) gr.Markdown("### Context Size") with gr.Row(): for size_name, size_value in CONTEXT_SIZES.items(): gr.Button( size_name, size="sm", scale=1 ).click( lambda v=size_value: gr.update(value=v), None, context_size ) process_button = gr.Button("🔍 Process PDF", variant="primary") with gr.Column(scale=1): progress_status = gr.Textbox( label="Status", interactive=False, show_label=True, visible=True # Ensure error messages are always visible ) processed_text = gr.Textbox( label="Processed Text", lines=10, max_lines=50, show_copy_button=True ) download_full_text = gr.File(label="📥 Download Full Text") # Tab 2: Snippet Selection with gr.Tab("2️⃣ Snippet Selection"): with gr.Row(): with gr.Column(scale=1): snippet_selector = gr.Dropdown( label="Select Snippet", choices=[], interactive=True ) custom_prompt = gr.Textbox( label="✍️ Custom Prompt Template", placeholder="Enter your custom prompt here...", lines=2 ) generate_prompt_btn = gr.Button("Generate Prompt", variant="primary") with gr.Column(scale=1): generated_prompt = gr.Textbox( label="📋 Generated Prompt", lines=10, max_lines=50, show_copy_button=True, elem_id="generated_prompt", elem_classes="generated_prompt" ) with gr.Row(): download_prompt = gr.File(label="📥 Download Prompt") download_snippet = gr.File(label="📥 Download Selected Snippet") # Tab 3: Model Processing with gr.Tab("3️⃣ Model Processing"): with gr.Row(): with gr.Column(scale=1): model_choice = gr.Radio( choices=list(MODEL_CONTEXT_SIZES.keys()), value="Clipboard only", label="🤖 Provider Selection" ) # Model-specific option containers with gr.Column(visible=False) as openai_options: openai_model = gr.Dropdown( choices=list(MODEL_CONTEXT_SIZES["OpenAI ChatGPT"].keys()), value="gpt-3.5-turbo", label="OpenAI Model" ) openai_api_key = gr.Textbox( label="🔑 OpenAI API Key", type="password" ) with gr.Column(visible=False) as hf_options: hf_model = gr.Dropdown( choices=list(MODEL_CONTEXT_SIZES["HuggingFace Inference"].keys()), label="🔧 HuggingFace Model", value="mistralai/Mistral-7B-Instruct-v0.3", allow_custom_value=True ) hf_custom_model = gr.Textbox( label="Custom Model ID", placeholder="Enter custom model ID...", visible=False ) hf_api_key = gr.Textbox( label="🔑 HuggingFace API Key", type="password" ) with gr.Column(visible=False) as groq_options: groq_model = gr.Dropdown( choices=list(model_registry.groq_models.keys()), value=list(model_registry.groq_models.keys())[0] if model_registry.groq_models else None, label="Groq Model" ) groq_api_key = gr.Textbox( label="🔑 Groq API Key", type="password" ) groq_refresh_btn = gr.Button("🔄 Refresh Groq Models") with gr.Column(visible=False) as glhf_options: glhf_api_key = gr.Textbox( label="🔑 GLHF API Key", type="password" ) glhf_model = gr.Dropdown( choices=list(MODEL_CONTEXT_SIZES["GLHF API"].keys()), value="mistralai/Mistral-7B-Instruct-v0.3", label="Model Selection" ) glhf_custom_model = gr.Textbox( label="Custom Model ID", placeholder="Enter custom model ID...", visible=False ) with gr.Column(visible=False) as cohere_options: cohere_api_key = gr.Textbox( label="🔑 Cohere API Key", type="password" ) cohere_model = gr.Dropdown( choices=list(MODEL_CONTEXT_SIZES["Cohere API"].keys()), value="command-r-plus-08-2024", label="Cohere Model" ) # Action Buttons Row with gr.Row(): # Copy to Clipboard button with robust fallbacks copy_button = gr.HTML("""
""") send_to_model_btn = gr.Button("🚀 Send to Model", variant="primary", interactive=False) # Restore the robust ChatGPT button implementation chatgpt_button = gr.HTML("""
""") # JavaScript for model choice handling gr.HTML(""" """) # Summary section with gr.Column(scale=1): summary_output = gr.Textbox( label="📝 Summary", lines=15, max_lines=50, show_copy_button=True, elem_id="summary_output" ) # Summary actions row with gr.Row(): copy_summary_btn = gr.Button("📋 Copy Summary", size="sm") download_summary = gr.File(label="📥 Download Summary") # Status display clipboard_status = gr.HTML(elem_id="clipboard_status") # Hidden components for file handling download_files = gr.Files(label="📥 Downloads", visible=False) # Event Handlers def update_context_size(size: int) -> None: """Update context size slider with validation""" if not isinstance(size, (int, float)): size = 4096 # Default size return gr.update(value=int(size)) def get_model_context_size(choice: str, groq_model: str = None) -> int: """Get context size for model with better defaults""" if choice == "Groq API" and groq_model: return MODEL_CONTEXT_SIZES["Groq API"].get(groq_model, 4096) elif choice == "OpenAI ChatGPT": return 4096 elif choice == "HuggingFace Inference": return 4096 return 32000 # Safe default def update_snippet_choices(snippets_list: List[str]) -> List[str]: """Create formatted snippet choices""" return [f"Snippet {i+1} of {len(snippets_list)}" for i in range(len(snippets_list))] def get_snippet_index(choice: str) -> int: """Extract snippet index from choice string""" if not choice: return 0 try: return int(choice.split()[1]) - 1 except: return 0 def toggle_model_options(choice): return ( gr.update(visible=choice == "HuggingFace Inference"), # hf_options gr.update(visible=choice == "Groq API"), # groq_options gr.update(visible=choice == "OpenAI ChatGPT"), # openai_options gr.update(visible=choice == "Cohere API"), # cohere_options gr.update(visible=choice == "GLHF API") # glhf_options ) def refresh_groq_models_list(): try: with gr.Progress() as progress: progress(0, "Refreshing Groq models...") updated_models = model_registry.refresh_groq_models() progress(1, "Complete!") return gr.update(choices=list(updated_models.keys())) except Exception as e: logging.error(f"Error refreshing models: {e}") return gr.update() def toggle_custom_model(model_name): return gr.update(visible=model_name == "Custom Model") def handle_groq_model_change(model_name): """Handle Groq model selection change""" return update_context_size("Groq API", model_name) # PDF Processing Handlers def handle_pdf_process(pdf, fmt, ctx_size): # Remove md_eng parameter if not pdf: return "Please upload a PDF file.", "", "", [], gr.update(choices=[], value=None), None try: text = extract_text_from_pdf(pdf.name, format_type=fmt) # Just use format_type if text.startswith("Error"): return text, "", "", [], gr.update(choices=[], value=None), None # The important part: still do snippets processing snippets_list = split_into_snippets(text, ctx_size) snippet_choices = update_snippet_choices(snippets_list) with tempfile.NamedTemporaryFile(delete=False, mode='w', suffix=f'.{fmt}') as f: f.write(text) download_file = f.name return ( f"PDF processed successfully! Generated {len(snippets_list)} snippets.", text, text, snippets_list, gr.update(choices=snippet_choices, value=snippet_choices[0] if snippet_choices else None), download_file ) except Exception as e: error_msg = f"Error processing PDF: {str(e)}" logging.error(error_msg) return error_msg, "", "", [], gr.update(choices=[], value=None), None def handle_snippet_selection(choice, snippets_list): # Add download_snippet output """Handle snippet selection, update prompt, and provide snippet download.""" if not snippets_list: return "No snippets available.", "", None # Return None for download try: idx = get_snippet_index(choice) selected_snippet = snippets_list[idx] with tempfile.NamedTemporaryFile(delete=False, mode='w', suffix='.txt') as f: f.write(selected_snippet) snippet_download_file = f.name # Store the file path return ( f"Selected snippet {idx + 1}", selected_snippet, snippet_download_file # Return file for download ) except Exception as e: error_msg = f"Error selecting snippet: {str(e)}" logging.error(error_msg) return ( error_msg, "", None ) # Copy button handlers def handle_prompt_generation(snippet_text, template, snippet_choice, snippets_list): try: if not snippets_list: return "No text available.", "", None idx = get_snippet_index(snippet_choice) base_prompt = template if template else "Summarize the following text:" content = snippets_list[idx] prompt = f"{base_prompt}\n---\n{content}\n---" # Save prompt for download with tempfile.NamedTemporaryFile(delete=False, mode='w', suffix='.txt') as f: f.write(prompt) download_file = f.name return "Prompt generated!", prompt, download_file # Return the file for download_prompt except Exception as e: logging.error(f"Error generating prompt: {e}") return f"Error: {str(e)}", "", None def handle_copy_action(text): """Handle copy to clipboard action""" return { progress_status: gr.update(value="Text copied to clipboard!", visible=True) } # Connect all event handlers # Core event handlers process_button.click( handle_pdf_process, inputs=[pdf_input, format_type, context_size], outputs=[progress_status, processed_text, pdf_content, snippets, snippet_selector, download_full_text] ) generate_prompt_btn.click( handle_prompt_generation, inputs=[generated_prompt, custom_prompt, snippet_selector, snippets], outputs=[progress_status, generated_prompt, download_prompt] ) # copy_button.click( # fn=copy_to_clipboard, # inputs=[generated_prompt], # outputs=[clipboard_status] # ) # copy_summary_btn.click( # fn=None, # inputs=[], # outputs=[], # _js=copy_summary_js # ) # Snippet handling snippet_selector.change( handle_snippet_selection, inputs=[snippet_selector, snippets], outputs=[progress_status, generated_prompt, download_snippet] # Connect download_snippet ) # Model selection model_choice.change( handle_model_selection, inputs=[model_choice], outputs=[ hf_options, groq_options, openai_options, cohere_options, glhf_options, context_size, send_to_model_btn, hf_model, # For updating model choices hf_custom_model # Add this to update custom model visibility ] ) hf_model.change( toggle_custom_model, inputs=[hf_model], outputs=[hf_custom_model] ) groq_model.change( handle_groq_model_change, inputs=[groq_model], outputs=[context_size] ) def download_file(content: str, prefix: str) -> List[str]: if not content: return [] try: filename = f"{prefix}_{int(time.time())}.txt" # Add timestamp with tempfile.NamedTemporaryFile(delete=False, mode='w', suffix='.txt', prefix=filename) as f: f.write(content) return [f.name] except Exception as e: logging.error(f"Error creating download file: {e}") return [] # Model processing send_to_model_btn.click( fn=send_to_model, inputs=[ generated_prompt, model_choice, hf_model, hf_custom_model, hf_api_key, groq_model, groq_api_key, openai_api_key, openai_model, cohere_api_key, cohere_model, glhf_api_key, glhf_model, glhf_custom_model ], outputs=[ clipboard_status, # HTML component for clipboard status summary_output, # Textbox for summary download_summary # File component for download ] ) groq_refresh_btn.click( refresh_groq_models_list, outputs=[groq_model] ) # Instructions gr.Markdown(""" ### 📌 Instructions: 1. Upload a PDF document 2. Choose output format and context window size 3. Select snippet number (default: 1) or enter custom prompt 4. Select your preferred model in case you want to proceed directly (or continue with 5): - OpenAI ChatGPT: Manual copy/paste workflow - HuggingFace Inference: Direct API integration - Groq API: High-performance inference 5. Click 'Process PDF' to generate summary 6. Use 'Copy Prompt' and, optionally, 'Open ChatGPT' for manual processing 7. Download generated files as needed """) # Launch the interface if __name__ == "__main__": demo.launch(share=False, debug=True)