Java's CompletableFuture reference
Since the addition of CompletableFuture in Java 8, running background tasks, parallel processing, and multi-threading have become much simpler. Below are some examples of how to use CompletableFuture in standalone Java or with Spring Boot to perform background tasks.
The complete list of methods available in CompletableFuture can be found in the official Javadoc for java.util.concurrent.CompletableFuture.
Basic example to call a method in background and return result
The task gets executed in Java's default ForkJoinPool.commonPool()
CompletableFuture<String> completableFuture = CompletableFuture
.supplyAsync(() -> "Hello");
String result = completableFuture.join();
System.out.println("result = " + result);
Basic example to call a method in background without returning a result
The task gets executed in Java's default ForkJoinPool.commonPool()
CompletableFuture<Void> completableFuture = CompletableFuture
.runAsync(() -> System.out.println("Hello"));
completableFuture.join();
Handle exception before returning the result
// Run a supplier that always fails, then convert the outcome (value or error) into a String.
CompletableFuture<String> completableFuture = CompletableFuture
        .supplyAsync(() -> {
            throw new RuntimeException("blah");
        })
        .handle((value, error) -> {
            // handle() is invoked on both success and failure; error is non-null only on failure.
            if (error != null) {
                System.out.println("throwable = " + error);
                return null;
            }
            // The supplier's result type is inferred as Object, so narrow it before returning.
            return value instanceof String text ? text : null;
        });
String result = completableFuture.join();
System.out.println("result = " + result);
Handle exception without returning the result
CompletableFuture<Void> completableFuture = CompletableFuture
.runAsync(() -> {
throw new RuntimeException("blah");
})
.handle((o, throwable) -> {
if (throwable != null) {
System.out.println("throwable = " + throwable);
}
return null;
});
completableFuture.join();
Handle exceptions using exceptionally block
The exceptionally block is executed only when an uncaught exception is propagated from a previous stage of the future. The exception parameter is therefore always non-null, and the block is not executed if no exceptions were thrown in previous stages.
CompletableFuture<Void> completableFuture = CompletableFuture
.runAsync(() -> {
throw new RuntimeException("blah");
})
.exceptionally((ex) -> {
System.out.println("exception = " + ex);
return null;
});
completableFuture.join();
Execute the task / method in Java's Virtual Thread (from Java 21)
try(ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor()) {
CompletableFuture<String> completableFuture = CompletableFuture
.supplyAsync(() -> "Hello", executorService);
String result = completableFuture.join();
System.out.println("result = " + result);
}
Execute the task / method in Java's Virtual Thread with max concurrency or throttling (from Java 21)
Two variants are shown below: first using a semaphore, then without a semaphore.
Using semaphore
final int max_concurrency = 10;
final Semaphore semaphore = new Semaphore(max_concurrency);
try (ExecutorService executorService = Executors.newVirtualThreadPerTaskExecutor()) {
List<CompletableFuture<String>> tasks = IntStream.range(0, 100)
.mapToObj(index -> CompletableFuture
.supplyAsync(() -> {
    // Acquire the permit BEFORE entering the try/finally that releases it.
    // If acquire() is interrupted no permit was obtained, so release() must not run;
    // otherwise the semaphore would gain an extra permit and exceed the intended limit.
    try {
        semaphore.acquire();
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        System.out.println("Execution interrupted " + e.getMessage());
        return null;
    }
    try {
        String result = index + " " + Instant.now();
        System.out.println(result);
        Thread.sleep(1000);
        return result;
    } catch (InterruptedException e) {
        // Restore the interrupt flag so callers further up can observe it.
        Thread.currentThread().interrupt();
        System.out.println("Execution interrupted " + e.getMessage());
        return null;
    } finally {
        // A permit is definitely held here, so it is always safe to release it.
        semaphore.release();
    }
}, executorService))
.toList();
List<String> results = tasks.stream()
.map(CompletableFuture::join)
.filter(Objects::nonNull)
.toList();
System.out.println("results = " + results);
}
Without semaphore
final int max_concurrency = 10;
try (ExecutorService executor = new ThreadPoolExecutor(
max_concurrency,
max_concurrency,
0L, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>(),
Thread.ofVirtual().factory() // Uses virtual threads as the pool workers
)) {
List<CompletableFuture<String>> tasks = IntStream.range(0, 100)
.mapToObj(index -> CompletableFuture
.supplyAsync(() -> {
try {
String result = index + " " + Instant.now();
System.out.println(result);
Thread.sleep(1000);
return result;
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
System.out.println("Execution interrupted " + e.getMessage());
}
return null;
}, executor))
.toList();
List<String> results = tasks.stream()
.map(CompletableFuture::join)
.filter(Objects::nonNull)
.toList();
System.out.println("results = " + results);
}
Execute the task / method in Java's Fixed Thread Pool
try(ExecutorService executorService = Executors.newFixedThreadPool(20)) {
CompletableFuture<String> completableFuture = CompletableFuture
.supplyAsync(() -> "Hello", executorService);
String result = completableFuture.join();
System.out.println("result = " + result);
}
Execute the task / method in Spring Virtual Thread Executor (from Java 21)
// Declare SimpleAsyncTaskExecutor as a @Bean in a @Configuration class and use @Autowired to inject it
// If injected using @Autowired, the close() method is called by the Spring framework
try(SimpleAsyncTaskExecutor simpleAsyncTaskExecutor = new SimpleAsyncTaskExecutor()) {
simpleAsyncTaskExecutor.setVirtualThreads(true);
simpleAsyncTaskExecutor.setThreadNamePrefix("AsyncTaskExecutor-");
Executor executor = simpleAsyncTaskExecutor;
CompletableFuture<String> completableFuture = CompletableFuture
.supplyAsync(() -> "Hello", executor);
String result = completableFuture.join();
System.out.println("result = " + result);
}
Execute the task / method in Spring Thread Pool Executor
// Declare ThreadPoolTaskExecutor as a @Bean in a @Configuration class and use @Autowired to inject it.
// It is built by hand here only to keep the example self-contained.
ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor();
threadPoolTaskExecutor.setCorePoolSize(3);
threadPoolTaskExecutor.setMaxPoolSize(10);
threadPoolTaskExecutor.setQueueCapacity(3000);
threadPoolTaskExecutor.setThreadNamePrefix("AsyncTaskExecutor-");
threadPoolTaskExecutor.setWaitForTasksToCompleteOnShutdown(true);
threadPoolTaskExecutor.initialize();
try {
    Executor executor = threadPoolTaskExecutor;
    CompletableFuture<String> completableFuture = CompletableFuture
            .supplyAsync(() -> "Hello", executor);
    String result = completableFuture.join();
    System.out.println("result = " + result);
} finally {
    // If injected using @Autowired, shutdown is done by the Spring framework.
    // For a manually created executor, shut down in finally so the pool threads
    // are not leaked when join() throws.
    threadPoolTaskExecutor.initiateShutdown();
}
Comments
Post a Comment