diff --git a/src-tauri/src/rdp/mod.rs b/src-tauri/src/rdp/mod.rs index 1e03fd0..a75a1ab 100644 --- a/src-tauri/src/rdp/mod.rs +++ b/src-tauri/src/rdp/mod.rs @@ -13,7 +13,7 @@ use serde::{Deserialize, Serialize}; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::TcpStream; use tokio::sync::mpsc; -use tokio::sync::Mutex as TokioMutex; + use ironrdp::connector::{self, ClientConnector, ConnectionResult, Credentials, DesktopSize}; use ironrdp::graphics::image_processing::PixelFormat; @@ -71,15 +71,10 @@ struct RdpSessionHandle { hostname: String, width: u16, height: u16, - /// Double-buffered: back_buffer is written by the RDP thread, - /// front_buffer is read by the IPC command. Swap on GraphicsUpdate - /// so reads never block writes. Arcs kept alive via struct ownership. + /// Frame buffer: RDP thread writes via RwLock write, IPC reads via RwLock read. + /// Brief write-lock per GraphicsUpdate, concurrent reads for get_frame. front_buffer: Arc>>, - #[allow(dead_code)] - back_buffer: Arc>>, frame_dirty: Arc, - #[allow(dead_code)] - frame_generation: Arc, input_tx: mpsc::UnboundedSender, } @@ -106,10 +101,8 @@ impl RdpService { for pixel in initial_buf.chunks_exact_mut(4) { pixel[3] = 255; } - let front_buffer = Arc::new(std::sync::RwLock::new(initial_buf.clone())); - let back_buffer = Arc::new(TokioMutex::new(initial_buf)); + let front_buffer = Arc::new(std::sync::RwLock::new(initial_buf)); let frame_dirty = Arc::new(AtomicBool::new(false)); - let frame_generation = Arc::new(std::sync::atomic::AtomicU64::new(0)); let (input_tx, input_rx) = mpsc::unbounded_channel(); @@ -119,9 +112,7 @@ impl RdpService { width, height, front_buffer: front_buffer.clone(), - back_buffer: back_buffer.clone(), frame_dirty: frame_dirty.clone(), - frame_generation: frame_generation.clone(), input_tx, }); @@ -167,10 +158,8 @@ impl RdpService { if let Err(e) = run_active_session( connection_result, framed, - back_buffer, front_buffer, frame_dirty, - frame_generation, input_rx, width as u16, height as u16, @@ -319,7 +308,11 @@ fn build_connector_config(config: &RdpConfig) -> Result>>, front_buffer: Arc>>, frame_dirty: Arc, frame_generation: Arc, mut input_rx: mpsc::UnboundedReceiver, width: u16, height: u16, app_handle: tauri::AppHandle, session_id: String) -> Result<(), String> { +async fn run_active_session(connection_result: ConnectionResult, framed: UpgradedFramed, front_buffer: Arc>>, frame_dirty: Arc, mut input_rx: mpsc::UnboundedReceiver, width: u16, height: u16, app_handle: tauri::AppHandle, session_id: String) -> Result<(), String> { let (mut reader, mut writer) = split_tokio_framed(framed); let mut image = DecodedImage::new(PixelFormat::RgbA32, width, height); let mut active_stage = ActiveStage::new(connection_result); @@ -408,24 +401,16 @@ async fn run_active_session(connection_result: ConnectionResult, framed: Upgrade match out { ActiveStageOutput::ResponseFrame(frame) => { writer.write_all(&frame).await.map_err(|e| format!("Failed to write RDP response frame: {}", e))?; } ActiveStageOutput::GraphicsUpdate(_region) => { - // Write to back buffer (async mutex, no contention with reads) + // Write decoded image directly to front buffer. + // Single RwLock write — readers use read lock, no contention. { - let mut buf = back_buffer.lock().await; let src = image.data(); - if src.len() == buf.len() { buf.copy_from_slice(src); } else { *buf = src.to_vec(); } - } - // Swap into front buffer (RwLock write — brief, readers use read lock) - { - let back = back_buffer.lock().await; let mut front = front_buffer.write().unwrap_or_else(|e| e.into_inner()); - front.copy_from_slice(&back); + if src.len() == front.len() { front.copy_from_slice(src); } else { *front = src.to_vec(); } } - let frame_gen = frame_generation.fetch_add(1, Ordering::Release); frame_dirty.store(true, Ordering::Release); - // Throttle: only emit event every other frame to avoid flooding IPC - if frame_gen % 2 == 0 { - let _ = app_handle.emit(&format!("rdp:frame:{}", session_id), ()); - } + // Signal frontend — rAF coalescing prevents flood + let _ = app_handle.emit(&format!("rdp:frame:{}", session_id), ()); } ActiveStageOutput::Terminate(reason) => { info!("RDP session terminated: {:?}", reason); return Ok(()); } ActiveStageOutput::DeactivateAll(_) => { warn!("RDP server sent DeactivateAll — reconnection not yet implemented"); return Ok(()); }