// StarKingdoms.IO, a browser game about drifting through space // Copyright (C) 2024 ghostly_zsh, TerraMaster85, core // // This program is free software: you can redistribute it and/or modify // it under the terms of the GNU Affero General Public License as published by // the Free Software Foundation, either version 3 of the License, or // (at your option) any later version. // // This program is distributed in the hope that it will be useful, // but WITHOUT ANY WARRANTY; without even the implied warranty of // MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the // GNU Affero General Public License for more details. // // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . use bevy::app::{App, Plugin, PostUpdate, Startup}; use bevy::ecs::event::ManualEventReader; use bevy::log::{error, warn}; use bevy::prelude::{Commands, Event, Events, Local, Res, ResMut, Resource}; use crossbeam_channel::{unbounded, Receiver}; use std::collections::HashMap; use std::net::{IpAddr, SocketAddr, TcpListener}; use std::sync::{Arc, RwLock}; use std::thread; use tungstenite::protocol::Role; use tungstenite::{Message, WebSocket}; pub use tungstenite; pub struct StkTungsteniteServerPlugin; impl Plugin for StkTungsteniteServerPlugin { fn build(&self, app: &mut App) { app.add_event::(); app.add_systems(Startup, Self::init_server); app.add_systems(PostUpdate, Self::event_listener); } } #[derive(Event, Clone, Debug)] pub enum WsEvent { Connection { from: SocketAddr }, Close { addr: SocketAddr }, Send { to: SocketAddr, message: Message }, Recv { from: SocketAddr, message: Message }, Broadcast { message: Message }, } #[derive(Resource, Clone)] pub struct Clients(Arc>>>); #[derive(Resource)] pub struct StkTungsteniteServerConfig { pub addr: IpAddr, pub port: u16, } #[derive(Resource)] pub struct Rx(Receiver); //#[derive(Resource)] //pub struct Tx(Sender); impl StkTungsteniteServerPlugin { pub fn init_server(config: Res, mut commands: Commands) { let listener = TcpListener::bind(SocketAddr::from((config.addr, config.port))) .expect("Failed to bind"); let (tx, rx) = unbounded::(); let clients = Clients(Arc::new(RwLock::new(HashMap::new()))); commands.insert_resource(clients.clone()); commands.insert_resource(Rx(rx.clone())); //commands.insert_resource(Tx(tx.clone())); let clients = clients.0.clone(); thread::spawn(move || { loop { let (stream, this_addr) = listener.accept().unwrap(); let upgraded = match tungstenite::accept(stream) { Ok(up) => up, Err(e) => { warn!("error upgrading {}: {}", this_addr, e); continue; } }; let (ltx, lrx) = std::sync::mpsc::channel(); { // Lock block let mut handle = clients.write().expect("failed to lock clients map"); handle.insert(this_addr, ltx); } // unlocked here tx.send(WsEvent::Connection { from: this_addr }) .expect("failed to send event across channel"); // send packet thread::spawn({ let fd_ref = upgraded .get_ref() .try_clone() .expect("failed to clone tcpstream"); let mut l_stream = WebSocket::from_raw_socket(fd_ref, Role::Server, None); let l_gtx = tx.clone(); move || { for event in lrx.iter() { match event { WsEvent::Send { to, message } => { if to == this_addr { match l_stream.send(message) { Ok(_) => (), Err(e) => match e { tungstenite::Error::AlreadyClosed | tungstenite::Error::ConnectionClosed => { l_gtx .send(WsEvent::Close { addr: this_addr }) .expect("failed to send on stream"); } e => { error!("error sending to {}: {}", this_addr, e); break; } }, } } } WsEvent::Close { addr } => { if addr == this_addr { match l_stream.close(None) { Ok(_) => (), Err(e) => { error!("failed to disconnect client: {}", e); break; } } break; } } _ => {} } } } }); // receive packet thread::spawn({ let fd_ref = upgraded .get_ref() .try_clone() .expect("failed to clone tcpstream"); let mut l_stream = WebSocket::from_raw_socket(fd_ref, Role::Server, None); let l_gtx = tx.clone(); move || loop { let msg = match l_stream.read() { Ok(m) => m, Err(e) => { error!("error reading from stream: {}", e); break; } }; if let Message::Close(_) = msg { let _ = l_stream.close(None); l_gtx.send(WsEvent::Close { addr: this_addr }).unwrap(); break; } l_gtx .send(WsEvent::Recv { from: this_addr, message: msg, }) .unwrap(); } }); } }); } pub fn event_listener( clients: Res, game_receiver: Res>, mut event_reader: Local>, mut events: ResMut>, ) { let mut clients = clients.0.write().unwrap(); for event in event_reader.read(&events) { match event { WsEvent::Send { to, .. } => { if let Some(client) = clients.get_mut(to) { client.send(event.clone()).expect("failed to forward event"); } } WsEvent::Close { addr } => { if let Some(client) = clients.get_mut(addr) { match client.send(event.clone()) { Ok(_) => (), Err(e) => error!("failed to forward event: {}", e), } } clients.remove(addr); } WsEvent::Broadcast { message } => { for (addr, client) in clients.iter() { client .send(WsEvent::Send { to: *addr, message: message.clone(), }) .expect("failed to forward event"); } } _ => {} } } if let Ok(event) = game_receiver.0.try_recv() { events.send(event); } } }