#!/usr/bin/env python3 """Rule evaluation engine for hookify plugin.""" import os import re import sys from functools import lru_cache from typing import List, Dict, Any, Optional # Import from local module from hookify.core.config_loader import Rule, Condition # Maximum transcript size to load into memory (10MB default) # For larger files, only the tail is read to prevent OOM MAX_TRANSCRIPT_SIZE_BYTES = 10 * 1024 * 1024 # 10MB # Size threshold for warning about large transcripts TRANSCRIPT_WARNING_SIZE_BYTES = 5 * 1024 * 1024 # 5MB # Cache compiled regexes (max 128 patterns) @lru_cache(maxsize=128) def compile_regex(pattern: str) -> re.Pattern: """Compile regex pattern with caching. Args: pattern: Regex pattern string Returns: Compiled regex pattern """ return re.compile(pattern, re.IGNORECASE) def read_transcript_safely(transcript_path: str) -> str: """Read transcript file with size limits to prevent OOM. For large transcripts (>10MB), only reads the tail of the file to prevent memory exhaustion. This preserves the most recent conversation context which is typically what rules care about. Args: transcript_path: Path to the transcript file Returns: Transcript content as string, possibly truncated for large files """ try: file_size = os.path.getsize(transcript_path) # Warn about large transcripts if file_size > TRANSCRIPT_WARNING_SIZE_BYTES: size_mb = file_size / (1024 * 1024) print(f"Warning: Large transcript ({size_mb:.1f}MB): {transcript_path}", file=sys.stderr) # For files within limit, read normally if file_size <= MAX_TRANSCRIPT_SIZE_BYTES: with open(transcript_path, 'r') as f: return f.read() # For large files, read only the tail to prevent OOM size_mb = file_size / (1024 * 1024) limit_mb = MAX_TRANSCRIPT_SIZE_BYTES / (1024 * 1024) print(f"Warning: Transcript too large ({size_mb:.1f}MB), reading last {limit_mb:.0f}MB only", file=sys.stderr) with open(transcript_path, 'r') as f: # Seek to position near end, leaving room for MAX_TRANSCRIPT_SIZE_BYTES f.seek(file_size - MAX_TRANSCRIPT_SIZE_BYTES) # Skip partial line at seek position f.readline() return f.read() except FileNotFoundError: print(f"Warning: Transcript file not found: {transcript_path}", file=sys.stderr) return '' except PermissionError: print(f"Warning: Permission denied reading transcript: {transcript_path}", file=sys.stderr) return '' except (IOError, OSError) as e: print(f"Warning: Error reading transcript {transcript_path}: {e}", file=sys.stderr) return '' except UnicodeDecodeError as e: print(f"Warning: Encoding error in transcript {transcript_path}: {e}", file=sys.stderr) return '' class RuleEngine: """Evaluates rules against hook input data.""" def __init__(self): """Initialize rule engine.""" # No need for instance cache anymore - using global lru_cache pass def evaluate_rules(self, rules: List[Rule], input_data: Dict[str, Any]) -> Dict[str, Any]: """Evaluate all rules and return combined results. Checks all rules and accumulates matches. Blocking rules take priority over warning rules. All matching rule messages are combined. Args: rules: List of Rule objects to evaluate input_data: Hook input JSON (tool_name, tool_input, etc.) Returns: Response dict with systemMessage, hookSpecificOutput, etc. Empty dict {} if no rules match. """ hook_event = input_data.get('hook_event_name', '') blocking_rules = [] warning_rules = [] for rule in rules: if self._rule_matches(rule, input_data): if rule.action == 'block': blocking_rules.append(rule) else: warning_rules.append(rule) # If any blocking rules matched, block the operation if blocking_rules: messages = [f"**[{r.name}]**\n{r.message}" for r in blocking_rules] combined_message = "\n\n".join(messages) # Use appropriate blocking format based on event type if hook_event == 'Stop': return { "decision": "block", "reason": combined_message, "systemMessage": combined_message } elif hook_event in ['PreToolUse', 'PostToolUse']: return { "hookSpecificOutput": { "hookEventName": hook_event, "permissionDecision": "deny" }, "systemMessage": combined_message } else: # For other events, just show message return { "systemMessage": combined_message } # If only warnings, show them but allow operation if warning_rules: messages = [f"**[{r.name}]**\n{r.message}" for r in warning_rules] return { "systemMessage": "\n\n".join(messages) } # No matches - allow operation return {} def _rule_matches(self, rule: Rule, input_data: Dict[str, Any]) -> bool: """Check if rule matches input data. Args: rule: Rule to evaluate input_data: Hook input data Returns: True if rule matches, False otherwise """ # Extract tool information tool_name = input_data.get('tool_name', '') tool_input = input_data.get('tool_input', {}) # Check tool matcher if specified if rule.tool_matcher: if not self._matches_tool(rule.tool_matcher, tool_name): return False # If no conditions, don't match # (Rules must have at least one condition to be valid) if not rule.conditions: return False # All conditions must match for condition in rule.conditions: if not self._check_condition(condition, tool_name, tool_input, input_data): return False return True def _matches_tool(self, matcher: str, tool_name: str) -> bool: """Check if tool_name matches the matcher pattern. Args: matcher: Pattern like "Bash", "Edit|Write", "*" tool_name: Actual tool name Returns: True if matches """ if matcher == '*': return True # Split on | for OR matching patterns = matcher.split('|') return tool_name in patterns def _check_condition(self, condition: Condition, tool_name: str, tool_input: Dict[str, Any], input_data: Dict[str, Any] = None) -> bool: """Check if a single condition matches. Args: condition: Condition to check tool_name: Tool being used tool_input: Tool input dict input_data: Full hook input data (for Stop events, etc.) Returns: True if condition matches """ # Extract the field value to check field_value = self._extract_field(condition.field, tool_name, tool_input, input_data) if field_value is None: return False # Apply operator operator = condition.operator pattern = condition.pattern if operator == 'regex_match': return self._regex_match(pattern, field_value) elif operator == 'contains': return pattern in field_value elif operator == 'equals': return pattern == field_value elif operator == 'not_contains': return pattern not in field_value elif operator == 'starts_with': return field_value.startswith(pattern) elif operator == 'ends_with': return field_value.endswith(pattern) else: # Unknown operator return False def _extract_field(self, field: str, tool_name: str, tool_input: Dict[str, Any], input_data: Dict[str, Any] = None) -> Optional[str]: """Extract field value from tool input or hook input data. Args: field: Field name like "command", "new_text", "file_path", "reason", "transcript" tool_name: Tool being used (may be empty for Stop events) tool_input: Tool input dict input_data: Full hook input (for accessing transcript_path, reason, etc.) Returns: Field value as string, or None if not found """ # Direct tool_input fields if field in tool_input: value = tool_input[field] if isinstance(value, str): return value return str(value) # For Stop events and other non-tool events, check input_data if input_data: # Stop event specific fields if field == 'reason': return input_data.get('reason', '') elif field == 'transcript': # Read transcript file with size limits to prevent OOM transcript_path = input_data.get('transcript_path') if transcript_path: return read_transcript_safely(transcript_path) elif field == 'user_prompt': # For UserPromptSubmit events return input_data.get('user_prompt', '') # Handle special cases by tool type if tool_name == 'Bash': if field == 'command': return tool_input.get('command', '') elif tool_name in ['Write', 'Edit']: if field == 'content': # Write uses 'content', Edit has 'new_string' return tool_input.get('content') or tool_input.get('new_string', '') elif field == 'new_text' or field == 'new_string': return tool_input.get('new_string', '') elif field == 'old_text' or field == 'old_string': return tool_input.get('old_string', '') elif field == 'file_path': return tool_input.get('file_path', '') elif tool_name == 'MultiEdit': if field == 'file_path': return tool_input.get('file_path', '') elif field in ['new_text', 'content']: # Concatenate all edits edits = tool_input.get('edits', []) return ' '.join(e.get('new_string', '') for e in edits) return None def _regex_match(self, pattern: str, text: str) -> bool: """Check if pattern matches text using regex. Args: pattern: Regex pattern text: Text to match against Returns: True if pattern matches """ try: # Use cached compiled regex (LRU cache with max 128 patterns) regex = compile_regex(pattern) return bool(regex.search(text)) except re.error as e: print(f"Invalid regex pattern '{pattern}': {e}", file=sys.stderr) return False # For testing if __name__ == '__main__': from hookify.core.config_loader import Condition, Rule # Test rule evaluation rule = Rule( name="test-rm", enabled=True, event="bash", conditions=[ Condition(field="command", operator="regex_match", pattern=r"rm\s+-rf") ], message="Dangerous rm command!" ) engine = RuleEngine() # Test matching input test_input = { "tool_name": "Bash", "tool_input": { "command": "rm -rf /tmp/test" } } result = engine.evaluate_rules([rule], test_input) print("Match result:", result) # Test non-matching input test_input2 = { "tool_name": "Bash", "tool_input": { "command": "ls -la" } } result2 = engine.evaluate_rules([rule], test_input2) print("Non-match result:", result2)