From ce2ff9e8c8fa338757524a29f181986b6776cb31 Mon Sep 17 00:00:00 2001 From: ghostly_zsh Date: Thu, 23 Jan 2025 17:32:14 -0600 Subject: [PATCH] networking on native --- crates/client/Cargo.toml | 2 + crates/client/src/lib.rs | 3 +- crates/client/src/networking/ws_native.rs | 61 +++++++++++++++++++++++ crates/client/src/networking/ws_wasm.rs | 18 ++++--- crates/client/src/rendering/mod.rs | 6 ++- crates/client/src/rendering/renderer.rs | 6 ++- 6 files changed, 83 insertions(+), 13 deletions(-) diff --git a/crates/client/Cargo.toml b/crates/client/Cargo.toml index 84b1e0930fb3ebcad6a912bc4ed269b6487c6da2..fbc9571cb4d0d2831e2c3339c30a148baadbd57e 100644 --- a/crates/client/Cargo.toml +++ b/crates/client/Cargo.toml @@ -24,6 +24,7 @@ nalgebra = "0.33" starkingdoms-common = { version = "0.1", path = "../common" } serde = "1" serde_json = "1" +crossbeam = "0.8.4" # WASM dependencies [target.'cfg(target_arch = "wasm32")'.dependencies] @@ -37,3 +38,4 @@ reqwest = "0.11" # Native dependencies [target.'cfg(not(target_arch = "wasm32"))'.dependencies] pollster = "0.4" +tungstenite = "0.26.1" diff --git a/crates/client/src/lib.rs b/crates/client/src/lib.rs index 4fe2d816c7e65b4c06e27ec8573866377d5f9641..d15614e7aefbe1ced72a270063c6f4dfcbacb621 100644 --- a/crates/client/src/lib.rs +++ b/crates/client/src/lib.rs @@ -3,7 +3,6 @@ use crate::input::MouseWheelEvent; use crate::rendering::ui::UiRenderable; use crate::rendering::App; use bevy_ecs::event::{EventReader, Events}; -use bevy_ecs::observer::Trigger; use bevy_ecs::schedule::Schedule; use bevy_ecs::system::ResMut; use bevy_ecs::world::World; @@ -50,7 +49,7 @@ pub fn start() { zoom: 1.0, }); world.insert_resource(Assets::new()); - world.insert_resource(Ws::new().expect("Couldn't connect to server")); + world.insert_resource(Ws::new()); let send_packet_events = Events::::default(); let recv_packet_events = Events::::default(); diff --git a/crates/client/src/networking/ws_native.rs b/crates/client/src/networking/ws_native.rs index e69de29bb2d1d6434b8b29ae775ad8c2e48c5391..61a07169c88058d531241e96b0f4e53e0e888a50 100644 --- a/crates/client/src/networking/ws_native.rs +++ b/crates/client/src/networking/ws_native.rs @@ -0,0 +1,61 @@ +use std::{net::TcpStream, sync::{Arc, Mutex}}; + +use bevy_ecs::system::Resource; +use crossbeam::channel::{unbounded, Receiver, Sender}; +use starkingdoms_common::packet::{MsgFromError, Packet}; +use tungstenite::{connect, stream::MaybeTlsStream, Message, WebSocket}; + +pub trait PacketMessageConvert { + fn from_message(value: &Message) -> Result; + fn into_message(&self) -> Message; +} + +impl PacketMessageConvert for Packet { + fn from_message(value: &Message) -> Result { + match value { + Message::Text(s) => serde_json::from_str(s).map_err(MsgFromError::JSONError), + Message::Binary(b) => serde_json::from_slice(b).map_err(MsgFromError::JSONError), + Message::Close(_) => Ok(Packet::_SpecialDisconnect {}), + Message::Frame(_) | Message::Pong(_) | Message::Ping(_) => { + Err(MsgFromError::InvalidMessageType) + } + } + } + fn into_message(&self) -> Message { + Message::Text(serde_json::to_string(self).expect("failed to serialize packet to json").into()) + } +} + +#[derive(Resource, Debug)] +pub struct Ws { + socket: Arc>>>, + pub sender: Sender, + pub receiver: Receiver, + packet_receiver: Receiver, +} +impl Ws { + pub fn new() -> Self { + let (socket, _) = connect("ws://localhost:3000").expect("Failed to connect to server"); + let socket = Arc::new(Mutex::new(socket)); + let (packet_sender, receiver) = unbounded(); + let (sender, packet_receiver) = unbounded(); + let socket_clone = socket.clone(); + std::thread::spawn(move || { + let socket = socket_clone; + loop { + let message = socket.lock().unwrap().read().expect("Failed to reading message"); + let packet = Packet::from_message(&message).expect("Server sent invalid packet"); + packet_sender.send(packet).expect("Couldn't send packet to server"); + } + }); + Ws { + socket, + sender, + receiver, + packet_receiver, + } + } + pub fn send_packet(&mut self, packet: &Packet) { + self.socket.lock().unwrap().send(packet.into_message()).expect("Couldn't send packet to server"); + } +} diff --git a/crates/client/src/networking/ws_wasm.rs b/crates/client/src/networking/ws_wasm.rs index 7be42c48bc772333bb1f4f4b5d13273611ccd144..4d57ec6432f837efb4c2720d7dd1c5da90bf1d29 100644 --- a/crates/client/src/networking/ws_wasm.rs +++ b/crates/client/src/networking/ws_wasm.rs @@ -1,6 +1,7 @@ use std::{any::Any, cell::{OnceCell, RefCell}, rc::Rc, sync::{Arc, Mutex, RwLock}}; -use bevy_ecs::{system::Resource, world::World}; +use bevy_ecs::system::Resource; +//use crossbeam::channel::{unbounded, Receiver, Sender}; use futures::{channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender}, SinkExt}; use wasm_bindgen::{prelude::Closure, JsCast, JsValue}; use wasm_bindgen_futures::spawn_local; @@ -25,12 +26,12 @@ pub struct Ws { } impl Ws { - pub fn new() -> Result { + pub fn new() -> Self { let window = web_sys::window().unwrap(); let ws = WebSocket::new(&format!("ws://{}:{}", - window.location().hostname().unwrap(), PORT))?; + window.location().hostname().unwrap(), PORT)).expect("Couldn't connect to server"); let (packet_sender, receiver) = unbounded(); - let packet_sender = Rc::new(RwLock::new(packet_sender)); + //let packet_sender = Rc::new(RwLock::new(packet_sender)); let (sender, packet_receiver) = unbounded(); /*let onopen_callback = Closure::::new(move || { @@ -42,21 +43,22 @@ impl Ws { //tracing::error!("{}", ws.ready_state()); let data = e.data().as_string().expect("Expected string, found some other type"); let data: Packet = serde_json::from_str(&data).expect("Received invalid json from server"); - let sender_clone = packet_sender.clone(); + let mut sender_clone = packet_sender.clone(); spawn_local(async move { - sender_clone.write().unwrap().send(data).await.expect("Couldn't transmit packet to client"); + sender_clone.send(data).await.expect("Couldn't transmit packet to client"); }); }); ws.set_onmessage(Some(onmessage_callback.as_ref().unchecked_ref())); onmessage_callback.forget(); - Ok(Ws { + Ws { socket: Socket(ws), sender, receiver, packet_receiver, - }) + } } pub fn send_all_packets_from_channel(&mut self) { + //for packet in self.packet_receiver.iter() { while let Ok(Some(packet)) = self.packet_receiver.try_next() { self.socket.0.send_with_str(&serde_json::to_string(&packet).expect("Couldn't convert packet to json")).expect("Couldn't send packet to server"); } diff --git a/crates/client/src/rendering/mod.rs b/crates/client/src/rendering/mod.rs index bd0e10a4fb7e8cfcc2385bada3dbcf4f8fc3bb30..0382fc1354fa2dfa560b6bc7d173afa19faae55d 100644 --- a/crates/client/src/rendering/mod.rs +++ b/crates/client/src/rendering/mod.rs @@ -213,10 +213,12 @@ impl ApplicationHandler for App { let world = self.world.take().unwrap(); let update_schedule = self.update_schedule.take().unwrap(); let ui_renderable = self.ui_renderable.take().unwrap(); + let send_packet_events = self.send_packet_events.take().unwrap(); + let recv_packet_events = self.recv_packet_events.take().unwrap(); let renderer = pollster::block_on(async move { - Renderer::try_init(window.clone(), world, update_schedule, ui_renderable) - .await + Renderer::try_init(window.clone(), world, update_schedule, + ui_renderable, send_packet_events, recv_packet_events).await }); match renderer { diff --git a/crates/client/src/rendering/renderer.rs b/crates/client/src/rendering/renderer.rs index 27aacbb093879579fc49b585fec66e3eb871da64..44f6f6d8cdb750827af75217747e97fbd9640050 100644 --- a/crates/client/src/rendering/renderer.rs +++ b/crates/client/src/rendering/renderer.rs @@ -9,7 +9,6 @@ use bevy_ecs::event::Events; use bevy_ecs::schedule::Schedule; use bevy_ecs::world::World; use egui::ViewportId; -use futures::SinkExt; use std::collections::HashMap; use std::fmt::{Debug, Formatter}; use std::sync::Arc; @@ -216,9 +215,14 @@ impl Renderer { pub fn render(&mut self) { let mut ws = self.world.get_resource_mut::().expect("Failed to get Ws resource"); + #[cfg(target_arch = "wasm32")] while let Ok(Some(packet)) = ws.receiver.try_next() { self.recv_packet_events.send(RecvPacket(packet)); } + #[cfg(not(target_arch = "wasm32"))] + for packet in ws.receiver.iter() { + self.recv_packet_events.send(RecvPacket(packet)); + } // update the world self.update_schedule.run(&mut self.world); self.send_packet_events.update();