Sunday 6 November 2016

Java 8 CompletableFuture

CompletableFuture:

CompletableFuture provides several features, listed below, that were difficult to implement with the plain Future API.

  1. Combining two asynchronous computations in one.
  2. Waiting for the completion of all the tasks performed by a set of Futures.
  3. Waiting for the completion of the quickest task.
  4. Reacting to the completion of a future, i.e. performing a further action with the result instead of blocking while waiting for it.
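
As a rough illustration of the four points above, here is a minimal sketch. The class name FeatureSketch and the helper slowValue are made up for this example and simply simulate slow work with a sleep.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

class FeatureSketch {

    // Hypothetical stand-in for a slow remote call: sleeps, then returns the value.
    static int slowValue(int value, long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(ex);
        }
        return value;
    }

    // 1. Combine two independent asynchronous computations into one result.
    static int combined() {
        CompletableFuture<Integer> a = CompletableFuture.supplyAsync(() -> slowValue(2, 100));
        CompletableFuture<Integer> b = CompletableFuture.supplyAsync(() -> slowValue(3, 100));
        return a.thenCombine(b, Integer::sum).join();
    }

    // 2. Wait for every future in a set, then collect the results in order.
    static List<Integer> all() {
        List<CompletableFuture<Integer>> futures = Arrays.asList(
                CompletableFuture.supplyAsync(() -> slowValue(1, 100)),
                CompletableFuture.supplyAsync(() -> slowValue(2, 50)));
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
        return futures.stream().map(CompletableFuture::join).collect(Collectors.toList());
    }

    // 3. Wait only for the quickest task; anyOf yields a CompletableFuture<Object>.
    static Object quickest() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            CompletableFuture<Integer> slow =
                    CompletableFuture.supplyAsync(() -> slowValue(1, 2000), pool);
            CompletableFuture<Integer> fast =
                    CompletableFuture.supplyAsync(() -> slowValue(2, 50), pool);
            return CompletableFuture.anyOf(slow, fast).join();
        } finally {
            pool.shutdownNow(); // interrupt the slower task, its result is not needed
        }
    }

    // 4. React to completion with a callback instead of blocking for the value.
    static CompletableFuture<Void> react(StringBuilder sink) {
        return CompletableFuture.supplyAsync(() -> slowValue(7, 50))
                .thenAccept(value -> sink.append("got ").append(value));
    }

    public static void main(String[] args) {
        System.out.println(combined());
        System.out.println(all());
        System.out.println(quickest());
        StringBuilder sink = new StringBuilder();
        react(sink).join(); // join only so that the demo does not exit early
        System.out.println(sink);
    }
}
```

The later sections of this post look at thenCombine, thenAccept and allOf in more detail.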

Below is an example of CompletableFuture used in code:

       public Future<Double> getPriceAsync(String product){
              CompletableFuture<Double> futurePrice = new CompletableFuture<>();
              new Thread(() -> {
                     try{
                           double price = calculatePrice(product);
                           futurePrice.complete(price);
                     }catch(Exception ex){
                           futurePrice.completeExceptionally(ex);
                     }
              }).start();
              return futurePrice;
       }

The equivalent code for this is:

       public Future<Double> getPriceAsync(String product){
              return CompletableFuture.supplyAsync(() -> calculatePrice(product));
       }

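supplyAsync takes a Supplier<U> and returns a CompletableFuture<U>, where U is the type the supplier produces. Internally the supplier runs on an executor: the common ForkJoinPool by default, or an Executor passed as a second argument. An exception thrown by the supplier completes the future exceptionally, as in the longer version above.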
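
To make the two points about supplyAsync a little more concrete, here is a small sketch. The class SupplyAsyncSketch and its calculatePrice are hypothetical stand-ins, not the shop code from this post. It shows how a failure inside the supplier surfaces at join(), and how the overload with an Executor is called.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class SupplyAsyncSketch {

    // Hypothetical price calculation that rejects an empty product name.
    static double calculatePrice(String product) {
        if (product.isEmpty()) {
            throw new IllegalArgumentException("product must not be empty");
        }
        return product.length() * 10.0;
    }

    // Runs on the default pool and reports either the price or the failure cause.
    static String describe(String product) {
        CompletableFuture<Double> future =
                CompletableFuture.supplyAsync(() -> calculatePrice(product));
        try {
            return "price " + future.join();
        } catch (CompletionException ex) {
            // join() wraps the supplier's exception; getCause() returns the original.
            return "failed: " + ex.getCause().getMessage();
        }
    }

    // Same computation, but on an executor supplied by the caller.
    static double priceOn(ExecutorService executor, String product) {
        return CompletableFuture.supplyAsync(() -> calculatePrice(product), executor).join();
    }

    public static void main(String[] args) {
        System.out.println(describe("phone"));
        System.out.println(describe(""));
        ExecutorService executor = Executors.newSingleThreadExecutor();
        try {
            System.out.println(priceOn(executor, "tv"));
        } finally {
            executor.shutdown();
        }
    }
}
```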

Make your code non-blocking:

Sequential Mode: Each call blocks until the shop responds, so this gives the poorest performance because it does not use all the available resources.

       public static List<String> findPricesSequentially(String product){
              return shops.stream().map(shop -> String.format("%s price is %.2f",
                            shop.getName(),shop.getPrice(product))).collect(Collectors.toList());
       }

Parallel Mode: This mode is faster than the sequential mode because it spreads the work across the available cores. Suppose I have 4 cores available on my system; then the parallel stream would run up to 4 tasks at a time. The number of processors is obtained with the call below.

Runtime.getRuntime().availableProcessors();

       public static List<String> findPricesUsingParallel(String product){
              return shops.parallelStream().map(shop -> String.format("%s price is %.2f",
                            shop.getName(),shop.getPrice(product))).collect(Collectors.toList());
       }
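
To see how much parallelism is available on a given machine, something like the sketch below can be used (the class name PoolInfo is made up). By default the common ForkJoinPool, which parallel streams use, has one worker thread fewer than the number of processors, and the thread that starts a parallel stream also takes part in the work. The default can be changed with the system property java.util.concurrent.ForkJoinPool.common.parallelism.

```java
import java.util.concurrent.ForkJoinPool;

class PoolInfo {

    // Number of processors reported by the JVM.
    static int cores() {
        return Runtime.getRuntime().availableProcessors();
    }

    // Target parallelism of the common pool shared by parallel streams
    // and by CompletableFuture.supplyAsync when no executor is given.
    static int commonPoolParallelism() {
        return ForkJoinPool.getCommonPoolParallelism();
    }

    public static void main(String[] args) {
        System.out.println("Processors: " + cores());
        System.out.println("Common pool parallelism: " + commonPoolParallelism());
    }
}
```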

CompletableFuture Mode: Below is the code for this:

       public static List<Double> findPricesUsingFuture(String product){
               List<CompletableFuture<Double>> priceFutures = shops.stream().map(shop -> CompletableFuture.supplyAsync(() ->shop.getPrice(product))).collect(Collectors.toList());
               return priceFutures.stream().map(CompletableFuture::join).collect(Collectors.toList());
       }

With the default executor, this mode is faster than the sequential mode but, in the first test below, slower than the parallel mode.

Performance of Various Computations:

Here we analyse the performance of the different approaches. Please find the code below.

public class Shop1 {
       private String name;

       public String getName() {
              return name;
       }

       public void setName(String name) {
              this.name = name;
       }

       public Shop1(String name){
              this.name = name;
       }
}

import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Collectors;

public class ComparisionTest {

       static Random random = new Random();

       static List<Shop1> shops = Arrays.asList(new Shop1("BestPrice"),
                     new Shop1("LetsSaveBig"),
                     new Shop1("MyFavouriteShop"),
                     new Shop1("BuyItAll"));

       public static double getPrice(String product){
              return calculatePrice(product);
       }

       private static double calculatePrice(String product){
              delay();
              return random.nextDouble() * product.charAt(0) + product.charAt(1);
       }

       private static final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100),new ThreadFactory(){
              public Thread newThread(Runnable r){
                     Thread t = new Thread(r);
                     t.setDaemon(true);
                     return t;
              }
       });

       public static void delay(){
              try{
                     Thread.sleep(1000L);
              } catch (InterruptedException ex){
                     throw new RuntimeException(ex);
              }
       }

       public static List<String> findPrices(String product){
              return shops.stream().map(shop -> String.format("%s price is %.2f",
                            shop.getName(),getPrice(product))).collect(Collectors.toList());
       }

       public static List<String> findPricesUsingParallel(String product){
              return shops.parallelStream().map(shop -> String.format("%s price is %.2f",
                            shop.getName(),getPrice(product))).collect(Collectors.toList());
       }

       public static List<Double> findPricesUsingFuture(String product){
               List<CompletableFuture<Double>> priceFutures = shops.stream().map(shop -> CompletableFuture.supplyAsync(() ->getPrice(product))).collect(Collectors.toList());
               return priceFutures.stream().map(CompletableFuture::join).collect(Collectors.toList());
       }

       public static List<Double> findPricesUsingFutureExecutor(String product){
               List<CompletableFuture<Double>> priceFutures = shops.stream().map(shop -> CompletableFuture.supplyAsync(() ->getPrice(product),executor)).collect(Collectors.toList());
               return priceFutures.stream().map(CompletableFuture::join).collect(Collectors.toList());
       }

       public static void main(String[] args) throws Exception {
              long start = System.nanoTime();
              findPrices("myPhone27S");
              long duration = (System.nanoTime() - start) / 1_000_000;
              System.out.println("Sequential Stream in "+ duration + " msecs");

              start = System.nanoTime();
              findPricesUsingParallel("myPhone27S");
              duration = (System.nanoTime() - start) / 1_000_000;
              System.out.println("Parallel Stream in "+ duration + " msecs");

              start = System.nanoTime();
              findPricesUsingFuture("myPhone27S");
              duration = (System.nanoTime() - start) / 1_000_000;
              System.out.println("Completable Future in "+ duration + " msecs");

              start = System.nanoTime();
              findPricesUsingFutureExecutor("myPhone27S");
              duration = (System.nanoTime() - start) / 1_000_000;
              System.out.println("Completable Future Using Executor in "+ duration + " msecs");
       }
}

Case 1: Tasks Submitted: 4 (number of shops)
Output of this is:

Sequential Stream in 4162 msecs
Parallel Stream in 1005 msecs
Completable Future in 2003 msecs
Completable Future Using Executor in 1014 msecs

Analysing the output:

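  1. The sequential mode is the slowest, as it queries the shops one after another and does not use all the available cores.
  2. The parallel stream runs all 4 tasks at the same time on the default (common) thread pool, whose capacity is tied to the number of processors available, here 4.
  3. CompletableFuture with the default executor is faster than the sequential mode but slower than the parallel stream. A likely reason is that the common pool has one worker thread fewer than the number of processors by default. A parallel stream makes up for this because the calling thread also takes part in the work, whereas supplyAsync uses only the pool's workers, so the fourth task has to wait for a free thread.
  4. CompletableFuture with a custom executor is on par with the parallel stream; both finish in about one second.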

Case 2: Tasks Submitted: 5 (increasing the number of shops)
Output of this is:

Sequential Stream in 5204 msecs
Parallel Stream in 2005 msecs
Completable Future in 2004 msecs
Completable Future Using Executor in 1003 msecs

Analysing the output:

  1. The sequential mode takes an extra second, as it does not run anything in parallel.
  2. The parallel mode also takes an extra second because it runs only 4 tasks at a time, a limit derived from Runtime.getRuntime().availableProcessors(). The fifth task has to wait until one of the threads is free.
  3. CompletableFuture with the default executor is now almost equivalent to the parallel mode. Both use the same common thread pool, hence the similar performance.
  4. CompletableFuture with the executor is the fastest of all. Here we provide a custom executor rather than using the default one.

Let's analyse why CompletableFuture with an executor is faster than the others after the next test.

Case 3: Tasks Submitted: 9
Output of this is:

Sequential Stream in 9244 msecs
Parallel Stream in 3028 msecs
Completable Future in 3019 msecs
Completable Future Using Executor in 1003 msecs

Analysing the output:

Let's analyse only the CompletableFuture with the custom executor, because the results of the others follow from our previous analysis.
First let's understand the benefits of creating a thread pool of the appropriate size.

Sizing thread pools:

Creating a thread for every task is costly and slows down the application, since each new thread needs memory and setup time. To avoid this we create a thread pool, in which threads that have completed a task are reused, and this improves the application's performance. The open question is what the ideal size of the thread pool would be.

The book Java Concurrency in Practice gives advice on finding the optimal size of a thread pool. If the number of threads in the pool is too big, they'll end up competing for scarce CPU and memory resources, wasting their time on context switching. Conversely, if the number is too small, some of the CPU cores will be underutilised. The ideal pool size can be estimated using the formula below.

N of threads = N of CPU * U of CPU * (1 + W/C)

N of CPU : the number of cores, available through Runtime.getRuntime().availableProcessors().
U of CPU : the target CPU utilization (between 0 and 1).
W/C : the ratio of wait time to compute time. Wait time is the time the task spends waiting for a response from other resources.

In our case below are the values:

N of CPU : 4
U of CPU : 1
W/C : about 100, since the tasks spend nearly all of their time waiting for the response from the shops.

Hence N of threads = 4 * 1 * (1 + 100) = 404, or roughly 400.
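
The sizing rule from Java Concurrency in Practice multiplies the three factors: the number of cores, the target utilisation and (1 + W/C). As a small sketch of that arithmetic (the class and method names are made up for illustration):

```java
class PoolSizing {

    // N_threads = N_cpu * U_cpu * (1 + W/C), rounded to the nearest whole thread.
    static int idealPoolSize(int cpus, double targetUtilization, double waitToComputeRatio) {
        return (int) Math.round(cpus * targetUtilization * (1 + waitToComputeRatio));
    }

    // There is no point in having more threads than tasks, so cap the size.
    static int cappedPoolSize(int tasks, int upperBound) {
        return Math.min(tasks, upperBound);
    }

    public static void main(String[] args) {
        System.out.println(idealPoolSize(4, 1.0, 100.0)); // 4 * 1 * 101 = 404
        System.out.println(cappedPoolSize(9, 100));       // 9 tasks need at most 9 threads
    }
}
```

With 4 cores, a target utilisation of 1 and a wait/compute ratio of 100 this gives 404, in line with the figure of about 400 above.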

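Below is the executor we used in the code. It never needs more threads than there are shops, so the pool size is the number of shops, capped at 100. The threads are daemon threads so that they do not prevent the program from exiting.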

       private static final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100),new ThreadFactory(){
              public Thread newThread(Runnable r){
                     Thread t = new Thread(r);
                     t.setDaemon(true);
                     return t;
              }
       });

Coming back to the performance of CompletableFuture with the executor: since the pool has one thread per shop, all the requests wait in parallel and there is not much context switching, hence this is the fastest.

Parallelism - via streams vs CompletableFuture:

If you are doing computation-heavy operations without any I/O, then the stream interface gives the simplest implementation and likely the most efficient one: if all threads are compute-bound, there is no point in having more threads than processor cores.

On the other hand, if your parallel units of work involve waiting for I/O, then CompletableFuture gives more flexibility, because you can match the number of threads to the wait/compute ratio.

Pipelining asynchronous tasks:

Below is the code for this. Here I compare the performance of a stream, a parallel stream and asynchronous pipelining.

package com.futures;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Collectors;

import com.futures.ExchangeService.Money;

public class Shop {

       private String name;
       Random random = new Random();

       static List<Shop> shops = Arrays.asList(new Shop("BestPrice"),
                     new Shop("LetsSaveBig"),
                     new Shop("MyFavouriteShop"),
                     new Shop("BuyItAll")
                     );

       private static final Executor executor = Executors.newFixedThreadPool(Math.min(shops.size(), 100),new ThreadFactory(){
              public Thread newThread(Runnable r){
                     Thread t = new Thread(r);
                     t.setDaemon(true);
                     return t;
              }
       });


       public String getName() {
              return name;
       }

       public void setName(String name) {
              this.name = name;
       }

       public Shop(String name){
              this.name = name;
       }

       public double getPrice(String product){
              return calculatePrice(product);
       }

       public String getPriceWithDiscount(String product){
              double price = calculatePrice(product);
              Discount.Code code = Discount.Code.values()[random.nextInt(Discount.Code.values().length)];
              return String.format("%s:%.2f:%s", name,price,code);
       }


       public Future<Double> getPriceAsync(String product){
              return CompletableFuture.supplyAsync(() -> calculatePrice(product));

       }

       public static void delay(){
              try{
                     Thread.sleep(1000L);
              } catch (InterruptedException ex){
                     throw new RuntimeException(ex);
              }
       }

       private double calculatePrice(String product){
              delay();
              return random.nextDouble() * product.charAt(0) + product.charAt(1);
       }


       public static  List<String> findPricesUsingDiscountedService(String product){
              return shops.stream()
                           .map(shop -> shop.getPriceWithDiscount(product))
                           .map(Quote::parse)
                           .map(Discount::applyDiscount)
                           .collect(Collectors.toList());
       }

       public static  List<String> findPricesUsingParallelDiscountedService(String product){
              return shops.parallelStream()
                           .map(shop -> shop.getPriceWithDiscount(product))
                           .map(Quote::parse)
                           .map(Discount::applyDiscount)
                           .collect(Collectors.toList());
       }

       public static List<String> findPricesUsingFutureOpr(String product){
              List<CompletableFuture<String>> priceFutures = shops.stream()
                           .map(shop -> CompletableFuture.supplyAsync(() -> shop.getPriceWithDiscount(product),executor))
                           .map(future -> future.thenApply(Quote::parse))
                           .map(future -> future.thenCompose(quote -> CompletableFuture.supplyAsync(() -> Discount.applyDiscount(quote),executor)))
                           .collect(Collectors.toList());

              return priceFutures.stream().map(CompletableFuture::join).collect(Collectors.toList());
       }



       public static void main(String[] args) throws Exception {

              long start = System.nanoTime();
              findPricesUsingDiscountedService("myPhone27S");
              long duration = (System.nanoTime() - start) / 1_000_000;
              System.out.println("Completable Future Using stream "+ duration + " msecs");

              start = System.nanoTime();
              findPricesUsingParallelDiscountedService("myPhone27S");
              duration = (System.nanoTime() - start) / 1_000_000;
              System.out.println("Completable Future Using parallel stream "+ duration + " msecs");

              start = System.nanoTime();
              findPricesUsingFutureOpr("myPhone27S");
              duration = (System.nanoTime() - start) / 1_000_000;
              System.out.println("Completable Future Operation Using asynchronous pipeline "+ duration + " msecs");

       }
}

Output of this is:

Completable Future Using stream 8183 msecs
Completable Future Using parallel stream 2019 msecs
Completable Future Operation Using asynchronous pipeline 2013 msecs
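
The listing above depends on Quote and Discount classes that are not shown in this post. The following is only a guess at what they might look like, assuming the "name:price:code" string produced by getPriceWithDiscount and a discount service that also takes about a second to answer (which would fit the roughly 8 seconds measured for 4 shops in the sequential run). The discount codes and percentages are invented for illustration, and the classes would need to live in the same package as Shop.

```java
import java.util.Locale;

// Hypothetical value object parsed from the "name:price:code" string.
class Quote {
    private final String shopName;
    private final double price;
    private final Discount.Code discountCode;

    Quote(String shopName, double price, Discount.Code discountCode) {
        this.shopName = shopName;
        this.price = price;
        this.discountCode = discountCode;
    }

    // Splits "BestPrice:100.00:GOLD" into its three parts.
    static Quote parse(String s) {
        String[] parts = s.split(":");
        return new Quote(parts[0], Double.parseDouble(parts[1]), Discount.Code.valueOf(parts[2]));
    }

    String getShopName() { return shopName; }
    double getPrice() { return price; }
    Discount.Code getDiscountCode() { return discountCode; }
}

// Hypothetical discount service.
class Discount {

    // Assumed discount levels; the percentages are illustrative only.
    enum Code {
        NONE(0), SILVER(5), GOLD(10), PLATINUM(15), DIAMOND(20);

        private final int percentage;

        Code(int percentage) {
            this.percentage = percentage;
        }
    }

    // Simulates a remote call with a one-second delay, then applies the discount.
    static String applyDiscount(Quote quote) {
        delay();
        double discounted = quote.getPrice() * (100 - quote.getDiscountCode().percentage) / 100;
        return String.format(Locale.ROOT, "%s price is %.2f", quote.getShopName(), discounted);
    }

    private static void delay() {
        try {
            Thread.sleep(1000L);
        } catch (InterruptedException ex) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(ex);
        }
    }
}
```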

Let's discuss the various methods for asynchronous pipelining on CompletableFuture.

  1. thenApply: This method does not block until the CompletableFuture completes. Instead it registers what to do with the result once it becomes available.
  2. thenCompose: This method pipelines two asynchronous operations, passing the result of the first operation to the second. The function that starts the second operation normally runs on the same thread as the first task; the variant thenComposeAsync submits it to a thread pool instead.
  3. thenAccept: This takes a Consumer as its argument and returns a CompletableFuture<Void>. It is usually the final step. It is useful when we have several tasks and want to act on each result as soon as it is ready, instead of waiting for all the tasks to complete in a given order. Below is the code.

        CompletableFuture[] futures = findPricesStream(product)
                .map(f -> f.thenAccept(s -> System.out.println(s + " (done in " + ((System.nanoTime() - start) / 1_000_000) + " msecs)")))
                .toArray(size -> new CompletableFuture[size]);
        CompletableFuture.allOf(futures).join();

        Output of this is:

BuyItAll price is 136.72 (done in 1928 msecs)
MyFavouriteShop price is 175.36 (done in 2664 msecs)
BestPrice price is 193.51 (done in 2792 msecs)
LetsSaveBig price is 173.58 (done in 3420 msecs)

        Each future is given a thenAccept action, and whichever task completes first prints its value first, as shown in the output. To make sure that all the tasks are completed, we collect the futures into an array and call the allOf method on CompletableFuture, which waits for all of them.
  4. thenCombine: If I want to run a task once two different asynchronous tasks have both completed, I make use of this. Below is a code example.

    public static List<String> findPricesInUSD(String product) {
        List<CompletableFuture<Double>> priceFutures = new ArrayList<>();
        for (Shop shop : shops) {
            CompletableFuture<Double> futurePriceInUSD =
                CompletableFuture.supplyAsync(() -> shop.getPrice(product))
                .thenCombine(
                    CompletableFuture.supplyAsync(
                        () ->  ExchangeService.getRate(Money.EUR, Money.USD)),
                    (price, rate) -> price * rate
                );
            priceFutures.add(futurePriceInUSD);
        }
        List<String> prices = priceFutures
                .stream()
                .map(CompletableFuture::join)
                .map(price -> " price is " + price)
                .collect(Collectors.toList());
        return prices;
    }

Output of this is:

 price is 163.88456508124327
 price is 91.52218405797247
 price is 128.35890489760422
 price is 118.42066871703744


Here the first task calculates the price, the second fetches the conversion rate, and finally the two are combined to calculate the final price.
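
To recap the difference between thenApply and thenCompose from the list above, here is a minimal sketch. The class ChainingSketch and its two asynchronous helpers are hypothetical and only stand in for the shop and discount services.

```java
import java.util.concurrent.CompletableFuture;

class ChainingSketch {

    // Hypothetical asynchronous lookup of a base price.
    static CompletableFuture<Double> basePrice(String product) {
        return CompletableFuture.supplyAsync(() -> product.length() * 10.0);
    }

    // Hypothetical asynchronous discount service that halves the price.
    static CompletableFuture<Double> discounted(double price) {
        return CompletableFuture.supplyAsync(() -> price * 0.5);
    }

    // thenApply: the function is synchronous and returns a plain value.
    static CompletableFuture<String> label(String product) {
        return basePrice(product).thenApply(price -> product + " costs " + price);
    }

    // thenCompose: the function itself returns a future, and the result is
    // flattened. Using thenApply here would instead give a
    // CompletableFuture<CompletableFuture<Double>>.
    static CompletableFuture<Double> finalPrice(String product) {
        return basePrice(product).thenCompose(ChainingSketch::discounted);
    }

    public static void main(String[] args) {
        System.out.println(label("phone").join());
        System.out.println(finalPrice("phone").join());
    }
}
```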
