mirror of
https://github.com/anthropics/claude-code.git
synced 2026-05-21 02:52:42 +00:00
feat: Add hookify plugin for custom hook rules via markdown
Adds the hookify plugin to public marketplace. Enables users to create custom hooks using simple markdown configuration files instead of editing JSON. Key features: - Define rules with regex patterns to warn/block operations - Create rules from explicit instructions or conversation analysis - Pattern-based matching for bash commands, file edits, prompts, stop events - Enable/disable rules dynamically without editing code - Conversation analyzer agent finds problematic behaviors Changes from internal version: - Removed non-functional SessionStart hook (not registered in hooks.json) - Removed all sessionstart documentation and examples - Fixed restart documentation to consistently state "no restart needed" - Changed license from "Internal Anthropic use only" to "MIT License" - Kept test blocks in core modules (useful for developers) Plugin provides: - 4 commands: /hookify, /hookify:list, /hookify:configure, /hookify:help - 1 agent: conversation-analyzer - 1 skill: writing-rules - 4 hook types: PreToolUse, PostToolUse, Stop, UserPromptSubmit - 4 example rules ready to use All features functional and suitable for public use. 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
313
plugins/hookify/core/rule_engine.py
Normal file
313
plugins/hookify/core/rule_engine.py
Normal file
@@ -0,0 +1,313 @@
|
||||
#!/usr/bin/env python3
|
||||
"""Rule evaluation engine for hookify plugin."""
|
||||
|
||||
import re
|
||||
import sys
|
||||
from functools import lru_cache
|
||||
from typing import List, Dict, Any, Optional
|
||||
|
||||
# Import from local module
|
||||
from hookify.core.config_loader import Rule, Condition
|
||||
|
||||
|
||||
# Cache compiled regexes (max 128 patterns)
|
||||
@lru_cache(maxsize=128)
|
||||
def compile_regex(pattern: str) -> re.Pattern:
|
||||
"""Compile regex pattern with caching.
|
||||
|
||||
Args:
|
||||
pattern: Regex pattern string
|
||||
|
||||
Returns:
|
||||
Compiled regex pattern
|
||||
"""
|
||||
return re.compile(pattern, re.IGNORECASE)
|
||||
|
||||
|
||||
class RuleEngine:
|
||||
"""Evaluates rules against hook input data."""
|
||||
|
||||
def __init__(self):
|
||||
"""Initialize rule engine."""
|
||||
# No need for instance cache anymore - using global lru_cache
|
||||
pass
|
||||
|
||||
def evaluate_rules(self, rules: List[Rule], input_data: Dict[str, Any]) -> Dict[str, Any]:
|
||||
"""Evaluate all rules and return combined results.
|
||||
|
||||
Checks all rules and accumulates matches. Blocking rules take priority
|
||||
over warning rules. All matching rule messages are combined.
|
||||
|
||||
Args:
|
||||
rules: List of Rule objects to evaluate
|
||||
input_data: Hook input JSON (tool_name, tool_input, etc.)
|
||||
|
||||
Returns:
|
||||
Response dict with systemMessage, hookSpecificOutput, etc.
|
||||
Empty dict {} if no rules match.
|
||||
"""
|
||||
hook_event = input_data.get('hook_event_name', '')
|
||||
blocking_rules = []
|
||||
warning_rules = []
|
||||
|
||||
for rule in rules:
|
||||
if self._rule_matches(rule, input_data):
|
||||
if rule.action == 'block':
|
||||
blocking_rules.append(rule)
|
||||
else:
|
||||
warning_rules.append(rule)
|
||||
|
||||
# If any blocking rules matched, block the operation
|
||||
if blocking_rules:
|
||||
messages = [f"**[{r.name}]**\n{r.message}" for r in blocking_rules]
|
||||
combined_message = "\n\n".join(messages)
|
||||
|
||||
# Use appropriate blocking format based on event type
|
||||
if hook_event == 'Stop':
|
||||
return {
|
||||
"decision": "block",
|
||||
"reason": combined_message,
|
||||
"systemMessage": combined_message
|
||||
}
|
||||
elif hook_event in ['PreToolUse', 'PostToolUse']:
|
||||
return {
|
||||
"hookSpecificOutput": {
|
||||
"hookEventName": hook_event,
|
||||
"permissionDecision": "deny"
|
||||
},
|
||||
"systemMessage": combined_message
|
||||
}
|
||||
else:
|
||||
# For other events, just show message
|
||||
return {
|
||||
"systemMessage": combined_message
|
||||
}
|
||||
|
||||
# If only warnings, show them but allow operation
|
||||
if warning_rules:
|
||||
messages = [f"**[{r.name}]**\n{r.message}" for r in warning_rules]
|
||||
return {
|
||||
"systemMessage": "\n\n".join(messages)
|
||||
}
|
||||
|
||||
# No matches - allow operation
|
||||
return {}
|
||||
|
||||
def _rule_matches(self, rule: Rule, input_data: Dict[str, Any]) -> bool:
|
||||
"""Check if rule matches input data.
|
||||
|
||||
Args:
|
||||
rule: Rule to evaluate
|
||||
input_data: Hook input data
|
||||
|
||||
Returns:
|
||||
True if rule matches, False otherwise
|
||||
"""
|
||||
# Extract tool information
|
||||
tool_name = input_data.get('tool_name', '')
|
||||
tool_input = input_data.get('tool_input', {})
|
||||
|
||||
# Check tool matcher if specified
|
||||
if rule.tool_matcher:
|
||||
if not self._matches_tool(rule.tool_matcher, tool_name):
|
||||
return False
|
||||
|
||||
# If no conditions, don't match
|
||||
# (Rules must have at least one condition to be valid)
|
||||
if not rule.conditions:
|
||||
return False
|
||||
|
||||
# All conditions must match
|
||||
for condition in rule.conditions:
|
||||
if not self._check_condition(condition, tool_name, tool_input, input_data):
|
||||
return False
|
||||
|
||||
return True
|
||||
|
||||
def _matches_tool(self, matcher: str, tool_name: str) -> bool:
|
||||
"""Check if tool_name matches the matcher pattern.
|
||||
|
||||
Args:
|
||||
matcher: Pattern like "Bash", "Edit|Write", "*"
|
||||
tool_name: Actual tool name
|
||||
|
||||
Returns:
|
||||
True if matches
|
||||
"""
|
||||
if matcher == '*':
|
||||
return True
|
||||
|
||||
# Split on | for OR matching
|
||||
patterns = matcher.split('|')
|
||||
return tool_name in patterns
|
||||
|
||||
def _check_condition(self, condition: Condition, tool_name: str,
|
||||
tool_input: Dict[str, Any], input_data: Dict[str, Any] = None) -> bool:
|
||||
"""Check if a single condition matches.
|
||||
|
||||
Args:
|
||||
condition: Condition to check
|
||||
tool_name: Tool being used
|
||||
tool_input: Tool input dict
|
||||
input_data: Full hook input data (for Stop events, etc.)
|
||||
|
||||
Returns:
|
||||
True if condition matches
|
||||
"""
|
||||
# Extract the field value to check
|
||||
field_value = self._extract_field(condition.field, tool_name, tool_input, input_data)
|
||||
if field_value is None:
|
||||
return False
|
||||
|
||||
# Apply operator
|
||||
operator = condition.operator
|
||||
pattern = condition.pattern
|
||||
|
||||
if operator == 'regex_match':
|
||||
return self._regex_match(pattern, field_value)
|
||||
elif operator == 'contains':
|
||||
return pattern in field_value
|
||||
elif operator == 'equals':
|
||||
return pattern == field_value
|
||||
elif operator == 'not_contains':
|
||||
return pattern not in field_value
|
||||
elif operator == 'starts_with':
|
||||
return field_value.startswith(pattern)
|
||||
elif operator == 'ends_with':
|
||||
return field_value.endswith(pattern)
|
||||
else:
|
||||
# Unknown operator
|
||||
return False
|
||||
|
||||
def _extract_field(self, field: str, tool_name: str,
|
||||
tool_input: Dict[str, Any], input_data: Dict[str, Any] = None) -> Optional[str]:
|
||||
"""Extract field value from tool input or hook input data.
|
||||
|
||||
Args:
|
||||
field: Field name like "command", "new_text", "file_path", "reason", "transcript"
|
||||
tool_name: Tool being used (may be empty for Stop events)
|
||||
tool_input: Tool input dict
|
||||
input_data: Full hook input (for accessing transcript_path, reason, etc.)
|
||||
|
||||
Returns:
|
||||
Field value as string, or None if not found
|
||||
"""
|
||||
# Direct tool_input fields
|
||||
if field in tool_input:
|
||||
value = tool_input[field]
|
||||
if isinstance(value, str):
|
||||
return value
|
||||
return str(value)
|
||||
|
||||
# For Stop events and other non-tool events, check input_data
|
||||
if input_data:
|
||||
# Stop event specific fields
|
||||
if field == 'reason':
|
||||
return input_data.get('reason', '')
|
||||
elif field == 'transcript':
|
||||
# Read transcript file if path provided
|
||||
transcript_path = input_data.get('transcript_path')
|
||||
if transcript_path:
|
||||
try:
|
||||
with open(transcript_path, 'r') as f:
|
||||
return f.read()
|
||||
except FileNotFoundError:
|
||||
print(f"Warning: Transcript file not found: {transcript_path}", file=sys.stderr)
|
||||
return ''
|
||||
except PermissionError:
|
||||
print(f"Warning: Permission denied reading transcript: {transcript_path}", file=sys.stderr)
|
||||
return ''
|
||||
except (IOError, OSError) as e:
|
||||
print(f"Warning: Error reading transcript {transcript_path}: {e}", file=sys.stderr)
|
||||
return ''
|
||||
except UnicodeDecodeError as e:
|
||||
print(f"Warning: Encoding error in transcript {transcript_path}: {e}", file=sys.stderr)
|
||||
return ''
|
||||
elif field == 'user_prompt':
|
||||
# For UserPromptSubmit events
|
||||
return input_data.get('user_prompt', '')
|
||||
|
||||
# Handle special cases by tool type
|
||||
if tool_name == 'Bash':
|
||||
if field == 'command':
|
||||
return tool_input.get('command', '')
|
||||
|
||||
elif tool_name in ['Write', 'Edit']:
|
||||
if field == 'content':
|
||||
# Write uses 'content', Edit has 'new_string'
|
||||
return tool_input.get('content') or tool_input.get('new_string', '')
|
||||
elif field == 'new_text' or field == 'new_string':
|
||||
return tool_input.get('new_string', '')
|
||||
elif field == 'old_text' or field == 'old_string':
|
||||
return tool_input.get('old_string', '')
|
||||
elif field == 'file_path':
|
||||
return tool_input.get('file_path', '')
|
||||
|
||||
elif tool_name == 'MultiEdit':
|
||||
if field == 'file_path':
|
||||
return tool_input.get('file_path', '')
|
||||
elif field in ['new_text', 'content']:
|
||||
# Concatenate all edits
|
||||
edits = tool_input.get('edits', [])
|
||||
return ' '.join(e.get('new_string', '') for e in edits)
|
||||
|
||||
return None
|
||||
|
||||
def _regex_match(self, pattern: str, text: str) -> bool:
|
||||
"""Check if pattern matches text using regex.
|
||||
|
||||
Args:
|
||||
pattern: Regex pattern
|
||||
text: Text to match against
|
||||
|
||||
Returns:
|
||||
True if pattern matches
|
||||
"""
|
||||
try:
|
||||
# Use cached compiled regex (LRU cache with max 128 patterns)
|
||||
regex = compile_regex(pattern)
|
||||
return bool(regex.search(text))
|
||||
|
||||
except re.error as e:
|
||||
print(f"Invalid regex pattern '{pattern}': {e}", file=sys.stderr)
|
||||
return False
|
||||
|
||||
|
||||
# For testing
|
||||
if __name__ == '__main__':
|
||||
from hookify.core.config_loader import Condition, Rule
|
||||
|
||||
# Test rule evaluation
|
||||
rule = Rule(
|
||||
name="test-rm",
|
||||
enabled=True,
|
||||
event="bash",
|
||||
conditions=[
|
||||
Condition(field="command", operator="regex_match", pattern=r"rm\s+-rf")
|
||||
],
|
||||
message="Dangerous rm command!"
|
||||
)
|
||||
|
||||
engine = RuleEngine()
|
||||
|
||||
# Test matching input
|
||||
test_input = {
|
||||
"tool_name": "Bash",
|
||||
"tool_input": {
|
||||
"command": "rm -rf /tmp/test"
|
||||
}
|
||||
}
|
||||
|
||||
result = engine.evaluate_rules([rule], test_input)
|
||||
print("Match result:", result)
|
||||
|
||||
# Test non-matching input
|
||||
test_input2 = {
|
||||
"tool_name": "Bash",
|
||||
"tool_input": {
|
||||
"command": "ls -la"
|
||||
}
|
||||
}
|
||||
|
||||
result2 = engine.evaluate_rules([rule], test_input2)
|
||||
print("Non-match result:", result2)
|
||||
Reference in New Issue
Block a user