Futuros en Java, Parte 4: CompletableFuture, uso avanzado

Este último post de la serie de Futuros en Java es la continuación del anterior donde lo dejamos, viendo las funcionalidades de la clase CompletableFuture. Si no lo has leído ya, mejor empieza por ahí 🙂

Esta serie consta de las siguientes partes:
Parte 1: Introducción
Parte 2: interfaz Future
Parte 3: CompletableFuture, introducción
Parte 4: CompletableFuture, uso avanzado

Continúo con el uso de los CompletableFuture, a un nivel más avanzado:

Listeners / Callbacks
Como hemos visto, sobre esos futuros creados en los ejemplos anteriores podemos ahora ejecutar el método ‘get‘ de siempre para obtener el valor. Pero eso es justo lo que queríamos evitar. Para eso tenemos los callbacks y listeners, que serán ejecutados en cuanto el futuro se complete.
Además, la mayoría de estos métodos para crear callbacks devuelven a su vez un CompletableFuture también. Muy útil para encadenar varios futuros, como veremos después.
Todas tienen sus tres versiones, como ya expliqué en el anterior post.

whenComplete: este ya lo hemos visto en el anterior post, añade un callback para ejecutarlo cuando el futuro se complete. La lambda tiene 2 parámetros, uno es el posible resultado, y el otro es la excepción, si la hubiera habido. Veremos el tratamiento de excepciones más adelante.

thenApply: para transformar futuros. La idea es pasarle una función lambda que transforme el resultado del primero. Es similar al ‘map‘ de Scala.

    CompletableFuture<String> futureAsync = CompletableFuture.supplyAsync(() -> {
        LOGGER.info("Comenzando supplyAsync for thenApply...");
        Sleep.sleepSeconds(2);
        LOGGER.info("Terminado supplyAsync for thenApply!");
        return "Terminado";
    }, executor);

    CompletableFuture<String> futureApply = futureAsync.thenApplyAsync(s -> {
        LOGGER.info("Comenzando applyAsync...");
        Sleep.sleepSeconds(2);
        LOGGER.info("Terminado applyAsync!");
        return s.toUpperCase();
    }, executor);

    futureApply.whenCompleteAsync((s, e) -> LOGGER.info("Resultado applyAsync: {}", s),
        executor);

thenAccept y thenRun: muy similares al whenComplete, ejecutaran el lambda una vez se complete el futuro. El primero recibe un resultado, y el segundo no. Son equivalentes al supplyAsync y runAsync respectivamente.

    // thenAccept
    CompletableFuture<String> futureAsync = CompletableFuture.supplyAsync(() -> {
        LOGGER.info("Comenzando supplyAsync for thenAccept...");
        Sleep.sleepSeconds(2);
        LOGGER.info("Terminado supplyAsync for thenAccept!");
        return "Terminado";
    }, executor);

    futureAsync.thenAcceptAsync(s -> {
        LOGGER.info("Comenzando thenAccept...");
        Sleep.sleepSeconds(2);
        LOGGER.info("Terminado thenAccept!");
        LOGGER.info("Resultado: {}", s);
    }, executor);

    // thenRun
    CompletableFuture<Void> futureRun = CompletableFuture.runAsync(() -> {
        LOGGER.info("Comenzando runAsync for thenRun...");
        Sleep.sleepSeconds(2);
        LOGGER.info("Terminado runAsync for thenRun!");
    }, executor);

    futureRun.thenRunAsync(() -> {
        LOGGER.info("Comenzando thenRun...");
        Sleep.sleepSeconds(2);
        LOGGER.info("Terminado thenRun!");
    }, executor);

Excepciones
Tenemos varias maneras de gestionar las excepciones de futuros con la clase CompletableFuture, usando estos métodos:

exceptionally: registra un callback para gestionar la excepción. Recibe una lambda que solo tiene de parámetro la excepción, debe retornar un valor del mismo tipo que el futuro en el que se originó la excepción.

    CompletableFuture<String> futureAsync = CompletableFuture.supplyAsync(() -> {
        LOGGER.info("Comenzando supplyAsync with exception...");
        Sleep.sleepSeconds(2);
        LOGGER.info("Terminado supplyAsync with exception!");
        throw new RuntimeException("Error en el futuro");
    }, executor);

    CompletableFuture<String> futureEx = futureAsync.exceptionally(e -> {
        LOGGER.error("Resultado con excepción!!", e);
        return "StringPorDefecto";
    });

    futureEx.whenCompleteAsync((s, e) -> LOGGER.info("Resultado futureEx: {}", s),
            executor);

handle: registra un callback para gestionar el resultado o excepción. Recibe una lambda que tiene dos parámetros, el resultado y la excepción. Si la excepción no es nula, es que ha habido una excepción. También deber retornar un valor del tipo del futuro que lanzo la excepción.

    CompletableFuture<String> futureAsync = CompletableFuture.supplyAsync(() -> {
        LOGGER.info("Comenzando supplyAsync with exception...");
        Sleep.sleepSeconds(2);
        LOGGER.info("Terminado supplyAsync with exception!");
        throw new RuntimeException("Error en el futuro");
    }, executor);

    CompletableFuture<String> handledFuture = futureAsync.handleAsync((s, e) -> {
        if (e != null) {
            LOGGER.error("Resultado con excepción!!", e);
            return "StringPorDefecto";
        } else {
            LOGGER.info("Resultado: {}", s);
            return s;
        }
    }, executor);

    handledFuture.whenCompleteAsync((s, e) -> LOGGER.info("Resultado handle: {}", s),
            executor);

whenComplete: con este método que ya hemos explicado podemos hacer algo parecido al ‘handle’, dado que la lambda que registra tiene también los dos parámetros.

    CompletableFuture<String> futureAsync = CompletableFuture.supplyAsync(() -> {
        LOGGER.info("Comenzando supplyAsync with exception...");
        Sleep.sleepSeconds(2);
        LOGGER.info("Terminado supplyAsync with exception!");
        throw new RuntimeException("Error en el futuro");
    }, executor);

    futureAsync.whenCompleteAsync((s, e) -> {
        if (e != null) {
            LOGGER.error("Resultado con excepción!!", e);
        } else {
            LOGGER.info("Resultado applyAsync: {}", s);
        }
    }, executor);

Combinar futuros
En casi todos los ejemplos anteriores prácticamente solo hemos usado callbacks sobre un mismo futuro. Pero, como hemos visto en algún ejemplo (el ‘thenApply’), podemos encadenar futuros y combinarlos. En esta funcionalidad es donde se ve el verdadero potencial del desarrollo usando futuros.

Tenemos los siguientes métodos:

thenCompose: Muy similar a ‘thenApply’, pero este es equivalente al ‘flatMap’ de Scala. Lo que hace es una cadena de futuro también. Por ejemplo, aquí llamamos a ‘thenCompose’ con una lambda que a su vez es otro futuro:

    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
        LOGGER.info("Comenzando supplyAsync for thenCompose...");
        Sleep.sleepSeconds(2);
        LOGGER.info("Terminado supplyAsync for thenCompose!");
        return "Terminado";
    }, executor);

    CompletableFuture<String> fCompose =
            future1.thenComposeAsync(s -> CompletableFuture.supplyAsync(() -> {
                        LOGGER.info("Comenzando thenCompose...");
                        Sleep.sleepSeconds(2);
                        LOGGER.info("Terminado thenCompose!");
                        return s.concat(" + Terminado other");
                    }, executor),
                    executor);

    fCompose.whenCompleteAsync((s, e) -> LOGGER.info("Resultado thenCompose: {}", s),
            executor);

Parece algo retorcido, pero puede tener su lógica 😉

thenCombine: En este caso, en lugar de una cadena de futuros, espera a que terminen dos futuros, para luego hacer algo. En este caso la lambda tendrá dos parámetros, que son el resultado de cada uno de los dos futuros:

        CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
            LOGGER.info("Comenzando future1 for thenCombine...");
            Sleep.sleepSeconds(2);
            LOGGER.info("Terminado future1 for thenCombine!");
            return "Terminado";
        }, executor);

        CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
            LOGGER.info("Comenzando future2 for thenCombine...");
            Sleep.sleepSeconds(1);
            LOGGER.info("Terminado future2 for thenCombine!");
            return "Terminado other";
        }, executor);

        CompletableFuture<String> fCombine =
                future1.thenCombineAsync(future2, (s1, s2) -> {
                    LOGGER.info("En el thenCombine, recibidos results: {}, {}", s1, s2);
                    return s1 + s2;
                }, executor);

        fCombine.whenCompleteAsync((s, e) -> LOGGER.info("Resultado thenCombine: {}", s),
                executor);

thenAcceptBoth y runAfterBoth: Muy similares al ‘thenCombine’, excepto que no generan un nuevo futuro, simplemente ejecutan la lambda cuando los dos futuros terminen. Es como un ‘whenComplete‘ pero esperando dos futuros:

    // thenAcceptBoth
    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
        LOGGER.info("Comenzando future1 for thenAcceptBoth...");
        Sleep.sleepSeconds(2);
        LOGGER.info("Terminado future1 for thenAcceptBoth!");
        return "Terminado";
    }, executor);

    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
        LOGGER.info("Comenzando future2 for thenAcceptBoth...");
        Sleep.sleepSeconds(1);
        LOGGER.info("Terminado future2 for thenAcceptBoth!");
        return "Terminado other";
    }, executor);

    future1.thenAcceptBothAsync(future2, (s1, s2) ->
                    LOGGER.info("En el thenAcceptBoth, recibidos results: {}, {}", s1, s2)
            , executor);

    // runAfterBoth
    CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
        LOGGER.info("Comenzando future1 for runAfterBoth...");
        Sleep.sleepSeconds(2);
        LOGGER.info("Terminado future1 for runAfterBoth!");
    }, executor);

    CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
        LOGGER.info("Comenzando future2 for runAfterBoth...");
        Sleep.sleepSeconds(1);
        LOGGER.info("Terminado future2 for runAfterBoth!");
    }, executor);

    future1.runAfterBothAsync(future2, () -> LOGGER.info("En el runAfterBoth, futuros terminados.")
            , executor);

acceptEither y runAfterEither: En algunos casos en que tengamos dos futuros nos interesará hacer algo cuando uno de los dos termine, el primero que lo haga. Para eso están estos dos métodos:

    // acceptEither
    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
        LOGGER.info("Comenzando future1 for acceptEither...");
        Sleep.sleepSeconds(3);
        LOGGER.info("Terminado future1 for acceptEither!");
        return "Segundo";
    }, executor);

    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
        LOGGER.info("Comenzando future2 for acceptEither...");
        Sleep.sleepSeconds(1);
        LOGGER.info("Terminado future2 for acceptEither!");
        return "Primero";
    }, executor);

    future1.acceptEitherAsync(future2, (s) ->
                    LOGGER.info("En el acceptEither, recibido el primer resultado: {}", s)
            , executor);

    // runAfterEither
    CompletableFuture<Void> future1 = CompletableFuture.runAsync(() -> {
        LOGGER.info("Comenzando future1 for runAfterEither...");
        Sleep.sleepSeconds(3);
        LOGGER.info("Terminado future1 for runAfterEither!");
    }, executor);

    CompletableFuture<Void> future2 = CompletableFuture.runAsync(() -> {
        LOGGER.info("Comenzando future2 for runAfterEither...");
        Sleep.sleepSeconds(1);
        LOGGER.info("Terminado future2 for runAfterEither!");
    }, executor);

    future1.runAfterEitherAsync(future2, () -> LOGGER.info("En el runAfterEither, primero terminado.")
            , executor);

applyToEither: muy similar a ‘acceptEither’, pero este devuelve a su vez un futuro. Es como el ‘thenApply’ pero sobre el futuro que termine antes:

    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
        LOGGER.info("Comenzando future1 for applyToEither...");
        Sleep.sleepSeconds(3);
        LOGGER.info("Terminado future1 for applyToEither!");
        return "Segundo";
    }, executor);

    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
        LOGGER.info("Comenzando future2 for applyToEither...");
        Sleep.sleepSeconds(1);
        LOGGER.info("Terminado future2 for applyToEither!");
        return "Primero";
    }, executor);

    CompletableFuture<String> applyToEitherFuture = future1.applyToEitherAsync(future2, s -> {
        LOGGER.info("Comenzando applyToEither...");
        Sleep.sleepSeconds(1);
        LOGGER.info("Terminado applyToEither!");
        return s.toUpperCase();
    }, executor);

    applyToEitherFuture.whenCompleteAsync((s, e) -> LOGGER.info("Resultado applyToEither: {}", s),
            executor);

allOf y anyOf: Hasta ahora parece que solo podíamos combinar dos futuros. Con estos dos métodos podemos hacer un ‘thenAcceptBoth’ o ‘acceptEither’ sobre un número ilimitado de futuros:

    // allOf
    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
        LOGGER.info("Comenzando future1 for allOf...");
        Sleep.sleepSeconds(2);
        LOGGER.info("Terminado future1 for allOf!");
        return "Terminado future1";
    }, executor);

    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
        LOGGER.info("Comenzando future2 for allOf...");
        Sleep.sleepSeconds(1);
        LOGGER.info("Terminado future2 for allOf!");
        return "Terminado future2";
    }, executor);

    CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
        LOGGER.info("Comenzando future3 for allOf...");
        Sleep.sleepSeconds(3);
        LOGGER.info("Terminado future3 for allOf!");
        return "Terminado future3";
    }, executor);

    CompletableFuture<Void> all = CompletableFuture.allOf(future1, future2, future3);

    all.whenCompleteAsync((s, e) -> LOGGER.info("Resultado all: {}", s), executor);

    // anyOf
    CompletableFuture<String> future1 = CompletableFuture.supplyAsync(() -> {
        LOGGER.info("Comenzando future1 for allOf...");
        Sleep.sleepSeconds(2);
        LOGGER.info("Terminado future1 for allOf!");
        return "Terminado future1";
    }, executor);

    CompletableFuture<String> future2 = CompletableFuture.supplyAsync(() -> {
        LOGGER.info("Comenzando future2 for allOf...");
        Sleep.sleepSeconds(1);
        LOGGER.info("Terminado future2 for allOf!");
        return "Terminado future2";
    }, executor);

    CompletableFuture<String> future3 = CompletableFuture.supplyAsync(() -> {
        LOGGER.info("Comenzando future3 for allOf...");
        Sleep.sleepSeconds(3);
        LOGGER.info("Terminado future3 for allOf!");
        return "Terminado future3";
    }, executor);

    CompletableFuture<Object> all = CompletableFuture.anyOf(future1, future2, future3);

    all.whenCompleteAsync((s, e) -> LOGGER.info("Resultado any: {}", s), executor);

La única pega es que el tipo que devuelven es Void y Object respectivamente.
Lógicamente, si esperas a que terminen todos los futuros (allOf), y devuelven resultado, no quieres un resultado, querrás todos. Quizá deberían haber hecho que devolviera un List en lugar de Void.

Hasta aquí este pequeño tutorial de cómo usar los futuros en Java con la clase CompletableFuture. Espero que os sea útil. Tenéis todos los ejemplos y alguno más en mi github, proyecto completablefuture-example.

Futuros en Java, Parte 3: CompletableFuture, introducción

En este tercer post sobre Futuros en Java veremos la nueva clase CompletableFuture, añadida en Java 8, y las funcionalidades que nos ofrece, comparado con la simple Future.
La clase CompletableFuture tiene mucha funcionalidad, por lo que he dividido en dos posts su explicación. Este primer post estará enfocado a hacer una introducción, y en el próximo post (y último de esta serie) veremos un uso más avanzado.

Esta serie consta de las siguientes partes:
Parte 1: Introducción
Parte 2: interfaz Future
Parte 3: CompletableFuture, introducción
Parte 4: CompletableFuture, uso avanzado

CompletableFuture: futuros «de verdad», gracias a Java 8
Con Java 8, entre otras muchas cosas, como los streams, lambdas, etc, se añadió la clase CompletableFuture (entre otras), con la que podemos en Java, por fin, tener unos futuros potentes, prácticamente al nivel de los de Scala. Es parecida funcionalmente a la clase ListenableFuture de la librería Guava.
Esta nueva clase implementa la interfaz Future que ya vimos, pero aporta muchísima más funcionalidad. Implementa también la nueva interfaz CompletionStage que es la que tiene todos los nuevos métodos.

Vamos a ver qué podemos hacer con esta clase. Tiene muchas posibilidades. Vamos a verlas directamente con ejemplos sencillos ¡Al lio!

Creación
Veamos tres ejemplos:

CompletableFuture<String> future = CompletableFuture.completedFuture("Prueba");
...
CompletableFuture<String> future = new CompletableFuture<>();
// other stuff
future.complete("Completado!");
...
CompletableFuture<Void> futureAsync = CompletableFuture.runAsync(() -> {
    // Some stuff...
});

Con la nueva clase CompletableFuture podemos crear el futuro ya directamente «completo», con el valor, con el método estático ‘completedFuture‘.
También podemos crearla directamente, con un new. Más adelante podemos simplemente «completar» el futuro, con el método ‘complete‘, como vemos en el segundo ejemplo.
Y, como es lógico, podemos crear un futuro que ejecute un pequeño proceso, como vemos en el tercer ejemplo, pasándole directamente una función lambda. En el usamos el método ‘runAsync‘, aunque tenemos más opciones (las veremos más adelante).

Variedad de métodos
Es importante explicar, para no liarse al ver el API de la clase CompletableFuture, que nos proporciona muchos métodos, casi todos ellos en tres «versiones». Por ejemplo, para el método ‘thenAccept‘ tenemos:

CompletableFuture<Void> thenAccept(Consumer<? super T> action);

CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action);

CompletableFuture<Void> thenAcceptAsync(Consumer<? super T> action, Executor executor);

– La versión sin «Async» ejecutará la función lambda en el mismo thread que el que la llama.
– La versión con «Async» ejecutará la ejecutara en un thread nuevo, usando para ello el Executor por defecto.
– La versión con «Async» y con parámetro ExecutorService, la ejecutará en un thread nuevo usando el Executor pasado como parámetro.

Lo normal si buscamos un buen sistema ‘reactivo’ sería usar siempre la versión con «Async», pero eso ya depende de cada situación. Por ejemplo, cuando encadenamos operaciones sobre un futuro, las siguientes puede ser lógico no usar las «Async» para que se sigan ejecutando en el miso thread que la anterior operación.

Métodos run y supply (y runAsync y supplyAsync)
Hemos visto que en la creación de CompletableFutures, la manera más común será pasándole una función lambda. Para ello tendremos dos métodos, ‘runAsync‘ y supplyAync‘ (y su correspondiente versión con parámetro ExecutorService).

CompletableFuture<Void> futureRunAsync = CompletableFuture.runAsync(() -> {
    LOGGER.info("Comenzando runAsync...");
    Sleep.sleepSeconds(3);
    LOGGER.info("Terminado runAsync!");
}, executor);

CompletableFuture<String> futureSupplyAsync = CompletableFuture.supplyAsync(() -> {
    LOGGER.info("Comenzando supplyAsync...");
    Sleep.sleepSeconds(3);
    LOGGER.info("Terminado supplyAsync!");
    return "Terminado";
}, executor);

Básicamente, estos métodos son equivalentes a lo que hacíamos en el post anterior para crear un Future, llamando al método ‘submit‘ del ExecutorService, pasándole nuestra lambda a ejecutar.

¿Cómo obtenemos el resultado?
Para obtener el resultado tenemos siempre la posibilidad de «bloquearnos» en el futuro, llamando al método ‘get‘ (CompletableFuture implementa también la interfaz Future).

Por ejemplo, para obtener el resultado del anterior futuro creado con ‘supplyAsync’:

LOGGER.info("Resultado bloqueando supplyAsync: " + futureSupplyAsync.get());

También nos ofrece un nuevo método ‘getNow‘ el cual lo que hace es, si el futuro se ha completado, devolver el resultado, y si no, devolver un parámetro que le pasamos a ese método.

Pero ninguna de esas opciones es lo que queremos. Ahora vamos a obtener el resultado sin bloquearnos, añadiendo algo así como un listener o callback a nuestro futuro.

CompletableFuture<String> futureSupplyAsync = CompletableFuture.supplyAsync(() -> {
    LOGGER.info("Comenzando supplyAsync...");
    Sleep.sleepSeconds(3);
    LOGGER.info("Terminado supplyAsync!");
    return "Terminado";
}, executor);

futureAsync.whenCompleteAsync((s, e) -> LOGGER.info("Resultado supplyAsync: " + s),
        executor);
LOGGER.info("Terminado main thread");

La llamada al método ‘whenCompleteAsync‘ realmente no se bloquea en el futuro. Lo que hace es «registrar» en el futuro que cuando se complete, ejecute esa función lambda.
El resultado por consola de este último ejemplo sería así:

19:07:34.030 [pool-1-thread-1] INFO CompletableFutureTest - Comenzando supplyAsync...
19:07:34.030 [main] INFO CompletableFutureTest - Terminado main thread
19:07:37.034 [pool-1-thread-1] INFO CompletableFutureTest - Terminado supplyAsync!
19:07:37.035 [pool-1-thread-2] INFO CompletableFutureTest - Resultado supplyAsync: Terminado

Como vemos, el último log «Terminado main thread» se muestra rapidamente, antes que el log que mostramos en la lambda del ‘whenCompleteAsync’.

Y hasta aquí por hoy. En el próximo post veremos a fondo el uso de callbacks, como este último, así como otras funcionalidades para combinar futuros.
Todo el código de los ejemplos está en mi github, proyecto completablefuture-example.