1use futures::{channel::oneshot, TryFutureExt};
2use std::{
3 ffi::CString,
4 future::Future,
5 mem::MaybeUninit,
6 sync::{Mutex, Weak},
7};
8
9use crate::{error::*, msg_types::*, QosProfile};
10use r2r_rcl::*;
11
12pub struct Client<T>
17where
18 T: WrappedServiceTypeSupport,
19{
20 client: Weak<Mutex<TypedClient<T>>>,
21}
22
23impl<T: 'static> Client<T>
24where
25 T: WrappedServiceTypeSupport,
26{
27 pub fn request(&self, msg: &T::Request) -> Result<impl Future<Output = Result<T::Response>>>
31 where
32 T: WrappedServiceTypeSupport,
33 {
34 let client = self.client.upgrade().ok_or(Error::RCL_RET_CLIENT_INVALID)?;
36 let mut client = client.lock().unwrap();
37 client.request(msg)
38 }
39}
40
/// Handle to an "untyped" service client whose requests and responses are exchanged as
/// `serde_json::Value`.
///
/// Only a [`Weak`] reference to the mutex-guarded [`UntypedClient_`] is stored, so this
/// handle does not keep the underlying client alive by itself.
pub struct ClientUntyped {
    client: Weak<Mutex<UntypedClient_>>,
}
51
52impl ClientUntyped {
53 pub fn request(
60 &self, msg: serde_json::Value,
61 ) -> Result<impl Future<Output = Result<Result<serde_json::Value>>>> {
62 let client = self.client.upgrade().ok_or(Error::RCL_RET_CLIENT_INVALID)?;
64 let mut client = client.lock().unwrap();
65 client.request(msg)
66 }
67}
68
69pub fn make_client<T>(client: Weak<Mutex<TypedClient<T>>>) -> Client<T>
70where
71 T: WrappedServiceTypeSupport,
72{
73 Client { client }
74}
75
/// Wraps a weak reference to an [`UntypedClient_`] in the public [`ClientUntyped`] handle.
///
/// The reference is stored as-is; no upgrade or validity check happens here.
pub fn make_untyped_client(client: Weak<Mutex<UntypedClient_>>) -> ClientUntyped {
    ClientUntyped { client }
}
79
80unsafe impl<T> Send for TypedClient<T> where T: WrappedServiceTypeSupport {}
81
82impl<T: 'static> TypedClient<T>
83where
84 T: WrappedServiceTypeSupport,
85{
86 pub fn request(&mut self, msg: &T::Request) -> Result<impl Future<Output = Result<T::Response>>>
87 where
88 T: WrappedServiceTypeSupport,
89 {
90 let native_msg: WrappedNativeMsg<T::Request> = WrappedNativeMsg::<T::Request>::from(msg);
91 let mut seq_no = 0i64;
92 let result =
93 unsafe { rcl_send_request(&self.rcl_handle, native_msg.void_ptr(), &mut seq_no) };
94
95 let (sender, receiver) = oneshot::channel::<T::Response>();
96
97 if result == RCL_RET_OK as i32 {
98 self.response_channels.push((seq_no, sender));
99 Ok(receiver.map_err(|_| Error::RCL_RET_CLIENT_INVALID))
101 } else {
102 log::error!("could not send request {}", result);
103 Err(Error::from_rcl_error(result))
104 }
105 }
106}
107
// NOTE(review): `Send` is asserted manually, presumably because the raw `rcl_client_t`
// handle stored in `UntypedClient_` is not automatically `Send` — confirm that moving the
// client to another thread is sound for the rcl implementation in use.
unsafe impl Send for UntypedClient_ {}
109
110impl UntypedClient_ {
111 pub fn request(
112 &mut self, msg: serde_json::Value,
113 ) -> Result<impl Future<Output = Result<Result<serde_json::Value>>>> {
114 let native_msg = (self.service_type.make_request_msg)();
115 native_msg.from_json(msg)?;
116
117 let mut seq_no = 0i64;
118 let result =
119 unsafe { rcl_send_request(&self.rcl_handle, native_msg.void_ptr(), &mut seq_no) };
120
121 let (sender, receiver) = oneshot::channel::<Result<serde_json::Value>>();
122
123 if result == RCL_RET_OK as i32 {
124 self.response_channels.push((seq_no, sender));
125 Ok(receiver.map_err(|_| Error::RCL_RET_CLIENT_INVALID))
127 } else {
128 log::error!("could not send request {}", result);
129 Err(Error::from_rcl_error(result))
130 }
131 }
132}
133
/// Operations shared by the typed and untyped client implementations in this file.
pub trait Client_ {
    /// Returns a reference to the underlying `rcl_client_t` handle.
    fn handle(&self) -> &rcl_client_t;
    /// Takes a response from rcl and forwards it to the request waiting for it.
    fn handle_response(&mut self);
    /// Registers a sender that is signalled once the service server is found to be
    /// available by `poll_available`.
    fn register_poll_available(&mut self, s: oneshot::Sender<()>);
    /// Checks whether the service server is available and, if so, notifies the
    /// registered senders.
    fn poll_available(&mut self, node: &mut rcl_node_t);
    /// Finalizes the rcl client handle against `node`.
    fn destroy(&mut self, node: &mut rcl_node_t);
}
141
142pub struct TypedClient<T>
143where
144 T: WrappedServiceTypeSupport,
145{
146 pub rcl_handle: rcl_client_t,
147 pub response_channels: Vec<(i64, oneshot::Sender<T::Response>)>,
148 pub poll_available_channels: Vec<oneshot::Sender<()>>,
149}
150
151impl<T: 'static> Client_ for TypedClient<T>
152where
153 T: WrappedServiceTypeSupport,
154{
155 fn handle(&self) -> &rcl_client_t {
156 &self.rcl_handle
157 }
158
159 fn handle_response(&mut self) {
160 let mut request_id = MaybeUninit::<rmw_request_id_t>::uninit();
161 let mut response_msg = WrappedNativeMsg::<T::Response>::new();
162
163 let ret = unsafe {
164 rcl_take_response(
165 &self.rcl_handle,
166 request_id.as_mut_ptr(),
167 response_msg.void_ptr_mut(),
168 )
169 };
170 if ret == RCL_RET_OK as i32 {
171 let request_id = unsafe { request_id.assume_init() };
172 if let Some(idx) = self
173 .response_channels
174 .iter()
175 .position(|(id, _)| id == &request_id.sequence_number)
176 {
177 let (_, sender) = self.response_channels.swap_remove(idx);
178 let response = T::Response::from_native(&response_msg);
179 match sender.send(response) {
180 Ok(()) => {}
181 Err(e) => {
182 log::debug!("error sending to client: {:?}", e);
183 }
184 }
185 } else {
186 let we_have: String = self
187 .response_channels
188 .iter()
189 .map(|(id, _)| id.to_string())
190 .collect::<Vec<_>>()
191 .join(",");
192 log::error!(
193 "no such req id: {}, we have [{}], ignoring",
194 request_id.sequence_number,
195 we_have
196 );
197 }
198 } }
200
201 fn register_poll_available(&mut self, s: oneshot::Sender<()>) {
202 self.poll_available_channels.push(s);
203 }
204
205 fn poll_available(&mut self, node: &mut rcl_node_t) {
206 if self.poll_available_channels.is_empty() {
207 return;
208 }
209 let available = service_available_helper(node, self.handle());
210 match available {
211 Ok(true) => {
212 while let Some(sender) = self.poll_available_channels.pop() {
214 let _res = sender.send(()); }
216 }
217 Ok(false) => {
218 }
220 Err(_) => {
221 self.poll_available_channels.clear();
223 }
224 }
225 }
226
227 fn destroy(&mut self, node: &mut rcl_node_t) {
228 unsafe {
229 rcl_client_fini(&mut self.rcl_handle, node);
230 }
231 }
232}
233
/// State of an untyped service client: the service type support, the rcl handle and
/// the channels of callers that are waiting for a response or for the service to
/// become available.
pub struct UntypedClient_ {
    /// Type support used to create native request and response messages.
    pub service_type: UntypedServiceSupport,
    /// Underlying rcl client handle passed to the `rcl_*` calls.
    pub rcl_handle: rcl_client_t,
    /// Pending requests: the sequence number returned by `rcl_send_request`, paired
    /// with the sender used to deliver the JSON conversion of the matching response.
    pub response_channels: Vec<(i64, oneshot::Sender<Result<serde_json::Value>>)>,
    /// Senders waiting to be notified that the service server is available.
    pub poll_available_channels: Vec<oneshot::Sender<()>>,
}
240
241impl Client_ for UntypedClient_ {
242 fn handle(&self) -> &rcl_client_t {
243 &self.rcl_handle
244 }
245
246 fn handle_response(&mut self) {
247 let mut request_id = MaybeUninit::<rmw_request_id_t>::uninit();
248 let mut response_msg = (self.service_type.make_response_msg)();
249
250 let ret = unsafe {
251 rcl_take_response(
252 &self.rcl_handle,
253 request_id.as_mut_ptr(),
254 response_msg.void_ptr_mut(),
255 )
256 };
257 if ret == RCL_RET_OK as i32 {
258 let request_id = unsafe { request_id.assume_init() };
259 if let Some(idx) = self
260 .response_channels
261 .iter()
262 .position(|(id, _)| id == &request_id.sequence_number)
263 {
264 let (_, sender) = self.response_channels.swap_remove(idx);
265 let response = response_msg.to_json();
266 match sender.send(response) {
267 Ok(()) => {}
268 Err(e) => {
269 log::debug!("error sending to client: {:?}", e);
270 }
271 }
272 } else {
273 let we_have: String = self
274 .response_channels
275 .iter()
276 .map(|(id, _)| id.to_string())
277 .collect::<Vec<_>>()
278 .join(",");
279 log::error!(
280 "no such req id: {}, we have [{}], ignoring",
281 request_id.sequence_number,
282 we_have
283 );
284 }
285 } }
287
288 fn register_poll_available(&mut self, s: oneshot::Sender<()>) {
289 self.poll_available_channels.push(s);
290 }
291
292 fn poll_available(&mut self, node: &mut rcl_node_t) {
293 if self.poll_available_channels.is_empty() {
294 return;
295 }
296 let available = service_available_helper(node, self.handle());
297 match available {
298 Ok(true) => {
299 while let Some(sender) = self.poll_available_channels.pop() {
301 let _res = sender.send(()); }
303 }
304 Ok(false) => {
305 }
307 Err(_) => {
308 self.poll_available_channels.clear();
310 }
311 }
312 }
313
314 fn destroy(&mut self, node: &mut rcl_node_t) {
315 unsafe {
316 rcl_client_fini(&mut self.rcl_handle, node);
317 }
318 }
319}
320
321pub fn create_client_helper(
322 node: *mut rcl_node_t, service_name: &str, service_ts: *const rosidl_service_type_support_t,
323 qos_profile: QosProfile,
324) -> Result<rcl_client_t> {
325 let mut client_handle = unsafe { rcl_get_zero_initialized_client() };
326 let service_name_c_string =
327 CString::new(service_name).map_err(|_| Error::RCL_RET_INVALID_ARGUMENT)?;
328
329 let result = unsafe {
330 let mut client_options = rcl_client_get_default_options();
331 client_options.qos = qos_profile.into();
332 rcl_client_init(
333 &mut client_handle,
334 node,
335 service_ts,
336 service_name_c_string.as_ptr(),
337 &client_options,
338 )
339 };
340 if result == RCL_RET_OK as i32 {
341 Ok(client_handle)
342 } else {
343 Err(Error::from_rcl_error(result))
344 }
345}
346
347pub fn service_available_helper(node: &mut rcl_node_t, client: &rcl_client_t) -> Result<bool> {
348 let mut avail = false;
349 let result = unsafe { rcl_service_server_is_available(node, client, &mut avail) };
350
351 if result == RCL_RET_OK as i32 {
352 Ok(avail)
353 } else {
354 Err(Error::from_rcl_error(result))
355 }
356}
357
358use crate::nodes::IsAvailablePollable;
359
360impl<T: 'static> IsAvailablePollable for Client<T>
361where
362 T: WrappedServiceTypeSupport,
363{
364 fn register_poll_available(&self, sender: oneshot::Sender<()>) -> Result<()> {
365 let client = self.client.upgrade().ok_or(Error::RCL_RET_CLIENT_INVALID)?;
366 let mut client = client.lock().unwrap();
367 client.register_poll_available(sender);
368 Ok(())
369 }
370}
371
372impl IsAvailablePollable for ClientUntyped {
373 fn register_poll_available(&self, sender: oneshot::Sender<()>) -> Result<()> {
374 let client = self.client.upgrade().ok_or(Error::RCL_RET_CLIENT_INVALID)?;
375 let mut client = client.lock().unwrap();
376 client.register_poll_available(sender);
377 Ok(())
378 }
379}