"""Helpers for turning a PDF into model-generated summaries.

The module extracts text from a PDF, reformats it (plain text, Markdown or
HTML), splits it into overlapping snippets, wraps each snippet in a prompt,
and sends prompts to either the OpenAI API or the Hugging Face Inference API.
"""

import html
import logging
import re
from pathlib import Path
from typing import List, Dict, Union, Optional

import openai
import requests
from PyPDF2 import PdfReader
from gradio_client import Client

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
logger = logging.getLogger(__name__)

# Separator used between paragraphs everywhere in this module.
_PARAGRAPH_SEP = "\n\n"

# A "header" is a paragraph that starts with a capital letter and contains no
# sentence-ending punctuation.
_HEADER_RE = re.compile(r'^[A-Z][^.!?]*$')

# Without a timeout, requests waits indefinitely on an unresponsive server.
# Inference gets a generous budget; the credential check should be quick.
_INFERENCE_TIMEOUT_SECONDS = 120
_VALIDATION_TIMEOUT_SECONDS = 10

# Maps the user-facing model choice to the model ID placed in the request URL.
# NOTE(review): confirm each ID is actually served by the Hugging Face
# Inference API endpoint used in process_with_model.
_HF_MODEL_IDS = {
    "Claude-3": "anthropic/claude-3-opus-20240229",
    "Mistral": "mistralai/Mixtral-8x7B-Instruct-v0.1",
}

# NOTE(review): the line breaks inside this prompt were reconstructed from a
# whitespace-mangled source; confirm against the intended wording.
_DEFAULT_PROMPT = """Please analyze and summarize the following text. Focus on:
1. Key points and main ideas
2. Important details and supporting evidence
3. Any conclusions or recommendations

Please maintain the original meaning while being concise."""


def extract_text_from_pdf(file_path: str) -> str:
    """
    Extract text from a PDF file with robust error handling.

    Pages whose extraction fails or yields only whitespace are logged and
    skipped; the remaining pages are joined with blank lines.

    Args:
        file_path: Path to the PDF file

    Returns:
        Extracted text as a string

    Raises:
        RuntimeError: If the file does not exist, cannot be read, or contains
            no readable text. Every failure (including the internal
            "file not found" ValueError) is re-raised as RuntimeError, with
            the original exception attached as ``__cause__``.
    """
    try:
        if not Path(file_path).exists():
            raise ValueError(f"PDF file not found: {file_path}")

        reader = PdfReader(file_path)
        text_content = []

        for page_num, page in enumerate(reader.pages, 1):
            try:
                # Treat a missing (None) result the same as an empty page
                # instead of crashing on .strip().
                text = page.extract_text() or ""
            except Exception as e:
                # One bad page should not abort the whole document.
                logger.error("Error extracting text from page %s: %s", page_num, e)
                continue

            if text.strip():
                text_content.append(text)
            else:
                logger.warning(f"Page {page_num} appears to be empty or unreadable")

        if not text_content:
            raise RuntimeError("No readable text found in PDF")

        return _PARAGRAPH_SEP.join(text_content)

    except Exception as e:
        logger.error("PDF extraction failed: %s", e)
        raise RuntimeError(f"Failed to process PDF: {str(e)}") from e


def format_content(text: str, format_type: str) -> str:
    """
    Format extracted text into the specified output format.

    Whitespace is collapsed to single spaces, then a paragraph break is
    inserted after every '.', '!' or '?' that is followed by whitespace, so
    each sentence becomes its own paragraph. Paragraphs matching the header
    pattern (capitalised, no sentence punctuation) are rendered as headings
    in the 'md' and 'html' formats.

    Args:
        text: Raw text content
        format_type: Output format ('txt', 'md', 'html'), case-insensitive

    Returns:
        Formatted text string

    Raises:
        ValueError: If text is not a string or format type is invalid
    """
    if not isinstance(text, str):
        raise ValueError("Input text must be a string")

    # Clean up common PDF extraction artifacts
    text = re.sub(r'\s+', ' ', text)  # Normalize whitespace
    text = re.sub(r'(?<=[.!?])\s+', _PARAGRAPH_SEP, text)  # Split sentences into paragraphs
    text = text.strip()

    kind = format_type.lower()

    if kind == 'txt':
        return text

    if kind not in ('md', 'html'):
        raise ValueError(f"Unsupported format type: {format_type}")

    paragraphs = [para.strip() for para in text.split(_PARAGRAPH_SEP)]

    if kind == 'md':
        return _PARAGRAPH_SEP.join(
            f"## {para}" if _HEADER_RE.match(para) else para
            for para in paragraphs
        )

    # HTML output.
    # NOTE(review): the tag literals were empty strings in the source this
    # was restored from; the markup below is a reconstruction (document
    # wrapper, <h2> for headers, <p> for body paragraphs) - confirm it
    # matches what consumers expect.
    html_parts = ['<!DOCTYPE html>', '<html>', '<body>']
    for para in paragraphs:
        if not para:
            continue  # avoid emitting empty <p></p> elements
        tag = 'h2' if _HEADER_RE.match(para) else 'p'
        # PDF text is untrusted content: escape &, <, > and quotes.
        html_parts.append(f"<{tag}>{html.escape(para)}</{tag}>")
    html_parts.extend(['</body>', '</html>'])
    return '\n'.join(html_parts)


def split_into_snippets(text: str, chunk_size: int = 4000, overlap: int = 200) -> List[str]:
    """
    Split text into overlapping chunks that fit within model context windows.

    Paragraphs (separated by blank lines) are packed greedily into chunks.
    Every chunk after the first starts with the last ``overlap`` characters
    of the previous chunk. A paragraph too long to fit in a chunk on its own
    is cut into fixed-size pieces so that no chunk ever exceeds
    ``chunk_size`` characters.

    Args:
        text: Input text to split
        chunk_size: Maximum size of each chunk, in characters
        overlap: Number of characters to overlap between chunks; zero or a
            negative value disables overlap

    Returns:
        List of text snippets, each at most ``chunk_size`` characters long

    Raises:
        ValueError: If text is empty, chunk_size is below 1000, or overlap
            leaves no room for new text in a chunk
    """
    if not text:
        raise ValueError("Input text is empty")
    if chunk_size < 1000:
        raise ValueError("Chunk size must be at least 1000 characters")

    sep_len = len(_PARAGRAPH_SEP)
    overlap = max(overlap, 0)
    if overlap >= chunk_size - sep_len:
        raise ValueError(
            f"Overlap must be less than {chunk_size - sep_len} characters "
            f"for chunk size {chunk_size}"
        )

    # Largest piece that still fits after the overlap prefix and separator.
    piece_limit = chunk_size - overlap - sep_len if overlap else chunk_size

    # Split into paragraphs first, cutting any oversized paragraph so that
    # every piece is guaranteed to fit in a chunk.
    pieces: List[str] = []
    for para in text.split(_PARAGRAPH_SEP):
        if len(para) <= piece_limit:
            pieces.append(para)
        else:
            pieces.extend(
                para[start:start + piece_limit]
                for start in range(0, len(para), piece_limit)
            )

    chunks: List[str] = []
    current_chunk: List[str] = []
    current_size = 0  # always equals len(_PARAGRAPH_SEP.join(current_chunk))

    for piece in pieces:
        if current_chunk and current_size + sep_len + len(piece) > chunk_size:
            finished = _PARAGRAPH_SEP.join(current_chunk)
            chunks.append(finished)
            # Start the new chunk with the tail of the previous one; with no
            # overlap start empty so the chunk has no leading separator.
            current_chunk = [finished[-overlap:]] if overlap else []
            current_size = len(current_chunk[0]) if current_chunk else 0

        if current_chunk:
            current_size += sep_len
        current_chunk.append(piece)
        current_size += len(piece)

    # Add the last chunk if it exists
    if current_chunk:
        chunks.append(_PARAGRAPH_SEP.join(current_chunk))

    return chunks


def build_prompts(chunks: List[str], custom_prompt: Optional[str] = None) -> List[str]:
    """
    Build formatted prompts for each text chunk.

    Each prompt contains the instruction, the chunk labelled with its
    1-based position ("Part i of n"), and a closing request for a summary.

    Args:
        chunks: List of text chunks
        custom_prompt: Optional custom instruction; when empty or None the
            built-in summarisation instruction is used

    Returns:
        List of formatted prompt strings, one per chunk, in input order
    """
    instruction = custom_prompt or _DEFAULT_PROMPT
    total = len(chunks)

    # NOTE(review): line breaks in this template were reconstructed from a
    # whitespace-mangled source; confirm the exact layout.
    return [
        f"""### Instruction
{instruction}

### Input Text (Part {i} of {total})
{chunk}

### End of Input Text

Please provide your summary below:"""
        for i, chunk in enumerate(chunks, 1)
    ]


def process_with_model(
    prompt: str,
    model_choice: str,
    api_key: Optional[str] = None,
    oauth_token: Optional[str] = None
) -> str:
    """
    Process text with selected model.

    Model names containing "gpt" (case-insensitive) go to the OpenAI chat
    completion API; any other name is looked up in the Hugging Face model
    table and sent to the Hugging Face Inference API.

    Args:
        prompt: Input prompt
        model_choice: Selected model name
        api_key: OpenAI API key for GPT models
        oauth_token: Hugging Face token for other models

    Returns:
        Generated summary

    Raises:
        RuntimeError: If credentials are missing, the model is unknown, or
            the request fails. Every failure (including the internal
            ValueError for missing credentials) is re-raised as
            RuntimeError, with the original exception as ``__cause__``.
    """
    try:
        if 'gpt' in model_choice.lower():
            if not api_key:
                raise ValueError("OpenAI API key required for GPT models")

            # Sets the key on the openai module itself, i.e. process-wide.
            openai.api_key = api_key
            response = openai.ChatCompletion.create(
                model="gpt-3.5-turbo" if "3.5" in model_choice else "gpt-4",
                messages=[{"role": "user", "content": prompt}],
                temperature=0.7,
                max_tokens=1500
            )
            return response.choices[0].message.content

        # Hugging Face models
        if not oauth_token:
            raise ValueError("Hugging Face token required")

        model_id = _HF_MODEL_IDS.get(model_choice)
        if not model_id:
            raise ValueError(f"Unknown model: {model_choice}")

        response = requests.post(
            f"https://api-inference.huggingface.co/models/{model_id}",
            headers={"Authorization": f"Bearer {oauth_token}"},
            json={"inputs": prompt},
            timeout=_INFERENCE_TIMEOUT_SECONDS,
        )

        if response.status_code != 200:
            raise RuntimeError(f"Model API error: {response.text}")

        return response.json()[0]["generated_text"]

    except Exception as e:
        logger.error("Model processing failed: %s", e)
        raise RuntimeError(f"Failed to process with model: {str(e)}") from e


def validate_api_keys(openai_key: Optional[str] = None, hf_token: Optional[str] = None) -> Dict[str, bool]:
    """
    Validate API keys for different services.

    Validation is best-effort: any failure while contacting a service marks
    that credential as invalid instead of raising.

    Args:
        openai_key: OpenAI API key
        hf_token: Hugging Face token

    Returns:
        Dictionary with keys "openai" and "huggingface"; a value is True
        only if a key was supplied and the service accepted the request
    """
    results = {"openai": False, "huggingface": False}

    if openai_key:
        try:
            openai.api_key = openai_key
            openai.Model.list()
            results["openai"] = True
        except Exception as e:
            # Deliberately broad: any failure means "not validated". Unlike a
            # bare except, this lets KeyboardInterrupt/SystemExit through.
            logger.warning("OpenAI key validation failed: %s", e)

    if hf_token:
        try:
            response = requests.get(
                "https://huggingface.co/api/models",
                headers={"Authorization": f"Bearer {hf_token}"},
                timeout=_VALIDATION_TIMEOUT_SECONDS,
            )
            results["huggingface"] = response.status_code == 200
        except requests.RequestException as e:
            logger.warning("Hugging Face token validation failed: %s", e)

    return results