~starkingdoms/starkingdoms

8040f221e2a80d88a7a5ba849aa4eb8912388f71 — ghostlyzsh 2 years ago 6d234cf + f1bc9b7
yay merged the file swap (why git why)
5 files changed, 169 insertions(+), 167 deletions(-)

M .gitlab-ci.yml
M server/src/handler.rs
M server/src/main.rs
M server/src/manager.rs
M server/src/timer.rs
M .gitlab-ci.yml => .gitlab-ci.yml +5 -1
@@ 2,9 2,13 @@ image: "rust:latest"
before_script:
  - apt-get update -yqq
  - apt-get install -yqq --no-install-recommends build-essential
  - rustup component add clippy
  - cargo install just
  - just install_tooling

check:cargo:
  script:
    - rustc --version && cargo --version
    - cargo check --workspace --verbose --locked
    - just build_client_bundle
    - cargo check --locked --bin starkingdoms-server
    - cargo clippy --workspace --verbose --locked
\ No newline at end of file

M server/src/handler.rs => server/src/handler.rs +134 -26
@@ 1,35 1,143 @@
use std::collections::HashMap;

use std::error::Error;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use futures::stream::{SplitSink, SplitStream};
use futures::{FutureExt, SinkExt, StreamExt};
use hyper::upgrade::Upgraded;
use log::{error, info, warn};
use tokio::sync::mpsc::Receiver;
use tokio_tungstenite::WebSocketStream;
use tungstenite::Message;
use starkingdoms_protocol::{GoodbyeReason, MessageC2S, MessageS2C, PROTOCOL_VERSION, State};
use starkingdoms_protocol::GoodbyeReason::PingPongTimeout;
use crate::manager::{ClientHandlerMessage, ClientManager};
use crate::{send, recv};

use tokio::sync::mpsc::Sender;
use tokio::sync::RwLock;
pub async fn handle_client(mgr: ClientManager, remote_addr: SocketAddr, mut rx: Receiver<ClientHandlerMessage>, mut client_tx: SplitSink<WebSocketStream<Upgraded>, Message>, mut client_rx: SplitStream<WebSocketStream<Upgraded>>) -> Result<(), Box<dyn Error>> {
    let mut state = State::Handshake;
    let mut username = String::new();
    let mut ping_timeout = SystemTime::now() + Duration::from_secs(5);

    loop {
        if let Some(msg) = rx.recv().await {
            match msg {
                ClientHandlerMessage::Tick => {} // this intentionally does nothing,
                ClientHandlerMessage::ChatMessage { from, message } => {
                    send!(client_tx, &MessageS2C::Chat {
                        message,
                        from
                    }).await?;
                }
            }
        } else {
            info!("channel closed, shutting down");
            break;
        }

        if ping_timeout < SystemTime::now() {
            warn!("[{}] ping timeout", remote_addr);
            send!(client_tx, &MessageS2C::Goodbye {
                reason: PingPongTimeout
            }).await?;
            break;
        }

#[derive(Clone)]
pub struct ClientManager {
    pub handlers: Arc<RwLock<HashMap<SocketAddr, ClientHandler>>>,
    pub usernames: Arc<RwLock<HashMap<SocketAddr, String>>>,
    pub players: Arc<RwLock<HashMap<SocketAddr, Player>>>
}
        if let Some(pkt) = recv!(client_rx)? {
            match state {
                State::Handshake => {
                    match pkt {
                        MessageC2S::Hello { version, requested_username, next_state } => {
                            if !matches!(next_state, State::Play) {
                                error!("client sent unexpected state {:?} (expected: Play)", next_state);
                                send!(client_tx, &MessageS2C::Goodbye {
                                        reason: GoodbyeReason::UnexpectedNextState,
                                    }).await?;
                                break;
                            }

#[derive(Default)]
pub struct Player {
    pub id: u16,
    pub x: f64,
    pub y: f64,
    pub vel_x: f64,
    pub vel_y: f64,
}
                            // check version
                            if version != PROTOCOL_VERSION {
                                error!("client sent incompatible version {} (expected: {})", version, PROTOCOL_VERSION);
                                send!(client_tx, &MessageS2C::Goodbye {
                                        reason: GoodbyeReason::UnsupportedProtocol {
                                            supported: PROTOCOL_VERSION,
                                            got: version,
                                        },
                                    }).await?;
                                break;
                            }

#[derive(Clone)]
pub struct ClientHandler {
    pub tx: Sender<ClientHandlerMessage>
}
                            // determine if we can give them that username
                            {
                                if mgr.usernames.read().await.values().any(|u| *u == requested_username) {
                                    error!("client requested username {} but it is in use", requested_username);
                                    send!(client_tx, &MessageS2C::Goodbye {
                                            reason: GoodbyeReason::UsernameTaken,
                                        }).await?;
                                    break;
                                }
                            }

                            // username is fine
                            {
                                mgr.usernames.write().await.insert(remote_addr, requested_username.clone());
                            }

                            send!(client_tx, &MessageS2C::Hello {
                                    version,
                                    given_username: requested_username.clone(),
                                    next_state,
                                }).await?;
                            state = next_state;
                            username = requested_username;
                        },
                        MessageC2S::Goodbye { reason } => {
                            info!("client sent goodbye: {:?}", reason);
                            break;
                        },
                        _ => {
                            error!("client sent unexpected packet {:?} for state {:?}", pkt, state);
                            send!(client_tx, &MessageS2C::Goodbye {
                                    reason: GoodbyeReason::UnexpectedPacket,
                                }).await?;
                            break;
                        }
                    }
                }
                State::Play => {
                    match pkt {
                        MessageC2S::Hello { .. } => {
                            error!("client sent unexpected packet {:?} for state {:?}", pkt, state);
                            send!(client_tx, &MessageS2C::Goodbye {
                                    reason: GoodbyeReason::UnexpectedPacket,
                                }).await?;
                            break;
                        },
                        MessageC2S::Goodbye { reason } => {
                            info!("client sent goodbye: {:?}", reason);
                            break;
                        },
                        MessageC2S::Chat { message } => {
                            info!("[{}] CHAT: [{}] {}", remote_addr, username, message);

                            for (_addr, client_thread) in mgr.handlers.read().await.iter() {
                                match client_thread.tx.send(ClientHandlerMessage::ChatMessage { from: username.clone(), message: message.clone() }).await {
                                    Ok(_) => (),
                                    Err(e) => {
                                        error!("unable to update a client thread: {}", e);
                                    }
                                }
                            }
                        },
                        MessageC2S::Ping {} => {
                            send!(client_tx, &MessageS2C::Pong {}).await?;
                            ping_timeout = SystemTime::now() + Duration::from_secs(5);
                        }
                    }
                }
            }
        }
    }

pub enum ClientHandlerMessage {
    Tick,
    ChatMessage { from: String, message: String }
    Ok(())
}

M server/src/main.rs => server/src/main.rs +3 -3
@@ 11,12 11,12 @@ use log::{error, info, Level};
use serde::{Deserialize, Serialize};
use tokio::sync::RwLock;
use starkingdoms_protocol::{PROTOCOL_VERSION};
use crate::handler::{ClientHandler, ClientManager};
use crate::manager::handle_client;
use crate::manager::{ClientHandler, ClientManager};
use crate::handler::handle_client;
use crate::timer::timer_main;

pub mod manager;
pub mod handler;
pub mod manager;
pub mod timer;
#[macro_use]
pub mod macros;

M server/src/manager.rs => server/src/manager.rs +26 -136
@@ 1,143 1,33 @@
use std::error::Error;
use std::net::SocketAddr;
use std::time::{Duration, SystemTime};
use futures::stream::{SplitSink, SplitStream};
use futures::{FutureExt, SinkExt, StreamExt};
use hyper::upgrade::Upgraded;
use log::{error, info, warn};
use tokio::sync::mpsc::Receiver;
use tokio_tungstenite::WebSocketStream;
use tungstenite::Message;
use starkingdoms_protocol::{GoodbyeReason, MessageC2S, MessageS2C, PROTOCOL_VERSION, State};
use starkingdoms_protocol::GoodbyeReason::PingPongTimeout;
use crate::handler::{ClientHandlerMessage, ClientManager};
use crate::{send, recv};

pub async fn handle_client(mgr: ClientManager, remote_addr: SocketAddr, mut rx: Receiver<ClientHandlerMessage>, mut client_tx: SplitSink<WebSocketStream<Upgraded>, Message>, mut client_rx: SplitStream<WebSocketStream<Upgraded>>) -> Result<(), Box<dyn Error>> {
    let mut state = State::Handshake;
    let mut username = String::new();
    let mut ping_timeout = SystemTime::now() + Duration::from_secs(5);

    loop {
        if let Some(msg) = rx.recv().await {
            match msg {
                ClientHandlerMessage::Tick => {} // this intentionally does nothing,
                ClientHandlerMessage::ChatMessage { from, message } => {
                    send!(client_tx, &MessageS2C::Chat {
                        message,
                        from
                    }).await?;
                }
            }
        } else {
            info!("channel closed, shutting down");
            break;
        }

        if ping_timeout < SystemTime::now() {
            warn!("[{}] ping timeout", remote_addr);
            send!(client_tx, &MessageS2C::Goodbye {
                reason: PingPongTimeout
            }).await?;
            break;
        }

        if let Some(pkt) = recv!(client_rx)? {
            match state {
                State::Handshake => {
                    match pkt {
                        MessageC2S::Hello { version, requested_username, next_state } => {
                            if !matches!(next_state, State::Play) {
                                error!("client sent unexpected state {:?} (expected: Play)", next_state);
                                send!(client_tx, &MessageS2C::Goodbye {
                                        reason: GoodbyeReason::UnexpectedNextState,
                                    }).await?;
                                break;
                            }
use std::collections::HashMap;

                            // check version
                            if version != PROTOCOL_VERSION {
                                error!("client sent incompatible version {} (expected: {})", version, PROTOCOL_VERSION);
                                send!(client_tx, &MessageS2C::Goodbye {
                                        reason: GoodbyeReason::UnsupportedProtocol {
                                            supported: PROTOCOL_VERSION,
                                            got: version,
                                        },
                                    }).await?;
                                break;
                            }
use std::net::SocketAddr;
use std::sync::Arc;

                            // determine if we can give them that username
                            {
                                if mgr.usernames.read().await.values().any(|u| *u == requested_username) {
                                    error!("client requested username {} but it is in use", requested_username);
                                    send!(client_tx, &MessageS2C::Goodbye {
                                            reason: GoodbyeReason::UsernameTaken,
                                        }).await?;
                                    break;
                                }
                            }
use tokio::sync::mpsc::Sender;
use tokio::sync::RwLock;

                            // username is fine
                            {
                                mgr.usernames.write().await.insert(remote_addr, requested_username.clone());
                            }
#[derive(Clone)]
pub struct ClientManager {
    pub handlers: Arc<RwLock<HashMap<SocketAddr, ClientHandler>>>,
    pub usernames: Arc<RwLock<HashMap<SocketAddr, String>>>,
    pub players: Arc<RwLock<HashMap<SocketAddr, Player>>>
}

                            send!(client_tx, &MessageS2C::Hello {
                                    version,
                                    given_username: requested_username.clone(),
                                    next_state,
                                }).await?;
                            state = next_state;
                            username = requested_username;
                        },
                        MessageC2S::Goodbye { reason } => {
                            info!("client sent goodbye: {:?}", reason);
                            break;
                        },
                        _ => {
                            error!("client sent unexpected packet {:?} for state {:?}", pkt, state);
                            send!(client_tx, &MessageS2C::Goodbye {
                                    reason: GoodbyeReason::UnexpectedPacket,
                                }).await?;
                            break;
                        }
                    }
                }
                State::Play => {
                    match pkt {
                        MessageC2S::Hello { .. } => {
                            error!("client sent unexpected packet {:?} for state {:?}", pkt, state);
                            send!(client_tx, &MessageS2C::Goodbye {
                                    reason: GoodbyeReason::UnexpectedPacket,
                                }).await?;
                            break;
                        },
                        MessageC2S::Goodbye { reason } => {
                            info!("client sent goodbye: {:?}", reason);
                            break;
                        },
                        MessageC2S::Chat { message } => {
                            info!("[{}] CHAT: [{}] {}", remote_addr, username, message);
#[derive(Default)]
pub struct Player {
    pub id: u16,
    pub x: f64,
    pub y: f64,
    pub vel_x: f64,
    pub vel_y: f64,
}

                            for (_addr, client_thread) in mgr.handlers.read().await.iter() {
                                match client_thread.tx.send(ClientHandlerMessage::ChatMessage { from: username.clone(), message: message.clone() }).await {
                                    Ok(_) => (),
                                    Err(e) => {
                                        error!("unable to update a client thread: {}", e);
                                    }
                                }
                            }
                        },
                        MessageC2S::Ping {} => {
                            send!(client_tx, &MessageS2C::Pong {}).await?;
                            ping_timeout = SystemTime::now() + Duration::from_secs(5);
                        },
                    }
                }
            }
        }
    }
#[derive(Clone)]
pub struct ClientHandler {
    pub tx: Sender<ClientHandlerMessage>
}

    Ok(())
pub enum ClientHandlerMessage {
    Tick,
    ChatMessage { from: String, message: String }
}

M server/src/timer.rs => server/src/timer.rs +1 -1
@@ 3,7 3,7 @@ use std::time::Duration;
use log::{error};

use tokio::time::sleep;
use crate::handler::{ClientHandlerMessage, ClientManager};
use crate::manager::{ClientHandlerMessage, ClientManager};

pub async fn timer_main(mgr: ClientManager) {
    loop {