port: introduce update hooks to avoid extra async tasks per projection

fixes deadlock issues
This commit is contained in:
Michael Sippel 2021-05-22 01:33:58 +02:00
parent f8b5da1393
commit ca819aeaf4
Signed by: senvas
GPG key ID: F96CF119C34B64A6
21 changed files with 303 additions and 129 deletions

View file

@ -86,7 +86,7 @@ pub struct ChannelReceiver<Data: ChannelData>(Arc<Mutex<ChannelState<Data>>>);
impl<Data: ChannelData> ChannelSender<Data>
where Data::IntoIter: Send + Sync {
pub fn send(&self, msg: Data::Item) {
let mut state = self.0.lock().unwrap();
let mut state = self.0.lock().unwrap();
if state.send_buf.is_none() {
state.send_buf = Some(Data::default());
@ -103,7 +103,7 @@ where Data::IntoIter: Send + Sync {
use crate::core::View;
impl<V: View + ?Sized, Data: ChannelData<Item = V::Msg>> Observer<V> for ChannelSender<Data>
where V::Msg: Clone, Data::IntoIter: Send + Sync {
fn notify(&self, msg: &V::Msg) {
fn notify(&mut self, msg: &V::Msg) {
self.send(msg.clone());
}
}
@ -131,6 +131,15 @@ impl<Data: ChannelData> ChannelReceiver<Data> {
pub async fn recv(&self) -> Option<Data> {
ChannelRead(self.0.clone()).await
}
pub fn try_recv(&self) -> Option<Data> {
let mut state = self.0.lock().unwrap();
if let Some(buf) = state.send_buf.take() {
Some(buf)
} else {
None
}
}
}
struct ChannelRead<Data: ChannelData>(Arc<Mutex<ChannelState<Data>>>);

View file

@ -76,8 +76,8 @@ pub struct Object {
}
impl Object {
pub fn get_port<V: View + ?Sized + 'static>(&self) -> Option<OuterViewPort<V>> {
Some(self.repr.read().unwrap().port.clone()?.downcast::<V>().unwrap())
pub fn get_port<V: View + ?Sized + 'static>(&self) -> Option<OuterViewPort<V>> where V::Msg: Clone {
Some(self.repr.read().unwrap().port.clone()?.downcast::<V>().ok().unwrap())
}
pub fn downcast(&self, dst_type: TypeTerm) -> Option<Object> {
@ -201,6 +201,8 @@ impl Object {
//<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>>
pub struct TypeLadder(Vec<TypeTerm>);
#[derive(Clone, Copy, Hash, PartialEq, Eq)]
pub enum MorphismMode {
/// Isomorphism
@ -310,7 +312,8 @@ impl Context {
&self,
name: &str,
type_ladder: impl Iterator<Item = &'a str>
) -> Option<OuterViewPort<V>> {
) -> Option<OuterViewPort<V>>
where V::Msg: Clone {
self.get_obj(&name.into())?
.downcast_ladder(
type_ladder.map(|tn| self.type_dict.type_term_from_str(tn).unwrap())
@ -367,7 +370,8 @@ impl Context {
&mut self,
name: &str,
type_ladder: impl Iterator<Item = &'a str>
) -> Option<OuterViewPort<V>> {
) -> Option<OuterViewPort<V>>
where V::Msg: Clone {
if let Some(p) = self.get_obj_port(name, type_ladder) {
Some(p)
} else {

View file

@ -1,5 +1,10 @@
use {
crate::core::View,
crate::{
core::{
View,
channel::{channel, ChannelSender, ChannelReceiver}
}
},
std::{
sync::{Arc, Weak}
},
@ -13,7 +18,7 @@ use {
\*/
pub trait Observer<V: View + ?Sized> : Send + Sync {
fn reset(&mut self, _view: Option<Arc<V>>) {}
fn notify(&self, msg: &V::Msg);
fn notify(&mut self, msg: &V::Msg);
}
//<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>>
@ -23,19 +28,19 @@ impl<V: View + ?Sized, O: Observer<V>> Observer<V> for Arc<RwLock<O>> {
self.write().unwrap().reset(view);
}
fn notify(&self, msg: &V::Msg) {
self.read().unwrap().notify(&msg);
fn notify(&mut self, msg: &V::Msg) {
self.write().unwrap().notify(msg);
}
}
//<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>>
pub trait ObserverExt<V: View + ?Sized> : Observer<V> {
fn notify_each(&self, it: impl IntoIterator<Item = V::Msg>);
fn notify_each(&mut self, it: impl IntoIterator<Item = V::Msg>);
}
impl<V: View + ?Sized, T: Observer<V>> ObserverExt<V> for T {
fn notify_each(&self, it: impl IntoIterator<Item = V::Msg>) {
fn notify_each(&mut self, it: impl IntoIterator<Item = V::Msg>) {
for msg in it {
self.notify(&msg);
}
@ -47,14 +52,19 @@ impl<V: View + ?Sized, T: Observer<V>> ObserverExt<V> for T {
Broadcast
<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>>
\*/
pub struct ObserverBroadcast<V: View + ?Sized> {
pub struct ObserverBroadcast<V: View + ?Sized>
where V::Msg : Send + Sync {
rx: ChannelReceiver<Vec<V::Msg>>,
tx: ChannelSender<Vec<V::Msg>>,
observers: Vec<Weak<RwLock<dyn Observer<V>>>>
}
impl<V: View + ?Sized> ObserverBroadcast<V> {
impl<V: View + ?Sized> ObserverBroadcast<V>
where V::Msg : Clone + Send + Sync {
pub fn new() -> Self {
let (tx, rx) = channel::<Vec<V::Msg>>();
ObserverBroadcast {
rx, tx,
observers: Vec::new()
}
}
@ -71,19 +81,28 @@ impl<V: View + ?Sized> ObserverBroadcast<V> {
fn iter(&self) -> impl Iterator<Item = Arc<RwLock<dyn Observer<V>>>> + '_ {
self.observers.iter().filter_map(|o| o.upgrade())
}
pub fn update(&self) {
if let Some(msg_vec) = self.rx.try_recv() {
for msg in msg_vec {
for o in self.iter() {
o.write().unwrap().notify(&msg);
}
}
}
}
}
impl<V: View + ?Sized> Observer<V> for ObserverBroadcast<V> {
impl<V: View + ?Sized> Observer<V> for ObserverBroadcast<V>
where V::Msg: Clone {
fn reset(&mut self, view: Option<Arc<V>>) {
for o in self.iter() {
o.write().unwrap().reset(view.clone());
}
}
fn notify(&self, msg: &V::Msg) {
for o in self.iter() {
o.read().unwrap().notify(&msg);
}
fn notify(&mut self, msg: &V::Msg) {
self.tx.send(msg.clone());
}
}
@ -110,7 +129,7 @@ where V: View + ?Sized,
impl<V, F> Observer<V> for NotifyFnObserver<V, F>
where V: View + ?Sized,
F: Fn(&V::Msg) + Send + Sync {
fn notify(&self, msg: &V::Msg) {
fn notify(&mut self, msg: &V::Msg) {
(self.f)(msg);
}
}
@ -138,7 +157,7 @@ where V: View + ?Sized,
impl<V, F> Observer<V> for ResetFnObserver<V, F>
where V: View + ?Sized,
F: Fn(Option<Arc<V>>) + Send + Sync {
fn notify(&self, _msg: &V::Msg) {}
fn notify(&mut self, _msg: &V::Msg) {}
fn reset(&mut self, view: Option<Arc<V>>) {
(self.f)(view);
}

View file

@ -11,6 +11,11 @@ use {
}
};
pub trait UpdateTask : Send + Sync {
fn update(&self);
}
/*\
<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>>
View Port
@ -18,14 +23,17 @@ use {
\*/
pub struct ViewPort<V: View + ?Sized> {
view: Arc<RwLock<Option<Arc<V>>>>,
cast: Arc<RwLock<ObserverBroadcast<V>>>
cast: Arc<RwLock<ObserverBroadcast<V>>>,
pub update_hooks: Arc<RwLock<Vec<Arc<dyn UpdateTask>>>>
}
impl<V: View + ?Sized> ViewPort<V> {
impl<V: View + ?Sized> ViewPort<V>
where V::Msg: Clone {
pub fn new() -> Self {
ViewPort {
view: Arc::new(RwLock::new(None)),
cast: Arc::new(RwLock::new(ObserverBroadcast::new()))
cast: Arc::new(RwLock::new(ObserverBroadcast::new())),
update_hooks: Arc::new(RwLock::new(Vec::new()))
}
}
@ -45,40 +53,86 @@ impl<V: View + ?Sized> ViewPort<V> {
observer.write().unwrap().reset(self.view.read().unwrap().clone());
}
pub fn add_update_hook(&self, hook_cast: Arc<dyn UpdateTask>) {
self.update_hooks.write().unwrap().push(hook_cast);
}
pub fn inner(&self) -> InnerViewPort<V> {
InnerViewPort(ViewPort{ view: self.view.clone(), cast: self.cast.clone() })
InnerViewPort(
ViewPort{
view: self.view.clone(),
cast: self.cast.clone(),
update_hooks: self.update_hooks.clone()
}
)
}
pub fn outer(&self) -> OuterViewPort<V> {
OuterViewPort(ViewPort{ view: self.view.clone(), cast: self.cast.clone() })
OuterViewPort(
ViewPort{
view: self.view.clone(),
cast: self.cast.clone(),
update_hooks: self.update_hooks.clone()
}
)
}
pub fn into_inner(self) -> InnerViewPort<V> {
InnerViewPort(ViewPort{ view: self.view, cast: self.cast })
InnerViewPort(
ViewPort{
view: self.view,
cast: self.cast,
update_hooks: self.update_hooks
}
)
}
pub fn into_outer(self) -> OuterViewPort<V> {
OuterViewPort(ViewPort{ view: self.view, cast: self.cast })
OuterViewPort(
ViewPort{
view: self.view,
cast: self.cast,
update_hooks: self.update_hooks
}
)
}
}
impl<V: View + ?Sized> Clone for ViewPort<V> {
impl<V: View + ?Sized> UpdateTask for ViewPort<V> where V::Msg: Clone + Send + Sync{
fn update(&self) {
let mut v = {
let t = self.update_hooks.read().unwrap();
t.iter().cloned().collect::<Vec<_>>()
};
for hook in v {
hook.update();
}
self.cast.read().unwrap().update();
}
}
impl<V: View + ?Sized> Clone for ViewPort<V>
where V::Msg: Clone {
fn clone(&self) -> Self {
ViewPort {
view: self.view.clone(),
cast: self.cast.clone()
cast: self.cast.clone(),
update_hooks: self.update_hooks.clone()
}
}
}
//<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>>
pub struct InnerViewPort<V: View + ?Sized>(ViewPort<V>);
pub struct OuterViewPort<V: View + ?Sized>(ViewPort<V>);
pub struct InnerViewPort<V: View + ?Sized>(pub ViewPort<V>) where V::Msg: Clone;
pub struct OuterViewPort<V: View + ?Sized>(pub ViewPort<V>) where V::Msg: Clone;
//<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>>
impl<V: View + ?Sized> InnerViewPort<V> {
impl<V: View + ?Sized> InnerViewPort<V> where V::Msg: Clone {
pub fn get_broadcast(&self) -> Arc<RwLock<ObserverBroadcast<V>>> {
self.0.cast.clone()
}
@ -93,11 +147,12 @@ impl<V: View + ?Sized> InnerViewPort<V> {
}
pub fn notify(&self, msg: &V::Msg) {
self.0.cast.read().unwrap().notify(msg);
self.0.cast.write().unwrap().notify(msg);
}
}
impl<V: View + ?Sized> Clone for InnerViewPort<V> {
impl<V: View + ?Sized> Clone for InnerViewPort<V>
where V::Msg: Clone {
fn clone(&self) -> Self {
InnerViewPort(self.0.clone())
}
@ -105,7 +160,8 @@ impl<V: View + ?Sized> Clone for InnerViewPort<V> {
//<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>>
impl<V: View + ?Sized + 'static> OuterViewPort<V> {
impl<V: View + ?Sized + 'static> OuterViewPort<V>
where V::Msg: Clone {
pub fn get_view(&self) -> Option<Arc<V>> {
self.0.view.read().unwrap().clone()
}
@ -132,7 +188,8 @@ impl<V: View + ?Sized + 'static> OuterViewPort<V> {
}
}
impl<V: View + ?Sized> Clone for OuterViewPort<V> {
impl<V: View + ?Sized> Clone for OuterViewPort<V>
where V::Msg: Clone {
fn clone(&self) -> Self {
OuterViewPort(self.0.clone())
}
@ -160,19 +217,21 @@ where V::Msg: Clone {
//<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>>
#[derive(Debug, Clone)]
#[derive(Clone)]
pub struct AnyViewPort {
view: Arc<dyn Any + Send + Sync + 'static>,
cast: Arc<dyn Any + Send + Sync + 'static>
cast: Arc<dyn Any + Send + Sync + 'static>,
update_hooks: Arc<RwLock<Vec<Arc<dyn UpdateTask>>>>
}
impl AnyViewPort {
pub fn downcast<V: View + ?Sized + 'static>(self) -> Result<ViewPort<V>, AnyViewPort> {
match (
self.view.clone().downcast::<RwLock<Option<Arc<V>>>>(),
self.cast.clone().downcast::<RwLock<ObserverBroadcast<V>>>()
self.cast.clone().downcast::<RwLock<ObserverBroadcast<V>>>(),
self.update_hooks.clone()
) {
(Ok(view), Ok(cast)) => Ok(ViewPort{view, cast}),
(Ok(view), Ok(cast), update_hooks) => Ok(ViewPort{view, cast, update_hooks}),
_ => Err(self)
}
}
@ -182,45 +241,50 @@ impl<V: View + ?Sized + 'static> From<ViewPort<V>> for AnyViewPort {
fn from(port: ViewPort<V>) -> Self {
AnyViewPort {
view: port.view as Arc<dyn Any + Send + Sync + 'static>,
cast: port.cast as Arc<dyn Any + Send + Sync + 'static>
cast: port.cast as Arc<dyn Any + Send + Sync + 'static>,
update_hooks: port.update_hooks
}
}
}
//<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>>
#[derive(Debug, Clone)]
#[derive(Clone)]
pub struct AnyOuterViewPort(AnyViewPort);
#[derive(Debug, Clone)]
#[derive(Clone)]
pub struct AnyInnerViewPort(AnyViewPort);
impl AnyOuterViewPort {
pub fn downcast<V: View + ?Sized + 'static>(self) -> Result<OuterViewPort<V>, AnyViewPort> {
pub fn downcast<V: View + ?Sized + 'static>(self) -> Result<OuterViewPort<V>, AnyViewPort> where V::Msg: Clone {
Ok(OuterViewPort(self.0.downcast::<V>()?))
}
}
impl<V: View + ?Sized + 'static> From<OuterViewPort<V>> for AnyOuterViewPort {
impl<V: View + ?Sized + 'static> From<OuterViewPort<V>> for AnyOuterViewPort
where V::Msg: Clone {
fn from(port: OuterViewPort<V>) -> Self {
AnyOuterViewPort(AnyViewPort{
view: port.0.view as Arc<dyn Any + Send + Sync + 'static>,
cast: port.0.cast as Arc<dyn Any + Send + Sync + 'static>
cast: port.0.cast as Arc<dyn Any + Send + Sync + 'static>,
update_hooks: port.0.update_hooks
})
}
}
impl AnyInnerViewPort {
pub fn downcast<V: View + ?Sized + 'static>(self) -> Result<InnerViewPort<V>, AnyViewPort> {
pub fn downcast<V: View + ?Sized + 'static>(self) -> Result<InnerViewPort<V>, AnyViewPort> where V::Msg: Clone {
Ok(InnerViewPort(self.0.downcast::<V>()?))
}
}
impl<V: View + ?Sized + 'static> From<InnerViewPort<V>> for AnyInnerViewPort {
impl<V: View + ?Sized + 'static> From<InnerViewPort<V>> for AnyInnerViewPort
where V::Msg: Clone {
fn from(port: InnerViewPort<V>) -> Self {
AnyInnerViewPort(AnyViewPort{
view: port.0.view as Arc<dyn Any + Send + Sync + 'static>,
cast: port.0.cast as Arc<dyn Any + Send + Sync + 'static>
cast: port.0.cast as Arc<dyn Any + Send + Sync + 'static>,
update_hooks: port.0.update_hooks
})
}
}

View file

@ -6,7 +6,7 @@
\*/
pub trait View : Send + Sync {
/// Notification message for the observers
type Msg;
type Msg : Send + Sync;
}
//<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>>

View file

@ -77,7 +77,7 @@ impl<V: GridView + ?Sized> Observer<V> for GridOffset<V> {
if let Some(area) = new_area { self.cast.notify_each(area); }
}
fn notify(&self, msg: &Point2<i16>) {
fn notify(&mut self, msg: &Point2<i16>) {
self.cast.notify(&(msg + self.offset));
}
}

View file

@ -18,7 +18,10 @@ pub use {
}
};
impl<Key: 'static, Item: 'static> OuterViewPort<dyn IndexView<Key, Item = Item>> {
impl<Key, Item> OuterViewPort<dyn IndexView<Key, Item = Item>>
where Key: Clone + Send + Sync + 'static,
Item: Send + Sync + 'static
{
pub fn map_item<
DstItem: 'static,
F: Fn(&Key, &Item) -> DstItem + Send + Sync + 'static
@ -27,6 +30,8 @@ impl<Key: 'static, Item: 'static> OuterViewPort<dyn IndexView<Key, Item = Item>>
f: F
) -> OuterViewPort<dyn IndexView<Key, Item = DstItem>> {
let port = ViewPort::new();
port.add_update_hook(Arc::new(self.0.clone()));
let map = MapIndexItem::new(port.inner(), f);
self.add_observer(map.clone());
port.into_outer()
@ -34,7 +39,8 @@ impl<Key: 'static, Item: 'static> OuterViewPort<dyn IndexView<Key, Item = Item>>
}
pub struct MapIndexItem<Key, DstItem, SrcView, F>
where SrcView: IndexView<Key> + ?Sized,
where Key: Clone + Send + Sync,
SrcView: IndexView<Key> + ?Sized,
F: Fn(&Key, &SrcView::Item) -> DstItem + Send + Sync
{
src_view: Option<Arc<SrcView>>,
@ -43,7 +49,7 @@ where SrcView: IndexView<Key> + ?Sized,
}
impl<Key, DstItem, SrcView, F> MapIndexItem<Key, DstItem, SrcView, F>
where Key: 'static,
where Key: Clone + Send + Sync + 'static,
DstItem: 'static,
SrcView: IndexView<Key> + ?Sized + 'static,
F: Fn(&Key, &SrcView::Item) -> DstItem + Send + Sync + 'static
@ -66,14 +72,16 @@ where Key: 'static,
}
impl<Key, DstItem, SrcView, F> View for MapIndexItem<Key, DstItem, SrcView, F>
where SrcView: IndexView<Key> + ?Sized,
where Key: Clone + Send + Sync,
SrcView: IndexView<Key> + ?Sized,
F: Fn(&Key, &SrcView::Item) -> DstItem + Send + Sync
{
type Msg = Key;
}
impl<Key, DstItem, SrcView, F> IndexView<Key> for MapIndexItem<Key, DstItem, SrcView, F>
where SrcView: IndexView<Key> + ?Sized,
where Key: Clone + Send + Sync,
SrcView: IndexView<Key> + ?Sized,
F: Fn(&Key, &SrcView::Item) -> DstItem + Send + Sync
{
type Item = DstItem;
@ -88,7 +96,8 @@ where SrcView: IndexView<Key> + ?Sized,
}
impl<Key, DstItem, SrcView, F> Observer<SrcView> for MapIndexItem<Key, DstItem, SrcView, F>
where SrcView: IndexView<Key> + ?Sized,
where Key: Clone + Send + Sync,
SrcView: IndexView<Key> + ?Sized,
F: Fn(&Key, &SrcView::Item) -> DstItem + Send + Sync
{
fn reset(&mut self, view: Option<Arc<SrcView>>) {
@ -100,7 +109,7 @@ where SrcView: IndexView<Key> + ?Sized,
if let Some(area) = new_area { self.cast.notify_each(area); }
}
fn notify(&self, msg: &Key) {
fn notify(&mut self, msg: &Key) {
self.cast.notify(msg);
}
}

View file

@ -18,9 +18,12 @@ pub use {
}
};
impl<SrcKey: 'static, Item: 'static> OuterViewPort<dyn IndexView<SrcKey, Item = Item>> {
impl<SrcKey, Item> OuterViewPort<dyn IndexView<SrcKey, Item = Item>>
where SrcKey: Clone + Send + Sync + 'static,
Item: 'static
{
pub fn map_key<
DstKey: 'static,
DstKey: Clone + Send + Sync + 'static,
F1: Fn(&SrcKey) -> DstKey + Send + Sync + 'static,
F2: Fn(&DstKey) -> Option<SrcKey> + Send + Sync + 'static,
>(
@ -29,6 +32,8 @@ impl<SrcKey: 'static, Item: 'static> OuterViewPort<dyn IndexView<SrcKey, Item =
f2: F2
) -> OuterViewPort<dyn IndexView<DstKey, Item = Item>> {
let port = ViewPort::new();
port.add_update_hook(Arc::new(self.0.clone()));
let map = MapIndexKey::new(port.inner(), f1, f2);
self.add_observer(map.clone());
port.into_outer()
@ -36,7 +41,9 @@ impl<SrcKey: 'static, Item: 'static> OuterViewPort<dyn IndexView<SrcKey, Item =
}
pub struct MapIndexKey<DstKey, SrcKey, SrcView, F1, F2>
where SrcView: IndexView<SrcKey> + ?Sized,
where DstKey: Clone + Send + Sync,
SrcKey: Clone + Send + Sync,
SrcView: IndexView<SrcKey> + ?Sized,
F1: Fn(&SrcKey) -> DstKey + Send + Sync,
F2: Fn(&DstKey) -> Option<SrcKey> + Send + Sync,
{
@ -47,8 +54,8 @@ where SrcView: IndexView<SrcKey> + ?Sized,
}
impl<DstKey, SrcKey, SrcView, F1, F2> MapIndexKey<DstKey, SrcKey, SrcView, F1, F2>
where DstKey: 'static,
SrcKey: 'static,
where DstKey: Clone + Send + Sync + 'static,
SrcKey: Clone + Send + Sync + 'static,
SrcView: IndexView<SrcKey> + ?Sized + 'static,
SrcView::Item: 'static,
F1: Fn(&SrcKey) -> DstKey + Send + Sync + 'static,
@ -74,7 +81,9 @@ where DstKey: 'static,
}
impl<DstKey, SrcKey, SrcView, F1, F2> View for MapIndexKey<DstKey, SrcKey, SrcView, F1, F2>
where SrcView: IndexView<SrcKey> + ?Sized,
where DstKey: Clone + Send + Sync,
SrcKey: Clone + Send + Sync,
SrcView: IndexView<SrcKey> + ?Sized,
F1: Fn(&SrcKey) -> DstKey + Send + Sync,
F2: Fn(&DstKey) -> Option<SrcKey> + Send + Sync,
{
@ -82,7 +91,9 @@ where SrcView: IndexView<SrcKey> + ?Sized,
}
impl<DstKey, SrcKey, SrcView, F1, F2> IndexView<DstKey> for MapIndexKey<DstKey, SrcKey, SrcView, F1, F2>
where SrcView: IndexView<SrcKey> + ?Sized,
where DstKey: Clone + Send + Sync,
SrcKey: Clone + Send + Sync,
SrcView: IndexView<SrcKey> + ?Sized,
F1: Fn(&SrcKey) -> DstKey + Send + Sync,
F2: Fn(&DstKey) -> Option<SrcKey> + Send + Sync,
{
@ -98,7 +109,9 @@ where SrcView: IndexView<SrcKey> + ?Sized,
}
impl<DstKey, SrcKey, SrcView, F1, F2> Observer<SrcView> for MapIndexKey<DstKey, SrcKey, SrcView, F1, F2>
where SrcView: IndexView<SrcKey> + ?Sized,
where DstKey: Clone + Send + Sync,
SrcKey: Clone + Send + Sync,
SrcView: IndexView<SrcKey> + ?Sized,
F1: Fn(&SrcKey) -> DstKey + Send + Sync,
F2: Fn(&DstKey) -> Option<SrcKey> + Send + Sync,
{
@ -111,7 +124,7 @@ where SrcView: IndexView<SrcKey> + ?Sized,
if let Some(area) = new_area { self.cast.notify_each(area); }
}
fn notify(&self, msg: &SrcKey) {
fn notify(&mut self, msg: &SrcKey) {
self.cast.notify(&(self.f1)(msg));
}
}

View file

@ -13,7 +13,8 @@ use {
//<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>>
pub trait IndexView<Key> : View<Msg = Key> {
pub trait IndexView<Key> : View<Msg = Key>
where Key: Send + Sync {
type Item;
fn get(&self, key: &Key) -> Option<Self::Item>;
@ -26,7 +27,10 @@ pub trait IndexView<Key> : View<Msg = Key> {
//<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>>
impl<Key, V: IndexView<Key> + ?Sized> IndexView<Key> for RwLock<V> {
impl<Key, V> IndexView<Key> for RwLock<V>
where Key: Send + Sync,
V: IndexView<Key> + ?Sized
{
type Item = V::Item;
fn get(&self, key: &Key) -> Option<Self::Item> {
@ -38,7 +42,11 @@ impl<Key, V: IndexView<Key> + ?Sized> IndexView<Key> for RwLock<V> {
}
}
impl<Key, V: IndexView<Key> + ?Sized> IndexView<Key> for Arc<V> {
impl<Key, V> IndexView<Key> for Arc<V>
where Key: Send + Sync,
V: IndexView<Key> + ?Sized
{
type Item = V::Item;
fn get(&self, key: &Key) -> Option<Self::Item> {
@ -50,7 +58,10 @@ impl<Key, V: IndexView<Key> + ?Sized> IndexView<Key> for Arc<V> {
}
}
impl<Key, V: IndexView<Key>> IndexView<Key> for Option<V> {
impl<Key, V> IndexView<Key> for Option<V>
where Key: Send + Sync,
V: IndexView<Key>
{
type Item = V::Item;
fn get(&self, key: &Key) -> Option<Self::Item> {
@ -69,7 +80,7 @@ impl<Key, V: IndexView<Key>> IndexView<Key> for Option<V> {
//<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>>
pub trait ImplIndexView : Send + Sync {
type Key;
type Key : Send + Sync;
type Value;
fn get(&self, key: &Self::Key) -> Option<Self::Value>;

View file

@ -53,7 +53,7 @@ impl Add {
b: OuterViewPort<dyn SequenceView<Item = usize>>,
c: InnerViewPort<RwLock<Vec<usize>>>//<dyn SequenceView<Item = usize>>
) -> Arc<RwLock<Self>> {
let mut proj_helper = ProjectionHelper::new();
let mut proj_helper = ProjectionHelper::new(c.0.update_hooks.clone());
let add = Arc::new(RwLock::new(
Add {
radix,

View file

@ -82,7 +82,7 @@ impl Observer<dyn SequenceView<Item = usize>> for RadixProjection {
self.src_digits = view;
}
fn notify(&self, _idx: &usize) {
fn notify(&mut self, _idx: &usize) {
// todo:
// src digit i changed.
// which dst-digits does it affect?

View file

@ -35,7 +35,7 @@ impl LeveledTermView {
src_port: OuterViewPort<dyn TerminalView>,
dst_port: InnerViewPort<dyn TerminalView>
) -> Arc<RwLock<Self>> {
let mut proj_helper = ProjectionHelper::new();
let mut proj_helper = ProjectionHelper::new(dst_port.0.update_hooks.clone());
let v = Arc::new(RwLock::new(
LeveledTermView {

View file

@ -2,21 +2,19 @@ use {
std::{
cmp::{max},
any::Any,
sync::{Arc, Weak},
},
async_std::{
stream::StreamExt
sync::{Arc, Weak}
},
std::sync::RwLock,
crate::{
core::{
View,
Observer, ObserverExt,
port::UpdateTask,
OuterViewPort,
channel::{
channel,
ChannelSender, ChannelReceiver,
ChannelData,
ChannelSender
channel
}
},
singleton::{SingletonView},
@ -29,14 +27,16 @@ use {
pub struct ProjectionHelper<P: Send + Sync + 'static> {
keepalive: Vec<Arc<dyn Any + Send + Sync>>,
proj: Arc<RwLock<Weak<RwLock<P>>>>
proj: Arc<RwLock<Weak<RwLock<P>>>>,
update_hooks: Arc<RwLock<Vec<Arc<dyn UpdateTask>>>>
}
impl<P: Send + Sync + 'static> ProjectionHelper<P> {
pub fn new() -> Self {
pub fn new(update_hooks: Arc<RwLock<Vec<Arc<dyn UpdateTask>>>>) -> Self {
ProjectionHelper {
keepalive: Vec::new(),
proj: Arc::new(RwLock::new(Weak::new()))
proj: Arc::new(RwLock::new(Weak::new())),
update_hooks
}
}
@ -52,6 +52,7 @@ impl<P: Send + Sync + 'static> ProjectionHelper<P> {
port: OuterViewPort<dyn SingletonView<Item = Item>>,
notify: impl Fn(&mut P, &()) + Send + Sync + 'static
) -> Arc<RwLock<Option<Arc<dyn SingletonView<Item = Item>>>>> {
self.update_hooks.write().unwrap().push(Arc::new(port.0.clone()));
port.add_observer(self.new_arg(notify));
port.get_view_arc()
}
@ -61,6 +62,7 @@ impl<P: Send + Sync + 'static> ProjectionHelper<P> {
port: OuterViewPort<dyn SequenceView<Item = Item>>,
notify: impl Fn(&mut P, &usize) + Send + Sync + 'static
) -> Arc<RwLock<Option<Arc<dyn SequenceView<Item = Item>>>>> {
self.update_hooks.write().unwrap().push(Arc::new(port.0.clone()));
port.add_observer(self.new_arg(notify));
port.get_view_arc()
}
@ -70,7 +72,10 @@ impl<P: Send + Sync + 'static> ProjectionHelper<P> {
port: OuterViewPort<dyn IndexView<Key, Item = Item>>,
notify: impl Fn(&mut P, &Key) + Send + Sync + 'static
) -> Arc<RwLock<Option<Arc<dyn IndexView<Key, Item = Item>>>>> {
port.add_observer(self.new_arg(notify));
self.update_hooks.write().unwrap().push(Arc::new(port.0.clone()));
let arg = self.new_arg(notify);
port.add_observer(arg);
port.get_view_arc()
}
@ -79,46 +84,76 @@ impl<P: Send + Sync + 'static> ProjectionHelper<P> {
>(
&mut self,
notify: impl Fn(&mut P, &V::Msg) + Send + Sync + 'static
) -> Arc<RwLock<ProjectionArg<V, Vec<V::Msg>>>>
where V::Msg: Send + Sync {
let (tx, mut rx) = channel::<Vec<V::Msg>>();
) -> Arc<RwLock<ProjectionArg<P, V, Vec<V::Msg>>>>
where V::Msg: Send + Sync
{
let (tx, rx) = channel::<Vec<V::Msg>>();
let arg = Arc::new(RwLock::new(
ProjectionArg {
src: None,
sender: tx
notify: Box::new(notify),
proj: self.proj.clone(),
rx, tx
}));
let proj = self.proj.clone();
async_std::task::spawn(async move {
while let Some(msg) = rx.next().await {
if let Some(proj) = proj.read().unwrap().upgrade() {
notify(&mut *proj.write().unwrap(), &msg);
}
}
});
self.keepalive.push(arg.clone());
self.update_hooks.write().unwrap().push(arg.clone());
arg
}
}
/// Special Observer which can access the state of the projection on notify
/// also handles the reset() and default behaviour of unitinitalized inputs
pub struct ProjectionArg<V, D>
where V: View + ?Sized,
/// also handles the reset()
pub struct ProjectionArg<P, V, D>
where P: Send + Sync + 'static,
V: View + ?Sized,
D: ChannelData<Item = V::Msg>,
D::IntoIter: Send + Sync
{
src: Option<Arc<V>>,
sender: ChannelSender<D>
notify: Box<dyn Fn(&mut P, &V::Msg) + Send + Sync + 'static>,
proj: Arc<RwLock<Weak<RwLock<P>>>>,
rx: ChannelReceiver<D>,
tx: ChannelSender<D>
}
impl<P, V, D> UpdateTask for ProjectionArg<P, V, D>
where P: Send + Sync + 'static,
V: View + ?Sized,
D: ChannelData<Item = V::Msg>,
D::IntoIter: Send + Sync
{
fn update(&self) {
if let Some(p) = self.proj.read().unwrap().upgrade() {
if let Some(data) = self.rx.try_recv() {
for msg in data {
(self.notify)(
&mut *p.write().unwrap(),
&msg
);
}
}
}
}
}
impl<P, V, D> UpdateTask for RwLock<ProjectionArg<P, V, D>>
where P: Send + Sync + 'static,
V: View + ?Sized,
D: ChannelData<Item = V::Msg>,
D::IntoIter: Send + Sync
{
fn update(&self) {
self.read().unwrap().update();
}
}
//<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>>
impl<Item, D> Observer<dyn SingletonView<Item = Item>> for ProjectionArg<dyn SingletonView<Item = Item>, D>
where D: ChannelData<Item = ()>,
impl<P, Item, D> Observer<dyn SingletonView<Item = Item>> for ProjectionArg<P, dyn SingletonView<Item = Item>, D>
where P: Send + Sync + 'static,
D: ChannelData<Item = ()>,
D::IntoIter: Send + Sync
{
fn reset(&mut self, new_src: Option<Arc<dyn SingletonView<Item = Item>>>) {
@ -126,15 +161,16 @@ where D: ChannelData<Item = ()>,
self.notify(&());
}
fn notify(&self, msg: &()) {
self.sender.send(*msg);
fn notify(&mut self, msg: &()) {
self.tx.send(msg.clone());
}
}
//<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>>
impl<Item, D> Observer<dyn SequenceView<Item = Item>> for ProjectionArg<dyn SequenceView<Item = Item>, D>
where D: ChannelData<Item = usize>,
impl<P, Item, D> Observer<dyn SequenceView<Item = Item>> for ProjectionArg<P, dyn SequenceView<Item = Item>, D>
where P: Send + Sync + 'static,
D: ChannelData<Item = usize>,
D::IntoIter: Send + Sync
{
fn reset(&mut self, new_src: Option<Arc<dyn SequenceView<Item = Item>>>) {
@ -145,15 +181,17 @@ where D: ChannelData<Item = usize>,
self.notify_each(0 .. max(old_len, new_len));
}
fn notify(&self, msg: &usize) {
self.sender.send(*msg);
fn notify(&mut self, msg: &usize) {
self.tx.send(*msg);
}
}
//<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>>
impl<Key: Clone, Item, D> Observer<dyn IndexView<Key, Item = Item>> for ProjectionArg<dyn IndexView<Key, Item = Item>, D>
where D: ChannelData<Item = Key>,
impl<P, Key, Item, D> Observer<dyn IndexView<Key, Item = Item>> for ProjectionArg<P, dyn IndexView<Key, Item = Item>, D>
where P: Send + Sync + 'static,
Key: Clone + Send + Sync,
D: ChannelData<Item = Key>,
D::IntoIter: Send + Sync
{
fn reset(&mut self, new_src: Option<Arc<dyn IndexView<Key, Item = Item>>>) {
@ -165,8 +203,8 @@ where D: ChannelData<Item = Key>,
if let Some(area) = new_area { self.notify_each(area); }
}
fn notify(&self, msg: &Key) {
self.sender.send(msg.clone());
fn notify(&mut self, msg: &Key) {
self.tx.send(msg.clone());
}
}

View file

@ -18,6 +18,8 @@ impl<V: SequenceView + ?Sized + 'static> OuterViewPort<V>
P: Fn(&V::Item) -> bool + Send + Sync + 'static
>(&self, pred: P) -> OuterViewPort<dyn SequenceView<Item = V::Item>> {
let port = ViewPort::new();
port.add_update_hook(Arc::new(self.0.clone()));
let filter = Arc::new(RwLock::new(
Filter {
src_view: None,
@ -115,7 +117,7 @@ where SrcView: SequenceView + ?Sized + 'static,
if let Some(len) = new_len { self.cast.notify_each(0 .. len); }
}
fn notify(&self, idx: &usize) {
fn notify(&mut self, idx: &usize) {
let l = self.len().unwrap_or(0)+1;
let np =
if let Some(x) = self.src_view.get(idx) {

View file

@ -65,7 +65,7 @@ where V1: SequenceView<Item = OuterViewPort<V2>> + ?Sized + 'static,
self.notify_each(0 .. std::cmp::max(old_len, new_len));
}
fn notify(&self, chunk_idx: &usize) {
fn notify(&mut self, chunk_idx: &usize) {
self.sender.send(*chunk_idx);
}
}
@ -81,7 +81,7 @@ where V2: SequenceView + ?Sized + 'static
self.notify_each(0 .. std::cmp::max(old_len, new_len));
}
fn notify(&self, idx: &usize) {
fn notify(&mut self, idx: &usize) {
self.sender.send(*idx);
}
}
@ -139,7 +139,7 @@ where V1: SequenceView<Item = OuterViewPort<V2>> + ?Sized + 'static,
while let Some(chunk_idx) = recv.next().await {
if let Some(mut chunk_rcv) = f.write().unwrap().update_chunk(chunk_idx) {
let f = f.clone();
let cast = cast.clone();
let mut cast = cast.clone();
async_std::task::spawn(async move {
while let Some(idx) = chunk_rcv.next().await {
let mut flat = f.write().unwrap();

View file

@ -21,6 +21,8 @@ impl<Item: 'static> OuterViewPort<dyn SequenceView<Item = Item>> {
f: F
) -> OuterViewPort<dyn SequenceView<Item = DstItem>> {
let port = ViewPort::new();
port.add_update_hook(Arc::new(self.0.clone()));
let map = Arc::new(RwLock::new(MapSequenceItem {
src_view: None,
f,
@ -83,7 +85,7 @@ where SrcView: SequenceView + ?Sized,
if let Some(len) = new_len { self.cast.notify_each(0 .. len ); }
}
fn notify(&self, msg: &usize) {
fn notify(&mut self, msg: &usize) {
self.cast.notify(msg);
}
}

View file

@ -39,6 +39,7 @@ where SrcView: SequenceView + ?Sized + 'static {
impl<Item: 'static> OuterViewPort<dyn SequenceView<Item = Item>> {
pub fn to_index(&self) -> OuterViewPort<dyn IndexView<usize, Item = Item>> {
let port = ViewPort::new();
port.add_update_hook(Arc::new(self.0.clone()));
self.add_observer(Sequence2Index::new(port.inner()));
port.into_outer()
}
@ -73,7 +74,7 @@ where SrcView: SequenceView + ?Sized + 'static {
if let Some(area) = new_area { self.cast.notify_each(area); }
}
fn notify(&self, msg: &usize) {
fn notify(&mut self, msg: &usize) {
self.cast.notify(msg);
}
}

View file

@ -61,6 +61,8 @@ impl<T> OuterViewPort<RwLock<Vec<T>>>
where T: Clone + Send + Sync + 'static {
pub fn to_sequence(&self) -> OuterViewPort<dyn SequenceView<Item = T>> {
let port = ViewPort::new();
port.add_update_hook(Arc::new(self.0.clone()));
let vec_seq = VecSequence::new(port.inner());
self.add_observer(vec_seq.clone());
port.into_outer()
@ -114,7 +116,7 @@ where T: Clone + Serialize + Send + Sync + 'static,
out.flush().expect("");
}
fn notify(&self, diff: &VecDiff<T>) {
fn notify(&mut self, diff: &VecDiff<T>) {
let mut out = self.out.write().unwrap();
out.write(&bincode::serialized_size(diff).unwrap().to_le_bytes()).expect("");
out.write(&bincode::serialize(diff).unwrap()).expect("");
@ -142,7 +144,7 @@ where T: Clone + Serialize + Send + Sync + 'static,
self.out.write().unwrap().flush().expect("");
}
fn notify(&self, diff: &VecDiff<T>) {
fn notify(&mut self, diff: &VecDiff<T>) {
self.out.write().unwrap().write(serde_json::to_string(diff).unwrap().as_bytes()).expect("");
self.out.write().unwrap().write(b"\n").expect("");
self.out.write().unwrap().flush().expect("");
@ -184,7 +186,7 @@ where T: Clone + Send + Sync + 'static {
self.cast.notify_each(0 .. std::cmp::max(old_len, new_len));
}
fn notify(&self, diff: &VecDiff<T>) {
fn notify(&mut self, diff: &VecDiff<T>) {
match diff {
VecDiff::Clear => {
let l = {

View file

@ -166,7 +166,7 @@ pub mod insert_view {
data_port: OuterViewPort<dyn SequenceView<Item = char>>,
out_port: InnerViewPort<dyn TerminalView>
) -> Arc<RwLock<Self>> {
let mut proj_helper = ProjectionHelper::new();
let mut proj_helper = ProjectionHelper::new(out_port.0.update_hooks.clone());
let proj = Arc::new(RwLock::new(
StringInsertView {

View file

@ -30,7 +30,7 @@ impl TerminalCompositor {
TerminalCompositor {
layers: Vec::new(),
cast: port.get_broadcast(),
proj_helper: ProjectionHelper::new()
proj_helper: ProjectionHelper::new(port.0.update_hooks.clone())
}
));

View file

@ -135,7 +135,7 @@ impl Observer<dyn TerminalView> for TermOutObserver {
}
}
fn notify(&self, pos: &Point2<i16>) {
fn notify(&mut self, pos: &Point2<i16>) {
self.dirty_pos_tx.send(*pos);
}
}