M crates/unified/src/client/mod.rs => crates/unified/src/client/mod.rs +29 -2
@@ 1,5 1,6 @@
use aeronet::io::{Session, SessionEndpoint};
use aeronet::io::connection::{DisconnectReason, Disconnected};
+use aeronet_transport::lane::LaneKind;
//use aeronet_replicon::client::AeronetRepliconClient;
use aeronet_transport::{Transport, TransportConfig};
use aeronet_websocket::client::{ClientConfig, WebSocketClient};
@@ 21,7 22,7 @@ use crate::client::components::Me;
use crate::client::ship::attachment::client_attachment_plugin;
use crate::shared::ecs::{GameplayState, TimeOffset};
use crate::shared::gravity::update_gravity;
-use crate::shared::net::Hi;
+use crate::shared::net::{Hi, ClientMessageRegistry, ServerMessageRegistry};
use crate::shared::orbit::OrbitPlugin;
pub mod colors;
@@ 38,6 39,7 @@ pub mod starguide;
pub mod crafting;
pub mod components;
pub mod plugins;
+mod net;
pub struct ClientPlugin {
pub server: Option<String>
@@ 47,6 49,7 @@ impl Plugin for ClientPlugin {
fn build(&self, app: &mut App) {
app
.init_gizmo_group::<StarguideGizmos>()
+ .add_plugins(net::net_plugin)
.add_plugins(rendering::render_plugin)
.add_plugins(OrbitPlugin)
.add_systems(FixedUpdate, update_gravity.before(PhysicsSystems::Prepare))
@@ 96,9 99,33 @@ pub fn on_connecting(
let name = names.get(entity).unwrap();
info!("{name} is connecting");
}
-pub fn on_connected(trigger: On<Add, Session>, names: Query<&Name>) {
+pub fn on_connected(
+ trigger: On<Add, Session>,
+ names: Query<&Name>,
+ sessions: Query<&Session>,
+ server_message_registry: Res<ServerMessageRegistry>,
+ client_message_registry: Res<ClientMessageRegistry>,
+ mut commands: Commands,
+) {
let entity = trigger.event_target();
let name = names.get(entity).unwrap();
+ let Ok(session) = sessions.get(entity) else {
+ return;
+ };
+
+ let server_message_count = server_message_registry.message_count();
+ let client_message_count = client_message_registry.message_count();
+ let message_count = server_message_count + client_message_count;
+ debug!("message count: {}", message_count);
+ let lanes = [LaneKind::ReliableOrdered].repeat(message_count);
+ let transport = Transport::new(
+ session,
+ lanes.clone(),
+ lanes,
+ bevy::platform::time::Instant::now(),
+ ).expect("packet MTU too small to support transport");
+ commands.entity(entity).insert(transport);
+
info!("{name} is connected");
}
pub fn on_disconnected(trigger: On<Disconnected>, names: Query<&Name>) {
A crates/unified/src/client/net.rs => crates/unified/src/client/net.rs +20 -0
@@ 0,0 1,20 @@
+use crate::{prelude::*, shared::net::{ServerEntityMap, SpawnEntity}};
+
+pub fn net_plugin(app: &mut App) {
+ app
+ .insert_resource(ServerEntityMap::default())
+ .add_systems(PostUpdate, spawn_server_entities);
+}
+
+fn spawn_server_entities(
+ mut spawn_entity_messages: MessageReader<SpawnEntity>,
+ mut entity_mapper: ResMut<ServerEntityMap>,
+ mut commands: Commands,
+) {
+ for spawn_entity in spawn_entity_messages.read() {
+ let entity = commands.spawn_empty();
+ debug!("client: {:?}, server: {:?}", entity.id(), spawn_entity.server);
+ entity_mapper.server_to_client.set_mapped(spawn_entity.server, entity.id());
+ entity_mapper.client_to_server.set_mapped(entity.id(), spawn_entity.server);
+ }
+}
M crates/unified/src/server/mod.rs => crates/unified/src/server/mod.rs +32 -19
@@ 5,6 5,7 @@ mod heat;
mod drill;
mod craft;
mod damping;
+pub mod net;
pub mod planets;
pub mod player;
mod system_sets;
@@ 28,6 29,7 @@ use crate::server::damping::damping_plugin;
use crate::server::drill::drill_plugin;
use crate::server::earth_parts::spawn_parts_plugin;
use crate::server::gravity::newtonian_gravity_plugin;
+use crate::server::net::net_plugin;
use crate::server::part::part_management_plugin;
use crate::server::planets::planets_plugin;
use crate::server::player::player_management_plugin;
@@ 35,6 37,7 @@ use crate::server::system_sets::{PlayerInputSet, WorldUpdateSet};
use crate::prelude::*;
use crate::server::orbit::OrbitPlugin;
use crate::server::player::thrust::server_thrust_plugin;
+use crate::shared::net::{ClientMessageRegistry, ServerMessageRegistry};
pub struct ServerPlugin {
pub bind: SocketAddr
@@ 44,6 47,7 @@ impl Plugin for ServerPlugin {
fn build(&self, app: &mut App) {
let bind = self.bind;
app
+ .add_plugins(net_plugin)
.add_plugins(planets_plugin)
.add_plugins(newtonian_gravity_plugin)
.add_plugins(player_management_plugin)
@@ 91,34 95,43 @@ fn on_connected(
trigger: On<Add, Session>,
clients: Query<&ChildOf>,
sessions: Query<&Session>,
+ server_message_registry: Res<ServerMessageRegistry>,
+ client_message_registry: Res<ClientMessageRegistry>,
mut commands: Commands,
) {
let client = trigger.event_target();
let Ok(&ChildOf(server)) = clients.get(client) else {
return;
};
+ let Ok(session) = sessions.get(client) else {
+ return;
+ };
info!(?client, ?server, "client connected");
-}
+ let player = commands
+ .spawn((ConnectedGameEntity {
+ network_entity: client,
+ }))
+ .id();
-/*fn handle_authorized(
- newly_authorized_clients: Query<Entity, Added<AuthorizedClient>>,
- mut commands: Commands
-) {
- for client in newly_authorized_clients.iter() {
- let player = commands
- .spawn((Replicated, ConnectedGameEntity {
- network_entity: client,
- }))
- .id();
+ let server_message_count = server_message_registry.message_count();
+ let client_message_count = client_message_registry.message_count();
+ let message_count = server_message_count + client_message_count;
+ debug!("message count: {}", message_count);
+ let lanes = [LaneKind::ReliableOrdered].repeat(message_count);
+ let transport = Transport::new(
+ session,
+ lanes.clone(),
+ lanes,
+ bevy::platform::time::Instant::now(),
+ ).expect("packet MTU too small to support transport");
+ commands.entity(client).insert((
+ ConnectedNetworkEntity {
+ game_entity: player,
+ },
+ transport
+ ));
+}
- commands.entity(client).insert((
- Replicated,
- ConnectedNetworkEntity {
- game_entity: player,
- },
- ));
- }
-}*/
fn on_disconnected(
trigger: On<Disconnected>,
clients: Query<&ChildOf>,
A crates/unified/src/server/net.rs => crates/unified/src/server/net.rs +18 -0
@@ 0,0 1,18 @@
+use crate::{prelude::*, shared::net::{Mapped, SendTargets, SpawnEntity, ToClients}};
+
+pub fn net_plugin(app: &mut App) {
+ app.add_systems(PreUpdate, detect_entity_spawn);
+}
+
+fn detect_entity_spawn(
+ mapped_entities: Query<Entity, Added<Mapped>>,
+ mut spawn_entity: MessageWriter<ToClients<SpawnEntity>>,
+) {
+ for entity in mapped_entities {
+ debug!("entity was spawned");
+ spawn_entity.write(ToClients {
+ message: SpawnEntity { server: entity },
+ targets: SendTargets::All,
+ });
+ }
+}
M crates/unified/src/server/player/join.rs => crates/unified/src/server/player/join.rs +2 -1
@@ 5,7 5,7 @@ use crate::shared::ecs::{Player, PlayerStorage};
use crate::prelude::*;
use crate::server::ConnectedGameEntity;
use crate::server::part::SpawnPartRequest;
-use crate::shared::net::{ClientId, Hi, SendTargets, ToClients};
+use crate::shared::net::{ClientId, Hi, SendTargets, ToClients, Mapped};
use crate::shared::world_config::WorldConfigResource;
const SPAWN_ORBIT_OFFSET: f64 = 150.0;
@@ 60,6 60,7 @@ fn join_player(joined_player: Entity, mut commands: Commands, wc: &GlobalWorldCo
}
#[derive(Component)]
+#[require(Mapped)]
pub struct PendingPlayer;
pub fn handle_new_players(
M crates/unified/src/shared/ecs.rs => crates/unified/src/shared/ecs.rs +2 -0
@@ 1,5 1,6 @@
pub mod thruster;
+use crate::shared::net::Mapped;
use crate::shared::config::part::PartConfig;
use bevy::math::{Quat, Vec2};
use bevy::camera::visibility::RenderLayers;
@@ 40,6 41,7 @@ pub struct PartHandle(pub Handle<PartConfig>);
pub struct TimeOffset(pub f64);
#[derive(Component, Serialize, Deserialize, Debug)]
+#[require(Mapped)]
pub struct Player {
pub client: Entity,
}
M crates/unified/src/shared/net.rs => crates/unified/src/shared/net.rs +98 -16
@@ 7,9 7,10 @@ use aeronet_transport::Transport;
use aeronet_transport::lane::LaneIndex;
use aeronet_websocket::tungstenite::Bytes;
use avian2d::prelude::{AngularInertia, AngularVelocity, Collider, LinearVelocity, Mass, Position, Rotation};
-use bevy::ecs::entity::MapEntities;
+use bevy::ecs::entity::{EntityHashMap, MapEntities};
use bevy::ecs::system::SystemState;
use bevy::prelude::*;
+use bevy::reflect::DynamicTypePath;
use postcard::{from_bytes, to_allocvec, to_slice, to_vec};
use crate::prelude::{App, Message};
use crate::shared::thrust::ThrustSolution;
@@ 61,7 62,8 @@ use crate::shared::ecs::thruster::{Thruster, ThrusterOfPart};
pub fn register_net(app: &mut App) {
app
- .add_server_message::<Hi>()
+ .add_mapped_server_message::<Hi>()
+ .add_server_message::<SpawnEntity>()
.add_client_message::<DragRequestEvent>()
.add_client_message::<ToggleDrillEvent>()
@@ 69,18 71,22 @@ pub fn register_net(app: &mut App) {
.add_client_message::<ThrustSolution>();
}
-#[derive(Message, Deserialize, Serialize, TypePath, MapEntities)]
+#[derive(Message, Clone, Deserialize, Serialize, TypePath, MapEntities)]
pub struct Hi {
#[entities]
pub you_are: Entity,
pub time_offset: f64,
}
+#[derive(Message, Deserialize, Serialize, TypePath)]
+pub struct SpawnEntity {
+ pub server: Entity,
+}
pub fn setup_net(app: &mut App) {
app.insert_resource(ServerMessageRegistry::default());
app.insert_resource(ClientMessageRegistry::default());
- app.add_systems(PostUpdate, recv_from_server);
- app.add_systems(PostUpdate, recv_from_client);
+ app.add_systems(Update, recv_from_server);
+ app.add_systems(Update, recv_from_client);
}
#[derive(Message, Deref, Deserialize, Serialize)]
pub struct ToClients<T: Message + TypePath> {
@@ 106,17 112,34 @@ pub enum ClientId {
Server,
Client(Entity),
}
+#[derive(Component, Default)]
+pub struct Mapped;
#[derive(Resource, Default)]
+pub struct ServerEntityMap {
+ pub server_to_client: EntityHashMap<Entity>,
+ pub client_to_server: EntityHashMap<Entity>,
+}
+#[derive(Resource, Default)]
pub struct ServerMessageRegistry {
forward: HashMap<String, LaneIndex>,
reverse: HashMap<LaneIndex, fn(Vec<u8>, &mut World)>,
}
+impl ServerMessageRegistry {
+ pub fn message_count(&self) -> usize {
+ self.forward.len()
+ }
+}
#[derive(Resource, Default)]
pub struct ClientMessageRegistry {
forward: HashMap<String, LaneIndex>,
reverse: HashMap<LaneIndex, fn(Vec<u8>, ClientId, &mut World)>,
}
+impl ClientMessageRegistry {
+ pub fn message_count(&self) -> usize {
+ self.forward.len()
+ }
+}
static COUNTER: AtomicU32 = AtomicU32::new(0);
fn get_lane_index() -> LaneIndex {
@@ 124,35 147,68 @@ fn get_lane_index() -> LaneIndex {
}
pub trait NetAppExt {
+ fn add_mapped_server_message<T: Message + MapEntities + TypePath + Serialize + for<'a> Deserialize<'a>>(&mut self) -> &mut Self;
fn add_server_message<T: Message + TypePath + Serialize + for<'a> Deserialize<'a>>(&mut self) -> &mut Self;
+ fn add_mapped_client_message<T: Message + Clone + MapEntities + TypePath + Serialize + for<'a> Deserialize<'a>>(&mut self) -> &mut Self;
fn add_client_message<T: Message + TypePath + Serialize + for<'a> Deserialize<'a>>(&mut self) -> &mut Self;
}
impl NetAppExt for App {
+ fn add_mapped_server_message<T: Message + MapEntities + TypePath + Serialize + for<'a> Deserialize<'a>>(&mut self) -> &mut Self {
+ let mut registry = self.world_mut().resource_mut::<ServerMessageRegistry>();
+ register_mapped_server_message::<T>(&mut registry);
+ self
+ .add_message::<ToClients<T>>()
+ .add_message::<T>()
+ .add_systems(Update, send_to_client::<T>)
+ }
fn add_server_message<T: Message + TypePath + Serialize + for<'a> Deserialize<'a>>(&mut self) -> &mut Self {
+ let mut registry = self.world_mut().resource_mut::<ServerMessageRegistry>();
+ register_server_message::<T>(&mut registry);
self
.add_message::<ToClients<T>>()
.add_message::<T>()
- .add_systems(Startup, register_server_message::<T>)
.add_systems(Update, send_to_client::<T>)
}
+ fn add_mapped_client_message<T: Message + Clone + MapEntities + TypePath + Serialize + for<'a> Deserialize<'a>>(&mut self) -> &mut Self {
+ let mut registry = self.world_mut().resource_mut::<ClientMessageRegistry>();
+ register_client_message::<T>(&mut registry);
+ self
+ .add_message::<FromClients<T>>()
+ .add_message::<T>()
+ .add_systems(Update, send_mapped_to_server::<T>)
+ }
fn add_client_message<T: Message + TypePath + Serialize + for<'a> Deserialize<'a>>(&mut self) -> &mut Self {
+ let mut registry = self.world_mut().resource_mut::<ClientMessageRegistry>();
+ register_client_message::<T>(&mut registry);
self
.add_message::<FromClients<T>>()
.add_message::<T>()
- .add_systems(Startup, register_client_message::<T>)
.add_systems(Update, send_to_server::<T>)
}
}
-fn register_server_message<T: Message + TypePath + for<'a> Deserialize<'a>>(mut registry: ResMut<ServerMessageRegistry>) {
- registry.forward.insert(T::type_path().to_string(), get_lane_index());
- registry.reverse.insert(get_lane_index(), |payload: Vec<u8>, world: &mut World| {
+fn register_mapped_server_message<T: Message + MapEntities + TypePath + for<'a> Deserialize<'a>>(registry: &mut ServerMessageRegistry) {
+ let lane_index = get_lane_index();
+ registry.forward.insert(T::type_path().to_string(), lane_index);
+ registry.reverse.insert(lane_index, |payload: Vec<u8>, world: &mut World| {
+ let mut entity_map = world.resource_mut::<ServerEntityMap>();
+ let mut message = from_bytes::<T>(&payload).expect(&format!("Failed to deserialize message of type {}", T::type_path()));
+ message.map_entities(&mut entity_map.server_to_client);
+ debug!("entity map: {:?}", entity_map.server_to_client);
+ world.write_message(message).expect("Could not send message to game");
+ });
+}
+fn register_server_message<T: Message + TypePath + for<'a> Deserialize<'a>>(registry: &mut ServerMessageRegistry) {
+ let lane_index = get_lane_index();
+ registry.forward.insert(T::type_path().to_string(), lane_index);
+ registry.reverse.insert(lane_index, |payload: Vec<u8>, world: &mut World| {
let message = from_bytes::<T>(&payload).expect(&format!("Failed to deserialize message of type {}", T::type_path()));
world.write_message(message).expect("Could not send message to game");
});
}
-fn register_client_message<T: Message + TypePath + for<'a> Deserialize<'a>>(mut registry: ResMut<ClientMessageRegistry>) {
- registry.forward.insert(T::type_path().to_string(), get_lane_index());
- registry.reverse.insert(get_lane_index(), |payload: Vec<u8>, client_id: ClientId, world: &mut World| {
+fn register_client_message<T: Message + TypePath + for<'a> Deserialize<'a>>(registry: &mut ClientMessageRegistry) {
+ let lane_index = get_lane_index();
+ registry.forward.insert(T::type_path().to_string(), lane_index);
+ registry.reverse.insert(lane_index, |payload: Vec<u8>, client_id: ClientId, world: &mut World| {
let message = from_bytes::<T>(&payload).expect(&format!("Failed to deserialize message of type {}", T::type_path()));
world.write_message(FromClients {
client_id,
@@ 170,22 226,27 @@ fn send_to_client<M: Message + TypePath + Serialize>(
match &message.targets {
SendTargets::All => {
for (client_entity, mut transport) in &mut clients {
+ debug!("lane: {:?}", message_registry.forward.get(M::type_path()).unwrap());
+ debug!("registry: {:?}", message_registry.forward);
transport.send.push(
*message_registry.forward.get(M::type_path())
.expect("Failed to get message lane; the message likely isn't serialized yet"),
to_allocvec(&message).expect("Failed to serialize message").into(),
- Instant::now(),
+ bevy::platform::time::Instant::now(),
).expect("Failed to send message");
}
}
SendTargets::Single(client) => {
for (client_entity, mut transport) in &mut clients {
if let ClientId::Client(client) = client && client_entity == *client {
+ debug!("writing message");
+ debug!("lane: {:?}", message_registry.forward.get(M::type_path()).unwrap());
+ debug!("registry: {:?}", message_registry.forward);
transport.send.push(
*message_registry.forward.get(M::type_path())
.expect("Failed to get message lane; the message likely isn't serialized yet"),
to_allocvec(&message).expect("Failed to serialize message").into(),
- Instant::now(),
+ bevy::platform::time::Instant::now(),
).expect("Failed to send message");
}
}
@@ 204,7 265,26 @@ fn send_to_server<M: Message + TypePath + Serialize>(
*message_registry.forward.get(M::type_path())
.expect("Failed to get message lane; the message likely isn't serialized yet"),
to_allocvec(&message).expect("Failed to serialize message").into(),
- Instant::now(),
+ bevy::platform::time::Instant::now(),
+ ).expect("Failed to send message");
+ }
+ }
+}
+fn send_mapped_to_server<M: Message + Clone + MapEntities + TypePath + Serialize>(
+ mut messages: MessageReader<M>,
+ mut sessions: Query<&mut Transport, Without<ChildOf>>,
+ mut entity_map: ResMut<ServerEntityMap>,
+ message_registry: Res<ClientMessageRegistry>,
+) {
+ for message in messages.read() {
+ for mut transport in &mut sessions {
+ let mut message = message.clone();
+ message.map_entities(&mut entity_map.client_to_server);
+ transport.send.push(
+ *message_registry.forward.get(M::type_path())
+ .expect("Failed to get message lane; the message likely isn't serialized yet"),
+ to_allocvec(&message).expect("Failed to serialize message").into(),
+ bevy::platform::time::Instant::now(),
).expect("Failed to send message");
}
}
@@ 220,6 300,8 @@ fn recv_from_server(
let mut messages = Vec::new();
for mut transport in sessions.iter_mut() {
for message in transport.recv.msgs.drain() {
+ debug!("forward: {:?}", message_registry.forward);
+ debug!("reverse: {:?}", message_registry.reverse);
let payload = message.payload;
let message_fn = message_registry.reverse.get(&message.lane).expect("Packet was sent across a lane that didn't have a message assigned to it yet");
messages.push((*message_fn, payload));