Coverage for src/ollamapy/actions.py: 0%

197 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-09-01 12:29 -0400

1"""Action functions with logging system for multi-action execution.""" 

2 

3from typing import Dict, Callable, List, Any, Optional, Union 

4from datetime import datetime 

5import math 

6from .parameter_utils import prepare_function_parameters 

7import os 

8from .ai_query import AIQuery 

9 

# Registry of available actions, keyed by action name. Each entry is a dict
# holding the callable and its metadata ("function", "description",
# "vibe_test_phrases", "parameters"), as filled in by ``register_action``.
ACTION_REGISTRY: Dict[str, Dict[str, Any]] = dict()

# Module-wide buffer collecting the messages that actions emit while running.
ACTION_LOGS: List[str] = list()

15 

16 

def log(message: str):
    """Record one message in the shared action log.

    Actions report their results through this function instead of returning
    values; whatever they log is what the AI later sees about the execution.

    Args:
        message: Text to append to the module-level ``ACTION_LOGS`` list.
    """
    # Mutates the current list in place; no rebinding happens here.
    ACTION_LOGS.append(message)

28 

29 

def clear_action_logs():
    """Reset the action log; meant to run before a new set of actions executes.

    The module-level name is rebound to a brand-new empty list, so a list
    object obtained earlier is left untouched rather than emptied in place.
    """
    global ACTION_LOGS
    ACTION_LOGS = list()

34 

35 

def get_action_logs() -> List[str]:
    """Return a snapshot of every message logged by the executed actions.

    Returns:
        A new list with the current log messages; mutating it does not
        affect the module-level log.
    """
    snapshot = list(ACTION_LOGS)
    return snapshot

43 

44 

def register_action(
    name: str,
    description: str,
    vibe_test_phrases: Optional[List[str]] = None,
    parameters: Optional[Dict[str, Dict[str, Any]]] = None,
):
    """Build a decorator that registers a function as an available action.

    The decorated function is stored in ``ACTION_REGISTRY`` under ``name``
    and returned unchanged, so it remains directly callable. Registering a
    second action under an existing name replaces the earlier entry.

    Args:
        name: The name of the action (what the AI will say).
        description: Description of when to use this action.
        vibe_test_phrases: Phrases to test this action with. ``None`` is
            stored as an empty list.
        parameters: Expected parameters with their types and descriptions.
            ``None`` is stored as an empty dict.
            Format: {"param_name": {"type": "number|string",
            "description": "...", "required": bool}}

    Returns:
        A decorator that performs the registration and returns the function.
    """

    def decorator(func: Callable) -> Callable:
        ACTION_REGISTRY[name] = {
            "function": func,
            "description": description,
            # Falsy values (None, empty) are normalised to fresh containers.
            "vibe_test_phrases": vibe_test_phrases or [],
            "parameters": parameters or {},
        }
        return func

    return decorator

71 

72 

@register_action(
    name="fear",
    description="Use when the user says something disturbing so that the main model can exibit a fear response",
    vibe_test_phrases=[
        "I think aliens are trying to kill me",
        "AAAAAAAAAAHHHHHHHHHHHHHHHHHHHHHHH",
        "Immigrants are taking my job",
    ],
    parameters={},
)
def fear():
    """Log the fixed fear-response instruction for the main model.

    Takes no parameters; its only effect is a single log entry.
    """
    fear_instruction = (
        "[fear response] Tell the user that they are losing their mind "
        "and need to stop being delusional. Be blunt. Thats all from fear."
    )
    log(fear_instruction)

88 

89 

@register_action(
    name="fileReader",
    description="Use when the user wants you to read or open a file to look at it's content as plaintext.",
    vibe_test_phrases=[
        "What do you think of this paper? /home/paper.txt",
        "Do you think this code will run? /storage/python_code.py",
        "/home/documents/fileName.txt",
    ],
    parameters={
        "filePath": {
            "type": "string",
            "description": "The path to the file the user wants you to read",
            "required": True,
        }
    },
)
def fileReader(filePath: str):
    """Read a file as text and log its contents.

    The file is read exactly once and its handle is always closed. Any
    failure while opening or reading is logged (with the error detail)
    instead of being raised, so the action never aborts the caller.

    Args:
        filePath: Path of the file to read.
    """
    log("[fileReader] Starting File Reading process.")

    try:
        # The context manager guarantees the handle is released even when
        # read() fails; UTF-8 matches what directoryReader uses.
        with open(filePath, "r", encoding="utf-8") as f:
            contents = f.read()
    except Exception as e:
        # Best-effort action: report the problem in the log and stop.
        log(
            f"[fileReader] There was an exception thrown when trying to read filePath: {filePath}: {e}"
        )
        return

    # Logged once; a second read() on the same handle would return "".
    log(f"[fileReader] here is the filePath: {filePath} contents:\n\n{contents}")

118 

119 

@register_action(
    name="directoryReader",
    description="Use when the user wants you to look through an entire directory's contents for an answer.",
    vibe_test_phrases=[
        "What do you think of this project? /home/myCodingProject",
        "Do you think this code will run? /storage/myOtherCodingProject/",
        "/home/documents/randomPlace/",
    ],
    parameters={
        "dir": {
            "type": "string",
            "description": "The dir path to the point of intreset the user wants you to open and explore.",
            "required": True,
        }
    },
)
def directoryReader(dir: str):
    """Log the contents of every regular file directly inside a directory.

    Each entry is announced on stdout and in the action log. Entries that
    are not regular files are announced but not opened; subdirectories are
    not descended into. Read failures are logged per file and do not stop
    the scan.

    Args:
        dir: Path of the directory to scan.
    """
    # TODO: this is the first action that can produce a very large log, so
    # the output needs some reasonable bound to avoid overloading a context
    # window.

    log(f"[directoryReader] Starting up Directory Reading Process for : {dir}")

    try:
        entries = os.listdir(dir)
        for item_name in entries:
            item_path = os.path.join(dir, item_name)

            # Printed as well as logged because the log is usually not shown
            # at runtime and the user should see progress.
            # TODO: give the logger levels and route these messages that way.
            progress = (
                f"[directoryReader] Now looking at item: {item_name} at {item_path}"
            )
            print(progress)
            log(progress)

            # Only regular files are opened.
            if not os.path.isfile(item_path):
                continue

            try:
                with open(item_path, "r", encoding="utf-8") as handle:
                    text = handle.read()
                    log(
                        f"[directoryReader] Here is file contents for: {item_path}:\n{text}"
                    )
            except Exception as read_error:
                log(f"[directoryReader] Error reading file {item_name}: {read_error}")
    except FileNotFoundError:
        log(f"[directoryReader] Error: Directory not found at {dir}")
    except Exception as scan_error:
        log(f"[directoryReader] An unexpected error occurred: {scan_error}")

164 

165 

@register_action(
    name="getWeather",
    description="Use when the user asks about weather conditions or climate. Like probably anything close to weather conditions. UV, Humidity, temperature, etc.",
    vibe_test_phrases=[
        "Is it raining right now?",
        "Do I need a Jacket when I go outside due to weather?",
        "Is it going to be hot today?",
        "Do I need an umbrella due to rain today?",
        "Do I need sunscreen today due to UV?",
        "What's the weather like?",
        "Tell me about today's weather",
    ],
    parameters={
        "location": {
            "type": "string",
            "description": "The location to get weather for (city name or coordinates)",
            "required": False,
        }
    },
)
def getWeather(location: str = "current location"):
    """Log a simulated weather report for a location.

    No real weather service is queried: apart from the location name, every
    reported value is a fixed placeholder.

    Args:
        location: The location to report weather for.
    """
    # Placeholder data standing in for a real weather lookup.
    report = (
        f"[Weather Check] Retrieving weather information for {location}",
        f"[Weather] Location: {location}",
        "[Weather] Current conditions: Partly cloudy",
        "[Weather] Temperature: 72°F (22°C)",
        "[Weather] Feels like: 70°F (21°C)",
        "[Weather] Humidity: 45%",
        "[Weather] UV Index: 6 (High) - Sun protection recommended",
        "[Weather] Wind: 5 mph from the Northwest",
        "[Weather] Visibility: 10 miles",
        "[Weather] Today's forecast: Partly cloudy with a high of 78°F and low of 62°F",
        "[Weather] Rain chance: 10%",
        "[Weather] Recommendation: Light jacket might be needed for evening, sunscreen recommended for extended outdoor activity",
    )
    for entry in report:
        log(entry)

212 

213 

@register_action(
    name="getTime",
    description="Use when the user asks about the current time, date, or temporal information.",
    vibe_test_phrases=[
        "what is the current time?",
        "is it noon yet?",
        "what time is it?",
        "Is it 4 o'clock?",
        "What day is it?",
        "What's the date today?",
    ],
    parameters={
        "timezone": {
            "type": "string",
            "description": "The timezone to get time for (e.g., 'EST', 'PST', 'UTC')",
            "required": False,
        }
    },
)
def getTime(timezone: str = None):
    """Log the current local time, date and part of day.

    The timezone argument is only echoed in the log; no conversion is
    performed and all values come from ``datetime.now()``.

    Args:
        timezone: Optional timezone label to mention in the log.
    """
    now = datetime.now()

    tz_suffix = f" for {timezone}" if timezone else ""
    clock = now.strftime("%I:%M:%S %p")
    full_date = now.strftime("%A, %B %d, %Y")
    weekday = now.strftime("%A")
    week_number = now.strftime("%W")

    log(f"[Time Check] Retrieving current time{tz_suffix}")
    log(f"[Time] Current time: {clock}")
    log(f"[Time] Date: {full_date}")
    log(f"[Time] Day of week: {weekday}")
    log(f"[Time] Week number: {week_number} of the year")

    if timezone:
        log(
            f"[Time] Note: Timezone conversion for '{timezone}' would be applied in production"
        )

    # Classify the hour: night is 21:00-04:59, then morning until noon,
    # afternoon until 17:00, evening until 21:00.
    hour = now.hour
    if hour < 5 or hour >= 21:
        period = "Night"
    elif hour < 12:
        period = "Morning"
    elif hour < 17:
        period = "Afternoon"
    else:
        period = "Evening"
    log(f"[Time] Period: {period}")

263 

264 

@register_action(
    name="square_root",
    description="Use when the user wants to calculate the square root of a number. Keywords include: square root, sqrt, √",
    vibe_test_phrases=[
        "what's the square root of 16?",
        "calculate sqrt(25)",
        "find the square root of 144",
        "√81 = ?",
        "I need the square root of 2",
        "square root of 100",
    ],
    parameters={
        "number": {
            "type": "number",
            "description": "The number to calculate the square root of",
            "required": True,
        }
    },
)
def square_root(number: Optional[Union[float, int]] = None):
    """Calculate the square root of a number and log the results.

    Negative inputs are reported as an imaginary result (the root of the
    absolute value followed by ``i``). Perfect squares are reported as
    integers; other results are logged to six and two decimal places
    together with an approximate verification.

    Args:
        number: The number to calculate the square root of. ``None`` logs
            an error and returns without calculating.
    """
    if number is None:
        log("[Square Root] Error: No number provided for square root calculation")
        return

    log(f"[Square Root] Calculating square root of {number}")

    try:
        if number < 0:
            # Real-valued sqrt cannot take a negative input, so compute the
            # magnitude and present it as an imaginary number.
            result = math.sqrt(abs(number))
            log(f"[Square Root] Input is negative ({number})")
            log(f"[Square Root] Result: {result:.6f}i (imaginary number)")
            log(
                "[Square Root] Note: The square root of a negative number is an imaginary number"
            )
        else:
            result = math.sqrt(number)

            # Check if it's a perfect square
            if result.is_integer():
                log(f"[Square Root] {number} is a perfect square")
                log(f"[Square Root] Result: {int(result)}")
                log(
                    f"[Square Root] Verification: {int(result)} × {int(result)} = {number}"
                )
            else:
                log(f"[Square Root] Result: {result:.6f}")
                log(f"[Square Root] Rounded to 2 decimal places: {result:.2f}")
                # "≈" because the product of the rounded root only
                # approximates the input.
                log(
                    f"[Square Root] Verification: {result:.6f} × {result:.6f} ≈ {result * result:.6f}"
                )

    except (ValueError, TypeError) as e:
        # TypeError covers non-numeric input reaching the comparison or sqrt.
        log(f"[Square Root] Error calculating square root: {str(e)}")

324 

325 

@register_action(
    name="calculate",
    description="Use when the user wants to perform arithmetic calculations. Keywords: calculate, compute, add, subtract, multiply, divide, +, -, *, /",
    vibe_test_phrases=[
        "calculate 5 + 3",
        "what's 10 * 7?",
        "compute 100 / 4",
        "15 - 8 equals what?",
        "multiply 12 by 9",
        "what is 2 plus 2?",
    ],
    parameters={
        "expression": {
            "type": "string",
            "description": "The mathematical expression to evaluate (e.g., '5 + 3', '10 * 2')",
            "required": True,
        }
    },
)
def calculate(expression: Optional[str] = None):
    """Evaluate a mathematical expression and log the results.

    Only digits, whitespace and the characters ``+ - * / . ( )`` are
    accepted; anything else is rejected before evaluation. All failures are
    logged rather than raised.

    Args:
        expression: The mathematical expression to evaluate. An empty or
            missing expression logs an error and returns.
    """
    if not expression:
        log("[Calculator] Error: No expression provided for calculation")
        return

    log(f"[Calculator] Evaluating expression: {expression}")

    try:
        # Clean up the expression
        expression = expression.strip()
        log(f"[Calculator] Cleaned expression: {expression}")

        # Basic safety check - only allow numbers and basic operators
        allowed_chars = "0123456789+-*/.()"
        if not all(c in allowed_chars or c.isspace() for c in expression):
            log("[Calculator] Error: Expression contains invalid characters")
            log(
                "[Calculator] Only numbers and operators (+, -, *, /, parentheses) are allowed"
            )
            return

        # SECURITY(review): eval() runs on text that originates from the
        # user/model. The character whitelist above prevents names and
        # attribute access, and builtins are withheld as defence in depth,
        # but "**" still passes the whitelist, so a huge exponent chain can
        # exhaust CPU/memory. Consider an ast-based evaluator with bounds.
        result = eval(expression, {"__builtins__": {}}, {})

        # Format the result nicely
        if isinstance(result, float) and result.is_integer():
            result = int(result)

        log(f"[Calculator] Result: {expression} = {result}")

        # Add some context about the operation
        if "+" in expression:
            log("[Calculator] Operation type: Addition")
        if "-" in expression:
            log("[Calculator] Operation type: Subtraction")
        if "*" in expression:
            log("[Calculator] Operation type: Multiplication")
        if "/" in expression:
            log("[Calculator] Operation type: Division")
            # Whole-valued floats were converted to int above, so a finite
            # float here means the result has a fractional part. Checking
            # the result itself is correct for compound expressions, unlike
            # splitting the text on "/" and evaluating the fragments.
            if isinstance(result, float) and math.isfinite(result):
                log("[Calculator] Note: Result includes decimal portion")

    except ZeroDivisionError:
        log("[Calculator] Error: Division by zero!")
        log("[Calculator] Mathematical note: Division by zero is undefined")
    except Exception as e:
        # Includes SyntaxError from malformed expressions.
        log(f"[Calculator] Error evaluating expression: {str(e)}")
        log("[Calculator] Please check your expression format")

407 

408 

def get_available_actions() -> Dict[str, Dict[str, Any]]:
    """Expose the registry of all registered actions.

    Returns:
        The module-level registry itself (not a copy), mapping each action
        name to its function, description, parameters and vibe test phrases.
    """
    registry = ACTION_REGISTRY
    return registry

416 

417 

def get_actions_with_vibe_tests() -> Dict[str, Dict[str, Any]]:
    """Collect the registered actions that define at least one vibe test phrase.

    Returns:
        A new dictionary of action name to action info, containing only the
        entries whose "vibe_test_phrases" value is non-empty.
    """
    testable: Dict[str, Dict[str, Any]] = {}
    for action_name, info in ACTION_REGISTRY.items():
        if info["vibe_test_phrases"]:
            testable[action_name] = info
    return testable

429 

430 

def execute_action(
    action_name: str, parameters: Optional[Dict[str, Any]] = None
) -> None:
    """Execute a registered action with the given parameters.

    Actions log their outputs instead of returning strings, so nothing is
    returned. Unknown action names and any exception raised while preparing
    parameters or running the action are reported through the action log
    rather than propagated, for parameterless and parameterised actions
    alike.

    Args:
        action_name: Name of the action to execute.
        parameters: Dictionary of parameter values; ``None`` is treated as
            an empty dictionary. Ignored when the action declares no
            parameters.
    """
    if action_name not in ACTION_REGISTRY:
        log(f"[System] Error: Unknown action '{action_name}'")
        return

    action_info = ACTION_REGISTRY[action_name]
    func = action_info["function"]
    expected_params = action_info.get("parameters", {})

    try:
        if not expected_params:
            # No declared parameters: call the action without arguments.
            func()
            return

        if parameters is None:
            parameters = {}

        # NOTE(review): prepare_function_parameters is defined elsewhere;
        # presumably it validates/converts the raw values and raises
        # ValueError on bad input - confirm against parameter_utils.
        call_params = prepare_function_parameters(parameters, expected_params)
        func(**call_params)
    except ValueError as e:
        log(f"[System] Error: {str(e)}")
    except Exception as e:
        log(f"[System] Error executing action '{action_name}': {str(e)}")

465 

466 

def select_and_execute_action(ai_query: AIQuery, conversation_context: str):
    """Select an action using AI (or the user as fallback) and execute it.

    The action log is cleared first. The AI is asked to choose among the
    registered action names; when its reported confidence is below 0.5 the
    user picks from a numbered menu instead. Values for required parameters
    are then requested interactively before the action runs.

    Args:
        ai_query: Object whose ``multiple_choice`` call returns a result
            exposing ``value`` and ``confidence``.
        conversation_context: Conversation text passed to the AI as context.
    """
    clear_action_logs()
    actions = get_available_actions()
    action_names = list(actions.keys())

    result = ai_query.multiple_choice(
        question="Based on the recent conversation, which action should be taken?",
        options=action_names,
        context=conversation_context,
    )

    print(f"AI chose action: {result.value} (Confidence: {result.confidence:.0%})")

    if result.confidence < 0.5:
        print("AI is not confident. Please select an action manually.")
        for i, action_name in enumerate(action_names):
            print(f"{i+1}. {action_name}")

        try:
            choice = int(input("Choose an action: ")) - 1
        except ValueError:
            print("Invalid choice.")
            return

        # Explicit range check: a negative index (e.g. from entering 0)
        # would otherwise wrap around and silently select another action.
        if not 0 <= choice < len(action_names):
            print("Invalid choice.")
            return
        chosen_action_name = action_names[choice]
    else:
        chosen_action_name = result.value

    action_info = actions.get(chosen_action_name)

    if not action_info:
        print(f"Invalid action: {chosen_action_name}")
        return

    params = {}
    if action_info.get("parameters"):
        print(f"Action '{chosen_action_name}' requires parameters.")
        for param_name, param_info in action_info["parameters"].items():
            # Only required parameters are prompted for; optional ones are
            # left out so the action's own defaults apply.
            if param_info.get("required", False):
                user_val = input(
                    f"Enter value for '{param_name}' ({param_info['description']}): "
                )
                params[param_name] = user_val

    execute_action(chosen_action_name, params)