use std::convert::Infallible; use std::net::SocketAddr; use std::sync::Arc; use hyper::{Body, header, Request, Response, Server, server::conn::AddrStream, StatusCode, upgrade}; use hyper::service::{make_service_fn, service_fn}; use manager::PhysicsData; use nalgebra::vector; use rapier2d::prelude::{MultibodyJointSet, ImpulseJointSet, ColliderSet, RigidBodySet, NarrowPhase, BroadPhase, IslandManager, CCDSolver, IntegrationParameters}; use tokio_tungstenite::WebSocketStream; use tungstenite::{handshake}; use futures::stream::StreamExt; use lazy_static::lazy_static; use log::{error, info, Level}; use serde::{Deserialize, Serialize}; use tokio::sync::RwLock; use starkingdoms_protocol::{PROTOCOL_VERSION}; use crate::manager::{ClientHandler, ClientManager}; use crate::handler::handle_client; use crate::timer::timer_main; pub mod handler; pub mod manager; pub mod timer; #[macro_use] pub mod macros; const SCALE: f32 = 10.0; async fn handle_request(mut request: Request, remote_addr: SocketAddr, mgr: ClientManager, physics_data: Arc>) -> Result, Infallible> { match (request.uri().path(), request.headers().contains_key(header::UPGRADE)) { //if the request is ws_echo and the request headers contains an Upgrade key ("/ws", true) => { info!("received connection from {}", remote_addr); //assume request is a handshake, so create the handshake response let response = match handshake::server::create_response_with_body(&request, Body::empty) { Ok(response) => { //in case the handshake response creation succeeds, //spawn a task to handle the websocket connection tokio::spawn(async move { //using the hyper feature of upgrading a connection match upgrade::on(&mut request).await { //if successfully upgraded Ok(upgraded) => { info!("[{}] connection upgraded", remote_addr); //create a websocket stream from the upgraded object let ws_stream = WebSocketStream::from_raw_socket( //pass the upgraded object //as the base layer stream of the Websocket upgraded, tokio_tungstenite::tungstenite::protocol::Role::Server, None, ).await; //we can split the stream into a sink and a stream let (ws_write, ws_read) = ws_stream.split(); let (tx, rx) = tokio::sync::mpsc::channel(128); let client = ClientHandler { tx, }; // Acquire the write lock in a small scope, so it's dropped as quickly as possible { mgr.handlers.write().await.insert(remote_addr, client); } info!("[{}] passing to client handler", remote_addr); //forward the stream to the sink to achieve echo match handle_client(mgr.clone(), physics_data.clone(), remote_addr, rx, ws_write, ws_read).await { Ok(_) => {}, Err(e) => error!("error on WS connection {}: {}", remote_addr, e), }; // clean up values left over { mgr.handlers.write().await.remove(&remote_addr); mgr.usernames.write().await.remove(&remote_addr); // remove player physics body let mut data = physics_data.write().await; let mut rigid_body_set = data.rigid_body_set.clone(); let mut island_manager = data.island_manager.clone(); let mut collider_set = data.collider_set.clone(); let mut impulse_joint_set = data.impulse_joint_set.clone(); let mut multibody_joint_set = data.multibody_joint_set.clone(); let handle = match mgr.players.read().await.get(&remote_addr) { Some(s) => s.handle, None => {error!("looks like somebody forgot to make their player"); return} }; rigid_body_set.remove(handle, &mut island_manager, &mut collider_set, &mut impulse_joint_set, &mut multibody_joint_set, true); data.rigid_body_set = rigid_body_set; data.collider_set = collider_set; data.island_manager = island_manager; data.impulse_joint_set = impulse_joint_set; data.multibody_joint_set = multibody_joint_set; mgr.players.write().await.remove(&remote_addr); } }, Err(e) => { error!("error upgrading connection from {} to WS: {}", remote_addr, e); } } }); //return the response to the handshake request response }, Err(e) => { //probably the handshake request is not up to spec for websocket error!("error creating websocket response to {}: {}", remote_addr, e); let mut res = Response::new(Body::from(format!("Failed to create websocket: {}", e))); *res.status_mut() = StatusCode::BAD_REQUEST; return Ok(res); } }; Ok::<_, Infallible>(response) }, ("/ws", false) => { Ok(Response::builder().status(400).body(Body::from("Connection-Upgrade header missing")).unwrap()) }, ("/ping", false) => { Ok(Response::builder().status(200).header("Access-Control-Allow-Origin", "*").body(Body::from( serde_json::to_string(&ServerPingResponse { version: ServerPingResponseVersion { name: env!("STK_VERSION_NAME").to_string(), // Set by build.rs number: env!("STK_VERSION").to_string(), // Set by build.rs protocol: PROTOCOL_VERSION, }, players: CMGR.usernames.read().await.len() as u32, description: env!("STK_SLP_DESCRIPTION").to_string(), }).unwrap() )).unwrap()) }, (_url, false) => { // typical HTTP file request // TODO Ok(Response::new(Body::empty())) }, (_, true) => { // http upgrade on non-/ws endpoint Ok(Response::builder().status(400).body(Body::from("Incorrect WebSocket endpoint")).unwrap()) } } } lazy_static! { static ref CMGR: ClientManager = ClientManager { handlers: Arc::new(RwLock::new(Default::default())), usernames: Arc::new(RwLock::new(Default::default())), players: Arc::new(RwLock::new(Default::default())), }; static ref DATA: Arc> = Arc::new(RwLock::new(PhysicsData { gravity: vector![0.0, 0.0], integration_parameters: IntegrationParameters { dt: 1.0 / 20.0, ..Default::default() }, island_manager: IslandManager::new(), broad_phase: BroadPhase::new(), narrow_phase: NarrowPhase::new(), rigid_body_set: RigidBodySet::new(), collider_set: ColliderSet::new(), impulse_joint_set: ImpulseJointSet::new(), multibody_joint_set: MultibodyJointSet::new(), ccd_solver: CCDSolver::new(), })); } #[tokio::main] async fn main() { simple_logger::init_with_level(Level::Debug).expect("Unable to start logging service"); let addr = SocketAddr::from(([127, 0, 0, 1], 3000)); info!("Listening on {} for HTTP/WebSocket connections", addr); let make_svc = make_service_fn(|conn: &AddrStream| { let remote_addr = conn.remote_addr(); async move { Ok::<_, Infallible>(service_fn({ move |request: Request| { handle_request(request, remote_addr, CMGR.clone(), DATA.clone()) } })) } }); let mgr_timer = CMGR.clone(); let physics_data = DATA.clone(); let _timer_thread = tokio::spawn(async move { timer_main(mgr_timer, physics_data).await; }); let server = Server::bind(&addr).serve(make_svc); if let Err(e) = server.await { error!("error in server thread: {}", e); } } #[derive(Serialize, Deserialize)] pub struct ServerPingResponse { pub version: ServerPingResponseVersion, pub players: u32, pub description: String } #[derive(Serialize, Deserialize)] pub struct ServerPingResponseVersion { pub name: String, pub number: String, pub protocol: u32 }