From ebd11796adc98a4476d9703293c36f1a4d6caeb3 Mon Sep 17 00:00:00 2001 From: Michael Sippel Date: Mon, 7 Dec 2020 18:09:48 +0100 Subject: [PATCH] improve ports add Send+Sync as supertraits for View and Observer and notify() immutable --- src/channel.rs | 22 ++++++---- src/main.rs | 82 +++++++++++++++++++++++++++-------- src/port.rs | 115 ++++++++++++++++++++++++++++--------------------- src/view.rs | 96 ++++++++++++++++++++++++++++++++++++----- 4 files changed, 228 insertions(+), 87 deletions(-) diff --git a/src/channel.rs b/src/channel.rs index 6f7daf4..e6bd5ac 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -23,7 +23,7 @@ use { Traits <<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> \*/ -pub trait ChannelData : Default + IntoIterator { +pub trait ChannelData : Default + IntoIterator + Send + Sync { fn channel_insert(&mut self, x: Self::Item); } @@ -33,7 +33,8 @@ pub trait ChannelData : Default + IntoIterator { Queue Channel <<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> \*/ -impl ChannelData for Vec { +impl ChannelData for Vec +where T: Send + Sync { fn channel_insert(&mut self, x: T) { self.push(x); } @@ -44,7 +45,8 @@ impl ChannelData for Vec { Set Channel <<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> \*/ -impl ChannelData for HashSet { +impl ChannelData for HashSet +where T: Eq + Hash + Send + Sync { fn channel_insert(&mut self, x: T) { self.insert(x); } @@ -55,7 +57,8 @@ impl ChannelData for HashSet { Singleton Channel <<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> \*/ -impl ChannelData for Option { +impl ChannelData for Option +where T: Send + Sync { fn channel_insert(&mut self, x: T) { *self = Some(x); } @@ -80,10 +83,11 @@ pub struct ChannelReceiver(Arc>>); //<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> -impl Observer for ChannelSender { +impl Observer for ChannelSender +where Data::IntoIter: Send + Sync { type Msg = Data::Item; - fn notify(&mut self, msg: Data::Item) { + fn notify(&self, msg: Data::Item) { let mut state = self.0.lock().unwrap(); if state.send_buf.is_none() { @@ -163,15 +167,15 @@ pub fn channel() -> (ChannelSender, ChannelReceiver() -> (ChannelSender>, ChannelReceiver>) { +pub fn set_channel() -> (ChannelSender>, ChannelReceiver>) { channel::>() } -pub fn queue_channel() -> (ChannelSender>, ChannelReceiver>) { +pub fn queue_channel() -> (ChannelSender>, ChannelReceiver>) { channel::>() } -pub fn singleton_channel() -> (ChannelSender>, ChannelReceiver>) { +pub fn singleton_channel() -> (ChannelSender>, ChannelReceiver>) { channel::>() } diff --git a/src/main.rs b/src/main.rs index ec1481b..4ab047f 100644 --- a/src/main.rs +++ b/src/main.rs @@ -5,36 +5,82 @@ pub mod view; pub mod port; pub mod channel; -use async_std::task; -use async_std::prelude::*; -use async_std::stream; +use { + async_std::{ + prelude::*, task, stream + }, + std::{ + sync::{Arc, RwLock} + }, + cgmath::{Vector2}, + crate::{ + view::{View, Observer}, + port::{InnerViewPort, OuterViewPort} + } +}; -use std::sync::{Arc, RwLock}; -use cgmath::Vector2; +struct SingletonBuffer { + data: Arc>>, + port: InnerViewPort<(), T> +} -use view::{View, Observer}; -use port::{ViewPortIn, ViewPortOut}; +impl SingletonBuffer { + fn new( + port: InnerViewPort<(), T> + ) -> Self { + let data = Arc::new(RwLock::new(None)); + + port.set_view_fn({ + let data = data.clone(); + move |new_val| data.read().unwrap().clone() + }); + + SingletonBuffer { + data, + port + } + } + + fn update(&mut self, new_value: T) { + let mut data = self.data.write().unwrap(); + if *data != Some(new_value.clone()) { + *data = Some(new_value); + drop(data); + self.port.notify(()); + } + } +} #[async_std::main] async fn main() { - let (view_in, mut view_out) = port::view_port::(); + let view_port = port::ViewPort::<(), char>::new(); - let mut observer_stream = view_in.stream().map({ - let view = view_in.clone(); - move |idx| (idx, view.view(idx).unwrap()) + let mut buf = SingletonBuffer::new(view_port.inner()); + + let view = view_port.outer().get_view(); + let mut stream = view_port.outer().stream().map({ + move |_| view.read().unwrap().as_ref().unwrap().view(()).unwrap() }); - let fut = task::spawn(async move { - while let Some((idx, val)) = observer_stream.next().await { - println!("view[{}] = {}", idx, val); + let fut = task::spawn({ + async move { + while let Some(val) = stream.next().await { + println!("{}", val); + } + println!("end print task"); } }); - view_out.set_view_fn(|idx| Some(if idx % 2 == 0 { 'λ' } else { 'y' }) ); + buf.update('a'); + buf.update('b'); + task::sleep(std::time::Duration::from_secs(1)).await; + buf.update('c'); + buf.update('d'); + task::sleep(std::time::Duration::from_secs(1)).await; + buf.update('e'); - view_out.notify(1); - view_out.notify(2); - view_out.notify(5); + drop(buf); + drop(view_port); fut.await; } diff --git a/src/port.rs b/src/port.rs index e1eb058..9f99a7b 100644 --- a/src/port.rs +++ b/src/port.rs @@ -1,12 +1,13 @@ - -use std::{ - sync::{Arc, Weak, RwLock}, - collections::HashSet, - hash::Hash -}; -use crate::{ - view::{View, Observer, FnView, FnObserver}, - channel::{ChannelReceiver} +use { + std::{ + sync::{Arc, Weak, RwLock}, + collections::HashSet, + hash::Hash, + }, + crate::{ + view::{View, Observer, FnView, FnObserver}, + channel::{ChannelReceiver} + } }; /*\ @@ -14,62 +15,77 @@ use crate::{ View Port <<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> \*/ - -pub struct ViewPort { - view: Option + Send + Sync>>, - observers: Vec + Send + Sync>>> +#[derive(Clone)] +pub struct ViewPort { + view: Arc>>>>, + observers: Arc>>>> } -pub fn view_port() -> (ViewPortIn, ViewPortOut) { - let state = Arc::new(RwLock::new(ViewPort{ view: None, observers: Vec::new() })); - (ViewPortIn(state.clone()), ViewPortOut(state)) +impl ViewPort +where K: Send + Sync + 'static, + V: Send + Sync + 'static { + pub fn new() -> Self { + ViewPort { + view: Arc::new(RwLock::new(None)), + observers: Arc::new(RwLock::new(Vec::new())) + } + } + + pub fn set_view(&self, view: Arc>) { + *self.view.write().unwrap() = Some(view); + } + + pub fn add_observer(&self, observer: Arc>) { + self.observers.write().unwrap().push(observer); + } + + pub fn inner(&self) -> InnerViewPort { + InnerViewPort(ViewPort{ view: self.view.clone(), observers: self.observers.clone() }) + } + + pub fn outer(&self) -> OuterViewPort { + OuterViewPort(ViewPort{ view: self.view.clone(), observers: self.observers.clone() }) + } } //<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> #[derive(Clone)] -pub struct ViewPortIn(Arc>>); -impl ViewPortIn { - pub fn add_observer(&self, observer: Arc + Send + Sync>>) { - self.0 - .write().unwrap() - .observers - .push(observer); +pub struct InnerViewPort(ViewPort); + +#[derive(Clone)] +pub struct OuterViewPort(ViewPort); + +//<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> + +impl OuterViewPort { + pub fn get_view(&self) -> Arc>>>> { + self.0.view.clone() } - pub fn add_observer_fn(&self, obs_fn: impl FnMut(K) + Send + Sync + 'static) { - self.add_observer(Arc::new(RwLock::new(FnObserver::new(obs_fn)))); + pub fn add_observer(self, observer: Arc>) -> Arc>>>> { + self.0.add_observer(observer); + self.0.view + } + + pub fn add_observer_fn(self, obs_fn: impl Fn(K) + Send + Sync + 'static) -> Arc>>>> { + self.add_observer(Arc::new(FnObserver::new(obs_fn))) } } -impl ViewPortIn { +impl OuterViewPort { pub fn stream(&self) -> ChannelReceiver> { let (s, r) = crate::channel::set_channel(); - self.add_observer(Arc::new(RwLock::new(s))); + self.0.add_observer(Arc::new(s)); r } } -impl View for ViewPortIn { - type Key = K; - type Value = V; - - fn view(&self, key: K) -> Option { - if let Some(view) = self.0.read().unwrap().view.as_ref() { - view.view(key) - } else { - println!("Warning: trying to access InPort with uninitialized View!"); - None - } - } -} - //<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> -pub struct ViewPortOut(Arc>>); -impl ViewPortOut { +impl InnerViewPort { pub fn set_view(&self, view: Arc + Send + Sync>) { - self.0.write().unwrap().view = Some(view); + *self.0.view.write().unwrap() = Some(view); } pub fn set_view_fn(&self, view_fn: impl Fn(K) -> Option + Send + Sync + 'static) { @@ -77,13 +93,14 @@ impl ViewPortOut { } } -impl Observer for ViewPortOut -where K: Clone { +impl Observer for InnerViewPort +where K: Clone + Send + Sync + 'static, + V: Send + Sync + 'static { type Msg = K; - fn notify(&mut self, msg: K) { - for observer in self.0.read().unwrap().observers.iter() { - observer.write().unwrap().notify(msg.clone()); + fn notify(&self, msg: K) { + for observer in self.0.observers.read().unwrap().iter() { + observer.notify(msg.clone()); } } } diff --git a/src/view.rs b/src/view.rs index 798c82c..75ad91d 100644 --- a/src/view.rs +++ b/src/view.rs @@ -1,17 +1,17 @@ //<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> -pub trait View { +pub trait View : Send + Sync { type Key; type Value; fn view(&self, key: Self::Key) -> Option; } -pub trait Observer { +pub trait Observer : Send + Sync { type Msg; - fn notify(&mut self, key: Self::Msg); + fn notify(&self, key: Self::Msg); } //<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> @@ -29,14 +29,19 @@ pub trait GridObserver = Observer>; //<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> -pub struct FnView Option> { +pub struct FnView +where K: Send + Sync, + V: Send + Sync, + F: Fn(K) -> Option + Send + Sync { f: F, _phantom0: std::marker::PhantomData, _phantom1: std::marker::PhantomData } impl FnView -where F: Fn(K) -> Option { +where K: Send + Sync, + V: Send + Sync, + F: Fn(K) -> Option + Send + Sync { pub fn new(f: F) -> Self { FnView { f, @@ -47,7 +52,9 @@ where F: Fn(K) -> Option { } impl View for FnView -where F: Fn(K) -> Option { +where K: Send + Sync, + V: Send + Sync, + F: Fn(K) -> Option + Send + Sync { type Key = K; type Value = V; @@ -58,13 +65,16 @@ where F: Fn(K) -> Option { //<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> -pub struct FnObserver { +pub struct FnObserver +where T: Send + Sync, + F: Fn(T) + Send + Sync { f: F, _phantom: std::marker::PhantomData } impl FnObserver -where F: FnMut(T) { +where T: Send + Sync, + F: Fn(T) + Send + Sync { pub fn new(f: F) -> Self { FnObserver { f, @@ -74,11 +84,75 @@ where F: FnMut(T) { } impl Observer for FnObserver -where F: FnMut(T) { +where T: Send + Sync, + F: Fn(T) + Send + Sync { type Msg = T; - fn notify(&mut self, key: T) { - (self.f)(key); + fn notify(&self, msg: T) { + (self.f)(msg); + } +} + +//<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> + +impl View for Option { + type Key = T::Key; + type Value = T::Value; + + fn view(&self, key: T::Key) -> Option { + if let Some(view) = self.as_ref() { + view.view(key) + } else { + None + } + } +} + +impl Observer for Option { + type Msg = T::Msg; + + fn notify(&self, msg: T::Msg) { + if let Some(obs) = self.as_ref() { + obs.notify(msg); + } + } +} + +//<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> + +impl View for std::sync::RwLock { + type Key = T::Key; + type Value = T::Value; + + fn view(&self, key: T::Key) -> Option { + self.read().unwrap().view(key) + } +} + +impl Observer for std::sync::RwLock { + type Msg = T::Msg; + + fn notify(&self, msg: T::Msg) { + self.read().unwrap().notify(msg) + } +} + +use std::ops::Deref; + +impl View for std::sync::Arc { + type Key = T::Key; + type Value = T::Value; + + fn view(&self, key: T::Key) -> Option { + self.deref().view(key) + } +} + +impl Observer for std::sync::Arc { + type Msg = T::Msg; + + fn notify(&self, msg: T::Msg) { + self.deref().notify(msg) } }