Add new tasks to tokio event loop and retry tasks on failure












I am trying to write a tokio event loop that can perform GET requests to the same server, with the following characteristics:




  • A connection pool should be used
  • The GET requests are generally slow (>1 s), so they need to be performed in parallel
  • The server may not respond, so I need a timeout. If the request wasn't received, send it again
  • Poll a receiver for new URLs that have to be downloaded; they should be added to the event loop


In my attempts so far, I've managed to get different combinations of these four items working, but never all of them together. My main problem is that I don't quite understand how I can add new futures to the tokio event loop.



I assume I need to use loop_fn for the main loop that polls the receiver, and handle.spawn to spawn new tasks? Since handle.spawn only accepts futures of Result<(), ()>, I can't use its output to respawn a job on failure, so does the retry check need to move into the spawned future itself?
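
To illustrate the constraint, here is a minimal sketch of what I mean (tokio-core 0.1 / futures 0.1; fake_get is just a placeholder for the real request future, not an actual API). Since handle.spawn only accepts a future with Item = () and Error = (), any retry decision has to happen inside the spawned future itself:

extern crate futures;
extern crate tokio_core;

use futures::{future, Future};
use tokio_core::reactor::Core;

// Placeholder for the real GET request; stands in for the reqwest call.
fn fake_get(url: String) -> Box<Future<Item = String, Error = ()>> {
    Box::new(future::ok(format!("body of {}", url)))
}

fn main() {
    let mut core = Core::new().unwrap();
    let handle = core.handle();

    handle.spawn(
        fake_get("http://example.com".to_string()).then(|res| {
            match res {
                Ok(body) => println!("got: {}", body),
                // The error cannot escape handle.spawn, so the retry
                // (e.g. pushing the url back onto a queue) has to happen here.
                Err(()) => println!("failed, would re-queue the url here"),
            }
            Ok::<(), ()>(()) // the spawned future must resolve to Result<(), ()>
        }),
    );

    // Drive the reactor so the spawned task gets polled (sketch only).
    core.turn(None);
}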



Below is an attempt that accepts and processes URLs in batches (so no continuous polling) and has a timeout (but no retry):



fn place_dls(&mut self, reqs: Vec<String>) {
    let mut core = Core::new().unwrap();
    let handle = core.handle();

    let timeout = Timeout::new(Duration::from_millis(5000), &handle).unwrap();

    let send_dls = stream::iter_ok::<_, reqwest::Error>(reqs.iter().map(|o| {
        // send the request through the async reqwest client stored in self
    }));

    let rec_dls = send_dls.buffer_unordered(reqs.len()).for_each(|n| {
        n.into_body().concat2().and_then(|full_body| {
            debug!("Received: {:#?}", full_body);

            // TODO: how to put the download back in the queue if a failure code is received?
        })
    });

    let work = rec_dls.select2(timeout).then(|res| match res {
        Ok(Either::A((got, _timeout))) => Ok(got),
        Ok(Either::B((_timeout_error, _get))) => {
            // TODO: put back in the queue
            Err(io::Error::new(
                io::ErrorKind::TimedOut,
                "Client timed out while connecting",
            ).into())
        }
        Err(Either::A((get_error, _timeout))) => Err(get_error.into()),
        Err(Either::B((timeout_error, _get))) => Err(timeout_error.into()),
    });

    core.run(work);
}


My attempt with loop_fn was sadly unsuccessful.










      rust rust-tokio






      asked Nov 16 '18 at 10:09









Ben Ruijl

          1 Answer
"I assume I need to use loop_fn for the main loop"




I'd suggest a slightly different approach: implement a consumer of a futures::sync::mpsc::Receiver stream instead of a loop.



It can be viewed as a kind of master process: after receiving a URL via the Receiver, a tokio task is spawned to download its contents. Retrying then becomes straightforward: just send the failed or timed-out URL back into the master channel via its Sender endpoint.



          Here is a working code sketch:



extern crate futures;
extern crate tokio;

use std::{io, time::{Duration, Instant}};
use futures::{
    Sink,
    Stream,
    stream,
    sync::mpsc,
    future::Future,
};
use tokio::{
    runtime::Runtime,
    timer::{Delay, Timeout},
};

fn main() -> Result<(), io::Error> {
    let mut rt = Runtime::new()?;
    let executor = rt.executor();

    let (tx, rx) = mpsc::channel(0);
    let master_tx = tx.clone();
    let master = rx.for_each(move |url: String| {
        let download_future = download(&url)
            .map(|_download_contents| {
                // TODO: actually do something with the contents
                ()
            });
        let timeout_future =
            Timeout::new(download_future, Duration::from_millis(2000));
        let job_tx = master_tx.clone();
        let task = timeout_future
            .or_else(move |error| {
                // either an actual download error or a timeout: retry
                println!("retrying {} because of {:?}", url, error);
                job_tx.send(url).map(|_| ()).map_err(|_| ())
            });
        executor.spawn(task);
        Ok(())
    });

    rt.spawn(master);

    let urls = vec![
        "http://url1".to_string(),
        "http://url2".to_string(),
        "http://url3".to_string(),
    ];
    rt.executor()
        .spawn(tx.send_all(stream::iter_ok(urls)).map(|_| ()).map_err(|_| ()));

    rt.shutdown_on_idle().wait()
        .map_err(|()| io::Error::new(io::ErrorKind::Other, "shutdown failure"))
}

#[derive(Debug)]
struct DownloadContents;
#[derive(Debug)]
struct DownloadError;

fn download(url: &str) -> Box<Future<Item = DownloadContents, Error = DownloadError> + Send> {
    // TODO: actually download here

    match url {
        // url2 always fails
        "http://url2" => {
            println!("FAILED downloading: {}", url);
            let future = Delay::new(Instant::now() + Duration::from_millis(1000))
                .map_err(|_| DownloadError)
                .and_then(|()| Err(DownloadError));
            Box::new(future)
        },
        // url3 always times out
        "http://url3" => {
            println!("TIMEOUT downloading: {}", url);
            let future = Delay::new(Instant::now() + Duration::from_millis(5000))
                .map_err(|_| DownloadError)
                .and_then(|()| Ok(DownloadContents));
            Box::new(future)
        },
        // everything else succeeds
        _ => {
            println!("SUCCESS downloading: {}", url);
            let future = Delay::new(Instant::now() + Duration::from_millis(50))
                .map_err(|_| DownloadError)
                .and_then(|()| Ok(DownloadContents));
            Box::new(future)
        },
    }
}
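
To tie this back to the connection-pool requirement, here is a sketch of how the download() stub above could be filled in with reqwest's async client, assuming reqwest 0.9 (the same into_body().concat2() style the question uses). The extra client parameter is not part of the stub above, and none of this is verified against a specific reqwest version:

extern crate futures;
extern crate reqwest;

use futures::{Future, Stream};
use reqwest::async::Client; // `reqwest::r#async::Client` on the 2018 edition

#[derive(Debug)]
struct DownloadContents(Vec<u8>);
#[derive(Debug)]
struct DownloadError;

fn download(
    client: &Client,
    url: &str,
) -> Box<Future<Item = DownloadContents, Error = DownloadError> + Send> {
    let future = client
        .get(url)
        .send()
        // Treat HTTP error status codes as failures so they get retried too.
        .and_then(|response| response.error_for_status())
        // Collect the full response body into a single chunk.
        .and_then(|response| response.into_body().concat2())
        .map(|body| DownloadContents(body.to_vec()))
        // Collapse every reqwest error into DownloadError; the master loop then retries it.
        .map_err(|_| DownloadError);
    Box::new(future)
}

A Client is cheap to clone and its clones share one connection pool, so the master loop can create a single Client up front and hand a clone to each spawned download task.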





                answered Nov 24 '18 at 21:38









swizard
