EU_Robot_Group-D/aa.py

763 lines
30 KiB
Python

import numpy as np
import math
import pickle
import random
from collections import defaultdict
import gtsam
from gtsam import Pose2, BetweenFactorPose2, NonlinearFactorGraph, Values
from gtsam import ISAM2, ISAM2Params
from gtsam.noiseModel import Diagonal
from PyQt5 import QtWidgets, QtGui, QtCore
from PyQt5.QtWidgets import QApplication, QMainWindow, QGraphicsScene, QGraphicsView, QGraphicsEllipseItem, QGraphicsLineItem, QFileDialog, QVBoxLayout, QWidget, QStatusBar
from pytmx import TiledMap
import sys
import time
import logging
import matplotlib
matplotlib.use('Qt5Agg')
from matplotlib.backends.backend_qt5agg import FigureCanvasQTAgg as FigureCanvas
from matplotlib.figure import Figure
# ==============================
# LOGGING SETUP
# ==============================
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# ==============================
# NOISE MODELS
# ==============================
class NoiseModel:
def sample(self):
raise NotImplementedError("Subclasses must implement sample()")
class DriftNoiseModel(NoiseModel):
def __init__(self, drift_std=1e-4):
self.drift_std = drift_std
self.bias = 0.0
def sample(self):
self.bias += np.random.normal(0, self.drift_std)
return self.bias
class TemperatureNoiseModel(NoiseModel):
def __init__(self, coeff=0.001):
self.coeff = coeff
self.temperature = 20.0 # Default temperature in °C
def set_temperature(self, temp):
self.temperature = temp
def sample(self):
return (self.temperature - 20.0) * self.coeff
# ==============================
# SENSOR MODEL
# ==============================
class Sensor:
def __init__(self, name, sensor_type, pos_x, pos_y, angle_rad,
min_range, max_range, fov_rad, noise_std=0.01):
self.name = name
self.sensor_type = sensor_type.upper()
self.pos_x = pos_x
self.pos_y = pos_y
self.angle_rad = angle_rad
self.min_range = min_range
self.max_range = max_range
self.fov_rad = fov_rad
self.drift_model = DriftNoiseModel(drift_std=noise_std * 1e-2)
self.temp_model = TemperatureNoiseModel(coeff=noise_std * 1e-2)
self.noise_std = noise_std
# Set number of rays based on sensor type for multi-ray casting
if self.sensor_type == "ULTRASOUND":
self.num_rays = 7
elif self.sensor_type == "IR":
self.num_rays = 3
else:
self.num_rays = 1
def measure(self, env, sx, sy, global_angle_rad):
try:
# Multi-ray casting to find the closest obstacle within FOV
if self.num_rays == 1:
true_dist = env.raycast(sx, sy, global_angle_rad, self.max_range)
if true_dist is None:
true_dist = self.max_range
else:
half_fov = self.fov_rad / 2
start_angle = global_angle_rad - half_fov
end_angle = global_angle_rad + half_fov
ray_angles = np.linspace(start_angle, end_angle, self.num_rays)
distances = [env.raycast(sx, sy, angle, self.max_range) for angle in ray_angles]
distances = [d if d is not None else self.max_range for d in distances]
true_dist = min(distances)
# Distance-dependent noise based on sensor type
if self.sensor_type == "ULTRASOUND":
noise_std = self.noise_std * (1 + (true_dist / self.max_range) ** 2)
elif self.sensor_type == "IR":
noise_std = 0.05 * true_dist # 5% of distance as per typical IR specs
else:
noise_std = self.noise_std
# Temperature scaling for ultrasound (speed of sound effect)
if self.sensor_type == "ULTRASOUND":
c_true = 331 + 0.6 * self.temp_model.temperature # m/s
scale_factor = 343 / c_true # 343 m/s is assumed speed at 20°C
d_scaled = true_dist * scale_factor
else:
d_scaled = true_dist
# Apply drift and noise
drift = self.drift_model.sample()
noise = np.random.normal(0, noise_std)
measured = d_scaled + drift + noise
# Drop-out probability for ultrasound (2% chance of returning max_range)
if self.sensor_type == "ULTRASOUND" and np.random.random() < 0.02:
measured = self.max_range
# Clamp measurement to valid range
return max(self.min_range, min(self.max_range, measured))
except Exception as e:
logger.error(f"Error in sensor measurement: {e}")
return self.max_range
# ==============================
# SENSOR MANAGEMENT
# ==============================
class DynamicCrosstalkManager:
def __init__(self, sensors, interference_dist=0.10, angle_overlap_rad=0.1745): # Updated interference_dist
self.sensors = sensors
self.N = len(sensors)
self.interference_dist = interference_dist
self.angle_overlap_rad = angle_overlap_rad
self.conflict = np.zeros((self.N, self.N), dtype=bool)
self._build_conflict_matrix()
self.slots = self._build_time_slots()
def _angle_diff(self, a_rad, b_rad, fov_a, fov_b):
d = abs((b_rad - a_rad + np.pi) % (2 * np.pi) - np.pi)
return d < (self.angle_overlap_rad + fov_a + fov_b)
def _build_conflict_matrix(self):
for i in range(self.N):
for j in range(i + 1, self.N):
si, sj = self.sensors[i], self.sensors[j]
dist = math.hypot(si.pos_x - sj.pos_x, si.pos_y - sj.pos_y)
if dist < self.interference_dist and self._angle_diff(si.angle_rad, sj.angle_rad, si.fov_rad, sj.fov_rad):
self.conflict[i, j] = self.conflict[j, i] = True
def _build_time_slots(self):
slots = []
for i in range(self.N):
placed = False
for slot in slots:
if all(not self.conflict[i, j] for j in slot):
slot.append(i)
placed = True
break
if not placed:
slots.append([i])
return slots
def firing_order(self):
return self.slots
class SensorManager:
def __init__(self, sensors):
self.sensors = sensors
def cluster_sensors(self, eps=0.1, min_samples=1):
from sklearn.cluster import DBSCAN
feats = np.array([[s.pos_x, s.pos_y, s.angle_rad] for s in self.sensors])
labels = DBSCAN(eps=eps, min_samples=min_samples).fit_predict(feats)
clusters = defaultdict(list)
for i, l in enumerate(labels):
clusters[l].append(i)
return list(clusters.values())
def schedule(self, heading_rad):
dtcm = DynamicCrosstalkManager(self.sensors, interference_dist=0.10, angle_overlap_rad=0.0873)
slots = dtcm.firing_order()
fwd, bwd = [], []
for slot in slots:
if any(abs(((self.sensors[i].angle_rad - heading_rad + np.pi) % (2 * np.pi)) - np.pi) > np.pi/2 for i in slot):
bwd.append(slot)
else:
fwd.append(slot)
return fwd + bwd
# ==============================
# MOTION & ODOMETRY
# ==============================
class MotionModel:
def __init__(self, wheel_base=0.1):
self.wheel_base = wheel_base
def update_pose(self, pose, v, omega, dt):
x, y, theta = pose
if abs(omega) < 1e-6:
return (x + v * math.cos(theta) * dt, y + v * math.sin(theta) * dt, theta)
R = v / omega
dth = omega * dt
return (x + R * (math.sin(theta + dth) - math.sin(theta)),
y - R * (math.cos(theta + dth) - math.cos(theta)),
(theta + dth) % (2 * np.pi))
class OdometrySimulator:
def __init__(self, noise_std_v=0.01, noise_std_omega=0.005):
self.noise_std_v = noise_std_v
self.noise_std_omega = noise_std_omega
def get_odometry(self, v_cmd, omega_cmd):
return (v_cmd + np.random.normal(0, self.noise_std_v),
omega_cmd + np.random.normal(0, self.noise_std_omega))
# ==============================
# SLAM ENGINE
# ==============================
class SLAMEngine:
def __init__(self, grid_resolution=0.02):
self.og = OccupancyGrid(resolution=grid_resolution)
self.isam = ISAM2(ISAM2Params())
self.initial = Values()
self.noise = Diagonal.Sigmas([1e-2, 1e-2, 1e-3])
self.isam_index = 1 # Start at 1 since x0 is initialized
self.graph = NonlinearFactorGraph()
self.initial.insert(gtsam.symbol('x', 0), Pose2(0, 0, 0))
self.graph.add(gtsam.PriorFactorPose2(gtsam.symbol('x', 0), Pose2(0, 0, 0), self.noise))
self.isam.update(self.graph, self.initial)
self.graph = NonlinearFactorGraph()
self.initial = Values()
def predict(self, odom):
try:
key0 = gtsam.symbol('x', self.isam_index - 1)
key1 = gtsam.symbol('x', self.isam_index)
prev = self.isam.calculateEstimate().atPose2(key0)
dx, dtheta = odom
incr = Pose2(dx, 0.0, dtheta)
self.initial.insert(key1, prev.compose(incr))
self.graph.add(BetweenFactorPose2(key0, key1, incr, self.noise))
self.isam.update(self.graph, self.initial)
self.graph = NonlinearFactorGraph()
self.initial = Values()
self.isam_index += 1
except Exception as e:
logger.error(f"Error in SLAM prediction: {e}")
def update(self, sensor_data):
try:
pose = self.isam.calculateEstimate().atPose2(gtsam.symbol('x', self.isam_index - 1))
for meas in sensor_data:
self.og.cone_update(*meas)
except Exception as e:
logger.error(f"Error in SLAM update: {e}")
def detect_loop_closure(self, threshold=0.2):
if self.isam_index < 10:
return
try:
curr_pose = self.isam.calculateEstimate().atPose2(gtsam.symbol('x', self.isam_index - 1))
curr_t = curr_pose.translation()
for i in range(max(0, self.isam_index - 50), self.isam_index - 5):
prev_pose = self.isam.calculateEstimate().atPose2(gtsam.symbol('x', i))
prev_t = prev_pose.translation()
dist = np.linalg.norm(curr_t - prev_t)
if dist < threshold:
rel = prev_pose.inverse().compose(curr_pose)
self.graph.add(BetweenFactorPose2(gtsam.symbol('x', i),
gtsam.symbol('x', self.isam_index - 1),
rel, self.noise))
logger.info(f"Loop closure detected between poses x{i} and x{self.isam_index - 1}")
break
except Exception as e:
logger.error(f"Error in loop closure detection: {e}")
def optimize(self):
try:
self.isam.update(self.graph, self.initial)
self.initial = Values()
self.graph = NonlinearFactorGraph()
except RuntimeError as e:
logger.error(f"Optimization failed: {e}")
# ==============================
# OCCUPANCY GRID & ANALYSIS
# ==============================
class OccupancyGrid:
def __init__(self, width_m=3.0, height_m=2.0, resolution=0.02):
self.width_m = width_m
self.height_m = height_m
self.resolution = resolution
self.cells_x = int(math.ceil(width_m / resolution))
self.cells_y = int(math.ceil(height_m / resolution))
self.grid = np.full((self.cells_y, self.cells_x), np.nan, dtype=float)
self.coverage = np.zeros_like(self.grid, dtype=int)
def world_to_map(self, wx, wy):
c = int(wx / self.resolution)
r = int(wy / self.resolution)
return (r, c) if 0 <= r < self.cells_y and 0 <= c < self.cells_x else None
def update_cell_logodds(self, row, col, prob):
p = max(min(prob, 0.99), 0.01)
if np.isnan(self.grid[row, col]):
self.grid[row, col] = 0.0
self.grid[row, col] += math.log(p / (1 - p))
self.grid[row, col] = np.clip(self.grid[row, col], -20, 20) # Clipping to prevent overflow
self.coverage[row, col] += 1
def cone_update(self, sx, sy, center_angle_rad, dist, fov_rad, sub_rays=7, occ_prob=0.7, free_prob=0.3):
half = fov_rad / 2
step = (2 * half) / (sub_rays - 1) if sub_rays > 1 else 0
start = center_angle_rad - half
for i in range(sub_rays):
a = start + i * step
dx, dy = math.cos(a), math.sin(a)
steps = max(1, int(dist / (self.resolution * 1.5)))
for s in range(steps):
frac = s / steps
wx, wy = sx + frac * dist * dx, sy + frac * dist * dy
m = self.world_to_map(wx, wy)
if not m:
break
self.update_cell_logodds(m[0], m[1], free_prob)
fin = self.world_to_map(sx + dist * dx, sy + dist * dy)
if fin:
self.update_cell_logodds(fin[0], fin[1], occ_prob)
def get_probability(self):
valid = ~np.isnan(self.grid)
prob = np.full_like(self.grid, np.nan)
prob[valid] = np.exp(self.grid[valid]) / (1 + np.exp(self.grid[valid]))
return prob
def save(self, fn):
with open(fn, 'wb') as f:
pickle.dump(self, f)
@staticmethod
def load(fn):
with open(fn, 'rb') as f:
return pickle.load(f)
class CoverageAnalyzer:
def __init__(self, og):
self.og = og
def blind_spots(self, threshold=0):
ys, xs = np.where(self.og.coverage <= threshold)
return list(zip(ys, xs))
# ==============================
# ENVIRONMENT & OBSTACLES
# ==============================
class Environment:
def __init__(self):
self.walls = [((0, 0), (3, 0)), ((3, 0), (3, 2)), ((3, 2), (0, 2)), ((0, 2), (0, 0)), ((1.2, 0.6), (2.4, 1.4))]
self.dynamic = []
def add_dynamic(self, obs):
self.dynamic.append(obs)
def raycast(self, sx, sy, angle, max_range):
best = None
dx, dy = math.cos(angle), math.sin(angle)
for (x1, y1), (x2, y2) in self.walls:
Lx, Ly = x2 - x1, y2 - y1
denom = dx * (-Ly) + dy * Lx
if abs(denom) < 1e-9:
continue
tx, ty = x1 - sx, y1 - sy
t = (tx * (-Ly) + ty * Lx) / denom
if t < 0 or t > max_range:
continue
u = ((sx + dx * t - x1) / Lx) if abs(Lx) > 1e-9 else ((sy + dy * t - y1) / Ly)
if 0 <= u <= 1 and (best is None or t < best):
best = t
d = best if best is not None else max_range
for obs in self.dynamic:
ox, oy = obs.position(t=0)
tval = (dx * (ox - sx) + dy * (oy - sy))
perp = abs(-dy * (ox - sx) + dx * (oy - sy))
if 0 <= tval <= d and perp < 0.05:
d = tval
return d
@staticmethod
def load_from_tiled(tmx_file):
try:
tmx = TiledMap(tmx_file)
walls = []
for layer in tmx.layers:
if hasattr(layer, 'objects'):
for obj in layer.objects:
if obj.points:
pts = [(obj.x + p[0], obj.y + p[1]) for p in obj.points]
walls += [((pts[i][0], pts[i][1]), (pts[i + 1][0], pts[i + 1][1])) for i in range(len(pts) - 1)]
elif obj.width and obj.height:
walls += [((obj.x, obj.y), (obj.x + obj.width, obj.y)),
((obj.x + obj.width, obj.y), (obj.x + obj.width, obj.y + obj.height)),
((obj.x + obj.width, obj.y + obj.height), (obj.x, obj.y + obj.height)),
((obj.x, obj.y + obj.height), (obj.x, obj.y))]
env = Environment()
env.walls = walls
return env
except Exception as e:
logger.error(f"Failed to load tiled map: {e}")
return ScenarioGenerator.random_walls()
class DynamicObstacle:
def __init__(self, path):
self.path = path
def position(self, t):
return self.path[int(t) % len(self.path)]
# ==============================
# SCENARIO & BENCHMARKING
# ==============================
class ScenarioGenerator:
@staticmethod
def random_walls(num=5, area=(3, 2)):
walls = []
for _ in range(num):
x1, y1 = random.random() * area[0], random.random() * area[1]
x2, y2 = random.random() * area[0], random.random() * area[1]
walls.append(((x1, y1), (x2, y2)))
env = Environment()
env.walls = walls
return env
# ==============================
# VISUALIZATION
# ==============================
class Visualizer:
def __init__(self):
self.figure = Figure()
self.canvas = FigureCanvas(self.figure)
self.ax = self.figure.add_subplot(111)
def plot_2d(self, og, pose=None, walls=None):
self.ax.clear()
prob = og.get_probability()
self.ax.imshow(prob, origin='lower', cmap='gray', vmin=0.0, vmax=1.0,
extent=[0, og.width_m, 0, og.height_m])
self.ax.imshow(np.isnan(prob), origin='lower', cmap='gray_r', alpha=0.4,
extent=[0, og.width_m, 0, og.height_m])
self.ax.grid(True, which='both', linestyle='--', alpha=0.3)
self.ax.set_title('Occupancy Grid')
if walls:
for a, b in walls:
self.ax.plot([a[0], b[0]], [a[1], b[1]], 'b-', label='Walls' if a == walls[0] else None)
if pose:
self.ax.plot(pose[0], pose[1], 'ro', label='Robot')
self.ax.arrow(pose[0], pose[1], 0.1 * math.cos(pose[2]), 0.1 * math.sin(pose[2]),
color='r', head_width=0.05)
self.ax.legend()
self.canvas.draw()
# ==============================
# INTERACTIVE GUI LAYOUT EDITOR
# ==============================
class SensorLayoutEditor(QMainWindow):
def __init__(self, robot_config, env, slam, motion_model, odom_sim, sensor_manager):
super().__init__()
self.setWindowTitle('Sensor Layout Editor')
self.robot = robot_config
self.env = env
self.slam = slam
self.motion_model = motion_model
self.odom_sim = odom_sim
self.sensor_manager = sensor_manager
self.scale = 200
self.pose = [1.5, 1.0, 0.0]
self.t = 0
self.v_cmd = 0.0
self.ω_cmd = 0.0
self.pressed_keys = set()
self.visualizer = Visualizer()
self.central_widget = QWidget()
self.setCentralWidget(self.central_widget)
self.layout = QVBoxLayout(self.central_widget)
self.scene = QGraphicsScene()
self.view = QGraphicsView(self.scene)
self.layout.addWidget(self.view)
self.layout.addWidget(self.visualizer.canvas)
self.status_bar = QStatusBar()
self.setStatusBar(self.status_bar)
self.status_bar.showMessage("Simulation ready - Use Z/S for forward/back, Q/D for left/right, X to stop")
self._draw_robot()
self._draw_sensors()
self._setup_timer()
self.show()
def _draw_robot(self):
w, h = self.robot.width * self.scale, self.robot.length * self.scale
self.robot_item = self.scene.addRect(-h/2, -w/2, h, w, pen=QtGui.QPen(QtCore.Qt.black), brush=QtGui.QBrush(QtCore.Qt.lightGray))
x, y = self.pose[0] * self.scale, -self.pose[1] * self.scale
self.robot_item.setPos(x, y)
self.robot_item.setRotation(-math.degrees(self.pose[2]))
def _draw_sensors(self):
self.items = []
for sRef in self.robot.sensors:
x, y = sRef.pos_x * self.scale, -sRef.pos_y * self.scale
item = QGraphicsEllipseItem(x - 5, y - 5, 10, 10)
item.setBrush(QtGui.QBrush(QtCore.Qt.red))
item.setFlag(QGraphicsEllipseItem.ItemIsMovable)
item.sensor = sRef
self.scene.addItem(item)
line = QGraphicsLineItem(x, y, x + math.cos(sRef.angle_rad) * 20, y - math.sin(sRef.angle_rad) * 20)
self.scene.addItem(line)
self.items.append((item, line))
def _setup_timer(self):
self.timer = QtCore.QTimer()
self.timer.timeout.connect(self.update_simulation)
self.timer.start(100) # 10 Hz
def update_simulation(self):
try:
v, ω = self.odom_sim.get_odometry(self.v_cmd, self.ω_cmd)
self.pose = self.motion_model.update_pose(self.pose, v, ω, dt=0.1)
self.slam.predict((v * 0.1, ω * 0.1))
self.slam.optimize()
schedule = self.sensor_manager.schedule(self.pose[2])
sensor_data = []
for slot in schedule:
meas_group = []
for idx in slot:
sRef = self.robot.sensors[idx]
sx = self.pose[0] + sRef.pos_x * math.cos(self.pose[2]) - sRef.pos_y * math.sin(self.pose[2])
sy = self.pose[1] + sRef.pos_x * math.sin(self.pose[2]) + sRef.pos_y * math.cos(self.pose[2])
ang = self.pose[2] + sRef.angle_rad
d = sRef.measure(self.env, sx, sy, ang)
meas_group.append((sx, sy, ang, d, sRef.fov_rad))
sensor_data += meas_group
self.slam.update(sensor_data)
self.slam.detect_loop_closure()
self.slam.optimize()
self.visualizer.plot_2d(self.slam.og, pose=self.pose, walls=self.env.walls)
ca = CoverageAnalyzer(self.slam.og)
bs = ca.blind_spots(0)
self.status_bar.showMessage(
f"t={self.t/10:.1f}s: v_cmd={self.v_cmd:.2f}, ω_cmd={self.ω_cmd:.2f}, Blind spots: {len(bs)} cells"
)
logger.info(f"t={self.t/10:.1f}s: v_cmd={self.v_cmd:.2f}, ω_cmd={self.ω_cmd:.2f}, Blind spots: {len(bs)} cells")
x, y = self.pose[0] * self.scale, -self.pose[1] * self.scale
self.robot_item.setPos(x, y)
self.robot_item.setRotation(-math.degrees(self.pose[2]))
self.t += 1
except Exception as e:
logger.error(f"Simulation update failed: {e}")
self.status_bar.showMessage(f"Error: {e}")
self.timer.stop()
def closeEvent(self, event):
try:
# Save map as pickle
with open("map.pkl", "wb") as f:
pickle.dump(self.slam.og, f)
logger.info("Map saved as map.pkl")
# Save raw map as PNG (without obstacles)
prob = self.slam.og.get_probability()
fig, ax = matplotlib.pyplot.subplots()
ax.imshow(prob, origin='lower', cmap='gray', vmin=0.0, vmax=1.0,
extent=[0, self.slam.og.width_m, 0, self.slam.og.height_m])
ax.axis('off')
fig.savefig("map.png", bbox_inches='tight', pad_inches=0)
logger.info("Map image saved as map.png")
matplotlib.pyplot.close(fig)
except Exception as e:
logger.error(f"Error during map saving: {e}")
def keyPressEvent(self, event):
key = event.key()
if key in [QtCore.Qt.Key_Z, QtCore.Qt.Key_S, QtCore.Qt.Key_Q, QtCore.Qt.Key_D]:
self.pressed_keys.add(key)
self._update_velocities()
if key == QtCore.Qt.Key_X:
self.pressed_keys.clear()
self.v_cmd = 0.0
self.ω_cmd = 0.0
self.status_bar.showMessage(f"Stopped: v_cmd={self.v_cmd:.2f}, ω_cmd={self.ω_cmd:.2f}")
elif key == QtCore.Qt.Key_Space:
if self.timer.isActive():
self.timer.stop()
self.status_bar.showMessage("Simulation paused")
else:
self.timer.start(100)
self.status_bar.showMessage(f"Simulation resumed: v_cmd={self.v_cmd:.2f}, ω_cmd={self.ω_cmd:.2f}")
elif key == QtCore.Qt.Key_Escape:
fname, _ = QFileDialog.getSaveFileName(self, 'Save Layout', '.', 'Pickle Files (*.pkl)')
if fname:
with open(fname, 'wb') as f:
pickle.dump(self.robot, f)
self.status_bar.showMessage(f"Layout saved to {fname}")
super().keyPressEvent(event)
def keyReleaseEvent(self, event):
key = event.key()
if key in [QtCore.Qt.Key_Z, QtCore.Qt.Key_S, QtCore.Qt.Key_Q, QtCore.Qt.Key_D]:
self.pressed_keys.discard(key)
self._update_velocities()
super().keyReleaseEvent(event)
def _update_velocities(self):
self.v_cmd = 0.0
self.ω_cmd = 0.0
if QtCore.Qt.Key_Z in self.pressed_keys:
self.v_cmd = 0.2
elif QtCore.Qt.Key_S in self.pressed_keys:
self.v_cmd = -0.2
if QtCore.Qt.Key_Q in self.pressed_keys:
self.ω_cmd = 0.5
elif QtCore.Qt.Key_D in self.pressed_keys:
self.ω_cmd = -0.5
self.status_bar.showMessage(
f"Moving: v_cmd={self.v_cmd:.2f}, ω_cmd={self.ω_cmd:.2f}"
if self.v_cmd != 0 or self.ω_cmd != 0 else
f"Stopped: v_cmd={self.v_cmd:.2f}, ω_cmd={self.ω_cmd:.2f}"
)
# ==============================
# ROBOT CONFIGURATION
# ==============================
class RobotConfig:
def __init__(self, width=0.3, length=0.5, height=0.2):
self.width = width
self.length = length
self.height = height
self.sensors = []
def add_sensor(self, sensor):
self.sensors.append(sensor)
# ==============================
# MAIN
# ==============================
def main(gui=True, tiled_map=None):
robot = RobotConfig(width=0.24, length=0.16, height=0.2)
positions = [
(0.08, 0.11), # Front-left
(0.08, -0.11), # Front-right
(0.06, 0.12), # Left-front
(-0.06, 0.12), # Left-back
(0.06, -0.12), # Right-front
(-0.06, -0.12) # Right-back
]
angles = [
np.pi/4, # Front-left: 45° left
-np.pi/4, # Front-right: 45° right
np.pi/2, # Left-front: 90° left
np.pi/2, # Left-back: 90° left
-np.pi/2, # Right-front: 90° right
-np.pi/2 # Right-back: 90° right
]
for i, (pos_x, pos_y) in enumerate(positions):
angle_rad = angles[i]
# Add ultrasound sensor with realistic parameters
robot.add_sensor(Sensor(
name=f"Ultrasound{i+1}",
sensor_type="ULTRASOUND",
pos_x=pos_x,
pos_y=pos_y,
angle_rad=angle_rad,
min_range=0.2,
max_range=2.5, # Reduced from 4.0 m
fov_rad=np.radians(25), # Reduced from 30°
noise_std=0.005 # Base noise, scaled in measure()
))
# Add IR sensor with realistic parameters
robot.add_sensor(Sensor(
name=f"IR{i+1}",
sensor_type="IR",
pos_x=pos_x,
pos_y=pos_y,
angle_rad=angle_rad,
min_range=0.15, # Increased from 0.1 m
max_range=0.8, # Reduced from 1.5 m
fov_rad=np.radians(5), # Reduced from 15°
noise_std=0.01 # Base noise, overridden by 5% distance
))
env = Environment.load_from_tiled(tmx_file=tiled_map) if tiled_map else ScenarioGenerator.random_walls()
motion_model = MotionModel()
odom_sim = OdometrySimulator()
slam = SLAMEngine()
sensor_manager = SensorManager(robot.sensors)
if gui:
app = QApplication(sys.argv)
editor = SensorLayoutEditor(robot, env, slam, motion_model, odom_sim, sensor_manager)
sys.exit(app.exec_())
else:
pose = [1.5, 1.0, 0.0]
visualizer = Visualizer()
commands = [(0.2, 0.0), (0.2, 0.1), (0.1, -0.05)]
step, t = 0, 0
start_time = time.time()
while step < len(commands):
current_time = time.time()
if current_time - start_time >= 0.1:
try:
v_cmd, om_cmd = commands[step]
v, om = odom_sim.get_odometry(v_cmd, om_cmd)
pose = motion_model.update_pose(pose, v, om, dt=0.1)
slam.predict((v * 0.1, om * 0.1))
schedule = sensor_manager.schedule(pose[2])
sensor_data = []
for slot in schedule:
meas_group = []
for idx in slot:
s = robot.sensors[idx]
sx = pose[0] + s.pos_x * math.cos(pose[2]) - s.pos_y * math.sin(pose[2])
sy = pose[1] + s.pos_x * math.sin(pose[2]) + s.pos_y * math.cos(pose[2])
ang = pose[2] + s.angle_rad
d = s.measure(env, sx, sy, ang)
meas_group.append((sx, sy, ang, d, s.fov_rad))
sensor_data += meas_group
slam.update(sensor_data)
slam.detect_loop_closure()
slam.optimize()
visualizer.plot_2d(slam.og, pose=pose, walls=env.walls)
ca = CoverageAnalyzer(slam.og)
bs = ca.blind_spots(0)
logger.info(f"Step {step}, t={t}: Blind spots: {len(bs)} cells")
t += 1
if t >= 10:
t = 0
step += 1
start_time = current_time
except Exception as e:
logger.error(f"Non-GUI simulation failed: {e}")
break
if __name__ == '__main__':
main(gui=True)