
Primes example... slower with more workers? #479

Open
davideger opened this issue Aug 12, 2022 · 1 comment

Following along here:

https://timelydataflow.github.io/timely-dataflow/chapter_1/chapter_1_1.html

I've downloaded a fresh copy of timely and find that when I run the "parallel primes" program, it gets much slower with more workers, the opposite of what I'd expect. Is there a recent regression in Timely?

```
$ time cargo run --release --example primes -- -w16 > output16.txt
    Finished release [optimized + debuginfo] target(s) in 0.02s
     Running target/release/examples/primes -w16

real    1m9.235s
user    15m27.958s
sys     0m1.987s

$ time cargo run --release --example primes -- -w1 > output1.txt
    Finished release [optimized + debuginfo] target(s) in 0.02s
     Running target/release/examples/primes -w1

real    0m0.279s
user    0m0.204s
sys     0m0.076s
```

```rust
#![allow(unused_variables)]
extern crate timely;

use timely::dataflow::{InputHandle};
use timely::dataflow::operators::{Input, Exchange, Inspect, Probe};

fn main() {
    // initializes and runs a timely dataflow.
    timely::execute_from_args(std::env::args(), |worker| {

        let index = worker.index();
        let mut input = InputHandle::new();

        // create a new input, exchange data, and inspect its output
        let probe = worker.dataflow(|scope| {
            scope.input_from(&mut input)
                 .exchange(|x| *x)
                 .inspect( //move |x| println!("worker {}:\thello {}", index, x))
                          |x| {
                              let limit = (*x as f64).sqrt() as u64;
                              if *x > 1 && (2 .. limit + 1).all(|i| x % i > 0) {
                                  // why can't i capture index?
                                  println!("{} is prime", x);
                              }
                          })
                 .probe();
        });

        // introduce data and watch!
        for round in 0..200000 {
            if index == 0 {
                input.send(round);
            }
            input.advance_to(round + 1);
        }
    }).unwrap();
}
```

frankmcsherry commented Aug 13, 2022

What's happening is that you are overwhelming the control plane with this line:

```rust
            input.advance_to(round + 1);
```

For each individual record you put in, you advance the round timestamp, which introduces coordination information. This can be valuable if you want to track the progress of the computation record by record, but it is expensive (and the cost is quadratic in the number of workers).
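A minimal sketch of that change applied to the input loop of the program above; the single trailing advance is an addition here (not something stated in the thread), kept so the timestamp type is still inferable and the round closes out once at the end, and the 2M round count matches the timing runs below:

```rust
// Sketch: per-record progress updates removed from the input loop.
let rounds: u64 = 2_000_000;
for round in 0..rounds {
    if index == 0 {
        input.send(round);
    }
    // input.advance_to(round + 1);   // per-record coordination removed
}
// Advance once at the end instead of once per record.
input.advance_to(rounds);
```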

If you comment it out and crank the rounds up to 2M, I see:

```
cargo run --release --example primes -- -w1  0.82s user 0.13s system 86% cpu 1.091 total
cargo run --release --example primes -- -w4  0.65s user 0.05s system 130% cpu 0.539 total
```

If you go up again to 20M, it looks like

```
cargo run --release --example primes -- -w1  15.24s user 0.83s system 98% cpu 16.271 total
cargo run --release --example primes -- -w4  13.62s user 0.17s system 198% cpu 6.935 total
```

At this point, you would probably go faster with some synchronization: the code puts all 20M records into the queue up front and probably builds up a backlog. Introducing only, say, 1M records at a time and then coordinating the workers (not introducing more records until the probe has advanced to the next time) should help.
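A minimal sketch of that batched approach, assuming an illustrative batch size of 1M and 20M records total (neither figure is prescribed in the thread); the probe is consulted to hold back the next batch until the dataflow has caught up:

```rust
extern crate timely;

use timely::dataflow::{InputHandle, ProbeHandle};
use timely::dataflow::operators::{Input, Exchange, Inspect, Probe};

fn main() {
    timely::execute_from_args(std::env::args(), |worker| {

        let index = worker.index();
        let mut input = InputHandle::new();
        let mut probe = ProbeHandle::new();

        // Same dataflow as above, but probed with a handle we can consult.
        worker.dataflow(|scope| {
            scope.input_from(&mut input)
                 .exchange(|x| *x)
                 .inspect(|x| {
                     let limit = (*x as f64).sqrt() as u64;
                     if *x > 1 && (2 .. limit + 1).all(|i| x % i > 0) {
                         println!("{} is prime", x);
                     }
                 })
                 .probe_with(&mut probe);
        });

        // Introduce records in batches; advance the timestamp once per batch
        // and step the worker until the batch has been fully processed.
        let batch: u64 = 1_000_000;
        for number in 0 .. 20_000_000u64 {
            if index == 0 {
                input.send(number);
            }
            if (number + 1) % batch == 0 {
                input.advance_to(number + 1);
                while probe.less_than(input.time()) {
                    worker.step();
                }
            }
        }
    }).unwrap();
}
```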
