mod message;
use std::collections::{HashMap, HashSet};
use std::sync::atomic::AtomicU64;
use bevy::ecs::component::HookContext;
use bevy::ecs::world::DeferredWorld;
use bevy::prelude::Component;
use starkingdoms_proc::replicable;
use bevy::prelude::*;
use bevy::tasks::AsyncComputeTaskPool;
use bevy_trait_query::{All, RegisterExt};
use serde::{Deserialize, Serialize};
use crate::replication::message::ReplicationMessage;
/// Server-side replication plugin.
///
/// On `build` it creates the outgoing message channel, stores the sending half in
/// [`ReplicationServerResource`], starts the loop that drains the receiving half, registers
/// [`Replicated`] as a replicable component and schedules [`replicate`] in `Update`.
pub struct ReplicationServerPlugin;

impl Plugin for ReplicationServerPlugin {
    /// Wires the replication resource, worker and system into `app`.
    ///
    /// # Panics
    /// Panics if the operating system refuses to spawn the worker thread.
    fn build(&self, app: &mut App) {
        let (tx_to_io_task, rx_from_io_task) = crossbeam_channel::unbounded();
        let res = ReplicationServerResource {
            message_sender: tx_to_io_task,
            component_tracking_map: HashMap::new(),
        };

        // `Receiver::recv` blocks the calling thread. Running this loop inside an async task
        // would tie up a task-pool worker for the whole lifetime of the app, so the loop gets
        // its own OS thread instead. The thread ends once every sender has been dropped.
        std::thread::Builder::new()
            .name("replication-io".to_string())
            .spawn(move || {
                while let Ok(message) = rx_from_io_task.recv() {
                    debug!(?message, "broadcasting to clients");
                }
            })
            .expect("failed to spawn the replication I/O thread");

        app.insert_resource(res)
            .replicate::<Replicated>()
            .add_systems(Update, replicate);
    }
}
/// Server-side replication state, stored as a Bevy resource.
#[derive(Resource)]
struct ReplicationServerResource {
/// Sending half of the channel whose receiving loop is started in `ReplicationServerPlugin::build`.
message_sender: crossbeam_channel::Sender<message::ReplicationMessage>,
/// For each replicated entity, the component ids (see `Replicable::component_id`) that the
/// `replicate` system has recorded for it; used to detect removed components.
component_tracking_map: HashMap<Entity, HashSet<String>>,
}
/// Implement this trait on a component to enable replication for that component.
///
/// The trait is queryable through `bevy_trait_query` and serialized through `typetag`
/// using the tag `component`.
#[bevy_trait_query::queryable]
#[typetag::serde(tag = "component")]
pub trait Replicable {
/// Identifier used to track this component type during replication.
///
/// Defaults to `std::any::type_name` of the implementing type.
// NOTE(review): the standard library does not guarantee `type_name` output to be stable
// across compiler versions; confirm that whoever consumes these ids can tolerate that.
fn component_id(&self) -> &'static str {
std::any::type_name::<Self>()
}
}
/// Add this component to an entity to replicate it, and all of its [`Replicable`] components, to clients.
///
/// Requires a `ReplicatedEntityID` on the same entity. Adding the component runs
/// `replicated_add_hook`; removing it runs `replicated_remove_hook`.
#[derive(Component)]
#[replicable]
#[require(ReplicatedEntityID)]
#[component(on_add = replicated_add_hook)]
#[component(on_remove = replicated_remove_hook)]
pub struct Replicated;
/// Component hook invoked when `Replicated` is added to an entity.
///
/// Creates an empty component-tracking set for the entity, then queues an `AddEntity`
/// message carrying the entity's `ReplicatedEntityID`.
///
/// # Panics
/// Panics if `ReplicationServerResource` is missing, if the entity has no
/// `ReplicatedEntityID`, or if the message cannot be sent.
fn replicated_add_hook(mut world: DeferredWorld, context: HookContext) {
    let entity = context.entity;

    // Start tracking before announcing the entity.
    world
        .get_resource_mut::<ReplicationServerResource>()
        .unwrap()
        .component_tracking_map
        .insert(entity, HashSet::new());

    // Copy the id out so no borrow of the world is held while sending.
    let replicated_id = *world.get::<ReplicatedEntityID>(entity).unwrap();

    let server = world.get_resource::<ReplicationServerResource>().unwrap();
    server
        .message_sender
        .send(ReplicationMessage::AddEntity(replicated_id))
        .unwrap();
}
/// Component hook invoked when `Replicated` is removed from an entity.
///
/// Drops the entity's component-tracking set, then queues a `RemoveEntity` message
/// carrying the entity's `ReplicatedEntityID`.
///
/// # Panics
/// Panics if `ReplicationServerResource` is missing, if the entity has no
/// `ReplicatedEntityID`, or if the message cannot be sent.
fn replicated_remove_hook(mut world: DeferredWorld, context: HookContext) {
    let entity = context.entity;

    // Stop tracking before announcing the removal.
    world
        .get_resource_mut::<ReplicationServerResource>()
        .unwrap()
        .component_tracking_map
        .remove(&entity);

    // Copy the id out so no borrow of the world is held while sending.
    let replicated_id = *world.get::<ReplicatedEntityID>(entity).unwrap();

    let server = world.get_resource::<ReplicationServerResource>().unwrap();
    server
        .message_sender
        .send(ReplicationMessage::RemoveEntity(replicated_id))
        .unwrap();
}
/// Process-wide counter holding the next value handed out by `ReplicatedEntityID::default`; starts at 0.
static ID_COUNTER: AtomicU64 = AtomicU64::new(0);
/// Numeric identifier attached to a replicated entity and carried in replication messages.
///
/// The `Default` implementation draws a fresh value from `ID_COUNTER`.
#[derive(Component, Debug, Serialize, Deserialize, Copy, Clone)]
struct ReplicatedEntityID(u64);
impl Default for ReplicatedEntityID {
    /// Hands out the next id: reads `ID_COUNTER` and increments it in one atomic step,
    /// so each call observes a different counter value.
    fn default() -> Self {
        use std::sync::atomic::Ordering;

        let next_id = ID_COUNTER.fetch_add(1, Ordering::SeqCst);
        Self(next_id)
    }
}
/// The main server-side replication system. Tracks all changes and produce replication messages
fn replicate(
replicated_query: Query<(Entity, &ReplicatedEntityID, All<&dyn Replicable>), With<Replicated>>,
mut replication_server_resource: ResMut<ReplicationServerResource>
) {
for (server_entity, replicated_entity, replicated_components) in &replicated_query {
for added_component in replicated_components.iter_added() {
let our_entry = replication_server_resource.component_tracking_map.get_mut(&server_entity).unwrap();
our_entry.insert(added_component.component_id().to_string());
}
for updated_component in replicated_components.iter_changed() {
let serialized_update = serde_json::to_value(updated_component.into_inner()).unwrap();
replication_server_resource.message_sender.send(ReplicationMessage::UpdateComponent(
*replicated_entity,
serialized_update
)).unwrap();
}
let mut existing_component_ids = vec![];
for component in replicated_components.iter() {
existing_component_ids.push(component.component_id());
}
let our_entry = replication_server_resource.component_tracking_map.get_mut(&server_entity).unwrap();
let mut removed_component_ids = vec![];
our_entry.retain(|u| {
if !existing_component_ids.contains(&&**u) {
removed_component_ids.push(u.clone());
false
} else { true }
});
for removed_component in &removed_component_ids {
replication_server_resource.message_sender.send(ReplicationMessage::RemoveComponent(
*replicated_entity,
removed_component.to_string()
)).unwrap();
}
}
}
/// Extension trait for registering replicable component types.
pub trait ReplicateExt {
/// Registers component type `T` for replication and returns `self` so calls can be chained.
fn replicate<T: Replicable + Component>(&mut self) -> &mut Self;
}
impl ReplicateExt for App {
// Registers `T` as an implementor of `dyn Replicable` via `bevy_trait_query`'s
// `register_component_as`, then hands the app back for chaining.
fn replicate<T: Replicable + Component>(&mut self) -> &mut Self {
self.register_component_as::<dyn Replicable, T>();
self
}
}