ChannelReceiver: add recv() function to receive the complete chunk instead of iterating
This commit is contained in:
parent
4988db36e8
commit
9faaf8e38e
1 changed files with 25 additions and 0 deletions
|
@ -121,6 +121,31 @@ impl<Data: ChannelData> Drop for ChannelSender<Data> {
|
||||||
|
|
||||||
//<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>>
|
//<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>>
|
||||||
|
|
||||||
|
impl<Data: ChannelData> ChannelReceiver<Data> {
|
||||||
|
pub async fn recv(&self) -> Option<Data> {
|
||||||
|
ChannelRead(self.0.clone()).await
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
struct ChannelRead<Data: ChannelData>(Arc<Mutex<ChannelState<Data>>>);
|
||||||
|
impl<Data: ChannelData> std::future::Future for ChannelRead<Data> {
|
||||||
|
type Output = Option<Data>;
|
||||||
|
|
||||||
|
fn poll(self: Pin<&mut Self>, cx: &mut Context) -> Poll<Self::Output> {
|
||||||
|
let mut state = self.0.lock().unwrap();
|
||||||
|
if let Some(buf) = state.send_buf.take() {
|
||||||
|
Poll::Ready(Some(buf))
|
||||||
|
} else if state.num_senders == 0 {
|
||||||
|
Poll::Ready(None)
|
||||||
|
} else {
|
||||||
|
state.waker = Some(cx.waker().clone());
|
||||||
|
Poll::Pending
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//<<<<>>>><<>><><<>><<<*>>><<>><><<>><<<<>>>>
|
||||||
|
|
||||||
impl<Data: ChannelData> Stream for ChannelReceiver<Data> {
|
impl<Data: ChannelData> Stream for ChannelReceiver<Data> {
|
||||||
type Item = Data::Item;
|
type Item = Data::Item;
|
||||||
|
|
||||||
|
|
Loading…
Reference in a new issue