agentic-system / multimodal_reasoning.py
Cascade Bot
Added Groq streaming support and optimizations - clean version
1d75522
"""
Multi-Modal Reasoning Implementation
----------------------------------
Implements reasoning across different types of information.
"""
import logging
from typing import Dict, Any, List, Optional
from datetime import datetime
import json
import numpy as np
from .reasoning import ReasoningStrategy
class MultiModalReasoning(ReasoningStrategy):
"""Implements multi-modal reasoning across different types of information."""
def __init__(self, config: Optional[Dict[str, Any]] = None):
"""Initialize multi-modal reasoning."""
super().__init__()
self.config = config or {}
# Standard reasoning parameters
self.min_confidence = self.config.get('min_confidence', 0.7)
self.parallel_threshold = self.config.get('parallel_threshold', 3)
self.learning_rate = self.config.get('learning_rate', 0.1)
self.strategy_weights = self.config.get('strategy_weights', {
"LOCAL_LLM": 0.8,
"CHAIN_OF_THOUGHT": 0.6,
"TREE_OF_THOUGHTS": 0.5,
"META_LEARNING": 0.4
})
# Multi-modal specific parameters
self.modality_weights = self.config.get('modality_weights', {
'text': 0.8,
'image': 0.7,
'audio': 0.6,
'video': 0.5,
'structured': 0.7
})
self.cross_modal_threshold = self.config.get('cross_modal_threshold', 0.6)
self.integration_steps = self.config.get('integration_steps', 3)
self.alignment_method = self.config.get('alignment_method', 'attention')
async def reason(self, query: str, context: Dict[str, Any]) -> Dict[str, Any]:
try:
# Process different modalities
modalities = await self._process_modalities(query, context)
# Align across modalities
alignment = await self._cross_modal_alignment(modalities, context)
# Integrated analysis
integration = await self._integrated_analysis(alignment, context)
# Generate final response
response = await self._generate_response(integration, context)
return {
"success": True,
"answer": response["conclusion"],
"modalities": modalities,
"alignment": alignment,
"integration": integration,
"confidence": response["confidence"]
}
except Exception as e:
logging.error(f"Error in multi-modal reasoning: {str(e)}")
return {"success": False, "error": str(e)}
async def _process_modalities(self, query: str, context: Dict[str, Any]) -> Dict[str, List[Dict[str, Any]]]:
"""Process query across different modalities."""
prompt = f"""
Process query across modalities:
Query: {query}
Context: {json.dumps(context)}
For each modality extract:
1. [Type]: Modality type
2. [Content]: Relevant content
3. [Features]: Key features
4. [Quality]: Content quality
Format as:
[M1]
Type: ...
Content: ...
Features: ...
Quality: ...
"""
response = await context["groq_api"].predict(prompt)
return self._parse_modalities(response["answer"])
async def _cross_modal_alignment(self, modalities: Dict[str, List[Dict[str, Any]]], context: Dict[str, Any]) -> List[Dict[str, Any]]:
"""Align information across different modalities."""
try:
# Extract modality types
modal_types = list(modalities.keys())
# Initialize alignment results
alignments = []
# Process each modality pair
for i in range(len(modal_types)):
for j in range(i + 1, len(modal_types)):
type1, type2 = modal_types[i], modal_types[j]
# Get items from each modality
items1 = modalities[type1]
items2 = modalities[type2]
# Find alignments between items
for item1 in items1:
for item2 in items2:
similarity = self._calculate_similarity(item1, item2)
if similarity > self.cross_modal_threshold: # Threshold for alignment
alignments.append({
"type1": type1,
"type2": type2,
"item1": item1,
"item2": item2,
"similarity": similarity
})
# Sort alignments by similarity
alignments.sort(key=lambda x: x["similarity"], reverse=True)
return alignments
except Exception as e:
logging.error(f"Error in cross-modal alignment: {str(e)}")
return []
def _calculate_similarity(self, item1: Dict[str, Any], item2: Dict[str, Any]) -> float:
"""Calculate similarity between two items from different modalities."""
try:
# Extract content from items
content1 = str(item1.get("content", ""))
content2 = str(item2.get("content", ""))
# Calculate basic similarity (can be enhanced with more sophisticated methods)
common_words = set(content1.lower().split()) & set(content2.lower().split())
total_words = set(content1.lower().split()) | set(content2.lower().split())
if not total_words:
return 0.0
return len(common_words) / len(total_words)
except Exception as e:
logging.error(f"Error calculating similarity: {str(e)}")
return 0.0
async def _integrated_analysis(self, alignment: List[Dict[str, Any]], context: Dict[str, Any]) -> List[Dict[str, Any]]:
prompt = f"""
Perform integrated multi-modal analysis:
Alignment: {json.dumps(alignment)}
Context: {json.dumps(context)}
For each insight:
1. [Insight]: Key finding
2. [Sources]: Contributing modalities
3. [Support]: Supporting evidence
4. [Confidence]: Confidence level
Format as:
[I1]
Insight: ...
Sources: ...
Support: ...
Confidence: ...
"""
response = await context["groq_api"].predict(prompt)
return self._parse_integration(response["answer"])
async def _generate_response(self, integration: List[Dict[str, Any]], context: Dict[str, Any]) -> Dict[str, Any]:
prompt = f"""
Generate unified multi-modal response:
Integration: {json.dumps(integration)}
Context: {json.dumps(context)}
Provide:
1. Main conclusion
2. Modal contributions
3. Integration benefits
4. Confidence level (0-1)
"""
response = await context["groq_api"].predict(prompt)
return self._parse_response(response["answer"])
def _parse_modalities(self, response: str) -> Dict[str, List[Dict[str, Any]]]:
"""Parse modalities from response."""
modalities = {}
current_modality = None
for line in response.split('\n'):
line = line.strip()
if not line:
continue
if line.startswith('[M'):
if current_modality:
if current_modality["type"] not in modalities:
modalities[current_modality["type"]] = []
modalities[current_modality["type"]].append(current_modality)
current_modality = {
"type": "",
"content": "",
"features": "",
"quality": ""
}
elif current_modality:
if line.startswith('Type:'):
current_modality["type"] = line[5:].strip()
elif line.startswith('Content:'):
current_modality["content"] = line[8:].strip()
elif line.startswith('Features:'):
current_modality["features"] = line[9:].strip()
elif line.startswith('Quality:'):
current_modality["quality"] = line[8:].strip()
if current_modality:
if current_modality["type"] not in modalities:
modalities[current_modality["type"]] = []
modalities[current_modality["type"]].append(current_modality)
return modalities
def _parse_integration(self, response: str) -> List[Dict[str, Any]]:
"""Parse integration from response."""
integration = []
current_insight = None
for line in response.split('\n'):
line = line.strip()
if not line:
continue
if line.startswith('[I'):
if current_insight:
integration.append(current_insight)
current_insight = {
"insight": "",
"sources": "",
"support": "",
"confidence": 0.0
}
elif current_insight:
if line.startswith('Insight:'):
current_insight["insight"] = line[8:].strip()
elif line.startswith('Sources:'):
current_insight["sources"] = line[8:].strip()
elif line.startswith('Support:'):
current_insight["support"] = line[8:].strip()
elif line.startswith('Confidence:'):
try:
current_insight["confidence"] = float(line[11:].strip())
except:
pass
if current_insight:
integration.append(current_insight)
return integration
def _parse_response(self, response: str) -> Dict[str, Any]:
"""Parse response from response."""
response_dict = {
"conclusion": "",
"modal_contributions": [],
"integration_benefits": [],
"confidence": 0.0
}
mode = None
for line in response.split('\n'):
line = line.strip()
if not line:
continue
if line.startswith('Conclusion:'):
response_dict["conclusion"] = line[11:].strip()
elif line.startswith('Modal Contributions:'):
mode = "modal"
elif line.startswith('Integration Benefits:'):
mode = "integration"
elif line.startswith('Confidence:'):
try:
response_dict["confidence"] = float(line[11:].strip())
except:
response_dict["confidence"] = 0.5
mode = None
elif mode == "modal" and line.startswith('- '):
response_dict["modal_contributions"].append(line[2:].strip())
elif mode == "integration" and line.startswith('- '):
response_dict["integration_benefits"].append(line[2:].strip())
return response_dict