Tuesday, 19 July 2016

[Java 8 / Threads / Parallel stream] How to control pool size while using parallel stream ?

Recently I had to implement huge functionality which among other things is responsible for automatic buying of products available in some shop. The api allows to put many products into single request but due to some legal issues the items have to be bought one by one. So if someone wants to buy:
Product: Witcher 3, quantity: 15
Product: GTA 5, quantity: 5
I have to make 20 requests. It's a soap endpoint so it takes lots of time. Consider the following method:
    public OrderResult makeOrder(final List<ExternalSupplierOrderEntry> orderEntries, final String orderId, final String language) {
        final List<List<ExternalSupplierOrderEntry>> orderEntriesChunks = orderSplitter.split(orderEntries);
        final List<ExternalSupplierCode> boughtCodes = orderEntriesChunks.stream()
                .map(chunk -> Try.ofFailable(() -> buy(orderEntries, orderId, language))
                                 .whenFailure(t -> log().error("Something went wrong while making order", t))
                                 .orElse(markCodesAsFailed(chunk)))
                .flatMap(Collection::stream)
                .collect(toList());

        return new OrderResult(boughtCodes, orderId);
   }
It splits the order into chunks (one item per chunk) and buys it. buy() method calls soap endpoint which returns the code. When I try to buy 50 codes it takes one minute to complete the order. It's way too long so my first thought was: replace stream() with parallelStream(). And it actually works :)
    public OrderResult makeOrder(final List<ExternalSupplierOrderEntry> orderEntries, final String orderId, final String language) {
        final List<List<ExternalSupplierOrderEntry>> orderEntriesChunks = orderSplitter.split(orderEntries);
        final List<ExternalSupplierCode> boughtCodes = orderEntriesChunks.parallelStream()
                .map(chunk -> Try.ofFailable(() -> buy(orderEntries, orderId, language))
                                 .whenFailure(t -> log().error("Something went wrong while making order", t))
                                 .orElse(markCodesAsFailed(chunk)))
                .flatMap(Collection::stream)
                .collect(toList());

        return new OrderResult(boughtCodes, orderId);
   }
I'm trying to buy 50 codes so buy() method is being invoked 50 times. For stream() I get: 61.3s For parallelStream() I get: 10.21s 10 seconds is a huge improvement but it's still very long so my second thought was to increase the number of threads in pool. parallelStream() is ok but there's no overloaded parallelStream(int threads) method. Since Java 7 fork / join framework is available directly in the JDK. Parallel stream utilizes the framework in order to perform operations on stream's elements using multiple threads. When you look into ForkJoinPool class you'll see that default construvtor sets default number of threads (parallelism parameter) like that:
    public ForkJoinPool() {
        this(Math.min(MAX_CAP, Runtime.getRuntime().availableProcessors()),
             defaultForkJoinWorkerThreadFactory, null, false);
    }
I takes minimum(availableProcessors, 0x7fff) where 0x7fff = 32767 You'll typically get here min(8, 32767) = 8. Let's make some test.
    public static void main(String [] args) {
        final Set<Object> threadNames = IntStream.range(0, 10).parallel()
                .boxed()
                .peek(i -> Try.ofFailable(() -> { Thread.sleep(1000); return i; }))
                .map(i -> Thread.currentThread().getName())
                .collect(toSet());
        System.out.println(threadNames.size());
        System.out.println(threadNames);
    }
It prints:
4
[ForkJoinPool.commonPool-worker-1, ForkJoinPool.commonPool-worker-2, main, ForkJoinPool.commonPool-worker-3]
Note that peek operation which sleeps for a second has been added to make operations longer so all the threads are being used. Let's try to increase number of threads in the pool using ForkJoinPool.
    public static void main(String [] args) throws ExecutionException, InterruptedException {
        final ForkJoinPool forkJoinPool = new ForkJoinPool(20);
        final Set<String> threadNames = forkJoinPool.submit(() -> IntStream.range(0, 20).parallel()
                .boxed()
                .peek(i -> Try.ofFailable(() -> { Thread.sleep(1000); return true; }).toOptional())
                .map(i -> Thread.currentThread().getName())
                .collect(toSet())).get();

        System.out.println(threadNames.size());
        System.out.println(threadNames);
    }
This one prints:
20
[ForkJoinPool-1-worker-8, ForkJoinPool-1-worker-30, ForkJoinPool-1-worker-9, ForkJoinPool-1-worker-23, ForkJoinPool-1-worker-12, ForkJoinPool-1-worker-22, ForkJoinPool-1-worker-11, ForkJoinPool-1-worker-20, ForkJoinPool-1-worker-1, ForkJoinPool-1-worker-4, ForkJoinPool-1-worker-5, ForkJoinPool-1-worker-2, ForkJoinPool-1-worker-16, ForkJoinPool-1-worker-27, ForkJoinPool-1-worker-15, ForkJoinPool-1-worker-26, ForkJoinPool-1-worker-25, ForkJoinPool-1-worker-19, ForkJoinPool-1-worker-18, ForkJoinPool-1-worker-29]
As you can see number of threads involved in resolving stream's output has been increased to 20. It takes only one additional line because you have to create ForkJoinPool object. I'd really like to get rid of that line so I don't have to remember about ForkJoinPool so I've created this class:
/**
 * @author Grzegorz Taramina
 *         Created on: 18/07/16
 */
public class ForkJoinPoolInvoker {
    private final ForkJoinPool forkJoinPool;

    public static ForkJoinPoolInvoker usePoolWithSize(final int poolSize) {
        return new ForkJoinPoolInvoker(poolSize);
    }

    private ForkJoinPoolInvoker(final int poolSize) {
        this.forkJoinPool = new ForkJoinPool(poolSize);
    }

    public <T> T andInvoke(final Callable<T> task) {
        final ForkJoinTask<T> submit = forkJoinPool.submit(task);
        return Try.ofFailable(submit::get).orElseThrow(RuntimeException::new);
    }
}
Now the previous example would lool like that:
    public static void main(String [] args) throws ExecutionException, InterruptedException {
        final Set<Object> threadNames = usePoolWithSize(20).andInvoke(() -> IntStream.range(0, 20).parallel()
                .boxed()
                .peek(i -> Try.ofFailable(() -> { Thread.sleep(1000); return true; }).toOptional())
                .map(i -> Thread.currentThread().getName())
                .collect(toSet()));

        System.out.println(threadNames.size());
        System.out.println(threadNames);
    }
Let's get back to the main example that buys codes:
    public OrderResult makeOrder(final List<ExternalSupplierOrderEntry> orderEntries, final String language) {
        final List<List<ExternalSupplierOrderEntry>> orderEntriesChunks = orderSplitter.split(orderEntries);
        final List<ExternalSupplierCode> boughtCodes =  usePoolWithSize(nexwaySettings.getNumberOfBuyingThreads())
                                                        .andInvoke(() ->
             orderEntriesChunks.stream()
                .parallel()
                .map(chunk -> {
                    final String orderId = uuidProvider.randomUUID();
                    return Try.ofFailable(() -> buy(chunk, orderId, language))
                            .whenFailure(t -> log().error("Something went wrong while making order", t))
                            .orElse(markCodesAsFailed(chunk, orderId));
                })
                .flatMap(Collection::stream)
                .collect(toList())
        );

        return new OrderResult(boughtCodes);
    }
I've also checked how many threads would be sufficient to make order in a reasonable time:
Note that all the results presented in the chart have been averaged (for each number of threads the test has been performed 10 times). The chart shows that using 15 threads is sufficient because it takes slightly more than 4 seconds to make 50 requests. As you can see changing pool size is quite easy. I do realize that I could do that differently but in the end this solution looks good. All the calls to the API have timeout so I shouldn't experience all the typical problems connected to parallel stream that people talk about.

Thursday, 14 July 2016

[Spring / Async] How to invoke method in separate thread ?

[Spring / Async] How to invoke method in separate thread ? Some tasks need to be invoked in a separate thread - asynchronously. For instance when the operation needs a lot of time to finish. You just want to run it and return its id or some kind of status. In pure Java you would use Threads / Runnables and other stuff available in low-level cuncurrency API which JDK provides. Actually it isn't very convenient so Spring developers have taken care of that. I've prepared very simple Spring application which contains only two beans: RequestHandler and OperationRunner. The first one invokes the other. Here's the application runner:
    public class App {
        public static void main(final String [] args) throws InterruptedException {
            final AnnotationConfigApplicationContext context = new AnnotationConfigApplicationContext(AppConfig.class);
            final RequestHandler handler = context.getBean(RequestHandler.class);
            handler.handle();
        }
    }
And a configuration:
@Configuration
@ComponentScan("gt.dev.spring.async")
public class AppConfig {
}
Implementation of components:
    @Component
    public class RequestHandler {
        private static final Logger LOG = Logger.getLogger(RequestHandler.class);

        private final OperationRunner operationRunner;

        @Autowired
        public RequestHandler(final OperationRunner operationRunner) {
            this.operationRunner = operationRunner;
        }

        public void handle() throws InterruptedException {
            LOG.info("Request processing started. Invoking operation...");
            operationRunner.run();
            LOG.info("Request processed");
        }
    }
And OperationRunner:
    @Component
    public class OperationRunner {
        private static final Logger LOG = Logger.getLogger(OperationRunner.class);

        public void run() throws InterruptedException {
            LOG.info("Operation started. Sleeping...");
            Thread.sleep(5000);
            LOG.info("Operation finished");
        }
    }
After running main method in App class I get the following logs:
maj 07, 2015 3:39:28 PM gt.dev.spring.async.RequestHandler handle
INFO: Request processing started. Invoking operation...
maj 07, 2015 3:39:28 PM gt.dev.spring.async.OperationRunner run
INFO: Operation started. Sleeping...
maj 07, 2015 3:39:33 PM gt.dev.spring.async.OperationRunner run
INFO: Operation finished
maj 07, 2015 3:39:33 PM gt.dev.spring.async.RequestHandler handle
INFO: Request processed
It works as expected:
  • RequestHandler logs information which indicates that process has been started
  • RequestHandler invokes OperationRunner
  • OperationRunner logs that operation started
  • OperationRunner performs long operation (sleeps for 5 seconds)
  • OperationRunner logs that operation finished
  • control gets back to RequestHandler
  • RequestHandler logs that request has been processed
Every client has to wait until the operation is completed. In some cases we may want to return some kind of id of operation to the client and process in the background asynchronously. Let's make the run() method async. To do that we need @Async annotation that indicates that particular method should be run in separate thread.
    @Component
    public class OperationRunner {
        private static final Logger LOG = Logger.getLogger(OperationRunner.class);

        @Async
        public void run() throws InterruptedException {
            LOG.info("Operation started. Sleeping...");
            Thread.sleep(5000);
            LOG.info("Operation finished");
        }
    }
Spring also has to know that async calls are enabled so we need one additional annotation in spring configuration - @EnableAsync. Lots of people forget about that. It's actually quite clever. You can put @Async on all the methods that need to be asynchronous and when needed disable all async calls by removing @EnableAsync.
    @Configuration
    @ComponentScan("gt.dev.spring.async")
    @EnableAsync
    public class AppConfig {
    }
After running the app I get the following log which proves that request has been processed before the async operation finished.
maj 07, 2015 3:54:13 PM gt.dev.spring.async.RequestHandler handle
INFO: Request processing started. Invoking operation...
maj 07, 2015 3:54:13 PM gt.dev.spring.async.RequestHandler handle
INFO: Request processed
maj 07, 2015 3:54:13 PM gt.dev.spring.async.OperationRunner run
INFO: Operation started. Sleeping...
maj 07, 2015 3:54:18 PM gt.dev.spring.async.OperationRunner run
INFO: Operation finished

Tuesday, 12 July 2016

[Java 8 / Functional programming] Functional util that invokes a command n times.

Recently I've been doing major refactoring of integration tests. I've found many tests which do stuff like that:
for (int i = 0; i < 256; i++) {
     addProduct(UUID.randomUUID().toString);
 }
It's pretty ugly, isn't it ? It would be nice to have a small tool that invokes given piece of code n times. In Java 8 we can use IntStream:
IntStream.range(0, 256).forEach(i -> addProduct(UUID.randomUUID().toString()));
Looks better but it's still not very readable. Again I've started with a test that specifies how the tool should work:
    @Test
    public void shouldInvokeCommandFiveTimes() throws Exception {
        // given
        final List<String> list = newArrayList();

        // when
        times(5).invoke(() -> list.add("item"));

        // then
        assertThat(list).containsExactly("item", "item", "item", "item", "item");
    }
I've come up with the following class:
/**
 * @author Grzegorz Taramina
 *         Created on: 12/07/16
 */
public class Times {
    private final int times;

    private Times(final int times) {
        this.times = times;
    }

    public static Times times(final int times) {
        Assert.isTrue(times >= 0, "times must be at least equal to zero");
        return new Times(times);
    }

    public void invoke(final Runnable runnable) {
        IntStream.range(0, times).forEach(i -> runnable.run());
    }
}
It's very simple but makes code concise and readable:
times(5).invoke(() -> addProduct(randomUUID().toString));
I might have exaggerated saying that this is functional tool. It's simply higher order function but very useful.

Monday, 11 July 2016

[Java8 / Functional programming] How to create object that will be created lazily ?

Sometimes you may want to create some objects lazily. Especially when it comes to really heavy objects that may or may not be used in runtime. In one of the companies I used to work we had to use Java 6. Some developers must have read some articles about laziness and started to create literally all the objects lazily like that:
    public static class OldFashionedHeavyObjectHolder {
        private HeavyObject heavyObject;

        public synchronized HeavyObject getHeavyObject() {
            if (heavyObject == null) {
                heavyObject = new HeavyObject();
            }

            return heavyObject;
        }
    }
After couple of months we had a lot of classes with tons of getters that check if an object is null and so on. I can notice at least four disadvantages of that approach:
  • the method has to be synchronized because more than one thread can invoke the method when heavyObject == null
  • even if heavyObject has already been created you have to check that
  • it's extremely ugly
  • it's hard to test it
Luckily I've changed the company and now I can use all those fancy streams, lambdas and everything that Java 8 comes with. Basically I wanted to create a tool which works like that:
/**
 * @author Grzegorz Taramina
 *         Created on: 23/06/16
 */
public class LazyInstanceTest {
    @Test
    public void shouldCreateLazyInstance() throws Exception {
        // given
        LazyInstance<String> instance = LazyInstance.of(() -> "i'm lazy");

        // when
        String result = instance.get();

        // then
        assertThat(result).isEqualTo("i'm lazy");
    }
}
String is obviously just a simplification. So some kind of factory that creates a holder of a heavy instance and takes care of creating it lazily. I've figured out the following class:
 *
 * @author Grzegorz Taramina
 *         Created on: 23/06/16
 */
public class LazyInstance<T> {
    private final Supplier<T> instanceSupplier;
    private Supplier<T> instance = this::create;

    public static <T> LazyInstance<T> of(final Supplier<T> instanceSupplier) {
        return new LazyInstance<>(instanceSupplier);
    }

    /**
     * Creates LazyInstance
     * @param instanceSupplier supplier that will be lazily used while creating instance
     */
    private LazyInstance(final Supplier<T> instanceSupplier) {
        this.instanceSupplier = instanceSupplier;
    }

    public T get() {
        return instance.get();
    }

    private synchronized T create() {
        class InstanceFactory implements Supplier<T> {
            private final T instance = instanceSupplier.get();

            public T get() {
                return instance;
            }
        }

        if (!InstanceFactory.class.isInstance(instance)) {
            instance = new InstanceFactory();
        }

        return instance.get();
    }
}
It works like that:
final LazyInstance<HeavyObject> heavy = new LazyInstance<>(HeavyObject::new);
HeavyObject heavyObject = heavy.get();
The main idea of this class is that the supplier is being invoked lazily. Synchronized create method returns value returned by InstanceFactory (in fact it's a Supplier). Instance factory in turn returns value that returns Supplier provided to the LazyInstance. So basically we're invoking supplier that invokes supplier that creates real instance. Another thing: instance = new InstanceFactory(); - this line's really important because it swaps suppliers which means that synchronized block and if-else statement are being invoked only once. After the object is created the instance field (in LazyInstance not the InstanceFactory) contains InstanceFactory instance which returns real instance. It may look a bit complicated but I think it does all the stuff quite elegantly. Just to prove that the instance is being created lazily:
    public static class HeavyObject {
        public HeavyObject() {
            System.out.println("heavy's being created...");
        }
    }

    public static void main(String [] args) {
        System.out.println("Started executing main method");
        final LazyInstance<HeavyObject> heavy = LazyInstance.of(HeavyObject::new);
        System.out.println("Created lazy instance");
        System.out.println("Calling heavy.get()");
        HeavyObject heavyObject = heavy.get();
        System.out.println("End of main");
    }
The output:
Started executing main method
Created lazy instance
Calling heavy.get()
heavy's being created...
End of main
I should also mention that it looks good in Java 8 because of lambdas but it can be also implemented in older versions using anonymous classes.