From 9faaf8e38ef38d16dce7567742d68c2cb60747dc Mon Sep 17 00:00:00 2001 From: Michael Sippel Date: Wed, 9 Dec 2020 12:51:43 +0100 Subject: [PATCH] ChannelReceiver: add recv() function to receive the complete chunk instead of iterating --- src/channel.rs | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/src/channel.rs b/src/channel.rs index e6bd5ac..8416416 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -121,6 +121,31 @@ impl Drop for ChannelSender { //<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> +impl ChannelReceiver { + pub async fn recv(&self) -> Option { + ChannelRead(self.0.clone()).await + } +} + +struct ChannelRead(Arc>>); +impl std::future::Future for ChannelRead { + type Output = Option; + + fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll { + let mut state = self.0.lock().unwrap(); + if let Some(buf) = state.send_buf.take() { + Poll::Ready(Some(buf)) + } else if state.num_senders == 0 { + Poll::Ready(None) + } else { + state.waker = Some(cx.waker().clone()); + Poll::Pending + } + } +} + +//<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>> + impl Stream for ChannelReceiver { type Item = Data::Item;