use std::error::Error; use std::net::SocketAddr; use futures::stream::{SplitSink, SplitStream}; use futures::{SinkExt, StreamExt}; use hyper::upgrade::Upgraded; use log::{error, info}; use tokio::sync::mpsc::Receiver; use tokio_tungstenite::WebSocketStream; use tungstenite::Message; use protocol::{GoodbyeReason, MessageC2S, MessageS2C, PROTOCOL_VERSION, ps2c, State}; use crate::handler::{ClientHandler, ClientHandlerMessage, ClientManager}; pub async fn handle_client(mgr: ClientManager, remote_addr: SocketAddr, mut rx: Receiver, mut write: SplitSink, Message>, mut read: SplitStream>) -> Result<(), Box> { let mut state = State::Handshake; loop { if let Some(msg) = rx.recv().await { match msg { ClientHandlerMessage::Tick => {} // this intentionally does nothing } } else { info!("channel closed, shutting down"); break; } if let Some(msg) = read.next().await { let msg = msg?; if msg.is_binary() { // try to deserialize the msg let pkt: MessageC2S = rmp_serde::from_slice(&msg.into_data())?; match state { State::Handshake => { match pkt { MessageC2S::Hello { version, requested_username, next_state } => { if !matches!(next_state, State::Play) { error!("client sent unexpected state {:?} (expected: Play)", next_state); write.send(Message::from(ps2c(&MessageS2C::Goodbye { reason: GoodbyeReason::UnexpectedNextState, }))).await?; break; } // check version if version != PROTOCOL_VERSION { error!("client sent incompatible version {} (expected: {})", version, PROTOCOL_VERSION); write.send(Message::from(ps2c(&MessageS2C::Goodbye { reason: GoodbyeReason::UnsupportedProtocol { supported: PROTOCOL_VERSION, got: version, }, }))).await?; break; } // determine if we can give them that username { if mgr.usernames.read().await.values().into_iter().any(|u| *u == requested_username) { error!("client requested username {} but it is in use", requested_username); write.send(Message::from(ps2c(&MessageS2C::Goodbye { reason: GoodbyeReason::UsernameTaken, }))).await?; break; } } // username is fine { mgr.usernames.write().await.insert(remote_addr, requested_username.clone()); } write.send(Message::from(ps2c(&MessageS2C::Hello { version, given_username: requested_username, next_state, }))).await?; }, MessageC2S::Goodbye { reason } => { info!("client sent goodbye: {:?}", reason); break; } } } State::Play => { match pkt { MessageC2S::Hello { .. } => { error!("client sent unexpected packet {:?} for state {:?}", pkt, state); write.send(Message::from(ps2c(&MessageS2C::Goodbye { reason: GoodbyeReason::UnexpectedPacket, }))).await?; break; } MessageC2S::Goodbye { reason } => { info!("client sent goodbye: {:?}", reason); break; } } } } } } } Ok(()) }