From c53f07e762a7b953a70c5010614201f04df87ad8 Mon Sep 17 00:00:00 2001 From: Michael Sippel Date: Sat, 24 Apr 2021 15:57:09 +0200 Subject: [PATCH] VecBuffer: add Serialization/Deserialization from Write/Reader VecDiff: add Clear --- nested/Cargo.toml | 4 +- nested/src/sequence/vec_buffer.rs | 158 ++++++++++++++++++++++++++++-- 2 files changed, 152 insertions(+), 10 deletions(-) diff --git a/nested/Cargo.toml b/nested/Cargo.toml index d8a83d0..4db1541 100644 --- a/nested/Cargo.toml +++ b/nested/Cargo.toml @@ -11,7 +11,9 @@ termion = "1.5.5" signal-hook = "0.3.1" signal-hook-async-std = "0.2.0" serde = { version = "1.0", features = ["derive"] } +bincode = "1.3.3" +serde_json = "*" [dependencies.async-std] -version = "1.7.0" +version = "1.9.0" features = ["unstable", "attributes"] diff --git a/nested/src/sequence/vec_buffer.rs b/nested/src/sequence/vec_buffer.rs index 220d52d..04643c4 100644 --- a/nested/src/sequence/vec_buffer.rs +++ b/nested/src/sequence/vec_buffer.rs @@ -1,16 +1,24 @@ use { std::{ sync::Arc, - ops::{Deref, DerefMut} + ops::{Deref, DerefMut}, + io::Write }, std::sync::RwLock, + async_std::{ + io::{Read, ReadExt}, + stream::{Stream, StreamExt} + }, + serde::{Serialize, Deserialize, de::DeserializeOwned}, crate::{ core::{View, Observer, ObserverExt, ObserverBroadcast, ViewPort, InnerViewPort, OuterViewPort}, sequence::SequenceView, } }; +#[derive(Clone, Serialize, Deserialize)] pub enum VecDiff { + Clear, Push(T), Remove(usize), Insert{ idx: usize, val: T }, @@ -26,6 +34,7 @@ where T: Clone + Send + Sync + 'static { //<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> +/// Adapter View implementing `Sequence` for `Vec` pub struct VecSequence where T: Clone + Send + Sync + 'static { cur_len: RwLock, @@ -33,6 +42,21 @@ where T: Clone + Send + Sync + 'static { cast: Arc>>> } +/// Serialization Observer for `Vec` +pub struct VecBinWriter +where T: Clone + Send + Sync + 'static, + W: Write + Send + Sync { + data: Option>>>, + out: RwLock +} + +pub struct VecJsonWriter +where T: Clone + Send + Sync + 'static, + W: Write + Send + Sync { + data: Option>>>, + out: RwLock +} + impl OuterViewPort>> where T: Clone + Send + Sync + 'static { pub fn to_sequence(&self) -> OuterViewPort> { @@ -43,6 +67,88 @@ where T: Clone + Send + Sync + 'static { } } +impl OuterViewPort>> +where T: Clone + Serialize + Send + Sync + 'static { + pub fn serialize_bin(&self, out: W) -> Arc>> { + let writer = Arc::new(RwLock::new( + VecBinWriter { + data: None, + out: RwLock::new(out), + } + )); + self.add_observer(writer.clone()); + writer + } + + pub fn serialize_json(&self, out: W) -> Arc>> { + let writer = Arc::new(RwLock::new( + VecJsonWriter { + data: None, + out: RwLock::new(out), + } + )); + self.add_observer(writer.clone()); + writer + } +} + + +impl Observer>> for VecBinWriter +where T: Clone + Serialize + Send + Sync + 'static, + W: Write + Send + Sync +{ + fn reset(&mut self, view: Option>>>) { + self.data = view; + let mut out = self.out.write().unwrap(); + + out.write(&bincode::serialized_size(&VecDiff::::Clear).unwrap().to_le_bytes()); + out.write(&bincode::serialize(&VecDiff::::Clear).unwrap()); + + if let Some(data) = self.data.as_ref() { + for x in data.read().unwrap().iter() { + out.write(&bincode::serialized_size(&VecDiff::Push(x)).unwrap().to_le_bytes()); + out.write(&bincode::serialize(&VecDiff::Push(x)).unwrap()); + } + } + + out.flush(); + } + + fn notify(&self, diff: &VecDiff) { + let mut out = self.out.write().unwrap(); + out.write(&bincode::serialized_size(diff).unwrap().to_le_bytes()); + out.write(&bincode::serialize(diff).unwrap()); + out.flush(); + } +} + +impl Observer>> for VecJsonWriter +where T: Clone + Serialize + Send + Sync + 'static, + W: Write + Send + Sync +{ + fn reset(&mut self, view: Option>>>) { + self.data = view; + + self.out.write().unwrap().write(&serde_json::to_string(&VecDiff::::Clear).unwrap().as_bytes()); + self.out.write().unwrap().write(b"\n"); + + if let Some(data) = self.data.as_ref() { + for x in data.read().unwrap().iter() { + self.out.write().unwrap().write(&serde_json::to_string(&VecDiff::Push(x)).unwrap().as_bytes()); + self.out.write().unwrap().write(b"\n"); + } + } + + self.out.write().unwrap().flush(); + } + + fn notify(&self, diff: &VecDiff) { + self.out.write().unwrap().write(serde_json::to_string(diff).unwrap().as_bytes()); + self.out.write().unwrap().write(b"\n"); + self.out.write().unwrap().flush(); + } +} + impl VecSequence where T: Clone + Send + Sync + 'static { pub fn new( @@ -80,6 +186,15 @@ where T: Clone + Send + Sync + 'static { fn notify(&self, diff: &VecDiff) { match diff { + VecDiff::Clear => { + let l = { + let mut l = self.cur_len.write().unwrap(); + let old_l = *l; + *l = 0; + old_l + }; + self.cast.notify_each(0 .. l) + }, VecDiff::Push(_) => { let l = { let mut l = self.cur_len.write().unwrap(); @@ -135,13 +250,38 @@ where T: Clone + Send + Sync + 'static { #[derive(Clone)] pub struct VecBuffer -where T: Clone + Send + Sync + 'static { +where T: Clone + Send + Sync + 'static +{ data: Arc>>, cast: Arc>>>> } impl VecBuffer -where T: Clone + Send + Sync + 'static { +where T: DeserializeOwned + Clone + Send + Sync + 'static +{ + pub async fn from_json(&mut self, read: R) { + let mut bytes = read.bytes(); + let mut s = String::new(); + while let Some(Ok(b)) = bytes.next().await { + match b { + b'\n' => { + if s.len() > 0 { + let diff = serde_json::from_str::>(&s).expect("error parsing json"); + self.apply_diff(diff); + s.clear(); + } + }, + c => { + s.push(c as char); + } + } + } + } +} + +impl VecBuffer +where T: Clone + Send + Sync + 'static +{ pub fn with_data( data: Vec, port: InnerViewPort>> @@ -158,19 +298,15 @@ where T: Clone + Send + Sync + 'static { pub fn apply_diff(&mut self, diff: VecDiff) { let mut data = self.data.write().unwrap(); match &diff { + VecDiff::Clear => { data.clear(); }, VecDiff::Push(val) => { data.push(val.clone()); }, VecDiff::Remove(idx) => { data.remove(*idx); }, VecDiff::Insert{ idx, val } => { data.insert(*idx, val.clone()); }, VecDiff::Update{ idx, val } => { data[*idx] = val.clone(); } } drop(data); - self.cast.notify(&diff); - } - pub fn clear(&mut self) { - for _ in 0 .. self.len() { - self.remove(0); - } + self.cast.notify(&diff); } pub fn len(&self) -> usize { @@ -181,6 +317,10 @@ where T: Clone + Send + Sync + 'static { self.data.read().unwrap()[idx].clone() } + pub fn clear(&mut self) { + self.apply_diff(VecDiff::Clear); + } + pub fn push(&mut self, val: T) { self.apply_diff(VecDiff::Push(val)); }