use std::error::Error;
use std::f64::consts::PI;
use std::net::SocketAddr;
use std::sync::Arc;
use std::time::{Duration, SystemTime};
use futures::stream::{SplitSink, SplitStream};
use futures::{FutureExt, SinkExt, StreamExt};
use log::{error, info, debug, warn};
use nalgebra::{vector, point};
use rand::Rng;
use rapier2d_f64::prelude::{RigidBodyBuilder, RigidBodyType, ColliderBuilder, MassProperties, Collider};
use tungstenite::Message;
use starkingdoms_protocol::goodbye_reason::GoodbyeReason;
use starkingdoms_protocol::message_s2c::{MessageS2CChat, MessageS2CGoodbye, MessageS2CHello, MessageS2CPlanetData, MessageS2CPlayersUpdate, MessageS2CPong, MessageS2CModulesUpdate};
use starkingdoms_protocol::{MessageS2C, MessageC2S, PROTOCOL_VERSION};
use starkingdoms_protocol::state::State;
use crate::entity::{EntityHandler, get_entity_id, Entity};
use crate::manager::{ClientHandlerMessage, ClientManager, PhysicsData, Player};
use crate::{send, recv, SCALE};
use async_std::{sync::RwLock, channel::Receiver};
use async_std::net::TcpStream;
use async_tungstenite::WebSocketStream;
use crate::api::{load_player_data_from_api, save_player_data_to_api};
pub async fn handle_client(mgr: ClientManager, entities: Arc<RwLock<EntityHandler>>, data: Arc<RwLock<PhysicsData>>,
remote_addr: SocketAddr, rx: Receiver<ClientHandlerMessage>,
mut client_tx: SplitSink<WebSocketStream<TcpStream>, Message>, mut client_rx: SplitStream<WebSocketStream<TcpStream>>
) -> Result<(), Box<dyn Error>> {
let mut state = State::Handshake;
let mut username = String::new();
let mut ping_timeout = SystemTime::now() + Duration::from_secs(10);
loop {
if let Ok(msg) = rx.recv().await {
match msg {
ClientHandlerMessage::Tick => {} // this intentionally does nothing,
ClientHandlerMessage::ChatMessage { from, message } => {
if matches!(state, State::Play) {
let msg = MessageS2C::Chat(MessageS2CChat {
from,
message,
special_fields: Default::default(),
}).try_into()?;
send!(client_tx, msg).await?;
}
}
ClientHandlerMessage::PlayersUpdate { players } => {
if matches!(state, State::Play) {
let msg = MessageS2C::PlayersUpdate(MessageS2CPlayersUpdate {
players,
special_fields: Default::default(),
}).try_into()?;
send!(client_tx, msg).await?;
}
}
ClientHandlerMessage::PlanetData { planets } => {
if matches!(state, State::Play) {
let msg = MessageS2C::PlanetData(MessageS2CPlanetData {
planets,
special_fields: Default::default(),
}).try_into()?;
send!(client_tx, msg).await?;
}
}
ClientHandlerMessage::ModulesUpdate { modules } => {
if matches!(state, State::Play) {
let msg = MessageS2C::ModulesUpdate(MessageS2CModulesUpdate {
modules,
special_fields: Default::default(),
}).try_into()?;
send!(client_tx, msg).await?;
}
}
}
} else {
info!("channel closed, shutting down");
break;
}
if ping_timeout < SystemTime::now() {
error!("[{}] ping timeout", remote_addr);
let msg = MessageS2C::Goodbye(MessageS2CGoodbye {
reason: GoodbyeReason::PingPongTimeout.into(),
special_fields: Default::default(),
}).try_into()?;
send!(client_tx, msg).await?;
break;
}
if let Some(pkt) = recv!(client_rx)? {
match state {
State::Handshake => {
match pkt {
MessageC2S::Hello(pkt) => {
info!("client sent hello");
if !matches!(pkt.next_state.unwrap(), State::Play) {
error!("client sent unexpected state {:?} (expected: Play)", pkt.next_state);
let msg = MessageS2C::Goodbye(MessageS2CGoodbye {
reason: GoodbyeReason::UnexpectedNextState.into(),
special_fields: Default::default(),
}).try_into()?;
send!(client_tx, msg).await?;
break;
}
// check version
if pkt.version != PROTOCOL_VERSION {
error!("client sent incompatible version {} (expected: {})", pkt.version, PROTOCOL_VERSION);
let msg = MessageS2C::Goodbye(MessageS2CGoodbye {
reason: GoodbyeReason::UnsupportedProtocol.into(),
special_fields: Default::default(),
}).try_into()?;
send!(client_tx, msg).await?;
break;
}
// determine if we can give them that username
{
if mgr.usernames.read().await.values().any(|u| *u == pkt.requested_username) {
error!("client requested username {} but it is in use", pkt.requested_username);
let msg: Vec<u8> = MessageS2C::Goodbye(MessageS2CGoodbye {
reason: GoodbyeReason::UsernameTaken.into(),
special_fields: Default::default(),
}).try_into()?;
send!(client_tx, msg).await?;
break;
}
}
// username is fine
{
mgr.usernames.write().await.insert(remote_addr, pkt.requested_username.clone());
}
let msg = MessageS2C::Hello(MessageS2CHello {
version: pkt.version,
given_username: pkt.requested_username.clone(),
special_fields: Default::default(),
next_state: pkt.next_state,
}).try_into()?;
send!(client_tx, msg).await?;
state = pkt.next_state.unwrap();
username = pkt.requested_username;
// make player rigidbody
{
let mut data_handle = data.write().await;
let mut rigid_body_set = data_handle.rigid_body_set.clone();
let mut collider_set = data_handle.collider_set.clone();
let angle: f64 = {
let mut rng = rand::thread_rng();
rng.gen::<f64>() * PI * 2.
};
let player_body = RigidBodyBuilder::new(RigidBodyType::Dynamic)
.translation(vector![angle.cos() * 2050. / SCALE, angle.sin() * 2050.0/SCALE])
.rotation(angle + PI / 2.)
.build();
let player_collider: Collider = ColliderBuilder::cuboid(25.0 / SCALE, 25.0 / SCALE)
.mass_properties(MassProperties::new(point![0.0, 0.0], 120.0, 122500.0))
.build();
let player_handle = rigid_body_set.insert(player_body);
collider_set.insert_with_parent(player_collider, player_handle, &mut rigid_body_set);
data_handle.rigid_body_set = rigid_body_set;
data_handle.collider_set = collider_set;
let mut player = Player {
handle: player_handle,
input: Default::default(),
addr: remote_addr,
auth_token: pkt.token.clone(),
auth_user: pkt.user.clone()
};
let mut e_write_handle = entities.write().await;
if let Some(user) = pkt.user {
if let Some(token) = pkt.token {
info!("[{}] * Beamin: beaming in {} as {} with token {}", remote_addr, username, user, token);
let player_data = match load_player_data_from_api(&token, &user, &std::env::var("STK_API_KEY").unwrap()).await {
Ok(d) => d,
Err(e) => {
warn!("[{}] * Beamin: ABORTED. API returned error: {}", remote_addr, e);
e_write_handle.entities.insert(get_entity_id(), Entity::Player(player));
continue;
}
};
player.load_api_data(&player_data);
}
}
e_write_handle.entities.insert(get_entity_id(), Entity::Player(player));
debug!("running");
}
},
MessageC2S::Goodbye(pkt) => {
info!("client sent goodbye: {:?}", pkt.reason);
break;
},
_ => {
error!("client sent unexpected packet {:?} for state {:?}", pkt, state);
let msg = MessageS2C::Goodbye(MessageS2CGoodbye {
reason: GoodbyeReason::UnexpectedPacket.into(),
special_fields: Default::default(),
}).try_into()?;
send!(client_tx, msg).await?;
break;
}
}
}
State::Play => {
match pkt {
MessageC2S::Hello { .. } => {
error!("client sent unexpected packet {:?} for state {:?}", pkt, state);
let msg = MessageS2C::Goodbye(MessageS2CGoodbye {
reason: GoodbyeReason::UnexpectedPacket.into(),
special_fields: Default::default(),
}).try_into()?;
send!(client_tx, msg).await?;
break;
},
MessageC2S::Goodbye(pkt) => {
info!("client sent goodbye: {:?}", pkt.reason);
break;
},
MessageC2S::Chat(pkt) => {
info!("[{}] CHAT: [{}] {}", remote_addr, username, pkt.message);
for (_addr, client_thread) in mgr.handlers.read().await.iter() {
match client_thread.tx.send(ClientHandlerMessage::ChatMessage { from: username.clone(), message: pkt.message.clone() }).await {
Ok(_) => (),
Err(e) => {
error!("unable to update a client thread: {}", e);
}
}
}
},
MessageC2S::Ping(_) => {
let msg = MessageS2C::Pong(MessageS2CPong {
special_fields: Default::default(),
}).try_into()?;
send!(client_tx, msg).await?;
ping_timeout = SystemTime::now() + Duration::from_secs(10);
},
MessageC2S::Input(p) => {
let mut handle = entities.write().await;
let id = handle.get_player_id(remote_addr)
.expect("could not get player id");
if let Entity::Player(ref mut me) = handle.entities.get_mut(&id)
.expect("player disconnected but continued to send packets") {
me.input.up = p.up_pressed;
me.input.down = p.down_pressed;
me.input.left = p.left_pressed;
me.input.right = p.right_pressed;
}
},
MessageC2S::AuthenticateAndBeamOut(p) => {
info!("[{}] * Beaming out {} as {} with realm token {}", remote_addr, username, p.user_id, p.token);
let player = entities.read().await.get_player(remote_addr).expect("Player sending messages after disconnect");
if Some(p.token) != player.auth_token || Some(p.user_id) != player.auth_user {
warn!("[{}] invalid beamout packet, ignoring", remote_addr);
continue;
}
match save_player_data_to_api(&player.as_api_data(), &player.auth_token.unwrap(), &player.auth_user.unwrap(), &std::env::var("STK_API_KEY").unwrap()).await {
Ok(_) => {
info!("[{}] * Beamed out successfully", remote_addr);
let msg = MessageS2C::Goodbye(MessageS2CGoodbye {
reason: GoodbyeReason::Done.into(),
special_fields: Default::default(),
}).try_into()?;
send!(client_tx, msg).await?;
break;
}
Err(e) => {
error!("[{}] error beaming out: {}", remote_addr, e);
}
}
}
}
}
}
}
}
Ok(())
}