diff --git a/src/threaded_archiver.rs b/src/threaded_archiver.rs index 38755fa..6220c48 100644 --- a/src/threaded_archiver.rs +++ b/src/threaded_archiver.rs @@ -1,7 +1,6 @@ -use bytes; -use futures; -use futures::Sink; +use futures::prelude::*; use tar; +use bytes; use std::io; use std::path::PathBuf; @@ -12,9 +11,8 @@ use std::thread; * don't tar hidden files */ -type Stream = futures::sync::mpsc::Receiver; -type Sender = futures::sync::mpsc::Sender; -type BlockingSender = futures::sink::Wait; +type Stream = futures::channel::mpsc::Receiver; +type Sender = futures::channel::mpsc::Sender; pub fn stream_tar_in_thread(path: PathBuf) -> Stream { let (writer, stream) = StreamWriter::new(64); @@ -31,28 +29,30 @@ pub fn stream_tar_in_thread(path: PathBuf) -> Stream { } struct StreamWriter { - tx: BlockingSender, + tx: Sender, } impl StreamWriter { fn new(size: usize) -> (Self, Stream) { - let (tx, rx) = futures::sync::mpsc::channel(size); - let tx = tx.wait(); + let (tx, rx) = futures::channel::mpsc::channel(size); (StreamWriter { tx }, rx) } } impl io::Write for StreamWriter { - fn write(&mut self, buf: &[u8]) -> io::Result { - self.tx - .send(bytes::Bytes::from(buf)) - .map(|_| buf.len()) - .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) + fn write(&mut self, data: &[u8]) -> io::Result { + let len = data.len(); + futures::executor::block_on( + async move { + let buf = bytes::Bytes::copy_from_slice(data); + self.tx.send(buf).await; + } + ); + Ok(len) } fn flush(&mut self) -> io::Result<()> { - self.tx - .flush() - .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) + futures::executor::block_on(self.tx.flush()); + Ok(()) } }