diff --git a/Cargo.toml b/Cargo.toml index 879ac22..31b2e66 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "http-server" -version = "0.2.0" +version = "0.3.0" authors = ["Damjan Georgievski "] license = "MIT" readme = "README.md" @@ -8,9 +8,9 @@ readme = "README.md" [dependencies] actix = "*" actix-web = { git = "https://github.com/actix/actix-web.git" } +bytes = "0.4" futures = "0.1" tar = "0.4" -bytes = "0.4" percent-encoding = "1.0" htmlescape = "0.3" diff --git a/README.md b/README.md index 19bda43..92a67ab 100644 --- a/README.md +++ b/README.md @@ -1,7 +1,7 @@ a simple http server like `python -m http.server` but: * written in rust with actix, should be faster -* allow concurency +* allow concurrency * download whole directories in .tar format * better auto index * maybe announce itself on mDNS (avahi) diff --git a/src/channel.rs b/src/channel.rs index 24bcb64..e748294 100644 --- a/src/channel.rs +++ b/src/channel.rs @@ -1,4 +1,5 @@ use futures; +use futures::Sink; use bytes; use tar; @@ -6,52 +7,52 @@ use std::thread; use std::path::PathBuf; use std::io; -pub fn run_tar_in_thread(path: PathBuf) -> futures::sync::mpsc::UnboundedReceiver { - let (writer, stream) = MpscWriter::new(); + +/* + * TODO: + * don't tar hidden files + */ + +type Stream = futures::sync::mpsc::Receiver; +type Sender = futures::sync::mpsc::Sender; +type BlockingSender = futures::sink::Wait; + + +pub fn run_tar_in_thread(path: PathBuf) -> Stream { + let (writer, stream) = StreamWriter::new(4 * 1024 * 1024); thread::spawn(move || { let mut a = tar::Builder::new(writer); let last_path_component = path.file_name().unwrap(); a.mode(tar::HeaderMode::Deterministic); - a.append_dir_all(last_path_component, &path); - a.finish(); + a.append_dir_all(last_path_component, &path) + .unwrap_or_else(|e| println!("{}", e)); + a.finish() + .unwrap_or_else(|e| println!("{}", e)); }); stream } - -/* - * TODO: - * - * there are 2 features important about futures::sync::mpsc - * - it works with tokio (and so with actix), so the stream is async friendly - * - it can be sent across threads (more importantly, the tx part) - * cons: - * futures::sync::mpsc::unbounded() is unbounded, which means the tar thread will - * just push everything in memory as fast as it can (as cpu allows). - * a better implementation would use a bounded channel, so that the thread would block - * if the async core can't send data from the stream fast enough, and wouldn't fill up - * GBs of memory. Alas, there doesn't seem to be a bounded channel compatible - * with futures at this time (05-07-2018, but pending work on futures 0.3 might help). - */ -struct MpscWriter { - tx: futures::sync::mpsc::UnboundedSender +struct StreamWriter { + tx: BlockingSender } -impl MpscWriter { - fn new() -> (Self, futures::sync::mpsc::UnboundedReceiver) { - let (tx, rx) = futures::sync::mpsc::unbounded(); - (MpscWriter{tx:tx}, rx) +impl StreamWriter { + fn new(size: usize) -> (Self, Stream) { + let (tx, rx) = futures::sync::mpsc::channel(size); + let tx = tx.wait(); + (StreamWriter{tx:tx}, rx) } } -impl io::Write for MpscWriter { +impl io::Write for StreamWriter { fn write(&mut self, buf: &[u8]) -> io::Result { - self.tx.unbounded_send(bytes::Bytes::from(buf)); - Ok(buf.len()) + self.tx.send(bytes::Bytes::from(buf)) + .map(|_| buf.len()) + .map_err(|e| io::Error::new(io::ErrorKind::Other, e)) } fn flush(&mut self) -> io::Result<()> { - Ok(()) + self.tx.flush().map_err(|e| io::Error::new(io::ErrorKind::Other, e)) } }