// StarKingdoms.IO, a browser game about drifting through space
// Copyright (C) 2024 ghostly_zsh, TerraMaster85, core
//
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see .
use bevy::app::{App, Plugin, PostUpdate, Startup};
use bevy::ecs::event::ManualEventReader;
use bevy::log::{error, warn};
use bevy::prelude::{Commands, Event, Events, Local, Res, ResMut, Resource};
use crossbeam_channel::{unbounded, Receiver, Sender};
use std::collections::HashMap;
use std::net::{IpAddr, SocketAddr, TcpListener};
use std::sync::{Arc, RwLock};
use std::thread;
use tungstenite::protocol::Role;
use tungstenite::{Message, WebSocket};
pub use tungstenite;
pub struct StkTungsteniteServerPlugin;
impl Plugin for StkTungsteniteServerPlugin {
fn build(&self, app: &mut App) {
app.add_event::();
app.add_systems(Startup, Self::init_server);
app.add_systems(PostUpdate, Self::event_listener);
}
}
#[derive(Event, Clone, Debug)]
pub enum WsEvent {
Connection { from: SocketAddr },
Close { addr: SocketAddr },
Send { to: SocketAddr, message: Message },
Recv { from: SocketAddr, message: Message },
Broadcast { message: Message },
}
#[derive(Resource, Clone)]
pub struct Clients(Arc>>>);
#[derive(Resource)]
pub struct StkTungsteniteServerConfig {
pub addr: IpAddr,
pub port: u16,
}
#[derive(Resource)]
pub struct Rx(Receiver);
#[derive(Resource)]
pub struct Tx(Sender);
impl StkTungsteniteServerPlugin {
pub fn init_server(config: Res, mut commands: Commands) {
let listener = TcpListener::bind(SocketAddr::from((config.addr, config.port)))
.expect("Failed to bind");
let (tx, rx) = unbounded::();
let clients = Clients(Arc::new(RwLock::new(HashMap::new())));
commands.insert_resource(clients.clone());
commands.insert_resource(Rx(rx.clone()));
commands.insert_resource(Tx(tx.clone()));
let clients = clients.0.clone();
thread::spawn(move || {
loop {
let (stream, this_addr) = listener.accept().unwrap();
let upgraded = match tungstenite::accept(stream) {
Ok(up) => up,
Err(e) => {
warn!("error upgrading {}: {}", this_addr, e);
continue;
}
};
let (ltx, lrx) = std::sync::mpsc::channel();
{
// Lock block
let mut handle = clients.write().expect("failed to lock clients map");
handle.insert(this_addr, ltx);
} // unlocked here
tx.send(WsEvent::Connection { from: this_addr })
.expect("failed to send event across channel");
thread::spawn({
let fd_ref = upgraded
.get_ref()
.try_clone()
.expect("failed to clone tcpstream");
let mut l_stream = WebSocket::from_raw_socket(fd_ref, Role::Server, None);
let l_gtx = tx.clone();
move || {
for event in lrx.iter() {
match event {
WsEvent::Send { to, message } => {
if to == this_addr {
match l_stream.send(message) {
Ok(_) => (),
Err(e) => match e {
tungstenite::Error::AlreadyClosed
| tungstenite::Error::ConnectionClosed => {
l_gtx
.send(WsEvent::Close { addr: this_addr })
.expect("failed to send on stream");
}
e => {
error!("error sending to {}: {}", this_addr, e);
break;
}
},
}
}
}
WsEvent::Close { addr } => {
if addr == this_addr {
l_stream.close(None).expect("failed to disconnect client");
break;
}
}
_ => {}
}
}
}
});
thread::spawn({
let fd_ref = upgraded
.get_ref()
.try_clone()
.expect("failed to clone tcpstream");
let mut l_stream = WebSocket::from_raw_socket(fd_ref, Role::Server, None);
let l_gtx = tx.clone();
move || loop {
let msg = match l_stream.read() {
Ok(m) => m,
Err(e) => {
error!("error reading from stream: {}", e);
break;
}
};
if let Message::Close(_) = msg {
let _ = l_stream.close(None);
l_gtx.send(WsEvent::Close { addr: this_addr }).unwrap();
break;
}
l_gtx
.send(WsEvent::Recv {
from: this_addr,
message: msg,
})
.unwrap();
}
});
}
});
}
pub fn event_listener(
clients: Res,
game_receiver: Res>,
mut event_reader: Local>,
mut events: ResMut>,
) {
let mut clients = clients.0.write().unwrap();
for event in event_reader.read(&events) {
match event {
WsEvent::Send { to, .. } => {
if let Some(client) = clients.get_mut(to) {
client.send(event.clone()).expect("failed to forward event");
}
}
WsEvent::Close { addr } => {
if let Some(client) = clients.get_mut(addr) {
client.send(event.clone()).expect("failed to forward event");
}
clients.remove(addr);
}
WsEvent::Broadcast { message } => {
for (addr, client) in clients.iter() {
client
.send(WsEvent::Send {
to: *addr,
message: message.clone(),
})
.expect("failed to forward event");
}
}
_ => {}
}
}
if let Ok(event) = game_receiver.0.try_recv() {
events.send(event);
}
}
}