"""Recursive reasoning strategy implementation."""

import asyncio
import json
import logging
from collections import defaultdict
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Callable, Dict, List, Optional, Set, Tuple

from .base import ReasoningStrategy, StrategyResult

logger = logging.getLogger(__name__)


class SubproblemType(Enum):
    """Types of subproblems in recursive reasoning."""

    ATOMIC = "atomic"
    COMPOSITE = "composite"
    PARALLEL = "parallel"
    SEQUENTIAL = "sequential"
    CONDITIONAL = "conditional"
    ITERATIVE = "iterative"


class SolutionStatus(Enum):
    """Status of subproblem solutions."""

    PENDING = "pending"
    IN_PROGRESS = "in_progress"
    SOLVED = "solved"
    FAILED = "failed"
    BLOCKED = "blocked"
    OPTIMIZING = "optimizing"


@dataclass
class Subproblem:
    """Represents a subproblem in recursive reasoning."""

    id: str
    type: SubproblemType
    query: str
    context: Dict[str, Any]
    parent_id: Optional[str]
    # Ids of child subproblems; each id is a key into
    # ``RecursiveStrategy.subproblems``.
    children: List[str]
    status: SolutionStatus
    solution: Optional[Dict[str, Any]]
    confidence: float
    # NOTE(review): never read anywhere in this module - presumably ids of
    # subproblems that must be solved first; confirm against callers.
    dependencies: List[str]
    # The strategy stores the tree depth here under the key 'depth'.
    metadata: Dict[str, Any] = field(default_factory=dict)
    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())


@dataclass
class RecursiveStep:
    """Represents a step in recursive reasoning."""

    id: str
    subproblem_id: str
    action: str
    result: Dict[str, Any]
    timestamp: str = field(default_factory=lambda: datetime.now().isoformat())


class RecursiveStrategy(ReasoningStrategy):
    """Advanced recursive reasoning that:
    1. Breaks down complex problems
    2. Solves sub-problems recursively
    3. Combines solutions
    4. Handles base cases
    5. Optimizes performance
    """

    def __init__(self, config: Optional[Dict[str, Any]] = None):
        """Initialize recursive reasoning.

        Args:
            config: Optional settings. Recognised keys are
                ``min_confidence`` (default 0.7), ``max_depth`` (default 5)
                and ``optimization_rounds`` (default 2).
        """
        super().__init__()
        self.config = config or {}

        # Standard reasoning parameters
        self.min_confidence = self.config.get('min_confidence', 0.7)
        self.max_depth = self.config.get('max_depth', 5)
        self.optimization_rounds = self.config.get('optimization_rounds', 2)

        # Problem tracking (reset at the start of every ``reason`` call)
        self.subproblems: Dict[str, Subproblem] = {}
        self.steps: List[RecursiveStep] = []
        self.solution_cache: Dict[str, Dict[str, Any]] = {}
        self.cycle_detection: Set[str] = set()

        # Performance metrics (cumulative over the lifetime of the instance).
        # The per-type mappings are keyed by ``SubproblemType`` members.
        self.performance_metrics: Dict[str, Any] = {
            'depth_distribution': defaultdict(int),
            'type_distribution': defaultdict(int),
            'success_rate': defaultdict(float),
            'total_subproblems': 0,
            'solved_subproblems': 0,
            'failed_subproblems': 0,
            'optimization_rounds': 0,
            'cache_hits': 0,
            'cycles_detected': 0,
            'solved_by_type': defaultdict(int),
        }

    async def reason(
        self,
        query: str,
        context: Dict[str, Any]
    ) -> StrategyResult:
        """
        Apply recursive reasoning to analyze the query.

        Args:
            query: The query to reason about
            context: Additional context and parameters

        Returns:
            StrategyResult containing the reasoning output and metadata.
            Any exception raised while reasoning is logged and converted
            into a result with ``success=False`` and ``confidence=0.0``.
        """
        try:
            # Problem ids ("root", "root_sub0", ...) are reused by every run,
            # so state keyed by id must not leak in from a previous query.
            self._reset_run_state()

            # Initialize root problem
            root_problem = await self._initialize_problem(query, context)
            root_id = root_problem.id

            # Solve recursively
            solution = await self._solve_recursive(root_id, depth=0)

            # Optimize solution
            if solution and solution.get('success', False):
                solution = await self._optimize_solution(
                    solution, root_problem, context
                )

            # Update metrics
            self._update_metrics(root_id)

            # Build solution trace
            solution_trace = self._get_solution_trace(root_id)

            # Calculate overall confidence
            confidence = self._calculate_confidence(solution_trace)

            return StrategyResult(
                strategy_type="recursive",
                success=bool(solution and solution.get('success', False)),
                answer=solution.get('answer') if solution else None,
                confidence=confidence,
                reasoning_trace=solution_trace,
                metadata={
                    'problem_tree': self._get_problem_tree(root_id),
                    'steps': [self._step_to_dict(step) for step in self.steps],
                    'solution_details': solution if solution else {}
                },
                performance_metrics=self.performance_metrics
            )

        except Exception as e:
            # Top-level boundary: keep the traceback in the log and report
            # the failure through the result object instead of raising.
            logger.exception("Recursive reasoning error: %s", e)
            return StrategyResult(
                strategy_type="recursive",
                success=False,
                answer=None,
                confidence=0.0,
                reasoning_trace=[{
                    'step': 'error',
                    'error': str(e),
                    'timestamp': datetime.now().isoformat()
                }],
                metadata={'error': str(e)},
                performance_metrics=self.performance_metrics
            )

    def _reset_run_state(self) -> None:
        """Drop all per-run tracking state; cumulative metrics are kept."""
        # Rebind rather than clear so containers handed out earlier are not
        # mutated behind the caller's back.
        self.subproblems = {}
        self.steps = []
        self.solution_cache = {}
        self.cycle_detection = set()

    async def _initialize_problem(
        self,
        query: str,
        context: Dict[str, Any]
    ) -> Subproblem:
        """Create, register and return the root problem (id ``"root"``)."""
        problem = Subproblem(
            id="root",
            type=SubproblemType.COMPOSITE,
            query=query,
            context=context,
            parent_id=None,
            children=[],
            status=SolutionStatus.PENDING,
            solution=None,
            confidence=1.0,
            dependencies=[],
            metadata={'depth': 0}
        )

        self.subproblems[problem.id] = problem
        self._record_step(RecursiveStep(
            id=f"init_{problem.id}",
            subproblem_id=problem.id,
            action="initialize",
            result={'type': problem.type.value, 'query': query}
        ))

        return problem

    async def _solve_recursive(
        self,
        problem_id: str,
        depth: int
    ) -> Optional[Dict[str, Any]]:
        """Recursively solve a problem and its subproblems.

        Args:
            problem_id: Key of the problem in ``self.subproblems``.
            depth: Current recursion depth; the root is solved at depth 0.

        Returns:
            The solution dict, or ``None`` when the depth limit is exceeded,
            a cycle is detected, or no solution could be produced.

        Raises:
            KeyError: If ``problem_id`` is not a registered subproblem.
        """
        if depth > self.max_depth:
            return None

        problem = self.subproblems[problem_id]

        # Check cycle: the id is already being solved further up the stack
        if problem_id in self.cycle_detection:
            self.performance_metrics['cycles_detected'] += 1
            return None
        self.cycle_detection.add(problem_id)

        try:
            # Check cache
            if problem_id in self.solution_cache:
                self.performance_metrics['cache_hits'] += 1
                return self.solution_cache[problem_id]

            # Solve atomic problem (base case, never decomposed)
            if problem.type == SubproblemType.ATOMIC:
                solution = await self._solve_atomic(problem)
                if solution:
                    self._mark_solved(problem, solution)
                    return solution
                problem.status = SolutionStatus.FAILED
                return None

            # Decompose only once; decomposing again would append the same
            # child ids a second time.
            if not problem.children:
                await self._decompose_problem(problem, problem.context)

            # Solve subproblems; children without a solution are skipped
            subsolutions = []
            for child_id in problem.children:
                child_solution = await self._solve_recursive(child_id, depth + 1)
                if child_solution:
                    subsolutions.append(child_solution)

            # Synthesize solutions
            if subsolutions:
                solution = await self._synthesize_solutions(
                    subsolutions, problem, problem.context
                )
                if solution:
                    self._mark_solved(problem, solution)
                    self.solution_cache[problem_id] = solution
                    return solution

            problem.status = SolutionStatus.FAILED
            return None

        finally:
            self.cycle_detection.remove(problem_id)

    @staticmethod
    def _mark_solved(problem: Subproblem, solution: Dict[str, Any]) -> None:
        """Attach ``solution`` to ``problem`` and flag the problem as solved."""
        problem.solution = solution
        problem.status = SolutionStatus.SOLVED
        # Keep the problem's own confidence in step with its solution so the
        # problem tree does not report the initial placeholder value.
        confidence = solution.get('confidence')
        if isinstance(confidence, (int, float)):
            problem.confidence = confidence

    async def _decompose_problem(
        self,
        problem: Subproblem,
        context: Dict[str, Any]
    ) -> None:
        """Decompose a problem into subproblems.

        Registers every generated subproblem, appends its id to
        ``problem.children`` and records a ``decompose`` step.
        """
        subproblems = self._generate_subproblems(problem, context)

        for subproblem in subproblems:
            self.subproblems[subproblem.id] = subproblem
            problem.children.append(subproblem.id)

        self._record_step(RecursiveStep(
            id=f"decompose_{problem.id}",
            subproblem_id=problem.id,
            action="decompose",
            result={'num_subproblems': len(subproblems)}
        ))

    def _generate_subproblems(
        self,
        parent: Subproblem,
        context: Dict[str, Any]
    ) -> List[Subproblem]:
        """Generate subproblems for a composite problem.

        Placeholder decomposition: the parent query is split on ``'.'`` and
        each of the first three non-blank parts becomes an atomic subproblem.
        """
        # This is a placeholder implementation
        # In practice, this would use more sophisticated decomposition
        child_depth = parent.metadata.get('depth', 0) + 1
        subproblems = []

        # Ids use the index of the part in the split, so skipped blank parts
        # leave gaps in the numbering.
        for i, part in enumerate(parent.query.split('.')[:3]):
            text = part.strip()
            if not text:
                continue
            subproblems.append(Subproblem(
                id=f"{parent.id}_sub{i}",
                type=SubproblemType.ATOMIC,
                query=text,
                context=context,
                parent_id=parent.id,
                children=[],
                status=SolutionStatus.PENDING,
                solution=None,
                confidence=0.0,
                dependencies=[],
                metadata={'depth': child_depth}
            ))

        return subproblems

    async def _solve_atomic(
        self,
        problem: Subproblem
    ) -> Optional[Dict[str, Any]]:
        """Solve an atomic problem.

        Placeholder solver: always succeeds with a fixed confidence of 0.8
        and records a ``solve_atomic`` step.
        """
        # This is a placeholder implementation
        # In practice, this would use more sophisticated solving strategies
        solution = {
            'success': True,
            'answer': f"Solution for {problem.query}",
            'confidence': 0.8
        }

        self._record_step(RecursiveStep(
            id=f"solve_{problem.id}",
            subproblem_id=problem.id,
            action="solve_atomic",
            result=solution
        ))

        return solution

    async def _synthesize_solutions(
        self,
        subsolutions: List[Dict[str, Any]],
        problem: Subproblem,
        context: Dict[str, Any]
    ) -> Optional[Dict[str, Any]]:
        """Synthesize solutions from subproblems.

        Returns:
            ``None`` when ``subsolutions`` is empty; otherwise a dict with
            the space-joined non-empty answers, the mean confidence (a
            missing confidence counts as 0.0) and the subsolutions
            themselves.
        """
        if not subsolutions:
            return None

        # Combine answers
        combined_answer = " ".join(
            sol['answer'] for sol in subsolutions if sol.get('answer')
        )

        # Average confidence
        avg_confidence = sum(
            sol.get('confidence', 0.0) for sol in subsolutions
        ) / len(subsolutions)

        synthesis = {
            'success': True,
            'answer': combined_answer,
            'confidence': avg_confidence,
            'subsolutions': subsolutions
        }

        self._record_step(RecursiveStep(
            id=f"synthesize_{problem.id}",
            subproblem_id=problem.id,
            action="synthesize",
            result={'num_solutions': len(subsolutions)}
        ))

        return synthesis

    async def _optimize_solution(
        self,
        solution: Dict[str, Any],
        problem: Subproblem,
        context: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Optimize the final solution.

        Works on a shallow copy, so ``solution`` itself is not modified.
        Runs ``self.optimization_rounds`` rounds; each round scales the
        confidence by 1.1 while it is below 0.9 and records an ``optimize``
        step.
        """
        optimized = solution.copy()

        for _ in range(self.optimization_rounds):
            self.performance_metrics['optimization_rounds'] += 1

            # Example optimization: Improve confidence
            if optimized['confidence'] < 0.9:
                optimized['confidence'] *= 1.1

            self._record_step(RecursiveStep(
                id=f"optimize_{problem.id}",
                subproblem_id=problem.id,
                action="optimize",
                result={
                    'confidence_improvement':
                        optimized['confidence'] - solution['confidence']
                }
            ))

        return optimized

    def _calculate_confidence(
        self,
        solution_trace: List[Dict[str, Any]]
    ) -> float:
        """Calculate overall confidence from solution trace.

        Returns the mean of all numeric ``'confidence'`` values in the
        trace, or 0.0 when there are none.
        """
        if not solution_trace:
            return 0.0

        confidences = [
            step.get('confidence', 0.0)
            for step in solution_trace
            if isinstance(step.get('confidence'), (int, float))
        ]

        return sum(confidences) / len(confidences) if confidences else 0.0

    def _update_metrics(self, root_id: str) -> None:
        """Add the problem tree rooted at ``root_id`` to the metrics.

        Counts are cumulative; success rates are recomputed from the
        cumulative per-type totals so both sides of the ratio cover the
        same set of problems.
        """
        metrics = self.performance_metrics
        solved_by_type = metrics.setdefault('solved_by_type', defaultdict(int))

        # Iterative walk with a visited set, so shared or cyclic child links
        # cannot double count or loop forever.
        seen: Set[str] = set()
        pending = [root_id]
        while pending:
            problem_id = pending.pop()
            if problem_id in seen:
                continue
            seen.add(problem_id)

            problem = self.subproblems[problem_id]
            depth = problem.metadata.get('depth', 0)

            metrics['depth_distribution'][depth] += 1
            metrics['type_distribution'][problem.type] += 1
            metrics['total_subproblems'] += 1

            if problem.status == SolutionStatus.SOLVED:
                metrics['solved_subproblems'] += 1
                solved_by_type[problem.type] += 1
            elif problem.status == SolutionStatus.FAILED:
                metrics['failed_subproblems'] += 1

            pending.extend(problem.children)

        # Calculate success rates
        for problem_type, type_count in list(metrics['type_distribution'].items()):
            if type_count > 0:
                metrics['success_rate'][problem_type] = (
                    solved_by_type.get(problem_type, 0) / type_count
                )

    def _get_problem_tree(self, root_id: str) -> Dict[str, Any]:
        """Get the problem decomposition tree as nested dictionaries."""
        def build_tree(problem_id: str) -> Dict[str, Any]:
            problem = self.subproblems[problem_id]
            return {
                'id': problem.id,
                'type': problem.type.value,
                'status': problem.status.value,
                'confidence': problem.confidence,
                'children': [
                    build_tree(child_id) for child_id in problem.children
                ]
            }

        return build_tree(root_id)

    def _get_solution_trace(self, root_id: str) -> List[Dict[str, Any]]:
        """Get the solution trace for a problem.

        Returns one entry per problem, parents before their children. Keys
        of a problem's solution are merged into its entry and override the
        entry's own keys of the same name.
        """
        trace: List[Dict[str, Any]] = []

        def build_trace(problem_id: str) -> None:
            problem = self.subproblems[problem_id]

            step = {
                'id': problem.id,
                'type': problem.type.value,
                'status': problem.status.value,
                'confidence': problem.confidence,
                'timestamp': problem.timestamp
            }

            if problem.solution:
                step.update(problem.solution)

            trace.append(step)

            for child_id in problem.children:
                build_trace(child_id)

        build_trace(root_id)
        return trace

    def _record_step(self, step: RecursiveStep) -> None:
        """Record a reasoning step."""
        self.steps.append(step)

    def _step_to_dict(self, step: RecursiveStep) -> Dict[str, Any]:
        """Convert step to dictionary for serialization."""
        return {
            'id': step.id,
            'subproblem_id': step.subproblem_id,
            'action': step.action,
            'result': step.result,
            'timestamp': step.timestamp
        }

    def clear_cache(self) -> None:
        """Clear solution cache and reset the cache-hit counter."""
        self.solution_cache.clear()
        self.performance_metrics['cache_hits'] = 0