Este último post de la serie de Futuros en Java es la continuación del anterior donde lo dejamos, viendo las funcionalidades de la clase CompletableFuture. Si no lo has leído ya, mejor empieza por ahí 🙂
Esta serie consta de las siguientes partes:
– Parte 1: Introducción
– Parte 2: interfaz Future
– Parte 3: CompletableFuture, introducción
– Parte 4: CompletableFuture, uso avanzado
Continúo con el uso de los CompletableFuture, a un nivel más avanzado:
Listeners / Callbacks
Como hemos visto, sobre esos futuros creados en los ejemplos anteriores podemos ahora ejecutar el método ‘get‘ de siempre para obtener el valor. Pero eso es justo lo que queríamos evitar. Para eso tenemos los callbacks y listeners, que serán ejecutados en cuanto el futuro se complete.
Además, la mayoría de estos métodos para crear callbacks devuelven a su vez un CompletableFuture también. Muy útil para encadenar varios futuros, como veremos después.
Todas tienen sus tres versiones, como ya expliqué en el anterior post.
whenComplete: este ya lo hemos visto en el anterior post, añade un callback para ejecutarlo cuando el futuro se complete. La lambda tiene 2 parámetros, uno es el posible resultado, y el otro es la excepción, si la hubiera habido. Veremos el tratamiento de excepciones más adelante.
thenApply: para transformar futuros. La idea es pasarle una función lambda que transforme el resultado del primero. Es similar al ‘map‘ de Scala.
CompletableFuture<String> futureAsync = CompletableFuture.supplyAsync(() -> {
LOGGER.info("Comenzando supplyAsync for thenApply...");
Sleep.sleepSeconds(2);
LOGGER.info("Terminado supplyAsync for thenApply!");
return "Terminado";
}, executor);
CompletableFuture<String> futureApply = futureAsync.thenApplyAsync(s -> {
LOGGER.info("Comenzando applyAsync...");
Sleep.sleepSeconds(2);
LOGGER.info("Terminado applyAsync!");
return s.toUpperCase();
}, executor);
futureApply.whenCompleteAsync((s, e) -> LOGGER.info("Resultado applyAsync: {}", s),
executor);
thenAccept y thenRun: muy similares al whenComplete, ejecutaran el lambda una vez se complete el futuro. El primero recibe un resultado, y el segundo no. Son equivalentes al supplyAsync y runAsync respectivamente.
// thenAccept
CompletableFuture<String> futureAsync = CompletableFuture.supplyAsync(() -> {
LOGGER.info("Comenzando supplyAsync for thenAccept...");
Sleep.sleepSeconds(2);
LOGGER.info("Terminado supplyAsync for thenAccept!");
return "Terminado";
}, executor);
futureAsync.thenAcceptAsync(s -> {
LOGGER.info("Comenzando thenAccept...");
Sleep.sleepSeconds(2);
LOGGER.info("Terminado thenAccept!");
LOGGER.info("Resultado: {}", s);
}, executor);
// thenRun
CompletableFuture<Void> futureRun = CompletableFuture.runAsync(() -> {
LOGGER.info("Comenzando runAsync for thenRun...");
Sleep.sleepSeconds(2);
LOGGER.info("Terminado runAsync for thenRun!");
}, executor);
futureRun.thenRunAsync(() -> {
LOGGER.info("Comenzando thenRun...");
Sleep.sleepSeconds(2);
LOGGER.info("Terminado thenRun!");
}, executor);
Excepciones
Tenemos varias maneras de gestionar las excepciones de futuros con la clase CompletableFuture, usando estos métodos:
exceptionally: registra un callback para gestionar la excepción. Recibe una lambda que solo tiene de parámetro la excepción, debe retornar un valor del mismo tipo que el futuro en el que se originó la excepción.
CompletableFuture<String> futureAsync = CompletableFuture.supplyAsync(() -> {
LOGGER.info("Comenzando supplyAsync with exception...");
Sleep.sleepSeconds(2);
LOGGER.info("Terminado supplyAsync with exception!");
throw new RuntimeException("Error en el futuro");
}, executor);
CompletableFuture<String> futureEx = futureAsync.exceptionally(e -> {
LOGGER.error("Resultado con excepción!!", e);
return "StringPorDefecto";
});
futureEx.whenCompleteAsync((s, e) -> LOGGER.info("Resultado futureEx: {}", s),
executor);
handle: registra un callback para gestionar el resultado o excepción. Recibe una lambda que tiene dos parámetros, el resultado y la excepción. Si la excepción no es nula, es que ha habido una excepción. También deber retornar un valor del tipo del futuro que lanzo la excepción.
CompletableFuture<String> futureAsync = CompletableFuture.supplyAsync(() -> {
LOGGER.info("Comenzando supplyAsync with exception...");
Sleep.sleepSeconds(2);
LOGGER.info("Terminado supplyAsync with exception!");
throw new RuntimeException("Error en el futuro");
}, executor);
CompletableFuture<String> handledFuture = futureAsync.handleAsync((s, e) -> {
if (e != null) {
LOGGER.error("Resultado con excepción!!", e);
return "StringPorDefecto";
} else {
LOGGER.info("Resultado: {}", s);
return s;
}
}, executor);
handledFuture.whenCompleteAsync((s, e) -> LOGGER.info("Resultado handle: {}", s),
executor);
whenComplete: con este método que ya hemos explicado podemos hacer algo parecido al ‘handle’, dado que la lambda que registra tiene también los dos parámetros.
CompletableFuture<String> futureAsync = CompletableFuture.supplyAsync(() -> {
LOGGER.info("Comenzando supplyAsync with exception...");
Sleep.sleepSeconds(2);
LOGGER.info("Terminado supplyAsync with exception!");
throw new RuntimeException("Error en el futuro");
}, executor);
futureAsync.whenCompleteAsync((s, e) -> {
if (e != null) {
LOGGER.error("Resultado con excepción!!", e);
} else {
LOGGER.info("Resultado applyAsync: {}", s);
}
}, executor);
Combinar futuros
En casi todos los ejemplos anteriores prácticamente solo hemos usado callbacks sobre un mismo futuro. Pero, como hemos visto en algún ejemplo (el ‘thenApply’), podemos encadenar futuros y combinarlos. En esta funcionalidad es donde se ve el verdadero potencial del desarrollo usando futuros.
Tenemos los siguientes métodos:
thenCompose: Muy similar a ‘thenApply’, pero este es equivalente al ‘flatMap’ de Scala. Lo que hace es una cadena de futuro también. Por ejemplo, aquí llamamos a ‘thenCompose’ con una lambda que a su vez es otro futuro:
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
LOGGER.info("Comenzando supplyAsync for thenCompose...");
Sleep.sleepSeconds(2);
LOGGER.info("Terminado supplyAsync for thenCompose!");
return "Terminado";
}, executor);
CompletableFuture<String> fCompose =
future1.thenComposeAsync(s -> CompletableFuture.supplyAsync(() -> {
LOGGER.info("Comenzando thenCompose...");
Sleep.sleepSeconds(2);
LOGGER.info("Terminado thenCompose!");
return s.concat(" + Terminado other");
}, executor),
executor);
fCompose.whenCompleteAsync((s, e) -> LOGGER.info("Resultado thenCompose: {}", s),
executor);
Parece algo retorcido, pero puede tener su lógica 😉
thenCombine: En este caso, en lugar de una cadena de futuros, espera a que terminen dos futuros, para luego hacer algo. En este caso la lambda tendrá dos parámetros, que son el resultado de cada uno de los dos futuros:
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
LOGGER.info("Comenzando future1 for thenCombine...");
Sleep.sleepSeconds(2);
LOGGER.info("Terminado future1 for thenCombine!");
return "Terminado";
}, executor);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
LOGGER.info("Comenzando future2 for thenCombine...");
Sleep.sleepSeconds(1);
LOGGER.info("Terminado future2 for thenCombine!");
return "Terminado other";
}, executor);
CompletableFuture<String> fCombine =
future1.thenCombineAsync(future2, (s1, s2) -> {
LOGGER.info("En el thenCombine, recibidos results: {}, {}", s1, s2);
return s1 + s2;
}, executor);
fCombine.whenCompleteAsync((s, e) -> LOGGER.info("Resultado thenCombine: {}", s),
executor);
thenAcceptBoth y runAfterBoth: Muy similares al ‘thenCombine’, excepto que no generan un nuevo futuro, simplemente ejecutan la lambda cuando los dos futuros terminen. Es como un ‘whenComplete‘ pero esperando dos futuros:
// thenAcceptBoth
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
LOGGER.info("Comenzando future1 for thenAcceptBoth...");
Sleep.sleepSeconds(2);
LOGGER.info("Terminado future1 for thenAcceptBoth!");
return "Terminado";
}, executor);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
LOGGER.info("Comenzando future2 for thenAcceptBoth...");
Sleep.sleepSeconds(1);
LOGGER.info("Terminado future2 for thenAcceptBoth!");
return "Terminado other";
}, executor);
future1.thenAcceptBothAsync(future2, (s1, s2) ->
LOGGER.info("En el thenAcceptBoth, recibidos results: {}, {}", s1, s2)
, executor);
// runAfterBoth
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
LOGGER.info("Comenzando future1 for runAfterBoth...");
Sleep.sleepSeconds(2);
LOGGER.info("Terminado future1 for runAfterBoth!");
}, executor);
CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
LOGGER.info("Comenzando future2 for runAfterBoth...");
Sleep.sleepSeconds(1);
LOGGER.info("Terminado future2 for runAfterBoth!");
}, executor);
future1.runAfterBothAsync(future2, () -> LOGGER.info("En el runAfterBoth, futuros terminados.")
, executor);
acceptEither y runAfterEither: En algunos casos en que tengamos dos futuros nos interesará hacer algo cuando uno de los dos termine, el primero que lo haga. Para eso están estos dos métodos:
// acceptEither
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
LOGGER.info("Comenzando future1 for acceptEither...");
Sleep.sleepSeconds(3);
LOGGER.info("Terminado future1 for acceptEither!");
return "Segundo";
}, executor);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
LOGGER.info("Comenzando future2 for acceptEither...");
Sleep.sleepSeconds(1);
LOGGER.info("Terminado future2 for acceptEither!");
return "Primero";
}, executor);
future1.acceptEitherAsync(future2, (s) ->
LOGGER.info("En el acceptEither, recibido el primer resultado: {}", s)
, executor);
// runAfterEither
CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
LOGGER.info("Comenzando future1 for runAfterEither...");
Sleep.sleepSeconds(3);
LOGGER.info("Terminado future1 for runAfterEither!");
}, executor);
CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
LOGGER.info("Comenzando future2 for runAfterEither...");
Sleep.sleepSeconds(1);
LOGGER.info("Terminado future2 for runAfterEither!");
}, executor);
future1.runAfterEitherAsync(future2, () -> LOGGER.info("En el runAfterEither, primero terminado.")
, executor);
applyToEither: muy similar a ‘acceptEither’, pero este devuelve a su vez un futuro. Es como el ‘thenApply’ pero sobre el futuro que termine antes:
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
LOGGER.info("Comenzando future1 for applyToEither...");
Sleep.sleepSeconds(3);
LOGGER.info("Terminado future1 for applyToEither!");
return "Segundo";
}, executor);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
LOGGER.info("Comenzando future2 for applyToEither...");
Sleep.sleepSeconds(1);
LOGGER.info("Terminado future2 for applyToEither!");
return "Primero";
}, executor);
CompletableFuture<String> applyToEitherFuture = future1.applyToEitherAsync(future2, s -> {
LOGGER.info("Comenzando applyToEither...");
Sleep.sleepSeconds(1);
LOGGER.info("Terminado applyToEither!");
return s.toUpperCase();
}, executor);
applyToEitherFuture.whenCompleteAsync((s, e) -> LOGGER.info("Resultado applyToEither: {}", s),
executor);
allOf y anyOf: Hasta ahora parece que solo podíamos combinar dos futuros. Con estos dos métodos podemos hacer un ‘thenAcceptBoth’ o ‘acceptEither’ sobre un número ilimitado de futuros:
// allOf
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
LOGGER.info("Comenzando future1 for allOf...");
Sleep.sleepSeconds(2);
LOGGER.info("Terminado future1 for allOf!");
return "Terminado future1";
}, executor);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
LOGGER.info("Comenzando future2 for allOf...");
Sleep.sleepSeconds(1);
LOGGER.info("Terminado future2 for allOf!");
return "Terminado future2";
}, executor);
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
LOGGER.info("Comenzando future3 for allOf...");
Sleep.sleepSeconds(3);
LOGGER.info("Terminado future3 for allOf!");
return "Terminado future3";
}, executor);
CompletableFuture<Void> all = CompletableFuture.allOf(future1, future2, future3);
all.whenCompleteAsync((s, e) -> LOGGER.info("Resultado all: {}", s), executor);
// anyOf
CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
LOGGER.info("Comenzando future1 for allOf...");
Sleep.sleepSeconds(2);
LOGGER.info("Terminado future1 for allOf!");
return "Terminado future1";
}, executor);
CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
LOGGER.info("Comenzando future2 for allOf...");
Sleep.sleepSeconds(1);
LOGGER.info("Terminado future2 for allOf!");
return "Terminado future2";
}, executor);
CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
LOGGER.info("Comenzando future3 for allOf...");
Sleep.sleepSeconds(3);
LOGGER.info("Terminado future3 for allOf!");
return "Terminado future3";
}, executor);
CompletableFuture<Object> all = CompletableFuture.anyOf(future1, future2, future3);
all.whenCompleteAsync((s, e) -> LOGGER.info("Resultado any: {}", s), executor);
La única pega es que el tipo que devuelven es Void y Object respectivamente.
Lógicamente, si esperas a que terminen todos los futuros (allOf), y devuelven resultado, no quieres un resultado, querrás todos. Quizá deberían haber hecho que devolviera un List en lugar de Void.
Hasta aquí este pequeño tutorial de cómo usar los futuros en Java con la clase CompletableFuture. Espero que os sea útil. Tenéis todos los ejemplos y alguno más en mi github, proyecto completablefuture-example.