110 lines
3.3 KiB
Python
110 lines
3.3 KiB
Python
"""
|
|
memory_api.py — Session + place memory interface
|
|
"""
|
|
import sys
|
|
import os
|
|
from Core.env_loader import PROJECT_ROOT
|
|
from API.odometry_api import get_position
|
|
|
|
MEMORY_AVAILABLE = False
|
|
mem = None
|
|
|
|
|
|
def init_memory() -> bool:
|
|
"""Start memory subsystem. Returns True if successful."""
|
|
global MEMORY_AVAILABLE, mem
|
|
|
|
# marcus_memory.py lives in Brain/
|
|
brain_dir = os.path.join(PROJECT_ROOT, "Brain")
|
|
for d in (brain_dir,):
|
|
if d not in sys.path:
|
|
sys.path.insert(0, d)
|
|
|
|
try:
|
|
from marcus_memory import Memory
|
|
mem = Memory()
|
|
mem.start_session()
|
|
MEMORY_AVAILABLE = True
|
|
print("Memory started")
|
|
return True
|
|
except ImportError as e:
|
|
print(f"marcus_memory.py not found ({e}) — memory disabled")
|
|
return False
|
|
except Exception as e:
|
|
print(f"Memory error: {e} — memory disabled")
|
|
return False
|
|
|
|
|
|
def log_cmd(cmd: str, response: str = "", duration: float = 0.0):
|
|
"""Log command to session memory."""
|
|
if mem:
|
|
try:
|
|
mem.log_command(cmd, response, duration)
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
def log_detection(class_name: str, position: str = "", distance: str = ""):
|
|
"""Log YOLO detection to session memory with current position."""
|
|
if mem and class_name:
|
|
pos = get_position()
|
|
try:
|
|
mem.log_detection(
|
|
class_name, position, distance,
|
|
x=pos["x"] if pos else None,
|
|
y=pos["y"] if pos else None,
|
|
)
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
def place_save(name: str) -> bool:
|
|
"""Save current position as named place."""
|
|
if not mem:
|
|
print(" [Places] Memory not available — cannot save place")
|
|
return False
|
|
pos = get_position()
|
|
return mem.save_place(
|
|
name,
|
|
x=pos["x"] if pos else None,
|
|
y=pos["y"] if pos else None,
|
|
heading=pos["heading"] if pos else None,
|
|
)
|
|
|
|
|
|
def place_goto(name: str) -> bool:
|
|
"""Navigate to a named saved place."""
|
|
from API.odometry_api import odom, ODOM_AVAILABLE
|
|
|
|
if not mem:
|
|
print(" [Places] Memory not available")
|
|
return False
|
|
place = mem.get_place(name)
|
|
if place is None:
|
|
return False
|
|
if not place.get("has_odom"):
|
|
print(f" [Places] '{name}' was saved without odometry — no coordinates")
|
|
return False
|
|
if not odom or not ODOM_AVAILABLE:
|
|
print(" [Places] Odometry not running — cannot navigate")
|
|
return False
|
|
print(f" [Places] Navigating to '{name}' "
|
|
f"(x={place['x']:.2f}, y={place['y']:.2f}, h={place['heading']:.1f})")
|
|
return odom.navigate_to(place["x"], place["y"], place["heading"])
|
|
|
|
|
|
def places_list_str() -> str:
|
|
"""Return formatted string of all saved places."""
|
|
if not mem:
|
|
return "Memory not available"
|
|
places = mem.list_places()
|
|
if not places:
|
|
return "No places saved yet. Say 'remember this as <name>' to save one."
|
|
lines = [f" {'Name':<20} {'Coordinates':<25} {'Saved at'}"]
|
|
lines.append(" " + "-" * 60)
|
|
for p in places:
|
|
coord = (f"x={p['x']:.2f} y={p['y']:.2f} h={p['heading']:.1f}"
|
|
if p.get("has_odom") else "no coordinates")
|
|
lines.append(f" {p['name']:<20} {coord:<25} {p.get('saved_at', '')}")
|
|
return "\n".join(lines)
|