From 6e3abbde41461f5bd76357cd63c05cbb4d7eb300 Mon Sep 17 00:00:00 2001 From: Michael Sippel Date: Sat, 5 Jun 2021 23:01:32 +0200 Subject: [PATCH] refactor sequence flatten() to use ProjectionHelper --- nested/src/projection.rs | 6 +- nested/src/sequence/flatten.rs | 268 ++++++++++++--------------------- 2 files changed, 99 insertions(+), 175 deletions(-) diff --git a/nested/src/projection.rs b/nested/src/projection.rs index 8f78778..bc0ee05 100644 --- a/nested/src/projection.rs +++ b/nested/src/projection.rs @@ -74,9 +74,7 @@ impl ProjectionHelper

{ notify: impl Fn(&mut P, &Key) + Send + Sync + 'static ) -> Arc>>>> { self.update_hooks.write().unwrap().push(Arc::new(port.0.clone())); - - let arg = self.new_arg(notify, set_channel()); - port.add_observer(arg); + port.add_observer(self.new_arg(notify, set_channel())); port.get_view_arc() } @@ -128,7 +126,7 @@ where P: Send + Sync + 'static, D: ChannelData, D::IntoIter: Send + Sync { - fn update(&self) { + fn update(&self) { if let Some(p) = self.proj.read().unwrap().upgrade() { if let Some(data) = self.rx.try_recv() { for msg in data { diff --git a/nested/src/sequence/flatten.rs b/nested/src/sequence/flatten.rs index 4fcedc8..e2383fe 100644 --- a/nested/src/sequence/flatten.rs +++ b/nested/src/sequence/flatten.rs @@ -2,105 +2,61 @@ use { async_std::stream::StreamExt, std::{ sync::{Arc}, - collections::{HashMap, HashSet} + collections::{BTreeMap, HashSet} }, std::sync::RwLock, crate::{ core::{ View, Observer, ObserverBroadcast, ObserverExt, ViewPort, InnerViewPort, OuterViewPort, - channel::{ChannelSender, ChannelReceiver} + channel::{ChannelSender, ChannelReceiver}, + port::UpdateTask }, - sequence::SequenceView + sequence::SequenceView, + projection::ProjectionHelper } }; -impl OuterViewPort -where V1: SequenceView> + ?Sized + 'static, - V2: SequenceView + ?Sized + 'static -{ - pub fn flatten(&self) -> OuterViewPort> { +impl OuterViewPort>>> +where Item: 'static{ + pub fn flatten(&self) -> OuterViewPort> { let port = ViewPort::new(); Flatten::new(self.clone(), port.inner()); port.into_outer() } } -pub struct Flatten -where V1: SequenceView> + ?Sized + 'static, - V2: SequenceView + ?Sized + 'static -{ - length: usize, - top: Arc>>, - chunks: HashMap>>>, - - _cast: Arc>>> -} - -struct TopObserver -where V1: SequenceView> + ?Sized + 'static, - V2: SequenceView + ?Sized + 'static -{ - view: Option>, - sender: ChannelSender> -} - -struct BotObserver -where V2: SequenceView + ?Sized + 'static +pub struct Chunk +where Item: 'static { offset: usize, - view: Option>, - sender: ChannelSender> + len: usize, + view: Arc> } -impl Observer for TopObserver -where V1: SequenceView> + ?Sized + 'static, - V2: SequenceView + ?Sized + 'static +pub struct Flatten +where Item: 'static { - fn reset(&mut self, view: Option>) { - let old_len = self.view.len().unwrap_or(0); - self.view = view; - let new_len = self.view.len().unwrap_or(0); - - self.notify_each(0 .. std::cmp::max(old_len, new_len)); - } - - fn notify(&mut self, chunk_idx: &usize) { - self.sender.send(*chunk_idx); - } + length: usize, + top: Arc>>>, + chunks: BTreeMap>, + cast: Arc>>>, + proj_helper: ProjectionHelper } -impl Observer for BotObserver -where V2: SequenceView + ?Sized + 'static -{ - fn reset(&mut self, src: Option>) { - let old_len = self.view.len().unwrap_or(0); - self.view = src; - let new_len = self.view.len().unwrap_or(0); - - self.notify_each(0 .. std::cmp::max(old_len, new_len)); - } - - fn notify(&mut self, idx: &usize) { - self.sender.send(*idx); - } -} - -impl View for Flatten -where V1: SequenceView> + ?Sized + 'static, - V2: SequenceView + ?Sized + 'static +impl View for Flatten +where Item: 'static { type Msg = usize; } -impl SequenceView for Flatten -where V1: SequenceView> + ?Sized + 'static, - V2: SequenceView + ?Sized + 'static +impl SequenceView for Flatten +where Item: 'static { - type Item = V2::Item; + type Item = Item; fn get(&self, idx: &usize) -> Option { - let chunk = self.chunks[&self.get_chunk_idx(*idx)?].read().unwrap(); + let chunk = self.chunks.get(&self.get_chunk_idx(*idx)?)?; chunk.view.get(&(*idx - chunk.offset)) } @@ -109,127 +65,98 @@ where V1: SequenceView> + ?Sized + 'static, } } -impl Flatten -where V1: SequenceView> + ?Sized + 'static, - V2: SequenceView + ?Sized + 'static +/* TODO: remove unused projection args (bot-views) if they get replaced by a new viewport */ +impl Flatten +where Item: 'static { pub fn new( - top_port: OuterViewPort, - out_port: InnerViewPort> + top_port: OuterViewPort>>>, + out_port: InnerViewPort> ) -> Arc> { - let (sender, mut recv) = crate::core::channel::set_channel(); + let mut proj_helper = ProjectionHelper::new(out_port.0.update_hooks.clone()); - let top_obs = Arc::new(RwLock::new( - TopObserver { - view: None, - sender - } - )); - - let flat = Arc::new(RwLock::new(Flatten:: { - length: 0, - top: top_obs.clone(), - chunks: HashMap::new(), - _cast: out_port.get_broadcast() - })); - - let f = flat.clone(); - let cast = out_port.get_broadcast(); - async_std::task::spawn(async move { - while let Some(chunk_idx) = recv.next().await { - if let Some(mut chunk_rcv) = f.write().unwrap().update_chunk(chunk_idx) { - let f = f.clone(); - let mut cast = cast.clone(); - async_std::task::spawn(async move { - while let Some(idx) = chunk_rcv.next().await { - let mut flat = f.write().unwrap(); - - let chunk = flat.chunks[&chunk_idx].read().unwrap(); - let chunk_offset = chunk.offset; - let chunk_len = chunk.view.len().unwrap_or(0); - drop(chunk); - - let mut dirty_idx = Vec::new(); - if idx+1 >= chunk_len { - dirty_idx = flat.update_offsets(chunk_idx); - } - - drop(flat); - - cast.notify(&(idx + chunk_offset)); - cast.notify_each(dirty_idx); - } - }); - } - } - }); - - top_port.add_observer(top_obs); + let flat = Arc::new(RwLock::new( + Flatten { + length: 0, + top: proj_helper.new_sequence_arg( + top_port, + |s: &mut Self, chunk_idx| { + s.update_chunk(*chunk_idx); + } + ), + chunks: BTreeMap::new(), + cast: out_port.get_broadcast(), + proj_helper + })); + flat.write().unwrap().proj_helper.set_proj(&flat); out_port.set_view(Some(flat.clone())); flat } /// the top-sequence has changed the item at chunk_idx, /// create a new observer for the contained sub sequence - fn update_chunk(&mut self, chunk_idx: usize) -> Option>> { - if let Some(chunk_port) = self.top.read().unwrap().view.get(&chunk_idx) { - let (sender, recv) = crate::core::channel::set_channel(); - let chunk_obs = Arc::new(RwLock::new( - BotObserver { - offset: - if chunk_idx > 0 { - if let Some(prev_chunk) = self.chunks.get(&(chunk_idx-1)) { - let prev_chunk = prev_chunk.read().unwrap(); - prev_chunk.offset + prev_chunk.view.len().unwrap_or(0) - } else { - 0 + fn update_chunk(&mut self, chunk_idx: usize) { + if let Some(chunk_port) = self.top.get(&chunk_idx) { + self.chunks.insert( + chunk_idx, + Chunk { + offset: 0, // will be adjusted by update_offsets() later + len: 0, + view: self.proj_helper.new_sequence_arg( + chunk_port.clone(), + move |s: &mut Self, idx| { + if let Some(chunk) = s.chunks.get(&chunk_idx) { + let chunk_offset = chunk.offset; + let chunk_len = chunk.view.len().unwrap_or(0); + + let mut dirty_idx = Vec::new(); + if chunk.len != chunk_len { + dirty_idx = s.update_all_offsets(); + } + + s.cast.notify(&(idx + chunk_offset)); + s.cast.notify_each(dirty_idx); + } } - } else { - 0 - }, - view: None, - sender + ) } - )); + ); - self.chunks.insert(chunk_idx, chunk_obs.clone()); - chunk_port.add_observer(chunk_obs); + chunk_port.0.update(); - Some(recv) + let dirty_idx = self.update_all_offsets(); + self.cast.notify_each(dirty_idx); } else { + // todo: + //self.proj_helper.remove_arg(); + self.chunks.remove(&chunk_idx); - None + + let dirty_idx = self.update_all_offsets(); + self.cast.notify_each(dirty_idx); } } /// recalculate all chunk offsets beginning at start_idx /// and update length of flattened sequence - fn update_offsets(&mut self, start_idx: usize) -> Vec { - let top_len = self.top.read().unwrap().view.len().unwrap_or(0); - - let first_chunk = self.chunks.get(&start_idx).unwrap().read().unwrap(); - let start_offset = first_chunk.offset + first_chunk.view.len().unwrap_or(0); - let mut cur_offset = start_offset; - + fn update_all_offsets(&mut self) -> Vec { let mut dirty_idx = Vec::new(); + let mut cur_offset = 0; - for chunk_idx in start_idx+1 .. top_len { - if let Some(cur_chunk) = self.chunks.get(&chunk_idx) { - let mut cur_chunk = cur_chunk.write().unwrap(); + for (chunk_idx, chunk) in self.chunks.iter_mut() { + let old_offset = chunk.offset; + chunk.offset = cur_offset; + chunk.len = chunk.view.len().unwrap_or(0); - let chunk_len = cur_chunk.view.len().unwrap_or(0); - let old_offset = cur_chunk.offset; - cur_chunk.offset = cur_offset; - - if old_offset != cur_offset { - dirty_idx.extend( - std::cmp::min(old_offset, cur_offset) - .. std::cmp::max(old_offset, cur_offset) + chunk_len - ); - } - cur_offset += chunk_len; + if old_offset != cur_offset { + dirty_idx.extend( + std::cmp::min(old_offset, cur_offset) + .. std::cmp::max(old_offset, cur_offset) + chunk.len + ); } + + cur_offset += chunk.len; } let old_length = self.length; @@ -241,13 +168,12 @@ where V1: SequenceView> + ?Sized + 'static, /// given an index in the flattened sequence, /// which sub-sequence does it belong to? - fn get_chunk_idx(&self, glob_idx: usize) -> Option { - for chunk_idx in 0 .. self.top.read().unwrap().view.len().unwrap_or(0) { - if let Some(cur_chunk) = self.chunks.get(&chunk_idx) { - let cur_chunk = cur_chunk.read().unwrap(); - if glob_idx < cur_chunk.offset + cur_chunk.view.len().unwrap_or(0) { - return Some(chunk_idx) - } + fn get_chunk_idx(&self, glob_idx: usize) -> Option { + let mut offset = 0; + for (chunk_idx, chunk) in self.chunks.iter() { + offset += chunk.view.len().unwrap_or(0); + if glob_idx < offset { + return Some(*chunk_idx); } } None