mod message; use std::collections::{HashMap, HashSet}; use std::sync::atomic::AtomicU64; use bevy::ecs::component::HookContext; use bevy::ecs::world::DeferredWorld; use bevy::prelude::Component; use starkingdoms_proc::replicable; use bevy::prelude::*; use bevy::tasks::AsyncComputeTaskPool; use bevy_trait_query::{All, RegisterExt}; use serde::{Deserialize, Serialize}; use crate::replication::message::ReplicationMessage; pub struct ReplicationServerPlugin; impl Plugin for ReplicationServerPlugin { fn build(&self, app: &mut App) { let (tx_to_io_task, rx_from_io_task) = crossbeam_channel::unbounded(); let res = ReplicationServerResource { message_sender: tx_to_io_task, component_tracking_map: HashMap::new() }; let task_pool = AsyncComputeTaskPool::get(); task_pool.spawn(async move { while let Ok(message) = rx_from_io_task.recv() { debug!(?message, "broadcasting to clients"); } }).detach(); app .insert_resource(res) .replicate::() .add_systems(Update, replicate); } } #[derive(Resource)] struct ReplicationServerResource { message_sender: crossbeam_channel::Sender, component_tracking_map: HashMap>, } /// Implement this trait on a component to enable replication for that component. 
#[bevy_trait_query::queryable]
#[typetag::serde(tag = "component")]
pub trait Replicable {
    /// Identifier used for this component in the tracking map and in
    /// `RemoveComponent` messages. Defaults to the implementing type's name.
    fn component_id(&self) -> &'static str {
        std::any::type_name::<Self>()
    }
}

/// Add this component to an entity to replicate it, and all of its
/// [`Replicable`] components, to clients.
#[derive(Component)]
#[replicable]
#[require(ReplicatedEntityID)]
#[component(on_add = replicated_add_hook)]
#[component(on_remove = replicated_remove_hook)]
pub struct Replicated;

/// `on_add` hook for [`Replicated`]: starts tracking the entity and sends
/// `ReplicationMessage::AddEntity` carrying its `ReplicatedEntityID`.
///
/// # Panics
/// Panics if the entity has no `ReplicatedEntityID`, if the replication
/// resource is missing, or if the message channel is disconnected.
fn replicated_add_hook(mut world: DeferredWorld, context: HookContext) {
    // Copy the id out first so the world is free to be borrowed mutably below.
    let replicated_id = *world
        .get::<ReplicatedEntityID>(context.entity)
        .expect("`Replicated` requires `ReplicatedEntityID`");

    let mut resource = world
        .get_resource_mut::<ReplicationServerResource>()
        .expect("`ReplicationServerResource` must be inserted before replicating entities");
    resource
        .component_tracking_map
        .insert(context.entity, HashSet::new());
    resource
        .message_sender
        .send(ReplicationMessage::AddEntity(replicated_id))
        .expect("replication message channel disconnected");
}

/// `on_remove` hook for [`Replicated`]: stops tracking the entity and sends
/// `ReplicationMessage::RemoveEntity` carrying its `ReplicatedEntityID`.
///
/// # Panics
/// Panics if the entity has no `ReplicatedEntityID`, if the replication
/// resource is missing, or if the message channel is disconnected.
fn replicated_remove_hook(mut world: DeferredWorld, context: HookContext) {
    // Copy the id out first so the world is free to be borrowed mutably below.
    let replicated_id = *world
        .get::<ReplicatedEntityID>(context.entity)
        .expect("`Replicated` requires `ReplicatedEntityID`");

    let mut resource = world
        .get_resource_mut::<ReplicationServerResource>()
        .expect("`ReplicationServerResource` must be inserted before replicating entities");
    resource.component_tracking_map.remove(&context.entity);
    resource
        .message_sender
        .send(ReplicationMessage::RemoveEntity(replicated_id))
        .expect("replication message channel disconnected");
}

/// Source of the next `ReplicatedEntityID` value; starts at 0.
static ID_COUNTER: AtomicU64 = AtomicU64::new(0);

/// Identifier sent in replication messages in place of the server-side `Entity`.
///
/// Each call to `default()` takes the next value from `ID_COUNTER`.
#[derive(Component, Debug, Serialize, Deserialize, Copy, Clone)]
struct ReplicatedEntityID(u64);

impl Default for ReplicatedEntityID {
    fn default() -> Self {
        Self(ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::SeqCst))
    }
}

/// The main server-side replication system.
Tracks all changes and produce replication messages fn replicate( replicated_query: Query<(Entity, &ReplicatedEntityID, All<&dyn Replicable>), With>, mut replication_server_resource: ResMut ) { for (server_entity, replicated_entity, replicated_components) in &replicated_query { for added_component in replicated_components.iter_added() { let our_entry = replication_server_resource.component_tracking_map.get_mut(&server_entity).unwrap(); our_entry.insert(added_component.component_id().to_string()); } for updated_component in replicated_components.iter_changed() { let serialized_update = serde_json::to_value(updated_component.into_inner()).unwrap(); replication_server_resource.message_sender.send(ReplicationMessage::UpdateComponent( *replicated_entity, serialized_update )).unwrap(); } let mut existing_component_ids = vec![]; for component in replicated_components.iter() { existing_component_ids.push(component.component_id()); } let our_entry = replication_server_resource.component_tracking_map.get_mut(&server_entity).unwrap(); let mut removed_component_ids = vec![]; our_entry.retain(|u| { if !existing_component_ids.contains(&&**u) { removed_component_ids.push(u.clone()); false } else { true } }); for removed_component in &removed_component_ids { replication_server_resource.message_sender.send(ReplicationMessage::RemoveComponent( *replicated_entity, removed_component.to_string() )).unwrap(); } } } pub trait ReplicateExt { fn replicate(&mut self) -> &mut Self; } impl ReplicateExt for App { fn replicate(&mut self) -> &mut Self { self.register_component_as::(); self } }