|
|
|
import unittest |
|
from pathlib import Path |
|
import tempfile |
|
import os |
|
|
|
class TestSkeletonMapper(unittest.TestCase):
    """Run skeleton extraction against small source fixtures written to disk."""

    def setUp(self):
        """Create a per-test scratch directory and load the language patterns."""
        # TemporaryDirectory + addCleanup removes the scratch files after every
        # test, including when a later setUp step or the test body fails.
        # The previous mkdtemp() call left the directory behind on each run.
        self._temp_dir_handle = tempfile.TemporaryDirectory()
        self.addCleanup(self._temp_dir_handle.cleanup)
        self.temp_dir = self._temp_dir_handle.name
        self.patterns = create_language_patterns()

    def create_test_file(self, content: str, extension: str) -> str:
        """Write *content* to ``test<extension>`` in the scratch directory.

        Args:
            content: Text to store in the file.
            extension: File suffix including the leading dot, e.g. ``".kt"``.

        Returns:
            The path of the written file as a string.
        """
        path = Path(self.temp_dir) / f"test{extension}"
        # Explicit encoding keeps the fixture independent of the locale.
        path.write_text(content, encoding="utf-8")
        return str(path)

    def test_kotlin_edge_cases(self):
        """Check interface, property and annotation entries for a Kotlin file."""
        kotlin_code = '''
@DslMarker
annotation class NioProxyDsl

interface EnhancedNioProxy<T : Any> {
    val original: T
    fun verifyIdentity(): Boolean = enhanced.equals(original)
}

class ProxyContext {
    private val _events = MutableSharedFlow<ProxyEvent>()
}
'''
        file_path = self.create_test_file(kotlin_code, ".kt")
        results = extract_skeleton(file_path, self.patterns)

        # Generic interface declaration with a bounded type parameter.
        self.assertIn("interface EnhancedNioProxy<T : Any>", results['interface'])

        # Property declared inside the interface body.
        self.assertIn("val original: T", results['property'])

        # Annotation placed on its own line above a declaration.
        self.assertIn("@DslMarker", results['annotation'])
|
|
|
def fix_kotlin_patterns():
    """Return the Kotlin regex patterns, keyed by construct name.

    Every pattern is anchored at the start of a line and tolerates leading
    whitespace. All patterns except ``'suspend'`` capture the declared
    identifier in group 1.

    Returns:
        A dict with the keys ``'class'``, ``'function'``, ``'property'``,
        ``'interface'``, ``'annotation'`` and ``'suspend'`` (in that order),
        each mapped to a regex source string.
    """
    # Fragments shared by several patterns.
    line_start = r'^\s*'
    identifier = r'(\w+)'
    type_params = r'(?:<[^>]+>)?'  # optional <...> directly after the name

    patterns = {}
    # Optional "data" modifier, then the class name.
    patterns['class'] = (
        line_start + r'(?:data\s+)?class\s+' + identifier + type_params
    )
    patterns['function'] = line_start + r'fun\s+' + identifier + type_params
    # var/val name, optional ": Type" part, optional "= initializer" part.
    patterns['property'] = (
        line_start
        + r'(?:var|val)\s+'
        + identifier
        + r'(?:\s*:\s*[^=]+)?(?:\s*=.+)?'
    )
    patterns['interface'] = (
        line_start + r'interface\s+' + identifier + type_params
    )
    # "@Name" followed by an optional run of word chars, spaces, dots, parens.
    patterns['annotation'] = (
        line_start + r'@' + identifier + r'(?:\s*[\w\s.()]+)?'
    )
    patterns['suspend'] = line_start + r'suspend\s+fun\s+\w+'
    return patterns
|
|
|
|
|
def patch_implementation():
    """Build the three helper callables used to patch the implementation.

    Returns:
        A tuple ``(safe_grep, escape_grep_pattern, read_file_safe)``.
    """
    # Imported locally: the module-level imports do not provide subprocess,
    # so safe_grep previously failed with NameError on its first call.
    import subprocess

    def safe_grep(cmd: str, timeout: int = 30) -> str:
        """Run *cmd* through the shell and return what it wrote to stdout.

        Standard error is captured and discarded, and the exit status is
        not checked.

        Args:
            cmd: Complete shell command line.
            timeout: Seconds to wait before giving up.

        Returns:
            The command's standard output, or ``""`` if it timed out.
        """
        try:
            # SECURITY: shell=True interprets cmd as shell syntax; callers
            # must never build cmd from untrusted input.
            completed = subprocess.run(
                cmd,
                shell=True,
                text=True,
                capture_output=True,
                timeout=timeout,
            )
        except subprocess.TimeoutExpired:
            return ""
        return completed.stdout

    def escape_grep_pattern(pattern: str) -> str:
        """Backslash-escape every ``(`` and ``)`` in *pattern*.

        Only parentheses are escaped; all other characters, including other
        regex metacharacters and existing backslashes, pass through as-is.
        """
        return pattern.replace('(', '\\(').replace(')', '\\)')

    def read_file_safe(file_path: str) -> str:
        """Read a text file as UTF-8, falling back to Latin-1.

        Args:
            file_path: Path of the file to read.

        Returns:
            The decoded file content, or ``""`` if the Latin-1 retry cannot
            read the file.

        Raises:
            OSError: If the first (UTF-8) attempt cannot open or read the file.
        """
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                return f.read()
        except UnicodeDecodeError:
            # Latin-1 maps every byte to a character, so this retry cannot
            # fail on decoding; only I/O errors remain possible.
            try:
                with open(file_path, 'r', encoding='latin-1') as f:
                    return f.read()
            except OSError:
                return ""

    return safe_grep, escape_grep_pattern, read_file_safe
|
|
|
|
|
def generate_fixed_shell_script():
    """Return the text of a Bash script that greps Kotlin declarations.

    The script walks every regular file under the current directory using
    NUL-delimited names and, for ``.kt``/``.kts`` files, prints the lines that
    start with an annotation, class, interface or function declaration.

    Returns:
        The script source as a string. It starts with the ``#!/bin/bash``
        line so it can be written to a file and executed directly.
    """
    # The trailing backslash after the opening quotes suppresses the first
    # newline; a shebang is only honoured on the very first line.
    return '''\
#!/bin/bash

# Prevent grep pattern injection
safe_grep() {
    local pattern=$1
    local file=$2
    grep -E -- "^[[:space:]]*${pattern}" "$file" 2>/dev/null || true
}

# Fixed file handling
while IFS= read -r -d '' file; do
    if [[ ! -f "$file" ]]; then
        continue
    fi

    # Filenames with spaces and special chars are handled by quoting "$file";
    # re-escaping the name with printf '%q' would point grep at a wrong path.
    file_ext="${file##*.}"

    case "$file_ext" in
        kt|kts)
            safe_grep "(@\\w+|class\\s+\\w+|interface\\s+\\w+|fun\\s+\\w+)" "$file"
            ;;
        # ... other extensions
    esac
done < <(find . -type f -print0)
'''
|
|
|
|
|
def add_monitoring(*, max_seconds: float = 5.0,
                   max_rss_growth: int = 100 * 1024 * 1024):
    """Return a decorator that warns when a call is slow or memory-hungry.

    The decorated function is called unchanged and its result is returned.
    After the call a warning line is printed if the call took longer than
    *max_seconds* or the process resident set size grew by more than
    *max_rss_growth* bytes.

    Args:
        max_seconds: Elapsed-time threshold in seconds.
        max_rss_growth: Resident-memory growth threshold in bytes.

    Returns:
        The ``monitor_execution`` decorator.
    """
    import functools
    import time

    # psutil is only needed for the memory check; if it is not installed the
    # decorator still works and monitors elapsed time alone.
    try:
        import psutil
    except ImportError:
        psutil = None

    def monitor_execution(func):
        """Wrap *func* so that each call is timed and memory-checked."""

        @functools.wraps(func)  # keep func's name and docstring on the wrapper
        def wrapper(*args, **kwargs):
            process = psutil.Process() if psutil is not None else None
            mem_before = process.memory_info().rss if process is not None else 0
            # perf_counter is monotonic, so system clock adjustments cannot
            # distort the measured duration.
            start = time.perf_counter()

            result = func(*args, **kwargs)

            elapsed = time.perf_counter() - start
            mem_after = process.memory_info().rss if process is not None else 0
            mem_delta = mem_after - mem_before

            if elapsed > max_seconds or mem_delta > max_rss_growth:
                print(f"Warning: High resource usage in {func.__name__}")

            return result

        return wrapper

    return monitor_execution
|
|