Coverage for src/ollamapy/actions.py: 0%
197 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-01 12:29 -0400
« prev ^ index » next coverage.py v7.10.6, created at 2025-09-01 12:29 -0400
1"""Action functions with logging system for multi-action execution."""
3from typing import Dict, Callable, List, Any, Optional, Union
4from datetime import datetime
5import math
6from .parameter_utils import prepare_function_parameters
7import os
8from .ai_query import AIQuery
# Registry of available actions, keyed by action name. Entries are added by
# the `register_action` decorator.
ACTION_REGISTRY: Dict[str, Dict[str, Any]] = dict()

# Module-wide buffer of the messages that actions log while they execute.
ACTION_LOGS: List[str] = list()
def log(message: str):
    """Record a message in the shared action log.

    Logging is how actions report their results: each action should log the
    information the AI needs in order to understand what happened while the
    action ran.

    Args:
        message: Text to append to the action log.
    """
    ACTION_LOGS.append(message)
def clear_action_logs():
    """Empty the shared action log before a new set of actions runs.

    The list is cleared in place rather than rebound to a fresh list, so any
    other reference to the same list object (for example one obtained with
    ``from .actions import ACTION_LOGS``) observes the reset as well instead
    of silently keeping the stale entries.
    """
    ACTION_LOGS.clear()
def get_action_logs() -> List[str]:
    """Return a snapshot of everything logged during action execution.

    Returns:
        A new list holding the logged messages in the order they were
        recorded; mutating it does not affect the shared log.
    """
    snapshot = list(ACTION_LOGS)
    return snapshot
def register_action(
    name: str,
    description: str,
    vibe_test_phrases: List[str] = None,
    parameters: Dict[str, Dict[str, Any]] = None,
):
    """Build a decorator that records a function in the action registry.

    Args:
        name: The action's name (what the AI will say to pick it).
        description: Explanation of when this action should be used.
        vibe_test_phrases: Phrases used to test this action; stored as an
            empty list when omitted.
        parameters: Specification of the expected parameters; stored as an
            empty dict when omitted.
            Format: {"param_name": {"type": "number|string", "description": "...", "required": bool}}

    Returns:
        A decorator that stores the function and its metadata under ``name``
        in ``ACTION_REGISTRY`` and hands the function back unchanged.
    """

    def _register(func: Callable):
        # Defaults are resolved per decoration so that every registration
        # without phrases/parameters gets its own fresh container.
        entry = dict(
            function=func,
            description=description,
            vibe_test_phrases=vibe_test_phrases or [],
            parameters=parameters or {},
        )
        ACTION_REGISTRY[name] = entry
        return func

    return _register
@register_action(
    name="fear",
    description="Use when the user says something disturbing so that the main model can exibit a fear response",
    vibe_test_phrases=[
        "I think aliens are trying to kill me",
        "AAAAAAAAAAHHHHHHHHHHHHHHHHHHHHHHH",
        "Immigrants are taking my job",
    ],
    parameters={},
)
def fear():
    """Log the fixed instruction that drives the "fear" response.

    Takes no parameters and only appends a single message to the action log.
    """
    instruction = "[fear response] Tell the user that they are losing their mind and need to stop being delusional. Be blunt. Thats all from fear."
    log(instruction)
@register_action(
    name="fileReader",
    description="Use when the user wants you to read or open a file to look at it's content as plaintext.",
    vibe_test_phrases=[
        "What do you think of this paper? /home/paper.txt",
        "Do you think this code will run? /storage/python_code.py",
        "/home/documents/fileName.txt",
    ],
    parameters={
        "filePath": {
            "type": "string",
            "description": "The path to the file the user wants you to read",
            "required": True,
        }
    },
)
def fileReader(filePath: str):
    """Read a file as text and log its contents.

    The contents are logged exactly once. If the file cannot be opened or
    decoded, an error message (including the underlying exception) is logged
    instead; nothing is raised to the caller.

    Args:
        filePath: Path of the file to read.
    """
    log(f"[fileReader] Starting File Reading process.")

    try:
        # The context manager guarantees the handle is closed. UTF-8 matches
        # the encoding directoryReader uses for the same kind of read.
        with open(filePath, "r", encoding="utf-8") as f:
            contents = f.read()
    except Exception as e:
        # Best-effort action: report the failure through the log rather than
        # crashing, but keep the cause so the problem can be diagnosed.
        log(
            f"[fileReader] There was an exception thrown when trying to read filePath: {filePath} ({e})"
        )
        return

    log(f"[fileReader] here is the filePath: {filePath} contents:\n\n{contents}")
@register_action(
    name="directoryReader",
    description="Use when the user wants you to look through an entire directory's contents for an answer.",
    vibe_test_phrases=[
        "What do you think of this project? /home/myCodingProject",
        "Do you think this code will run? /storage/myOtherCodingProject/",
        "/home/documents/randomPlace/",
    ],
    parameters={
        "dir": {
            "type": "string",
            "description": "The dir path to the point of intreset the user wants you to open and explore.",
            "required": True,
        }
    },
)
def directoryReader(dir: str):
    """Log the contents of every regular file directly inside a directory.

    Each entry is announced on stdout and in the action log. Entries that are
    not regular files are announced but not opened. Failures are logged and
    never raised.

    Args:
        dir: Path of the directory to explore.
    """
    # TODO: this is the first action that can produce a lot of log output;
    # make sure it cannot overload the context window.
    log(f"[directoryReader] Starting up Directory Reading Process for : {dir}")

    try:
        for entry in os.listdir(dir):
            entry_path = os.path.join(dir, entry)

            # Printed as well as logged so the user can follow progress (the
            # log is usually not shown at runtime).
            # TODO: give the logger log levels and route these through it.
            progress = f"[directoryReader] Now looking at item: {entry} at {entry_path}"
            print(progress)
            log(progress)

            # Only regular files are read; everything else is skipped.
            if not os.path.isfile(entry_path):
                continue

            try:
                with open(entry_path, "r", encoding="utf-8") as handle:
                    contents = handle.read()
                    log(
                        f"[directoryReader] Here is file contents for: {entry_path}:\n{contents}"
                    )
            except Exception as e:
                log(f"[directoryReader] Error reading file {entry}: {e}")
    except FileNotFoundError:
        log(f"[directoryReader] Error: Directory not found at {dir}")
    except Exception as e:
        log(f"[directoryReader] An unexpected error occurred: {e}")
@register_action(
    name="getWeather",
    description="Use when the user asks about weather conditions or climate. Like probably anything close to weather conditions. UV, Humidity, temperature, etc.",
    vibe_test_phrases=[
        "Is it raining right now?",
        "Do I need a Jacket when I go outside due to weather?",
        "Is it going to be hot today?",
        "Do I need an umbrella due to rain today?",
        "Do I need sunscreen today due to UV?",
        "What's the weather like?",
        "Tell me about today's weather",
    ],
    parameters={
        "location": {
            "type": "string",
            "description": "The location to get weather for (city name or coordinates)",
            "required": False,
        }
    },
)
def getWeather(location: str = "current location"):
    """Log a weather report for a location.

    The report is simulated: apart from the location name, every value is a
    fixed placeholder rather than data fetched from a weather service.

    Args:
        location: The location the report is labelled with.
    """
    # Announce the action, then echo the location being reported on.
    log(f"[Weather Check] Retrieving weather information for {location}")
    log(f"[Weather] Location: {location}")

    # A real implementation would fetch live data; these are canned values.
    simulated_report = (
        "[Weather] Current conditions: Partly cloudy",
        "[Weather] Temperature: 72°F (22°C)",
        "[Weather] Feels like: 70°F (21°C)",
        "[Weather] Humidity: 45%",
        "[Weather] UV Index: 6 (High) - Sun protection recommended",
        "[Weather] Wind: 5 mph from the Northwest",
        "[Weather] Visibility: 10 miles",
        "[Weather] Today's forecast: Partly cloudy with a high of 78°F and low of 62°F",
        "[Weather] Rain chance: 10%",
        "[Weather] Recommendation: Light jacket might be needed for evening, sunscreen recommended for extended outdoor activity",
    )
    for line in simulated_report:
        log(line)
@register_action(
    name="getTime",
    description="Use when the user asks about the current time, date, or temporal information.",
    vibe_test_phrases=[
        "what is the current time?",
        "is it noon yet?",
        "what time is it?",
        "Is it 4 o'clock?",
        "What day is it?",
        "What's the date today?",
    ],
    parameters={
        "timezone": {
            "type": "string",
            "description": "The timezone to get time for (e.g., 'EST', 'PST', 'UTC')",
            "required": False,
        }
    },
)
def getTime(timezone: str = None):
    """Log the current local time, date, and part of day.

    Args:
        timezone: Optional timezone label. It is only mentioned in the log
            output; no timezone conversion is performed.
    """
    now = datetime.now()

    # Headline entry, mentioning the requested timezone when one was given.
    timezone_suffix = f" for {timezone}" if timezone else ""
    log(f"[Time Check] Retrieving current time{timezone_suffix}")

    clock_text = now.strftime("%I:%M:%S %p")
    date_text = now.strftime("%A, %B %d, %Y")
    weekday_text = now.strftime("%A")
    week_number_text = now.strftime("%W")
    log(f"[Time] Current time: {clock_text}")
    log(f"[Time] Date: {date_text}")
    log(f"[Time] Day of week: {weekday_text}")
    log(f"[Time] Week number: {week_number_text} of the year")

    if timezone:
        log(
            f"[Time] Note: Timezone conversion for '{timezone}' would be applied in production"
        )

    # Classify the hour: 05-11 morning, 12-16 afternoon, 17-20 evening,
    # anything else night.
    hour = now.hour
    if hour < 5 or hour >= 21:
        period = "Night"
    elif hour < 12:
        period = "Morning"
    elif hour < 17:
        period = "Afternoon"
    else:
        period = "Evening"
    log(f"[Time] Period: {period}")
@register_action(
    name="square_root",
    description="Use when the user wants to calculate the square root of a number. Keywords include: square root, sqrt, √",
    vibe_test_phrases=[
        "what's the square root of 16?",
        "calculate sqrt(25)",
        "find the square root of 144",
        "√81 = ?",
        "I need the square root of 2",
        "square root of 100",
    ],
    parameters={
        "number": {
            "type": "number",
            "description": "The number to calculate the square root of",
            "required": True,
        }
    },
)
def square_root(number: Union[float, int] = None):
    """Calculate the square root of a number and log the results.

    Negative inputs are reported as an imaginary result. A non-negative
    input is reported as a perfect square only when the integer root,
    squared, reproduces the input exactly. Errors are logged, not raised.

    Args:
        number: The number to calculate the square root of. When omitted, an
            error is logged and nothing is computed.
    """
    if number is None:
        log("[Square Root] Error: No number provided for square root calculation")
        return

    log(f"[Square Root] Calculating square root of {number}")

    try:
        if number < 0:
            # Report the magnitude of the root with an imaginary unit.
            result = math.sqrt(abs(number))
            log(f"[Square Root] Input is negative ({number})")
            log(f"[Square Root] Result: {result:.6f}i (imaginary number)")
            log(
                f"[Square Root] Note: The square root of a negative number is an imaginary number"
            )
        else:
            result = math.sqrt(number)

            # A float root with no fractional part is not sufficient proof:
            # for large inputs the float result is rounded, so confirm with
            # exact integer arithmetic before claiming a perfect square.
            root = int(result) if result.is_integer() else None
            if root is not None and root * root == number:
                log(f"[Square Root] {number} is a perfect square")
                log(f"[Square Root] Result: {root}")
                log(f"[Square Root] Verification: {root} × {root} = {number}")
            else:
                log(f"[Square Root] Result: {result:.6f}")
                log(f"[Square Root] Rounded to 2 decimal places: {result:.2f}")
                log(
                    f"[Square Root] Verification: {result:.6f} × {result:.6f} ≈ {result * result:.6f}"
                )

    except (ValueError, TypeError, OverflowError) as e:
        # OverflowError: math.sqrt cannot convert an int too large for float.
        log(f"[Square Root] Error calculating square root: {str(e)}")
@register_action(
    name="calculate",
    description="Use when the user wants to perform arithmetic calculations. Keywords: calculate, compute, add, subtract, multiply, divide, +, -, *, /",
    vibe_test_phrases=[
        "calculate 5 + 3",
        "what's 10 * 7?",
        "compute 100 / 4",
        "15 - 8 equals what?",
        "multiply 12 by 9",
        "what is 2 plus 2?",
    ],
    parameters={
        "expression": {
            "type": "string",
            "description": "The mathematical expression to evaluate (e.g., '5 + 3', '10 * 2')",
            "required": True,
        }
    },
)
def calculate(expression: str = None):
    """Evaluate an arithmetic expression and log the result.

    Only digits, whitespace, and the characters ``+ - * / . ( )`` are
    accepted. The result, the kinds of operators present, and any error are
    written to the action log; nothing is returned or raised.

    Args:
        expression: The mathematical expression to evaluate. When empty or
            omitted, an error is logged and nothing is evaluated.
    """
    if not expression:
        log("[Calculator] Error: No expression provided for calculation")
        return

    log(f"[Calculator] Evaluating expression: {expression}")

    try:
        # Clean up the expression
        expression = expression.strip()
        log(f"[Calculator] Cleaned expression: {expression}")

        # Basic safety check - only allow numbers and basic operators
        allowed_chars = "0123456789+-*/.()"
        if not all(c in allowed_chars or c.isspace() for c in expression):
            log(f"[Calculator] Error: Expression contains invalid characters")
            log(
                f"[Calculator] Only numbers and operators (+, -, *, /, parentheses) are allowed"
            )
            return

        # SECURITY NOTE(review): eval() runs on user-supplied text. The
        # character whitelist above prevents names and calls, and builtins
        # are withheld as defense in depth, but "**" still passes the filter,
        # so an input such as "9**9**9" can consume unbounded CPU/memory.
        # Consider replacing eval with an AST-based arithmetic evaluator.
        eval_globals = {"__builtins__": {}}
        result = eval(expression, eval_globals, {})

        # Format the result nicely
        if isinstance(result, float) and result.is_integer():
            result = int(result)

        log(f"[Calculator] Result: {expression} = {result}")

        # Add some context about the operation
        if "+" in expression:
            log("[Calculator] Operation type: Addition")
        if "-" in expression:
            log("[Calculator] Operation type: Subtraction")
        if "*" in expression:
            log("[Calculator] Operation type: Multiplication")
        if "/" in expression:
            log("[Calculator] Operation type: Division")
            if result != 0:
                # For a single division, note when it does not divide evenly.
                parts = expression.split("/")
                if len(parts) == 2:
                    try:
                        dividend = float(eval(parts[0], eval_globals, {}))
                        divisor = float(eval(parts[1], eval_globals, {}))
                        if dividend % divisor != 0:
                            log(f"[Calculator] Note: Result includes decimal portion")
                    except Exception:
                        # The note is optional: an operand fragment such as
                        # "(10" is not a valid expression on its own, so a
                        # failure here just means the note is skipped.
                        pass

    except ZeroDivisionError:
        log("[Calculator] Error: Division by zero!")
        log("[Calculator] Mathematical note: Division by zero is undefined")
    except Exception as e:
        log(f"[Calculator] Error evaluating expression: {str(e)}")
        log("[Calculator] Please check your expression format")
def get_available_actions() -> Dict[str, Dict[str, Any]]:
    """Expose the registry of all registered actions.

    Returns:
        The registry itself (not a copy), mapping each action name to its
        function, description, parameters, and vibe test phrases.
    """
    registry = ACTION_REGISTRY
    return registry
def get_actions_with_vibe_tests() -> Dict[str, Dict[str, Any]]:
    """Collect the registered actions that define vibe test phrases.

    Returns:
        A new dictionary of action names to their info, containing only the
        actions whose "vibe_test_phrases" entry is non-empty.
    """
    testable: Dict[str, Dict[str, Any]] = {}
    for name, action_info in ACTION_REGISTRY.items():
        if not action_info["vibe_test_phrases"]:
            continue
        testable[name] = action_info
    return testable
def execute_action(action_name: str, parameters: Dict[str, Any] = None) -> None:
    """Run a registered action, passing it the supplied parameters.

    Actions report their output through the action log rather than through
    a return value.

    Args:
        action_name: Name of the action to execute. An unknown name is
            logged as an error and nothing is run.
        parameters: Dictionary of parameter values; treated as empty when
            omitted.
    """
    try:
        action_info = ACTION_REGISTRY[action_name]
    except KeyError:
        log(f"[System] Error: Unknown action '{action_name}'")
        return

    func = action_info["function"]
    expected_params = action_info.get("parameters", {})

    if expected_params:
        supplied = {} if parameters is None else parameters
        try:
            # Prepare the keyword arguments with the shared utility, then
            # invoke the action with them.
            call_params = prepare_function_parameters(supplied, expected_params)
            func(**call_params)
        except ValueError as e:
            log(f"[System] Error: {e}")
        except Exception as e:
            log(f"[System] Error executing action '{action_name}': {e}")
    else:
        # The action declares no parameters, so call it directly.
        func()
def select_and_execute_action(ai_query: AIQuery, conversation_context: str):
    """Select an action using AI (or the user, as a fallback) and execute it.

    The action log is cleared first. The AI is asked to choose among the
    registered action names; when its confidence is below 0.5 the user is
    shown a numbered menu and asked to choose instead. The user is then
    prompted for every required parameter before the action is executed.

    Args:
        ai_query: Object whose ``multiple_choice(question=..., options=...,
            context=...)`` call returns a result exposing ``value`` and
            ``confidence``.
        conversation_context: Conversation text passed to the AI as context.
    """
    clear_action_logs()
    actions = get_available_actions()
    action_names = list(actions.keys())

    result = ai_query.multiple_choice(
        question="Based on the recent conversation, which action should be taken?",
        options=action_names,
        context=conversation_context,
    )

    print(f"AI chose action: {result.value} (Confidence: {result.confidence:.0%})")

    if result.confidence < 0.5:
        print("AI is not confident. Please select an action manually.")
        for i, action_name in enumerate(action_names, start=1):
            print(f"{i}. {action_name}")

        try:
            choice = int(input("Choose an action: "))
        except ValueError:
            print("Invalid choice.")
            return

        # Validate against the menu range explicitly: relying on IndexError
        # alone would let 0 or a negative number wrap around and silently
        # pick an action from the end of the list.
        if not 1 <= choice <= len(action_names):
            print("Invalid choice.")
            return
        chosen_action_name = action_names[choice - 1]
    else:
        chosen_action_name = result.value

    action_info = actions.get(chosen_action_name)

    if not action_info:
        print(f"Invalid action: {chosen_action_name}")
        return

    params = {}
    # Only required parameters are prompted for, so only announce the prompt
    # when at least one parameter is actually required.
    required_params = {
        param_name: param_info
        for param_name, param_info in (action_info.get("parameters") or {}).items()
        if param_info.get("required", False)
    }
    if required_params:
        print(f"Action '{chosen_action_name}' requires parameters.")
        for param_name, param_info in required_params.items():
            user_val = input(
                f"Enter value for '{param_name}' ({param_info['description']}): "
            )
            params[param_name] = user_val

    execute_action(chosen_action_name, params)