use std::error::Error; use std::net::SocketAddr; use std::sync::Arc; use std::time::{Duration, SystemTime}; use futures::stream::{SplitSink, SplitStream}; use futures::{FutureExt, SinkExt, StreamExt}; use log::{debug, error, info}; use nalgebra::{vector, point}; use rapier2d_f64::prelude::{RigidBodyBuilder, RigidBodyType, ColliderBuilder, MassProperties, Collider}; use tungstenite::Message; use starkingdoms_protocol::goodbye_reason::GoodbyeReason; use starkingdoms_protocol::message_s2c::{MessageS2CChat, MessageS2CGoodbye, MessageS2CHello, MessageS2CPlanetData, MessageS2CPlayersUpdate, MessageS2CPong}; use starkingdoms_protocol::{MessageS2C, MessageC2S, PROTOCOL_VERSION}; use starkingdoms_protocol::state::State; use crate::manager::{ClientHandlerMessage, ClientManager, PhysicsData, Player}; use crate::{send, recv, SCALE}; use async_std::{sync::RwLock, channel::Receiver}; use async_std::net::TcpStream; use async_tungstenite::WebSocketStream; pub async fn handle_client(mgr: ClientManager, data: Arc>, remote_addr: SocketAddr, rx: Receiver, mut client_tx: SplitSink, Message>, mut client_rx: SplitStream>) -> Result<(), Box> { let mut state = State::Handshake; let mut username = String::new(); let mut ping_timeout = SystemTime::now() + Duration::from_secs(10); loop { if let Ok(msg) = rx.recv().await { match msg { ClientHandlerMessage::Tick => {} // this intentionally does nothing, ClientHandlerMessage::ChatMessage { from, message } => { if matches!(state, State::Play) { let msg = MessageS2C::Chat(MessageS2CChat { from, message, special_fields: Default::default(), }).try_into()?; send!(client_tx, msg).await?; } } ClientHandlerMessage::PlayersUpdate { players } => { if matches!(state, State::Play) { let msg = MessageS2C::PlayersUpdate(MessageS2CPlayersUpdate { players, special_fields: Default::default(), }).try_into()?; send!(client_tx, msg).await?; } } ClientHandlerMessage::PlanetData { planets } => { if matches!(state, State::Play) { let msg = MessageS2C::PlanetData(MessageS2CPlanetData { planets, special_fields: Default::default(), }).try_into()?; send!(client_tx, msg).await?; } } } } else { info!("channel closed, shutting down"); break; } if ping_timeout < SystemTime::now() { error!("[{}] ping timeout", remote_addr); let msg = MessageS2C::Goodbye(MessageS2CGoodbye { reason: GoodbyeReason::PingPongTimeout.into(), special_fields: Default::default(), }).try_into()?; send!(client_tx, msg).await?; break; } if let Some(pkt) = recv!(client_rx)? { match state { State::Handshake => { match pkt { MessageC2S::Hello(pkt) => { info!("client sent hello"); if !matches!(pkt.next_state.unwrap(), State::Play) { error!("client sent unexpected state {:?} (expected: Play)", pkt.next_state); let msg = MessageS2C::Goodbye(MessageS2CGoodbye { reason: GoodbyeReason::UnexpectedNextState.into(), special_fields: Default::default(), }).try_into()?; send!(client_tx, msg).await?; break; } // check version if pkt.version != PROTOCOL_VERSION { error!("client sent incompatible version {} (expected: {})", pkt.version, PROTOCOL_VERSION); let msg = MessageS2C::Goodbye(MessageS2CGoodbye { reason: GoodbyeReason::UnsupportedProtocol.into(), special_fields: Default::default(), }).try_into()?; send!(client_tx, msg).await?; break; } // determine if we can give them that username { if mgr.usernames.read().await.values().any(|u| *u == pkt.requested_username) { error!("client requested username {} but it is in use", pkt.requested_username); let msg: Vec = MessageS2C::Goodbye(MessageS2CGoodbye { reason: GoodbyeReason::UsernameTaken.into(), special_fields: Default::default(), }).try_into()?; send!(client_tx, msg).await?; break; } } // username is fine { mgr.usernames.write().await.insert(remote_addr, pkt.requested_username.clone()); } let msg = MessageS2C::Hello(MessageS2CHello { version: pkt.version, given_username: pkt.requested_username.clone(), special_fields: Default::default(), next_state: pkt.next_state, }).try_into()?; send!(client_tx, msg).await?; state = pkt.next_state.unwrap(); username = pkt.requested_username; // make player rigidbody { let mut data_handle = data.write().await; let mut rigid_body_set = data_handle.rigid_body_set.clone(); let mut collider_set = data_handle.collider_set.clone(); let player_body = RigidBodyBuilder::new(RigidBodyType::Dynamic) .translation(vector![0.0, 2100.0/SCALE]) .build(); let player_collider: Collider = ColliderBuilder::cuboid(25.0 / SCALE, 25.0 / SCALE) .mass_properties(MassProperties::new(point![0.0, 0.0], 250.0, 61250.0)) //.mass(75.0) .build(); let player_handle = rigid_body_set.insert(player_body); collider_set.insert_with_parent(player_collider, player_handle, &mut rigid_body_set); data_handle.rigid_body_set = rigid_body_set; data_handle.collider_set = collider_set; mgr.players.write().await.insert(remote_addr, Player { handle: player_handle, input: Default::default() }); } }, MessageC2S::Goodbye(pkt) => { info!("client sent goodbye: {:?}", pkt.reason); break; }, _ => { error!("client sent unexpected packet {:?} for state {:?}", pkt, state); let msg = MessageS2C::Goodbye(MessageS2CGoodbye { reason: GoodbyeReason::UnexpectedPacket.into(), special_fields: Default::default(), }).try_into()?; send!(client_tx, msg).await?; break; } } } State::Play => { match pkt { MessageC2S::Hello { .. } => { error!("client sent unexpected packet {:?} for state {:?}", pkt, state); let msg = MessageS2C::Goodbye(MessageS2CGoodbye { reason: GoodbyeReason::UnexpectedPacket.into(), special_fields: Default::default(), }).try_into()?; send!(client_tx, msg).await?; break; }, MessageC2S::Goodbye(pkt) => { info!("client sent goodbye: {:?}", pkt.reason); break; }, MessageC2S::Chat(pkt) => { info!("[{}] CHAT: [{}] {}", remote_addr, username, pkt.message); for (_addr, client_thread) in mgr.handlers.read().await.iter() { match client_thread.tx.send(ClientHandlerMessage::ChatMessage { from: username.clone(), message: pkt.message.clone() }).await { Ok(_) => (), Err(e) => { error!("unable to update a client thread: {}", e); } } } }, MessageC2S::Ping(_) => { let msg = MessageS2C::Pong(MessageS2CPong { special_fields: Default::default(), }).try_into()?; send!(client_tx, msg).await?; ping_timeout = SystemTime::now() + Duration::from_secs(10); }, MessageC2S::Input(p) => { let mut players = mgr.players.write().await; let me = players.get_mut(&remote_addr).expect("player disconnected but continued to send packets"); me.input.up = p.up_pressed; me.input.down = p.down_pressed; me.input.left = p.left_pressed; me.input.right = p.right_pressed; } } } } } } Ok(()) }