1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
use anyhow::Error;
use futures::future::ready;
use futures::{Sink, SinkExt, Stream, StreamExt, TryStreamExt};
use tokio::net::TcpStream;
use tokio_tungstenite::tungstenite;
use tokio_tungstenite::WebSocketStream;
use warp::ws::WebSocket;
/// Trait-object type for the write half of a split websocket: a [`Sink`] that
/// accepts `Vec<u8>` items and reports failures as [`Error`]. It is unsized, so
/// it is handed out boxed (see [`WebSock::ws_split`]).
pub type WsWrite = dyn Sink<Vec<u8>, Error = Error> + Unpin + Send;
/// Trait-object type for the read half of a split websocket: a [`Stream`] whose
/// items are `Result<Vec<u8>, Error>`. It is unsized, so it is handed out boxed
/// (see [`WebSock::ws_split`]).
pub type WsRead = dyn Stream<Item = Result<Vec<u8>, Error>> + Unpin + Send;
/// Common interface over websocket implementations that can be split into a
/// byte-oriented write half and a byte-oriented read half.
pub trait WebSock {
/// Consumes the socket and returns `(write, read)`: a boxed sink taking
/// `Vec<u8>` payloads and a boxed stream yielding `Result<Vec<u8>, Error>`.
fn ws_split(self) -> (Box<WsWrite>, Box<WsRead>);
}
impl WebSock for WebSocketStream<TcpStream> {
fn ws_split(self) -> (Box<WsWrite>, Box<WsRead>) {
let (ws_write, ws_read) = self.split();
let ws_write = ws_write
.with(|bytes| ready(Ok(tungstenite::Message::Binary(bytes))))
.sink_map_err(|err: tungstenite::Error| anyhow!(err));
let ws_read = ws_read.map_ok(|ws_message| ws_message.into_data()).map_err(|err| anyhow!(err));
(Box::new(ws_write), Box::new(ws_read))
}
}
impl WebSock for WebSocket {
fn ws_split(
self,
) -> (
Box<dyn Sink<Vec<u8>, Error = Error> + Unpin + Send>,
Box<dyn Stream<Item = Result<Vec<u8>, Error>> + Unpin + Send>,
) {
let (ws_write, ws_read) = self.split();
let ws_write = ws_write
.with(|bytes| ready(Ok(warp::ws::Message::binary(bytes))))
.sink_map_err(|err: warp::Error| anyhow!(err));
let ws_read = ws_read.map_ok(|ws_message| ws_message.as_bytes().to_vec()).map_err(|err| anyhow!(err));
(Box::new(ws_write), Box::new(ws_read))
}
}