VecBuffer: add Serialization/Deserialization from Write/Reader

VecDiff: add Clear
This commit is contained in:
Michael Sippel 2021-04-24 15:57:09 +02:00
parent fd1040decb
commit c53f07e762
Signed by: senvas
GPG key ID: F96CF119C34B64A6
2 changed files with 152 additions and 10 deletions

View file

@ -11,7 +11,9 @@ termion = "1.5.5"
signal-hook = "0.3.1"
signal-hook-async-std = "0.2.0"
serde = { version = "1.0", features = ["derive"] }
bincode = "1.3.3"
serde_json = "*"
[dependencies.async-std]
version = "1.7.0"
version = "1.9.0"
features = ["unstable", "attributes"]

View file

@ -1,16 +1,24 @@
use {
std::{
sync::Arc,
ops::{Deref, DerefMut}
ops::{Deref, DerefMut},
io::Write
},
std::sync::RwLock,
async_std::{
io::{Read, ReadExt},
stream::{Stream, StreamExt}
},
serde::{Serialize, Deserialize, de::DeserializeOwned},
crate::{
core::{View, Observer, ObserverExt, ObserverBroadcast, ViewPort, InnerViewPort, OuterViewPort},
sequence::SequenceView,
}
};
#[derive(Clone, Serialize, Deserialize)]
pub enum VecDiff<T> {
Clear,
Push(T),
Remove(usize),
Insert{ idx: usize, val: T },
@ -26,6 +34,7 @@ where T: Clone + Send + Sync + 'static {
//<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>>
/// Adapter View implementing `Sequence` for `Vec`
pub struct VecSequence<T>
where T: Clone + Send + Sync + 'static {
cur_len: RwLock<usize>,
@ -33,6 +42,21 @@ where T: Clone + Send + Sync + 'static {
cast: Arc<RwLock<ObserverBroadcast<dyn SequenceView<Item = T>>>>
}
/// Serialization Observer for `Vec`
pub struct VecBinWriter<T, W>
where T: Clone + Send + Sync + 'static,
W: Write + Send + Sync {
data: Option<Arc<RwLock<Vec<T>>>>,
out: RwLock<W>
}
pub struct VecJsonWriter<T, W>
where T: Clone + Send + Sync + 'static,
W: Write + Send + Sync {
data: Option<Arc<RwLock<Vec<T>>>>,
out: RwLock<W>
}
impl<T> OuterViewPort<RwLock<Vec<T>>>
where T: Clone + Send + Sync + 'static {
pub fn to_sequence(&self) -> OuterViewPort<dyn SequenceView<Item = T>> {
@ -43,6 +67,88 @@ where T: Clone + Send + Sync + 'static {
}
}
impl<T> OuterViewPort<RwLock<Vec<T>>>
where T: Clone + Serialize + Send + Sync + 'static {
pub fn serialize_bin<W: Write + Send + Sync + 'static>(&self, out: W) -> Arc<RwLock<VecBinWriter<T, W>>> {
let writer = Arc::new(RwLock::new(
VecBinWriter {
data: None,
out: RwLock::new(out),
}
));
self.add_observer(writer.clone());
writer
}
pub fn serialize_json<W: Write + Send + Sync + 'static>(&self, out: W) -> Arc<RwLock<VecJsonWriter<T, W>>> {
let writer = Arc::new(RwLock::new(
VecJsonWriter {
data: None,
out: RwLock::new(out),
}
));
self.add_observer(writer.clone());
writer
}
}
impl<T, W> Observer<RwLock<Vec<T>>> for VecBinWriter<T, W>
where T: Clone + Serialize + Send + Sync + 'static,
W: Write + Send + Sync
{
fn reset(&mut self, view: Option<Arc<RwLock<Vec<T>>>>) {
self.data = view;
let mut out = self.out.write().unwrap();
out.write(&bincode::serialized_size(&VecDiff::<T>::Clear).unwrap().to_le_bytes());
out.write(&bincode::serialize(&VecDiff::<T>::Clear).unwrap());
if let Some(data) = self.data.as_ref() {
for x in data.read().unwrap().iter() {
out.write(&bincode::serialized_size(&VecDiff::Push(x)).unwrap().to_le_bytes());
out.write(&bincode::serialize(&VecDiff::Push(x)).unwrap());
}
}
out.flush();
}
fn notify(&self, diff: &VecDiff<T>) {
let mut out = self.out.write().unwrap();
out.write(&bincode::serialized_size(diff).unwrap().to_le_bytes());
out.write(&bincode::serialize(diff).unwrap());
out.flush();
}
}
impl<T, W> Observer<RwLock<Vec<T>>> for VecJsonWriter<T, W>
where T: Clone + Serialize + Send + Sync + 'static,
W: Write + Send + Sync
{
fn reset(&mut self, view: Option<Arc<RwLock<Vec<T>>>>) {
self.data = view;
self.out.write().unwrap().write(&serde_json::to_string(&VecDiff::<T>::Clear).unwrap().as_bytes());
self.out.write().unwrap().write(b"\n");
if let Some(data) = self.data.as_ref() {
for x in data.read().unwrap().iter() {
self.out.write().unwrap().write(&serde_json::to_string(&VecDiff::Push(x)).unwrap().as_bytes());
self.out.write().unwrap().write(b"\n");
}
}
self.out.write().unwrap().flush();
}
fn notify(&self, diff: &VecDiff<T>) {
self.out.write().unwrap().write(serde_json::to_string(diff).unwrap().as_bytes());
self.out.write().unwrap().write(b"\n");
self.out.write().unwrap().flush();
}
}
impl<T> VecSequence<T>
where T: Clone + Send + Sync + 'static {
pub fn new(
@ -80,6 +186,15 @@ where T: Clone + Send + Sync + 'static {
fn notify(&self, diff: &VecDiff<T>) {
match diff {
VecDiff::Clear => {
let l = {
let mut l = self.cur_len.write().unwrap();
let old_l = *l;
*l = 0;
old_l
};
self.cast.notify_each(0 .. l)
},
VecDiff::Push(_) => {
let l = {
let mut l = self.cur_len.write().unwrap();
@ -135,13 +250,38 @@ where T: Clone + Send + Sync + 'static {
#[derive(Clone)]
pub struct VecBuffer<T>
where T: Clone + Send + Sync + 'static {
where T: Clone + Send + Sync + 'static
{
data: Arc<RwLock<Vec<T>>>,
cast: Arc<RwLock<ObserverBroadcast<RwLock<Vec<T>>>>>
}
impl<T> VecBuffer<T>
where T: Clone + Send + Sync + 'static {
where T: DeserializeOwned + Clone + Send + Sync + 'static
{
pub async fn from_json<R: Read + async_std::io::Read + Unpin>(&mut self, read: R) {
let mut bytes = read.bytes();
let mut s = String::new();
while let Some(Ok(b)) = bytes.next().await {
match b {
b'\n' => {
if s.len() > 0 {
let diff = serde_json::from_str::<VecDiff<T>>(&s).expect("error parsing json");
self.apply_diff(diff);
s.clear();
}
},
c => {
s.push(c as char);
}
}
}
}
}
impl<T> VecBuffer<T>
where T: Clone + Send + Sync + 'static
{
pub fn with_data(
data: Vec<T>,
port: InnerViewPort<RwLock<Vec<T>>>
@ -158,19 +298,15 @@ where T: Clone + Send + Sync + 'static {
pub fn apply_diff(&mut self, diff: VecDiff<T>) {
let mut data = self.data.write().unwrap();
match &diff {
VecDiff::Clear => { data.clear(); },
VecDiff::Push(val) => { data.push(val.clone()); },
VecDiff::Remove(idx) => { data.remove(*idx); },
VecDiff::Insert{ idx, val } => { data.insert(*idx, val.clone()); },
VecDiff::Update{ idx, val } => { data[*idx] = val.clone(); }
}
drop(data);
self.cast.notify(&diff);
}
pub fn clear(&mut self) {
for _ in 0 .. self.len() {
self.remove(0);
}
self.cast.notify(&diff);
}
pub fn len(&self) -> usize {
@ -181,6 +317,10 @@ where T: Clone + Send + Sync + 'static {
self.data.read().unwrap()[idx].clone()
}
pub fn clear(&mut self) {
self.apply_diff(VecDiff::Clear);
}
pub fn push(&mut self, val: T) {
self.apply_diff(VecDiff::Push(val));
}