use std::error::Error; use std::net::SocketAddr; use std::sync::Arc; use std::time::{Duration, SystemTime}; use futures::stream::{SplitSink, SplitStream}; use futures::{FutureExt, SinkExt, StreamExt}; use hyper::upgrade::Upgraded; use log::{error, info, warn}; use nalgebra::vector; use rapier2d_f64::prelude::{RigidBodyBuilder, RigidBodyType, ColliderBuilder}; use tokio::sync::RwLock; use tokio::sync::mpsc::Receiver; use tokio_tungstenite::WebSocketStream; use tungstenite::Message; use starkingdoms_protocol::{GoodbyeReason, MessageC2S, MessageS2C, PROTOCOL_VERSION, State}; use starkingdoms_protocol::GoodbyeReason::PingPongTimeout; use crate::manager::{ClientHandlerMessage, ClientManager, PhysicsData, Player}; use crate::{send, recv, SCALE}; pub async fn handle_client(mgr: ClientManager, data: Arc>, remote_addr: SocketAddr, mut rx: Receiver, mut client_tx: SplitSink, Message>, mut client_rx: SplitStream>) -> Result<(), Box> { let mut state = State::Handshake; let mut username = String::new(); let mut ping_timeout = SystemTime::now() + Duration::from_secs(5); loop { if let Some(msg) = rx.recv().await { match msg { ClientHandlerMessage::Tick => {} // this intentionally does nothing, ClientHandlerMessage::ChatMessage { from, message } => { send!(client_tx, &MessageS2C::Chat { message, from }).await?; } ClientHandlerMessage::Position { x, y } => { send!(client_tx, &MessageS2C::Position { x, y }).await?; } ClientHandlerMessage::PlanetData { planets } => { send!(client_tx, &MessageS2C::PlanetData { planets }).await?; } } } else { info!("channel closed, shutting down"); break; } if ping_timeout < SystemTime::now() { warn!("[{}] ping timeout", remote_addr); send!(client_tx, &MessageS2C::Goodbye { reason: PingPongTimeout }).await?; break; } if let Some(pkt) = recv!(client_rx)? { match state { State::Handshake => { match pkt { MessageC2S::Hello { version, requested_username, next_state } => { if !matches!(next_state, State::Play) { error!("client sent unexpected state {:?} (expected: Play)", next_state); send!(client_tx, &MessageS2C::Goodbye { reason: GoodbyeReason::UnexpectedNextState, }).await?; break; } // check version if version != PROTOCOL_VERSION { error!("client sent incompatible version {} (expected: {})", version, PROTOCOL_VERSION); send!(client_tx, &MessageS2C::Goodbye { reason: GoodbyeReason::UnsupportedProtocol { supported: PROTOCOL_VERSION, got: version, }, }).await?; break; } // determine if we can give them that username { if mgr.usernames.read().await.values().any(|u| *u == requested_username) { error!("client requested username {} but it is in use", requested_username); send!(client_tx, &MessageS2C::Goodbye { reason: GoodbyeReason::UsernameTaken, }).await?; break; } } // username is fine { mgr.usernames.write().await.insert(remote_addr, requested_username.clone()); } send!(client_tx, &MessageS2C::Hello { version, given_username: requested_username.clone(), next_state, }).await?; state = next_state; username = requested_username; // make player rigidbody { let mut data_handle = data.write().await; let mut rigid_body_set = data_handle.rigid_body_set.clone(); let mut collider_set = data_handle.collider_set.clone(); let player_body = RigidBodyBuilder::new(RigidBodyType::Dynamic) .translation(vector![0.0, 2100.0/SCALE]) .build(); let player_collider = ColliderBuilder::cuboid(1.0 / SCALE, 1.0 / SCALE).build(); let player_handle = rigid_body_set.insert(player_body); collider_set.insert_with_parent(player_collider, player_handle, &mut rigid_body_set); data_handle.rigid_body_set = rigid_body_set; data_handle.collider_set = collider_set; mgr.players.write().await.insert(remote_addr, Player { handle: player_handle }); } }, MessageC2S::Goodbye { reason } => { info!("client sent goodbye: {:?}", reason); break; }, _ => { error!("client sent unexpected packet {:?} for state {:?}", pkt, state); send!(client_tx, &MessageS2C::Goodbye { reason: GoodbyeReason::UnexpectedPacket, }).await?; break; } } } State::Play => { match pkt { MessageC2S::Hello { .. } => { error!("client sent unexpected packet {:?} for state {:?}", pkt, state); send!(client_tx, &MessageS2C::Goodbye { reason: GoodbyeReason::UnexpectedPacket, }).await?; break; }, MessageC2S::Goodbye { reason } => { info!("client sent goodbye: {:?}", reason); break; }, MessageC2S::Chat { message } => { info!("[{}] CHAT: [{}] {}", remote_addr, username, message); for (_addr, client_thread) in mgr.handlers.read().await.iter() { match client_thread.tx.send(ClientHandlerMessage::ChatMessage { from: username.clone(), message: message.clone() }).await { Ok(_) => (), Err(e) => { error!("unable to update a client thread: {}", e); } } } }, MessageC2S::Ping {} => { send!(client_tx, &MessageS2C::Pong {}).await?; ping_timeout = SystemTime::now() + Duration::from_secs(5); } } } } } } Ok(()) }