From 070a878c75ef3550ffcf4486068a70557803c7c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D0=94=D0=B0=D0=BC=D1=98=D0=B0=D0=BD=20=D0=93=D0=B5=D0=BE?= =?UTF-8?q?=D1=80=D0=B3=D0=B8=D0=B5=D0=B2=D1=81=D0=BA=D0=B8?= Date: Mon, 14 Sep 2020 23:37:25 +0200 Subject: [PATCH] re-implement io::Write for StreamWriter since the `.wait()` has been removed from futures 0.1 sink::Sink trait, use futures 0.3 `executor::block_on` function to run an async block in the sync environment of `write`. https://docs.rs/futures/0.1.29/futures/sink/trait.Sink.html#method.wait https://docs.rs/futures/0.3.5/futures/executor/fn.block_on.html --- src/threaded_archiver.rs | 34 +++++++++++++++++----------------- 1 file changed, 17 insertions(+), 17 deletions(-) diff --git a/src/threaded_archiver.rs b/src/threaded_archiver.rs index 38755fa..6220c48 100644 --- a/src/threaded_archiver.rs +++ b/src/threaded_archiver.rs @@ -1,7 +1,6 @@ -use bytes; -use futures; -use futures::Sink; +use futures::prelude::*; use tar; +use bytes; use std::io; use std::path::PathBuf; @@ -12,9 +11,8 @@ use std::thread; * don't tar hidden files */ -type Stream = futures::sync::mpsc::Receiver; -type Sender = futures::sync::mpsc::Sender; -type BlockingSender = futures::sink::Wait; +type Stream = futures::channel::mpsc::Receiver; +type Sender = futures::channel::mpsc::Sender; pub fn stream_tar_in_thread(path: PathBuf) -> Stream { let (writer, stream) = StreamWriter::new(64); @@ -31,28 +29,30 @@ pub fn stream_tar_in_thread(path: PathBuf) -> Stream { } struct StreamWriter { - tx: BlockingSender, + tx: Sender, } impl StreamWriter { fn new(size: usize) -> (Self, Stream) { - let (tx, rx) = futures::sync::mpsc::channel(size); - let tx = tx.wait(); + let (tx, rx) = futures::channel::mpsc::channel(size); (StreamWriter { tx }, rx) } } impl io::Write for StreamWriter { - fn write(&mut self, buf: &[u8]) -> io::Result { - self.tx - .send(bytes::Bytes::from(buf)) - .map(|_| buf.len()) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) + fn write(&mut self, data: &[u8]) -> io::Result { + let len = data.len(); + futures::executor::block_on( + async move { + let buf = bytes::Bytes::copy_from_slice(data); + self.tx.send(buf).await; + } + ); + Ok(len) } fn flush(&mut self) -> io::Result<()> { - self.tx - .flush() - .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) + futures::executor::block_on(self.tx.flush()); + Ok(()) } }