Tuesday 19 July 2016

[Java 8 / Threads / Parallel stream] How to control pool size while using parallel stream?

Recently I had to implement a large piece of functionality which, among other things, is responsible for automatically buying products available in some shop. The API allows putting many products into a single request, but due to some legal issues the items have to be bought one by one. So if someone wants to buy:
Product: Witcher 3, quantity: 15
Product: GTA 5, quantity: 5
I have to make 20 requests. It's a SOAP endpoint, so it takes a lot of time. Consider the following method:
    public OrderResult makeOrder(final List<ExternalSupplierOrderEntry> orderEntries, final String orderId, final String language) {
        final List<List<ExternalSupplierOrderEntry>> orderEntriesChunks = orderSplitter.split(orderEntries);
        final List<ExternalSupplierCode> boughtCodes = orderEntriesChunks.stream()
                .map(chunk -> Try.ofFailable(() -> buy(chunk, orderId, language))
                                 .whenFailure(t -> log().error("Something went wrong while making order", t))
                                 .orElse(markCodesAsFailed(chunk)))
                .flatMap(Collection::stream)
                .collect(toList());

        return new OrderResult(boughtCodes, orderId);
   }
It splits the order into chunks (one item per chunk) and buys them one at a time. The buy() method calls the SOAP endpoint, which returns the code. When I try to buy 50 codes it takes one minute to complete the order. That's way too long, so my first thought was: replace stream() with parallelStream(). And it actually works :)
    public OrderResult makeOrder(final List<ExternalSupplierOrderEntry> orderEntries, final String orderId, final String language) {
        final List<List<ExternalSupplierOrderEntry>> orderEntriesChunks = orderSplitter.split(orderEntries);
        final List<ExternalSupplierCode> boughtCodes = orderEntriesChunks.parallelStream()
                .map(chunk -> Try.ofFailable(() -> buy(chunk, orderId, language))
                                 .whenFailure(t -> log().error("Something went wrong while making order", t))
                                 .orElse(markCodesAsFailed(chunk)))
                .flatMap(Collection::stream)
                .collect(toList());

        return new OrderResult(boughtCodes, orderId);
   }
I'm trying to buy 50 codes, so the buy() method is invoked 50 times. For stream() I get: 61.3s. For parallelStream() I get: 10.21s. Ten seconds is a huge improvement, but it's still very long, so my second thought was to increase the number of threads in the pool. parallelStream() is fine, but there's no overloaded parallelStream(int threads) method.

Since Java 7 the fork/join framework has been available directly in the JDK, and parallel streams use it under the hood to process a stream's elements on multiple threads. When you look into the ForkJoinPool class you'll see that the default constructor sets the default number of threads (the parallelism parameter) like this:
    public ForkJoinPool() {
        this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
             defaultForkJoinWorkerThreadFactory, null, false);
    }
It takes min(availableProcessors, MAX_CAP), where MAX_CAP = 0x7fff = 32767, so in practice the parallelism equals the number of available processors. Parallel streams, however, don't create a new pool of their own: they run on the shared ForkJoinPool.commonPool(), whose default parallelism is availableProcessors() - 1 (the thread that starts the computation takes part in the work as well). Let's run a test.
    public static void main(String [] args) {
        final Set<String> threadNames = IntStream.range(0, 10).parallel()
                .boxed()
                .peek(i -> Try.ofFailable(() -> { Thread.sleep(1000); return i; }))
                .map(i -> Thread.currentThread().getName())
                .collect(toSet());
        System.out.println(threadNames.size());
        System.out.println(threadNames);
    }
It prints:
4
[ForkJoinPool.commonPool-worker-1, ForkJoinPool.commonPool-worker-2, main, ForkJoinPool.commonPool-worker-3]
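That count of 4 is consistent with the common pool's default parallelism of availableProcessors() - 1 on a 4-core machine: three worker threads plus main, which joins in on the computation. A quick way to inspect the default (the class name here is mine, just for illustration):

```java
import java.util.concurrent.ForkJoinPool;

public class CommonPoolCheck {
    public static void main(String[] args) {
        // Parallelism of the pool backing every parallel stream;
        // defaults to Runtime.getRuntime().availableProcessors() - 1 (minimum 1)
        int parallelism = ForkJoinPool.commonPool().getParallelism();
        System.out.println(parallelism);
    }
}
```

The default can also be overridden for the whole JVM with -Djava.util.concurrent.ForkJoinPool.common.parallelism=20 (it must be set before the common pool is first touched), but that affects every parallel stream in the application, which is usually too blunt an instrument.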
Note that a peek operation which sleeps for a second has been added to make the operations take longer, so that all the threads actually get used. Let's try to increase the number of threads by using a custom ForkJoinPool — when a parallel stream is started from inside a fork/join task, it runs in that task's pool instead of the common pool.
    public static void main(String [] args) throws ExecutionException, InterruptedException {
        final ForkJoinPool forkJoinPool = new ForkJoinPool(20);
        final Set<String> threadNames = forkJoinPool.submit(() -> IntStream.range(0, 20).parallel()
                .boxed()
                .peek(i -> Try.ofFailable(() -> { Thread.sleep(1000); return true; }).toOptional())
                .map(i -> Thread.currentThread().getName())
                .collect(toSet())).get();

        System.out.println(threadNames.size());
        System.out.println(threadNames);
    }
This one prints:
20
[ForkJoinPool-1-worker-8, ForkJoinPool-1-worker-30, ForkJoinPool-1-worker-9, ForkJoinPool-1-worker-23, ForkJoinPool-1-worker-12, ForkJoinPool-1-worker-22, ForkJoinPool-1-worker-11, ForkJoinPool-1-worker-20, ForkJoinPool-1-worker-1, ForkJoinPool-1-worker-4, ForkJoinPool-1-worker-5, ForkJoinPool-1-worker-2, ForkJoinPool-1-worker-16, ForkJoinPool-1-worker-27, ForkJoinPool-1-worker-15, ForkJoinPool-1-worker-26, ForkJoinPool-1-worker-25, ForkJoinPool-1-worker-19, ForkJoinPool-1-worker-18, ForkJoinPool-1-worker-29]
As you can see, the number of threads involved in computing the stream's result has been increased to 20. It takes only one additional line, because you have to create the ForkJoinPool object. I'd really like to get rid of that line so I don't have to remember about ForkJoinPool, so I've created this class:
/**
 * @author Grzegorz Taramina
 *         Created on: 18/07/16
 */
public class ForkJoinPoolInvoker {
    private final ForkJoinPool forkJoinPool;

    public static ForkJoinPoolInvoker usePoolWithSize(final int poolSize) {
        return new ForkJoinPoolInvoker(poolSize);
    }

    private ForkJoinPoolInvoker(final int poolSize) {
        this.forkJoinPool = new ForkJoinPool(poolSize);
    }

    public <T> T andInvoke(final Callable<T> task) {
        try {
            final ForkJoinTask<T> submit = forkJoinPool.submit(task);
            return Try.ofFailable(submit::get).orElseThrow(RuntimeException::new);
        } finally {
            forkJoinPool.shutdown(); // the pool is single-use, so release its threads
        }
    }
}
Now the previous example looks like this:
    public static void main(String [] args) {
        final Set<String> threadNames = usePoolWithSize(20).andInvoke(() -> IntStream.range(0, 20).parallel()
                .boxed()
                .peek(i -> Try.ofFailable(() -> { Thread.sleep(1000); return true; }).toOptional())
                .map(i -> Thread.currentThread().getName())
                .collect(toSet()));

        System.out.println(threadNames.size());
        System.out.println(threadNames);
    }
Let's get back to the main example that buys codes:
    public OrderResult makeOrder(final List<ExternalSupplierOrderEntry> orderEntries, final String language) {
        final List<List<ExternalSupplierOrderEntry>> orderEntriesChunks = orderSplitter.split(orderEntries);
        final List<ExternalSupplierCode> boughtCodes =  usePoolWithSize(nexwaySettings.getNumberOfBuyingThreads())
                                                        .andInvoke(() ->
             orderEntriesChunks.stream()
                .parallel()
                .map(chunk -> {
                    final String orderId = uuidProvider.randomUUID();
                    return Try.ofFailable(() -> buy(chunk, orderId, language))
                            .whenFailure(t -> log().error("Something went wrong while making order", t))
                            .orElse(markCodesAsFailed(chunk, orderId));
                })
                .flatMap(Collection::stream)
                .collect(toList())
        );

        return new OrderResult(boughtCodes);
    }
I've also checked how many threads are sufficient to complete the order in a reasonable time:
Note that all the results presented in the chart have been averaged (for each number of threads the test was run 10 times). The chart shows that 15 threads are sufficient, because it then takes slightly more than 4 seconds to make 50 requests. As you can see, changing the pool size is quite easy. I do realize that I could have done it differently, but in the end this solution looks good. All the calls to the API have timeouts, so I shouldn't experience the typical parallel-stream problems people talk about.
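For completeness, one of those different approaches: an ExecutorService plus CompletableFuture gives the same explicit control over the thread count without going through a ForkJoinPool at all. This is just a sketch — buy() below is a hypothetical stand-in for the SOAP call, not the real method from this post:

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import static java.util.stream.Collectors.toList;

public class ExecutorAlternative {
    // Hypothetical stand-in for the slow SOAP call
    static String buy(final String chunk) {
        try { Thread.sleep(100); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        return "code-" + chunk;
    }

    static List<String> buyAll(final List<String> chunks, final int threads) {
        final ExecutorService executor = Executors.newFixedThreadPool(threads);
        try {
            return chunks.stream()
                    .map(chunk -> CompletableFuture.supplyAsync(() -> buy(chunk), executor))
                    .collect(toList())              // materialize first so every call starts...
                    .stream()
                    .map(CompletableFuture::join)   // ...then wait for all the results
                    .collect(toList());
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(buyAll(Arrays.asList("Witcher 3", "GTA 5"), 15));
    }
}
```

The intermediate collect(toList()) matters: without it the stream would submit and join one future at a time, serializing the calls again.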
