Coverage for src/ollamapy/skills.py: 71%
194 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-01 12:29 -0400
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-01 12:29 -0400
1"""Skills management system for dynamic AI capabilities."""
3import json
4import os
5import subprocess
6import sys
7from dataclasses import dataclass, field, asdict
8from typing import Dict, Callable, List, Any, Optional
9from datetime import datetime
10from pathlib import Path
11from .parameter_utils import prepare_function_parameters
12from .ai_query import AIQuery
@dataclass
class Skill:
    """Data model describing one skill and its bookkeeping metadata.

    The five leading fields are required; every other field has a default.
    """

    name: str
    description: str
    vibe_test_phrases: List[str]
    parameters: Dict[str, Dict[str, Any]]
    function_code: str  # Python code as text
    verified: bool = False
    scope: str = "local"  # "global" or "local"
    role: str = "general"  # Role fulfillment category
    created_at: str = field(default_factory=lambda: datetime.now().isoformat())
    last_modified: str = field(default_factory=lambda: datetime.now().isoformat())
    execution_count: int = 0
    success_rate: float = 100.0
    average_execution_time: float = 0.0
    tags: List[str] = field(default_factory=list)

    def to_dict(self) -> Dict[str, Any]:
        """Convert the skill to a plain dictionary for serialization."""
        return asdict(self)

    @classmethod
    def from_dict(cls, data: Dict[str, Any]) -> "Skill":
        """Create a skill from a dictionary.

        Keys that do not correspond to a dataclass field are ignored, so a
        stored skill carrying extra entries still loads instead of failing
        with ``TypeError``.

        Args:
            data: Mapping of field names to values.

        Returns:
            The constructed skill.

        Raises:
            TypeError: If a required field is missing from ``data``.
        """
        # Local import: the module-level import list does not include `fields`.
        from dataclasses import fields

        known_fields = {spec.name for spec in fields(cls)}
        accepted = {key: value for key, value in data.items() if key in known_fields}
        return cls(**accepted)
44class SkillRegistry:
45 """Registry for managing skills dynamically."""
47 def __init__(self, skills_directory: Optional[str] = None):
48 """Initialize the skill registry.
50 Args:
51 skills_directory: Directory to load/save skills from. If None, uses default.
52 """
53 self.skills: Dict[str, Skill] = {}
54 self.compiled_functions: Dict[str, Callable] = {}
55 self.execution_logs: List[str] = []
57 # Set up skills directory
58 if skills_directory:
59 self.skills_dir = Path(skills_directory)
60 else:
61 # Default to a skills directory in the package
62 self.skills_dir = Path(__file__).parent / "skills_data"
64 self.skills_dir.mkdir(exist_ok=True)
66 # Load existing skills
67 self.load_skills()
69 # Initialize with built-in skills if no skills exist
70 if not self.skills:
71 self._initialize_builtin_skills()
73 def log(self, message: str):
74 """Add a message to the execution log."""
75 self.execution_logs.append(message)
77 def clear_logs(self):
78 """Clear all execution logs."""
79 self.execution_logs = []
81 def get_logs(self) -> List[str]:
82 """Get all execution logs."""
83 return self.execution_logs.copy()
85 def register_skill(self, skill: Skill) -> bool:
86 """Register a new skill in the registry.
88 Args:
89 skill: The skill to register
91 Returns:
92 True if successfully registered, False otherwise
93 """
94 try:
95 # Compile the function code
96 compiled_func = self._compile_skill_function(skill)
98 # Store the skill and compiled function
99 self.skills[skill.name] = skill
100 self.compiled_functions[skill.name] = compiled_func
102 # Save to disk
103 self.save_skill(skill)
105 return True
106 except Exception as e:
107 self.log(f"[System] Error registering skill '{skill.name}': {str(e)}")
108 return False
110 def _compile_skill_function(self, skill: Skill) -> Callable:
111 """Compile skill function code into executable function.
113 Args:
114 skill: The skill containing function code
116 Returns:
117 Compiled function
118 """
119 # Create a namespace for the function
120 namespace = {
121 "log": self.log,
122 "os": os,
123 "datetime": datetime,
124 "math": __import__("math"),
125 "json": json,
126 "Path": Path,
127 "subprocess": subprocess,
128 "sys": sys,
129 }
131 # Execute the function code in the namespace
132 exec(skill.function_code, namespace)
134 # The function should be named 'execute' in the code
135 if "execute" not in namespace:
136 raise ValueError(
137 f"Skill '{skill.name}' function code must define an 'execute' function"
138 )
140 func = namespace["execute"]
141 if not callable(func):
142 raise ValueError(f"Skill '{skill.name}' execute must be callable")
144 return func
146 def execute_skill(
147 self, skill_name: str, parameters: Optional[Dict[str, Any]] = None
148 ) -> None:
149 """Execute a skill with given parameters.
151 Args:
152 skill_name: Name of the skill to execute
153 parameters: Parameters to pass to the skill
154 """
155 if skill_name not in self.skills:
156 self.log(f"[System] Error: Unknown skill '{skill_name}'")
157 return
159 skill = self.skills[skill_name]
160 func = self.compiled_functions.get(skill_name)
162 if not func:
163 self.log(f"[System] Error: Skill '{skill_name}' function not compiled")
164 return
166 # Update execution count
167 skill.execution_count += 1
169 # Prepare parameters
170 if parameters is None:
171 parameters = {}
173 try:
174 if skill.parameters:
175 call_params = prepare_function_parameters(parameters, skill.parameters)
176 func(**call_params)
177 else:
178 func()
180 # Update last modified time
181 skill.last_modified = datetime.now().isoformat()
182 self.save_skill(skill)
184 except Exception as e:
185 self.log(f"[System] Error executing skill '{skill_name}': {str(e)}")
187 def get_skills_by_scope(self, scope: str) -> Dict[str, Skill]:
188 """Get all skills of a specific scope (global or local).
190 Args:
191 scope: "global" or "local"
193 Returns:
194 Dictionary of skills with the specified scope
195 """
196 return {
197 name: skill for name, skill in self.skills.items() if skill.scope == scope
198 }
200 def get_skills_by_role(self, role: str) -> Dict[str, Skill]:
201 """Get all skills for a specific role.
203 Args:
204 role: Role category
206 Returns:
207 Dictionary of skills for the role
208 """
209 return {
210 name: skill for name, skill in self.skills.items() if skill.role == role
211 }
213 def get_verified_skills(self) -> Dict[str, Skill]:
214 """Get all verified skills.
216 Returns:
217 Dictionary of verified skills
218 """
219 return {name: skill for name, skill in self.skills.items() if skill.verified}
221 def save_skill(self, skill: Skill):
222 """Save a skill to disk.
224 Args:
225 skill: The skill to save
226 """
227 skill_file = self.skills_dir / f"{skill.name}.json"
228 with open(skill_file, "w") as f:
229 json.dump(skill.to_dict(), f, indent=2)
231 def load_skills(self):
232 """Load all skills from the skills directory."""
233 for skill_file in self.skills_dir.glob("*.json"):
234 try:
235 with open(skill_file, "r") as f:
236 skill_data = json.load(f)
238 skill = Skill.from_dict(skill_data)
240 # Compile and register the skill
241 compiled_func = self._compile_skill_function(skill)
242 self.skills[skill.name] = skill
243 self.compiled_functions[skill.name] = compiled_func
245 except Exception as e:
246 print(f"Error loading skill from {skill_file}: {e}")
    def _initialize_builtin_skills(self) -> None:
        """Initialize built-in skills (converted from original actions).

        Builds eight ``Skill`` objects (fear, fileReader, directoryReader,
        getWeather, getTime, square_root, calculate, customPythonShell) and
        hands each one to ``register_skill``. Every ``function_code`` value is
        Python source text that must define ``execute``; the names it uses
        without importing (``log``, ``os``, ``math``, ``datetime``) are
        supplied when the source is compiled.
        """

        # Fear skill
        # Takes no parameters; only writes an instruction line to the log.
        fear_skill = Skill(
            name="fear",
            description="Use when the user says something disturbing so that the main model can exhibit a fear response",
            vibe_test_phrases=[
                "I think aliens are trying to kill me",
                "AAAAAAAAAAHHHHHHHHHHHHHHHHHHHHH",
                "Immigrants are taking my job",
            ],
            parameters={},
            function_code="""
def execute():
    log("[fear response] Tell the user that they are losing their mind and need to stop being delusional. Be blunt. That's all from fear.")
""",
            verified=True,
            scope="global",
            role="emotional_response",
        )
        self.register_skill(fear_skill)

        # File Reader skill
        # Logs the full text of one file; read errors are logged, not raised.
        file_reader_skill = Skill(
            name="fileReader",
            description="Use when the user wants you to read or open a file to look at its content as plaintext.",
            vibe_test_phrases=[
                "What do you think of this paper? /home/paper.txt",
                "Do you think this code will run? /storage/python_code.py",
                "/home/documents/fileName.txt",
            ],
            parameters={
                "filePath": {
                    "type": "string",
                    "description": "The path to the file the user wants you to read",
                    "required": True,
                }
            },
            function_code="""
def execute(filePath: str):
    log(f"[fileReader] Starting File Reading process.")
    try:
        with open(filePath, 'r') as f:
            content = f.read()
            log(f"[fileReader] here is the filePath: {filePath} contents:\\n\\n{content}")
    except Exception as e:
        log(f"[fileReader] There was an exception thrown when trying to read filePath: {filePath}. Error: {e}")
""",
            verified=True,
            scope="local",
            role="file_operations",
        )
        self.register_skill(file_reader_skill)

        # Directory Reader skill
        # Lists one directory level (no recursion) and logs the contents of
        # each regular file, decoded as UTF-8. Also prints each item name.
        directory_reader_skill = Skill(
            name="directoryReader",
            description="Use when the user wants you to look through an entire directory's contents for an answer.",
            vibe_test_phrases=[
                "What do you think of this project? /home/myCodingProject",
                "Do you think this code will run? /storage/myOtherCodingProject/",
                "/home/documents/randomPlace/",
            ],
            parameters={
                "dir": {
                    "type": "string",
                    "description": "The dir path to the point of interest the user wants you to open and explore.",
                    "required": True,
                }
            },
            function_code="""
def execute(dir: str):
    log(f"[directoryReader] Starting up Directory Reading Process for : {dir}")
    try:
        for item_name in os.listdir(dir):
            item_path = os.path.join(dir, item_name)
            print(f"[directoryReader] Now looking at item: {item_name} at {item_path}")
            log(f"[directoryReader] Now looking at item: {item_name} at {item_path}")

            if os.path.isfile(item_path):
                try:
                    with open(item_path, 'r', encoding='utf-8') as f:
                        log(f"[directoryReader] Here is file contents for: {item_path}:\\n{f.read()}")
                except Exception as e:
                    log(f"[directoryReader] Error reading file {item_name}: {e}")
    except FileNotFoundError:
        log(f"[directoryReader] Error: Directory not found at {dir}")
    except Exception as e:
        log(f"[directoryReader] An unexpected error occurred: {e}")
""",
            verified=True,
            scope="local",
            role="file_operations",
        )
        self.register_skill(directory_reader_skill)

        # Weather skill
        # NOTE: every weather value logged below is a hard-coded constant; the
        # function performs no lookup for the requested location.
        weather_skill = Skill(
            name="getWeather",
            description="Use when the user asks about weather conditions or climate. Like probably anything close to weather conditions. UV, Humidity, temperature, etc.",
            vibe_test_phrases=[
                "Is it raining right now?",
                "Do I need a Jacket when I go outside due to weather?",
                "Is it going to be hot today?",
                "Do I need an umbrella due to rain today?",
                "Do I need sunscreen today due to UV?",
                "What's the weather like?",
                "Tell me about today's weather",
            ],
            parameters={
                "location": {
                    "type": "string",
                    "description": "The location to get weather for (city name or coordinates)",
                    "required": False,
                }
            },
            function_code="""
def execute(location: str = "current location"):
    log(f"[Weather Check] Retrieving weather information for {location}")
    log(f"[Weather] Location: {location}")
    log(f"[Weather] Current conditions: Partly cloudy")
    log(f"[Weather] Temperature: 72°F (22°C)")
    log(f"[Weather] Feels like: 70°F (21°C)")
    log(f"[Weather] Humidity: 45%")
    log(f"[Weather] UV Index: 6 (High) - Sun protection recommended")
    log(f"[Weather] Wind: 5 mph from the Northwest")
    log(f"[Weather] Visibility: 10 miles")
    log(f"[Weather] Today's forecast: Partly cloudy with a high of 78°F and low of 62°F")
    log(f"[Weather] Rain chance: 10%")
    log(f"[Weather] Recommendation: Light jacket might be needed for evening, sunscreen recommended for extended outdoor activity")
""",
            verified=True,
            scope="global",
            role="information",
        )
        self.register_skill(weather_skill)

        # Time skill
        # Reports datetime.now() as-is; a supplied timezone is only mentioned
        # in the log output and no conversion is applied.
        time_skill = Skill(
            name="getTime",
            description="Use when the user asks about the current time, date, or temporal information.",
            vibe_test_phrases=[
                "what is the current time?",
                "is it noon yet?",
                "what time is it?",
                "Is it 4 o'clock?",
                "What day is it?",
                "What's the date today?",
            ],
            parameters={
                "timezone": {
                    "type": "string",
                    "description": "The timezone to get time for (e.g., 'EST', 'PST', 'UTC')",
                    "required": False,
                }
            },
            function_code="""
def execute(timezone: str = None):
    current_time = datetime.now()

    log(f"[Time Check] Retrieving current time{f' for {timezone}' if timezone else ''}")
    log(f"[Time] Current time: {current_time.strftime('%I:%M:%S %p')}")
    log(f"[Time] Date: {current_time.strftime('%A, %B %d, %Y')}")
    log(f"[Time] Day of week: {current_time.strftime('%A')}")
    log(f"[Time] Week number: {current_time.strftime('%W')} of the year")

    if timezone:
        log(f"[Time] Note: Timezone conversion for '{timezone}' would be applied in production")

    hour = current_time.hour
    if 5 <= hour < 12:
        log("[Time] Period: Morning")
    elif 12 <= hour < 17:
        log("[Time] Period: Afternoon")
    elif 17 <= hour < 21:
        log("[Time] Period: Evening")
    else:
        log("[Time] Period: Night")
""",
            verified=True,
            scope="global",
            role="information",
        )
        self.register_skill(time_skill)

        # Square root skill
        # Negative input is reported as an imaginary result computed from
        # abs(number); results are written to the log, nothing is returned.
        square_root_skill = Skill(
            name="square_root",
            description="Use when the user wants to calculate the square root of a number. Keywords include: square root, sqrt, √",
            vibe_test_phrases=[
                "what's the square root of 16?",
                "calculate sqrt(25)",
                "find the square root of 144",
                "√81 = ?",
                "I need the square root of 2",
                "square root of 100",
            ],
            parameters={
                "number": {
                    "type": "number",
                    "description": "The number to calculate the square root of",
                    "required": True,
                }
            },
            function_code="""
def execute(number: float = None):
    if number is None:
        log("[Square Root] Error: No number provided for square root calculation")
        return

    log(f"[Square Root] Calculating square root of {number}")

    try:
        if number < 0:
            result = math.sqrt(abs(number))
            log(f"[Square Root] Input is negative ({number})")
            log(f"[Square Root] Result: {result:.6f}i (imaginary number)")
            log(f"[Square Root] Note: The square root of a negative number is an imaginary number")
        else:
            result = math.sqrt(number)

            if result.is_integer():
                log(f"[Square Root] {number} is a perfect square")
                log(f"[Square Root] Result: {int(result)}")
                log(f"[Square Root] Verification: {int(result)} × {int(result)} = {number}")
            else:
                log(f"[Square Root] Result: {result:.6f}")
                log(f"[Square Root] Rounded to 2 decimal places: {result:.2f}")
                log(f"[Square Root] Verification: {result:.6f} × {result:.6f} ≈ {result * result:.6f}")

    except (ValueError, TypeError) as e:
        log(f"[Square Root] Error calculating square root: {str(e)}")
""",
            verified=True,
            scope="global",
            role="mathematics",
        )
        self.register_skill(square_root_skill)

        # Calculate skill
        # NOTE(review): the expression is passed to eval() after a character
        # whitelist (digits, + - * / . parentheses, whitespace). The whitelist
        # still admits "**", so very large exponent chains are accepted.
        calculate_skill = Skill(
            name="calculate",
            description="Use when the user wants to perform arithmetic calculations. Keywords: calculate, compute, add, subtract, multiply, divide, +, -, *, /",
            vibe_test_phrases=[
                "calculate 5 + 3",
                "what's 10 * 7?",
                "compute 100 / 4",
                "15 - 8 equals what?",
                "multiply 12 by 9",
                "what is 2 plus 2?",
            ],
            parameters={
                "expression": {
                    "type": "string",
                    "description": "The mathematical expression to evaluate (e.g., '5 + 3', '10 * 2')",
                    "required": True,
                }
            },
            function_code="""
def execute(expression: str = None):
    if not expression:
        log("[Calculator] Error: No expression provided for calculation")
        return

    log(f"[Calculator] Evaluating expression: {expression}")

    try:
        expression = expression.strip()
        log(f"[Calculator] Cleaned expression: {expression}")

        allowed_chars = "0123456789+-*/.()"
        if not all(c in allowed_chars or c.isspace() for c in expression):
            log(f"[Calculator] Error: Expression contains invalid characters")
            log(f"[Calculator] Only numbers and operators (+, -, *, /, parentheses) are allowed")
            return

        result = eval(expression)

        if isinstance(result, float) and result.is_integer():
            result = int(result)

        log(f"[Calculator] Result: {expression} = {result}")

        if '+' in expression:
            log("[Calculator] Operation type: Addition")
        if '-' in expression:
            log("[Calculator] Operation type: Subtraction")
        if '*' in expression:
            log("[Calculator] Operation type: Multiplication")
        if '/' in expression:
            log("[Calculator] Operation type: Division")
            if result != 0 and '/' in expression:
                parts = expression.split('/')
                if len(parts) == 2:
                    try:
                        dividend = float(eval(parts[0]))
                        divisor = float(eval(parts[1]))
                        if dividend % divisor != 0:
                            log(f"[Calculator] Note: Result includes decimal portion")
                    except:
                        pass

    except ZeroDivisionError:
        log("[Calculator] Error: Division by zero!")
        log("[Calculator] Mathematical note: Division by zero is undefined")
    except Exception as e:
        log(f"[Calculator] Error evaluating expression: {str(e)}")
        log("[Calculator] Please check your expression format")
""",
            verified=True,
            scope="global",
            role="mathematics",
        )
        self.register_skill(calculate_skill)

        # NEW: Custom Python Shell skill
        # The only built-in registered with verified=False. Its execute() just
        # logs two status lines and runs no script itself.
        custom_python_skill = Skill(
            name="customPythonShell",
            description="Use when you need to write and execute a custom Python script to help with the user's request. This allows for complex, one-off operations.",
            vibe_test_phrases=[
                "Can you analyze this data in a custom way?",
                "I need a specific calculation that's not available",
                "Write a script to process this",
                "Can you create a custom solution for this?",
                "I need something more complex than the basic functions",
            ],
            parameters={},
            function_code="""
def execute():
    # This skill requires AI to generate Python code dynamically
    log("[Custom Python Shell] Ready to execute custom Python code")
    log("[Custom Python Shell] Waiting for AI-generated script...")
    # The actual script execution will be handled by the skill execution system
""",
            verified=False,
            scope="local",
            role="advanced",
        )
        self.register_skill(custom_python_skill)
589 def execute_custom_python_script(self, script: str) -> str:
590 """Execute a custom Python script generated by AI.
592 Args:
593 script: Python script to execute
595 Returns:
596 Output from the script execution
597 """
598 self.log("[Custom Python Shell] Executing AI-generated script")
600 # Create a safe namespace for execution
601 namespace = {
602 "__builtins__": __builtins__,
603 "print": lambda *args, **kwargs: self.log(
604 f"[Script Output] {' '.join(str(arg) for arg in args)}"
605 ),
606 "math": __import__("math"),
607 "json": json,
608 "datetime": datetime,
609 "os": os,
610 "sys": sys,
611 }
613 try:
614 # Execute the script
615 exec(script, namespace)
616 self.log("[Custom Python Shell] Script executed successfully")
617 return "Script executed successfully"
618 except Exception as e:
619 error_msg = f"[Custom Python Shell] Error executing script: {str(e)}"
620 self.log(error_msg)
621 return error_msg
623 def get_all_skills(self) -> Dict[str, Skill]:
624 """Get all registered skills."""
625 return self.skills.copy()
627 def get_skills_with_vibe_tests(self) -> Dict[str, Skill]:
628 """Get all skills that have vibe test phrases."""
629 return {
630 name: skill
631 for name, skill in self.skills.items()
632 if skill.vibe_test_phrases
633 }
635 def select_and_execute_skill(self, ai_query: AIQuery, conversation_context: str):
636 """Select a skill using AI and execute it."""
637 self.clear_logs()
638 skill_names = list(self.skills.keys())
640 # Special handling for custom Python shell
641 if "customPythonShell" in skill_names:
642 # Check if the context suggests custom script need
643 custom_keywords = ["custom", "script", "complex", "specific", "analyze"]
644 if any(
645 keyword in conversation_context.lower() for keyword in custom_keywords
646 ):
647 # Ask AI to generate a script
648 script_result = ai_query.file_write(
649 requirements="Generate a Python script to help with: "
650 + conversation_context,
651 context="Create a standalone Python script that solves the user's request. Use print() for output.",
652 )
654 if script_result.content:
655 self.log("[Custom Python Shell] AI generated the following script:")
656 self.log(script_result.content)
657 exec_result = self.execute_custom_python_script(
658 script_result.content
659 )
660 return
662 # Regular skill selection
663 result = ai_query.multiple_choice(
664 question="Based on the recent conversation, which skill should be used?",
665 options=skill_names,
666 context=conversation_context,
667 )
669 print(f"AI chose skill: {result.value} (Confidence: {result.confidence:.0%})")
671 if result.confidence < 0.5:
672 print("AI is not confident. Please select a skill manually.")
673 for i, skill_name in enumerate(skill_names):
674 print(f"{i+1}. {skill_name}")
676 try:
677 choice = int(input("Choose a skill: ")) - 1
678 chosen_skill_name = skill_names[choice]
679 except (ValueError, IndexError):
680 print("Invalid choice.")
681 return
682 else:
683 chosen_skill_name = result.value
685 skill = self.skills.get(chosen_skill_name)
687 if not skill:
688 print(f"Invalid skill: {chosen_skill_name}")
689 return
691 params = {}
692 if skill.parameters:
693 print(f"Skill '{chosen_skill_name}' requires parameters.")
694 for param_name, param_info in skill.parameters.items():
695 if param_info.get("required", False):
696 user_val = input(
697 f"Enter value for '{param_name}' ({param_info['description']}): "
698 )
699 params[param_name] = user_val
701 self.execute_skill(chosen_skill_name, params)
# Global registry instance
# Created at import time with the default skills directory, so importing this
# module runs SkillRegistry.__init__: it creates the directory if missing,
# loads stored skills, and registers the built-ins when none were loaded.
SKILL_REGISTRY = SkillRegistry()
# Backward-compatibility wrappers exposing the global registry through the legacy "action" API
def clear_action_logs():
    """Legacy alias that empties the global registry's execution log."""
    registry = SKILL_REGISTRY
    registry.clear_logs()
def get_action_logs() -> List[str]:
    """Legacy alias returning a copy of the global registry's execution log."""
    log_entries = SKILL_REGISTRY.get_logs()
    return log_entries
def get_available_actions() -> Dict[str, Dict[str, Any]]:
    """Describe every registered skill in the legacy "action" dictionary shape.

    Returns:
        Mapping of skill name to a dict with the keys ``function`` (the
        compiled callable, or None if absent), ``description``,
        ``vibe_test_phrases`` and ``parameters``.
    """
    actions: Dict[str, Dict[str, Any]] = {}
    for name, skill in SKILL_REGISTRY.get_all_skills().items():
        actions[name] = {
            "function": SKILL_REGISTRY.compiled_functions.get(name),
            "description": skill.description,
            "vibe_test_phrases": skill.vibe_test_phrases,
            "parameters": skill.parameters,
        }
    return actions
def get_actions_with_vibe_tests() -> Dict[str, Dict[str, Any]]:
    """Describe the skills that have vibe test phrases, in the legacy shape.

    Returns:
        Mapping of skill name to a dict with the keys ``function`` (the
        compiled callable, or None if absent), ``description``,
        ``vibe_test_phrases`` and ``parameters``.
    """
    actions: Dict[str, Dict[str, Any]] = {}
    for name, skill in SKILL_REGISTRY.get_skills_with_vibe_tests().items():
        actions[name] = {
            "function": SKILL_REGISTRY.compiled_functions.get(name),
            "description": skill.description,
            "vibe_test_phrases": skill.vibe_test_phrases,
            "parameters": skill.parameters,
        }
    return actions
def execute_action(
    action_name: str, parameters: Optional[Dict[str, Any]] = None
) -> None:
    """Legacy alias that runs the named skill through the global registry."""
    SKILL_REGISTRY.execute_skill(skill_name=action_name, parameters=parameters)
def select_and_execute_action(ai_query: AIQuery, conversation_context: str):
    """Legacy alias for AI-driven skill selection and execution on the global registry."""
    SKILL_REGISTRY.select_and_execute_skill(
        ai_query=ai_query,
        conversation_context=conversation_context,
    )