use std::error::Error; use std::net::SocketAddr; use std::time::{Duration, SystemTime}; use futures::stream::{SplitSink, SplitStream}; use futures::{FutureExt, SinkExt, StreamExt}; use hyper::upgrade::Upgraded; use log::{error, info, warn}; use tokio::sync::mpsc::Receiver; use tokio_tungstenite::WebSocketStream; use tungstenite::Message; use starkingdoms_protocol::{GoodbyeReason, MessageC2S, MessageS2C, PROTOCOL_VERSION, State}; use starkingdoms_protocol::GoodbyeReason::PingPongTimeout; use crate::handler::{ClientHandlerMessage, ClientManager}; use crate::{send, recv}; pub async fn handle_client(mgr: ClientManager, remote_addr: SocketAddr, mut rx: Receiver, mut client_tx: SplitSink, Message>, mut client_rx: SplitStream>) -> Result<(), Box> { let mut state = State::Handshake; let mut username = String::new(); let mut ping_timeout = SystemTime::now() + Duration::from_secs(5); loop { if let Some(msg) = rx.recv().await { match msg { ClientHandlerMessage::Tick => {} // this intentionally does nothing, ClientHandlerMessage::ChatMessage { from, message } => { send!(client_tx, &MessageS2C::Chat { message, from }).await?; } } } else { info!("channel closed, shutting down"); break; } if ping_timeout < SystemTime::now() { warn!("[{}] ping timeout", remote_addr); send!(client_tx, &MessageS2C::Goodbye { reason: PingPongTimeout }).await?; break; } if let Some(pkt) = recv!(client_rx)? { match state { State::Handshake => { match pkt { MessageC2S::Hello { version, requested_username, next_state } => { if !matches!(next_state, State::Play) { error!("client sent unexpected state {:?} (expected: Play)", next_state); send!(client_tx, &MessageS2C::Goodbye { reason: GoodbyeReason::UnexpectedNextState, }).await?; break; } // check version if version != PROTOCOL_VERSION { error!("client sent incompatible version {} (expected: {})", version, PROTOCOL_VERSION); send!(client_tx, &MessageS2C::Goodbye { reason: GoodbyeReason::UnsupportedProtocol { supported: PROTOCOL_VERSION, got: version, }, }).await?; break; } // determine if we can give them that username { if mgr.usernames.read().await.values().any(|u| *u == requested_username) { error!("client requested username {} but it is in use", requested_username); send!(client_tx, &MessageS2C::Goodbye { reason: GoodbyeReason::UsernameTaken, }).await?; break; } } // username is fine { mgr.usernames.write().await.insert(remote_addr, requested_username.clone()); } send!(client_tx, &MessageS2C::Hello { version, given_username: requested_username.clone(), next_state, }).await?; state = next_state; username = requested_username; }, MessageC2S::Goodbye { reason } => { info!("client sent goodbye: {:?}", reason); break; }, _ => { error!("client sent unexpected packet {:?} for state {:?}", pkt, state); send!(client_tx, &MessageS2C::Goodbye { reason: GoodbyeReason::UnexpectedPacket, }).await?; break; } } } State::Play => { match pkt { MessageC2S::Hello { .. } => { error!("client sent unexpected packet {:?} for state {:?}", pkt, state); send!(client_tx, &MessageS2C::Goodbye { reason: GoodbyeReason::UnexpectedPacket, }).await?; break; }, MessageC2S::Goodbye { reason } => { info!("client sent goodbye: {:?}", reason); break; }, MessageC2S::Chat { message } => { info!("[{}] CHAT: [{}] {}", remote_addr, username, message); for (_addr, client_thread) in mgr.handlers.read().await.iter() { match client_thread.tx.send(ClientHandlerMessage::ChatMessage { from: username.clone(), message: message.clone() }).await { Ok(_) => (), Err(e) => { error!("unable to update a client thread: {}", e); } } } }, MessageC2S::Ping {} => { send!(client_tx, &MessageS2C::Pong {}).await?; ping_timeout = SystemTime::now() + Duration::from_secs(5); }, } } } } } Ok(()) }