1use std::{
2 borrow::Cow,
3 collections::HashMap,
4 mem,
5 sync::{Arc, Mutex},
6 thread::sleep,
7 time::Duration,
8};
9
10use anyhow::format_err;
11use arci::{
12 BaseVelocity, JointPositionLimit, JointPositionLimiter, JointTrajectoryClient,
13 JointVelocityLimiter, Localization, MoveBase, Navigation, TrajectoryPoint, WaitFuture,
14 nalgebra as na,
15};
16use schemars::JsonSchema;
17use scoped_sleep::ScopedSleep;
18use serde::{Deserialize, Serialize};
19use tokio::sync::oneshot;
20use tracing::debug;
21use url::Url;
22
23use crate::utils::*;
24
25#[derive(Debug, Serialize, Deserialize, Clone, JsonSchema)]
26#[serde(deny_unknown_fields)]
27pub struct UrdfVizWebClientConfig {
28 pub name: String,
29 pub joint_names: Option<Vec<String>>,
30 #[serde(default)]
31 pub wrap_with_joint_position_limiter: bool,
32 #[serde(default)]
33 pub wrap_with_joint_velocity_limiter: bool,
34 pub joint_velocity_limits: Option<Vec<f64>>,
35
36 pub joint_position_limits: Option<Vec<JointPositionLimit>>,
40}
41
42pub fn create_joint_trajectory_clients(
49 configs: Vec<UrdfVizWebClientConfig>,
50 urdf_robot: Option<&urdf_rs::Robot>,
51) -> Result<HashMap<String, Arc<dyn JointTrajectoryClient>>, arci::Error> {
52 create_joint_trajectory_clients_inner(configs, urdf_robot, false)
53}
54
55pub fn create_joint_trajectory_clients_lazy(
59 configs: Vec<UrdfVizWebClientConfig>,
60 urdf_robot: Option<&urdf_rs::Robot>,
61) -> Result<HashMap<String, Arc<dyn JointTrajectoryClient>>, arci::Error> {
62 create_joint_trajectory_clients_inner(configs, urdf_robot, true)
63}
64
65fn create_joint_trajectory_clients_inner(
66 configs: Vec<UrdfVizWebClientConfig>,
67 urdf_robot: Option<&urdf_rs::Robot>,
68 lazy: bool,
69) -> Result<HashMap<String, Arc<dyn JointTrajectoryClient>>, arci::Error> {
70 if configs.is_empty() {
71 return Ok(HashMap::default());
72 }
73
74 let mut clients = HashMap::new();
75 let mut urdf_robot = urdf_robot.map(Cow::Borrowed);
76
77 let create_all_client = move || {
78 debug!("create_joint_trajectory_clients_inner: creating UrdfVizWebClient");
79 let all_client = UrdfVizWebClient::default();
80 all_client.run_send_joint_positions_thread();
81 Ok(all_client)
82 };
83 let all_client: Arc<dyn JointTrajectoryClient> = if lazy && let Some(urdf_robot) = &urdf_robot {
84 Arc::new(arci::Lazy::with_joint_names(
85 create_all_client,
86 urdf_robot
87 .joints
88 .iter()
89 .filter(|j| j.joint_type != urdf_rs::JointType::Fixed)
90 .map(|j| j.name.clone())
91 .collect(),
92 ))
93 } else {
94 let client = create_all_client()?;
97 if urdf_robot.is_none()
98 && configs.iter().any(|config| {
99 config.wrap_with_joint_position_limiter && config.joint_position_limits.is_none()
100 || config.wrap_with_joint_velocity_limiter
101 && config.joint_velocity_limits.is_none()
102 })
103 {
104 urdf_robot = Some(Cow::Owned(client.get_urdf()?));
105 }
106 Arc::new(client)
107 };
108
109 for config in configs {
110 if config.wrap_with_joint_position_limiter
111 && config.joint_position_limits.is_none()
112 && urdf_robot.is_none()
113 {
114 return Err(format_err!(
115 "`wrap_with_joint_position_limiter=true` requires urdf or joint_position_limits \
116 is specified",
117 )
118 .into());
119 }
120 let client = if let Some(joint_names) = &config.joint_names {
121 Arc::new(arci::PartialJointTrajectoryClient::new(
122 joint_names.to_owned(),
123 all_client.clone(),
124 )?)
125 } else {
126 all_client.clone()
127 };
128 let client: Arc<dyn JointTrajectoryClient> = if config.wrap_with_joint_velocity_limiter {
129 if config.wrap_with_joint_position_limiter {
130 Arc::new(new_joint_position_limiter(
131 new_joint_velocity_limiter(
132 client,
133 config.joint_velocity_limits,
134 urdf_robot.as_deref(),
135 )?,
136 config.joint_position_limits,
137 urdf_robot.as_deref(),
138 )?)
139 } else {
140 Arc::new(new_joint_velocity_limiter(
141 client,
142 config.joint_velocity_limits,
143 urdf_robot.as_deref(),
144 )?)
145 }
146 } else if config.wrap_with_joint_position_limiter {
147 Arc::new(new_joint_position_limiter(
148 client,
149 config.joint_position_limits,
150 urdf_robot.as_deref(),
151 )?)
152 } else {
153 client
154 };
155 if clients.insert(config.name.clone(), client).is_some() {
156 return Err(
157 format_err!("client named '{}' has already been specified", config.name).into(),
158 );
159 }
160 }
161 Ok(clients)
162}
163
164fn new_joint_position_limiter<C>(
165 client: C,
166 position_limits: Option<Vec<JointPositionLimit>>,
167 urdf_robot: Option<&urdf_rs::Robot>,
168) -> Result<JointPositionLimiter<C>, arci::Error>
169where
170 C: JointTrajectoryClient,
171{
172 match position_limits {
173 Some(position_limits) => Ok(JointPositionLimiter::new(client, position_limits)),
174 None => JointPositionLimiter::from_urdf(client, &urdf_robot.unwrap().joints),
175 }
176}
177
178fn new_joint_velocity_limiter<C>(
179 client: C,
180 velocity_limits: Option<Vec<f64>>,
181 urdf_robot: Option<&urdf_rs::Robot>,
182) -> Result<JointVelocityLimiter<C>, arci::Error>
183where
184 C: JointTrajectoryClient,
185{
186 match velocity_limits {
187 Some(velocity_limits) => Ok(JointVelocityLimiter::new(client, velocity_limits)),
188 None => JointVelocityLimiter::from_urdf(client, &urdf_robot.unwrap().joints),
189 }
190}
191
192#[derive(Debug, Default)]
193enum SendJointPositionsTarget {
194 Some {
195 trajectory: Vec<TrajectoryPoint>,
196 sender: oneshot::Sender<Result<(), arci::Error>>,
197 },
198 #[default]
199 None,
200 Abort,
201}
202
203impl SendJointPositionsTarget {
204 fn set_positions(&mut self, positions: Vec<f64>, duration: Duration) -> WaitFuture {
205 self.set_trajectory(vec![TrajectoryPoint::new(positions, duration)])
206 }
207
208 fn set_trajectory(&mut self, trajectory: Vec<TrajectoryPoint>) -> WaitFuture {
209 let (sender, receiver) = oneshot::channel();
210 *self = Self::Some { trajectory, sender };
211 WaitFuture::new(async move { receiver.await.map_err(anyhow::Error::from)? })
212 }
213}
214
215#[derive(Debug, Default)]
216struct ThreadState {
217 has_send_joint_positions_thread: bool,
218 has_send_velocity_thread: bool,
219}
220
221#[derive(Debug, Clone)]
222pub struct UrdfVizWebClient(Arc<UrdfVizWebClientInner>);
223
224#[derive(Debug)]
225struct UrdfVizWebClientInner {
226 base_url: Url,
227 joint_names: Vec<String>,
228 velocity: Mutex<BaseVelocity>,
229 send_joint_positions_target: Mutex<SendJointPositionsTarget>,
230 threads: Mutex<ThreadState>,
231}
232
233impl UrdfVizWebClientInner {
234 fn is_dropping(self: &Arc<Self>) -> bool {
235 let state = self.threads.lock().unwrap();
236 Arc::strong_count(self)
237 <= state.has_send_joint_positions_thread as usize
238 + state.has_send_velocity_thread as usize
239 }
240}
241
242impl UrdfVizWebClient {
243 pub fn new(base_url: Url) -> Result<Self, anyhow::Error> {
244 let joint_state = get_joint_positions(&base_url)?;
245 Ok(Self(Arc::new(UrdfVizWebClientInner {
246 base_url,
247 joint_names: joint_state.names,
248 velocity: Mutex::new(BaseVelocity::default()),
249 send_joint_positions_target: Mutex::new(Default::default()),
250 threads: Mutex::new(ThreadState::default()),
251 })))
252 }
253
254 pub fn run_send_velocity_thread(&self) {
255 if mem::replace(
256 &mut self.0.threads.lock().unwrap().has_send_velocity_thread,
257 true,
258 ) {
259 panic!("send_velocity_thread is running");
260 }
261
262 let inner = self.0.clone();
263 std::thread::spawn(move || {
264 struct Bomb(Arc<UrdfVizWebClientInner>);
265 impl Drop for Bomb {
266 fn drop(&mut self) {
267 self.0.threads.lock().unwrap().has_send_velocity_thread = false;
268 debug!("terminating send_velocity_thread");
269 }
270 }
271
272 let bomb = Bomb(inner);
273 'outer: while !bomb.0.is_dropping() {
274 const DT: f64 = 0.02;
275
276 macro_rules! continue_on_err {
277 ($expr:expr $(,)?) => {
278 match $expr {
279 Ok(x) => x,
280 Err(_e) => {
281 continue 'outer;
282 }
283 }
284 };
285 }
286
287 let _guard = ScopedSleep::from_secs(DT);
288 let velocity = bomb.0.velocity.lock().unwrap();
289 let mut pose = continue_on_err!(get_robot_origin(&bomb.0.base_url));
290 let mut rpy = euler_angles_from_quaternion(&pose.quaternion);
291 let yaw = rpy.2;
292 pose.position[0] += (velocity.x * yaw.cos() - velocity.y * yaw.sin()) * DT;
293 pose.position[1] += (velocity.x * yaw.sin() + velocity.y * yaw.cos()) * DT;
294 rpy.2 += velocity.theta * DT;
295 pose.quaternion = quaternion_from_euler_angles(rpy.0, rpy.1, rpy.2);
296 continue_on_err!(send_robot_origin(&bomb.0.base_url, pose));
297 }
298 });
299 }
300
301 pub fn run_send_joint_positions_thread(&self) {
302 if mem::replace(
303 &mut self
304 .0
305 .threads
306 .lock()
307 .unwrap()
308 .has_send_joint_positions_thread,
309 true,
310 ) {
311 panic!("send_joint_positions_thread is running");
312 }
313
314 let inner = self.0.clone();
315 std::thread::spawn(move || {
316 struct Bomb(Arc<UrdfVizWebClientInner>);
317 impl Drop for Bomb {
318 fn drop(&mut self) {
319 self.0
320 .threads
321 .lock()
322 .unwrap()
323 .has_send_joint_positions_thread = false;
324 debug!("terminating send_joint_positions_thread");
325 }
326 }
327
328 let bomb = Bomb(inner);
329 'outer: while !bomb.0.is_dropping() {
330 const UNIT_DURATION: Duration = Duration::from_millis(10);
331 let prev = mem::take(&mut *bomb.0.send_joint_positions_target.lock().unwrap());
332 let (trajectory, sender) = match prev {
333 SendJointPositionsTarget::Some { trajectory, sender } => (trajectory, sender),
334 SendJointPositionsTarget::None | SendJointPositionsTarget::Abort => {
335 sleep(UNIT_DURATION);
336 continue;
337 }
338 };
339
340 macro_rules! continue_on_err {
341 ($expr:expr $(,)?) => {
342 match $expr {
343 Ok(x) => x,
344 Err(e) => {
345 let _ = sender.send(Err(e));
347 continue 'outer;
348 }
349 }
350 };
351 }
352
353 let mut last_time = Duration::default();
354 for target in trajectory {
355 let duration = target.time_from_start - last_time;
356 last_time = target.time_from_start;
357 if duration.as_nanos() == 0 {
358 let start_time = std::time::Instant::now();
359 let target_state = JointState {
360 names: bomb.0.joint_names.clone(),
361 positions: target.positions.clone(),
362 };
363 continue_on_err!(send_joint_positions(&bomb.0.base_url, target_state));
364
365 let elapsed = start_time.elapsed();
366 if UNIT_DURATION > elapsed {
367 let sleep_duration = UNIT_DURATION - elapsed;
368 sleep(sleep_duration);
369 }
370 continue;
371 }
372
373 let current = continue_on_err!(get_joint_positions(&bomb.0.base_url)).positions;
374 let duration_sec = duration.as_secs_f64();
375 let unit_sec = UNIT_DURATION.as_secs_f64();
376 let trajectories = continue_on_err!(
377 openrr_planner::interpolate(
378 &[current, target.positions.to_vec()],
379 duration_sec,
380 unit_sec,
381 )
382 .ok_or_else(|| arci::Error::InterpolationError("".to_owned()))
383 );
384
385 for traj in trajectories {
386 if matches!(
387 *bomb.0.send_joint_positions_target.lock().unwrap(),
388 SendJointPositionsTarget::Some { .. } | SendJointPositionsTarget::Abort
389 ) {
390 debug!("Abort old target");
391 let _ = sender.send(Ok(()));
393 continue 'outer;
394 }
395 let start_time = std::time::Instant::now();
396 let target_state = JointState {
397 names: bomb.0.joint_names.clone(),
398 positions: traj.position,
399 };
400 continue_on_err!(send_joint_positions(&bomb.0.base_url, target_state));
401 let elapsed = start_time.elapsed();
402 if UNIT_DURATION > elapsed {
403 let sleep_duration = UNIT_DURATION - elapsed;
404 sleep(sleep_duration);
405 }
406 }
407 }
408 let _ = sender.send(Ok(()));
410 }
411 });
412 }
413
414 pub fn abort(&self) {
415 *self.0.send_joint_positions_target.lock().unwrap() = SendJointPositionsTarget::Abort;
416 }
417
418 pub fn get_urdf(&self) -> Result<urdf_rs::Robot, arci::Error> {
419 get_urdf(&self.0.base_url)
420 }
421}
422
423impl Default for UrdfVizWebClient {
424 fn default() -> Self {
425 Self::new(Url::parse("http://127.0.0.1:7777").unwrap()).unwrap()
426 }
427}
428
429impl JointTrajectoryClient for UrdfVizWebClient {
430 fn joint_names(&self) -> Vec<String> {
431 self.0.joint_names.clone()
432 }
433
434 fn current_joint_positions(&self) -> Result<Vec<f64>, arci::Error> {
435 Ok(get_joint_positions(&self.0.base_url)?.positions)
436 }
437
438 fn send_joint_positions(
439 &self,
440 positions: Vec<f64>,
441 duration: Duration,
442 ) -> Result<WaitFuture, arci::Error> {
443 if !self
444 .0
445 .threads
446 .lock()
447 .unwrap()
448 .has_send_joint_positions_thread
449 {
450 panic!(
451 "send_joint_positions called without run_send_joint_positions_thread being called first"
452 );
453 }
454
455 Ok(self
456 .0
457 .send_joint_positions_target
458 .lock()
459 .unwrap()
460 .set_positions(positions, duration))
461 }
462
463 fn send_joint_trajectory(
464 &self,
465 trajectory: Vec<TrajectoryPoint>,
466 ) -> Result<WaitFuture, arci::Error> {
467 if trajectory.is_empty() {
468 return Ok(WaitFuture::ready());
469 }
470
471 Ok(self
472 .0
473 .send_joint_positions_target
474 .lock()
475 .unwrap()
476 .set_trajectory(trajectory))
477 }
478}
479
480impl Localization for UrdfVizWebClient {
481 fn current_pose(&self, _frame_id: &str) -> Result<na::Isometry2<f64>, arci::Error> {
482 let pose = get_robot_origin(&self.0.base_url)?;
483 let yaw = euler_angles_from_quaternion(&pose.quaternion).2;
484 Ok(na::Isometry2::new(
485 na::Vector2::new(pose.position[0], pose.position[1]),
486 yaw,
487 ))
488 }
489}
490
491impl Navigation for UrdfVizWebClient {
492 fn send_goal_pose(
493 &self,
494 goal: na::Isometry2<f64>,
495 _frame_id: &str,
496 _timeout: Duration,
497 ) -> Result<WaitFuture, arci::Error> {
498 send_robot_origin(&self.0.base_url, goal.into())?;
500
501 Ok(WaitFuture::ready())
502 }
503
504 fn cancel(&self) -> Result<(), arci::Error> {
505 todo!()
506 }
507}
508
509impl MoveBase for UrdfVizWebClient {
510 fn send_velocity(&self, velocity: &arci::BaseVelocity) -> Result<(), arci::Error> {
511 if !self.0.threads.lock().unwrap().has_send_velocity_thread {
512 panic!("send_velocity called without run_send_velocity_thread being called first");
513 }
514
515 get_robot_origin(&self.0.base_url)?;
519
520 velocity.clone_into(&mut self.0.velocity.lock().unwrap());
521 Ok(())
522 }
523
524 fn current_velocity(&self) -> Result<arci::BaseVelocity, arci::Error> {
525 Ok(self.0.velocity.lock().unwrap().to_owned())
526 }
527}