1use std::{
2 borrow::Cow,
3 collections::HashMap,
4 mem,
5 sync::{Arc, Mutex},
6 thread::sleep,
7 time::Duration,
8};
9
10use anyhow::format_err;
11use arci::{
12 nalgebra as na, BaseVelocity, JointPositionLimit, JointPositionLimiter, JointTrajectoryClient,
13 JointVelocityLimiter, Localization, MoveBase, Navigation, TrajectoryPoint, WaitFuture,
14};
15use schemars::JsonSchema;
16use scoped_sleep::ScopedSleep;
17use serde::{Deserialize, Serialize};
18use tokio::sync::oneshot;
19use tracing::debug;
20use url::Url;
21
22use crate::utils::*;
23
24#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)]
25#[serde(deny_unknown_fields)]
26pub struct UrdfVizWebClientConfig {
27 pub name: String,
28 pub joint_names: Option<Vec<String>>,
29 #[serde(default)]
30 pub wrap_with_joint_position_limiter: bool,
31 #[serde(default)]
32 pub wrap_with_joint_velocity_limiter: bool,
33 pub joint_velocity_limits: Option<Vec<f64>>,
34
35 pub joint_position_limits: Option<Vec<JointPositionLimit>>,
39}
40
41pub fn create_joint_trajectory_clients(
48 configs: Vec<UrdfVizWebClientConfig>,
49 urdf_robot: Option<&urdf_rs::Robot>,
50) -> Result<HashMap<String, Arc<dyn JointTrajectoryClient>>, arci::Error> {
51 create_joint_trajectory_clients_inner(configs, urdf_robot, false)
52}
53
54pub fn create_joint_trajectory_clients_lazy(
58 configs: Vec<UrdfVizWebClientConfig>,
59 urdf_robot: Option<&urdf_rs::Robot>,
60) -> Result<HashMap<String, Arc<dyn JointTrajectoryClient>>, arci::Error> {
61 create_joint_trajectory_clients_inner(configs, urdf_robot, true)
62}
63
64fn create_joint_trajectory_clients_inner(
65 configs: Vec<UrdfVizWebClientConfig>,
66 urdf_robot: Option<&urdf_rs::Robot>,
67 lazy: bool,
68) -> Result<HashMap<String, Arc<dyn JointTrajectoryClient>>, arci::Error> {
69 if configs.is_empty() {
70 return Ok(HashMap::default());
71 }
72
73 let mut clients = HashMap::new();
74 let mut urdf_robot = urdf_robot.map(Cow::Borrowed);
75
76 let create_all_client = move || {
77 debug!("create_joint_trajectory_clients_inner: creating UrdfVizWebClient");
78 let all_client = UrdfVizWebClient::default();
79 all_client.run_send_joint_positions_thread();
80 Ok(all_client)
81 };
82 let all_client: Arc<dyn JointTrajectoryClient> = if lazy && urdf_robot.is_some() {
83 let urdf_robot = urdf_robot.as_ref().unwrap();
84 Arc::new(arci::Lazy::with_joint_names(
85 create_all_client,
86 urdf_robot
87 .joints
88 .iter()
89 .filter(|j| j.joint_type != urdf_rs::JointType::Fixed)
90 .map(|j| j.name.clone())
91 .collect(),
92 ))
93 } else {
94 let client = create_all_client()?;
97 if urdf_robot.is_none()
98 && configs.iter().any(|config| {
99 config.wrap_with_joint_position_limiter && config.joint_position_limits.is_none()
100 || config.wrap_with_joint_velocity_limiter
101 && config.joint_velocity_limits.is_none()
102 })
103 {
104 urdf_robot = Some(Cow::Owned(client.get_urdf()?));
105 }
106 Arc::new(client)
107 };
108
109 for config in configs {
110 if config.wrap_with_joint_position_limiter
111 && config.joint_position_limits.is_none()
112 && urdf_robot.is_none()
113 {
114 return Err(format_err!(
115 "`wrap_with_joint_position_limiter=true` requires urdf or joint_position_limits \
116 is specified",
117 )
118 .into());
119 }
120 let client = if let Some(joint_names) = &config.joint_names {
121 Arc::new(arci::PartialJointTrajectoryClient::new(
122 joint_names.to_owned(),
123 all_client.clone(),
124 )?)
125 } else {
126 all_client.clone()
127 };
128 let client: Arc<dyn JointTrajectoryClient> = if config.wrap_with_joint_velocity_limiter {
129 if config.wrap_with_joint_position_limiter {
130 Arc::new(new_joint_position_limiter(
131 new_joint_velocity_limiter(
132 client,
133 config.joint_velocity_limits,
134 urdf_robot.as_deref(),
135 )?,
136 config.joint_position_limits,
137 urdf_robot.as_deref(),
138 )?)
139 } else {
140 Arc::new(new_joint_velocity_limiter(
141 client,
142 config.joint_velocity_limits,
143 urdf_robot.as_deref(),
144 )?)
145 }
146 } else if config.wrap_with_joint_position_limiter {
147 Arc::new(new_joint_position_limiter(
148 client,
149 config.joint_position_limits,
150 urdf_robot.as_deref(),
151 )?)
152 } else {
153 client
154 };
155 if clients.insert(config.name.clone(), client).is_some() {
156 return Err(
157 format_err!("client named '{}' has already been specified", config.name).into(),
158 );
159 }
160 }
161 Ok(clients)
162}
163
164fn new_joint_position_limiter<C>(
165 client: C,
166 position_limits: Option<Vec<JointPositionLimit>>,
167 urdf_robot: Option<&urdf_rs::Robot>,
168) -> Result<JointPositionLimiter<C>, arci::Error>
169where
170 C: JointTrajectoryClient,
171{
172 match position_limits {
173 Some(position_limits) => Ok(JointPositionLimiter::new(client, position_limits)),
174 None => JointPositionLimiter::from_urdf(client, &urdf_robot.unwrap().joints),
175 }
176}
177
178fn new_joint_velocity_limiter<C>(
179 client: C,
180 velocity_limits: Option<Vec<f64>>,
181 urdf_robot: Option<&urdf_rs::Robot>,
182) -> Result<JointVelocityLimiter<C>, arci::Error>
183where
184 C: JointTrajectoryClient,
185{
186 match velocity_limits {
187 Some(velocity_limits) => Ok(JointVelocityLimiter::new(client, velocity_limits)),
188 None => JointVelocityLimiter::from_urdf(client, &urdf_robot.unwrap().joints),
189 }
190}
191
192#[derive(Debug, Default)]
193enum SendJointPositionsTarget {
194 Some {
195 trajectory: Vec<TrajectoryPoint>,
196 sender: oneshot::Sender<Result<(), arci::Error>>,
197 },
198 #[default]
199 None,
200 Abort,
201}
202
203impl SendJointPositionsTarget {
204 fn set_positions(&mut self, positions: Vec<f64>, duration: Duration) -> WaitFuture {
205 self.set_trajectory(vec![TrajectoryPoint::new(positions, duration)])
206 }
207
208 fn set_trajectory(&mut self, trajectory: Vec<TrajectoryPoint>) -> WaitFuture {
209 let (sender, receiver) = oneshot::channel();
210 *self = Self::Some { trajectory, sender };
211 WaitFuture::new(async move { receiver.await.map_err(anyhow::Error::from)? })
212 }
213}
214
215#[derive(Debug, Default)]
216struct ThreadState {
217 has_send_joint_positions_thread: bool,
218 has_send_velocity_thread: bool,
219}
220
221#[derive(Debug, Clone)]
222pub struct UrdfVizWebClient(Arc<UrdfVizWebClientInner>);
223
224#[derive(Debug)]
225struct UrdfVizWebClientInner {
226 base_url: Url,
227 joint_names: Vec<String>,
228 velocity: Mutex<BaseVelocity>,
229 send_joint_positions_target: Mutex<SendJointPositionsTarget>,
230 threads: Mutex<ThreadState>,
231}
232
233impl UrdfVizWebClientInner {
234 fn is_dropping(self: &Arc<Self>) -> bool {
235 let state = self.threads.lock().unwrap();
236 Arc::strong_count(self)
237 <= state.has_send_joint_positions_thread as usize
238 + state.has_send_velocity_thread as usize
239 }
240}
241
242impl UrdfVizWebClient {
243 pub fn new(base_url: Url) -> Result<Self, anyhow::Error> {
244 let joint_state = get_joint_positions(&base_url)?;
245 Ok(Self(Arc::new(UrdfVizWebClientInner {
246 base_url,
247 joint_names: joint_state.names,
248 velocity: Mutex::new(BaseVelocity::default()),
249 send_joint_positions_target: Mutex::new(Default::default()),
250 threads: Mutex::new(ThreadState::default()),
251 })))
252 }
253
254 pub fn run_send_velocity_thread(&self) {
255 if mem::replace(
256 &mut self.0.threads.lock().unwrap().has_send_velocity_thread,
257 true,
258 ) {
259 panic!("send_velocity_thread is running");
260 }
261
262 let inner = self.0.clone();
263 std::thread::spawn(move || {
264 struct Bomb(Arc<UrdfVizWebClientInner>);
265 impl Drop for Bomb {
266 fn drop(&mut self) {
267 self.0.threads.lock().unwrap().has_send_velocity_thread = false;
268 debug!("terminating send_velocity_thread");
269 }
270 }
271
272 let bomb = Bomb(inner);
273 'outer: while !bomb.0.is_dropping() {
274 const DT: f64 = 0.02;
275
276 macro_rules! continue_on_err {
277 ($expr:expr $(,)?) => {
278 match $expr {
279 Ok(x) => x,
280 Err(_e) => {
281 continue 'outer;
282 }
283 }
284 };
285 }
286
287 let _guard = ScopedSleep::from_secs(DT);
288 let velocity = bomb.0.velocity.lock().unwrap();
289 let mut pose = continue_on_err!(get_robot_origin(&bomb.0.base_url));
290 let mut rpy = euler_angles_from_quaternion(&pose.quaternion);
291 let yaw = rpy.2;
292 pose.position[0] += (velocity.x * yaw.cos() - velocity.y * yaw.sin()) * DT;
293 pose.position[1] += (velocity.x * yaw.sin() + velocity.y * yaw.cos()) * DT;
294 rpy.2 += velocity.theta * DT;
295 pose.quaternion = quaternion_from_euler_angles(rpy.0, rpy.1, rpy.2);
296 continue_on_err!(send_robot_origin(&bomb.0.base_url, pose));
297 }
298 });
299 }
300
301 pub fn run_send_joint_positions_thread(&self) {
302 if mem::replace(
303 &mut self
304 .0
305 .threads
306 .lock()
307 .unwrap()
308 .has_send_joint_positions_thread,
309 true,
310 ) {
311 panic!("send_joint_positions_thread is running");
312 }
313
314 let inner = self.0.clone();
315 std::thread::spawn(move || {
316 struct Bomb(Arc<UrdfVizWebClientInner>);
317 impl Drop for Bomb {
318 fn drop(&mut self) {
319 self.0
320 .threads
321 .lock()
322 .unwrap()
323 .has_send_joint_positions_thread = false;
324 debug!("terminating send_joint_positions_thread");
325 }
326 }
327
328 let bomb = Bomb(inner);
329 'outer: while !bomb.0.is_dropping() {
330 const UNIT_DURATION: Duration = Duration::from_millis(10);
331 let prev = mem::take(&mut *bomb.0.send_joint_positions_target.lock().unwrap());
332 let (trajectory, sender) = match prev {
333 SendJointPositionsTarget::Some { trajectory, sender } => (trajectory, sender),
334 SendJointPositionsTarget::None | SendJointPositionsTarget::Abort => {
335 sleep(UNIT_DURATION);
336 continue;
337 }
338 };
339
340 macro_rules! continue_on_err {
341 ($expr:expr $(,)?) => {
342 match $expr {
343 Ok(x) => x,
344 Err(e) => {
345 let _ = sender.send(Err(e));
347 continue 'outer;
348 }
349 }
350 };
351 }
352
353 let mut last_time = Duration::default();
354 for target in trajectory {
355 let duration = target.time_from_start - last_time;
356 last_time = target.time_from_start;
357 if duration.as_nanos() == 0 {
358 let start_time = std::time::Instant::now();
359 let target_state = JointState {
360 names: bomb.0.joint_names.clone(),
361 positions: target.positions.clone(),
362 };
363 continue_on_err!(send_joint_positions(&bomb.0.base_url, target_state));
364
365 let elapsed = start_time.elapsed();
366 if UNIT_DURATION > elapsed {
367 let sleep_duration = UNIT_DURATION - elapsed;
368 sleep(sleep_duration);
369 }
370 continue;
371 }
372
373 let current = continue_on_err!(get_joint_positions(&bomb.0.base_url)).positions;
374 let duration_sec = duration.as_secs_f64();
375 let unit_sec = UNIT_DURATION.as_secs_f64();
376 let trajectories = continue_on_err!(openrr_planner::interpolate(
377 &[current, target.positions.to_vec()],
378 duration_sec,
379 unit_sec,
380 )
381 .ok_or_else(|| arci::Error::InterpolationError("".to_owned())));
382
383 for traj in trajectories {
384 if matches!(
385 *bomb.0.send_joint_positions_target.lock().unwrap(),
386 SendJointPositionsTarget::Some { .. } | SendJointPositionsTarget::Abort
387 ) {
388 debug!("Abort old target");
389 let _ = sender.send(Ok(()));
391 continue 'outer;
392 }
393 let start_time = std::time::Instant::now();
394 let target_state = JointState {
395 names: bomb.0.joint_names.clone(),
396 positions: traj.position,
397 };
398 continue_on_err!(send_joint_positions(&bomb.0.base_url, target_state));
399 let elapsed = start_time.elapsed();
400 if UNIT_DURATION > elapsed {
401 let sleep_duration = UNIT_DURATION - elapsed;
402 sleep(sleep_duration);
403 }
404 }
405 }
406 let _ = sender.send(Ok(()));
408 }
409 });
410 }
411
412 pub fn abort(&self) {
413 *self.0.send_joint_positions_target.lock().unwrap() = SendJointPositionsTarget::Abort;
414 }
415
416 pub fn get_urdf(&self) -> Result<urdf_rs::Robot, arci::Error> {
417 get_urdf(&self.0.base_url)
418 }
419}
420
421impl Default for UrdfVizWebClient {
422 fn default() -> Self {
423 Self::new(Url::parse("http://127.0.0.1:7777").unwrap()).unwrap()
424 }
425}
426
427impl JointTrajectoryClient for UrdfVizWebClient {
428 fn joint_names(&self) -> Vec<String> {
429 self.0.joint_names.clone()
430 }
431
432 fn current_joint_positions(&self) -> Result<Vec<f64>, arci::Error> {
433 Ok(get_joint_positions(&self.0.base_url)?.positions)
434 }
435
436 fn send_joint_positions(
437 &self,
438 positions: Vec<f64>,
439 duration: Duration,
440 ) -> Result<WaitFuture, arci::Error> {
441 if !self
442 .0
443 .threads
444 .lock()
445 .unwrap()
446 .has_send_joint_positions_thread
447 {
448 panic!("send_joint_positions called without run_send_joint_positions_thread being called first");
449 }
450
451 Ok(self
452 .0
453 .send_joint_positions_target
454 .lock()
455 .unwrap()
456 .set_positions(positions, duration))
457 }
458
459 fn send_joint_trajectory(
460 &self,
461 trajectory: Vec<TrajectoryPoint>,
462 ) -> Result<WaitFuture, arci::Error> {
463 if trajectory.is_empty() {
464 return Ok(WaitFuture::ready());
465 }
466
467 Ok(self
468 .0
469 .send_joint_positions_target
470 .lock()
471 .unwrap()
472 .set_trajectory(trajectory))
473 }
474}
475
476impl Localization for UrdfVizWebClient {
477 fn current_pose(&self, _frame_id: &str) -> Result<na::Isometry2<f64>, arci::Error> {
478 let pose = get_robot_origin(&self.0.base_url)?;
479 let yaw = euler_angles_from_quaternion(&pose.quaternion).2;
480 Ok(na::Isometry2::new(
481 na::Vector2::new(pose.position[0], pose.position[1]),
482 yaw,
483 ))
484 }
485}
486
487impl Navigation for UrdfVizWebClient {
488 fn send_goal_pose(
489 &self,
490 goal: na::Isometry2<f64>,
491 _frame_id: &str,
492 _timeout: Duration,
493 ) -> Result<WaitFuture, arci::Error> {
494 send_robot_origin(&self.0.base_url, goal.into())?;
496
497 Ok(WaitFuture::ready())
498 }
499
500 fn cancel(&self) -> Result<(), arci::Error> {
501 todo!()
502 }
503}
504
505impl MoveBase for UrdfVizWebClient {
506 fn send_velocity(&self, velocity: &arci::BaseVelocity) -> Result<(), arci::Error> {
507 if !self.0.threads.lock().unwrap().has_send_velocity_thread {
508 panic!("send_velocity called without run_send_velocity_thread being called first");
509 }
510
511 get_robot_origin(&self.0.base_url)?;
515
516 velocity.clone_into(&mut self.0.velocity.lock().unwrap());
517 Ok(())
518 }
519
520 fn current_velocity(&self) -> Result<arci::BaseVelocity, arci::Error> {
521 Ok(self.0.velocity.lock().unwrap().to_owned())
522 }
523}