perf: double-buffered RDP frames + frontend rAF throttling
All checks were successful
Build & Sign Wraith / Build Windows + Sign (push) Successful in 4m20s

Root cause: RDP was unresponsive due to frame pipeline bottleneck.
- get_frame() held tokio::Mutex while cloning 8.3MB, blocking the
  RDP session thread from writing new frames (mutex contention)
- Frontend fetched on every backend event with no coalescing
- Every GraphicsUpdate emitted an IPC event, flooding the frontend

Fix:
- Double-buffer: back_buffer (tokio::Mutex, write path) and
  front_buffer (std::sync::RwLock, read path) — reads never block writes
- get_frame() now synchronous, reads from front_buffer via RwLock
- Backend throttles frame events to every other GraphicsUpdate
- Frontend coalesces events via requestAnimationFrame
- RdpView props now reactive (computed) for correct resize behavior
- rdp_get_frame command no longer async (no .await needed)
- screenshot_png_base64 no longer async

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
This commit is contained in:
Vantz Stockwell 2026-03-30 09:19:40 -04:00
parent c2afb6a50f
commit a2770d3edf
7 changed files with 133 additions and 54 deletions

View File

@ -25,11 +25,11 @@ pub fn connect_rdp(
/// Pixel format: RGBA, 4 bytes per pixel, row-major, top-left origin.
/// Returns empty payload if frame hasn't changed since last call.
#[tauri::command]
pub async fn rdp_get_frame(
pub fn rdp_get_frame(
session_id: String,
state: State<'_, AppState>,
) -> Result<Response, String> {
let frame = state.rdp.get_frame(&session_id).await?;
let frame = state.rdp.get_frame(&session_id)?;
Ok(Response::new(frame))
}

View File

@ -434,27 +434,49 @@ impl ConnectionService {
/// Batch-update sort_order for a list of connection IDs.
pub fn reorder_connections(&self, ids: &[i64]) -> Result<(), String> {
let conn = self.db.conn();
for (i, id) in ids.iter().enumerate() {
conn.execute(
"UPDATE connections SET sort_order = ?1 WHERE id = ?2",
params![i as i64, id],
)
.map_err(|e| format!("Failed to reorder connection {id}: {e}"))?;
conn.execute_batch("BEGIN")
.map_err(|e| format!("Failed to begin reorder transaction: {e}"))?;
let result = (|| {
for (i, id) in ids.iter().enumerate() {
conn.execute(
"UPDATE connections SET sort_order = ?1 WHERE id = ?2",
params![i as i64, id],
)
.map_err(|e| format!("Failed to reorder connection {id}: {e}"))?;
}
Ok(())
})();
if result.is_err() {
let _ = conn.execute_batch("ROLLBACK");
} else {
conn.execute_batch("COMMIT")
.map_err(|e| format!("Failed to commit reorder transaction: {e}"))?;
}
Ok(())
result
}
/// Batch-update sort_order for a list of group IDs.
pub fn reorder_groups(&self, ids: &[i64]) -> Result<(), String> {
let conn = self.db.conn();
for (i, id) in ids.iter().enumerate() {
conn.execute(
"UPDATE groups SET sort_order = ?1 WHERE id = ?2",
params![i as i64, id],
)
.map_err(|e| format!("Failed to reorder group {id}: {e}"))?;
conn.execute_batch("BEGIN")
.map_err(|e| format!("Failed to begin reorder transaction: {e}"))?;
let result = (|| {
for (i, id) in ids.iter().enumerate() {
conn.execute(
"UPDATE groups SET sort_order = ?1 WHERE id = ?2",
params![i as i64, id],
)
.map_err(|e| format!("Failed to reorder group {id}: {e}"))?;
}
Ok(())
})();
if result.is_err() {
let _ = conn.execute_batch("ROLLBACK");
} else {
conn.execute_batch("COMMIT")
.map_err(|e| format!("Failed to commit reorder transaction: {e}"))?;
}
Ok(())
result
}
}

View File

@ -187,7 +187,7 @@ async fn handle_screenshot(
AxumState(state): AxumState<Arc<McpServerState>>,
Json(req): Json<ScreenshotRequest>,
) -> Json<McpResponse<String>> {
match state.rdp.screenshot_png_base64(&req.session_id).await {
match state.rdp.screenshot_png_base64(&req.session_id) {
Ok(b64) => ok_response(b64),
Err(e) => err_response(e),
}

View File

@ -71,8 +71,15 @@ struct RdpSessionHandle {
hostname: String,
width: u16,
height: u16,
frame_buffer: Arc<TokioMutex<Vec<u8>>>,
/// Double-buffered: back_buffer is written by the RDP thread,
/// front_buffer is read by the IPC command. Swap on GraphicsUpdate
/// so reads never block writes. Arcs kept alive via struct ownership.
front_buffer: Arc<std::sync::RwLock<Vec<u8>>>,
#[allow(dead_code)]
back_buffer: Arc<TokioMutex<Vec<u8>>>,
frame_dirty: Arc<AtomicBool>,
#[allow(dead_code)]
frame_generation: Arc<std::sync::atomic::AtomicU64>,
input_tx: mpsc::UnboundedSender<InputEvent>,
}
@ -99,8 +106,10 @@ impl RdpService {
for pixel in initial_buf.chunks_exact_mut(4) {
pixel[3] = 255;
}
let frame_buffer = Arc::new(TokioMutex::new(initial_buf));
let front_buffer = Arc::new(std::sync::RwLock::new(initial_buf.clone()));
let back_buffer = Arc::new(TokioMutex::new(initial_buf));
let frame_dirty = Arc::new(AtomicBool::new(false));
let frame_generation = Arc::new(std::sync::atomic::AtomicU64::new(0));
let (input_tx, input_rx) = mpsc::unbounded_channel();
@ -109,8 +118,10 @@ impl RdpService {
hostname: hostname.clone(),
width,
height,
frame_buffer: frame_buffer.clone(),
front_buffer: front_buffer.clone(),
back_buffer: back_buffer.clone(),
frame_dirty: frame_dirty.clone(),
frame_generation: frame_generation.clone(),
input_tx,
});
@ -156,8 +167,10 @@ impl RdpService {
if let Err(e) = run_active_session(
connection_result,
framed,
frame_buffer,
back_buffer,
front_buffer,
frame_dirty,
frame_generation,
input_rx,
width as u16,
height as u16,
@ -200,27 +213,27 @@ impl RdpService {
Ok(session_id)
}
pub async fn get_frame(&self, session_id: &str) -> Result<Vec<u8>, String> {
pub fn get_frame(&self, session_id: &str) -> Result<Vec<u8>, String> {
let handle = self.sessions.get(session_id).ok_or_else(|| format!("RDP session {} not found", session_id))?;
if !handle.frame_dirty.swap(false, Ordering::Relaxed) {
if !handle.frame_dirty.swap(false, Ordering::Acquire) {
return Ok(Vec::new()); // No change — return empty
}
let buf = handle.frame_buffer.lock().await;
let buf = handle.front_buffer.read().unwrap_or_else(|e| e.into_inner());
Ok(buf.clone())
}
pub async fn get_frame_raw(&self, session_id: &str) -> Result<Vec<u8>, String> {
pub fn get_frame_raw(&self, session_id: &str) -> Result<Vec<u8>, String> {
let handle = self.sessions.get(session_id).ok_or_else(|| format!("RDP session {} not found", session_id))?;
let buf = handle.frame_buffer.lock().await;
let buf = handle.front_buffer.read().unwrap_or_else(|e| e.into_inner());
Ok(buf.clone())
}
/// Capture the current RDP frame as a base64-encoded PNG.
pub async fn screenshot_png_base64(&self, session_id: &str) -> Result<String, String> {
pub fn screenshot_png_base64(&self, session_id: &str) -> Result<String, String> {
let handle = self.sessions.get(session_id).ok_or_else(|| format!("RDP session {} not found", session_id))?;
let width = handle.width as u32;
let height = handle.height as u32;
let buf = handle.frame_buffer.lock().await;
let buf = handle.front_buffer.read().unwrap_or_else(|e| e.into_inner());
// Encode RGBA raw bytes to PNG (fast compression for speed)
let mut png_data = Vec::new();
@ -336,7 +349,7 @@ async fn establish_connection(config: connector::Config, hostname: &str, port: u
Ok((connection_result, upgraded_framed))
}
async fn run_active_session(connection_result: ConnectionResult, framed: UpgradedFramed, frame_buffer: Arc<TokioMutex<Vec<u8>>>, frame_dirty: Arc<AtomicBool>, mut input_rx: mpsc::UnboundedReceiver<InputEvent>, width: u16, height: u16, app_handle: tauri::AppHandle, session_id: String) -> Result<(), String> {
async fn run_active_session(connection_result: ConnectionResult, framed: UpgradedFramed, back_buffer: Arc<TokioMutex<Vec<u8>>>, front_buffer: Arc<std::sync::RwLock<Vec<u8>>>, frame_dirty: Arc<AtomicBool>, frame_generation: Arc<std::sync::atomic::AtomicU64>, mut input_rx: mpsc::UnboundedReceiver<InputEvent>, width: u16, height: u16, app_handle: tauri::AppHandle, session_id: String) -> Result<(), String> {
let (mut reader, mut writer) = split_tokio_framed(framed);
let mut image = DecodedImage::new(PixelFormat::RgbA32, width, height);
let mut active_stage = ActiveStage::new(connection_result);
@ -395,12 +408,24 @@ async fn run_active_session(connection_result: ConnectionResult, framed: Upgrade
match out {
ActiveStageOutput::ResponseFrame(frame) => { writer.write_all(&frame).await.map_err(|e| format!("Failed to write RDP response frame: {}", e))?; }
ActiveStageOutput::GraphicsUpdate(_region) => {
let mut buf = frame_buffer.lock().await;
let src = image.data();
if src.len() == buf.len() { buf.copy_from_slice(src); } else { *buf = src.to_vec(); }
frame_dirty.store(true, Ordering::Relaxed);
// Push frame notification to frontend — no data, just a signal to fetch
let _ = app_handle.emit(&format!("rdp:frame:{}", session_id), ());
// Write to back buffer (async mutex, no contention with reads)
{
let mut buf = back_buffer.lock().await;
let src = image.data();
if src.len() == buf.len() { buf.copy_from_slice(src); } else { *buf = src.to_vec(); }
}
// Swap into front buffer (RwLock write — brief, readers use read lock)
{
let back = back_buffer.lock().await;
let mut front = front_buffer.write().unwrap_or_else(|e| e.into_inner());
front.copy_from_slice(&back);
}
let frame_gen = frame_generation.fetch_add(1, Ordering::Release);
frame_dirty.store(true, Ordering::Release);
// Throttle: only emit event every other frame to avoid flooding IPC
if frame_gen % 2 == 0 {
let _ = app_handle.emit(&format!("rdp:frame:{}", session_id), ());
}
}
ActiveStageOutput::Terminate(reason) => { info!("RDP session terminated: {:?}", reason); return Ok(()); }
ActiveStageOutput::DeactivateAll(_) => { warn!("RDP server sent DeactivateAll — reconnection not yet implemented"); return Ok(()); }

View File

@ -64,11 +64,36 @@ fn service_name(port: u16) -> &'static str {
}
}
/// Validate that `subnet` contains exactly three dot-separated octet groups,
/// each consisting only of 13 ASCII digits (e.g. "192.168.1").
/// Returns an error string if the format is invalid.
fn validate_subnet(subnet: &str) -> Result<(), String> {
let parts: Vec<&str> = subnet.split('.').collect();
if parts.len() != 3 {
return Err(format!(
"Invalid subnet '{}': expected three octets (e.g. 192.168.1)",
subnet
));
}
for part in &parts {
if part.is_empty() || part.len() > 3 || !part.chars().all(|c| c.is_ascii_digit()) {
return Err(format!(
"Invalid subnet '{}': each octet must be 13 decimal digits",
subnet
));
}
}
Ok(())
}
/// Discover hosts on the remote network using ARP table and ping sweep.
pub async fn scan_network(
handle: &Arc<TokioMutex<Handle<SshClient>>>,
subnet: &str,
) -> Result<Vec<DiscoveredHost>, String> {
// Validate subnet format before using it in remote shell commands.
validate_subnet(subnet)?;
// Script that works on Linux and macOS:
// 1. Ping sweep the subnet to populate ARP cache
// 2. Read ARP table for IP/MAC pairs

View File

@ -28,7 +28,7 @@
</template>
<script setup lang="ts">
import { ref, onMounted, onBeforeUnmount, watch } from "vue";
import { ref, computed, onMounted, onBeforeUnmount, watch } from "vue";
import { useRdp, MouseFlag } from "@/composables/useRdp";
const props = defineProps<{
@ -42,8 +42,8 @@ const containerRef = ref<HTMLElement | null>(null);
const canvasWrapper = ref<HTMLElement | null>(null);
const canvasRef = ref<HTMLCanvasElement | null>(null);
const rdpWidth = props.width ?? 1920;
const rdpHeight = props.height ?? 1080;
const rdpWidth = computed(() => props.width ?? 1920);
const rdpHeight = computed(() => props.height ?? 1080);
const {
connected,
@ -76,8 +76,8 @@ function toRdpCoords(e: MouseEvent): { x: number; y: number } | null {
if (!canvas) return null;
const rect = canvas.getBoundingClientRect();
const scaleX = rdpWidth / rect.width;
const scaleY = rdpHeight / rect.height;
const scaleX = rdpWidth.value / rect.width;
const scaleY = rdpHeight.value / rect.height;
return {
x: Math.floor((e.clientX - rect.left) * scaleX),
@ -155,7 +155,7 @@ function handleKeyUp(e: KeyboardEvent): void {
onMounted(() => {
if (canvasRef.value) {
startFrameLoop(props.sessionId, canvasRef.value, rdpWidth, rdpHeight);
startFrameLoop(props.sessionId, canvasRef.value, rdpWidth.value, rdpHeight.value);
}
});

View File

@ -299,30 +299,37 @@ export function useRdp(): UseRdpReturn {
canvas.height = height;
let fetchPending = false;
let rafScheduled = false;
// Fetch frame when backend signals a new frame is ready
async function onFrameReady(): Promise<void> {
if (fetchPending) return; // Don't stack fetches
fetchPending = true;
const imageData = await fetchFrame(sessionId, width, height);
fetchPending = false;
if (imageData && ctx) {
ctx.putImageData(imageData, 0, 0);
if (!connected.value) connected.value = true;
}
// Fetch frame when backend signals a new frame is ready.
// Uses rAF to coalesce rapid events into one fetch per display frame.
function scheduleFrameFetch(): void {
if (rafScheduled) return;
rafScheduled = true;
animFrameId = requestAnimationFrame(async () => {
rafScheduled = false;
if (fetchPending) return;
fetchPending = true;
const imageData = await fetchFrame(sessionId, width, height);
fetchPending = false;
if (imageData && ctx) {
ctx.putImageData(imageData, 0, 0);
if (!connected.value) connected.value = true;
}
});
}
// Listen for frame events from the backend (push model)
import("@tauri-apps/api/event").then(({ listen }) => {
listen(`rdp:frame:${sessionId}`, () => {
onFrameReady();
scheduleFrameFetch();
}).then((unlisten) => {
unlistenFrame = unlisten;
});
});
// Also do an initial poll in case frames arrived before listener was set up
onFrameReady();
// Initial poll in case frames arrived before listener was set up
scheduleFrameFetch();
}
/**