From ca819aeaf4586e16b2b1daa9a0ba09bcb786f947 Mon Sep 17 00:00:00 2001 From: Michael Sippel Date: Sat, 22 May 2021 01:33:58 +0200 Subject: [PATCH] port: introduce update hooks to avoid extra async tasks per projection fixes deadlock issues --- nested/src/core/channel.rs | 13 +++- nested/src/core/context.rs | 12 ++- nested/src/core/observer.rs | 51 +++++++++---- nested/src/core/port.rs | 122 +++++++++++++++++++++++------- nested/src/core/view.rs | 2 +- nested/src/grid/offset.rs | 2 +- nested/src/index/map_item.rs | 23 ++++-- nested/src/index/map_key.rs | 31 +++++--- nested/src/index/mod.rs | 21 +++-- nested/src/integer/add.rs | 2 +- nested/src/integer/radix.rs | 2 +- nested/src/leveled_term_view.rs | 2 +- nested/src/projection.rs | 118 +++++++++++++++++++---------- nested/src/sequence/filter.rs | 4 +- nested/src/sequence/flatten.rs | 6 +- nested/src/sequence/map.rs | 4 +- nested/src/sequence/seq2idx.rs | 3 +- nested/src/sequence/vec_buffer.rs | 8 +- nested/src/string_editor.rs | 2 +- nested/src/terminal/compositor.rs | 2 +- nested/src/terminal/terminal.rs | 2 +- 21 files changed, 303 insertions(+), 129 deletions(-) diff --git a/nested/src/core/channel.rs b/nested/src/core/channel.rs index 453f17c..6f86a0e 100644 --- a/nested/src/core/channel.rs +++ b/nested/src/core/channel.rs @@ -86,7 +86,7 @@ pub struct ChannelReceiver(Arc>>); impl ChannelSender where Data::IntoIter: Send + Sync { pub fn send(&self, msg: Data::Item) { - let mut state = self.0.lock().unwrap(); + let mut state = self.0.lock().unwrap(); if state.send_buf.is_none() { state.send_buf = Some(Data::default()); @@ -103,7 +103,7 @@ where Data::IntoIter: Send + Sync { use crate::core::View; impl> Observer for ChannelSender where V::Msg: Clone, Data::IntoIter: Send + Sync { - fn notify(&self, msg: &V::Msg) { + fn notify(&mut self, msg: &V::Msg) { self.send(msg.clone()); } } @@ -131,6 +131,15 @@ impl ChannelReceiver { pub async fn recv(&self) -> Option { ChannelRead(self.0.clone()).await } + + pub fn try_recv(&self) -> Option { + let mut state = self.0.lock().unwrap(); + if let Some(buf) = state.send_buf.take() { + Some(buf) + } else { + None + } + } } struct ChannelRead(Arc>>); diff --git a/nested/src/core/context.rs b/nested/src/core/context.rs index bc02cf3..5131927 100644 --- a/nested/src/core/context.rs +++ b/nested/src/core/context.rs @@ -76,8 +76,8 @@ pub struct Object { } impl Object { - pub fn get_port(&self) -> Option> { - Some(self.repr.read().unwrap().port.clone()?.downcast::().unwrap()) + pub fn get_port(&self) -> Option> where V::Msg: Clone { + Some(self.repr.read().unwrap().port.clone()?.downcast::().ok().unwrap()) } pub fn downcast(&self, dst_type: TypeTerm) -> Option { @@ -201,6 +201,8 @@ impl Object { //<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> +pub struct TypeLadder(Vec); + #[derive(Clone, Copy, Hash, PartialEq, Eq)] pub enum MorphismMode { /// Isomorphism @@ -310,7 +312,8 @@ impl Context { &self, name: &str, type_ladder: impl Iterator - ) -> Option> { + ) -> Option> + where V::Msg: Clone { self.get_obj(&name.into())? .downcast_ladder( type_ladder.map(|tn| self.type_dict.type_term_from_str(tn).unwrap()) @@ -367,7 +370,8 @@ impl Context { &mut self, name: &str, type_ladder: impl Iterator - ) -> Option> { + ) -> Option> + where V::Msg: Clone { if let Some(p) = self.get_obj_port(name, type_ladder) { Some(p) } else { diff --git a/nested/src/core/observer.rs b/nested/src/core/observer.rs index dfa0afc..38eb8f9 100644 --- a/nested/src/core/observer.rs +++ b/nested/src/core/observer.rs @@ -1,5 +1,10 @@ use { - crate::core::View, + crate::{ + core::{ + View, + channel::{channel, ChannelSender, ChannelReceiver} + } + }, std::{ sync::{Arc, Weak} }, @@ -13,7 +18,7 @@ use { \*/ pub trait Observer : Send + Sync { fn reset(&mut self, _view: Option>) {} - fn notify(&self, msg: &V::Msg); + fn notify(&mut self, msg: &V::Msg); } //<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> @@ -23,19 +28,19 @@ impl> Observer for Arc> { self.write().unwrap().reset(view); } - fn notify(&self, msg: &V::Msg) { - self.read().unwrap().notify(&msg); + fn notify(&mut self, msg: &V::Msg) { + self.write().unwrap().notify(msg); } } //<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> pub trait ObserverExt : Observer { - fn notify_each(&self, it: impl IntoIterator); + fn notify_each(&mut self, it: impl IntoIterator); } impl> ObserverExt for T { - fn notify_each(&self, it: impl IntoIterator) { + fn notify_each(&mut self, it: impl IntoIterator) { for msg in it { self.notify(&msg); } @@ -47,14 +52,19 @@ impl> ObserverExt for T { Broadcast <<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> \*/ - -pub struct ObserverBroadcast { +pub struct ObserverBroadcast +where V::Msg : Send + Sync { + rx: ChannelReceiver>, + tx: ChannelSender>, observers: Vec>>> } -impl ObserverBroadcast { +impl ObserverBroadcast +where V::Msg : Clone + Send + Sync { pub fn new() -> Self { + let (tx, rx) = channel::>(); ObserverBroadcast { + rx, tx, observers: Vec::new() } } @@ -71,19 +81,28 @@ impl ObserverBroadcast { fn iter(&self) -> impl Iterator>>> + '_ { self.observers.iter().filter_map(|o| o.upgrade()) } + + pub fn update(&self) { + if let Some(msg_vec) = self.rx.try_recv() { + for msg in msg_vec { + for o in self.iter() { + o.write().unwrap().notify(&msg); + } + } + } + } } -impl Observer for ObserverBroadcast { +impl Observer for ObserverBroadcast +where V::Msg: Clone { fn reset(&mut self, view: Option>) { for o in self.iter() { o.write().unwrap().reset(view.clone()); } } - fn notify(&self, msg: &V::Msg) { - for o in self.iter() { - o.read().unwrap().notify(&msg); - } + fn notify(&mut self, msg: &V::Msg) { + self.tx.send(msg.clone()); } } @@ -110,7 +129,7 @@ where V: View + ?Sized, impl Observer for NotifyFnObserver where V: View + ?Sized, F: Fn(&V::Msg) + Send + Sync { - fn notify(&self, msg: &V::Msg) { + fn notify(&mut self, msg: &V::Msg) { (self.f)(msg); } } @@ -138,7 +157,7 @@ where V: View + ?Sized, impl Observer for ResetFnObserver where V: View + ?Sized, F: Fn(Option>) + Send + Sync { - fn notify(&self, _msg: &V::Msg) {} + fn notify(&mut self, _msg: &V::Msg) {} fn reset(&mut self, view: Option>) { (self.f)(view); } diff --git a/nested/src/core/port.rs b/nested/src/core/port.rs index 9e871aa..e36db56 100644 --- a/nested/src/core/port.rs +++ b/nested/src/core/port.rs @@ -11,6 +11,11 @@ use { } }; + +pub trait UpdateTask : Send + Sync { + fn update(&self); +} + /*\ <<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> View Port @@ -18,14 +23,17 @@ use { \*/ pub struct ViewPort { view: Arc>>>, - cast: Arc>> + cast: Arc>>, + pub update_hooks: Arc>>> } -impl ViewPort { +impl ViewPort +where V::Msg: Clone { pub fn new() -> Self { ViewPort { view: Arc::new(RwLock::new(None)), - cast: Arc::new(RwLock::new(ObserverBroadcast::new())) + cast: Arc::new(RwLock::new(ObserverBroadcast::new())), + update_hooks: Arc::new(RwLock::new(Vec::new())) } } @@ -45,40 +53,86 @@ impl ViewPort { observer.write().unwrap().reset(self.view.read().unwrap().clone()); } + pub fn add_update_hook(&self, hook_cast: Arc) { + self.update_hooks.write().unwrap().push(hook_cast); + } + pub fn inner(&self) -> InnerViewPort { - InnerViewPort(ViewPort{ view: self.view.clone(), cast: self.cast.clone() }) + InnerViewPort( + ViewPort{ + view: self.view.clone(), + cast: self.cast.clone(), + update_hooks: self.update_hooks.clone() + } + ) } pub fn outer(&self) -> OuterViewPort { - OuterViewPort(ViewPort{ view: self.view.clone(), cast: self.cast.clone() }) + OuterViewPort( + ViewPort{ + view: self.view.clone(), + cast: self.cast.clone(), + update_hooks: self.update_hooks.clone() + } + ) } pub fn into_inner(self) -> InnerViewPort { - InnerViewPort(ViewPort{ view: self.view, cast: self.cast }) + InnerViewPort( + ViewPort{ + view: self.view, + cast: self.cast, + update_hooks: self.update_hooks + } + ) } pub fn into_outer(self) -> OuterViewPort { - OuterViewPort(ViewPort{ view: self.view, cast: self.cast }) + OuterViewPort( + ViewPort{ + view: self.view, + cast: self.cast, + update_hooks: self.update_hooks + } + ) } } -impl Clone for ViewPort { +impl UpdateTask for ViewPort where V::Msg: Clone + Send + Sync{ + fn update(&self) { + let mut v = { + let t = self.update_hooks.read().unwrap(); + t.iter().cloned().collect::>() + }; + + for hook in v { + hook.update(); + } + + self.cast.read().unwrap().update(); + } +} + + +impl Clone for ViewPort +where V::Msg: Clone { fn clone(&self) -> Self { ViewPort { view: self.view.clone(), - cast: self.cast.clone() + cast: self.cast.clone(), + update_hooks: self.update_hooks.clone() } } } //<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> -pub struct InnerViewPort(ViewPort); -pub struct OuterViewPort(ViewPort); +pub struct InnerViewPort(pub ViewPort) where V::Msg: Clone; +pub struct OuterViewPort(pub ViewPort) where V::Msg: Clone; //<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> -impl InnerViewPort { +impl InnerViewPort where V::Msg: Clone { pub fn get_broadcast(&self) -> Arc>> { self.0.cast.clone() } @@ -93,11 +147,12 @@ impl InnerViewPort { } pub fn notify(&self, msg: &V::Msg) { - self.0.cast.read().unwrap().notify(msg); + self.0.cast.write().unwrap().notify(msg); } } -impl Clone for InnerViewPort { +impl Clone for InnerViewPort +where V::Msg: Clone { fn clone(&self) -> Self { InnerViewPort(self.0.clone()) } @@ -105,7 +160,8 @@ impl Clone for InnerViewPort { //<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> -impl OuterViewPort { +impl OuterViewPort +where V::Msg: Clone { pub fn get_view(&self) -> Option> { self.0.view.read().unwrap().clone() } @@ -132,7 +188,8 @@ impl OuterViewPort { } } -impl Clone for OuterViewPort { +impl Clone for OuterViewPort +where V::Msg: Clone { fn clone(&self) -> Self { OuterViewPort(self.0.clone()) } @@ -160,19 +217,21 @@ where V::Msg: Clone { //<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct AnyViewPort { view: Arc, - cast: Arc + cast: Arc, + update_hooks: Arc>>> } impl AnyViewPort { pub fn downcast(self) -> Result, AnyViewPort> { match ( self.view.clone().downcast::>>>(), - self.cast.clone().downcast::>>() + self.cast.clone().downcast::>>(), + self.update_hooks.clone() ) { - (Ok(view), Ok(cast)) => Ok(ViewPort{view, cast}), + (Ok(view), Ok(cast), update_hooks) => Ok(ViewPort{view, cast, update_hooks}), _ => Err(self) } } @@ -182,45 +241,50 @@ impl From> for AnyViewPort { fn from(port: ViewPort) -> Self { AnyViewPort { view: port.view as Arc, - cast: port.cast as Arc + cast: port.cast as Arc, + update_hooks: port.update_hooks } } } //<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct AnyOuterViewPort(AnyViewPort); -#[derive(Debug, Clone)] +#[derive(Clone)] pub struct AnyInnerViewPort(AnyViewPort); impl AnyOuterViewPort { - pub fn downcast(self) -> Result, AnyViewPort> { + pub fn downcast(self) -> Result, AnyViewPort> where V::Msg: Clone { Ok(OuterViewPort(self.0.downcast::()?)) } } -impl From> for AnyOuterViewPort { +impl From> for AnyOuterViewPort +where V::Msg: Clone { fn from(port: OuterViewPort) -> Self { AnyOuterViewPort(AnyViewPort{ view: port.0.view as Arc, - cast: port.0.cast as Arc + cast: port.0.cast as Arc, + update_hooks: port.0.update_hooks }) } } impl AnyInnerViewPort { - pub fn downcast(self) -> Result, AnyViewPort> { + pub fn downcast(self) -> Result, AnyViewPort> where V::Msg: Clone { Ok(InnerViewPort(self.0.downcast::()?)) } } -impl From> for AnyInnerViewPort { +impl From> for AnyInnerViewPort +where V::Msg: Clone { fn from(port: InnerViewPort) -> Self { AnyInnerViewPort(AnyViewPort{ view: port.0.view as Arc, - cast: port.0.cast as Arc + cast: port.0.cast as Arc, + update_hooks: port.0.update_hooks }) } } diff --git a/nested/src/core/view.rs b/nested/src/core/view.rs index 1b5a33c..b3da6e6 100644 --- a/nested/src/core/view.rs +++ b/nested/src/core/view.rs @@ -6,7 +6,7 @@ \*/ pub trait View : Send + Sync { /// Notification message for the observers - type Msg; + type Msg : Send + Sync; } //<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> diff --git a/nested/src/grid/offset.rs b/nested/src/grid/offset.rs index 1b406ad..77d9ab1 100644 --- a/nested/src/grid/offset.rs +++ b/nested/src/grid/offset.rs @@ -77,7 +77,7 @@ impl Observer for GridOffset { if let Some(area) = new_area { self.cast.notify_each(area); } } - fn notify(&self, msg: &Point2) { + fn notify(&mut self, msg: &Point2) { self.cast.notify(&(msg + self.offset)); } } diff --git a/nested/src/index/map_item.rs b/nested/src/index/map_item.rs index 6cee0ee..f12f2c6 100644 --- a/nested/src/index/map_item.rs +++ b/nested/src/index/map_item.rs @@ -18,7 +18,10 @@ pub use { } }; -impl OuterViewPort> { +impl OuterViewPort> +where Key: Clone + Send + Sync + 'static, + Item: Send + Sync + 'static +{ pub fn map_item< DstItem: 'static, F: Fn(&Key, &Item) -> DstItem + Send + Sync + 'static @@ -27,6 +30,8 @@ impl OuterViewPort> f: F ) -> OuterViewPort> { let port = ViewPort::new(); + port.add_update_hook(Arc::new(self.0.clone())); + let map = MapIndexItem::new(port.inner(), f); self.add_observer(map.clone()); port.into_outer() @@ -34,7 +39,8 @@ impl OuterViewPort> } pub struct MapIndexItem -where SrcView: IndexView + ?Sized, +where Key: Clone + Send + Sync, + SrcView: IndexView + ?Sized, F: Fn(&Key, &SrcView::Item) -> DstItem + Send + Sync { src_view: Option>, @@ -43,7 +49,7 @@ where SrcView: IndexView + ?Sized, } impl MapIndexItem -where Key: 'static, +where Key: Clone + Send + Sync + 'static, DstItem: 'static, SrcView: IndexView + ?Sized + 'static, F: Fn(&Key, &SrcView::Item) -> DstItem + Send + Sync + 'static @@ -66,14 +72,16 @@ where Key: 'static, } impl View for MapIndexItem -where SrcView: IndexView + ?Sized, +where Key: Clone + Send + Sync, + SrcView: IndexView + ?Sized, F: Fn(&Key, &SrcView::Item) -> DstItem + Send + Sync { type Msg = Key; } impl IndexView for MapIndexItem -where SrcView: IndexView + ?Sized, +where Key: Clone + Send + Sync, + SrcView: IndexView + ?Sized, F: Fn(&Key, &SrcView::Item) -> DstItem + Send + Sync { type Item = DstItem; @@ -88,7 +96,8 @@ where SrcView: IndexView + ?Sized, } impl Observer for MapIndexItem -where SrcView: IndexView + ?Sized, +where Key: Clone + Send + Sync, + SrcView: IndexView + ?Sized, F: Fn(&Key, &SrcView::Item) -> DstItem + Send + Sync { fn reset(&mut self, view: Option>) { @@ -100,7 +109,7 @@ where SrcView: IndexView + ?Sized, if let Some(area) = new_area { self.cast.notify_each(area); } } - fn notify(&self, msg: &Key) { + fn notify(&mut self, msg: &Key) { self.cast.notify(msg); } } diff --git a/nested/src/index/map_key.rs b/nested/src/index/map_key.rs index b2d450e..19e6916 100644 --- a/nested/src/index/map_key.rs +++ b/nested/src/index/map_key.rs @@ -18,9 +18,12 @@ pub use { } }; -impl OuterViewPort> { +impl OuterViewPort> +where SrcKey: Clone + Send + Sync + 'static, + Item: 'static +{ pub fn map_key< - DstKey: 'static, + DstKey: Clone + Send + Sync + 'static, F1: Fn(&SrcKey) -> DstKey + Send + Sync + 'static, F2: Fn(&DstKey) -> Option + Send + Sync + 'static, >( @@ -29,6 +32,8 @@ impl OuterViewPort OuterViewPort> { let port = ViewPort::new(); + port.add_update_hook(Arc::new(self.0.clone())); + let map = MapIndexKey::new(port.inner(), f1, f2); self.add_observer(map.clone()); port.into_outer() @@ -36,7 +41,9 @@ impl OuterViewPort -where SrcView: IndexView + ?Sized, +where DstKey: Clone + Send + Sync, + SrcKey: Clone + Send + Sync, + SrcView: IndexView + ?Sized, F1: Fn(&SrcKey) -> DstKey + Send + Sync, F2: Fn(&DstKey) -> Option + Send + Sync, { @@ -47,8 +54,8 @@ where SrcView: IndexView + ?Sized, } impl MapIndexKey -where DstKey: 'static, - SrcKey: 'static, +where DstKey: Clone + Send + Sync + 'static, + SrcKey: Clone + Send + Sync + 'static, SrcView: IndexView + ?Sized + 'static, SrcView::Item: 'static, F1: Fn(&SrcKey) -> DstKey + Send + Sync + 'static, @@ -74,7 +81,9 @@ where DstKey: 'static, } impl View for MapIndexKey -where SrcView: IndexView + ?Sized, +where DstKey: Clone + Send + Sync, + SrcKey: Clone + Send + Sync, + SrcView: IndexView + ?Sized, F1: Fn(&SrcKey) -> DstKey + Send + Sync, F2: Fn(&DstKey) -> Option + Send + Sync, { @@ -82,7 +91,9 @@ where SrcView: IndexView + ?Sized, } impl IndexView for MapIndexKey -where SrcView: IndexView + ?Sized, +where DstKey: Clone + Send + Sync, + SrcKey: Clone + Send + Sync, + SrcView: IndexView + ?Sized, F1: Fn(&SrcKey) -> DstKey + Send + Sync, F2: Fn(&DstKey) -> Option + Send + Sync, { @@ -98,7 +109,9 @@ where SrcView: IndexView + ?Sized, } impl Observer for MapIndexKey -where SrcView: IndexView + ?Sized, +where DstKey: Clone + Send + Sync, + SrcKey: Clone + Send + Sync, + SrcView: IndexView + ?Sized, F1: Fn(&SrcKey) -> DstKey + Send + Sync, F2: Fn(&DstKey) -> Option + Send + Sync, { @@ -111,7 +124,7 @@ where SrcView: IndexView + ?Sized, if let Some(area) = new_area { self.cast.notify_each(area); } } - fn notify(&self, msg: &SrcKey) { + fn notify(&mut self, msg: &SrcKey) { self.cast.notify(&(self.f1)(msg)); } } diff --git a/nested/src/index/mod.rs b/nested/src/index/mod.rs index ddf7a99..51d42cf 100644 --- a/nested/src/index/mod.rs +++ b/nested/src/index/mod.rs @@ -13,7 +13,8 @@ use { //<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> -pub trait IndexView : View { +pub trait IndexView : View +where Key: Send + Sync { type Item; fn get(&self, key: &Key) -> Option; @@ -26,7 +27,10 @@ pub trait IndexView : View { //<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> -impl + ?Sized> IndexView for RwLock { +impl IndexView for RwLock +where Key: Send + Sync, + V: IndexView + ?Sized +{ type Item = V::Item; fn get(&self, key: &Key) -> Option { @@ -38,7 +42,11 @@ impl + ?Sized> IndexView for RwLock { } } -impl + ?Sized> IndexView for Arc { +impl IndexView for Arc +where Key: Send + Sync, + V: IndexView + ?Sized +{ + type Item = V::Item; fn get(&self, key: &Key) -> Option { @@ -50,7 +58,10 @@ impl + ?Sized> IndexView for Arc { } } -impl> IndexView for Option { +impl IndexView for Option +where Key: Send + Sync, + V: IndexView +{ type Item = V::Item; fn get(&self, key: &Key) -> Option { @@ -69,7 +80,7 @@ impl> IndexView for Option { //<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> pub trait ImplIndexView : Send + Sync { - type Key; + type Key : Send + Sync; type Value; fn get(&self, key: &Self::Key) -> Option; diff --git a/nested/src/integer/add.rs b/nested/src/integer/add.rs index e093577..49a87bf 100644 --- a/nested/src/integer/add.rs +++ b/nested/src/integer/add.rs @@ -53,7 +53,7 @@ impl Add { b: OuterViewPort>, c: InnerViewPort>>//> ) -> Arc> { - let mut proj_helper = ProjectionHelper::new(); + let mut proj_helper = ProjectionHelper::new(c.0.update_hooks.clone()); let add = Arc::new(RwLock::new( Add { radix, diff --git a/nested/src/integer/radix.rs b/nested/src/integer/radix.rs index 9f3169c..2176eb0 100644 --- a/nested/src/integer/radix.rs +++ b/nested/src/integer/radix.rs @@ -82,7 +82,7 @@ impl Observer> for RadixProjection { self.src_digits = view; } - fn notify(&self, _idx: &usize) { + fn notify(&mut self, _idx: &usize) { // todo: // src digit i changed. // which dst-digits does it affect? diff --git a/nested/src/leveled_term_view.rs b/nested/src/leveled_term_view.rs index a4d8ff4..1f6b387 100644 --- a/nested/src/leveled_term_view.rs +++ b/nested/src/leveled_term_view.rs @@ -35,7 +35,7 @@ impl LeveledTermView { src_port: OuterViewPort, dst_port: InnerViewPort ) -> Arc> { - let mut proj_helper = ProjectionHelper::new(); + let mut proj_helper = ProjectionHelper::new(dst_port.0.update_hooks.clone()); let v = Arc::new(RwLock::new( LeveledTermView { diff --git a/nested/src/projection.rs b/nested/src/projection.rs index 7d1e40f..f33b9f9 100644 --- a/nested/src/projection.rs +++ b/nested/src/projection.rs @@ -2,21 +2,19 @@ use { std::{ cmp::{max}, any::Any, - sync::{Arc, Weak}, - }, - async_std::{ - stream::StreamExt + sync::{Arc, Weak} }, std::sync::RwLock, crate::{ core::{ View, Observer, ObserverExt, + port::UpdateTask, OuterViewPort, channel::{ - channel, + ChannelSender, ChannelReceiver, ChannelData, - ChannelSender + channel } }, singleton::{SingletonView}, @@ -29,14 +27,16 @@ use { pub struct ProjectionHelper { keepalive: Vec>, - proj: Arc>>> + proj: Arc>>>, + update_hooks: Arc>>> } impl ProjectionHelper

{ - pub fn new() -> Self { + pub fn new(update_hooks: Arc>>>) -> Self { ProjectionHelper { keepalive: Vec::new(), - proj: Arc::new(RwLock::new(Weak::new())) + proj: Arc::new(RwLock::new(Weak::new())), + update_hooks } } @@ -52,6 +52,7 @@ impl ProjectionHelper

{ port: OuterViewPort>, notify: impl Fn(&mut P, &()) + Send + Sync + 'static ) -> Arc>>>> { + self.update_hooks.write().unwrap().push(Arc::new(port.0.clone())); port.add_observer(self.new_arg(notify)); port.get_view_arc() } @@ -61,6 +62,7 @@ impl ProjectionHelper

{ port: OuterViewPort>, notify: impl Fn(&mut P, &usize) + Send + Sync + 'static ) -> Arc>>>> { + self.update_hooks.write().unwrap().push(Arc::new(port.0.clone())); port.add_observer(self.new_arg(notify)); port.get_view_arc() } @@ -70,7 +72,10 @@ impl ProjectionHelper

{ port: OuterViewPort>, notify: impl Fn(&mut P, &Key) + Send + Sync + 'static ) -> Arc>>>> { - port.add_observer(self.new_arg(notify)); + self.update_hooks.write().unwrap().push(Arc::new(port.0.clone())); + + let arg = self.new_arg(notify); + port.add_observer(arg); port.get_view_arc() } @@ -79,46 +84,76 @@ impl ProjectionHelper

{ >( &mut self, notify: impl Fn(&mut P, &V::Msg) + Send + Sync + 'static - ) -> Arc>>> - where V::Msg: Send + Sync { - let (tx, mut rx) = channel::>(); - + ) -> Arc>>> + where V::Msg: Send + Sync + { + let (tx, rx) = channel::>(); let arg = Arc::new(RwLock::new( ProjectionArg { src: None, - sender: tx + notify: Box::new(notify), + proj: self.proj.clone(), + rx, tx })); - let proj = self.proj.clone(); - async_std::task::spawn(async move { - while let Some(msg) = rx.next().await { - if let Some(proj) = proj.read().unwrap().upgrade() { - notify(&mut *proj.write().unwrap(), &msg); - } - } - }); - self.keepalive.push(arg.clone()); + self.update_hooks.write().unwrap().push(arg.clone()); arg } } /// Special Observer which can access the state of the projection on notify -/// also handles the reset() and default behaviour of unitinitalized inputs -pub struct ProjectionArg -where V: View + ?Sized, +/// also handles the reset() +pub struct ProjectionArg +where P: Send + Sync + 'static, + V: View + ?Sized, D: ChannelData, D::IntoIter: Send + Sync { src: Option>, - sender: ChannelSender + notify: Box, + proj: Arc>>>, + rx: ChannelReceiver, + tx: ChannelSender +} + +impl UpdateTask for ProjectionArg +where P: Send + Sync + 'static, + V: View + ?Sized, + D: ChannelData, + D::IntoIter: Send + Sync +{ + fn update(&self) { + if let Some(p) = self.proj.read().unwrap().upgrade() { + if let Some(data) = self.rx.try_recv() { + for msg in data { + (self.notify)( + &mut *p.write().unwrap(), + &msg + ); + } + } + } + } +} + +impl UpdateTask for RwLock> +where P: Send + Sync + 'static, + V: View + ?Sized, + D: ChannelData, + D::IntoIter: Send + Sync +{ + fn update(&self) { + self.read().unwrap().update(); + } } //<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> -impl Observer> for ProjectionArg, D> -where D: ChannelData, +impl Observer> for ProjectionArg, D> +where P: Send + Sync + 'static, + D: ChannelData, D::IntoIter: Send + Sync { fn reset(&mut self, new_src: Option>>) { @@ -126,15 +161,16 @@ where D: ChannelData, self.notify(&()); } - fn notify(&self, msg: &()) { - self.sender.send(*msg); + fn notify(&mut self, msg: &()) { + self.tx.send(msg.clone()); } } //<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> -impl Observer> for ProjectionArg, D> -where D: ChannelData, +impl Observer> for ProjectionArg, D> +where P: Send + Sync + 'static, + D: ChannelData, D::IntoIter: Send + Sync { fn reset(&mut self, new_src: Option>>) { @@ -145,15 +181,17 @@ where D: ChannelData, self.notify_each(0 .. max(old_len, new_len)); } - fn notify(&self, msg: &usize) { - self.sender.send(*msg); + fn notify(&mut self, msg: &usize) { + self.tx.send(*msg); } } //<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> -impl Observer> for ProjectionArg, D> -where D: ChannelData, +impl Observer> for ProjectionArg, D> +where P: Send + Sync + 'static, + Key: Clone + Send + Sync, + D: ChannelData, D::IntoIter: Send + Sync { fn reset(&mut self, new_src: Option>>) { @@ -165,8 +203,8 @@ where D: ChannelData, if let Some(area) = new_area { self.notify_each(area); } } - fn notify(&self, msg: &Key) { - self.sender.send(msg.clone()); + fn notify(&mut self, msg: &Key) { + self.tx.send(msg.clone()); } } diff --git a/nested/src/sequence/filter.rs b/nested/src/sequence/filter.rs index 0cfba42..77b3938 100644 --- a/nested/src/sequence/filter.rs +++ b/nested/src/sequence/filter.rs @@ -18,6 +18,8 @@ impl OuterViewPort P: Fn(&V::Item) -> bool + Send + Sync + 'static >(&self, pred: P) -> OuterViewPort> { let port = ViewPort::new(); + port.add_update_hook(Arc::new(self.0.clone())); + let filter = Arc::new(RwLock::new( Filter { src_view: None, @@ -115,7 +117,7 @@ where SrcView: SequenceView + ?Sized + 'static, if let Some(len) = new_len { self.cast.notify_each(0 .. len); } } - fn notify(&self, idx: &usize) { + fn notify(&mut self, idx: &usize) { let l = self.len().unwrap_or(0)+1; let np = if let Some(x) = self.src_view.get(idx) { diff --git a/nested/src/sequence/flatten.rs b/nested/src/sequence/flatten.rs index 30fc68b..4fcedc8 100644 --- a/nested/src/sequence/flatten.rs +++ b/nested/src/sequence/flatten.rs @@ -65,7 +65,7 @@ where V1: SequenceView> + ?Sized + 'static, self.notify_each(0 .. std::cmp::max(old_len, new_len)); } - fn notify(&self, chunk_idx: &usize) { + fn notify(&mut self, chunk_idx: &usize) { self.sender.send(*chunk_idx); } } @@ -81,7 +81,7 @@ where V2: SequenceView + ?Sized + 'static self.notify_each(0 .. std::cmp::max(old_len, new_len)); } - fn notify(&self, idx: &usize) { + fn notify(&mut self, idx: &usize) { self.sender.send(*idx); } } @@ -139,7 +139,7 @@ where V1: SequenceView> + ?Sized + 'static, while let Some(chunk_idx) = recv.next().await { if let Some(mut chunk_rcv) = f.write().unwrap().update_chunk(chunk_idx) { let f = f.clone(); - let cast = cast.clone(); + let mut cast = cast.clone(); async_std::task::spawn(async move { while let Some(idx) = chunk_rcv.next().await { let mut flat = f.write().unwrap(); diff --git a/nested/src/sequence/map.rs b/nested/src/sequence/map.rs index 79eb93b..a537fd7 100644 --- a/nested/src/sequence/map.rs +++ b/nested/src/sequence/map.rs @@ -21,6 +21,8 @@ impl OuterViewPort> { f: F ) -> OuterViewPort> { let port = ViewPort::new(); + port.add_update_hook(Arc::new(self.0.clone())); + let map = Arc::new(RwLock::new(MapSequenceItem { src_view: None, f, @@ -83,7 +85,7 @@ where SrcView: SequenceView + ?Sized, if let Some(len) = new_len { self.cast.notify_each(0 .. len ); } } - fn notify(&self, msg: &usize) { + fn notify(&mut self, msg: &usize) { self.cast.notify(msg); } } diff --git a/nested/src/sequence/seq2idx.rs b/nested/src/sequence/seq2idx.rs index 0967e4e..79cf003 100644 --- a/nested/src/sequence/seq2idx.rs +++ b/nested/src/sequence/seq2idx.rs @@ -39,6 +39,7 @@ where SrcView: SequenceView + ?Sized + 'static { impl OuterViewPort> { pub fn to_index(&self) -> OuterViewPort> { let port = ViewPort::new(); + port.add_update_hook(Arc::new(self.0.clone())); self.add_observer(Sequence2Index::new(port.inner())); port.into_outer() } @@ -73,7 +74,7 @@ where SrcView: SequenceView + ?Sized + 'static { if let Some(area) = new_area { self.cast.notify_each(area); } } - fn notify(&self, msg: &usize) { + fn notify(&mut self, msg: &usize) { self.cast.notify(msg); } } diff --git a/nested/src/sequence/vec_buffer.rs b/nested/src/sequence/vec_buffer.rs index 371fcb8..4fba5f9 100644 --- a/nested/src/sequence/vec_buffer.rs +++ b/nested/src/sequence/vec_buffer.rs @@ -61,6 +61,8 @@ impl OuterViewPort>> where T: Clone + Send + Sync + 'static { pub fn to_sequence(&self) -> OuterViewPort> { let port = ViewPort::new(); + port.add_update_hook(Arc::new(self.0.clone())); + let vec_seq = VecSequence::new(port.inner()); self.add_observer(vec_seq.clone()); port.into_outer() @@ -114,7 +116,7 @@ where T: Clone + Serialize + Send + Sync + 'static, out.flush().expect(""); } - fn notify(&self, diff: &VecDiff) { + fn notify(&mut self, diff: &VecDiff) { let mut out = self.out.write().unwrap(); out.write(&bincode::serialized_size(diff).unwrap().to_le_bytes()).expect(""); out.write(&bincode::serialize(diff).unwrap()).expect(""); @@ -142,7 +144,7 @@ where T: Clone + Serialize + Send + Sync + 'static, self.out.write().unwrap().flush().expect(""); } - fn notify(&self, diff: &VecDiff) { + fn notify(&mut self, diff: &VecDiff) { self.out.write().unwrap().write(serde_json::to_string(diff).unwrap().as_bytes()).expect(""); self.out.write().unwrap().write(b"\n").expect(""); self.out.write().unwrap().flush().expect(""); @@ -184,7 +186,7 @@ where T: Clone + Send + Sync + 'static { self.cast.notify_each(0 .. std::cmp::max(old_len, new_len)); } - fn notify(&self, diff: &VecDiff) { + fn notify(&mut self, diff: &VecDiff) { match diff { VecDiff::Clear => { let l = { diff --git a/nested/src/string_editor.rs b/nested/src/string_editor.rs index 3e610aa..064671e 100644 --- a/nested/src/string_editor.rs +++ b/nested/src/string_editor.rs @@ -166,7 +166,7 @@ pub mod insert_view { data_port: OuterViewPort>, out_port: InnerViewPort ) -> Arc> { - let mut proj_helper = ProjectionHelper::new(); + let mut proj_helper = ProjectionHelper::new(out_port.0.update_hooks.clone()); let proj = Arc::new(RwLock::new( StringInsertView { diff --git a/nested/src/terminal/compositor.rs b/nested/src/terminal/compositor.rs index 0efb950..c2d64b3 100644 --- a/nested/src/terminal/compositor.rs +++ b/nested/src/terminal/compositor.rs @@ -30,7 +30,7 @@ impl TerminalCompositor { TerminalCompositor { layers: Vec::new(), cast: port.get_broadcast(), - proj_helper: ProjectionHelper::new() + proj_helper: ProjectionHelper::new(port.0.update_hooks.clone()) } )); diff --git a/nested/src/terminal/terminal.rs b/nested/src/terminal/terminal.rs index eec519b..c7b1d91 100644 --- a/nested/src/terminal/terminal.rs +++ b/nested/src/terminal/terminal.rs @@ -135,7 +135,7 @@ impl Observer for TermOutObserver { } } - fn notify(&self, pos: &Point2) { + fn notify(&mut self, pos: &Point2) { self.dirty_pos_tx.send(*pos); } }