use std::error::Error; use std::net::SocketAddr; use async_std::io::WriteExt; use async_std::sync::Arc; use async_std::net::{TcpListener, TcpStream}; use manager::PhysicsData; use nalgebra::vector; use planet::Planets; use rapier2d_f64::prelude::{MultibodyJointSet, ImpulseJointSet, ColliderSet, RigidBodySet, NarrowPhase, BroadPhase, IslandManager, CCDSolver, IntegrationParameters}; use lazy_static::lazy_static; use log::{error, info, Level, warn}; use serde::{Deserialize, Serialize}; use crate::manager::{ClientHandler, ClientManager}; use crate::timer::timer_main; use async_std::sync::RwLock; use futures::StreamExt; use starkingdoms_protocol::PROTOCOL_VERSION; use crate::handler::handle_client; pub mod handler; pub mod manager; pub mod timer; #[macro_use] pub mod macros; pub mod planet; pub mod orbit; const SCALE: f64 = 1.0; async fn handle_request(conn: TcpStream, remote_addr: SocketAddr, mgr: ClientManager, physics_data: Arc>) { match _handle_request(conn, remote_addr, mgr, physics_data).await { Ok(_) => (), Err(e) => { error!("[{}] error in handler thread: {}", remote_addr, e); } } } async fn _handle_request(mut conn: TcpStream, remote_addr: SocketAddr, mgr: ClientManager, physics_data: Arc>) -> Result<(), Box> { let mut peek_buf = [0u8; 9]; loop { let read = conn.peek(&mut peek_buf).await?; if read == 9 { break; } } if peek_buf == *b"GET /ping" { info!("[{}] incoming http connection", remote_addr); let ping_resp = serde_json::to_string(&ServerPingResponse { version: ServerPingResponseVersion { name: env!("STK_VERSION_NAME").to_string(), // Set by build.rs number: env!("STK_VERSION").to_string(), // Set by build.rs protocol: PROTOCOL_VERSION, }, players: CMGR.usernames.read().await.len() as u32, description: env!("STK_SLP_DESCRIPTION").to_string(), }).unwrap(); let resp_str = format!("HTTP/1.0 200 OK\nAccess-Control-Allow-Origin: *\nContent-Length: {}\n\n{}", ping_resp.len(), ping_resp); let http_resp = resp_str.as_bytes(); conn.write_all(http_resp).await?; info!("[{}] sent ping response (200 OK {} bytes)", remote_addr, ping_resp.len()); return Ok(()); } info!("[{}] incoming websocket connection", remote_addr); // if its not GET /ping, assume its websocket and attempt to handshake with them let ws_stream = async_tungstenite::accept_async(conn).await?; let (ws_write, ws_read) = ws_stream.split(); let (tx, rx) = async_std::channel::unbounded(); let client = ClientHandler { tx }; // Acquire the write lock in a small scope, so it's dropped as quickly as possible { mgr.handlers.write().await.insert(remote_addr, client); } info!("[{}] passing to client handler", remote_addr); //forward the stream to the sink to achieve echo match handle_client(mgr.clone(), physics_data.clone(), remote_addr, rx, ws_write, ws_read).await { Ok(_) => (), Err(e) if e.is::() => { let e = e.downcast::().unwrap(); if matches!(*e, async_tungstenite::tungstenite::Error::ConnectionClosed) { info!("[{}] connection closed normally", remote_addr); } else { error!("[{}] error in client thread: {}", remote_addr, e); } }, Err(e) => { error!("[{}] error in client thread: {}", remote_addr, e); } } // clean up values left over { mgr.handlers.write().await.remove(&remote_addr); mgr.usernames.write().await.remove(&remote_addr); // remove player physics body let mut data = physics_data.write().await; let mut rigid_body_set = data.rigid_body_set.clone(); let mut island_manager = data.island_manager.clone(); let mut collider_set = data.collider_set.clone(); let mut impulse_joint_set = data.impulse_joint_set.clone(); let mut multibody_joint_set = data.multibody_joint_set.clone(); let handle = match mgr.players.read().await.get(&remote_addr) { Some(s) => s.handle, None => { warn!("[{}] player missing from mgr.players", remote_addr); return Err("Player missing from mgr.players".into()); } }; rigid_body_set.remove(handle, &mut island_manager, &mut collider_set, &mut impulse_joint_set, &mut multibody_joint_set, true); data.rigid_body_set = rigid_body_set; data.collider_set = collider_set; data.island_manager = island_manager; data.impulse_joint_set = impulse_joint_set; data.multibody_joint_set = multibody_joint_set; mgr.players.write().await.remove(&remote_addr); } Ok(()) } /* async fn handle_request(mut request: Request, remote_addr: SocketAddr, mgr: ClientManager, physics_data: Arc>) -> Result, Infallible> { match (request.uri().path(), request.headers().contains_key(header::UPGRADE)) { //if the request is ws_echo and the request headers contains an Upgrade key ("/ws", true) => { info!("received connection from {}", remote_addr); //assume request is a handshake, so create the handshake response let response = match handshake::server::create_response_with_body(&request, Body::empty) { Ok(response) => { //in case the handshake response creation succeeds, //spawn a task to handle the websocket connection tokio::spawn(async move { //using the hyper feature of upgrading a connection match upgrade::on(&mut request).await { //if successfully upgraded Ok(upgraded) => { info!("[{}] connection upgraded", remote_addr); //create a websocket stream from the upgraded object let ws_stream = WebSocketStream::from_raw_socket( //pass the upgraded object //as the base layer stream of the Websocket upgraded, tungstenite::protocol::Role::Server, None, ).await; //we can split the stream into a sink and a stream let (ws_write, ws_read) = ws_stream.split(); let (tx, rx) = tokio::sync::mpsc::channel(128); let client = ClientHandler { tx, }; // Acquire the write lock in a small scope, so it's dropped as quickly as possible { mgr.handlers.write().await.insert(remote_addr, client); } info!("[{}] passing to client handler", remote_addr); //forward the stream to the sink to achieve echo match handle_client(mgr.clone(), physics_data.clone(), remote_addr, rx, ws_write, ws_read).await { Ok(_) => {}, Err(e) => error!("error on WS connection {}: {}", remote_addr, e), }; // clean up values left over { mgr.handlers.write().await.remove(&remote_addr); mgr.usernames.write().await.remove(&remote_addr); // remove player physics body let mut data = physics_data.write().await; let mut rigid_body_set = data.rigid_body_set.clone(); let mut island_manager = data.island_manager.clone(); let mut collider_set = data.collider_set.clone(); let mut impulse_joint_set = data.impulse_joint_set.clone(); let mut multibody_joint_set = data.multibody_joint_set.clone(); let handle = match mgr.players.read().await.get(&remote_addr) { Some(s) => s.handle, None => {error!("looks like somebody forgot to make their player"); return} }; rigid_body_set.remove(handle, &mut island_manager, &mut collider_set, &mut impulse_joint_set, &mut multibody_joint_set, true); data.rigid_body_set = rigid_body_set; data.collider_set = collider_set; data.island_manager = island_manager; data.impulse_joint_set = impulse_joint_set; data.multibody_joint_set = multibody_joint_set; mgr.players.write().await.remove(&remote_addr); } }, Err(e) => { error!("error upgrading connection from {} to WS: {}", remote_addr, e); } } }); //return the response to the handshake request response }, Err(e) => { //probably the handshake request is not up to spec for websocket error!("error creating websocket response to {}: {}", remote_addr, e); let mut res = Response::new(Body::from(format!("Failed to create websocket: {}", e))); *res.status_mut() = StatusCode::BAD_REQUEST; return Ok(res); } }; Ok::<_, Infallible>(response) }, ("/ws", false) => { Ok(Response::builder().status(400).body(Body::from("Connection-Upgrade header missing")).unwrap()) }, ("/ping", false) => { Ok(Response::builder().status(200).header("Access-Control-Allow-Origin", "*").body(Body::from( serde_json::to_string(&ServerPingResponse { version: ServerPingResponseVersion { name: env!("STK_VERSION_NAME").to_string(), // Set by build.rs number: env!("STK_VERSION").to_string(), // Set by build.rs protocol: PROTOCOL_VERSION, }, players: CMGR.usernames.read().await.len() as u32, description: env!("STK_SLP_DESCRIPTION").to_string(), }).unwrap() )).unwrap()) }, (_url, false) => { // typical HTTP file request // TODO Ok(Response::new(Body::empty())) }, (_, true) => { // http upgrade on non-/ws endpoint Ok(Response::builder().status(400).body(Body::from("Incorrect WebSocket endpoint")).unwrap()) } } } */ lazy_static! { static ref CMGR: ClientManager = ClientManager { handlers: Arc::new(RwLock::new(Default::default())), usernames: Arc::new(RwLock::new(Default::default())), players: Arc::new(RwLock::new(Default::default())), }; static ref DATA: Arc> = Arc::new(RwLock::new(PhysicsData { gravity: vector![0.0, 0.0], integration_parameters: IntegrationParameters { dt: 1.0 / 20.0, ..Default::default() }, island_manager: IslandManager::new(), broad_phase: BroadPhase::new(), narrow_phase: NarrowPhase::new(), rigid_body_set: RigidBodySet::new(), collider_set: ColliderSet::new(), impulse_joint_set: ImpulseJointSet::new(), multibody_joint_set: MultibodyJointSet::new(), ccd_solver: CCDSolver::new(), })); static ref PLANETS: Arc> = Arc::new(RwLock::new(Planets::default())); } #[async_std::main] async fn main() { simple_logger::init_with_level(Level::Debug).expect("Unable to start logging service"); let addr = SocketAddr::from(([0, 0, 0, 0], 3000)); info!("Listening on {} for HTTP/WebSocket connections", addr); // make earth { let mut data_handle = DATA.write().await; let mut rigid_body_set = data_handle.rigid_body_set.clone(); let mut collider_set = data_handle.collider_set.clone(); let mut planets = PLANETS.write().await; planets.planets = Planets::new(&mut rigid_body_set, &mut collider_set).planets; data_handle.rigid_body_set = rigid_body_set; data_handle.collider_set = collider_set; } let mgr_timer = CMGR.clone(); let physics_data = DATA.clone(); let world_data = PLANETS.clone(); let _timer_thread = async_std::task::spawn(async move { timer_main(mgr_timer, physics_data, world_data).await; }); let try_socket = TcpListener::bind(&addr).await; let listener = match try_socket { Ok(l) => l, Err(e) => { error!("error binding to socket: {}", e); std::process::exit(1); } }; while let Ok((stream, peer_addr)) = listener.accept().await { async_std::task::spawn(handle_request(stream, peer_addr, CMGR.clone(), DATA.clone())); } } #[derive(Serialize, Deserialize)] pub struct ServerPingResponse { pub version: ServerPingResponseVersion, pub players: u32, pub description: String } #[derive(Serialize, Deserialize)] pub struct ServerPingResponseVersion { pub name: String, pub number: String, pub protocol: u32 }