re-implement io::Write for StreamWriter

since the `.wait()` has been removed from futures 0.1 sink::Sink trait,
use futures 0.3 `executor::block_on` function to run an async block in
the sync environment of `write`.

https://docs.rs/futures/0.1.29/futures/sink/trait.Sink.html#method.wait
https://docs.rs/futures/0.3.5/futures/executor/fn.block_on.html
This commit is contained in:
Дамјан Георгиевски 2020-09-14 23:37:25 +02:00
parent 4b25b26bb2
commit 070a878c75

View file

@ -1,7 +1,6 @@
use bytes; use futures::prelude::*;
use futures;
use futures::Sink;
use tar; use tar;
use bytes;
use std::io; use std::io;
use std::path::PathBuf; use std::path::PathBuf;
@ -12,9 +11,8 @@ use std::thread;
* don't tar hidden files * don't tar hidden files
*/ */
type Stream = futures::sync::mpsc::Receiver<bytes::Bytes>; type Stream = futures::channel::mpsc::Receiver<bytes::Bytes>;
type Sender = futures::sync::mpsc::Sender<bytes::Bytes>; type Sender = futures::channel::mpsc::Sender<bytes::Bytes>;
type BlockingSender = futures::sink::Wait<Sender>;
pub fn stream_tar_in_thread(path: PathBuf) -> Stream { pub fn stream_tar_in_thread(path: PathBuf) -> Stream {
let (writer, stream) = StreamWriter::new(64); let (writer, stream) = StreamWriter::new(64);
@ -31,28 +29,30 @@ pub fn stream_tar_in_thread(path: PathBuf) -> Stream {
} }
struct StreamWriter { struct StreamWriter {
tx: BlockingSender, tx: Sender,
} }
impl StreamWriter { impl StreamWriter {
fn new(size: usize) -> (Self, Stream) { fn new(size: usize) -> (Self, Stream) {
let (tx, rx) = futures::sync::mpsc::channel(size); let (tx, rx) = futures::channel::mpsc::channel(size);
let tx = tx.wait();
(StreamWriter { tx }, rx) (StreamWriter { tx }, rx)
} }
} }
impl io::Write for StreamWriter { impl io::Write for StreamWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> { fn write(&mut self, data: &[u8]) -> io::Result<usize> {
self.tx let len = data.len();
.send(bytes::Bytes::from(buf)) futures::executor::block_on(
.map(|_| buf.len()) async move {
.map_err(|e| io::Error::new(io::ErrorKind::Other, e)) let buf = bytes::Bytes::copy_from_slice(data);
self.tx.send(buf).await;
}
);
Ok(len)
} }
fn flush(&mut self) -> io::Result<()> { fn flush(&mut self) -> io::Result<()> {
self.tx futures::executor::block_on(self.tx.flush());
.flush() Ok(())
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
} }
} }