Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

vnc proxy #5342

Draft
wants to merge 8 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

261 changes: 256 additions & 5 deletions nexus/src/app/instance.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ use sled_agent_client::types::InstancePutStateBody;
use std::matches;
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use uuid::Uuid;

type SledAgentClientError =
Expand Down Expand Up @@ -1710,7 +1710,7 @@ impl super::Nexus {
instance_lookup: &lookup::Instance<'_>,
params: &params::InstanceSerialConsoleStreamRequest,
) -> Result<(), Error> {
let client_addr = match self
let propolis_addr = match self
.propolis_addr_for_instance(
opctx,
instance_lookup,
Expand All @@ -1737,7 +1737,7 @@ impl super::Nexus {
};

match propolis_client::support::InstanceSerialConsoleHelper::new(
client_addr,
propolis_addr,
offset,
Some(self.log.clone()),
)
Expand Down Expand Up @@ -1765,6 +1765,58 @@ impl super::Nexus {
}
}

pub(crate) async fn instance_vnc_stream(
&self,
opctx: &OpContext,
mut client_stream: WebSocketStream<
impl AsyncRead + AsyncWrite + Unpin + Send + 'static,
>,
instance_lookup: &lookup::Instance<'_>,
) -> Result<(), Error> {
const VNC_PORT: u16 = 5900;

let propolis_ip = match self
.propolis_addr_for_instance(
opctx,
instance_lookup,
authz::Action::Modify,
)
.await
{
Ok(x) => x.ip(),
Err(e) => {
let _ = client_stream
.close(Some(CloseFrame {
code: CloseCode::Error,
reason: e.to_string().into(),
}))
.await
.is_ok();
return Err(e);
}
};

match tokio::net::TcpStream::connect((propolis_ip, VNC_PORT)).await {
Ok(propolis_conn) => {
Self::proxy_tcp_socket_ws(client_stream, propolis_conn)
.await
.map_err(|e| Error::internal_error(&format!("{}", e)))
}
Err(e) => {
let message =
format!("socket connection to instance VNC failed: {}", e);
let _ = client_stream
.close(Some(CloseFrame {
code: CloseCode::Error,
reason: message.clone().into(),
}))
.await
.is_ok();
Err(Error::internal_error(&message))
}
}
}

async fn propolis_addr_for_instance(
&self,
opctx: &OpContext,
Expand Down Expand Up @@ -1803,8 +1855,7 @@ impl super::Nexus {
}
} else {
Err(Error::invalid_request(format!(
"instance is {} and has no active serial console \
server",
"instance is {} and has no active console server",
instance.runtime().nexus_state
)))
}
Expand Down Expand Up @@ -1976,6 +2027,113 @@ impl super::Nexus {
Ok(())
}

/// Trivially pack data read from a TcpStream into binary websocket frames,
/// and unpack those received from the client accordingly.
/// NoVNC (a web VNC client) calls their version of this "websockify".
async fn proxy_tcp_socket_ws(
client_stream: WebSocketStream<
impl AsyncRead + AsyncWrite + Unpin + Send + 'static,
>,
propolis_conn: tokio::net::TcpStream,
) -> Result<(), propolis_client::support::tungstenite::Error> {
let (mut nexus_sink, mut nexus_stream) = client_stream.split();
let (mut propolis_reader, mut propolis_writer) =
propolis_conn.into_split();
let (closed_tx, mut closed_rx) = tokio::sync::oneshot::channel::<()>();

let mut jh = tokio::spawn(async move {
// big enough for 1024x768 32bpp and then some
let mut read_buffer = vec![0u8; 4 * 1024 * 1024];
loop {
tokio::select! {
_ = &mut closed_rx => break,
num_bytes_res = propolis_reader.read(&mut read_buffer) => {
let Ok(num_bytes) = num_bytes_res else {
let _ = nexus_sink.send(WebSocketMessage::Close(None)).await.is_ok();
break;
};
let data = Vec::from(&read_buffer[..num_bytes]);
match nexus_sink.send(WebSocketMessage::Binary(data)).await {
Ok(_) => {}
Err(_e) => break,
}
}
}
}
Ok::<_, Error>(nexus_sink)
});

let mut close_frame = None;
let mut task_joined = false;
loop {
tokio::select! {
res = &mut jh => {
task_joined = true;
if let Ok(Ok(mut nexus_sink)) = res {
// .take() here avoids borrow collision in the cleanup code
// below the loop where we also join the task if it hasn't been
let _ = nexus_sink
.send(WebSocketMessage::Close(close_frame.take()))
.await
.is_ok();
}
break;
}
msg = nexus_stream.next() => {
match msg {
None => {
// websocket connection to nexus client closed unexpectedly
break;
}
Some(Err(e)) => {
// error in websocket connection to nexus client
return Err(e);
}
Some(Ok(WebSocketMessage::Close(_details))) => {
// websocket connection to nexus client closed normally
break;
}
Some(Ok(WebSocketMessage::Text(_text))) => {
// TODO: json payloads specifying client-sent metadata?
}
Some(Ok(WebSocketMessage::Binary(data))) => {
let mut start = 0;
while start < data.len() {
match propolis_writer.write(&data[start..]).await {
Ok(num_bytes) => {
start += num_bytes;
}
Err(e) => {
close_frame = Some(CloseFrame {
code: CloseCode::Error,
reason: e.to_string().into(),
});
break;
}
}
}
}
// Frame won't exist at this level, and ping reply is handled by tungstenite
Some(Ok(WebSocketMessage::Frame(_) | WebSocketMessage::Ping(_) | WebSocketMessage::Pong(_))) => {}
}
}
}
}

// double-joining a task handle is a panic
if !task_joined {
let _ = closed_tx.send(()).is_ok();
if let Ok(Ok(mut nexus_sink)) = jh.await {
let _ = nexus_sink
.send(WebSocketMessage::Close(close_frame))
.await
.is_ok();
}
}

Ok(())
}

/// Attach an ephemeral IP to an instance.
pub(crate) async fn instance_attach_ephemeral_ip(
self: &Arc<Self>,
Expand Down Expand Up @@ -2099,6 +2257,7 @@ mod tests {
InstanceSerialConsoleHelper, WSClientOffset,
};
use std::net::{Ipv4Addr, SocketAddr, SocketAddrV4};
use tokio::io::{AsyncReadExt, AsyncWriteExt};

#[tokio::test]
async fn test_serial_console_stream_proxying() {
Expand Down Expand Up @@ -2191,4 +2350,96 @@ mod tests {
.expect("proxy task exited successfully");
logctx.cleanup_successful();
}

#[tokio::test]
async fn test_tcp_stream_proxying() {
let logctx = test_setup_log("test_tcp_stream_proxying");
let (nexus_client_conn, nexus_server_conn) = tokio::io::duplex(1024);
let propolis_listener = tokio::net::TcpListener::bind("127.0.0.1:0")
.await
.expect("couldn't make TcpListener");

let addr = propolis_listener.local_addr().unwrap();

let jh = tokio::spawn(async move { propolis_listener.accept().await });

let propolis_client_conn = tokio::net::TcpStream::connect(addr)
.await
.expect("couldn't open TcpStream connection to TcpListener");

let mut propolis_server_conn = jh
.await
.expect("couldn't join")
.expect("couldn't accept client connection from TcpListener")
.0;

let jh = tokio::spawn(async move {
let nexus_client_stream = WebSocketStream::from_raw_socket(
nexus_server_conn,
Role::Server,
None,
)
.await;
Nexus::proxy_tcp_socket_ws(
nexus_client_stream,
propolis_client_conn,
)
.await
});
let mut nexus_client_ws = WebSocketStream::from_raw_socket(
nexus_client_conn,
Role::Client,
None,
)
.await;

slog::info!(logctx.log, "sending messages to nexus client");
let sent1 = WebSocketMessage::Binary(vec![1, 2, 3, 42, 5]);
nexus_client_ws.send(sent1.clone()).await.unwrap();
let sent2 = WebSocketMessage::Binary(vec![5, 42, 3, 2, 1]);
nexus_client_ws.send(sent2.clone()).await.unwrap();
slog::info!(
logctx.log,
"messages sent, receiving them via propolis server"
);
let received =
tokio::time::timeout(std::time::Duration::from_secs(10), async {
let mut buf = [0u8; 1024];
let mut received = Vec::<u8>::new();
while received.len() < 10 {
let bytes =
propolis_server_conn.read(&mut buf).await.unwrap();
received.extend(&buf[..bytes]);
}
received
})
.await
.expect("timed out receiving");
assert_eq!(received, vec![1, 2, 3, 42, 5, 5, 42, 3, 2, 1]);

slog::info!(logctx.log, "sending data to propolis server");
let sent3 = vec![6, 7, 8, 90, 90, 8, 7, 6];
propolis_server_conn.write_all(&sent3).await.unwrap();
slog::info!(logctx.log, "data sent, receiving it via nexus client");
let received3 = nexus_client_ws.next().await.unwrap().unwrap();
assert_eq!(WebSocketMessage::Binary(sent3), received3);

slog::info!(logctx.log, "sending close message to nexus client");
let sent = WebSocketMessage::Close(Some(CloseFrame {
code: CloseCode::Normal,
reason: std::borrow::Cow::from("test done"),
}));
nexus_client_ws.send(sent.clone()).await.unwrap();
slog::info!(
logctx.log,
"sent close message, waiting \
1s for proxy task to shut down"
);
tokio::time::timeout(Duration::from_secs(1), jh)
.await
.expect("proxy task shut down within 1s")
.expect("task successfully completed")
.expect("proxy task exited successfully");
logctx.cleanup_successful();
}
}
48 changes: 48 additions & 0 deletions nexus/src/external_api/http_entrypoints.rs
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,7 @@ pub(crate) fn external_api() -> NexusApiDescription {
api.register(instance_disk_detach)?;
api.register(instance_serial_console)?;
api.register(instance_serial_console_stream)?;
api.register(instance_vnc)?;
api.register(instance_ssh_public_key_list)?;

api.register(image_list)?;
Expand Down Expand Up @@ -2787,6 +2788,53 @@ async fn instance_serial_console_stream(
}
}

/// Stream instance VNC framebuffer
#[channel {
protocol = WEBSOCKETS,
path = "/v1/instances/{instance}/vnc",
tags = ["instances"],
}]
async fn instance_vnc(
rqctx: RequestContext<Arc<ServerContext>>,
path_params: Path<params::InstancePath>,
query_params: Query<params::OptionalProjectSelector>,
conn: WebsocketConnection,
) -> WebsocketChannelResult {
let apictx = rqctx.context();
let nexus = &apictx.nexus;
let path = path_params.into_inner();
let query = query_params.into_inner();
let opctx = crate::context::op_context_for_external_api(&rqctx).await?;
let instance_selector = params::InstanceSelector {
project: query.project.clone(),
instance: path.instance,
};
let mut client_stream = WebSocketStream::from_raw_socket(
conn.into_inner(),
WebSocketRole::Server,
None,
)
.await;
match nexus.instance_lookup(&opctx, instance_selector) {
Ok(instance_lookup) => {
nexus
.instance_vnc_stream(&opctx, client_stream, &instance_lookup)
.await?;
Ok(())
}
Err(e) => {
let _ = client_stream
.close(Some(CloseFrame {
code: CloseCode::Error,
reason: e.to_string().into(),
}))
.await
.is_ok();
Err(e.into())
}
}
}

/// List SSH public keys for instance
///
/// List SSH public keys injected via cloud-init during instance creation. Note
Expand Down
1 change: 1 addition & 0 deletions nexus/tests/output/nexus_tags.txt
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ instance_ssh_public_key_list GET /v1/instances/{instance}/ssh-p
instance_start POST /v1/instances/{instance}/start
instance_stop POST /v1/instances/{instance}/stop
instance_view GET /v1/instances/{instance}
instance_vnc GET /v1/instances/{instance}/vnc

API operations found with tag "login"
OPERATION ID METHOD URL PATH
Expand Down
Loading
Loading