use futures::sync::mpsc::channel() with 4MB buffer

… instead of unbounded() with no limits in buffering

this should implement a back pressure, when the client can't read the
response fast enough, the thread creating the archive would block the
write, instead of filling the memory.

Use Sink::wait() to make a blocking Sender
This commit is contained in:
Дамјан Георгиевски 2018-07-07 15:22:18 +02:00
parent 62a99f8dc3
commit 803ffc0c99
3 changed files with 33 additions and 32 deletions

View file

@ -1,6 +1,6 @@
[package]
name = "http-server"
version = "0.2.0"
version = "0.3.0"
authors = ["Damjan Georgievski <gdamjan@gmail.com>"]
license = "MIT"
readme = "README.md"
@ -8,9 +8,9 @@ readme = "README.md"
[dependencies]
actix = "*"
actix-web = { git = "https://github.com/actix/actix-web.git" }
bytes = "0.4"
futures = "0.1"
tar = "0.4"
bytes = "0.4"
percent-encoding = "1.0"
htmlescape = "0.3"

View file

@ -1,7 +1,7 @@
a simple http server like `python -m http.server` but:
* written in rust with actix, should be faster
* allow concurency
* allow concurrency
* download whole directories in .tar format
* better auto index
* maybe announce itself on mDNS (avahi)

View file

@ -1,4 +1,5 @@
use futures;
use futures::Sink;
use bytes;
use tar;
@ -6,52 +7,52 @@ use std::thread;
use std::path::PathBuf;
use std::io;
pub fn run_tar_in_thread(path: PathBuf) -> futures::sync::mpsc::UnboundedReceiver<bytes::Bytes> {
let (writer, stream) = MpscWriter::new();
/*
* TODO:
* don't tar hidden files
*/
type Stream = futures::sync::mpsc::Receiver<bytes::Bytes>;
type Sender = futures::sync::mpsc::Sender<bytes::Bytes>;
type BlockingSender = futures::sink::Wait<Sender>;
pub fn run_tar_in_thread(path: PathBuf) -> Stream {
let (writer, stream) = StreamWriter::new(4 * 1024 * 1024);
thread::spawn(move || {
let mut a = tar::Builder::new(writer);
let last_path_component = path.file_name().unwrap();
a.mode(tar::HeaderMode::Deterministic);
a.append_dir_all(last_path_component, &path);
a.finish();
a.append_dir_all(last_path_component, &path)
.unwrap_or_else(|e| println!("{}", e));
a.finish()
.unwrap_or_else(|e| println!("{}", e));
});
stream
}
/*
* TODO:
*
* there are 2 features important about futures::sync::mpsc
* - it works with tokio (and so with actix), so the stream is async friendly
* - it can be sent across threads (more importantly, the tx part)
* cons:
* futures::sync::mpsc::unbounded() is unbounded, which means the tar thread will
* just push everything in memory as fast as it can (as cpu allows).
* a better implementation would use a bounded channel, so that the thread would block
* if the async core can't send data from the stream fast enough, and wouldn't fill up
* GBs of memory. Alas, there doesn't seem to be a bounded channel compatible
* with futures at this time (05-07-2018, but pending work on futures 0.3 might help).
*/
struct MpscWriter {
tx: futures::sync::mpsc::UnboundedSender<bytes::Bytes>
struct StreamWriter {
tx: BlockingSender
}
impl MpscWriter {
fn new() -> (Self, futures::sync::mpsc::UnboundedReceiver<bytes::Bytes>) {
let (tx, rx) = futures::sync::mpsc::unbounded();
(MpscWriter{tx:tx}, rx)
impl StreamWriter {
fn new(size: usize) -> (Self, Stream) {
let (tx, rx) = futures::sync::mpsc::channel(size);
let tx = tx.wait();
(StreamWriter{tx:tx}, rx)
}
}
impl io::Write for MpscWriter {
impl io::Write for StreamWriter {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
self.tx.unbounded_send(bytes::Bytes::from(buf));
Ok(buf.len())
self.tx.send(bytes::Bytes::from(buf))
.map(|_| buf.len())
.map_err(|e| io::Error::new(io::ErrorKind::Other, e))
}
fn flush(&mut self) -> io::Result<()> {
Ok(())
self.tx.flush().map_err(|e| io::Error::new(io::ErrorKind::Other, e))
}
}