refactor sequence flatten() to use ProjectionHelper

This commit is contained in:
Michael Sippel 2021-06-05 23:01:32 +02:00
parent df91db7774
commit 6e3abbde41
Signed by: senvas
GPG key ID: F96CF119C34B64A6
2 changed files with 99 additions and 175 deletions

View file

@ -74,9 +74,7 @@ impl<P: Send + Sync + 'static> ProjectionHelper<P> {
notify: impl Fn(&mut P, &Key) + Send + Sync + 'static notify: impl Fn(&mut P, &Key) + Send + Sync + 'static
) -> Arc<RwLock<Option<Arc<dyn IndexView<Key, Item = Item>>>>> { ) -> Arc<RwLock<Option<Arc<dyn IndexView<Key, Item = Item>>>>> {
self.update_hooks.write().unwrap().push(Arc::new(port.0.clone())); self.update_hooks.write().unwrap().push(Arc::new(port.0.clone()));
port.add_observer(self.new_arg(notify, set_channel()));
let arg = self.new_arg(notify, set_channel());
port.add_observer(arg);
port.get_view_arc() port.get_view_arc()
} }

View file

@ -2,105 +2,61 @@ use {
async_std::stream::StreamExt, async_std::stream::StreamExt,
std::{ std::{
sync::{Arc}, sync::{Arc},
collections::{HashMap, HashSet} collections::{BTreeMap, HashSet}
}, },
std::sync::RwLock, std::sync::RwLock,
crate::{ crate::{
core::{ core::{
View, Observer, ObserverBroadcast, ObserverExt, View, Observer, ObserverBroadcast, ObserverExt,
ViewPort, InnerViewPort, OuterViewPort, ViewPort, InnerViewPort, OuterViewPort,
channel::{ChannelSender, ChannelReceiver} channel::{ChannelSender, ChannelReceiver},
port::UpdateTask
}, },
sequence::SequenceView sequence::SequenceView,
projection::ProjectionHelper
} }
}; };
impl<V1, V2> OuterViewPort<V1> impl<Item> OuterViewPort<dyn SequenceView<Item = OuterViewPort<dyn SequenceView<Item = Item>>>>
where V1: SequenceView<Item = OuterViewPort<V2>> + ?Sized + 'static, where Item: 'static{
V2: SequenceView + ?Sized + 'static pub fn flatten(&self) -> OuterViewPort<dyn SequenceView<Item = Item>> {
{
pub fn flatten(&self) -> OuterViewPort<dyn SequenceView<Item = V2::Item>> {
let port = ViewPort::new(); let port = ViewPort::new();
Flatten::new(self.clone(), port.inner()); Flatten::new(self.clone(), port.inner());
port.into_outer() port.into_outer()
} }
} }
pub struct Flatten<V1, V2> pub struct Chunk<Item>
where V1: SequenceView<Item = OuterViewPort<V2>> + ?Sized + 'static, where Item: 'static
V2: SequenceView + ?Sized + 'static
{
length: usize,
top: Arc<RwLock<TopObserver<V1, V2>>>,
chunks: HashMap<usize, Arc<RwLock<BotObserver<V2>>>>,
_cast: Arc<RwLock<ObserverBroadcast<dyn SequenceView<Item = V2::Item>>>>
}
struct TopObserver<V1, V2>
where V1: SequenceView<Item = OuterViewPort<V2>> + ?Sized + 'static,
V2: SequenceView + ?Sized + 'static
{
view: Option<Arc<V1>>,
sender: ChannelSender<HashSet<usize>>
}
struct BotObserver<V2>
where V2: SequenceView + ?Sized + 'static
{ {
offset: usize, offset: usize,
view: Option<Arc<V2>>, len: usize,
sender: ChannelSender<HashSet<usize>> view: Arc<dyn SequenceView<Item = Item>>
} }
impl<V1, V2> Observer<V1> for TopObserver<V1, V2> pub struct Flatten<Item>
where V1: SequenceView<Item = OuterViewPort<V2>> + ?Sized + 'static, where Item: 'static
V2: SequenceView + ?Sized + 'static
{ {
fn reset(&mut self, view: Option<Arc<V1>>) { length: usize,
let old_len = self.view.len().unwrap_or(0); top: Arc<dyn SequenceView<Item = OuterViewPort<dyn SequenceView<Item = Item>>>>,
self.view = view; chunks: BTreeMap<usize, Chunk<Item>>,
let new_len = self.view.len().unwrap_or(0); cast: Arc<RwLock<ObserverBroadcast<dyn SequenceView<Item = Item>>>>,
proj_helper: ProjectionHelper<Self>
self.notify_each(0 .. std::cmp::max(old_len, new_len));
}
fn notify(&mut self, chunk_idx: &usize) {
self.sender.send(*chunk_idx);
}
} }
impl<V2> Observer<V2> for BotObserver<V2> impl<Item> View for Flatten<Item>
where V2: SequenceView + ?Sized + 'static where Item: 'static
{
fn reset(&mut self, src: Option<Arc<V2>>) {
let old_len = self.view.len().unwrap_or(0);
self.view = src;
let new_len = self.view.len().unwrap_or(0);
self.notify_each(0 .. std::cmp::max(old_len, new_len));
}
fn notify(&mut self, idx: &usize) {
self.sender.send(*idx);
}
}
impl<V1, V2> View for Flatten<V1, V2>
where V1: SequenceView<Item = OuterViewPort<V2>> + ?Sized + 'static,
V2: SequenceView + ?Sized + 'static
{ {
type Msg = usize; type Msg = usize;
} }
impl<V1, V2> SequenceView for Flatten<V1, V2> impl<Item> SequenceView for Flatten<Item>
where V1: SequenceView<Item = OuterViewPort<V2>> + ?Sized + 'static, where Item: 'static
V2: SequenceView + ?Sized + 'static
{ {
type Item = V2::Item; type Item = Item;
fn get(&self, idx: &usize) -> Option<Self::Item> { fn get(&self, idx: &usize) -> Option<Self::Item> {
let chunk = self.chunks[&self.get_chunk_idx(*idx)?].read().unwrap(); let chunk = self.chunks.get(&self.get_chunk_idx(*idx)?)?;
chunk.view.get(&(*idx - chunk.offset)) chunk.view.get(&(*idx - chunk.offset))
} }
@ -109,127 +65,98 @@ where V1: SequenceView<Item = OuterViewPort<V2>> + ?Sized + 'static,
} }
} }
impl<V1, V2> Flatten<V1, V2> /* TODO: remove unused projection args (bot-views) if they get replaced by a new viewport */
where V1: SequenceView<Item = OuterViewPort<V2>> + ?Sized + 'static, impl<Item> Flatten<Item>
V2: SequenceView + ?Sized + 'static where Item: 'static
{ {
pub fn new( pub fn new(
top_port: OuterViewPort<V1>, top_port: OuterViewPort<dyn SequenceView<Item = OuterViewPort<dyn SequenceView<Item = Item>>>>,
out_port: InnerViewPort<dyn SequenceView<Item = V2::Item>> out_port: InnerViewPort<dyn SequenceView<Item = Item>>
) -> Arc<RwLock<Self>> { ) -> Arc<RwLock<Self>> {
let (sender, mut recv) = crate::core::channel::set_channel(); let mut proj_helper = ProjectionHelper::new(out_port.0.update_hooks.clone());
let top_obs = Arc::new(RwLock::new( let flat = Arc::new(RwLock::new(
TopObserver { Flatten {
view: None,
sender
}
));
let flat = Arc::new(RwLock::new(Flatten::<V1, V2> {
length: 0, length: 0,
top: top_obs.clone(), top: proj_helper.new_sequence_arg(
chunks: HashMap::new(), top_port,
_cast: out_port.get_broadcast() |s: &mut Self, chunk_idx| {
s.update_chunk(*chunk_idx);
}
),
chunks: BTreeMap::new(),
cast: out_port.get_broadcast(),
proj_helper
})); }));
let f = flat.clone(); flat.write().unwrap().proj_helper.set_proj(&flat);
let cast = out_port.get_broadcast();
async_std::task::spawn(async move {
while let Some(chunk_idx) = recv.next().await {
if let Some(mut chunk_rcv) = f.write().unwrap().update_chunk(chunk_idx) {
let f = f.clone();
let mut cast = cast.clone();
async_std::task::spawn(async move {
while let Some(idx) = chunk_rcv.next().await {
let mut flat = f.write().unwrap();
let chunk = flat.chunks[&chunk_idx].read().unwrap();
let chunk_offset = chunk.offset;
let chunk_len = chunk.view.len().unwrap_or(0);
drop(chunk);
let mut dirty_idx = Vec::new();
if idx+1 >= chunk_len {
dirty_idx = flat.update_offsets(chunk_idx);
}
drop(flat);
cast.notify(&(idx + chunk_offset));
cast.notify_each(dirty_idx);
}
});
}
}
});
top_port.add_observer(top_obs);
out_port.set_view(Some(flat.clone())); out_port.set_view(Some(flat.clone()));
flat flat
} }
/// the top-sequence has changed the item at chunk_idx, /// the top-sequence has changed the item at chunk_idx,
/// create a new observer for the contained sub sequence /// create a new observer for the contained sub sequence
fn update_chunk(&mut self, chunk_idx: usize) -> Option<ChannelReceiver<HashSet<usize>>> { fn update_chunk(&mut self, chunk_idx: usize) {
if let Some(chunk_port) = self.top.read().unwrap().view.get(&chunk_idx) { if let Some(chunk_port) = self.top.get(&chunk_idx) {
let (sender, recv) = crate::core::channel::set_channel(); self.chunks.insert(
let chunk_obs = Arc::new(RwLock::new( chunk_idx,
BotObserver { Chunk {
offset: offset: 0, // will be adjusted by update_offsets() later
if chunk_idx > 0 { len: 0,
if let Some(prev_chunk) = self.chunks.get(&(chunk_idx-1)) { view: self.proj_helper.new_sequence_arg(
let prev_chunk = prev_chunk.read().unwrap(); chunk_port.clone(),
prev_chunk.offset + prev_chunk.view.len().unwrap_or(0) move |s: &mut Self, idx| {
} else { if let Some(chunk) = s.chunks.get(&chunk_idx) {
0 let chunk_offset = chunk.offset;
} let chunk_len = chunk.view.len().unwrap_or(0);
} else {
0
},
view: None,
sender
}
));
self.chunks.insert(chunk_idx, chunk_obs.clone()); let mut dirty_idx = Vec::new();
chunk_port.add_observer(chunk_obs); if chunk.len != chunk_len {
dirty_idx = s.update_all_offsets();
}
Some(recv) s.cast.notify(&(idx + chunk_offset));
s.cast.notify_each(dirty_idx);
}
}
)
}
);
chunk_port.0.update();
let dirty_idx = self.update_all_offsets();
self.cast.notify_each(dirty_idx);
} else { } else {
// todo:
//self.proj_helper.remove_arg();
self.chunks.remove(&chunk_idx); self.chunks.remove(&chunk_idx);
None
let dirty_idx = self.update_all_offsets();
self.cast.notify_each(dirty_idx);
} }
} }
/// recalculate all chunk offsets beginning at start_idx /// recalculate all chunk offsets beginning at start_idx
/// and update length of flattened sequence /// and update length of flattened sequence
fn update_offsets(&mut self, start_idx: usize) -> Vec<usize> { fn update_all_offsets(&mut self) -> Vec<usize> {
let top_len = self.top.read().unwrap().view.len().unwrap_or(0);
let first_chunk = self.chunks.get(&start_idx).unwrap().read().unwrap();
let start_offset = first_chunk.offset + first_chunk.view.len().unwrap_or(0);
let mut cur_offset = start_offset;
let mut dirty_idx = Vec::new(); let mut dirty_idx = Vec::new();
let mut cur_offset = 0;
for chunk_idx in start_idx+1 .. top_len { for (chunk_idx, chunk) in self.chunks.iter_mut() {
if let Some(cur_chunk) = self.chunks.get(&chunk_idx) { let old_offset = chunk.offset;
let mut cur_chunk = cur_chunk.write().unwrap(); chunk.offset = cur_offset;
chunk.len = chunk.view.len().unwrap_or(0);
let chunk_len = cur_chunk.view.len().unwrap_or(0);
let old_offset = cur_chunk.offset;
cur_chunk.offset = cur_offset;
if old_offset != cur_offset { if old_offset != cur_offset {
dirty_idx.extend( dirty_idx.extend(
std::cmp::min(old_offset, cur_offset) std::cmp::min(old_offset, cur_offset)
.. std::cmp::max(old_offset, cur_offset) + chunk_len .. std::cmp::max(old_offset, cur_offset) + chunk.len
); );
} }
cur_offset += chunk_len;
} cur_offset += chunk.len;
} }
let old_length = self.length; let old_length = self.length;
@ -242,12 +169,11 @@ where V1: SequenceView<Item = OuterViewPort<V2>> + ?Sized + 'static,
/// given an index in the flattened sequence, /// given an index in the flattened sequence,
/// which sub-sequence does it belong to? /// which sub-sequence does it belong to?
fn get_chunk_idx(&self, glob_idx: usize) -> Option<usize> { fn get_chunk_idx(&self, glob_idx: usize) -> Option<usize> {
for chunk_idx in 0 .. self.top.read().unwrap().view.len().unwrap_or(0) { let mut offset = 0;
if let Some(cur_chunk) = self.chunks.get(&chunk_idx) { for (chunk_idx, chunk) in self.chunks.iter() {
let cur_chunk = cur_chunk.read().unwrap(); offset += chunk.view.len().unwrap_or(0);
if glob_idx < cur_chunk.offset + cur_chunk.view.len().unwrap_or(0) { if glob_idx < offset {
return Some(chunk_idx) return Some(*chunk_idx);
}
} }
} }
None None