use std::error::Error;
use std::net::SocketAddr;
use futures::stream::{SplitSink, SplitStream};
use futures::{SinkExt, StreamExt};
use hyper::upgrade::Upgraded;
use log::{error, info};
use tokio::sync::mpsc::Receiver;
use tokio_tungstenite::WebSocketStream;
use tungstenite::Message;
use protocol::{GoodbyeReason, MessageC2S, MessageS2C, PROTOCOL_VERSION, ps2c, State};
use crate::handler::{ClientHandler, ClientHandlerMessage, ClientManager};
pub async fn handle_client(mgr: ClientManager, remote_addr: SocketAddr, mut rx: Receiver<ClientHandlerMessage>, mut write: SplitSink<WebSocketStream<Upgraded>, Message>, mut read: SplitStream<WebSocketStream<Upgraded>>) -> Result<(), Box<dyn Error>> {
let mut state = State::Handshake;
loop {
if let Some(msg) = rx.recv().await {
match msg {
ClientHandlerMessage::Tick => {} // this intentionally does nothing
}
} else {
info!("channel closed, shutting down");
break;
}
if let Some(msg) = read.next().await {
let msg = msg?;
if msg.is_binary() {
// try to deserialize the msg
let pkt: MessageC2S = rmp_serde::from_slice(&msg.into_data())?;
match state {
State::Handshake => {
match pkt {
MessageC2S::Hello { version, requested_username, next_state } => {
if !matches!(next_state, State::Play) {
error!("client sent unexpected state {:?} (expected: Play)", next_state);
write.send(Message::from(ps2c(&MessageS2C::Goodbye {
reason: GoodbyeReason::UnexpectedNextState,
}))).await?;
break;
}
// check version
if version != PROTOCOL_VERSION {
error!("client sent incompatible version {} (expected: {})", version, PROTOCOL_VERSION);
write.send(Message::from(ps2c(&MessageS2C::Goodbye {
reason: GoodbyeReason::UnsupportedProtocol {
supported: PROTOCOL_VERSION,
got: version,
},
}))).await?;
break;
}
// determine if we can give them that username
{
if mgr.usernames.read().await.values().into_iter().any(|u| *u == requested_username) {
error!("client requested username {} but it is in use", requested_username);
write.send(Message::from(ps2c(&MessageS2C::Goodbye {
reason: GoodbyeReason::UsernameTaken,
}))).await?;
break;
}
}
// username is fine
{
mgr.usernames.write().await.insert(remote_addr, requested_username.clone());
}
write.send(Message::from(ps2c(&MessageS2C::Hello {
version,
given_username: requested_username,
next_state,
}))).await?;
},
MessageC2S::Goodbye { reason } => {
info!("client sent goodbye: {:?}", reason);
break;
}
}
}
State::Play => {
match pkt {
MessageC2S::Hello { .. } => {
error!("client sent unexpected packet {:?} for state {:?}", pkt, state);
write.send(Message::from(ps2c(&MessageS2C::Goodbye {
reason: GoodbyeReason::UnexpectedPacket,
}))).await?;
break;
}
MessageC2S::Goodbye { reason } => {
info!("client sent goodbye: {:?}", reason);
break;
}
}
}
}
}
}
}
Ok(())
}