M Cargo.lock => Cargo.lock +65 -6
@@ 3,6 3,17 @@
version = 3
[[package]]
+name = "async_io_stream"
+version = "0.3.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b6d7b9decdf35d8908a7e3ef02f64c5e9b1695e230154c0e8de3969142d9b94c"
+dependencies = [
+ "futures",
+ "pharos",
+ "rustc_version",
+]
+
+[[package]]
name = "atty"
version = "0.2.14"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ 66,13 77,14 @@ dependencies = [
"futures",
"js-sys",
"log",
- "tokio",
- "tokio-tungstenite",
- "tungstenite",
+ "protocol",
+ "rmp-serde",
+ "serde",
"url",
"wasm-bindgen",
"wasm-bindgen-futures",
"web-sys",
+ "ws_stream_wasm",
]
[[package]]
@@ 473,6 485,16 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "478c572c3d73181ff3c2539045f6eb99e5491218eae919370993b890cdbdd98e"
[[package]]
+name = "pharos"
+version = "0.5.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e9567389417feee6ce15dd6527a8a1ecac205ef62c2932bcf3d9f6fc5b78b414"
+dependencies = [
+ "futures",
+ "rustc_version",
+]
+
+[[package]]
name = "pin-project-lite"
version = "0.2.9"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ 503,11 525,8 @@ dependencies = [
name = "protocol"
version = "0.1.0"
dependencies = [
- "futures",
"rmp-serde",
"serde",
- "tokio-tungstenite",
- "tungstenite",
]
[[package]]
@@ 572,6 591,27 @@ dependencies = [
]
[[package]]
+name = "rustc_version"
+version = "0.4.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bfa0f585226d2e68097d4f95d113b15b83a82e819ab25717ec0590d9584ef366"
+dependencies = [
+ "semver",
+]
+
+[[package]]
+name = "semver"
+version = "1.0.17"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bebd363326d05ec3e2f532ab7660680f3b02130d780c299bca73469d521bc0ed"
+
+[[package]]
+name = "send_wrapper"
+version = "0.6.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cd0b0ec5f1c1ca621c432a25813d8d60c88abe6d3e08a3eb9cf37d97a0fe3d73"
+
+[[package]]
name = "serde"
version = "1.0.159"
source = "registry+https://github.com/rust-lang/crates.io-index"
@@ 1086,3 1126,22 @@ name = "windows_x86_64_msvc"
version = "0.42.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9aec5da331524158c6d1a4ac0ab1541149c0b9505fde06423b02f5ef0106b9f0"
+
+[[package]]
+name = "ws_stream_wasm"
+version = "0.7.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7999f5f4217fe3818726b66257a4475f71e74ffd190776ad053fa159e50737f5"
+dependencies = [
+ "async_io_stream",
+ "futures",
+ "js-sys",
+ "log",
+ "pharos",
+ "rustc_version",
+ "send_wrapper",
+ "thiserror",
+ "wasm-bindgen",
+ "wasm-bindgen-futures",
+ "web-sys",
+]
M client/Cargo.toml => client/Cargo.toml +5 -4
@@ 14,9 14,10 @@ js-sys = "0.3"
web-sys = { version = "0.3", features = ["CanvasRenderingContext2d", "Document", "Element", "HtmlCanvasElement", "Window"]}
console_log = { version = "1", features = ["color"] }
log = "0.4"
-tungstenite = { version = "0.18", default-features = false }
-tokio-tungstenite = { version = "0.18" }
-tokio = { version = "1.27", features = ["macros", "sync", "rt-multi-thread"] }
futures = { version = "0.3", default-features = false }
wasm-bindgen-futures = "0.4"
-url = "2.3">
\ No newline at end of file
+url = "2.3"
+protocol = { version = "0.1.0", path = "../protocol" }
+rmp-serde = "1.1"
+ws_stream_wasm = "0.7"
+serde = { version = "1", features = ["derive"] }<
\ No newline at end of file
M client/src/lib.rs => client/src/lib.rs +62 -13
@@ 1,8 1,17 @@
use std::error::Error;
+use futures::stream::{SplitSink, SplitStream};
use futures::StreamExt;
-use log::{debug, error, info, Level};
-use tokio_tungstenite::connect_async;
+use log::{debug, error, info, Level, trace};
use wasm_bindgen::prelude::*;
+use ws_stream_wasm::{WsMessage, WsMeta, WsStream};
+use protocol::State;
+use protocol::PROTOCOL_VERSION;
+use protocol::MessageS2C;
+use protocol::MessageC2S;
+use futures::SinkExt;
+
+#[macro_use]
+pub mod macros;
#[wasm_bindgen]
extern {
@@ 15,30 24,70 @@ pub fn send_chat(chat: &str) {
}
#[wasm_bindgen]
-pub async fn rust_init(gateway: &str, username: &str) {
+pub async fn rust_init(gateway: &str, username: &str) -> Result<(), JsError> {
console_log::init_with_level(Level::Debug).unwrap();
info!("Logger setup successfully");
- match init(gateway, username).await {
- Ok(_) => (),
+ match main(gateway, username).await {
+ Ok(c) => c,
Err(e) => {
error!("Error initializing gateway client: {}", e);
- return;
+ return Err(JsError::new(&e.to_string()));
}
- }
+ };
- info!("Gateway client initialized successfully");
+ info!("Gateway client exited");
+
+ Ok(())
}
-pub async fn init(gateway: &str, username: &str) -> Result<(), Box<dyn Error>> {
+pub struct Client {
+ pub state: State,
+ pub tx: SplitSink<WsStream, WsMessage>,
+ pub rx: SplitStream<WsStream>
+}
+
+pub async fn main(gateway: &str, username: &str) -> Result<(), Box<dyn Error>> {
info!("FAST CONNECT: {}", gateway);
let gateway_url = url::Url::parse(gateway)?;
- debug!("Gateway URL parsed");
- let (ws_stream, _) = connect_async(gateway_url).await?;
- debug!("Connected to gateway socket");
+ trace!("Gateway URL parsed");
+ let (_ws, ws_stream) = WsMeta::connect(gateway_url, None).await?;
+ trace!("Connected to gateway socket");
let (tx, rx) = ws_stream.split();
- debug!("Split stream, handshaking with server");
+
+ let mut client = Client {
+ state: State::Handshake,
+ tx,
+ rx
+ };
+
+ trace!("Split stream, handshaking with server");
+
+ send!(client.tx, &MessageC2S::Hello {
+ next_state: State::Play,
+ version: PROTOCOL_VERSION,
+ requested_username: username.to_string()
+ }).await?;
+
+ trace!("Sent handshake start packet");
+
+ if let Some(msg) = recv_now!(client.rx)? {
+ let typed_msg: MessageS2C = msg;
+
+ match typed_msg {
+ MessageS2C::Hello { version, given_username, next_state } => {
+ info!("FAST CONNECT - connected to server protocol {} given username {}, switching to state {:?}", version, given_username, next_state);
+ client.state = next_state;
+ },
+ MessageS2C::Goodbye { reason } => {
+ error!("server disconnected before finishing handshake: {:?}", reason);
+ return Err(format!("disconnected by server: {:?}", reason).into());
+ }
+ }
+ } else {
+ error!("Server closed the connection")
+ }
Ok(())
}=
\ No newline at end of file
A client/src/macros.rs => client/src/macros.rs +77 -0
@@ 0,0 1,77 @@
+use std::error::Error;
+use std::io;
+use futures::{AsyncRead, AsyncWrite, FutureExt, Stream, StreamExt};
+use futures::stream::SplitStream;
+use serde::{Deserialize, Serialize};
+use ws_stream_wasm::WsMessage;
+
+#[macro_export]
+macro_rules! send {
+ ($writer:expr,$pkt:expr) => {
+ $writer.send($crate::macros::__generic_packet_to_message($pkt).unwrap())
+ };
+}
+
+#[macro_export]
+macro_rules! recv {
+ ($reader:expr) => {
+ {
+ if let Some(future_result) = $reader.next().now_or_never() {
+ if let Some(msg) = future_result {
+ match msg {
+ Ok(msg) => {
+ if msg.is_binary() {
+ match rmp_serde::from_slice(&msg.into_data()) {
+ Ok(d) => Ok(Some(d)),
+ Err(e) => {
+ log::error!("error deserializing message: {}", e);
+ Ok(None)
+ }
+ }
+ } else {
+ Ok(None)
+ }
+ },
+ Err(e) => {
+ log::error!("error receiving message: {}", e);
+ Ok(None)
+ }
+ }
+ } else {
+ log::error!("pipe closed");
+ Err("Pipe closed")
+ }
+ } else {
+ Ok(None)
+ }
+ }
+ }
+}
+
+#[macro_export]
+macro_rules! recv_now {
+ ($reader:expr) => {
+ {
+ if let Some(msg) = $reader.next().await {
+ if let WsMessage::Binary(msg) = msg {
+ match rmp_serde::from_slice(&msg) {
+ Ok(d) => Ok(Some(d)),
+ Err(e) => {
+ log::error!("error deserializing message: {}", e);
+ Ok(None)
+ }
+ }
+ } else {
+ Ok(None)
+ }
+ } else {
+ log::error!("pipe closed");
+ Err("Pipe closed")
+ }
+ }
+ };
+}
+
+pub fn __generic_packet_to_message<T: Serialize>(pkt: &T) -> Result<WsMessage, rmp_serde::encode::Error> {
+ rmp_serde::to_vec(&pkt).map(WsMessage::from)
+}<
\ No newline at end of file
M protocol/Cargo.toml => protocol/Cargo.toml +1 -4
@@ 7,7 7,4 @@ edition = "2021"
[dependencies]
serde = { version = "1", features = ["derive"] }
-rmp-serde = "1.1"
-tungstenite = { version = "0.18", default-features = false }
-tokio-tungstenite = { version = "0.18" }
-futures = "0.3">
\ No newline at end of file
+rmp-serde = "1.1"<
\ No newline at end of file
M protocol/src/lib.rs => protocol/src/lib.rs +1 -4
@@ 1,7 1,4 @@
-use serde::{Serialize, Deserialize};
-
-#[macro_use]
-pub mod macros;
+use serde::{Deserialize, Serialize};
pub const PROTOCOL_VERSION: u32 = 1;
M server/src/client_handler.rs => server/src/client_handler.rs +4 -1
@@ 7,8 7,9 @@ use log::{error, info};
use tokio::sync::mpsc::Receiver;
use tokio_tungstenite::WebSocketStream;
use tungstenite::Message;
-use protocol::{GoodbyeReason, MessageC2S, MessageS2C, PROTOCOL_VERSION, ps2c, recv, send, State};
+use protocol::{GoodbyeReason, MessageC2S, MessageS2C, PROTOCOL_VERSION, ps2c, State};
use crate::handler::{ClientHandlerMessage, ClientManager};
+use crate::{send, recv};
pub async fn handle_client(mgr: ClientManager, remote_addr: SocketAddr, mut rx: Receiver<ClientHandlerMessage>, mut client_tx: SplitSink<WebSocketStream<Upgraded>, Message>, mut client_rx: SplitStream<WebSocketStream<Upgraded>>) -> Result<(), Box<dyn Error>> {
let mut state = State::Handshake;
@@ 23,6 24,8 @@ pub async fn handle_client(mgr: ClientManager, remote_addr: SocketAddr, mut rx:
break;
}
+ info!("here");
+
if let Some(pkt) = recv!(client_rx)? {
match state {
State::Handshake => {
M server/src/handler.rs => server/src/handler.rs +1 -1
@@ 10,7 10,7 @@ use protocol::State;
#[derive(Clone)]
pub struct ClientManager {
- pub clients: Arc<RwLock<HashMap<SocketAddr, ClientHandler>>>,
+ pub handlers: Arc<RwLock<HashMap<SocketAddr, ClientHandler>>>,
pub usernames: Arc<RwLock<HashMap<SocketAddr, String>>>
}
R protocol/src/macros.rs => server/src/macros.rs +32 -0
@@ 49,6 49,38 @@ macro_rules! recv {
}
}
+#[macro_export]
+macro_rules! recv_now {
+ ($reader:expr) => {
+ {
+ if let Some(msg) = $reader.next().await {
+ match msg {
+ Ok(msg) => {
+ if msg.is_binary() {
+ match rmp_serde::from_slice(&msg.into_data()) {
+ Ok(d) => Ok(Some(d)),
+ Err(e) => {
+ log::error!("error deserializing message: {}", e);
+ Ok(None)
+ }
+ }
+ } else {
+ Ok(None)
+ }
+ },
+ Err(e) => {
+ log::error!("error receiving message: {}", e);
+ Ok(None)
+ }
+ }
+ } else {
+ log::error!("pipe closed");
+ Err("Pipe closed")
+ }
+ }
+ };
+}
+
pub fn __generic_packet_to_message<T: Serialize>(pkt: &T) -> Result<Message, rmp_serde::encode::Error> {
rmp_serde::to_vec(&pkt).map(Message::from)
}=
\ No newline at end of file
M server/src/main.rs => server/src/main.rs +17 -8
@@ 1,10 1,10 @@
use std::convert::Infallible;
use std::net::SocketAddr;
use std::sync::Arc;
-use hyper::{header, upgrade, StatusCode, Body, Request, Response, Server, server::conn::AddrStream};
+use hyper::{Body, header, Request, Response, Server, server::conn::AddrStream, StatusCode, upgrade};
use hyper::service::{make_service_fn, service_fn};
use tokio_tungstenite::WebSocketStream;
-use tungstenite::{handshake, Error};
+use tungstenite::{Error, handshake};
use futures::stream::StreamExt;
use lazy_static::lazy_static;
use log::{error, info};
@@ 12,15 12,19 @@ use tokio::sync::RwLock;
use protocol::State;
use crate::handler::{ClientHandler, ClientManager};
use crate::client_handler::handle_client;
+use crate::timer::timer_main;
pub mod client_handler;
pub mod handler;
pub mod timer;
+#[macro_use]
+pub mod macros;
async fn handle_request(mut request: Request<Body>, remote_addr: SocketAddr, mgr: ClientManager) -> Result<Response<Body>, Infallible> {
match (request.uri().path(), request.headers().contains_key(header::UPGRADE)) {
//if the request is ws_echo and the request headers contains an Upgrade key
("/ws", true) => {
+ info!("received connection from {}", remote_addr);
//assume request is a handshake, so create the handshake response
let response =
match handshake::server::create_response_with_body(&request, || Body::empty()) {
@@ 32,7 36,7 @@ async fn handle_request(mut request: Request<Body>, remote_addr: SocketAddr, mgr
match upgrade::on(&mut request).await {
//if successfully upgraded
Ok(upgraded) => {
-
+ info!("[{}] connection upgraded", remote_addr);
//create a websocket stream from the upgraded object
let ws_stream = WebSocketStream::from_raw_socket(
//pass the upgraded object
@@ 53,9 57,11 @@ async fn handle_request(mut request: Request<Body>, remote_addr: SocketAddr, mgr
// Acquire the write lock in a small scope, so it's dropped as quickly as possible
{
- mgr.clients.write().await.insert(remote_addr, client);
+ mgr.handlers.write().await.insert(remote_addr, client);
}
+ info!("[{}] passing to client handler", remote_addr);
+
//forward the stream to the sink to achieve echo
match handle_client(mgr.clone(), remote_addr, rx, ws_write, ws_read).await {
Ok(_) => {},
@@ 64,7 70,7 @@ async fn handle_request(mut request: Request<Body>, remote_addr: SocketAddr, mgr
// clean up values left over
{
- mgr.clients.write().await.remove(&remote_addr);
+ mgr.handlers.write().await.remove(&remote_addr);
mgr.usernames.write().await.remove(&remote_addr);
}
},
@@ 104,7 110,7 @@ async fn handle_request(mut request: Request<Body>, remote_addr: SocketAddr, mgr
lazy_static! {
static ref cmgr: ClientManager = ClientManager {
- clients: Arc::new(RwLock::new(Default::default())),
+ handlers: Arc::new(RwLock::new(Default::default())),
usernames: Arc::new(RwLock::new(Default::default())),
};
}
@@ 129,11 135,14 @@ async fn main() {
}
});
-
+ let mgr_timer = cmgr.clone();
+ let timer_thread = tokio::spawn(async move {
+ timer_main(mgr_timer).await;
+ });
let server = Server::bind(&addr).serve(make_svc);
if let Err(e) = server.await {
error!("error in server thread: {}", e);
}
-}>
\ No newline at end of file
+}
M server/src/timer.rs => server/src/timer.rs +2 -2
@@ 1,6 1,6 @@
use std::error::Error;
use std::time::Duration;
-use log::error;
+use log::{error, trace};
use tokio::sync::mpsc::Receiver;
use tokio::time::sleep;
use crate::handler::{ClientHandlerMessage, ClientManager};
@@ 9,7 9,7 @@ pub async fn timer_main(mgr: ClientManager) {
loop {
sleep(Duration::from_millis(5)).await;
- for (addr, client_thread) in mgr.clients.read().await.iter() {
+ for (addr, client_thread) in mgr.handlers.read().await.iter() {
match client_thread.tx.send(ClientHandlerMessage::Tick).await {
Ok(_) => (),
Err(e) => {
M web/play.html => web/play.html +1 -1
@@ 22,7 22,7 @@
// v
import init, { rust_init, send_chat } from "./dist/client.js";
init().then(() => {
- rust_init();
+ rust_init("ws://localhost:3000/ws", "core");
document.getElementById("chat-submit").addEventListener("click", e => {
send_chat(document.getElementById("chat-value").value);