~starkingdoms/starkingdoms

3e4a3a4f9303e56a129c864904a867ad739086f7 — core 2 years ago ed12e3e
move tcp handler into its own file
4 files changed, 184 insertions(+), 177 deletions(-)

M server/src/entity.rs
M server/src/main.rs
A server/src/tcp_handler.rs
M server/src/timer.rs
M server/src/entity.rs => server/src/entity.rs +0 -2
@@ 1,6 1,4 @@
use std::{collections::HashMap, net::SocketAddr, sync::atomic::AtomicU32};

use log::debug;
use nalgebra::Vector2;
use protobuf::SpecialFields;
use starkingdoms_protocol::planet::PlanetType;

M server/src/main.rs => server/src/main.rs +4 -171
@@ 14,16 14,12 @@
#![allow(clippy::missing_safety_doc)]
#![allow(clippy::missing_panics_doc)]

use crate::entity::Entity;
use crate::handler::handle_client;
use crate::manager::{ClientHandler, ClientManager};
use crate::manager::{ClientManager};
use crate::timer::timer_main;
use async_std::io::WriteExt;
use async_std::net::{TcpListener, TcpStream};
use async_std::net::{TcpListener};
use async_std::sync::Arc;
use async_std::sync::RwLock;
use entity::EntityHandler;
use futures::StreamExt;
use lazy_static::lazy_static;
use log::{error, info, warn, Level};
use manager::PhysicsData;


@@ 34,12 30,11 @@ use rapier2d_f64::prelude::{
    MultibodyJointSet, NarrowPhase, RigidBodySet,
};
use serde::{Deserialize, Serialize};
use starkingdoms_protocol::PROTOCOL_VERSION;
use std::collections::HashMap;
use std::error::Error;
use std::net::SocketAddr;
use std::thread;
use std::time::Duration;
use crate::tcp_handler::handle_request;

pub mod handler;
pub mod manager;


@@ 51,172 46,10 @@ pub mod entity;
pub mod module;
pub mod orbit;
pub mod planet;
pub mod tcp_handler;

const SCALE: f64 = 10.0;

async fn handle_request(
    conn: TcpStream,
    remote_addr: SocketAddr,
    mgr: ClientManager,
    entities: Arc<RwLock<EntityHandler>>,
    physics_data: Arc<RwLock<PhysicsData>>,
) {
    match _handle_request(conn, remote_addr, mgr, entities, physics_data).await {
        Ok(_) => (),
        Err(e) => {
            error!("[{}] error in handler thread: {}", remote_addr, e);
        }
    }
}

async fn _handle_request(
    mut conn: TcpStream,
    remote_addr: SocketAddr,
    mgr: ClientManager,
    entities: Arc<RwLock<EntityHandler>>,
    physics_data: Arc<RwLock<PhysicsData>>,
) -> Result<(), Box<dyn Error + Sync + Send>> {
    let mut peek_buf = [0u8; 9];

    loop {
        let read = conn.peek(&mut peek_buf).await?;
        if read == 9 {
            break;
        }
    }

    if peek_buf == *b"GET /ping" {
        info!("[{}] incoming http connection", remote_addr);
        let ping_resp = serde_json::to_string(&ServerPingResponse {
            version: ServerPingResponseVersion {
                name: env!("STK_VERSION_NAME").to_string(), // Set by build.rs
                number: env!("STK_VERSION").to_string(),    // Set by build.rs
                protocol: PROTOCOL_VERSION,
                channel: env!("STK_CHANNEL").to_string(),
                build: env!("STK_BUILD").to_string(),
            },
            players: u32::try_from(CMGR.usernames.read().await.len())?,
            description: env!("STK_SLP_DESCRIPTION").to_string(),
        })?;

        let resp_str = format!(
            "HTTP/1.0 200 OK\nAccess-Control-Allow-Origin: *\nContent-Length: {}\n\n{}",
            ping_resp.len(),
            ping_resp
        );
        let http_resp = resp_str.as_bytes();

        conn.write_all(http_resp).await?;
        info!(
            "[{}] sent ping response (200 OK {} bytes)",
            remote_addr,
            ping_resp.len()
        );
        return Ok(());
    }
    info!("[{}] incoming websocket connection", remote_addr);

    // if its not GET /ping, assume its websocket and attempt to handshake with them
    let ws_stream = async_tungstenite::accept_async(conn).await?;

    let (ws_write, ws_read) = ws_stream.split();

    let (tx, rx) = async_std::channel::unbounded();

    let client = ClientHandler { tx };

    // Acquire the write lock in a small scope, so it's dropped as quickly as possible
    {
        mgr.handlers.write().await.insert(remote_addr, client);
    }

    info!("[{}] passing to client handler", remote_addr);

    //forward the stream to the sink to achieve echo
    match handle_client(
        mgr.clone(),
        entities.clone(),
        physics_data.clone(),
        remote_addr,
        rx,
        ws_write,
        ws_read,
    )
    .await
    {
        Ok(_) => (),
        Err(e) if e.is::<async_tungstenite::tungstenite::error::Error>() => {
            #[allow(clippy::expect_used)]
            let e = {
                e.downcast::<async_tungstenite::tungstenite::error::Error>()
                    .expect("unable to convert between types safely")
            };

            if matches!(*e, async_tungstenite::tungstenite::Error::ConnectionClosed) {
                info!("[{}] connection closed normally", remote_addr);
            } else {
                error!("[{}] error in client thread: {}", remote_addr, e);
            }
        }
        Err(e) => {
            error!("[{}] error in client thread: {}", remote_addr, e);
        }
    }

    // clean up values left over
    {
        mgr.handlers.write().await.remove(&remote_addr);
        mgr.usernames.write().await.remove(&remote_addr);
        // remove player physics body
        let mut entity_write = entities.write().await;
        let mut data = physics_data.write().await;
        let mut rigid_body_set = data.rigid_body_set.clone();
        let mut island_manager = data.island_manager.clone();
        let mut collider_set = data.collider_set.clone();
        let mut impulse_joint_set = data.impulse_joint_set.clone();
        let mut multibody_joint_set = data.multibody_joint_set.clone();
        let Some(player_id) = entity_write.get_player_id(remote_addr) else {
                warn!("[{}] player missing from entities.players", remote_addr);
                return Err("Player missing from entities.players".into());
        };

        if let Some(Entity::Player(player)) = entity_write.entities.get(&player_id) {
            rigid_body_set.remove(
                player.handle,
                &mut island_manager,
                &mut collider_set,
                &mut impulse_joint_set,
                &mut multibody_joint_set,
                true,
            );
            for module in player.search_modules(&entity_write) {
                rigid_body_set.remove(
                    module.handle,
                    &mut island_manager,
                    &mut collider_set,
                    &mut impulse_joint_set,
                    &mut multibody_joint_set,
                    true,
                );
                let module_id = entity_write
                    .get_id_from_attached(&module)
                    .ok_or("Tried to remove nonexistent module")?;
                entity_write.entities.remove(&module_id);
            }
        } else {
            warn!("tried to remove player that doesnt exist: #{}", player_id);
        }
        data.rigid_body_set = rigid_body_set;
        data.collider_set = collider_set;
        data.island_manager = island_manager;
        data.impulse_joint_set = impulse_joint_set;
        data.multibody_joint_set = multibody_joint_set;
        entity_write.entities.remove(&player_id);
    }

    Ok(())
}

lazy_static! {
    static ref CMGR: ClientManager = ClientManager {
        handlers: Arc::new(RwLock::new(HashMap::default())),

A server/src/tcp_handler.rs => server/src/tcp_handler.rs +176 -0
@@ 0,0 1,176 @@
use std::error::Error;
use std::net::SocketAddr;
use std::sync::Arc;
use async_std::io::WriteExt;
use async_std::net::TcpStream;
use async_std::sync::RwLock;
use log::{error, info, warn};
use starkingdoms_protocol::PROTOCOL_VERSION;
use crate::entity::{Entity, EntityHandler};
use crate::manager::{ClientHandler, ClientManager, PhysicsData};
use crate::{CMGR, ServerPingResponse, ServerPingResponseVersion};
use crate::handler::handle_client;
use futures::StreamExt;

pub async fn handle_request(
    conn: TcpStream,
    remote_addr: SocketAddr,
    mgr: ClientManager,
    entities: Arc<RwLock<EntityHandler>>,
    physics_data: Arc<RwLock<PhysicsData>>,
) {
    match _handle_request(conn, remote_addr, mgr, entities, physics_data).await {
        Ok(_) => (),
        Err(e) => {
            error!("[{}] error in handler thread: {}", remote_addr, e);
        }
    }
}

async fn _handle_request(
    mut conn: TcpStream,
    remote_addr: SocketAddr,
    mgr: ClientManager,
    entities: Arc<RwLock<EntityHandler>>,
    physics_data: Arc<RwLock<PhysicsData>>,
) -> Result<(), Box<dyn Error + Sync + Send>> {
    let mut peek_buf = [0u8; 9];

    loop {
        let read = conn.peek(&mut peek_buf).await?;
        if read == 9 {
            break;
        }
    }

    if peek_buf == *b"GET /ping" {
        info!("[{}] incoming http connection", remote_addr);
        let ping_resp = serde_json::to_string(&ServerPingResponse {
            version: ServerPingResponseVersion {
                name: env!("STK_VERSION_NAME").to_string(), // Set by build.rs
                number: env!("STK_VERSION").to_string(),    // Set by build.rs
                protocol: PROTOCOL_VERSION,
                channel: env!("STK_CHANNEL").to_string(),
                build: env!("STK_BUILD").to_string(),
            },
            players: u32::try_from(CMGR.usernames.read().await.len())?,
            description: env!("STK_SLP_DESCRIPTION").to_string(),
        })?;

        let resp_str = format!(
            "HTTP/1.0 200 OK\nAccess-Control-Allow-Origin: *\nContent-Length: {}\n\n{}",
            ping_resp.len(),
            ping_resp
        );
        let http_resp = resp_str.as_bytes();

        conn.write_all(http_resp).await?;
        info!(
            "[{}] sent ping response (200 OK {} bytes)",
            remote_addr,
            ping_resp.len()
        );
        return Ok(());
    }
    info!("[{}] incoming websocket connection", remote_addr);

    // if its not GET /ping, assume its websocket and attempt to handshake with them
    let ws_stream = async_tungstenite::accept_async(conn).await?;

    let (ws_write, ws_read) = ws_stream.split();

    let (tx, rx) = async_std::channel::unbounded();

    let client = ClientHandler { tx };

    // Acquire the write lock in a small scope, so it's dropped as quickly as possible
    {
        mgr.handlers.write().await.insert(remote_addr, client);
    }

    info!("[{}] passing to client handler", remote_addr);

    //forward the stream to the sink to achieve echo
    match handle_client(
        mgr.clone(),
        entities.clone(),
        physics_data.clone(),
        remote_addr,
        rx,
        ws_write,
        ws_read,
    )
        .await
    {
        Ok(_) => (),
        Err(e) if e.is::<async_tungstenite::tungstenite::error::Error>() => {
            #[allow(clippy::expect_used)]
                let e = {
                e.downcast::<async_tungstenite::tungstenite::error::Error>()
                    .expect("unable to convert between types safely")
            };

            if matches!(*e, async_tungstenite::tungstenite::Error::ConnectionClosed) {
                info!("[{}] connection closed normally", remote_addr);
            } else {
                error!("[{}] error in client thread: {}", remote_addr, e);
            }
        }
        Err(e) => {
            error!("[{}] error in client thread: {}", remote_addr, e);
        }
    }

    // clean up values left over
    {
        mgr.handlers.write().await.remove(&remote_addr);
        mgr.usernames.write().await.remove(&remote_addr);
        // remove player physics body
        let mut entity_write = entities.write().await;
        let mut data = physics_data.write().await;
        let mut rigid_body_set = data.rigid_body_set.clone();
        let mut island_manager = data.island_manager.clone();
        let mut collider_set = data.collider_set.clone();
        let mut impulse_joint_set = data.impulse_joint_set.clone();
        let mut multibody_joint_set = data.multibody_joint_set.clone();
        let Some(player_id) = entity_write.get_player_id(remote_addr) else {
            warn!("[{}] player missing from entities.players", remote_addr);
            return Err("Player missing from entities.players".into());
        };

        if let Some(Entity::Player(player)) = entity_write.entities.get(&player_id) {
            rigid_body_set.remove(
                player.handle,
                &mut island_manager,
                &mut collider_set,
                &mut impulse_joint_set,
                &mut multibody_joint_set,
                true,
            );
            for module in player.search_modules(&entity_write) {
                rigid_body_set.remove(
                    module.handle,
                    &mut island_manager,
                    &mut collider_set,
                    &mut impulse_joint_set,
                    &mut multibody_joint_set,
                    true,
                );
                let module_id = entity_write
                    .get_id_from_attached(&module)
                    .ok_or("Tried to remove nonexistent module")?;
                entity_write.entities.remove(&module_id);
            }
        } else {
            warn!("tried to remove player that doesnt exist: #{}", player_id);
        }
        data.rigid_body_set = rigid_body_set;
        data.collider_set = collider_set;
        data.island_manager = island_manager;
        data.impulse_joint_set = impulse_joint_set;
        data.multibody_joint_set = multibody_joint_set;
        entity_write.entities.remove(&player_id);
    }

    Ok(())
}
\ No newline at end of file

M server/src/timer.rs => server/src/timer.rs +4 -4
@@ 5,12 5,12 @@ use crate::orbit::orbit::{calculate_point_on_orbit, calculate_world_position_of_
use crate::{
    entity::{get_entity_id, Entity},
    manager::{ClientHandlerMessage, ClientManager, PhysicsData},
    planet::{Planet, Planets},
    planet::{Planets},
    SCALE,
};
use async_std::sync::RwLock;
use async_std::task::sleep;
use log::{warn, debug};
use log::{warn};
use nalgebra::{point, vector};
use protobuf::SpecialFields;
use rand::Rng;


@@ 88,7 88,7 @@ pub async fn timer_main(
                        .rigid_body_set
                        .get_mut(moon.body_handle)
                        .ok_or("moon does not exist")?;
                    moon_body.set_next_kinematic_translation(new_moon_position.into());
                    moon_body.set_next_kinematic_translation(new_moon_position);
                    moon.position = (
                        moon_body.translation()[0],
                        moon_body.translation()[1],


@@ 108,7 108,7 @@ pub async fn timer_main(
                            .rigid_body_set
                            .get_mut(mars.body_handle)
                            .ok_or("mars does not exist")?;
                        mars_body.set_next_kinematic_translation(new_mars_position.into());
                        mars_body.set_next_kinematic_translation(new_mars_position);
                        mars.position = (
                            mars_body.translation()[0],
                            mars_body.translation()[1],