From 4888116cf42217fe6af99c48a6fd6e8f0de6d361 Mon Sep 17 00:00:00 2001 From: core Date: Mon, 10 Apr 2023 17:01:29 -0400 Subject: [PATCH] stop mainloop when it errors - fix pingpong --- Cargo.lock | 11 +++++++++++ client/Cargo.toml | 2 +- client/src/lib.rs | 37 ++++++++++++++++++++++++++++++------ protocol/src/lib.rs | 11 ++++++++--- server/src/client_handler.rs | 19 ++++++++++++++++-- web/play.html | 11 ++++++++--- 6 files changed, 76 insertions(+), 15 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 91d07b773bb040a480007098f69470060ed68188..f4341d9f9816fc6f851d4bddbeb2c0cc1c63ac16 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -123,6 +123,16 @@ dependencies = [ "winapi", ] +[[package]] +name = "console_error_panic_hook" +version = "0.1.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a06aeb73f470f66dcdbf7223caeebb85984942f22f1adb2a088cf9668146bbbc" +dependencies = [ + "cfg-if", + "wasm-bindgen", +] + [[package]] name = "console_log" version = "1.0.0" @@ -725,6 +735,7 @@ name = "starkingdoms-client" version = "0.1.0" dependencies = [ "async-recursion", + "console_error_panic_hook", "console_log", "futures", "js-sys", diff --git a/client/Cargo.toml b/client/Cargo.toml index f132cc824bc3efc3a3e7e9c0fff73eac9a9ef1f9..2e1fc08c748017a2a9befe1cc91aa6ae80b91ae9 100644 --- a/client/Cargo.toml +++ b/client/Cargo.toml @@ -23,7 +23,7 @@ serde = { version = "1", features = ["derive"] } lazy_static = "1.4" markdown = "1.0.0-alpha.7" # DO NOT DOWNGRADE async-recursion = "1" - +console_error_panic_hook = "0.1" [dependencies.web-sys] version = "0.3.4" diff --git a/client/src/lib.rs b/client/src/lib.rs index 9680b50a98e5fb42f348b57ef50f7a9383a3d122..e6c63e96d57d8de46af84e139f7996e47469864a 100644 --- a/client/src/lib.rs +++ b/client/src/lib.rs @@ -1,7 +1,8 @@ use std::error::Error; +use std::ops::Add; use futures::stream::{SplitSink, SplitStream}; use futures::StreamExt; -use log::{error, info, Level, trace, warn}; +use log::{debug, error, info, Level, trace, warn}; use wasm_bindgen::prelude::*; use ws_stream_wasm::{WsErr, WsMessage, WsMeta, WsStream}; use starkingdoms_protocol::State; @@ -12,13 +13,12 @@ use futures::SinkExt; use lazy_static::lazy_static; use std::sync::Arc; use std::sync::RwLock; - - +use std::time::{Duration, SystemTime, UNIX_EPOCH}; use async_recursion::async_recursion; use futures::FutureExt; - use wasm_bindgen_futures::JsFuture; use web_sys::Window; +use starkingdoms_protocol::GoodbyeReason::PingPongTimeout; #[macro_use] pub mod macros; @@ -31,10 +31,13 @@ extern { #[wasm_bindgen] pub async fn rust_init(gateway: &str, username: &str) -> Result<(), JsError> { + console_error_panic_hook::set_once(); + set_status("Starting logger..."); console_log::init_with_level(Level::Debug).unwrap(); + info!("Logger setup successfully"); match main(gateway, username, 1).await { @@ -57,9 +60,12 @@ pub struct Client { pub struct ClientData { pub state: State, pub tx: SplitSink, - pub rx: SplitStream + pub rx: SplitStream, + pub pong_timeout: u64 } +pub const PONG_MAX_TIMEOUT: u64 = 5; + lazy_static! { pub static ref CLIENT: Arc> = Arc::new(RwLock::new(Client { client_data: None @@ -107,7 +113,8 @@ pub async fn main(gateway: &str, username: &str, backoff: i32) -> Result<(), Box let mut client_data = ClientData { state: State::Handshake, tx, - rx + rx, + pong_timeout: (js_sys::Date::now() as u64 / 1000) + 5 }; trace!("Split stream, handshaking with server"); @@ -159,6 +166,21 @@ pub async fn update_socket() -> Result<(), JsError> { let client_data = client.client_data.as_mut().unwrap(); + if client_data.pong_timeout < (js_sys::Date::now() as u64 / 1000) { + error!("Connection timed out"); + send!(client_data.tx, &MessageC2S::Goodbye { + reason: PingPongTimeout + }).await?; + client.client_data = None; + set_status("Connection timed out. Reload to reconnect"); + return Err(JsError::new("Connection timed out")); + } + + if client_data.pong_timeout - 4 < (js_sys::Date::now() as u64 / 1000) { + // send ping + send!(client_data.tx, &MessageC2S::Ping {}).await?; + } + let maybe_msg: Option = match recv!(client_data.rx) { Ok(r) => r, Err(e) => { @@ -188,6 +210,9 @@ pub async fn update_socket() -> Result<(), JsError> { chatbox.append_child(&new_elem).unwrap(); }, + MessageS2C::Pong {} => { + client_data.pong_timeout = (js_sys::Date::now() as u64 / 1000) + PONG_MAX_TIMEOUT + }, _ => { warn!("server sent unexpected packet {:?}, ignoring", msg); } diff --git a/protocol/src/lib.rs b/protocol/src/lib.rs index c21e0669b137977a5e405805ba9456637e10e556..5ed1bc05111ead5a0c769c297f7eb2b169727f70 100644 --- a/protocol/src/lib.rs +++ b/protocol/src/lib.rs @@ -22,7 +22,9 @@ pub enum MessageC2S { Chat { message: String - } + }, + + Ping {} } #[derive(Serialize, Deserialize, Debug, Clone)] @@ -40,7 +42,9 @@ pub enum MessageS2C { Chat { from: String, message: String - } + }, + + Pong {} } #[derive(Serialize, Deserialize, Debug, Clone)] @@ -49,7 +53,8 @@ pub enum GoodbyeReason { UnexpectedPacket, UnexpectedNextState, UsernameTaken, - Done + PingPongTimeout, + Done, } pub fn pc2s(pkt: &MessageC2S) -> Vec { diff --git a/server/src/client_handler.rs b/server/src/client_handler.rs index 05768990473157343876036104004e2e43bfb692..14efe33ea79b8f0df4cace6aa8ed9fb6d977f911 100644 --- a/server/src/client_handler.rs +++ b/server/src/client_handler.rs @@ -1,19 +1,22 @@ use std::error::Error; use std::net::SocketAddr; +use std::time::{Duration, SystemTime}; use futures::stream::{SplitSink, SplitStream}; use futures::{FutureExt, SinkExt, StreamExt}; use hyper::upgrade::Upgraded; -use log::{error, info}; +use log::{error, info, warn}; use tokio::sync::mpsc::Receiver; use tokio_tungstenite::WebSocketStream; use tungstenite::Message; use starkingdoms_protocol::{GoodbyeReason, MessageC2S, MessageS2C, PROTOCOL_VERSION, State}; +use starkingdoms_protocol::GoodbyeReason::PingPongTimeout; use crate::handler::{ClientHandlerMessage, ClientManager}; use crate::{send, recv}; pub async fn handle_client(mgr: ClientManager, remote_addr: SocketAddr, mut rx: Receiver, mut client_tx: SplitSink, Message>, mut client_rx: SplitStream>) -> Result<(), Box> { let mut state = State::Handshake; let mut username = String::new(); + let mut ping_timeout = SystemTime::now() + Duration::from_secs(5); loop { if let Some(msg) = rx.recv().await { @@ -31,6 +34,14 @@ pub async fn handle_client(mgr: ClientManager, remote_addr: SocketAddr, mut rx: break; } + if ping_timeout < SystemTime::now() { + warn!("[{}] ping timeout", remote_addr); + send!(client_tx, &MessageS2C::Goodbye { + reason: PingPongTimeout + }).await?; + break; + } + if let Some(pkt) = recv!(client_rx)? { match state { State::Handshake => { @@ -84,7 +95,7 @@ pub async fn handle_client(mgr: ClientManager, remote_addr: SocketAddr, mut rx: info!("client sent goodbye: {:?}", reason); break; }, - MessageC2S::Chat { .. } => { + _ => { error!("client sent unexpected packet {:?} for state {:?}", pkt, state); send!(client_tx, &MessageS2C::Goodbye { reason: GoodbyeReason::UnexpectedPacket, @@ -117,6 +128,10 @@ pub async fn handle_client(mgr: ClientManager, remote_addr: SocketAddr, mut rx: } } } + }, + MessageC2S::Ping {} => { + send!(client_tx, &MessageS2C::Pong {}).await?; + ping_timeout = SystemTime::now() + Duration::from_secs(5); } } } diff --git a/web/play.html b/web/play.html index 70d9997331989aba521fdf556501d81391758956..ee3310022d133575441a7c553df144b52d60b511 100644 --- a/web/play.html +++ b/web/play.html @@ -21,7 +21,7 @@