arci_urdf_viz/
client.rs

1use std::{
2    borrow::Cow,
3    collections::HashMap,
4    mem,
5    sync::{Arc, Mutex},
6    thread::sleep,
7    time::Duration,
8};
9
10use anyhow::format_err;
11use arci::{
12    nalgebra as na, BaseVelocity, JointPositionLimit, JointPositionLimiter, JointTrajectoryClient,
13    JointVelocityLimiter, Localization, MoveBase, Navigation, TrajectoryPoint, WaitFuture,
14};
15use schemars::JsonSchema;
16use scoped_sleep::ScopedSleep;
17use serde::{Deserialize, Serialize};
18use tokio::sync::oneshot;
19use tracing::debug;
20use url::Url;
21
22use crate::utils::*;
23
24#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)]
25#[serde(deny_unknown_fields)]
26pub struct UrdfVizWebClientConfig {
27    pub name: String,
28    pub joint_names: Option<Vec<String>>,
29    #[serde(default)]
30    pub wrap_with_joint_position_limiter: bool,
31    #[serde(default)]
32    pub wrap_with_joint_velocity_limiter: bool,
33    pub joint_velocity_limits: Option<Vec<f64>>,
34
35    // TOML format has a restriction that if a table itself contains tables,
36    // all keys with non-table values must be emitted first.
37    // Therefore, these fields must be located at the end of the struct.
38    pub joint_position_limits: Option<Vec<JointPositionLimit>>,
39}
40
41/// Returns a map of clients for each config.
42///
43/// The key for the map is [the name of the client](UrdfVizWebClientConfig::name),
44/// and in case of conflict, it becomes an error.
45///
46/// Returns empty map when `configs` are empty.
47pub fn create_joint_trajectory_clients(
48    configs: Vec<UrdfVizWebClientConfig>,
49    urdf_robot: Option<&urdf_rs::Robot>,
50) -> Result<HashMap<String, Arc<dyn JointTrajectoryClient>>, arci::Error> {
51    create_joint_trajectory_clients_inner(configs, urdf_robot, false)
52}
53
54/// Returns a map of clients that will be created lazily for each config.
55///
56/// See [create_joint_trajectory_clients] for more.
57pub fn create_joint_trajectory_clients_lazy(
58    configs: Vec<UrdfVizWebClientConfig>,
59    urdf_robot: Option<&urdf_rs::Robot>,
60) -> Result<HashMap<String, Arc<dyn JointTrajectoryClient>>, arci::Error> {
61    create_joint_trajectory_clients_inner(configs, urdf_robot, true)
62}
63
64fn create_joint_trajectory_clients_inner(
65    configs: Vec<UrdfVizWebClientConfig>,
66    urdf_robot: Option<&urdf_rs::Robot>,
67    lazy: bool,
68) -> Result<HashMap<String, Arc<dyn JointTrajectoryClient>>, arci::Error> {
69    if configs.is_empty() {
70        return Ok(HashMap::default());
71    }
72
73    let mut clients = HashMap::new();
74    let mut urdf_robot = urdf_robot.map(Cow::Borrowed);
75
76    let create_all_client = move || {
77        debug!("create_joint_trajectory_clients_inner: creating UrdfVizWebClient");
78        let all_client = UrdfVizWebClient::default();
79        all_client.run_send_joint_positions_thread();
80        Ok(all_client)
81    };
82    let all_client: Arc<dyn JointTrajectoryClient> = if lazy && urdf_robot.is_some() {
83        let urdf_robot = urdf_robot.as_ref().unwrap();
84        Arc::new(arci::Lazy::with_joint_names(
85            create_all_client,
86            urdf_robot
87                .joints
88                .iter()
89                .filter(|j| j.joint_type != urdf_rs::JointType::Fixed)
90                .map(|j| j.name.clone())
91                .collect(),
92        ))
93    } else {
94        // Subsequent processing call joint_names, so we cannot make the client
95        // lazy if joint_names is not specified.
96        let client = create_all_client()?;
97        if urdf_robot.is_none()
98            && configs.iter().any(|config| {
99                config.wrap_with_joint_position_limiter && config.joint_position_limits.is_none()
100                    || config.wrap_with_joint_velocity_limiter
101                        && config.joint_velocity_limits.is_none()
102            })
103        {
104            urdf_robot = Some(Cow::Owned(client.get_urdf()?));
105        }
106        Arc::new(client)
107    };
108
109    for config in configs {
110        if config.wrap_with_joint_position_limiter
111            && config.joint_position_limits.is_none()
112            && urdf_robot.is_none()
113        {
114            return Err(format_err!(
115                "`wrap_with_joint_position_limiter=true` requires urdf or joint_position_limits \
116                is specified",
117            )
118            .into());
119        }
120        let client = if let Some(joint_names) = &config.joint_names {
121            Arc::new(arci::PartialJointTrajectoryClient::new(
122                joint_names.to_owned(),
123                all_client.clone(),
124            )?)
125        } else {
126            all_client.clone()
127        };
128        let client: Arc<dyn JointTrajectoryClient> = if config.wrap_with_joint_velocity_limiter {
129            if config.wrap_with_joint_position_limiter {
130                Arc::new(new_joint_position_limiter(
131                    new_joint_velocity_limiter(
132                        client,
133                        config.joint_velocity_limits,
134                        urdf_robot.as_deref(),
135                    )?,
136                    config.joint_position_limits,
137                    urdf_robot.as_deref(),
138                )?)
139            } else {
140                Arc::new(new_joint_velocity_limiter(
141                    client,
142                    config.joint_velocity_limits,
143                    urdf_robot.as_deref(),
144                )?)
145            }
146        } else if config.wrap_with_joint_position_limiter {
147            Arc::new(new_joint_position_limiter(
148                client,
149                config.joint_position_limits,
150                urdf_robot.as_deref(),
151            )?)
152        } else {
153            client
154        };
155        if clients.insert(config.name.clone(), client).is_some() {
156            return Err(
157                format_err!("client named '{}' has already been specified", config.name).into(),
158            );
159        }
160    }
161    Ok(clients)
162}
163
164fn new_joint_position_limiter<C>(
165    client: C,
166    position_limits: Option<Vec<JointPositionLimit>>,
167    urdf_robot: Option<&urdf_rs::Robot>,
168) -> Result<JointPositionLimiter<C>, arci::Error>
169where
170    C: JointTrajectoryClient,
171{
172    match position_limits {
173        Some(position_limits) => Ok(JointPositionLimiter::new(client, position_limits)),
174        None => JointPositionLimiter::from_urdf(client, &urdf_robot.unwrap().joints),
175    }
176}
177
178fn new_joint_velocity_limiter<C>(
179    client: C,
180    velocity_limits: Option<Vec<f64>>,
181    urdf_robot: Option<&urdf_rs::Robot>,
182) -> Result<JointVelocityLimiter<C>, arci::Error>
183where
184    C: JointTrajectoryClient,
185{
186    match velocity_limits {
187        Some(velocity_limits) => Ok(JointVelocityLimiter::new(client, velocity_limits)),
188        None => JointVelocityLimiter::from_urdf(client, &urdf_robot.unwrap().joints),
189    }
190}
191
192#[derive(Debug, Default)]
193enum SendJointPositionsTarget {
194    Some {
195        trajectory: Vec<TrajectoryPoint>,
196        sender: oneshot::Sender<Result<(), arci::Error>>,
197    },
198    #[default]
199    None,
200    Abort,
201}
202
203impl SendJointPositionsTarget {
204    fn set_positions(&mut self, positions: Vec<f64>, duration: Duration) -> WaitFuture {
205        self.set_trajectory(vec![TrajectoryPoint::new(positions, duration)])
206    }
207
208    fn set_trajectory(&mut self, trajectory: Vec<TrajectoryPoint>) -> WaitFuture {
209        let (sender, receiver) = oneshot::channel();
210        *self = Self::Some { trajectory, sender };
211        WaitFuture::new(async move { receiver.await.map_err(anyhow::Error::from)? })
212    }
213}
214
215#[derive(Debug, Default)]
216struct ThreadState {
217    has_send_joint_positions_thread: bool,
218    has_send_velocity_thread: bool,
219}
220
221#[derive(Debug, Clone)]
222pub struct UrdfVizWebClient(Arc<UrdfVizWebClientInner>);
223
224#[derive(Debug)]
225struct UrdfVizWebClientInner {
226    base_url: Url,
227    joint_names: Vec<String>,
228    velocity: Mutex<BaseVelocity>,
229    send_joint_positions_target: Mutex<SendJointPositionsTarget>,
230    threads: Mutex<ThreadState>,
231}
232
233impl UrdfVizWebClientInner {
234    fn is_dropping(self: &Arc<Self>) -> bool {
235        let state = self.threads.lock().unwrap();
236        Arc::strong_count(self)
237            <= state.has_send_joint_positions_thread as usize
238                + state.has_send_velocity_thread as usize
239    }
240}
241
242impl UrdfVizWebClient {
243    pub fn new(base_url: Url) -> Result<Self, anyhow::Error> {
244        let joint_state = get_joint_positions(&base_url)?;
245        Ok(Self(Arc::new(UrdfVizWebClientInner {
246            base_url,
247            joint_names: joint_state.names,
248            velocity: Mutex::new(BaseVelocity::default()),
249            send_joint_positions_target: Mutex::new(Default::default()),
250            threads: Mutex::new(ThreadState::default()),
251        })))
252    }
253
254    pub fn run_send_velocity_thread(&self) {
255        if mem::replace(
256            &mut self.0.threads.lock().unwrap().has_send_velocity_thread,
257            true,
258        ) {
259            panic!("send_velocity_thread is running");
260        }
261
262        let inner = self.0.clone();
263        std::thread::spawn(move || {
264            struct Bomb(Arc<UrdfVizWebClientInner>);
265            impl Drop for Bomb {
266                fn drop(&mut self) {
267                    self.0.threads.lock().unwrap().has_send_velocity_thread = false;
268                    debug!("terminating send_velocity_thread");
269                }
270            }
271
272            let bomb = Bomb(inner);
273            'outer: while !bomb.0.is_dropping() {
274                const DT: f64 = 0.02;
275
276                macro_rules! continue_on_err {
277                    ($expr:expr $(,)?) => {
278                        match $expr {
279                            Ok(x) => x,
280                            Err(_e) => {
281                                continue 'outer;
282                            }
283                        }
284                    };
285                }
286
287                let _guard = ScopedSleep::from_secs(DT);
288                let velocity = bomb.0.velocity.lock().unwrap();
289                let mut pose = continue_on_err!(get_robot_origin(&bomb.0.base_url));
290                let mut rpy = euler_angles_from_quaternion(&pose.quaternion);
291                let yaw = rpy.2;
292                pose.position[0] += (velocity.x * yaw.cos() - velocity.y * yaw.sin()) * DT;
293                pose.position[1] += (velocity.x * yaw.sin() + velocity.y * yaw.cos()) * DT;
294                rpy.2 += velocity.theta * DT;
295                pose.quaternion = quaternion_from_euler_angles(rpy.0, rpy.1, rpy.2);
296                continue_on_err!(send_robot_origin(&bomb.0.base_url, pose));
297            }
298        });
299    }
300
301    pub fn run_send_joint_positions_thread(&self) {
302        if mem::replace(
303            &mut self
304                .0
305                .threads
306                .lock()
307                .unwrap()
308                .has_send_joint_positions_thread,
309            true,
310        ) {
311            panic!("send_joint_positions_thread is running");
312        }
313
314        let inner = self.0.clone();
315        std::thread::spawn(move || {
316            struct Bomb(Arc<UrdfVizWebClientInner>);
317            impl Drop for Bomb {
318                fn drop(&mut self) {
319                    self.0
320                        .threads
321                        .lock()
322                        .unwrap()
323                        .has_send_joint_positions_thread = false;
324                    debug!("terminating send_joint_positions_thread");
325                }
326            }
327
328            let bomb = Bomb(inner);
329            'outer: while !bomb.0.is_dropping() {
330                const UNIT_DURATION: Duration = Duration::from_millis(10);
331                let prev = mem::take(&mut *bomb.0.send_joint_positions_target.lock().unwrap());
332                let (trajectory, sender) = match prev {
333                    SendJointPositionsTarget::Some { trajectory, sender } => (trajectory, sender),
334                    SendJointPositionsTarget::None | SendJointPositionsTarget::Abort => {
335                        sleep(UNIT_DURATION);
336                        continue;
337                    }
338                };
339
340                macro_rules! continue_on_err {
341                    ($expr:expr $(,)?) => {
342                        match $expr {
343                            Ok(x) => x,
344                            Err(e) => {
345                                // Ignore error because WaitFuture may have been dropped.
346                                let _ = sender.send(Err(e));
347                                continue 'outer;
348                            }
349                        }
350                    };
351                }
352
353                let mut last_time = Duration::default();
354                for target in trajectory {
355                    let duration = target.time_from_start - last_time;
356                    last_time = target.time_from_start;
357                    if duration.as_nanos() == 0 {
358                        let start_time = std::time::Instant::now();
359                        let target_state = JointState {
360                            names: bomb.0.joint_names.clone(),
361                            positions: target.positions.clone(),
362                        };
363                        continue_on_err!(send_joint_positions(&bomb.0.base_url, target_state));
364
365                        let elapsed = start_time.elapsed();
366                        if UNIT_DURATION > elapsed {
367                            let sleep_duration = UNIT_DURATION - elapsed;
368                            sleep(sleep_duration);
369                        }
370                        continue;
371                    }
372
373                    let current = continue_on_err!(get_joint_positions(&bomb.0.base_url)).positions;
374                    let duration_sec = duration.as_secs_f64();
375                    let unit_sec = UNIT_DURATION.as_secs_f64();
376                    let trajectories = continue_on_err!(openrr_planner::interpolate(
377                        &[current, target.positions.to_vec()],
378                        duration_sec,
379                        unit_sec,
380                    )
381                    .ok_or_else(|| arci::Error::InterpolationError("".to_owned())));
382
383                    for traj in trajectories {
384                        if matches!(
385                            *bomb.0.send_joint_positions_target.lock().unwrap(),
386                            SendJointPositionsTarget::Some { .. } | SendJointPositionsTarget::Abort
387                        ) {
388                            debug!("Abort old target");
389                            // Ignore error because WaitFuture may have been dropped.
390                            let _ = sender.send(Ok(()));
391                            continue 'outer;
392                        }
393                        let start_time = std::time::Instant::now();
394                        let target_state = JointState {
395                            names: bomb.0.joint_names.clone(),
396                            positions: traj.position,
397                        };
398                        continue_on_err!(send_joint_positions(&bomb.0.base_url, target_state));
399                        let elapsed = start_time.elapsed();
400                        if UNIT_DURATION > elapsed {
401                            let sleep_duration = UNIT_DURATION - elapsed;
402                            sleep(sleep_duration);
403                        }
404                    }
405                }
406                // Ignore error because WaitFuture may have been dropped.
407                let _ = sender.send(Ok(()));
408            }
409        });
410    }
411
412    pub fn abort(&self) {
413        *self.0.send_joint_positions_target.lock().unwrap() = SendJointPositionsTarget::Abort;
414    }
415
416    pub fn get_urdf(&self) -> Result<urdf_rs::Robot, arci::Error> {
417        get_urdf(&self.0.base_url)
418    }
419}
420
421impl Default for UrdfVizWebClient {
422    fn default() -> Self {
423        Self::new(Url::parse("http://127.0.0.1:7777").unwrap()).unwrap()
424    }
425}
426
427impl JointTrajectoryClient for UrdfVizWebClient {
428    fn joint_names(&self) -> Vec<String> {
429        self.0.joint_names.clone()
430    }
431
432    fn current_joint_positions(&self) -> Result<Vec<f64>, arci::Error> {
433        Ok(get_joint_positions(&self.0.base_url)?.positions)
434    }
435
436    fn send_joint_positions(
437        &self,
438        positions: Vec<f64>,
439        duration: Duration,
440    ) -> Result<WaitFuture, arci::Error> {
441        if !self
442            .0
443            .threads
444            .lock()
445            .unwrap()
446            .has_send_joint_positions_thread
447        {
448            panic!("send_joint_positions called without run_send_joint_positions_thread being called first");
449        }
450
451        Ok(self
452            .0
453            .send_joint_positions_target
454            .lock()
455            .unwrap()
456            .set_positions(positions, duration))
457    }
458
459    fn send_joint_trajectory(
460        &self,
461        trajectory: Vec<TrajectoryPoint>,
462    ) -> Result<WaitFuture, arci::Error> {
463        if trajectory.is_empty() {
464            return Ok(WaitFuture::ready());
465        }
466
467        Ok(self
468            .0
469            .send_joint_positions_target
470            .lock()
471            .unwrap()
472            .set_trajectory(trajectory))
473    }
474}
475
476impl Localization for UrdfVizWebClient {
477    fn current_pose(&self, _frame_id: &str) -> Result<na::Isometry2<f64>, arci::Error> {
478        let pose = get_robot_origin(&self.0.base_url)?;
479        let yaw = euler_angles_from_quaternion(&pose.quaternion).2;
480        Ok(na::Isometry2::new(
481            na::Vector2::new(pose.position[0], pose.position[1]),
482            yaw,
483        ))
484    }
485}
486
487impl Navigation for UrdfVizWebClient {
488    fn send_goal_pose(
489        &self,
490        goal: na::Isometry2<f64>,
491        _frame_id: &str,
492        _timeout: Duration,
493    ) -> Result<WaitFuture, arci::Error> {
494        // JUMP!
495        send_robot_origin(&self.0.base_url, goal.into())?;
496
497        Ok(WaitFuture::ready())
498    }
499
500    fn cancel(&self) -> Result<(), arci::Error> {
501        todo!()
502    }
503}
504
505impl MoveBase for UrdfVizWebClient {
506    fn send_velocity(&self, velocity: &arci::BaseVelocity) -> Result<(), arci::Error> {
507        if !self.0.threads.lock().unwrap().has_send_velocity_thread {
508            panic!("send_velocity called without run_send_velocity_thread being called first");
509        }
510
511        // Check that the connection works.
512        // TODO: We use this trick because currently there is no way to tell the
513        // main thread when an error has occurred in send_velocity_thread.
514        get_robot_origin(&self.0.base_url)?;
515
516        velocity.clone_into(&mut self.0.velocity.lock().unwrap());
517        Ok(())
518    }
519
520    fn current_velocity(&self) -> Result<arci::BaseVelocity, arci::Error> {
521        Ok(self.0.velocity.lock().unwrap().to_owned())
522    }
523}