763 lines
30 KiB
Python
763 lines
30 KiB
Python
import numpy as np
|
|
import math
|
|
import pickle
|
|
import random
|
|
from collections import defaultdict
|
|
import gtsam
|
|
from gtsam import Pose2, BetweenFactorPose2, NonlinearFactorGraph, Values
|
|
from gtsam import ISAM2, ISAM2Params
|
|
from gtsam.noiseModel import Diagonal
|
|
from PyQt5 import QtWidgets, QtGui, QtCore
|
|
from PyQt5.QtWidgets import QApplication, QMainWindow, QGraphicsScene, QGraphicsView, QGraphicsEllipseItem, QGraphicsLineItem, QFileDialog, QVBoxLayout, QWidget, QStatusBar
|
|
from pytmx import TiledMap
|
|
import sys
|
|
import time
|
|
import logging
|
|
import matplotlib
|
|
matplotlib.use('Qt5Agg')
|
|
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas
|
|
from matplotlib.figure import Figure
|
|
|
|
# ==============================
|
|
# LOGGING SETUP
|
|
# ==============================
|
|
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# ==============================
|
|
# NOISE MODELS
|
|
# ==============================
|
|
class NoiseModel:
|
|
def sample(self):
|
|
raise NotImplementedError("Subclasses must implement sample()")
|
|
|
|
class DriftNoiseModel(NoiseModel):
|
|
def __init__(self, drift_std=1e-4):
|
|
self.drift_std = drift_std
|
|
self.bias = 0.0
|
|
|
|
def sample(self):
|
|
self.bias += np.random.normal(0, self.drift_std)
|
|
return self.bias
|
|
|
|
class TemperatureNoiseModel(NoiseModel):
|
|
def __init__(self, coeff=0.001):
|
|
self.coeff = coeff
|
|
self.temperature = 20.0 # Default temperature in °C
|
|
|
|
def set_temperature(self, temp):
|
|
self.temperature = temp
|
|
|
|
def sample(self):
|
|
return (self.temperature - 20.0) * self.coeff
|
|
|
|
# ==============================
|
|
# SENSOR MODEL
|
|
# ==============================
|
|
class Sensor:
|
|
def __init__(self, name, sensor_type, pos_x, pos_y, angle_rad,
|
|
min_range, max_range, fov_rad, noise_std=0.01):
|
|
self.name = name
|
|
self.sensor_type = sensor_type.upper()
|
|
self.pos_x = pos_x
|
|
self.pos_y = pos_y
|
|
self.angle_rad = angle_rad
|
|
self.min_range = min_range
|
|
self.max_range = max_range
|
|
self.fov_rad = fov_rad
|
|
self.drift_model = DriftNoiseModel(drift_std=noise_std * 1e-2)
|
|
self.temp_model = TemperatureNoiseModel(coeff=noise_std * 1e-2)
|
|
self.noise_std = noise_std
|
|
# Set number of rays based on sensor type for multi-ray casting
|
|
if self.sensor_type == "ULTRASOUND":
|
|
self.num_rays = 7
|
|
elif self.sensor_type == "IR":
|
|
self.num_rays = 3
|
|
else:
|
|
self.num_rays = 1
|
|
|
|
def measure(self, env, sx, sy, global_angle_rad):
|
|
try:
|
|
# Multi-ray casting to find the closest obstacle within FOV
|
|
if self.num_rays == 1:
|
|
true_dist = env.raycast(sx, sy, global_angle_rad, self.max_range)
|
|
if true_dist is None:
|
|
true_dist = self.max_range
|
|
else:
|
|
half_fov = self.fov_rad / 2
|
|
start_angle = global_angle_rad - half_fov
|
|
end_angle = global_angle_rad + half_fov
|
|
ray_angles = np.linspace(start_angle, end_angle, self.num_rays)
|
|
distances = [env.raycast(sx, sy, angle, self.max_range) for angle in ray_angles]
|
|
distances = [d if d is not None else self.max_range for d in distances]
|
|
true_dist = min(distances)
|
|
|
|
# Distance-dependent noise based on sensor type
|
|
if self.sensor_type == "ULTRASOUND":
|
|
noise_std = self.noise_std * (1 + (true_dist / self.max_range) ** 2)
|
|
elif self.sensor_type == "IR":
|
|
noise_std = 0.05 * true_dist # 5% of distance as per typical IR specs
|
|
else:
|
|
noise_std = self.noise_std
|
|
|
|
# Temperature scaling for ultrasound (speed of sound effect)
|
|
if self.sensor_type == "ULTRASOUND":
|
|
c_true = 331 + 0.6 * self.temp_model.temperature # m/s
|
|
scale_factor = 343 / c_true # 343 m/s is assumed speed at 20°C
|
|
d_scaled = true_dist * scale_factor
|
|
else:
|
|
d_scaled = true_dist
|
|
|
|
# Apply drift and noise
|
|
drift = self.drift_model.sample()
|
|
noise = np.random.normal(0, noise_std)
|
|
measured = d_scaled + drift + noise
|
|
|
|
# Drop-out probability for ultrasound (2% chance of returning max_range)
|
|
if self.sensor_type == "ULTRASOUND" and np.random.random() < 0.02:
|
|
measured = self.max_range
|
|
|
|
# Clamp measurement to valid range
|
|
return max(self.min_range, min(self.max_range, measured))
|
|
except Exception as e:
|
|
logger.error(f"Error in sensor measurement: {e}")
|
|
return self.max_range
|
|
|
|
# ==============================
|
|
# SENSOR MANAGEMENT
|
|
# ==============================
|
|
class DynamicCrosstalkManager:
|
|
def __init__(self, sensors, interference_dist=0.10, angle_overlap_rad=0.1745): # Updated interference_dist
|
|
self.sensors = sensors
|
|
self.N = len(sensors)
|
|
self.interference_dist = interference_dist
|
|
self.angle_overlap_rad = angle_overlap_rad
|
|
self.conflict = np.zeros((self.N, self.N), dtype=bool)
|
|
self._build_conflict_matrix()
|
|
self.slots = self._build_time_slots()
|
|
|
|
def _angle_diff(self, a_rad, b_rad, fov_a, fov_b):
|
|
d = abs((b_rad - a_rad + np.pi) % (2 * np.pi) - np.pi)
|
|
return d < (self.angle_overlap_rad + fov_a + fov_b)
|
|
|
|
def _build_conflict_matrix(self):
|
|
for i in range(self.N):
|
|
for j in range(i + 1, self.N):
|
|
si, sj = self.sensors[i], self.sensors[j]
|
|
dist = math.hypot(si.pos_x - sj.pos_x, si.pos_y - sj.pos_y)
|
|
if dist < self.interference_dist and self._angle_diff(si.angle_rad, sj.angle_rad, si.fov_rad, sj.fov_rad):
|
|
self.conflict[i, j] = self.conflict[j, i] = True
|
|
|
|
def _build_time_slots(self):
|
|
slots = []
|
|
for i in range(self.N):
|
|
placed = False
|
|
for slot in slots:
|
|
if all(not self.conflict[i, j] for j in slot):
|
|
slot.append(i)
|
|
placed = True
|
|
break
|
|
if not placed:
|
|
slots.append([i])
|
|
return slots
|
|
|
|
def firing_order(self):
|
|
return self.slots
|
|
|
|
class SensorManager:
|
|
def __init__(self, sensors):
|
|
self.sensors = sensors
|
|
|
|
def cluster_sensors(self, eps=0.1, min_samples=1):
|
|
from sklearn.cluster import DBSCAN
|
|
feats = np.array([[s.pos_x, s.pos_y, s.angle_rad] for s in self.sensors])
|
|
labels = DBSCAN(eps=eps, min_samples=min_samples).fit_predict(feats)
|
|
clusters = defaultdict(list)
|
|
for i, l in enumerate(labels):
|
|
clusters[l].append(i)
|
|
return list(clusters.values())
|
|
|
|
def schedule(self, heading_rad):
|
|
dtcm = DynamicCrosstalkManager(self.sensors, interference_dist=0.10, angle_overlap_rad=0.0873)
|
|
slots = dtcm.firing_order()
|
|
fwd, bwd = [], []
|
|
for slot in slots:
|
|
if any(abs(((self.sensors[i].angle_rad - heading_rad + np.pi) % (2 * np.pi)) - np.pi) > np.pi/2 for i in slot):
|
|
bwd.append(slot)
|
|
else:
|
|
fwd.append(slot)
|
|
return fwd + bwd
|
|
|
|
# ==============================
|
|
# MOTION & ODOMETRY
|
|
# ==============================
|
|
class MotionModel:
|
|
def __init__(self, wheel_base=0.1):
|
|
self.wheel_base = wheel_base
|
|
|
|
def update_pose(self, pose, v, omega, dt):
|
|
x, y, theta = pose
|
|
if abs(omega) < 1e-6:
|
|
return (x + v * math.cos(theta) * dt, y + v * math.sin(theta) * dt, theta)
|
|
R = v / omega
|
|
dth = omega * dt
|
|
return (x + R * (math.sin(theta + dth) - math.sin(theta)),
|
|
y - R * (math.cos(theta + dth) - math.cos(theta)),
|
|
(theta + dth) % (2 * np.pi))
|
|
|
|
class OdometrySimulator:
|
|
def __init__(self, noise_std_v=0.01, noise_std_omega=0.005):
|
|
self.noise_std_v = noise_std_v
|
|
self.noise_std_omega = noise_std_omega
|
|
|
|
def get_odometry(self, v_cmd, omega_cmd):
|
|
return (v_cmd + np.random.normal(0, self.noise_std_v),
|
|
omega_cmd + np.random.normal(0, self.noise_std_omega))
|
|
|
|
# ==============================
|
|
# SLAM ENGINE
|
|
# ==============================
|
|
class SLAMEngine:
|
|
def __init__(self, grid_resolution=0.02):
|
|
self.og = OccupancyGrid(resolution=grid_resolution)
|
|
self.isam = ISAM2(ISAM2Params())
|
|
self.initial = Values()
|
|
self.noise = Diagonal.Sigmas([1e-2, 1e-2, 1e-3])
|
|
self.isam_index = 1 # Start at 1 since x0 is initialized
|
|
self.graph = NonlinearFactorGraph()
|
|
self.initial.insert(gtsam.symbol('x', 0), Pose2(0, 0, 0))
|
|
self.graph.add(gtsam.PriorFactorPose2(gtsam.symbol('x', 0), Pose2(0, 0, 0), self.noise))
|
|
self.isam.update(self.graph, self.initial)
|
|
self.graph = NonlinearFactorGraph()
|
|
self.initial = Values()
|
|
|
|
def predict(self, odom):
|
|
try:
|
|
key0 = gtsam.symbol('x', self.isam_index - 1)
|
|
key1 = gtsam.symbol('x', self.isam_index)
|
|
|
|
prev = self.isam.calculateEstimate().atPose2(key0)
|
|
dx, dtheta = odom
|
|
incr = Pose2(dx, 0.0, dtheta)
|
|
|
|
self.initial.insert(key1, prev.compose(incr))
|
|
self.graph.add(BetweenFactorPose2(key0, key1, incr, self.noise))
|
|
|
|
self.isam.update(self.graph, self.initial)
|
|
self.graph = NonlinearFactorGraph()
|
|
self.initial = Values()
|
|
|
|
self.isam_index += 1
|
|
except Exception as e:
|
|
logger.error(f"Error in SLAM prediction: {e}")
|
|
|
|
def update(self, sensor_data):
|
|
try:
|
|
pose = self.isam.calculateEstimate().atPose2(gtsam.symbol('x', self.isam_index - 1))
|
|
for meas in sensor_data:
|
|
self.og.cone_update(*meas)
|
|
except Exception as e:
|
|
logger.error(f"Error in SLAM update: {e}")
|
|
|
|
def detect_loop_closure(self, threshold=0.2):
|
|
if self.isam_index < 10:
|
|
return
|
|
try:
|
|
curr_pose = self.isam.calculateEstimate().atPose2(gtsam.symbol('x', self.isam_index - 1))
|
|
curr_t = curr_pose.translation()
|
|
for i in range(max(0, self.isam_index - 50), self.isam_index - 5):
|
|
prev_pose = self.isam.calculateEstimate().atPose2(gtsam.symbol('x', i))
|
|
prev_t = prev_pose.translation()
|
|
dist = np.linalg.norm(curr_t - prev_t)
|
|
if dist < threshold:
|
|
rel = prev_pose.inverse().compose(curr_pose)
|
|
self.graph.add(BetweenFactorPose2(gtsam.symbol('x', i),
|
|
gtsam.symbol('x', self.isam_index - 1),
|
|
rel, self.noise))
|
|
logger.info(f"Loop closure detected between poses x{i} and x{self.isam_index - 1}")
|
|
break
|
|
except Exception as e:
|
|
logger.error(f"Error in loop closure detection: {e}")
|
|
|
|
def optimize(self):
|
|
try:
|
|
self.isam.update(self.graph, self.initial)
|
|
self.initial = Values()
|
|
self.graph = NonlinearFactorGraph()
|
|
except RuntimeError as e:
|
|
logger.error(f"Optimization failed: {e}")
|
|
|
|
# ==============================
|
|
# OCCUPANCY GRID & ANALYSIS
|
|
# ==============================
|
|
class OccupancyGrid:
|
|
def __init__(self, width_m=3.0, height_m=2.0, resolution=0.02):
|
|
self.width_m = width_m
|
|
self.height_m = height_m
|
|
self.resolution = resolution
|
|
self.cells_x = int(math.ceil(width_m / resolution))
|
|
self.cells_y = int(math.ceil(height_m / resolution))
|
|
self.grid = np.full((self.cells_y, self.cells_x), np.nan, dtype=float)
|
|
self.coverage = np.zeros_like(self.grid, dtype=int)
|
|
|
|
def world_to_map(self, wx, wy):
|
|
c = int(wx / self.resolution)
|
|
r = int(wy / self.resolution)
|
|
return (r, c) if 0 <= r < self.cells_y and 0 <= c < self.cells_x else None
|
|
|
|
def update_cell_logodds(self, row, col, prob):
|
|
p = max(min(prob, 0.99), 0.01)
|
|
if np.isnan(self.grid[row, col]):
|
|
self.grid[row, col] = 0.0
|
|
self.grid[row, col] += math.log(p / (1 - p))
|
|
self.grid[row, col] = np.clip(self.grid[row, col], -20, 20) # Clipping to prevent overflow
|
|
self.coverage[row, col] += 1
|
|
|
|
def cone_update(self, sx, sy, center_angle_rad, dist, fov_rad, sub_rays=7, occ_prob=0.7, free_prob=0.3):
|
|
half = fov_rad / 2
|
|
step = (2 * half) / (sub_rays - 1) if sub_rays > 1 else 0
|
|
start = center_angle_rad - half
|
|
for i in range(sub_rays):
|
|
a = start + i * step
|
|
dx, dy = math.cos(a), math.sin(a)
|
|
steps = max(1, int(dist / (self.resolution * 1.5)))
|
|
for s in range(steps):
|
|
frac = s / steps
|
|
wx, wy = sx + frac * dist * dx, sy + frac * dist * dy
|
|
m = self.world_to_map(wx, wy)
|
|
if not m:
|
|
break
|
|
self.update_cell_logodds(m[0], m[1], free_prob)
|
|
fin = self.world_to_map(sx + dist * dx, sy + dist * dy)
|
|
if fin:
|
|
self.update_cell_logodds(fin[0], fin[1], occ_prob)
|
|
|
|
def get_probability(self):
|
|
valid = ~np.isnan(self.grid)
|
|
prob = np.full_like(self.grid, np.nan)
|
|
prob[valid] = np.exp(self.grid[valid]) / (1 + np.exp(self.grid[valid]))
|
|
return prob
|
|
|
|
def save(self, fn):
|
|
with open(fn, 'wb') as f:
|
|
pickle.dump(self, f)
|
|
|
|
@staticmethod
|
|
def load(fn):
|
|
with open(fn, 'rb') as f:
|
|
return pickle.load(f)
|
|
|
|
class CoverageAnalyzer:
|
|
def __init__(self, og):
|
|
self.og = og
|
|
|
|
def blind_spots(self, threshold=0):
|
|
ys, xs = np.where(self.og.coverage <= threshold)
|
|
return list(zip(ys, xs))
|
|
|
|
# ==============================
|
|
# ENVIRONMENT & OBSTACLES
|
|
# ==============================
|
|
class Environment:
|
|
def __init__(self):
|
|
self.walls = [((0, 0), (3, 0)), ((3, 0), (3, 2)), ((3, 2), (0, 2)), ((0, 2), (0, 0)), ((1.2, 0.6), (2.4, 1.4))]
|
|
self.dynamic = []
|
|
|
|
def add_dynamic(self, obs):
|
|
self.dynamic.append(obs)
|
|
|
|
def raycast(self, sx, sy, angle, max_range):
|
|
best = None
|
|
dx, dy = math.cos(angle), math.sin(angle)
|
|
for (x1, y1), (x2, y2) in self.walls:
|
|
Lx, Ly = x2 - x1, y2 - y1
|
|
denom = dx * (-Ly) + dy * Lx
|
|
if abs(denom) < 1e-9:
|
|
continue
|
|
tx, ty = x1 - sx, y1 - sy
|
|
t = (tx * (-Ly) + ty * Lx) / denom
|
|
if t < 0 or t > max_range:
|
|
continue
|
|
u = ((sx + dx * t - x1) / Lx) if abs(Lx) > 1e-9 else ((sy + dy * t - y1) / Ly)
|
|
if 0 <= u <= 1 and (best is None or t < best):
|
|
best = t
|
|
d = best if best is not None else max_range
|
|
for obs in self.dynamic:
|
|
ox, oy = obs.position(t=0)
|
|
tval = (dx * (ox - sx) + dy * (oy - sy))
|
|
perp = abs(-dy * (ox - sx) + dx * (oy - sy))
|
|
if 0 <= tval <= d and perp < 0.05:
|
|
d = tval
|
|
return d
|
|
|
|
@staticmethod
|
|
def load_from_tiled(tmx_file):
|
|
try:
|
|
tmx = TiledMap(tmx_file)
|
|
walls = []
|
|
for layer in tmx.layers:
|
|
if hasattr(layer, 'objects'):
|
|
for obj in layer.objects:
|
|
if obj.points:
|
|
pts = [(obj.x + p[0], obj.y + p[1]) for p in obj.points]
|
|
walls += [((pts[i][0], pts[i][1]), (pts[i + 1][0], pts[i + 1][1])) for i in range(len(pts) - 1)]
|
|
elif obj.width and obj.height:
|
|
walls += [((obj.x, obj.y), (obj.x + obj.width, obj.y)),
|
|
((obj.x + obj.width, obj.y), (obj.x + obj.width, obj.y + obj.height)),
|
|
((obj.x + obj.width, obj.y + obj.height), (obj.x, obj.y + obj.height)),
|
|
((obj.x, obj.y + obj.height), (obj.x, obj.y))]
|
|
env = Environment()
|
|
env.walls = walls
|
|
return env
|
|
except Exception as e:
|
|
logger.error(f"Failed to load tiled map: {e}")
|
|
return ScenarioGenerator.random_walls()
|
|
|
|
class DynamicObstacle:
|
|
def __init__(self, path):
|
|
self.path = path
|
|
|
|
def position(self, t):
|
|
return self.path[int(t) % len(self.path)]
|
|
|
|
# ==============================
|
|
# SCENARIO & BENCHMARKING
|
|
# ==============================
|
|
class ScenarioGenerator:
|
|
@staticmethod
|
|
def random_walls(num=5, area=(3, 2)):
|
|
walls = []
|
|
for _ in range(num):
|
|
x1, y1 = random.random() * area[0], random.random() * area[1]
|
|
x2, y2 = random.random() * area[0], random.random() * area[1]
|
|
walls.append(((x1, y1), (x2, y2)))
|
|
env = Environment()
|
|
env.walls = walls
|
|
return env
|
|
|
|
# ==============================
|
|
# VISUALIZATION
|
|
# ==============================
|
|
class Visualizer:
|
|
def __init__(self):
|
|
self.figure = Figure()
|
|
self.canvas = FigureCanvas(self.figure)
|
|
self.ax = self.figure.add_subplot(111)
|
|
|
|
def plot_2d(self, og, pose=None, walls=None):
|
|
self.ax.clear()
|
|
prob = og.get_probability()
|
|
self.ax.imshow(prob, origin='lower', cmap='gray', vmin=0.0, vmax=1.0,
|
|
extent=[0, og.width_m, 0, og.height_m])
|
|
self.ax.imshow(np.isnan(prob), origin='lower', cmap='gray_r', alpha=0.4,
|
|
extent=[0, og.width_m, 0, og.height_m])
|
|
self.ax.grid(True, which='both', linestyle='--', alpha=0.3)
|
|
self.ax.set_title('Occupancy Grid')
|
|
if walls:
|
|
for a, b in walls:
|
|
self.ax.plot([a[0], b[0]], [a[1], b[1]], 'b-', label='Walls' if a == walls[0] else None)
|
|
if pose:
|
|
self.ax.plot(pose[0], pose[1], 'ro', label='Robot')
|
|
self.ax.arrow(pose[0], pose[1], 0.1 * math.cos(pose[2]), 0.1 * math.sin(pose[2]),
|
|
color='r', head_width=0.05)
|
|
self.ax.legend()
|
|
self.canvas.draw()
|
|
|
|
# ==============================
|
|
# INTERACTIVE GUI LAYOUT EDITOR
|
|
# ==============================
|
|
class SensorLayoutEditor(QMainWindow):
|
|
def __init__(self, robot_config, env, slam, motion_model, odom_sim, sensor_manager):
|
|
super().__init__()
|
|
self.setWindowTitle('Sensor Layout Editor')
|
|
self.robot = robot_config
|
|
self.env = env
|
|
self.slam = slam
|
|
self.motion_model = motion_model
|
|
self.odom_sim = odom_sim
|
|
self.sensor_manager = sensor_manager
|
|
self.scale = 200
|
|
self.pose = [1.5, 1.0, 0.0]
|
|
self.t = 0
|
|
self.v_cmd = 0.0
|
|
self.ω_cmd = 0.0
|
|
self.pressed_keys = set()
|
|
self.visualizer = Visualizer()
|
|
|
|
self.central_widget = QWidget()
|
|
self.setCentralWidget(self.central_widget)
|
|
self.layout = QVBoxLayout(self.central_widget)
|
|
|
|
self.scene = QGraphicsScene()
|
|
self.view = QGraphicsView(self.scene)
|
|
self.layout.addWidget(self.view)
|
|
|
|
self.layout.addWidget(self.visualizer.canvas)
|
|
|
|
self.status_bar = QStatusBar()
|
|
self.setStatusBar(self.status_bar)
|
|
self.status_bar.showMessage("Simulation ready - Use Z/S for forward/back, Q/D for left/right, X to stop")
|
|
|
|
self._draw_robot()
|
|
self._draw_sensors()
|
|
self._setup_timer()
|
|
self.show()
|
|
|
|
def _draw_robot(self):
|
|
w, h = self.robot.width * self.scale, self.robot.length * self.scale
|
|
self.robot_item = self.scene.addRect(-h/2, -w/2, h, w, pen=QtGui.QPen(QtCore.Qt.black), brush=QtGui.QBrush(QtCore.Qt.lightGray))
|
|
x, y = self.pose[0] * self.scale, -self.pose[1] * self.scale
|
|
self.robot_item.setPos(x, y)
|
|
self.robot_item.setRotation(-math.degrees(self.pose[2]))
|
|
|
|
def _draw_sensors(self):
|
|
self.items = []
|
|
for sRef in self.robot.sensors:
|
|
x, y = sRef.pos_x * self.scale, -sRef.pos_y * self.scale
|
|
item = QGraphicsEllipseItem(x - 5, y - 5, 10, 10)
|
|
item.setBrush(QtGui.QBrush(QtCore.Qt.red))
|
|
item.setFlag(QGraphicsEllipseItem.ItemIsMovable)
|
|
item.sensor = sRef
|
|
self.scene.addItem(item)
|
|
line = QGraphicsLineItem(x, y, x + math.cos(sRef.angle_rad) * 20, y - math.sin(sRef.angle_rad) * 20)
|
|
self.scene.addItem(line)
|
|
self.items.append((item, line))
|
|
|
|
def _setup_timer(self):
|
|
self.timer = QtCore.QTimer()
|
|
self.timer.timeout.connect(self.update_simulation)
|
|
self.timer.start(100) # 10 Hz
|
|
|
|
def update_simulation(self):
|
|
try:
|
|
v, ω = self.odom_sim.get_odometry(self.v_cmd, self.ω_cmd)
|
|
self.pose = self.motion_model.update_pose(self.pose, v, ω, dt=0.1)
|
|
self.slam.predict((v * 0.1, ω * 0.1))
|
|
self.slam.optimize()
|
|
|
|
schedule = self.sensor_manager.schedule(self.pose[2])
|
|
sensor_data = []
|
|
for slot in schedule:
|
|
meas_group = []
|
|
for idx in slot:
|
|
sRef = self.robot.sensors[idx]
|
|
sx = self.pose[0] + sRef.pos_x * math.cos(self.pose[2]) - sRef.pos_y * math.sin(self.pose[2])
|
|
sy = self.pose[1] + sRef.pos_x * math.sin(self.pose[2]) + sRef.pos_y * math.cos(self.pose[2])
|
|
ang = self.pose[2] + sRef.angle_rad
|
|
d = sRef.measure(self.env, sx, sy, ang)
|
|
meas_group.append((sx, sy, ang, d, sRef.fov_rad))
|
|
sensor_data += meas_group
|
|
|
|
self.slam.update(sensor_data)
|
|
self.slam.detect_loop_closure()
|
|
self.slam.optimize()
|
|
|
|
self.visualizer.plot_2d(self.slam.og, pose=self.pose, walls=self.env.walls)
|
|
ca = CoverageAnalyzer(self.slam.og)
|
|
bs = ca.blind_spots(0)
|
|
self.status_bar.showMessage(
|
|
f"t={self.t/10:.1f}s: v_cmd={self.v_cmd:.2f}, ω_cmd={self.ω_cmd:.2f}, Blind spots: {len(bs)} cells"
|
|
)
|
|
logger.info(f"t={self.t/10:.1f}s: v_cmd={self.v_cmd:.2f}, ω_cmd={self.ω_cmd:.2f}, Blind spots: {len(bs)} cells")
|
|
|
|
x, y = self.pose[0] * self.scale, -self.pose[1] * self.scale
|
|
self.robot_item.setPos(x, y)
|
|
self.robot_item.setRotation(-math.degrees(self.pose[2]))
|
|
|
|
self.t += 1
|
|
except Exception as e:
|
|
logger.error(f"Simulation update failed: {e}")
|
|
self.status_bar.showMessage(f"Error: {e}")
|
|
self.timer.stop()
|
|
|
|
def closeEvent(self, event):
|
|
try:
|
|
# Save map as pickle
|
|
with open("map.pkl", "wb") as f:
|
|
pickle.dump(self.slam.og, f)
|
|
logger.info("Map saved as map.pkl")
|
|
|
|
# Save raw map as PNG (without obstacles)
|
|
prob = self.slam.og.get_probability()
|
|
fig, ax = matplotlib.pyplot.subplots()
|
|
ax.imshow(prob, origin='lower', cmap='gray', vmin=0.0, vmax=1.0,
|
|
extent=[0, self.slam.og.width_m, 0, self.slam.og.height_m])
|
|
ax.axis('off')
|
|
fig.savefig("map.png", bbox_inches='tight', pad_inches=0)
|
|
logger.info("Map image saved as map.png")
|
|
matplotlib.pyplot.close(fig)
|
|
except Exception as e:
|
|
logger.error(f"Error during map saving: {e}")
|
|
|
|
def keyPressEvent(self, event):
|
|
key = event.key()
|
|
if key in [QtCore.Qt.Key_Z, QtCore.Qt.Key_S, QtCore.Qt.Key_Q, QtCore.Qt.Key_D]:
|
|
self.pressed_keys.add(key)
|
|
self._update_velocities()
|
|
if key == QtCore.Qt.Key_X:
|
|
self.pressed_keys.clear()
|
|
self.v_cmd = 0.0
|
|
self.ω_cmd = 0.0
|
|
self.status_bar.showMessage(f"Stopped: v_cmd={self.v_cmd:.2f}, ω_cmd={self.ω_cmd:.2f}")
|
|
elif key == QtCore.Qt.Key_Space:
|
|
if self.timer.isActive():
|
|
self.timer.stop()
|
|
self.status_bar.showMessage("Simulation paused")
|
|
else:
|
|
self.timer.start(100)
|
|
self.status_bar.showMessage(f"Simulation resumed: v_cmd={self.v_cmd:.2f}, ω_cmd={self.ω_cmd:.2f}")
|
|
elif key == QtCore.Qt.Key_Escape:
|
|
fname, _ = QFileDialog.getSaveFileName(self, 'Save Layout', '.', 'Pickle Files (*.pkl)')
|
|
if fname:
|
|
with open(fname, 'wb') as f:
|
|
pickle.dump(self.robot, f)
|
|
self.status_bar.showMessage(f"Layout saved to {fname}")
|
|
super().keyPressEvent(event)
|
|
|
|
def keyReleaseEvent(self, event):
|
|
key = event.key()
|
|
if key in [QtCore.Qt.Key_Z, QtCore.Qt.Key_S, QtCore.Qt.Key_Q, QtCore.Qt.Key_D]:
|
|
self.pressed_keys.discard(key)
|
|
self._update_velocities()
|
|
super().keyReleaseEvent(event)
|
|
|
|
def _update_velocities(self):
|
|
self.v_cmd = 0.0
|
|
self.ω_cmd = 0.0
|
|
if QtCore.Qt.Key_Z in self.pressed_keys:
|
|
self.v_cmd = 0.2
|
|
elif QtCore.Qt.Key_S in self.pressed_keys:
|
|
self.v_cmd = -0.2
|
|
if QtCore.Qt.Key_Q in self.pressed_keys:
|
|
self.ω_cmd = 0.5
|
|
elif QtCore.Qt.Key_D in self.pressed_keys:
|
|
self.ω_cmd = -0.5
|
|
self.status_bar.showMessage(
|
|
f"Moving: v_cmd={self.v_cmd:.2f}, ω_cmd={self.ω_cmd:.2f}"
|
|
if self.v_cmd != 0 or self.ω_cmd != 0 else
|
|
f"Stopped: v_cmd={self.v_cmd:.2f}, ω_cmd={self.ω_cmd:.2f}"
|
|
)
|
|
|
|
# ==============================
|
|
# ROBOT CONFIGURATION
|
|
# ==============================
|
|
class RobotConfig:
|
|
def __init__(self, width=0.3, length=0.5, height=0.2):
|
|
self.width = width
|
|
self.length = length
|
|
self.height = height
|
|
self.sensors = []
|
|
|
|
def add_sensor(self, sensor):
|
|
self.sensors.append(sensor)
|
|
|
|
# ==============================
|
|
# MAIN
|
|
# ==============================
|
|
def main(gui=True, tiled_map=None):
|
|
robot = RobotConfig(width=0.24, length=0.16, height=0.2)
|
|
|
|
positions = [
|
|
(0.08, 0.11), # Front-left
|
|
(0.08, -0.11), # Front-right
|
|
(0.06, 0.12), # Left-front
|
|
(-0.06, 0.12), # Left-back
|
|
(0.06, -0.12), # Right-front
|
|
(-0.06, -0.12) # Right-back
|
|
]
|
|
|
|
angles = [
|
|
np.pi/4, # Front-left: 45° left
|
|
-np.pi/4, # Front-right: 45° right
|
|
np.pi/2, # Left-front: 90° left
|
|
np.pi/2, # Left-back: 90° left
|
|
-np.pi/2, # Right-front: 90° right
|
|
-np.pi/2 # Right-back: 90° right
|
|
]
|
|
|
|
for i, (pos_x, pos_y) in enumerate(positions):
|
|
angle_rad = angles[i]
|
|
# Add ultrasound sensor with realistic parameters
|
|
robot.add_sensor(Sensor(
|
|
name=f"Ultrasound{i+1}",
|
|
sensor_type="ULTRASOUND",
|
|
pos_x=pos_x,
|
|
pos_y=pos_y,
|
|
angle_rad=angle_rad,
|
|
min_range=0.2,
|
|
max_range=2.5, # Reduced from 4.0 m
|
|
fov_rad=np.radians(25), # Reduced from 30°
|
|
noise_std=0.005 # Base noise, scaled in measure()
|
|
))
|
|
# Add IR sensor with realistic parameters
|
|
robot.add_sensor(Sensor(
|
|
name=f"IR{i+1}",
|
|
sensor_type="IR",
|
|
pos_x=pos_x,
|
|
pos_y=pos_y,
|
|
angle_rad=angle_rad,
|
|
min_range=0.15, # Increased from 0.1 m
|
|
max_range=0.8, # Reduced from 1.5 m
|
|
fov_rad=np.radians(5), # Reduced from 15°
|
|
noise_std=0.01 # Base noise, overridden by 5% distance
|
|
))
|
|
|
|
env = Environment.load_from_tiled(tmx_file=tiled_map) if tiled_map else ScenarioGenerator.random_walls()
|
|
|
|
motion_model = MotionModel()
|
|
odom_sim = OdometrySimulator()
|
|
slam = SLAMEngine()
|
|
sensor_manager = SensorManager(robot.sensors)
|
|
|
|
if gui:
|
|
app = QApplication(sys.argv)
|
|
editor = SensorLayoutEditor(robot, env, slam, motion_model, odom_sim, sensor_manager)
|
|
sys.exit(app.exec_())
|
|
else:
|
|
pose = [1.5, 1.0, 0.0]
|
|
visualizer = Visualizer()
|
|
commands = [(0.2, 0.0), (0.2, 0.1), (0.1, -0.05)]
|
|
step, t = 0, 0
|
|
start_time = time.time()
|
|
while step < len(commands):
|
|
current_time = time.time()
|
|
if current_time - start_time >= 0.1:
|
|
try:
|
|
v_cmd, om_cmd = commands[step]
|
|
v, om = odom_sim.get_odometry(v_cmd, om_cmd)
|
|
pose = motion_model.update_pose(pose, v, om, dt=0.1)
|
|
slam.predict((v * 0.1, om * 0.1))
|
|
|
|
schedule = sensor_manager.schedule(pose[2])
|
|
sensor_data = []
|
|
for slot in schedule:
|
|
meas_group = []
|
|
for idx in slot:
|
|
s = robot.sensors[idx]
|
|
sx = pose[0] + s.pos_x * math.cos(pose[2]) - s.pos_y * math.sin(pose[2])
|
|
sy = pose[1] + s.pos_x * math.sin(pose[2]) + s.pos_y * math.cos(pose[2])
|
|
ang = pose[2] + s.angle_rad
|
|
d = s.measure(env, sx, sy, ang)
|
|
meas_group.append((sx, sy, ang, d, s.fov_rad))
|
|
sensor_data += meas_group
|
|
|
|
slam.update(sensor_data)
|
|
slam.detect_loop_closure()
|
|
slam.optimize()
|
|
|
|
visualizer.plot_2d(slam.og, pose=pose, walls=env.walls)
|
|
ca = CoverageAnalyzer(slam.og)
|
|
bs = ca.blind_spots(0)
|
|
logger.info(f"Step {step}, t={t}: Blind spots: {len(bs)} cells")
|
|
|
|
t += 1
|
|
if t >= 10:
|
|
t = 0
|
|
step += 1
|
|
start_time = current_time
|
|
except Exception as e:
|
|
logger.error(f"Non-GUI simulation failed: {e}")
|
|
break
|
|
|
|
if __name__ == '__main__':
|
|
main(gui=True) |