use std::error::Error;
use std::net::SocketAddr;
use async_std::io::WriteExt;
use async_std::sync::Arc;
use async_std::net::{TcpListener, TcpStream};
use manager::PhysicsData;
use nalgebra::vector;
use planet::Planets;
use rapier2d_f64::prelude::{MultibodyJointSet, ImpulseJointSet, ColliderSet, RigidBodySet, NarrowPhase, BroadPhase, IslandManager, CCDSolver, IntegrationParameters};
use lazy_static::lazy_static;
use log::{error, info, Level, warn};
use serde::{Deserialize, Serialize};
use crate::manager::{ClientHandler, ClientManager};
use crate::timer::timer_main;
use async_std::sync::RwLock;
use futures::StreamExt;
use starkingdoms_protocol::PROTOCOL_VERSION;
use crate::handler::handle_client;
pub mod handler;
pub mod manager;
pub mod timer;
#[macro_use]
pub mod macros;
pub mod planet;
pub mod orbit;
// Global scale factor, currently 1.0 (i.e. no scaling).
// NOTE(review): not referenced in this file; presumably read by the submodules declared above — confirm it is still used.
const SCALE: f64 = 1.0;
/// Task entry point for one accepted connection.
///
/// Delegates all work to [`_handle_request`] and logs any error it returns,
/// tagged with the peer address, instead of propagating it.
async fn handle_request(conn: TcpStream, remote_addr: SocketAddr, mgr: ClientManager, physics_data: Arc<RwLock<PhysicsData>>) {
    if let Err(e) = _handle_request(conn, remote_addr, mgr, physics_data).await {
        error!("[{}] error in handler thread: {}", remote_addr, e);
    }
}
async fn _handle_request(mut conn: TcpStream, remote_addr: SocketAddr, mgr: ClientManager, physics_data: Arc<RwLock<PhysicsData>>) -> Result<(), Box<dyn Error>> {
let mut peek_buf = [0u8; 9];
loop {
let read = conn.peek(&mut peek_buf).await?;
if read == 9 {
break;
}
}
if peek_buf == *b"GET /ping" {
info!("[{}] incoming http connection", remote_addr);
let ping_resp = serde_json::to_string(&ServerPingResponse {
version: ServerPingResponseVersion {
name: env!("STK_VERSION_NAME").to_string(), // Set by build.rs
number: env!("STK_VERSION").to_string(), // Set by build.rs
protocol: PROTOCOL_VERSION,
},
players: CMGR.usernames.read().await.len() as u32,
description: env!("STK_SLP_DESCRIPTION").to_string(),
}).unwrap();
let resp_str = format!("HTTP/1.0 200 OK\nAccess-Control-Allow-Origin: *\nContent-Length: {}\n\n{}", ping_resp.len(), ping_resp);
let http_resp = resp_str.as_bytes();
conn.write_all(http_resp).await?;
info!("[{}] sent ping response (200 OK {} bytes)", remote_addr, ping_resp.len());
return Ok(());
}
info!("[{}] incoming websocket connection", remote_addr);
// if its not GET /ping, assume its websocket and attempt to handshake with them
let ws_stream = async_tungstenite::accept_async(conn).await?;
let (ws_write, ws_read) = ws_stream.split();
let (tx, rx) = async_std::channel::unbounded();
let client = ClientHandler { tx };
// Acquire the write lock in a small scope, so it's dropped as quickly as possible
{
mgr.handlers.write().await.insert(remote_addr, client);
}
info!("[{}] passing to client handler", remote_addr);
//forward the stream to the sink to achieve echo
match handle_client(mgr.clone(), physics_data.clone(), remote_addr, rx, ws_write, ws_read).await {
Ok(_) => (),
Err(e) if e.is::<async_tungstenite::tungstenite::error::Error>() => {
let e = e.downcast::<async_tungstenite::tungstenite::error::Error>().unwrap();
if matches!(*e, async_tungstenite::tungstenite::Error::ConnectionClosed) {
info!("[{}] connection closed normally", remote_addr);
} else {
error!("[{}] error in client thread: {}", remote_addr, e);
}
},
Err(e) => {
error!("[{}] error in client thread: {}", remote_addr, e);
}
}
// clean up values left over
{
mgr.handlers.write().await.remove(&remote_addr);
mgr.usernames.write().await.remove(&remote_addr);
// remove player physics body
let mut data = physics_data.write().await;
let mut rigid_body_set = data.rigid_body_set.clone();
let mut island_manager = data.island_manager.clone();
let mut collider_set = data.collider_set.clone();
let mut impulse_joint_set = data.impulse_joint_set.clone();
let mut multibody_joint_set = data.multibody_joint_set.clone();
let handle = match mgr.players.read().await.get(&remote_addr) {
Some(s) => s.handle,
None => {
warn!("[{}] player missing from mgr.players", remote_addr);
return Err("Player missing from mgr.players".into());
}
};
rigid_body_set.remove(handle, &mut island_manager, &mut collider_set,
&mut impulse_joint_set, &mut multibody_joint_set, true);
data.rigid_body_set = rigid_body_set;
data.collider_set = collider_set;
data.island_manager = island_manager;
data.impulse_joint_set = impulse_joint_set;
data.multibody_joint_set = multibody_joint_set;
mgr.players.write().await.remove(&remote_addr);
}
Ok(())
}
/*
async fn handle_request(mut request: Request<Body>, remote_addr: SocketAddr, mgr: ClientManager, physics_data: Arc<RwLock<PhysicsData>>) -> Result<Response<Body>, Infallible> {
match (request.uri().path(), request.headers().contains_key(header::UPGRADE)) {
//if the request is ws_echo and the request headers contains an Upgrade key
("/ws", true) => {
info!("received connection from {}", remote_addr);
//assume request is a handshake, so create the handshake response
let response =
match handshake::server::create_response_with_body(&request, Body::empty) {
Ok(response) => {
//in case the handshake response creation succeeds,
//spawn a task to handle the websocket connection
tokio::spawn(async move {
//using the hyper feature of upgrading a connection
match upgrade::on(&mut request).await {
//if successfully upgraded
Ok(upgraded) => {
info!("[{}] connection upgraded", remote_addr);
//create a websocket stream from the upgraded object
let ws_stream = WebSocketStream::from_raw_socket(
//pass the upgraded object
//as the base layer stream of the Websocket
upgraded,
tungstenite::protocol::Role::Server,
None,
).await;
//we can split the stream into a sink and a stream
let (ws_write, ws_read) = ws_stream.split();
let (tx, rx) = tokio::sync::mpsc::channel(128);
let client = ClientHandler {
tx,
};
// Acquire the write lock in a small scope, so it's dropped as quickly as possible
{
mgr.handlers.write().await.insert(remote_addr, client);
}
info!("[{}] passing to client handler", remote_addr);
//forward the stream to the sink to achieve echo
match handle_client(mgr.clone(), physics_data.clone(), remote_addr, rx, ws_write, ws_read).await {
Ok(_) => {},
Err(e) => error!("error on WS connection {}: {}", remote_addr, e),
};
// clean up values left over
{
mgr.handlers.write().await.remove(&remote_addr);
mgr.usernames.write().await.remove(&remote_addr);
// remove player physics body
let mut data = physics_data.write().await;
let mut rigid_body_set = data.rigid_body_set.clone();
let mut island_manager = data.island_manager.clone();
let mut collider_set = data.collider_set.clone();
let mut impulse_joint_set = data.impulse_joint_set.clone();
let mut multibody_joint_set = data.multibody_joint_set.clone();
let handle = match mgr.players.read().await.get(&remote_addr) {
Some(s) => s.handle,
None => {error!("looks like somebody forgot to make their player"); return}
};
rigid_body_set.remove(handle, &mut island_manager, &mut collider_set,
&mut impulse_joint_set, &mut multibody_joint_set, true);
data.rigid_body_set = rigid_body_set;
data.collider_set = collider_set;
data.island_manager = island_manager;
data.impulse_joint_set = impulse_joint_set;
data.multibody_joint_set = multibody_joint_set;
mgr.players.write().await.remove(&remote_addr);
}
},
Err(e) => {
error!("error upgrading connection from {} to WS: {}", remote_addr, e);
}
}
});
//return the response to the handshake request
response
},
Err(e) => {
//probably the handshake request is not up to spec for websocket
error!("error creating websocket response to {}: {}", remote_addr, e);
let mut res = Response::new(Body::from(format!("Failed to create websocket: {}", e)));
*res.status_mut() = StatusCode::BAD_REQUEST;
return Ok(res);
}
};
Ok::<_, Infallible>(response)
},
("/ws", false) => {
Ok(Response::builder().status(400).body(Body::from("Connection-Upgrade header missing")).unwrap())
},
("/ping", false) => {
Ok(Response::builder().status(200).header("Access-Control-Allow-Origin", "*").body(Body::from(
serde_json::to_string(&ServerPingResponse {
version: ServerPingResponseVersion {
name: env!("STK_VERSION_NAME").to_string(), // Set by build.rs
number: env!("STK_VERSION").to_string(), // Set by build.rs
protocol: PROTOCOL_VERSION,
},
players: CMGR.usernames.read().await.len() as u32,
description: env!("STK_SLP_DESCRIPTION").to_string(),
}).unwrap()
)).unwrap())
},
(_url, false) => {
// typical HTTP file request
// TODO
Ok(Response::new(Body::empty()))
},
(_, true) => {
// http upgrade on non-/ws endpoint
Ok(Response::builder().status(400).body(Body::from("Incorrect WebSocket endpoint")).unwrap())
}
}
}
*/
lazy_static! {
    /// Global client registry; clones are handed to the timer task and to
    /// every connection task spawned from `main`.
    static ref CMGR: ClientManager = ClientManager {
        handlers: Arc::new(RwLock::new(Default::default())),
        usernames: Arc::new(RwLock::new(Default::default())),
        players: Arc::new(RwLock::new(Default::default())),
    };

    /// Global physics state behind an async read/write lock. It starts with
    /// empty body, collider and joint sets, a zero gravity vector, and an
    /// integration timestep of 1/20.
    static ref DATA: Arc<RwLock<PhysicsData>> = Arc::new(RwLock::new(PhysicsData {
        gravity: vector![0.0, 0.0],
        integration_parameters: IntegrationParameters {
            dt: 1.0 / 20.0,
            ..Default::default()
        },
        island_manager: IslandManager::new(),
        broad_phase: BroadPhase::new(),
        narrow_phase: NarrowPhase::new(),
        rigid_body_set: RigidBodySet::new(),
        collider_set: ColliderSet::new(),
        impulse_joint_set: ImpulseJointSet::new(),
        multibody_joint_set: MultibodyJointSet::new(),
        ccd_solver: CCDSolver::new(),
    }));

    /// Global planet list; default-constructed here and filled in by `main`
    /// at startup.
    static ref PLANETS: Arc<RwLock<Planets>> = Arc::new(RwLock::new(Planets::default()));
}
#[async_std::main]
async fn main() {
simple_logger::init_with_level(Level::Debug).expect("Unable to start logging service");
let addr = SocketAddr::from(([0, 0, 0, 0], 3000));
info!("Listening on {} for HTTP/WebSocket connections", addr);
// make earth
{
let mut data_handle = DATA.write().await;
let mut rigid_body_set = data_handle.rigid_body_set.clone();
let mut collider_set = data_handle.collider_set.clone();
let mut planets = PLANETS.write().await;
planets.planets = Planets::new(&mut rigid_body_set, &mut collider_set).planets;
data_handle.rigid_body_set = rigid_body_set;
data_handle.collider_set = collider_set;
}
let mgr_timer = CMGR.clone();
let physics_data = DATA.clone();
let world_data = PLANETS.clone();
let _timer_thread = async_std::task::spawn(async move {
timer_main(mgr_timer, physics_data, world_data).await;
});
let try_socket = TcpListener::bind(&addr).await;
let listener = match try_socket {
Ok(l) => l,
Err(e) => {
error!("error binding to socket: {}", e);
std::process::exit(1);
}
};
while let Ok((stream, peer_addr)) = listener.accept().await {
async_std::task::spawn(handle_request(stream, peer_addr, CMGR.clone(), DATA.clone()));
}
}
/// JSON body returned for `GET /ping`.
#[derive(Serialize, Deserialize)]
pub struct ServerPingResponse {
    /// Version details of the running server.
    pub version: ServerPingResponseVersion,
    /// Number of entries in the client manager's username map at request time.
    pub players: u32,
    /// Server description, taken from the `STK_SLP_DESCRIPTION` build-time variable.
    pub description: String,
}
/// Version section of a [`ServerPingResponse`].
#[derive(Serialize, Deserialize)]
pub struct ServerPingResponseVersion {
    /// Version name, taken from the `STK_VERSION_NAME` build-time variable.
    pub name: String,
    /// Version number string, taken from the `STK_VERSION` build-time variable.
    pub number: String,
    /// Protocol version, filled from `starkingdoms_protocol::PROTOCOL_VERSION`.
    pub protocol: u32,
}