use std::error::Error;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use futures::stream::{SplitSink, SplitStream};
use futures::{FutureExt, SinkExt, StreamExt};
use hyper::upgrade::Upgraded;
use log::{error, info, warn};
use nalgebra::vector;
use rapier2d_f64::prelude::{RigidBodyBuilder, RigidBodyType, ColliderBuilder};
use tokio::sync::RwLock;
use tokio::sync::mpsc::Receiver;
use tokio_tungstenite::WebSocketStream;
use tungstenite::Message;
use starkingdoms_protocol::{GoodbyeReason, MessageC2S, MessageS2C, ProtocolPlanet, PlanetType, PROTOCOL_VERSION, State};
use starkingdoms_protocol::GoodbyeReason::PingPongTimeout;
use crate::manager::{ClientHandlerMessage, ClientManager, PhysicsData, Player};
use crate::{send, recv, SCALE};
pub async fn handle_client(mgr: ClientManager, data: Arc<RwLock<PhysicsData>>, remote_addr: SocketAddr, mut rx: Receiver<ClientHandlerMessage>, mut client_tx: SplitSink<WebSocketStream<Upgraded>, Message>, mut client_rx: SplitStream<WebSocketStream<Upgraded>>) -> Result<(), Box<dyn Error>> {
let mut state = State::Handshake;
let mut username = String::new();
let mut ping_timeout = SystemTime::now() + Duration::from_secs(5);
loop {
if let Some(msg) = rx.recv().await {
match msg {
ClientHandlerMessage::Tick => {} // this intentionally does nothing,
ClientHandlerMessage::ChatMessage { from, message } => {
send!(client_tx, &MessageS2C::Chat {
message,
from
}).await?;
}
ClientHandlerMessage::Position { x, y } => {
send!(client_tx, &MessageS2C::Position {
x,
y
}).await?;
}
ClientHandlerMessage::PlanetData { planets } => {
send!(client_tx, &MessageS2C::PlanetData {
planets
}).await?;
}
}
} else {
info!("channel closed, shutting down");
break;
}
if ping_timeout < SystemTime::now() {
warn!("[{}] ping timeout", remote_addr);
send!(client_tx, &MessageS2C::Goodbye {
reason: PingPongTimeout
}).await?;
break;
}
if let Some(pkt) = recv!(client_rx)? {
match state {
State::Handshake => {
match pkt {
MessageC2S::Hello { version, requested_username, next_state } => {
if !matches!(next_state, State::Play) {
error!("client sent unexpected state {:?} (expected: Play)", next_state);
send!(client_tx, &MessageS2C::Goodbye {
reason: GoodbyeReason::UnexpectedNextState,
}).await?;
break;
}
// check version
if version != PROTOCOL_VERSION {
error!("client sent incompatible version {} (expected: {})", version, PROTOCOL_VERSION);
send!(client_tx, &MessageS2C::Goodbye {
reason: GoodbyeReason::UnsupportedProtocol {
supported: PROTOCOL_VERSION,
got: version,
},
}).await?;
break;
}
// determine if we can give them that username
{
if mgr.usernames.read().await.values().any(|u| *u == requested_username) {
error!("client requested username {} but it is in use", requested_username);
send!(client_tx, &MessageS2C::Goodbye {
reason: GoodbyeReason::UsernameTaken,
}).await?;
break;
}
}
// username is fine
{
mgr.usernames.write().await.insert(remote_addr, requested_username.clone());
}
send!(client_tx, &MessageS2C::Hello {
version,
given_username: requested_username.clone(),
next_state,
}).await?;
state = next_state;
username = requested_username;
// make player rigidbody
{
let mut data_handle = data.write().await;
let mut rigid_body_set = data_handle.rigid_body_set.clone();
let mut collider_set = data_handle.collider_set.clone();
let player_body = RigidBodyBuilder::new(RigidBodyType::Dynamic)
.translation(vector![0.0, 2100.0/SCALE])
.build();
let player_collider = ColliderBuilder::cuboid(1.0 / SCALE, 1.0 / SCALE).build();
let player_handle = rigid_body_set.insert(player_body);
collider_set.insert_with_parent(player_collider, player_handle, &mut rigid_body_set);
data_handle.rigid_body_set = rigid_body_set;
data_handle.collider_set = collider_set;
mgr.players.write().await.insert(remote_addr, Player { handle: player_handle });
}
},
MessageC2S::Goodbye { reason } => {
info!("client sent goodbye: {:?}", reason);
break;
},
_ => {
error!("client sent unexpected packet {:?} for state {:?}", pkt, state);
send!(client_tx, &MessageS2C::Goodbye {
reason: GoodbyeReason::UnexpectedPacket,
}).await?;
break;
}
}
}
State::Play => {
match pkt {
MessageC2S::Hello { .. } => {
error!("client sent unexpected packet {:?} for state {:?}", pkt, state);
send!(client_tx, &MessageS2C::Goodbye {
reason: GoodbyeReason::UnexpectedPacket,
}).await?;
break;
},
MessageC2S::Goodbye { reason } => {
info!("client sent goodbye: {:?}", reason);
break;
},
MessageC2S::Chat { message } => {
info!("[{}] CHAT: [{}] {}", remote_addr, username, message);
for (_addr, client_thread) in mgr.handlers.read().await.iter() {
match client_thread.tx.send(ClientHandlerMessage::ChatMessage { from: username.clone(), message: message.clone() }).await {
Ok(_) => (),
Err(e) => {
error!("unable to update a client thread: {}", e);
}
}
}
},
MessageC2S::Ping {} => {
send!(client_tx, &MessageS2C::Pong {}).await?;
ping_timeout = SystemTime::now() + Duration::from_secs(5);
}
}
}
}
}
}
Ok(())
}