From 166e75a5c7409ff4a4588ca43b9625967f37bcf7 Mon Sep 17 00:00:00 2001 From: Michael Sippel Date: Fri, 4 Dec 2020 20:38:51 +0100 Subject: [PATCH] initial commit --- Cargo.toml | 14 ++++ src/channel.rs | 194 +++++++++++++++++++++++++++++++++++++++++++++++++ src/main.rs | 42 +++++++++++ src/port.rs | 117 +++++++++++++++++++++++++++++ src/view.rs | 84 +++++++++++++++++++++ 5 files changed, 451 insertions(+) create mode 100644 Cargo.toml create mode 100644 src/channel.rs create mode 100644 src/main.rs create mode 100644 src/port.rs create mode 100644 src/view.rs diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..5ff9d27 --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,14 @@ + +[package] +authors = ["Michael Sippel "] +edition = "2018" +name = "NeStEd" +version = "0.1.0" + +[dependencies] +cgmath = "*" +termion = "*" + +[dependencies.async-std] +version = "1.7.0" +features = ["unstable", "attributes"] diff --git a/src/channel.rs b/src/channel.rs new file mode 100644 index 0000000..4d09190 --- /dev/null +++ b/src/channel.rs @@ -0,0 +1,194 @@ +use { + core::{ + task::{Poll, Context, Waker}, + pin::Pin + }, + std::{ + sync::{Arc, Mutex}, + collections::HashSet, + hash::Hash + }, + async_std::{ + stream::Stream + }, + + crate::{ + view::{View, Observer} + } +}; + + + /*\ +<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> + Traits +<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> + \*/ +pub trait ChannelData : Default + IntoIterator { + fn channel_insert(&mut self, x: Self::Item); +} + + + /*\ +<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> + Queue Channel +<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> + \*/ +impl ChannelData for Vec { + fn channel_insert(&mut self, x: T) { + self.push(x); + } +} + + /*\ +<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> + Set Channel +<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> + \*/ +impl ChannelData for HashSet { + fn channel_insert(&mut self, x: T) { + self.insert(x); + } +} + + /*\ +<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> + Singleton Channel +<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> + \*/ +pub struct SingletonChannel(Option); + +impl Default for SingletonChannel { + fn default() -> Self { + SingletonChannel(None) + } +} + +impl IntoIterator for SingletonChannel { + type Item = T; + type IntoIter = std::iter::Once; + + fn into_iter(mut self) -> Self::IntoIter { + std::iter::once(self.0.take().unwrap()) + } +} + +impl ChannelData for SingletonChannel { + fn channel_insert(&mut self, x: T) { + self.0 = Some(x); + } +} + + /*\ +<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> + Channel +<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> + \*/ +struct ChannelState { + send_buf: Option, + recv_iter: Option, + num_senders: usize, + waker: Option +} + +//<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> + +pub struct ChannelSender(Arc>>); +pub struct ChannelReceiver(Arc>>); + +//<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> + +impl Observer for ChannelSender { + type Msg = Data::Item; + + fn notify(&mut self, msg: Data::Item) { + let mut state = self.0.lock().unwrap(); + + if state.send_buf.is_none() { + state.send_buf = Some(Data::default()); + } + + state.send_buf.as_mut().unwrap().channel_insert(msg); + + if let Some(waker) = state.waker.take() { + waker.wake(); + } + } +} + +impl Clone for ChannelSender { + fn clone(&self) -> Self { + self.0.lock().unwrap().num_senders += 1; + ChannelSender(self.0.clone()) + } +} + +impl Drop for ChannelSender { + fn drop(&mut self) { + let mut state = self.0.lock().unwrap(); + state.num_senders -= 1; + if let Some(waker) = state.waker.take() { + waker.wake(); + } + } +} + +//<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> + +impl Stream for ChannelReceiver { + type Item = Data::Item; + + fn poll_next( + self: Pin<&mut Self>, + cx: &mut Context<'_> + ) -> Poll> { + let mut state = self.0.lock().unwrap(); + + if let Some(recv_iter) = state.recv_iter.as_mut() { + if let Some(val) = recv_iter.next() { + return Poll::Ready(Some(val)) + } else { + state.recv_iter = None + } + } + + if let Some(send_buf) = state.send_buf.take() { + state.recv_iter = Some(send_buf.into_iter()); + // recv_iter.next() is guaranteed to be Some(x) + Poll::Ready(state.recv_iter.as_mut().unwrap().next()) + } else if state.num_senders == 0 { + Poll::Ready(None) + } else { + state.waker = Some(cx.waker().clone()); + Poll::Pending + } + } +} + + /*\ +<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> + Factory Functions +<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> + \*/ +pub fn channel() -> (ChannelSender, ChannelReceiver) { + let state = Arc::new(Mutex::new(ChannelState{ + send_buf: None, + recv_iter: None, + num_senders: 1, + waker: None + })); + + (ChannelSender(state.clone()), ChannelReceiver(state)) +} + +pub fn set_channel() -> (ChannelSender>, ChannelReceiver>) { + channel::>() +} + +pub fn queue_channel() -> (ChannelSender>, ChannelReceiver>) { + channel::>() +} + +pub fn singleton_channel() -> (ChannelSender>, ChannelReceiver>) { + channel::>() +} + diff --git a/src/main.rs b/src/main.rs new file mode 100644 index 0000000..ec1481b --- /dev/null +++ b/src/main.rs @@ -0,0 +1,42 @@ + +#![feature(trait_alias)] + +pub mod view; +pub mod port; +pub mod channel; + +use async_std::task; +use async_std::prelude::*; +use async_std::stream; + +use std::sync::{Arc, RwLock}; +use cgmath::Vector2; + +use view::{View, Observer}; +use port::{ViewPortIn, ViewPortOut}; + +#[async_std::main] +async fn main() { + let (view_in, mut view_out) = port::view_port::(); + + let mut observer_stream = view_in.stream().map({ + let view = view_in.clone(); + move |idx| (idx, view.view(idx).unwrap()) + }); + + let fut = task::spawn(async move { + while let Some((idx, val)) = observer_stream.next().await { + println!("view[{}] = {}", idx, val); + } + }); + + view_out.set_view_fn(|idx| Some(if idx % 2 == 0 { 'λ' } else { 'y' }) ); + + view_out.notify(1); + view_out.notify(2); + view_out.notify(5); + + fut.await; +} + + diff --git a/src/port.rs b/src/port.rs new file mode 100644 index 0000000..e1eb058 --- /dev/null +++ b/src/port.rs @@ -0,0 +1,117 @@ + +use std::{ + sync::{Arc, Weak, RwLock}, + collections::HashSet, + hash::Hash +}; +use crate::{ + view::{View, Observer, FnView, FnObserver}, + channel::{ChannelReceiver} +}; + + /*\ +<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> + View Port +<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> + \*/ + +pub struct ViewPort { + view: Option + Send + Sync>>, + observers: Vec + Send + Sync>>> +} + +pub fn view_port() -> (ViewPortIn, ViewPortOut) { + let state = Arc::new(RwLock::new(ViewPort{ view: None, observers: Vec::new() })); + (ViewPortIn(state.clone()), ViewPortOut(state)) +} + +//<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> + +#[derive(Clone)] +pub struct ViewPortIn(Arc>>); +impl ViewPortIn { + pub fn add_observer(&self, observer: Arc + Send + Sync>>) { + self.0 + .write().unwrap() + .observers + .push(observer); + } + + pub fn add_observer_fn(&self, obs_fn: impl FnMut(K) + Send + Sync + 'static) { + self.add_observer(Arc::new(RwLock::new(FnObserver::new(obs_fn)))); + } +} + +impl ViewPortIn { + pub fn stream(&self) -> ChannelReceiver> { + let (s, r) = crate::channel::set_channel(); + self.add_observer(Arc::new(RwLock::new(s))); + r + } +} + +impl View for ViewPortIn { + type Key = K; + type Value = V; + + fn view(&self, key: K) -> Option { + if let Some(view) = self.0.read().unwrap().view.as_ref() { + view.view(key) + } else { + println!("Warning: trying to access InPort with uninitialized View!"); + None + } + } +} + +//<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> + +pub struct ViewPortOut(Arc>>); +impl ViewPortOut { + pub fn set_view(&self, view: Arc + Send + Sync>) { + self.0.write().unwrap().view = Some(view); + } + + pub fn set_view_fn(&self, view_fn: impl Fn(K) -> Option + Send + Sync + 'static) { + self.set_view(Arc::new(FnView::new(view_fn))) + } +} + +impl Observer for ViewPortOut +where K: Clone { + type Msg = K; + + fn notify(&mut self, msg: K) { + for observer in self.0.read().unwrap().observers.iter() { + observer.write().unwrap().notify(msg.clone()); + } + } +} + + /*\ +<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> + Stream Port +<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> + \*/ + +/* +pub struct StreamPort { + actions: Vec>> +} + +impl StreamPort { + async fn set_stream(&self, stream: impl Stream) -> impl Future<()> { + for msg in stream.next().await.unwrap() { + for act in self.actions.iter() { + (*act.lock().unwrap())(msg); + } + } + } + + fn add_action(&self, action: impl FnMut(T)) { + self.actions.push(Arc::new(Mutex::new(action))) + } +} + */ + + diff --git a/src/view.rs b/src/view.rs new file mode 100644 index 0000000..798c82c --- /dev/null +++ b/src/view.rs @@ -0,0 +1,84 @@ + +//<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> + +pub trait View { + type Key; + type Value; + + fn view(&self, key: Self::Key) -> Option; +} + +pub trait Observer { + type Msg; + + fn notify(&mut self, key: Self::Msg); +} + +//<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> + +use cgmath::Vector2; + +pub trait SingletonView = View; +pub trait SingletonObserver = Observer; + +pub trait SequenceView = View; +pub trait SequenceObserver = Observer; + +pub trait GridView = View>; +pub trait GridObserver = Observer>; + +//<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> + +pub struct FnView Option> { + f: F, + _phantom0: std::marker::PhantomData, + _phantom1: std::marker::PhantomData +} + +impl FnView +where F: Fn(K) -> Option { + pub fn new(f: F) -> Self { + FnView { + f, + _phantom0: std::marker::PhantomData, + _phantom1: std::marker::PhantomData + } + } +} + +impl View for FnView +where F: Fn(K) -> Option { + type Key = K; + type Value = V; + + fn view(&self, key: K) -> Option { + (self.f)(key) + } +} + +//<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> + +pub struct FnObserver { + f: F, + _phantom: std::marker::PhantomData +} + +impl FnObserver +where F: FnMut(T) { + pub fn new(f: F) -> Self { + FnObserver { + f, + _phantom: std::marker::PhantomData + } + } +} + +impl Observer for FnObserver +where F: FnMut(T) { + type Msg = T; + + fn notify(&mut self, key: T) { + (self.f)(key); + } +} +