~starkingdoms/starkingdoms

4888116cf42217fe6af99c48a6fd6e8f0de6d361 — core 2 years ago 443b64e
stop mainloop when it errors - fix pingpong
6 files changed, 76 insertions(+), 15 deletions(-)

M Cargo.lock
M client/Cargo.toml
M client/src/lib.rs
M protocol/src/lib.rs
M server/src/client_handler.rs
M web/play.html
M Cargo.lock => Cargo.lock +11 -0
@@ 124,6 124,16 @@ dependencies = [
]

[[package]]
name = "console_error_panic_hook"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a06aeb73f470f66dcdbf7223caeebb85984942f22f1adb2a088cf9668146bbbc"
dependencies = [
 "cfg-if",
 "wasm-bindgen",
]

[[package]]
name = "console_log"
version = "1.0.0"
source = "registry+https://github.com/rust-lang/crates.io-index"


@@ 725,6 735,7 @@ name = "starkingdoms-client"
version = "0.1.0"
dependencies = [
 "async-recursion",
 "console_error_panic_hook",
 "console_log",
 "futures",
 "js-sys",

M client/Cargo.toml => client/Cargo.toml +1 -1
@@ 23,7 23,7 @@ serde = { version = "1", features = ["derive"] }
lazy_static = "1.4"
markdown = "1.0.0-alpha.7" # DO NOT DOWNGRADE
async-recursion = "1"

console_error_panic_hook = "0.1"

[dependencies.web-sys]
version = "0.3.4"

M client/src/lib.rs => client/src/lib.rs +31 -6
@@ 1,7 1,8 @@
use std::error::Error;
use std::ops::Add;
use futures::stream::{SplitSink, SplitStream};
use futures::StreamExt;
use log::{error, info, Level, trace, warn};
use log::{debug, error, info, Level, trace, warn};
use wasm_bindgen::prelude::*;
use ws_stream_wasm::{WsErr, WsMessage, WsMeta, WsStream};
use starkingdoms_protocol::State;


@@ 12,13 13,12 @@ use futures::SinkExt;
use lazy_static::lazy_static;
use std::sync::Arc;
use std::sync::RwLock;


use std::time::{Duration, SystemTime, UNIX_EPOCH};
use async_recursion::async_recursion;
use futures::FutureExt;

use wasm_bindgen_futures::JsFuture;
use web_sys::Window;
use starkingdoms_protocol::GoodbyeReason::PingPongTimeout;

#[macro_use]
pub mod macros;


@@ 31,10 31,13 @@ extern {

#[wasm_bindgen]
pub async fn rust_init(gateway: &str, username: &str) -> Result<(), JsError> {
    console_error_panic_hook::set_once();

    set_status("Starting logger...");

    console_log::init_with_level(Level::Debug).unwrap();


    info!("Logger setup successfully");

    match main(gateway, username, 1).await {


@@ 57,9 60,12 @@ pub struct Client {
pub struct ClientData {
    pub state: State,
    pub tx: SplitSink<WsStream, WsMessage>,
    pub rx: SplitStream<WsStream>
    pub rx: SplitStream<WsStream>,
    pub pong_timeout: u64
}

pub const PONG_MAX_TIMEOUT: u64 = 5;

lazy_static! {
    pub static ref CLIENT: Arc<RwLock<Client>> = Arc::new(RwLock::new(Client {
        client_data: None


@@ 107,7 113,8 @@ pub async fn main(gateway: &str, username: &str, backoff: i32) -> Result<(), Box
    let mut client_data = ClientData {
        state: State::Handshake,
        tx,
        rx
        rx,
        pong_timeout: (js_sys::Date::now() as u64 / 1000) + 5
    };

    trace!("Split stream, handshaking with server");


@@ 159,6 166,21 @@ pub async fn update_socket() -> Result<(), JsError> {

    let client_data = client.client_data.as_mut().unwrap();

    if client_data.pong_timeout < (js_sys::Date::now() as u64 / 1000) {
        error!("Connection timed out");
        send!(client_data.tx, &MessageC2S::Goodbye {
                reason: PingPongTimeout
            }).await?;
        client.client_data = None;
        set_status("Connection timed out. Reload to reconnect");
        return Err(JsError::new("Connection timed out"));
    }

    if client_data.pong_timeout - 4 < (js_sys::Date::now() as u64 / 1000) {
        // send ping
        send!(client_data.tx, &MessageC2S::Ping {}).await?;
    }

    let maybe_msg: Option<MessageS2C> = match recv!(client_data.rx) {
        Ok(r) => r,
        Err(e) => {


@@ 188,6 210,9 @@ pub async fn update_socket() -> Result<(), JsError> {

                chatbox.append_child(&new_elem).unwrap();
            },
            MessageS2C::Pong {} => {
                client_data.pong_timeout = (js_sys::Date::now() as u64 / 1000) + PONG_MAX_TIMEOUT
            },
            _ => {
                warn!("server sent unexpected packet {:?}, ignoring", msg);
            }

M protocol/src/lib.rs => protocol/src/lib.rs +8 -3
@@ 22,7 22,9 @@ pub enum MessageC2S {

    Chat {
        message: String
    }
    },

    Ping {}
}

#[derive(Serialize, Deserialize, Debug, Clone)]


@@ 40,7 42,9 @@ pub enum MessageS2C {
    Chat {
        from: String,
        message: String
    }
    },

    Pong {}
}

#[derive(Serialize, Deserialize, Debug, Clone)]


@@ 49,7 53,8 @@ pub enum GoodbyeReason {
    UnexpectedPacket,
    UnexpectedNextState,
    UsernameTaken,
    Done
    PingPongTimeout,
    Done,
}

pub fn pc2s(pkt: &MessageC2S) -> Vec<u8> {

M server/src/client_handler.rs => server/src/client_handler.rs +17 -2
@@ 1,19 1,22 @@
use std::error::Error;
use std::net::SocketAddr;
use std::time::{Duration, SystemTime};
use futures::stream::{SplitSink, SplitStream};
use futures::{FutureExt, SinkExt, StreamExt};
use hyper::upgrade::Upgraded;
use log::{error, info};
use log::{error, info, warn};
use tokio::sync::mpsc::Receiver;
use tokio_tungstenite::WebSocketStream;
use tungstenite::Message;
use starkingdoms_protocol::{GoodbyeReason, MessageC2S, MessageS2C, PROTOCOL_VERSION, State};
use starkingdoms_protocol::GoodbyeReason::PingPongTimeout;
use crate::handler::{ClientHandlerMessage, ClientManager};
use crate::{send, recv};

pub async fn handle_client(mgr: ClientManager, remote_addr: SocketAddr, mut rx: Receiver<ClientHandlerMessage>, mut client_tx: SplitSink<WebSocketStream<Upgraded>, Message>, mut client_rx: SplitStream<WebSocketStream<Upgraded>>) -> Result<(), Box<dyn Error>> {
    let mut state = State::Handshake;
    let mut username = String::new();
    let mut ping_timeout = SystemTime::now() + Duration::from_secs(5);

    loop {
        if let Some(msg) = rx.recv().await {


@@ 31,6 34,14 @@ pub async fn handle_client(mgr: ClientManager, remote_addr: SocketAddr, mut rx: 
            break;
        }

        if ping_timeout < SystemTime::now() {
            warn!("[{}] ping timeout", remote_addr);
            send!(client_tx, &MessageS2C::Goodbye {
                reason: PingPongTimeout
            }).await?;
            break;
        }

        if let Some(pkt) = recv!(client_rx)? {
            match state {
                State::Handshake => {


@@ 84,7 95,7 @@ pub async fn handle_client(mgr: ClientManager, remote_addr: SocketAddr, mut rx: 
                            info!("client sent goodbye: {:?}", reason);
                            break;
                        },
                        MessageC2S::Chat { .. } => {
                        _ => {
                            error!("client sent unexpected packet {:?} for state {:?}", pkt, state);
                            send!(client_tx, &MessageS2C::Goodbye {
                                    reason: GoodbyeReason::UnexpectedPacket,


@@ 117,6 128,10 @@ pub async fn handle_client(mgr: ClientManager, remote_addr: SocketAddr, mut rx: 
                                    }
                                }
                            }
                        },
                        MessageC2S::Ping {} => {
                            send!(client_tx, &MessageS2C::Pong {}).await?;
                            ping_timeout = SystemTime::now() + Duration::from_secs(5);
                        }
                    }
                }

M web/play.html => web/play.html +8 -3
@@ 21,7 21,7 @@
        <script type="module">
            // If you're getting build errors here | you need to run `just build_client_bundle` first, to compile client code
            //                                     v
            import init, { rust_init, send_chat, update_socket } from "./dist/starkingdoms_client.js";
            import init, { rust_init, send_chat, update_socket, set_status } from "./dist/starkingdoms_client.js";
            init().then(() => {
                const urlSearchParams = new URLSearchParams(window.location.search);



@@ 30,8 30,13 @@
                        send_chat(document.getElementById("chat-value").value);
                    });

                    setInterval(() => {
                        update_socket();
                    let interval_id;
                    interval_id = setInterval(() => {
                        update_socket().catch((e) => {
                            clearInterval(interval_id);
                            set_status("There was an error. Reload the page to reconnect.")
                            throw e;
                        });
                    }, 5);
                });
            })