1use std::cell::{Cell, RefCell};
4use std::io::Result as IOResult;
5use std::marker::PhantomData;
6use std::mem;
7use std::os::fd::OwnedFd;
8use std::os::unix::io::{AsFd, AsRawFd, BorrowedFd, RawFd};
9use std::rc::Rc;
10use std::sync::atomic::Ordering;
11use std::sync::{Arc, Condvar, Mutex};
12use std::thread::JoinHandle;
13use std::time::{Duration, Instant};
14
15use calloop::ping::Ping;
16use rustix::event::{PollFd, PollFlags};
17use rustix::pipe::{self, PipeFlags};
18use sctk::reexports::calloop::Error as CalloopError;
19use sctk::reexports::calloop_wayland_source::WaylandSource;
20use sctk::reexports::client::{globals, Connection, QueueHandle};
21use tracing::warn;
22
23use crate::cursor::OnlyCursorImage;
24use crate::dpi::LogicalSize;
25use crate::error::{EventLoopError, OsError as RootOsError};
26use crate::event::{Event, InnerSizeWriter, StartCause, WindowEvent};
27use crate::event_loop::{ActiveEventLoop as RootActiveEventLoop, ControlFlow, DeviceEvents};
28use crate::platform::pump_events::PumpStatus;
29use crate::platform_impl::platform::min_timeout;
30use crate::platform_impl::{
31 ActiveEventLoop as PlatformActiveEventLoop, OsError, PlatformCustomCursor,
32};
33use crate::window::{CustomCursor as RootCustomCursor, CustomCursorSource};
34
35mod proxy;
36pub mod sink;
37
38pub use proxy::EventLoopProxy;
39use sink::EventSink;
40
41use super::state::{WindowCompositorUpdate, WinitState};
42use super::window::state::FrameCallbackState;
43use super::{logical_to_physical_rounded, DeviceId, WaylandError, WindowId};
44
/// Calloop dispatcher that wraps the Wayland event source and uses [`WinitState`] as the data
/// handed to its callback.
type WaylandDispatcher = calloop::Dispatcher<'static, WaylandSource<WinitState>, WinitState>;
46
/// The Wayland event loop.
///
/// Bundles the calloop loop, the Wayland connection and queue dispatcher, the user-event channel
/// and the window target that is handed to the application callback.
///
/// NOTE(review): Rust drops fields in declaration order; confirm drop-order requirements before
/// reordering the fields below.
pub struct EventLoop<T: 'static> {
    /// Set when `pump_events` runs the `StartCause::Init` iteration and cleared again once an
    /// exit code has been observed.
    loop_running: bool,

    /// Scratch buffer for events; taken by `single_iteration`, filled from the state's sinks,
    /// drained into the callback and swapped back afterwards.
    buffer_sink: EventSink,
    /// Scratch buffer for compositor updates, used by `single_iteration` in the same
    /// take/swap-back fashion.
    compositor_updates: Vec<WindowCompositorUpdate>,
    /// Scratch buffer for the ids of the windows processed by `single_iteration`.
    window_ids: Vec<WindowId>,

    /// Sending half of the user-event channel; cloned into every proxy by `create_proxy`.
    user_events_sender: calloop::channel::Sender<T>,

    /// User events received on the channel and not yet delivered; shared with the channel
    /// callback registered in `new` and drained by `single_iteration`.
    pending_user_events: Rc<RefCell<Vec<T>>>,

    /// Dispatcher for the Wayland queue; registered with calloop in `new` and used by
    /// `roundtrip` to reach the underlying queue.
    wayland_dispatcher: WaylandDispatcher,

    /// Connection to the Wayland server; flushed before every dispatch.
    connection: Connection,

    /// Window target passed to the application callback alongside every event.
    window_target: RootActiveEventLoop,

    /// The calloop event loop that `loop_dispatch` drives.
    event_loop: calloop::EventLoop<'static, WinitState>,

    /// Helper thread handle, created lazily by `pump_events` the first time it is called with a
    /// timeout.
    pump_event_notifier: Option<PumpEventNotifier>,
}
80
81impl<T: 'static> EventLoop<T> {
82 pub fn new() -> Result<EventLoop<T>, EventLoopError> {
83 macro_rules! map_err {
84 ($e:expr, $err:expr) => {
85 $e.map_err(|error| os_error!($err(error).into()))
86 };
87 }
88
89 let connection = map_err!(Connection::connect_to_env(), WaylandError::Connection)?;
90
91 let (globals, mut event_queue) =
92 map_err!(globals::registry_queue_init(&connection), WaylandError::Global)?;
93 let queue_handle = event_queue.handle();
94
95 let event_loop =
96 map_err!(calloop::EventLoop::<WinitState>::try_new(), WaylandError::Calloop)?;
97
98 let mut winit_state = WinitState::new(&globals, &queue_handle, event_loop.handle())
99 .map_err(|error| os_error!(error))?;
100
101 map_err!(event_queue.roundtrip(&mut winit_state), WaylandError::Dispatch)?;
104
105 let wayland_source = WaylandSource::new(connection.clone(), event_queue);
107 let wayland_dispatcher =
108 calloop::Dispatcher::new(wayland_source, |_, queue, winit_state: &mut WinitState| {
109 let result = queue.dispatch_pending(winit_state);
110 if result.is_ok()
111 && (!winit_state.events_sink.is_empty()
112 || !winit_state.window_compositor_updates.is_empty())
113 {
114 winit_state.dispatched_events = true;
115 }
116 result
117 });
118
119 map_err!(
120 event_loop.handle().register_dispatcher(wayland_dispatcher.clone()),
121 WaylandError::Calloop
122 )?;
123
124 let pending_user_events = Rc::new(RefCell::new(Vec::new()));
126 let pending_user_events_clone = pending_user_events.clone();
127 let (user_events_sender, user_events_channel) = calloop::channel::channel();
128 let result = event_loop
129 .handle()
130 .insert_source(user_events_channel, move |event, _, winit_state: &mut WinitState| {
131 if let calloop::channel::Event::Msg(msg) = event {
132 winit_state.dispatched_events = true;
133 pending_user_events_clone.borrow_mut().push(msg);
134 }
135 })
136 .map_err(|error| error.error);
137 map_err!(result, WaylandError::Calloop)?;
138
139 let (event_loop_awakener, event_loop_awakener_source) = map_err!(
141 calloop::ping::make_ping()
142 .map_err(|error| CalloopError::OtherError(Box::new(error).into())),
143 WaylandError::Calloop
144 )?;
145
146 let result = event_loop
147 .handle()
148 .insert_source(event_loop_awakener_source, move |_, _, winit_state: &mut WinitState| {
149 winit_state.dispatched_events = true;
151 })
152 .map_err(|error| error.error);
153 map_err!(result, WaylandError::Calloop)?;
154
155 let window_target = ActiveEventLoop {
156 connection: connection.clone(),
157 wayland_dispatcher: wayland_dispatcher.clone(),
158 event_loop_awakener,
159 queue_handle,
160 control_flow: Cell::new(ControlFlow::default()),
161 exit: Cell::new(None),
162 state: RefCell::new(winit_state),
163 };
164
165 let event_loop = Self {
166 loop_running: false,
167 compositor_updates: Vec::new(),
168 buffer_sink: EventSink::default(),
169 window_ids: Vec::new(),
170 connection,
171 wayland_dispatcher,
172 user_events_sender,
173 pending_user_events,
174 event_loop,
175 window_target: RootActiveEventLoop {
176 p: PlatformActiveEventLoop::Wayland(window_target),
177 _marker: PhantomData,
178 },
179 pump_event_notifier: None,
180 };
181
182 Ok(event_loop)
183 }
184
185 pub fn run_on_demand<F>(&mut self, mut event_handler: F) -> Result<(), EventLoopError>
186 where
187 F: FnMut(Event<T>, &RootActiveEventLoop),
188 {
189 let exit = loop {
190 match self.pump_events(None, &mut event_handler) {
191 PumpStatus::Exit(0) => {
192 break Ok(());
193 },
194 PumpStatus::Exit(code) => {
195 break Err(EventLoopError::ExitFailure(code));
196 },
197 _ => {
198 continue;
199 },
200 }
201 };
202
203 let _ = self.roundtrip().map_err(EventLoopError::Os);
208
209 exit
210 }
211
212 pub fn pump_events<F>(&mut self, timeout: Option<Duration>, mut callback: F) -> PumpStatus
213 where
214 F: FnMut(Event<T>, &RootActiveEventLoop),
215 {
216 if !self.loop_running {
217 self.loop_running = true;
218
219 self.single_iteration(&mut callback, StartCause::Init);
221 }
222
223 if !self.exiting() {
226 self.poll_events_with_timeout(timeout, &mut callback);
227 }
228 if let Some(code) = self.exit_code() {
229 self.loop_running = false;
230
231 callback(Event::LoopExiting, self.window_target());
232
233 PumpStatus::Exit(code)
234 } else {
235 if timeout.is_some() && self.pump_event_notifier.is_none() {
238 let awakener = match &self.window_target.p {
239 PlatformActiveEventLoop::Wayland(window_target) => {
240 window_target.event_loop_awakener.clone()
241 },
242 #[cfg(x11_platform)]
243 PlatformActiveEventLoop::X(_) => unreachable!(),
244 };
245
246 self.pump_event_notifier =
247 Some(PumpEventNotifier::spawn(self.connection.clone(), awakener));
248 }
249
250 if let Some(pump_event_notifier) = self.pump_event_notifier.as_ref() {
251 *pump_event_notifier.control.0.lock().unwrap() = PumpEventNotifierAction::Monitor;
253 pump_event_notifier.control.1.notify_one();
254 }
255
256 PumpStatus::Continue
257 }
258 }
259
260 pub fn poll_events_with_timeout<F>(&mut self, mut timeout: Option<Duration>, mut callback: F)
261 where
262 F: FnMut(Event<T>, &RootActiveEventLoop),
263 {
264 let cause = loop {
265 let start = Instant::now();
266
267 timeout = {
268 let control_flow_timeout = match self.control_flow() {
269 ControlFlow::Wait => None,
270 ControlFlow::Poll => Some(Duration::ZERO),
271 ControlFlow::WaitUntil(wait_deadline) => {
272 Some(wait_deadline.saturating_duration_since(start))
273 },
274 };
275 min_timeout(control_flow_timeout, timeout)
276 };
277
278 if self.connection.flush().is_err() {
285 self.set_exit_code(1);
286 return;
287 }
288
289 if let Err(error) = self.loop_dispatch(timeout) {
290 let exit_code = error.raw_os_error().unwrap_or(1);
298 self.set_exit_code(exit_code);
299 return;
300 }
301
302 let cause = match self.control_flow() {
305 ControlFlow::Poll => StartCause::Poll,
306 ControlFlow::Wait => StartCause::WaitCancelled { start, requested_resume: None },
307 ControlFlow::WaitUntil(deadline) => {
308 if Instant::now() < deadline {
309 StartCause::WaitCancelled { start, requested_resume: Some(deadline) }
310 } else {
311 StartCause::ResumeTimeReached { start, requested_resume: deadline }
312 }
313 },
314 };
315
316 let dispatched_events = self.with_state(|state| state.dispatched_events);
318 if matches!(cause, StartCause::WaitCancelled { .. })
319 && !dispatched_events
320 && timeout.is_none()
321 {
322 continue;
323 }
324
325 break cause;
326 };
327
328 self.single_iteration(&mut callback, cause);
329 }
330
    /// Runs one full iteration of the loop and delivers every resulting event to `callback`.
    ///
    /// Events are emitted in this order: `NewEvents(cause)`, `Resumed` (only for
    /// `StartCause::Init`), pending user events, per-window compositor updates
    /// (`ScaleFactorChanged`, `Resized`, `CloseRequested`), events from the window sink, events
    /// from the general sink, per-window `Destroyed` / `RedrawRequested`, and finally
    /// `AboutToWait`.
    ///
    /// # Panics
    ///
    /// Panics if a compositor update or a window request refers to a window id that is missing
    /// from the state's maps, or if a mutex is poisoned (all lookups are `unwrap`ped).
    fn single_iteration<F>(&mut self, callback: &mut F, cause: StartCause)
    where
        F: FnMut(Event<T>, &RootActiveEventLoop),
    {
        // Move the scratch buffers out of `self` so they can be used while `self` is borrowed by
        // `with_state`; they are swapped back at the very end.
        let mut compositor_updates = std::mem::take(&mut self.compositor_updates);
        let mut buffer_sink = std::mem::take(&mut self.buffer_sink);
        let mut window_ids = std::mem::take(&mut self.window_ids);

        callback(Event::NewEvents(cause), &self.window_target);

        // `Resumed` is emitted exactly once, on the init iteration.
        if cause == StartCause::Init {
            callback(Event::Resumed, &self.window_target);
        }

        // Deliver the user events collected by the channel source since the last iteration.
        for user_event in self.pending_user_events.borrow_mut().drain(..) {
            callback(Event::UserEvent(user_event), &self.window_target);
        }

        // Pull the compositor updates queued in the state into the scratch buffer.
        self.with_state(|state| compositor_updates.append(&mut state.window_compositor_updates));

        for mut compositor_update in compositor_updates.drain(..) {
            let window_id = compositor_update.window_id;
            if compositor_update.scale_changed {
                let (physical_size, scale_factor) = self.with_state(|state| {
                    let windows = state.windows.get_mut();
                    let window = windows.get(&window_id).unwrap().lock().unwrap();
                    let scale_factor = window.scale_factor();
                    let size = logical_to_physical_rounded(window.inner_size(), scale_factor);
                    (size, scale_factor)
                });

                // Remember the size offered to the user so a change can be detected afterwards.
                let old_physical_size = physical_size;

                // The callback gets a weak handle through which it may write a new size.
                let new_inner_size = Arc::new(Mutex::new(physical_size));
                callback(
                    Event::WindowEvent {
                        window_id: crate::window::WindowId(window_id),
                        event: WindowEvent::ScaleFactorChanged {
                            scale_factor,
                            inner_size_writer: InnerSizeWriter::new(Arc::downgrade(
                                &new_inner_size,
                            )),
                        },
                    },
                    &self.window_target,
                );

                let physical_size = *new_inner_size.lock().unwrap();
                drop(new_inner_size);

                // The user picked a different size: request it and force a `Resized` below.
                if old_physical_size != physical_size {
                    self.with_state(|state| {
                        let windows = state.windows.get_mut();
                        let mut window = windows.get(&window_id).unwrap().lock().unwrap();

                        let new_logical_size: LogicalSize<f64> =
                            physical_size.to_logical(scale_factor);
                        window.request_inner_size(new_logical_size.into());
                    });

                    compositor_update.resized = true;
                }
            }

            // A scale change alters the physical size too, so it is reported as a resize as well.
            if compositor_update.resized || compositor_update.scale_changed {
                let physical_size = self.with_state(|state| {
                    let windows = state.windows.get_mut();
                    let window = windows.get(&window_id).unwrap().lock().unwrap();

                    let scale_factor = window.scale_factor();
                    let size = logical_to_physical_rounded(window.inner_size(), scale_factor);

                    // A resized window is also flagged for redraw.
                    state
                        .window_requests
                        .get_mut()
                        .get_mut(&window_id)
                        .unwrap()
                        .redraw_requested
                        .store(true, Ordering::Relaxed);

                    size
                });

                callback(
                    Event::WindowEvent {
                        window_id: crate::window::WindowId(window_id),
                        event: WindowEvent::Resized(physical_size),
                    },
                    &self.window_target,
                );
            }

            if compositor_update.close_window {
                callback(
                    Event::WindowEvent {
                        window_id: crate::window::WindowId(window_id),
                        event: WindowEvent::CloseRequested,
                    },
                    &self.window_target,
                );
            }
        }

        // Events from the window sink (behind a mutex) are delivered first.
        self.with_state(|state| {
            buffer_sink.append(&mut state.window_events_sink.lock().unwrap());
        });
        for event in buffer_sink.drain() {
            let event = event.map_nonuser_event().unwrap();
            callback(event, &self.window_target);
        }

        // Then the events from the state's general sink.
        self.with_state(|state| {
            buffer_sink.append(&mut state.events_sink);
        });
        for event in buffer_sink.drain() {
            let event = event.map_nonuser_event().unwrap();
            callback(event, &self.window_target);
        }

        // Snapshot the ids of all windows that currently have a request entry.
        self.with_state(|state| {
            window_ids.extend(state.window_requests.get_mut().keys());
        });

        for window_id in window_ids.iter() {
            let event = self.with_state(|state| {
                let window_requests = state.window_requests.get_mut();
                // A closed window is removed from both maps and reported as destroyed.
                if window_requests.get(window_id).unwrap().take_closed() {
                    mem::drop(window_requests.remove(window_id));
                    mem::drop(state.windows.get_mut().remove(window_id));
                    return Some(WindowEvent::Destroyed);
                }

                let mut window =
                    state.windows.get_mut().get_mut(window_id).unwrap().lock().unwrap();

                // Nothing is emitted while a frame callback is still outstanding.
                if window.frame_callback_state() == FrameCallbackState::Requested {
                    return None;
                }

                window.frame_callback_reset();
                let mut redraw_requested =
                    window_requests.get(window_id).unwrap().take_redraw_requested();

                // Refreshing the frame may itself ask for a redraw.
                redraw_requested |= window.refresh_frame();

                redraw_requested.then_some(WindowEvent::RedrawRequested)
            });

            if let Some(event) = event {
                callback(
                    Event::WindowEvent { window_id: crate::window::WindowId(*window_id), event },
                    &self.window_target,
                );
            }
        }

        // Clear the flag consulted by `poll_events_with_timeout` to detect spurious wake-ups.
        self.with_state(|state| {
            state.dispatched_events = false;
        });

        callback(Event::AboutToWait, &self.window_target);

        // Refresh the window frames once more; a frame that reports a refresh gets its window
        // flagged for redraw. Ids removed above are no longer in the map and are skipped.
        let mut wake_up = false;
        for window_id in window_ids.drain(..) {
            wake_up |= self.with_state(|state| match state.windows.get_mut().get_mut(&window_id) {
                Some(window) => {
                    let refresh = window.lock().unwrap().refresh_frame();
                    if refresh {
                        state
                            .window_requests
                            .get_mut()
                            .get_mut(&window_id)
                            .unwrap()
                            .redraw_requested
                            .store(true, Ordering::Relaxed);
                    }

                    refresh
                },
                None => false,
            });
        }

        // Ping the awakener so the redraw flagged above is picked up by the next dispatch.
        if wake_up {
            match &self.window_target.p {
                PlatformActiveEventLoop::Wayland(window_target) => {
                    window_target.event_loop_awakener.ping();
                },
                #[cfg(x11_platform)]
                PlatformActiveEventLoop::X(_) => unreachable!(),
            }
        }

        // Hand the (now drained) scratch buffers back to `self`.
        std::mem::swap(&mut self.compositor_updates, &mut compositor_updates);
        std::mem::swap(&mut self.buffer_sink, &mut buffer_sink);
        std::mem::swap(&mut self.window_ids, &mut window_ids);
    }
557
558 #[inline]
559 pub fn create_proxy(&self) -> EventLoopProxy<T> {
560 EventLoopProxy::new(self.user_events_sender.clone())
561 }
562
563 #[inline]
564 pub fn window_target(&self) -> &RootActiveEventLoop {
565 &self.window_target
566 }
567
568 fn with_state<'a, U: 'a, F: FnOnce(&'a mut WinitState) -> U>(&'a mut self, callback: F) -> U {
569 let state = match &mut self.window_target.p {
570 PlatformActiveEventLoop::Wayland(window_target) => window_target.state.get_mut(),
571 #[cfg(x11_platform)]
572 _ => unreachable!(),
573 };
574
575 callback(state)
576 }
577
578 fn loop_dispatch<D: Into<Option<std::time::Duration>>>(&mut self, timeout: D) -> IOResult<()> {
579 let state = match &mut self.window_target.p {
580 PlatformActiveEventLoop::Wayland(window_target) => window_target.state.get_mut(),
581 #[cfg(feature = "x11")]
582 _ => unreachable!(),
583 };
584
585 self.event_loop.dispatch(timeout, state).map_err(|error| {
586 tracing::error!("Error dispatching event loop: {}", error);
587 error.into()
588 })
589 }
590
591 fn roundtrip(&mut self) -> Result<usize, RootOsError> {
592 let state = match &mut self.window_target.p {
593 PlatformActiveEventLoop::Wayland(window_target) => window_target.state.get_mut(),
594 #[cfg(feature = "x11")]
595 _ => unreachable!(),
596 };
597
598 let mut wayland_source = self.wayland_dispatcher.as_source_mut();
599 let event_queue = wayland_source.queue();
600 event_queue.roundtrip(state).map_err(|error| {
601 os_error!(OsError::WaylandError(Arc::new(WaylandError::Dispatch(error))))
602 })
603 }
604
605 fn control_flow(&self) -> ControlFlow {
606 self.window_target.p.control_flow()
607 }
608
609 fn exiting(&self) -> bool {
610 self.window_target.p.exiting()
611 }
612
613 fn set_exit_code(&self, code: i32) {
614 self.window_target.p.set_exit_code(code)
615 }
616
617 fn exit_code(&self) -> Option<i32> {
618 self.window_target.p.exit_code()
619 }
620}
621
622impl<T> AsFd for EventLoop<T> {
623 fn as_fd(&self) -> BorrowedFd<'_> {
624 self.event_loop.as_fd()
625 }
626}
627
628impl<T> AsRawFd for EventLoop<T> {
629 fn as_raw_fd(&self) -> RawFd {
630 self.event_loop.as_raw_fd()
631 }
632}
633
/// Wayland window target: the part of the event loop that is reachable from the event callback.
pub struct ActiveEventLoop {
    /// Ping handle for the awakener source registered in `EventLoop::new`; pinging it makes the
    /// source's callback mark the state as having dispatched events.
    pub event_loop_awakener: Ping,

    /// Handle to the Wayland event queue created in `EventLoop::new`.
    pub queue_handle: QueueHandle<WinitState>,

    /// Control flow currently requested for the loop.
    pub(crate) control_flow: Cell<ControlFlow>,

    /// Exit code of the loop; `None` while no exit has been requested.
    pub(crate) exit: Cell<Option<i32>>,

    /// The Wayland state shared with the calloop sources.
    pub state: RefCell<WinitState>,

    /// Clone of the dispatcher that wraps the Wayland event queue.
    pub wayland_dispatcher: WaylandDispatcher,

    /// Clone of the connection to the Wayland server.
    pub connection: Connection,
}
657
658impl ActiveEventLoop {
659 pub(crate) fn set_control_flow(&self, control_flow: ControlFlow) {
660 self.control_flow.set(control_flow)
661 }
662
663 pub(crate) fn control_flow(&self) -> ControlFlow {
664 self.control_flow.get()
665 }
666
667 pub(crate) fn exit(&self) {
668 self.exit.set(Some(0))
669 }
670
671 pub(crate) fn clear_exit(&self) {
672 self.exit.set(None)
673 }
674
675 pub(crate) fn exiting(&self) -> bool {
676 self.exit.get().is_some()
677 }
678
679 pub(crate) fn set_exit_code(&self, code: i32) {
680 self.exit.set(Some(code))
681 }
682
683 pub(crate) fn exit_code(&self) -> Option<i32> {
684 self.exit.get()
685 }
686
687 #[inline]
688 pub fn listen_device_events(&self, _allowed: DeviceEvents) {}
689
690 pub(crate) fn create_custom_cursor(&self, cursor: CustomCursorSource) -> RootCustomCursor {
691 RootCustomCursor {
692 inner: PlatformCustomCursor::Wayland(OnlyCursorImage(Arc::from(cursor.inner.0))),
693 }
694 }
695
696 #[cfg(feature = "rwh_05")]
697 #[inline]
698 pub fn raw_display_handle_rwh_05(&self) -> rwh_05::RawDisplayHandle {
699 use sctk::reexports::client::Proxy;
700
701 let mut display_handle = rwh_05::WaylandDisplayHandle::empty();
702 display_handle.display = self.connection.display().id().as_ptr() as *mut _;
703 rwh_05::RawDisplayHandle::Wayland(display_handle)
704 }
705
706 #[cfg(feature = "rwh_06")]
707 #[inline]
708 pub fn raw_display_handle_rwh_06(
709 &self,
710 ) -> Result<rwh_06::RawDisplayHandle, rwh_06::HandleError> {
711 use sctk::reexports::client::Proxy;
712
713 Ok(rwh_06::WaylandDisplayHandle::new({
714 let ptr = self.connection.display().id().as_ptr();
715 std::ptr::NonNull::new(ptr as *mut _).expect("wl_display should never be null")
716 })
717 .into())
718 }
719}
720
/// Handle to the helper thread that watches the Wayland connection between `pump_events` calls
/// and pings the event loop awakener.
#[derive(Debug)]
struct PumpEventNotifier {
    /// Action requested from the worker, paired with the condvar used to wake it.
    control: Arc<(Mutex<PumpEventNotifierAction>, Condvar)>,
    /// Write end of the pipe the worker polls; written to on drop to interrupt the poll.
    /// `None` when the pipe could not be created.
    worker_waker: Option<OwnedFd>,
    /// Join handle of the worker thread; `None` when no thread was spawned.
    handle: Option<JoinHandle<()>>,
}
730
731impl Drop for PumpEventNotifier {
732 fn drop(&mut self) {
733 if let Some(worker_waker) = self.worker_waker.as_ref() {
735 let _ = rustix::io::write(worker_waker.as_fd(), &[0u8]);
736 }
737 *self.control.0.lock().unwrap() = PumpEventNotifierAction::Shutdown;
738 self.control.1.notify_one();
739
740 if let Some(handle) = self.handle.take() {
741 let _ = handle.join();
742 }
743 }
744}
745
746impl PumpEventNotifier {
747 fn spawn(connection: Connection, awakener: Ping) -> Self {
748 let control = Arc::new((Mutex::new(PumpEventNotifierAction::Pause), Condvar::new()));
750 let control_thread = Arc::clone(&control);
751
752 let (read, write) = match pipe::pipe_with(PipeFlags::CLOEXEC | PipeFlags::NONBLOCK) {
753 Ok((read, write)) => (read, write),
754 Err(_) => return Self { control, handle: None, worker_waker: None },
755 };
756
757 let handle =
758 std::thread::Builder::new().name(String::from("pump_events mon")).spawn(move || {
759 let (lock, cvar) = &*control_thread;
760 'outer: loop {
761 let mut wait = lock.lock().unwrap();
762 while *wait == PumpEventNotifierAction::Pause {
763 wait = cvar.wait(wait).unwrap();
764 }
765
766 if *wait == PumpEventNotifierAction::Shutdown {
770 break 'outer;
771 }
772
773 *wait = PumpEventNotifierAction::Pause;
775 drop(wait);
776
777 while let Some(read_guard) = connection.prepare_read() {
778 let _ = connection.flush();
779 let poll_fd = PollFd::from_borrowed_fd(connection.as_fd(), PollFlags::IN);
780 let pipe_poll_fd = PollFd::from_borrowed_fd(read.as_fd(), PollFlags::IN);
781 if Ok(1) == rustix::io::read(read.as_fd(), &mut [0u8; 1]) {
783 break 'outer;
784 }
785 let _ = rustix::event::poll(&mut [poll_fd, pipe_poll_fd], -1);
786 let _ = read_guard.read_without_dispatch();
788 }
789
790 awakener.ping();
791 }
792 });
793
794 if let Some(err) = handle.as_ref().err() {
795 warn!("failed to spawn pump_events wake-up thread: {err}");
796 }
797
798 PumpEventNotifier { control, handle: handle.ok(), worker_waker: Some(write) }
799 }
800}
801
/// Action requested from the `PumpEventNotifier` worker thread.
#[derive(Debug, PartialEq, Eq)]
enum PumpEventNotifierAction {
    /// Watch the Wayland connection and ping the awakener; set by `pump_events` when it
    /// returns `PumpStatus::Continue`.
    Monitor,
    /// Wait on the condvar; the initial state, and the one the worker resets to after picking
    /// up a `Monitor` request.
    Pause,
    /// Leave the worker loop; set when the notifier is dropped.
    Shutdown,
}