import numpy as np import math import pickle import random from collections import defaultdict import gtsam from gtsam import Pose2, BetweenFactorPose2, NonlinearFactorGraph, Values from gtsam import ISAM2, ISAM2Params from gtsam.noiseModel import Diagonal from PyQt5 import QtWidgets, QtGui, QtCore from PyQt5.QtWidgets import QApplication, QMainWindow, QGraphicsScene, QGraphicsView, QGraphicsEllipseItem, QGraphicsLineItem, QFileDialog, QVBoxLayout, QWidget, QStatusBar from pytmx import TiledMap import sys import time import logging import matplotlib matplotlib.use('Qt5Agg') from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas from matplotlib.figure import Figure # ============================== # LOGGING SETUP # ============================== logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s') logger = logging.getLogger(__name__) # ============================== # NOISE MODELS # ============================== class NoiseModel: def sample(self): raise NotImplementedError("Subclasses must implement sample()") class DriftNoiseModel(NoiseModel): def __init__(self, drift_std=1e-4): self.drift_std = drift_std self.bias = 0.0 def sample(self): self.bias += np.random.normal(0, self.drift_std) return self.bias class TemperatureNoiseModel(NoiseModel): def __init__(self, coeff=0.001): self.coeff = coeff self.temperature = 20.0 # Default temperature in °C def set_temperature(self, temp): self.temperature = temp def sample(self): return (self.temperature - 20.0) * self.coeff # ============================== # SENSOR MODEL # ============================== class Sensor: def __init__(self, name, sensor_type, pos_x, pos_y, angle_rad, min_range, max_range, fov_rad, noise_std=0.01): self.name = name self.sensor_type = sensor_type.upper() self.pos_x = pos_x self.pos_y = pos_y self.angle_rad = angle_rad self.min_range = min_range self.max_range = max_range self.fov_rad = fov_rad self.drift_model = DriftNoiseModel(drift_std=noise_std * 1e-2) self.temp_model = TemperatureNoiseModel(coeff=noise_std * 1e-2) self.noise_std = noise_std # Set number of rays based on sensor type for multi-ray casting if self.sensor_type == "ULTRASOUND": self.num_rays = 7 elif self.sensor_type == "IR": self.num_rays = 3 else: self.num_rays = 1 def measure(self, env, sx, sy, global_angle_rad): try: # Multi-ray casting to find the closest obstacle within FOV if self.num_rays == 1: true_dist = env.raycast(sx, sy, global_angle_rad, self.max_range) if true_dist is None: true_dist = self.max_range else: half_fov = self.fov_rad / 2 start_angle = global_angle_rad - half_fov end_angle = global_angle_rad + half_fov ray_angles = np.linspace(start_angle, end_angle, self.num_rays) distances = [env.raycast(sx, sy, angle, self.max_range) for angle in ray_angles] distances = [d if d is not None else self.max_range for d in distances] true_dist = min(distances) # Distance-dependent noise based on sensor type if self.sensor_type == "ULTRASOUND": noise_std = self.noise_std * (1 + (true_dist / self.max_range) ** 2) elif self.sensor_type == "IR": noise_std = 0.05 * true_dist # 5% of distance as per typical IR specs else: noise_std = self.noise_std # Temperature scaling for ultrasound (speed of sound effect) if self.sensor_type == "ULTRASOUND": c_true = 331 + 0.6 * self.temp_model.temperature # m/s scale_factor = 343 / c_true # 343 m/s is assumed speed at 20°C d_scaled = true_dist * scale_factor else: d_scaled = true_dist # Apply drift and noise drift = self.drift_model.sample() noise = np.random.normal(0, noise_std) measured = d_scaled + drift + noise # Drop-out probability for ultrasound (2% chance of returning max_range) if self.sensor_type == "ULTRASOUND" and np.random.random() < 0.02: measured = self.max_range # Clamp measurement to valid range return max(self.min_range, min(self.max_range, measured)) except Exception as e: logger.error(f"Error in sensor measurement: {e}") return self.max_range # ============================== # SENSOR MANAGEMENT # ============================== class DynamicCrosstalkManager: def __init__(self, sensors, interference_dist=0.10, angle_overlap_rad=0.1745): # Updated interference_dist self.sensors = sensors self.N = len(sensors) self.interference_dist = interference_dist self.angle_overlap_rad = angle_overlap_rad self.conflict = np.zeros((self.N, self.N), dtype=bool) self._build_conflict_matrix() self.slots = self._build_time_slots() def _angle_diff(self, a_rad, b_rad, fov_a, fov_b): d = abs((b_rad - a_rad + np.pi) % (2 * np.pi) - np.pi) return d < (self.angle_overlap_rad + fov_a + fov_b) def _build_conflict_matrix(self): for i in range(self.N): for j in range(i + 1, self.N): si, sj = self.sensors[i], self.sensors[j] dist = math.hypot(si.pos_x - sj.pos_x, si.pos_y - sj.pos_y) if dist < self.interference_dist and self._angle_diff(si.angle_rad, sj.angle_rad, si.fov_rad, sj.fov_rad): self.conflict[i, j] = self.conflict[j, i] = True def _build_time_slots(self): slots = [] for i in range(self.N): placed = False for slot in slots: if all(not self.conflict[i, j] for j in slot): slot.append(i) placed = True break if not placed: slots.append([i]) return slots def firing_order(self): return self.slots class SensorManager: def __init__(self, sensors): self.sensors = sensors def cluster_sensors(self, eps=0.1, min_samples=1): from sklearn.cluster import DBSCAN feats = np.array([[s.pos_x, s.pos_y, s.angle_rad] for s in self.sensors]) labels = DBSCAN(eps=eps, min_samples=min_samples).fit_predict(feats) clusters = defaultdict(list) for i, l in enumerate(labels): clusters[l].append(i) return list(clusters.values()) def schedule(self, heading_rad): dtcm = DynamicCrosstalkManager(self.sensors, interference_dist=0.10, angle_overlap_rad=0.0873) slots = dtcm.firing_order() fwd, bwd = [], [] for slot in slots: if any(abs(((self.sensors[i].angle_rad - heading_rad + np.pi) % (2 * np.pi)) - np.pi) > np.pi/2 for i in slot): bwd.append(slot) else: fwd.append(slot) return fwd + bwd # ============================== # MOTION & ODOMETRY # ============================== class MotionModel: def __init__(self, wheel_base=0.1): self.wheel_base = wheel_base def update_pose(self, pose, v, omega, dt): x, y, theta = pose if abs(omega) < 1e-6: return (x + v * math.cos(theta) * dt, y + v * math.sin(theta) * dt, theta) R = v / omega dth = omega * dt return (x + R * (math.sin(theta + dth) - math.sin(theta)), y - R * (math.cos(theta + dth) - math.cos(theta)), (theta + dth) % (2 * np.pi)) class OdometrySimulator: def __init__(self, noise_std_v=0.01, noise_std_omega=0.005): self.noise_std_v = noise_std_v self.noise_std_omega = noise_std_omega def get_odometry(self, v_cmd, omega_cmd): return (v_cmd + np.random.normal(0, self.noise_std_v), omega_cmd + np.random.normal(0, self.noise_std_omega)) # ============================== # SLAM ENGINE # ============================== class SLAMEngine: def __init__(self, grid_resolution=0.02): self.og = OccupancyGrid(resolution=grid_resolution) self.isam = ISAM2(ISAM2Params()) self.initial = Values() self.noise = Diagonal.Sigmas([1e-2, 1e-2, 1e-3]) self.isam_index = 1 # Start at 1 since x0 is initialized self.graph = NonlinearFactorGraph() self.initial.insert(gtsam.symbol('x', 0), Pose2(0, 0, 0)) self.graph.add(gtsam.PriorFactorPose2(gtsam.symbol('x', 0), Pose2(0, 0, 0), self.noise)) self.isam.update(self.graph, self.initial) self.graph = NonlinearFactorGraph() self.initial = Values() def predict(self, odom): try: key0 = gtsam.symbol('x', self.isam_index - 1) key1 = gtsam.symbol('x', self.isam_index) prev = self.isam.calculateEstimate().atPose2(key0) dx, dtheta = odom incr = Pose2(dx, 0.0, dtheta) self.initial.insert(key1, prev.compose(incr)) self.graph.add(BetweenFactorPose2(key0, key1, incr, self.noise)) self.isam.update(self.graph, self.initial) self.graph = NonlinearFactorGraph() self.initial = Values() self.isam_index += 1 except Exception as e: logger.error(f"Error in SLAM prediction: {e}") def update(self, sensor_data): try: pose = self.isam.calculateEstimate().atPose2(gtsam.symbol('x', self.isam_index - 1)) for meas in sensor_data: self.og.cone_update(*meas) except Exception as e: logger.error(f"Error in SLAM update: {e}") def detect_loop_closure(self, threshold=0.2): if self.isam_index < 10: return try: curr_pose = self.isam.calculateEstimate().atPose2(gtsam.symbol('x', self.isam_index - 1)) curr_t = curr_pose.translation() for i in range(max(0, self.isam_index - 50), self.isam_index - 5): prev_pose = self.isam.calculateEstimate().atPose2(gtsam.symbol('x', i)) prev_t = prev_pose.translation() dist = np.linalg.norm(curr_t - prev_t) if dist < threshold: rel = prev_pose.inverse().compose(curr_pose) self.graph.add(BetweenFactorPose2(gtsam.symbol('x', i), gtsam.symbol('x', self.isam_index - 1), rel, self.noise)) logger.info(f"Loop closure detected between poses x{i} and x{self.isam_index - 1}") break except Exception as e: logger.error(f"Error in loop closure detection: {e}") def optimize(self): try: self.isam.update(self.graph, self.initial) self.initial = Values() self.graph = NonlinearFactorGraph() except RuntimeError as e: logger.error(f"Optimization failed: {e}") # ============================== # OCCUPANCY GRID & ANALYSIS # ============================== class OccupancyGrid: def __init__(self, width_m=3.0, height_m=2.0, resolution=0.02): self.width_m = width_m self.height_m = height_m self.resolution = resolution self.cells_x = int(math.ceil(width_m / resolution)) self.cells_y = int(math.ceil(height_m / resolution)) self.grid = np.full((self.cells_y, self.cells_x), np.nan, dtype=float) self.coverage = np.zeros_like(self.grid, dtype=int) def world_to_map(self, wx, wy): c = int(wx / self.resolution) r = int(wy / self.resolution) return (r, c) if 0 <= r < self.cells_y and 0 <= c < self.cells_x else None def update_cell_logodds(self, row, col, prob): p = max(min(prob, 0.99), 0.01) if np.isnan(self.grid[row, col]): self.grid[row, col] = 0.0 self.grid[row, col] += math.log(p / (1 - p)) self.grid[row, col] = np.clip(self.grid[row, col], -20, 20) # Clipping to prevent overflow self.coverage[row, col] += 1 def cone_update(self, sx, sy, center_angle_rad, dist, fov_rad, sub_rays=7, occ_prob=0.7, free_prob=0.3): half = fov_rad / 2 step = (2 * half) / (sub_rays - 1) if sub_rays > 1 else 0 start = center_angle_rad - half for i in range(sub_rays): a = start + i * step dx, dy = math.cos(a), math.sin(a) steps = max(1, int(dist / (self.resolution * 1.5))) for s in range(steps): frac = s / steps wx, wy = sx + frac * dist * dx, sy + frac * dist * dy m = self.world_to_map(wx, wy) if not m: break self.update_cell_logodds(m[0], m[1], free_prob) fin = self.world_to_map(sx + dist * dx, sy + dist * dy) if fin: self.update_cell_logodds(fin[0], fin[1], occ_prob) def get_probability(self): valid = ~np.isnan(self.grid) prob = np.full_like(self.grid, np.nan) prob[valid] = np.exp(self.grid[valid]) / (1 + np.exp(self.grid[valid])) return prob def save(self, fn): with open(fn, 'wb') as f: pickle.dump(self, f) @staticmethod def load(fn): with open(fn, 'rb') as f: return pickle.load(f) class CoverageAnalyzer: def __init__(self, og): self.og = og def blind_spots(self, threshold=0): ys, xs = np.where(self.og.coverage <= threshold) return list(zip(ys, xs)) # ============================== # ENVIRONMENT & OBSTACLES # ============================== class Environment: def __init__(self): self.walls = [((0, 0), (3, 0)), ((3, 0), (3, 2)), ((3, 2), (0, 2)), ((0, 2), (0, 0)), ((1.2, 0.6), (2.4, 1.4))] self.dynamic = [] def add_dynamic(self, obs): self.dynamic.append(obs) def raycast(self, sx, sy, angle, max_range): best = None dx, dy = math.cos(angle), math.sin(angle) for (x1, y1), (x2, y2) in self.walls: Lx, Ly = x2 - x1, y2 - y1 denom = dx * (-Ly) + dy * Lx if abs(denom) < 1e-9: continue tx, ty = x1 - sx, y1 - sy t = (tx * (-Ly) + ty * Lx) / denom if t < 0 or t > max_range: continue u = ((sx + dx * t - x1) / Lx) if abs(Lx) > 1e-9 else ((sy + dy * t - y1) / Ly) if 0 <= u <= 1 and (best is None or t < best): best = t d = best if best is not None else max_range for obs in self.dynamic: ox, oy = obs.position(t=0) tval = (dx * (ox - sx) + dy * (oy - sy)) perp = abs(-dy * (ox - sx) + dx * (oy - sy)) if 0 <= tval <= d and perp < 0.05: d = tval return d @staticmethod def load_from_tiled(tmx_file): try: tmx = TiledMap(tmx_file) walls = [] for layer in tmx.layers: if hasattr(layer, 'objects'): for obj in layer.objects: if obj.points: pts = [(obj.x + p[0], obj.y + p[1]) for p in obj.points] walls += [((pts[i][0], pts[i][1]), (pts[i + 1][0], pts[i + 1][1])) for i in range(len(pts) - 1)] elif obj.width and obj.height: walls += [((obj.x, obj.y), (obj.x + obj.width, obj.y)), ((obj.x + obj.width, obj.y), (obj.x + obj.width, obj.y + obj.height)), ((obj.x + obj.width, obj.y + obj.height), (obj.x, obj.y + obj.height)), ((obj.x, obj.y + obj.height), (obj.x, obj.y))] env = Environment() env.walls = walls return env except Exception as e: logger.error(f"Failed to load tiled map: {e}") return ScenarioGenerator.random_walls() class DynamicObstacle: def __init__(self, path): self.path = path def position(self, t): return self.path[int(t) % len(self.path)] # ============================== # SCENARIO & BENCHMARKING # ============================== class ScenarioGenerator: @staticmethod def random_walls(num=5, area=(3, 2)): walls = [] for _ in range(num): x1, y1 = random.random() * area[0], random.random() * area[1] x2, y2 = random.random() * area[0], random.random() * area[1] walls.append(((x1, y1), (x2, y2))) env = Environment() env.walls = walls return env # ============================== # VISUALIZATION # ============================== class Visualizer: def __init__(self): self.figure = Figure() self.canvas = FigureCanvas(self.figure) self.ax = self.figure.add_subplot(111) def plot_2d(self, og, pose=None, walls=None): self.ax.clear() prob = og.get_probability() self.ax.imshow(prob, origin='lower', cmap='gray', vmin=0.0, vmax=1.0, extent=[0, og.width_m, 0, og.height_m]) self.ax.imshow(np.isnan(prob), origin='lower', cmap='gray_r', alpha=0.4, extent=[0, og.width_m, 0, og.height_m]) self.ax.grid(True, which='both', linestyle='--', alpha=0.3) self.ax.set_title('Occupancy Grid') if walls: for a, b in walls: self.ax.plot([a[0], b[0]], [a[1], b[1]], 'b-', label='Walls' if a == walls[0] else None) if pose: self.ax.plot(pose[0], pose[1], 'ro', label='Robot') self.ax.arrow(pose[0], pose[1], 0.1 * math.cos(pose[2]), 0.1 * math.sin(pose[2]), color='r', head_width=0.05) self.ax.legend() self.canvas.draw() # ============================== # INTERACTIVE GUI LAYOUT EDITOR # ============================== class SensorLayoutEditor(QMainWindow): def __init__(self, robot_config, env, slam, motion_model, odom_sim, sensor_manager): super().__init__() self.setWindowTitle('Sensor Layout Editor') self.robot = robot_config self.env = env self.slam = slam self.motion_model = motion_model self.odom_sim = odom_sim self.sensor_manager = sensor_manager self.scale = 200 self.pose = [1.5, 1.0, 0.0] self.t = 0 self.v_cmd = 0.0 self.ω_cmd = 0.0 self.pressed_keys = set() self.visualizer = Visualizer() self.central_widget = QWidget() self.setCentralWidget(self.central_widget) self.layout = QVBoxLayout(self.central_widget) self.scene = QGraphicsScene() self.view = QGraphicsView(self.scene) self.layout.addWidget(self.view) self.layout.addWidget(self.visualizer.canvas) self.status_bar = QStatusBar() self.setStatusBar(self.status_bar) self.status_bar.showMessage("Simulation ready - Use Z/S for forward/back, Q/D for left/right, X to stop") self._draw_robot() self._draw_sensors() self._setup_timer() self.show() def _draw_robot(self): w, h = self.robot.width * self.scale, self.robot.length * self.scale self.robot_item = self.scene.addRect(-h/2, -w/2, h, w, pen=QtGui.QPen(QtCore.Qt.black), brush=QtGui.QBrush(QtCore.Qt.lightGray)) x, y = self.pose[0] * self.scale, -self.pose[1] * self.scale self.robot_item.setPos(x, y) self.robot_item.setRotation(-math.degrees(self.pose[2])) def _draw_sensors(self): self.items = [] for sRef in self.robot.sensors: x, y = sRef.pos_x * self.scale, -sRef.pos_y * self.scale item = QGraphicsEllipseItem(x - 5, y - 5, 10, 10) item.setBrush(QtGui.QBrush(QtCore.Qt.red)) item.setFlag(QGraphicsEllipseItem.ItemIsMovable) item.sensor = sRef self.scene.addItem(item) line = QGraphicsLineItem(x, y, x + math.cos(sRef.angle_rad) * 20, y - math.sin(sRef.angle_rad) * 20) self.scene.addItem(line) self.items.append((item, line)) def _setup_timer(self): self.timer = QtCore.QTimer() self.timer.timeout.connect(self.update_simulation) self.timer.start(100) # 10 Hz def update_simulation(self): try: v, ω = self.odom_sim.get_odometry(self.v_cmd, self.ω_cmd) self.pose = self.motion_model.update_pose(self.pose, v, ω, dt=0.1) self.slam.predict((v * 0.1, ω * 0.1)) self.slam.optimize() schedule = self.sensor_manager.schedule(self.pose[2]) sensor_data = [] for slot in schedule: meas_group = [] for idx in slot: sRef = self.robot.sensors[idx] sx = self.pose[0] + sRef.pos_x * math.cos(self.pose[2]) - sRef.pos_y * math.sin(self.pose[2]) sy = self.pose[1] + sRef.pos_x * math.sin(self.pose[2]) + sRef.pos_y * math.cos(self.pose[2]) ang = self.pose[2] + sRef.angle_rad d = sRef.measure(self.env, sx, sy, ang) meas_group.append((sx, sy, ang, d, sRef.fov_rad)) sensor_data += meas_group self.slam.update(sensor_data) self.slam.detect_loop_closure() self.slam.optimize() self.visualizer.plot_2d(self.slam.og, pose=self.pose, walls=self.env.walls) ca = CoverageAnalyzer(self.slam.og) bs = ca.blind_spots(0) self.status_bar.showMessage( f"t={self.t/10:.1f}s: v_cmd={self.v_cmd:.2f}, ω_cmd={self.ω_cmd:.2f}, Blind spots: {len(bs)} cells" ) logger.info(f"t={self.t/10:.1f}s: v_cmd={self.v_cmd:.2f}, ω_cmd={self.ω_cmd:.2f}, Blind spots: {len(bs)} cells") x, y = self.pose[0] * self.scale, -self.pose[1] * self.scale self.robot_item.setPos(x, y) self.robot_item.setRotation(-math.degrees(self.pose[2])) self.t += 1 except Exception as e: logger.error(f"Simulation update failed: {e}") self.status_bar.showMessage(f"Error: {e}") self.timer.stop() def closeEvent(self, event): try: # Save map as pickle with open("map.pkl", "wb") as f: pickle.dump(self.slam.og, f) logger.info("Map saved as map.pkl") # Save raw map as PNG (without obstacles) prob = self.slam.og.get_probability() fig, ax = matplotlib.pyplot.subplots() ax.imshow(prob, origin='lower', cmap='gray', vmin=0.0, vmax=1.0, extent=[0, self.slam.og.width_m, 0, self.slam.og.height_m]) ax.axis('off') fig.savefig("map.png", bbox_inches='tight', pad_inches=0) logger.info("Map image saved as map.png") matplotlib.pyplot.close(fig) except Exception as e: logger.error(f"Error during map saving: {e}") def keyPressEvent(self, event): key = event.key() if key in [QtCore.Qt.Key_Z, QtCore.Qt.Key_S, QtCore.Qt.Key_Q, QtCore.Qt.Key_D]: self.pressed_keys.add(key) self._update_velocities() if key == QtCore.Qt.Key_X: self.pressed_keys.clear() self.v_cmd = 0.0 self.ω_cmd = 0.0 self.status_bar.showMessage(f"Stopped: v_cmd={self.v_cmd:.2f}, ω_cmd={self.ω_cmd:.2f}") elif key == QtCore.Qt.Key_Space: if self.timer.isActive(): self.timer.stop() self.status_bar.showMessage("Simulation paused") else: self.timer.start(100) self.status_bar.showMessage(f"Simulation resumed: v_cmd={self.v_cmd:.2f}, ω_cmd={self.ω_cmd:.2f}") elif key == QtCore.Qt.Key_Escape: fname, _ = QFileDialog.getSaveFileName(self, 'Save Layout', '.', 'Pickle Files (*.pkl)') if fname: with open(fname, 'wb') as f: pickle.dump(self.robot, f) self.status_bar.showMessage(f"Layout saved to {fname}") super().keyPressEvent(event) def keyReleaseEvent(self, event): key = event.key() if key in [QtCore.Qt.Key_Z, QtCore.Qt.Key_S, QtCore.Qt.Key_Q, QtCore.Qt.Key_D]: self.pressed_keys.discard(key) self._update_velocities() super().keyReleaseEvent(event) def _update_velocities(self): self.v_cmd = 0.0 self.ω_cmd = 0.0 if QtCore.Qt.Key_Z in self.pressed_keys: self.v_cmd = 0.2 elif QtCore.Qt.Key_S in self.pressed_keys: self.v_cmd = -0.2 if QtCore.Qt.Key_Q in self.pressed_keys: self.ω_cmd = 0.5 elif QtCore.Qt.Key_D in self.pressed_keys: self.ω_cmd = -0.5 self.status_bar.showMessage( f"Moving: v_cmd={self.v_cmd:.2f}, ω_cmd={self.ω_cmd:.2f}" if self.v_cmd != 0 or self.ω_cmd != 0 else f"Stopped: v_cmd={self.v_cmd:.2f}, ω_cmd={self.ω_cmd:.2f}" ) # ============================== # ROBOT CONFIGURATION # ============================== class RobotConfig: def __init__(self, width=0.3, length=0.5, height=0.2): self.width = width self.length = length self.height = height self.sensors = [] def add_sensor(self, sensor): self.sensors.append(sensor) # ============================== # MAIN # ============================== def main(gui=True, tiled_map=None): robot = RobotConfig(width=0.24, length=0.16, height=0.2) positions = [ (0.08, 0.11), # Front-left (0.08, -0.11), # Front-right (0.06, 0.12), # Left-front (-0.06, 0.12), # Left-back (0.06, -0.12), # Right-front (-0.06, -0.12) # Right-back ] angles = [ np.pi/4, # Front-left: 45° left -np.pi/4, # Front-right: 45° right np.pi/2, # Left-front: 90° left np.pi/2, # Left-back: 90° left -np.pi/2, # Right-front: 90° right -np.pi/2 # Right-back: 90° right ] for i, (pos_x, pos_y) in enumerate(positions): angle_rad = angles[i] # Add ultrasound sensor with realistic parameters robot.add_sensor(Sensor( name=f"Ultrasound{i+1}", sensor_type="ULTRASOUND", pos_x=pos_x, pos_y=pos_y, angle_rad=angle_rad, min_range=0.2, max_range=2.5, # Reduced from 4.0 m fov_rad=np.radians(25), # Reduced from 30° noise_std=0.005 # Base noise, scaled in measure() )) # Add IR sensor with realistic parameters robot.add_sensor(Sensor( name=f"IR{i+1}", sensor_type="IR", pos_x=pos_x, pos_y=pos_y, angle_rad=angle_rad, min_range=0.15, # Increased from 0.1 m max_range=0.8, # Reduced from 1.5 m fov_rad=np.radians(5), # Reduced from 15° noise_std=0.01 # Base noise, overridden by 5% distance )) env = Environment.load_from_tiled(tmx_file=tiled_map) if tiled_map else ScenarioGenerator.random_walls() motion_model = MotionModel() odom_sim = OdometrySimulator() slam = SLAMEngine() sensor_manager = SensorManager(robot.sensors) if gui: app = QApplication(sys.argv) editor = SensorLayoutEditor(robot, env, slam, motion_model, odom_sim, sensor_manager) sys.exit(app.exec_()) else: pose = [1.5, 1.0, 0.0] visualizer = Visualizer() commands = [(0.2, 0.0), (0.2, 0.1), (0.1, -0.05)] step, t = 0, 0 start_time = time.time() while step < len(commands): current_time = time.time() if current_time - start_time >= 0.1: try: v_cmd, om_cmd = commands[step] v, om = odom_sim.get_odometry(v_cmd, om_cmd) pose = motion_model.update_pose(pose, v, om, dt=0.1) slam.predict((v * 0.1, om * 0.1)) schedule = sensor_manager.schedule(pose[2]) sensor_data = [] for slot in schedule: meas_group = [] for idx in slot: s = robot.sensors[idx] sx = pose[0] + s.pos_x * math.cos(pose[2]) - s.pos_y * math.sin(pose[2]) sy = pose[1] + s.pos_x * math.sin(pose[2]) + s.pos_y * math.cos(pose[2]) ang = pose[2] + s.angle_rad d = s.measure(env, sx, sy, ang) meas_group.append((sx, sy, ang, d, s.fov_rad)) sensor_data += meas_group slam.update(sensor_data) slam.detect_loop_closure() slam.optimize() visualizer.plot_2d(slam.og, pose=pose, walls=env.walls) ca = CoverageAnalyzer(slam.og) bs = ca.blind_spots(0) logger.info(f"Step {step}, t={t}: Blind spots: {len(bs)} cells") t += 1 if t >= 10: t = 0 step += 1 start_time = current_time except Exception as e: logger.error(f"Non-GUI simulation failed: {e}") break if __name__ == '__main__': main(gui=True)