~starkingdoms/starkingdoms

ref: b6949834d53aadf53fa99b5f914a3dac09702395 starkingdoms/crates/unified/src/replication/mod.rs -rw-r--r-- 5.0 KiB
b6949834 — core replication experiment 5 months ago
                                                                                
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
mod message;

use std::collections::{HashMap, HashSet};
use std::sync::atomic::AtomicU64;
use bevy::ecs::component::HookContext;
use bevy::ecs::world::DeferredWorld;
use bevy::prelude::Component;
use starkingdoms_proc::replicable;
use bevy::prelude::*;
use bevy::tasks::AsyncComputeTaskPool;
use bevy_trait_query::{All, RegisterExt};
use serde::{Deserialize, Serialize};
use crate::replication::message::ReplicationMessage;

pub struct ReplicationServerPlugin;
impl Plugin for ReplicationServerPlugin {
    fn build(&self, app: &mut App) {
        let (tx_to_io_task, rx_from_io_task) = crossbeam_channel::unbounded();
        let res = ReplicationServerResource { message_sender: tx_to_io_task, component_tracking_map: HashMap::new() };

        let task_pool = AsyncComputeTaskPool::get();
        task_pool.spawn(async move {
            while let Ok(message) = rx_from_io_task.recv() {
                debug!(?message, "broadcasting to clients");
            }
        }).detach();

        app
            .insert_resource(res)
            .replicate::<Replicated>()
            .add_systems(Update, replicate);

    }
}

#[derive(Resource)]
struct ReplicationServerResource {
    message_sender: crossbeam_channel::Sender<message::ReplicationMessage>,
    component_tracking_map: HashMap<Entity, HashSet<String>>,
}

/// Implement this trait on a component to enable replication for that component.
#[bevy_trait_query::queryable]
#[typetag::serde(tag = "component")]
pub trait Replicable {
    fn component_id(&self) -> &'static str {
        std::any::type_name::<Self>()
    }
}

/// Add this component to an entity to replicate it, and all of it's `Replicatable` components to clients
#[derive(Component)]
#[replicable]
#[require(ReplicatedEntityID)]
#[component(on_add = replicated_add_hook)]
#[component(on_remove = replicated_remove_hook)]
pub struct Replicated;

fn replicated_add_hook(mut world: DeferredWorld, context: HookContext) {
    world.get_resource_mut::<ReplicationServerResource>().unwrap().component_tracking_map.insert(context.entity, HashSet::new());

    let replicated_id = world.get::<ReplicatedEntityID>(context.entity).unwrap();
    world.get_resource::<ReplicationServerResource>().unwrap().message_sender.send(ReplicationMessage::AddEntity(*replicated_id)).unwrap();
}
fn replicated_remove_hook(mut world: DeferredWorld, context: HookContext) {
    world.get_resource_mut::<ReplicationServerResource>().unwrap().component_tracking_map.remove(&context.entity);

    let replicated_id = world.get::<ReplicatedEntityID>(context.entity).unwrap();
    world.get_resource::<ReplicationServerResource>().unwrap().message_sender.send(ReplicationMessage::RemoveEntity(*replicated_id)).unwrap();
}

static ID_COUNTER: AtomicU64 = AtomicU64::new(0);

#[derive(Component, Debug, Serialize, Deserialize, Copy, Clone)]
struct ReplicatedEntityID(u64);
impl Default for ReplicatedEntityID {
    fn default() -> Self {
        Self(ID_COUNTER.fetch_add(1, std::sync::atomic::Ordering::SeqCst))
    }
}


/// The main server-side replication system. Tracks all changes and produce replication messages
fn replicate(
    replicated_query: Query<(Entity, &ReplicatedEntityID, All<&dyn Replicable>), With<Replicated>>,
    mut replication_server_resource: ResMut<ReplicationServerResource>
) {
    for (server_entity, replicated_entity, replicated_components) in &replicated_query {
        for added_component in replicated_components.iter_added() {
            let our_entry = replication_server_resource.component_tracking_map.get_mut(&server_entity).unwrap();
            our_entry.insert(added_component.component_id().to_string());
        }

        for updated_component in replicated_components.iter_changed() {
            let serialized_update = serde_json::to_value(updated_component.into_inner()).unwrap();
            replication_server_resource.message_sender.send(ReplicationMessage::UpdateComponent(
                *replicated_entity,
                serialized_update
            )).unwrap();
        }
        
        let mut existing_component_ids = vec![];
        for component in replicated_components.iter() {
            existing_component_ids.push(component.component_id());
        }
        let our_entry = replication_server_resource.component_tracking_map.get_mut(&server_entity).unwrap();
        let mut removed_component_ids = vec![];
        our_entry.retain(|u| {
            if !existing_component_ids.contains(&&**u) {
                removed_component_ids.push(u.clone());
                false
            } else { true }
        });
        
        for removed_component in &removed_component_ids {
            replication_server_resource.message_sender.send(ReplicationMessage::RemoveComponent(
                *replicated_entity,
                removed_component.to_string()
            )).unwrap();
        }
    }
}

pub trait ReplicateExt {
    fn replicate<T: Replicable + Component>(&mut self) -> &mut Self;
}
impl ReplicateExt for App {
    fn replicate<T: Replicable + Component>(&mut self) -> &mut Self {
        self.register_component_as::<dyn Replicable, T>();
        self
    }
}