//! RDP session manager — connects to Windows RDP servers via IronRDP, //! maintains an RGBA frame buffer per session, and exposes input injection. //! //! Architecture: //! - `RdpService` holds a `DashMap` of active sessions. //! - Each session spawns a tokio task that runs the IronRDP active stage loop, //! reading frames from the server and updating a shared `Vec` (RGBA). //! - The frontend fetches frames via a Tauri command that reads the buffer. //! - Mouse/keyboard input is sent to the session via an mpsc channel. pub mod input; use std::sync::Arc; use base64::Engine; use dashmap::DashMap; use log::{debug, error, info, warn}; use serde::{Deserialize, Serialize}; use tokio::io::{AsyncRead, AsyncWrite}; use tokio::net::TcpStream; use tokio::sync::mpsc; use tokio::sync::Mutex as TokioMutex; use ironrdp::connector::{self, ClientConnector, ConnectionResult, Credentials, DesktopSize}; use ironrdp::graphics::image_processing::PixelFormat; use ironrdp::input::{self as rdp_input, MouseButton, MousePosition, Operation, Scancode, WheelRotations}; use ironrdp::pdu::gcc::KeyboardType; use ironrdp::pdu::rdp::capability_sets::MajorPlatformType; use ironrdp::pdu::rdp::client_info::{PerformanceFlags, TimezoneInfo}; use ironrdp::session::image::DecodedImage; use ironrdp::session::{ActiveStage, ActiveStageOutput}; use ironrdp_tokio::reqwest::ReqwestNetworkClient; use ironrdp_tokio::{split_tokio_framed, FramedWrite, TokioFramed}; use self::input::mouse_flags; // ── Public types ────────────────────────────────────────────────────────────── #[derive(Debug, Deserialize)] #[serde(rename_all = "camelCase")] pub struct RdpConfig { pub hostname: String, pub port: u16, pub username: String, pub password: String, pub domain: Option, pub width: u16, pub height: u16, } #[derive(Debug, Serialize, Clone)] #[serde(rename_all = "camelCase")] pub struct RdpSessionInfo { pub id: String, pub hostname: String, pub width: u16, pub height: u16, pub connected: bool, } /// Input events sent from the frontend to a session's background task. enum InputEvent { Mouse { x: u16, y: u16, flags: u32, }, Key { scancode: u16, pressed: bool, }, Disconnect, } // ── Session handle ──────────────────────────────────────────────────────────── /// A handle to a running RDP session. The actual IronRDP connection runs in a /// background tokio task. This struct holds the shared frame buffer and an input /// channel. struct RdpSessionHandle { id: String, hostname: String, width: u16, height: u16, /// RGBA pixel data — updated by the background task, read by the frontend. frame_buffer: Arc>>, /// Send input events to the background task. input_tx: mpsc::UnboundedSender, } // ── Service ─────────────────────────────────────────────────────────────────── pub struct RdpService { sessions: DashMap>, } impl RdpService { pub fn new() -> Self { Self { sessions: DashMap::new(), } } /// Connect to an RDP server. Returns the session UUID on success. /// /// The entire RDP connection (handshake + active session loop) runs in a /// dedicated thread with its own tokio runtime. This avoids Send/lifetime /// issues with ironrdp's internal trait objects and tokio::spawn. pub fn connect(&self, config: RdpConfig) -> Result { let session_id = uuid::Uuid::new_v4().to_string(); let width = config.width; let height = config.height; let hostname = config.hostname.clone(); // Create shared frame buffer — initialized to opaque black. let buf_size = (width as usize) * (height as usize) * 4; let mut initial_buf = vec![0u8; buf_size]; for pixel in initial_buf.chunks_exact_mut(4) { pixel[3] = 255; } let frame_buffer = Arc::new(TokioMutex::new(initial_buf)); // Create input channel. let (input_tx, input_rx) = mpsc::unbounded_channel(); // Build session handle (accessible from main thread for frame reads + input sends). let handle = Arc::new(RdpSessionHandle { id: session_id.clone(), hostname: hostname.clone(), width, height, frame_buffer: frame_buffer.clone(), input_tx, }); self.sessions.insert(session_id.clone(), handle); // Spawn dedicated thread for the RDP connection + session loop. let sid = session_id.clone(); let sessions_ref = self.sessions.clone(); let (ready_tx, ready_rx) = std::sync::mpsc::channel::>(); std::thread::spawn(move || { let rt = tokio::runtime::Builder::new_current_thread() .enable_all() .build() .unwrap(); rt.block_on(async move { // Build connector config. let connector_config = match build_connector_config(&config) { Ok(c) => c, Err(e) => { let _ = ready_tx.send(Err(format!("Failed to build RDP config: {}", e))); sessions_ref.remove(&sid); return; } }; // Establish connection (TCP + TLS + CredSSP + RDP handshake). let (connection_result, framed) = match establish_connection(connector_config, &config.hostname, config.port).await { Ok(r) => r, Err(e) => { let _ = ready_tx.send(Err(format!("RDP connection failed: {}", e))); sessions_ref.remove(&sid); return; } }; info!("RDP connection established to {}:{} (session {})", config.hostname, config.port, sid); let _ = ready_tx.send(Ok(())); // Run active session loop until disconnect. if let Err(e) = run_active_session( connection_result, framed, frame_buffer, input_rx, width as u16, height as u16, ) .await { error!("RDP session {} error: {}", sid, e); } info!("RDP session {} ended", sid); sessions_ref.remove(&sid); }); }); // Wait for the connection to establish or fail. match ready_rx.recv() { Ok(Ok(())) => {} Ok(Err(e)) => { self.sessions.remove(&session_id); return Err(e); } Err(_) => { self.sessions.remove(&session_id); return Err("RDP connection thread died unexpectedly".into()); } } Ok(session_id) } /// Get the current frame buffer as base64-encoded RGBA data. pub async fn get_frame(&self, session_id: &str) -> Result { let handle = self .sessions .get(session_id) .ok_or_else(|| format!("RDP session {} not found", session_id))?; let buf = handle.frame_buffer.lock().await; let encoded = base64::engine::general_purpose::STANDARD.encode(&*buf); Ok(encoded) } /// Get the raw frame buffer bytes (for potential future optimization). pub async fn get_frame_raw(&self, session_id: &str) -> Result, String> { let handle = self .sessions .get(session_id) .ok_or_else(|| format!("RDP session {} not found", session_id))?; let buf = handle.frame_buffer.lock().await; Ok(buf.clone()) } /// Send a mouse event to the RDP session. /// /// The `flags` parameter uses MS-RDPBCGR mouse event flags (see `input::mouse_flags`). /// The frontend should construct these from DOM mouse events. pub fn send_mouse(&self, session_id: &str, x: u16, y: u16, flags: u32) -> Result<(), String> { let handle = self .sessions .get(session_id) .ok_or_else(|| format!("RDP session {} not found", session_id))?; handle .input_tx .send(InputEvent::Mouse { x, y, flags }) .map_err(|_| format!("RDP session {} input channel closed", session_id)) } /// Send a keyboard event to the RDP session. /// /// `scancode` is the RDP hardware scancode (use `input::js_key_to_scancode` /// on the frontend side or pass it through). `pressed` indicates key-down /// vs key-up. pub fn send_key(&self, session_id: &str, scancode: u16, pressed: bool) -> Result<(), String> { let handle = self .sessions .get(session_id) .ok_or_else(|| format!("RDP session {} not found", session_id))?; handle .input_tx .send(InputEvent::Key { scancode, pressed }) .map_err(|_| format!("RDP session {} input channel closed", session_id)) } /// Disconnect an RDP session. pub fn disconnect(&self, session_id: &str) -> Result<(), String> { let handle = self .sessions .get(session_id) .ok_or_else(|| format!("RDP session {} not found", session_id))?; // Send disconnect signal — the background task will clean up. let _ = handle.input_tx.send(InputEvent::Disconnect); // Remove from map immediately so no new commands target it. drop(handle); self.sessions.remove(session_id); info!("RDP session {} disconnect requested", session_id); Ok(()) } /// List all active RDP sessions. pub fn list_sessions(&self) -> Vec { self.sessions .iter() .map(|entry| { let h = entry.value(); RdpSessionInfo { id: h.id.clone(), hostname: h.hostname.clone(), width: h.width, height: h.height, connected: !h.input_tx.is_closed(), } }) .collect() } } // Clone the DashMap reference for use in spawned tasks. impl Clone for RdpService { fn clone(&self) -> Self { // This is intentionally a shallow clone — we want to share the same // sessions map. But since DashMap doesn't implement Clone directly in // a way we can use here, we use a different approach: the service // itself is stored in AppState and accessed via State. // The Clone here is only needed if we want to pass a reference to // spawned tasks, which we handle via Arc internally. unreachable!("RdpService should not be cloned — access via State"); } } // ── Connection establishment ────────────────────────────────────────────────── /// Build the IronRDP `connector::Config` from our simplified `RdpConfig`. fn build_connector_config(config: &RdpConfig) -> Result { Ok(connector::Config { credentials: Credentials::UsernamePassword { username: config.username.clone(), password: config.password.clone(), }, domain: config.domain.clone(), enable_tls: false, enable_credssp: true, keyboard_type: KeyboardType::IbmEnhanced, keyboard_subtype: 0, keyboard_layout: 0, keyboard_functional_keys_count: 12, ime_file_name: String::new(), dig_product_id: String::new(), desktop_size: DesktopSize { width: config.width, height: config.height, }, bitmap: None, client_build: 0, client_name: "Wraith Desktop".to_owned(), client_dir: "C:\\Windows\\System32\\mstscax.dll".to_owned(), #[cfg(windows)] platform: MajorPlatformType::WINDOWS, #[cfg(target_os = "macos")] platform: MajorPlatformType::MACINTOSH, #[cfg(target_os = "linux")] platform: MajorPlatformType::UNIX, #[cfg(not(any(windows, target_os = "macos", target_os = "linux")))] platform: MajorPlatformType::UNIX, enable_server_pointer: true, pointer_software_rendering: true, request_data: None, autologon: false, enable_audio_playback: false, performance_flags: PerformanceFlags::default(), desktop_scale_factor: 0, hardware_id: None, license_cache: None, timezone_info: TimezoneInfo::default(), }) } /// Trait alias for types that implement both AsyncRead and AsyncWrite. trait AsyncReadWrite: AsyncRead + AsyncWrite + 'static {} impl AsyncReadWrite for T {} type UpgradedFramed = TokioFramed>; /// Perform the full RDP connection: TCP -> TLS upgrade -> CredSSP -> RDP handshake. async fn establish_connection( config: connector::Config, hostname: &str, port: u16, ) -> Result<(ConnectionResult, UpgradedFramed), String> { // Resolve and connect TCP. let addr = format!("{}:{}", hostname, port); let stream = TcpStream::connect(&addr) .await .map_err(|e| format!("TCP connect to {} failed: {}", addr, e))?; let client_addr = stream .local_addr() .map_err(|e| format!("Failed to get local address: {}", e))?; let mut framed = TokioFramed::new(stream); let mut connector = ClientConnector::new(config, client_addr); // Phase 1: Initial connection (pre-TLS). let should_upgrade = ironrdp_tokio::connect_begin(&mut framed, &mut connector) .await .map_err(|e| format!("RDP connect_begin failed: {}", e))?; debug!("RDP TLS upgrade starting for {}", hostname); // Phase 2: TLS upgrade. let (initial_stream, leftover_bytes) = framed.into_inner(); let (tls_stream, tls_cert) = ironrdp_tls::upgrade(initial_stream, hostname) .await .map_err(|e| format!("TLS upgrade failed: {}", e))?; let upgraded = ironrdp_tokio::mark_as_upgraded(should_upgrade, &mut connector); // Wrap the TLS stream in an erased box for the framed type. let erased_stream: Box = Box::new(tls_stream); let mut upgraded_framed = TokioFramed::new_with_leftover(erased_stream, leftover_bytes); // Phase 3: CredSSP + finalize. let server_public_key = ironrdp_tls::extract_tls_server_public_key(&tls_cert) .ok_or_else(|| "Failed to extract TLS server public key".to_string())? .to_owned(); let connection_result = ironrdp_tokio::connect_finalize( upgraded, connector, &mut upgraded_framed, &mut ReqwestNetworkClient::new(), hostname.into(), server_public_key, None, // No Kerberos config ) .await .map_err(|e| format!("RDP connect_finalize failed: {}", e))?; debug!("RDP connection finalized for {}", hostname); Ok((connection_result, upgraded_framed)) } // ── Active session loop ─────────────────────────────────────────────────────── /// Run the active RDP session loop — processes incoming frames and outgoing input. async fn run_active_session( connection_result: ConnectionResult, framed: UpgradedFramed, frame_buffer: Arc>>, mut input_rx: mpsc::UnboundedReceiver, width: u16, height: u16, ) -> Result<(), String> { let (mut reader, mut writer) = split_tokio_framed(framed); let mut image = DecodedImage::new(PixelFormat::RgbA32, width, height); let mut active_stage = ActiveStage::new(connection_result); let mut input_db = rdp_input::Database::new(); loop { let outputs = tokio::select! { // Read a PDU from the server. frame = reader.read_pdu() => { let (action, payload) = frame .map_err(|e| format!("Failed to read RDP frame: {}", e))?; active_stage .process(&mut image, action, &payload) .map_err(|e| format!("Failed to process RDP frame: {}", e))? } // Receive input from the frontend. input_event = input_rx.recv() => { match input_event { Some(InputEvent::Disconnect) | None => { info!("RDP session disconnect signal received"); // Attempt graceful shutdown. match active_stage.graceful_shutdown() { Ok(outputs) => { for out in outputs { if let ActiveStageOutput::ResponseFrame(frame) = out { let _ = writer.write_all(&frame).await; } } } Err(e) => { warn!("Graceful RDP shutdown failed: {}", e); } } return Ok(()); } Some(InputEvent::Mouse { x, y, flags }) => { let ops = translate_mouse_flags(x, y, flags); let events = input_db.apply(ops); active_stage .process_fastpath_input(&mut image, &events) .map_err(|e| format!("Failed to process mouse input: {}", e))? } Some(InputEvent::Key { scancode, pressed }) => { let sc = Scancode::from_u16(scancode); let op = if pressed { Operation::KeyPressed(sc) } else { Operation::KeyReleased(sc) }; let events = input_db.apply([op]); active_stage .process_fastpath_input(&mut image, &events) .map_err(|e| format!("Failed to process keyboard input: {}", e))? } } } }; // Process outputs from the active stage. for out in outputs { match out { ActiveStageOutput::ResponseFrame(frame) => { writer .write_all(&frame) .await .map_err(|e| format!("Failed to write RDP response frame: {}", e))?; } ActiveStageOutput::GraphicsUpdate(_region) => { // Copy the decoded image data into the shared frame buffer. let mut buf = frame_buffer.lock().await; let src = image.data(); let dst_len = buf.len(); if src.len() == dst_len { buf.copy_from_slice(src); } else { // Desktop size may have changed — resize the buffer. *buf = src.to_vec(); } } ActiveStageOutput::Terminate(reason) => { info!("RDP session terminated: {:?}", reason); return Ok(()); } ActiveStageOutput::DeactivateAll(_connection_activation) => { // The server requested deactivation-reactivation. For now, // log and continue — a full implementation would re-run // the connection activation sequence. warn!("RDP server sent DeactivateAll — reconnection not yet implemented"); return Ok(()); } // Pointer events — we could emit these to the frontend for // custom cursor rendering, but for now we just log them. ActiveStageOutput::PointerDefault => { debug!("RDP pointer: default"); } ActiveStageOutput::PointerHidden => { debug!("RDP pointer: hidden"); } ActiveStageOutput::PointerPosition { x, y } => { debug!("RDP pointer position: ({}, {})", x, y); } ActiveStageOutput::PointerBitmap(_) => { debug!("RDP pointer bitmap received"); } _ => { // Future variants (MultitransportRequest, etc.) } } } } } // ── Input translation ───────────────────────────────────────────────────────── /// Translate MS-RDPBCGR mouse flags into IronRDP `Operation` values. /// /// The frontend sends raw MS-RDPBCGR flags so this mapping is straightforward. fn translate_mouse_flags(x: u16, y: u16, flags: u32) -> Vec { let mut ops = Vec::new(); let pos = MousePosition { x, y }; // Always include a move operation if the MOVE flag is set. if flags & mouse_flags::MOVE != 0 { ops.push(Operation::MouseMove(pos)); } // Check for button press/release. let is_down = flags & mouse_flags::DOWN != 0; if flags & mouse_flags::BUTTON1 != 0 { if is_down { ops.push(Operation::MouseButtonPressed(MouseButton::Left)); } else { ops.push(Operation::MouseButtonReleased(MouseButton::Left)); } } if flags & mouse_flags::BUTTON2 != 0 { if is_down { ops.push(Operation::MouseButtonPressed(MouseButton::Right)); } else { ops.push(Operation::MouseButtonReleased(MouseButton::Right)); } } if flags & mouse_flags::BUTTON3 != 0 { if is_down { ops.push(Operation::MouseButtonPressed(MouseButton::Middle)); } else { ops.push(Operation::MouseButtonReleased(MouseButton::Middle)); } } // Wheel events. if flags & mouse_flags::WHEEL != 0 { let negative = flags & mouse_flags::WHEEL_NEG != 0; let units: i16 = if negative { -120 } else { 120 }; ops.push(Operation::WheelRotations(WheelRotations { is_vertical: true, rotation_units: units, })); } if flags & mouse_flags::HWHEEL != 0 { let negative = flags & mouse_flags::WHEEL_NEG != 0; let units: i16 = if negative { -120 } else { 120 }; ops.push(Operation::WheelRotations(WheelRotations { is_vertical: false, rotation_units: units, })); } // If no specific operation was generated but we have coordinates, treat // it as a plain mouse move (some frontends send move without the flag). if ops.is_empty() { ops.push(Operation::MouseMove(pos)); } ops }