В параллельном программировании нам часто приходится обрабатывать многопоточные задачи, которые часто имеют зависимости, асинхронный характер и требуют получения результатов после завершения всех задач. В Java 8 появился класс CompletableFuture, который представляет новую модель программирования, которая позволяет нам решать параллельные задачи в стиле функционального программирования, что значительно улучшает читаемость и простоту кода.
В этом блоге мы углубимся в принципы проектирования CompletableFuture, подробно представим, как использовать его API, и на конкретных примерах продемонстрируем его применение при параллельной обработке задач. Мы также рассмотрим его сравнение с Future, CompletableFuture и другими инструментами в пакете параллелизма Java, чтобы понять, когда и почему вам нужно использовать CompletableFuture. Давайте вместе отправимся в это непростое учебное путешествие!
Прежде чем начать, давайте рассмотрим историю развития языка Java.
С момента своего создания Java стремилась предоставить мощные инструменты для параллельного и асинхронного программирования. В начальный период JDK 1.4 разработчикам Java приходилось использовать низкоуровневые инструменты управления параллелизмом, такие как синхронизация и ожидание/уведомление. Хотя эти инструменты являются мощными, их очень сложно использовать.
Чтобы упростить параллельное программирование, Java представила пакет JUC в JDK 1.5, который предоставляет ряд расширенных инструментов управления параллелизмом, таких как ExecutorService, Semaphore и Future.
Давайте сначала посмотрим, как Future выполняет асинхронное программирование.
Прежде чем начать наше путешествие, давайте взглянем на это требование.
Допустим, вы работаете в онлайн-туристическом агентстве, где пользователи могут искать и бронировать авиабилеты и отели. Вот ряд операций, которые вам необходимо выполнить:
Для нужд восстановления,первый,Нам нужно создать ExecutorService.,:
ExecutorService executor = Executors.newFixedThreadPool(10);
// 1. Проверить авиабилеты
Future<List<Flight>> futureFlights = executor.submit(() -> searchFlights(searchCondition));
List<Flight> flights;
try {
flights = futureFlights.get();
} catch (InterruptedException | ExecutionException e) {
// Обработка исключений
}
// 2. Поиск отелей по каждому авиабилету
List<Future<List<Hotel>>> futureHotelsList = new ArrayList<>();
for (Flight flight : flights) {
Future<List<Hotel>> futureHotels = executor.submit(() -> searchHotels(flight));
futureHotelsList.add(futureHotels);
}
List<Future<List<TravelPackage>>> futureTravelPackagesList = new ArrayList<>();
for (Future<List<Hotel>> futureHotels : futureHotelsList) {
List<Hotel> hotels;
try {
hotels = futureHotels.get();
} catch (InterruptedException | ExecutionException e) {
// Обработка исключений
}
// 3. Рассчитайте общую стоимость каждого авиабилета и отеля в сочетании
for (Hotel hotel : hotels) {
Future<List<TravelPackage>> futureTravelPackages = executor.submit(() -> calculatePrices(flight, hotel));
futureTravelPackagesList.add(futureTravelPackages);
}
}
List<TravelPackage> travelPackages = new ArrayList<>();
for (Future<List<TravelPackage>> futureTravelPackages : futureTravelPackagesList) {
try {
travelPackages.addAll(futureTravelPackages.get());
} catch (InterruptedException | ExecutionException e) {
// Обработка исключений
}
}
// 4. Сортировать все турпакеты по цене
travelPackages.sort(Comparator.comparing(TravelPackage::getPrice));
// 5. Возврат результатов
return travelPackages;
Требование наконец выполнено (вздох). В данный момент, если вы родились в JDK8+, будете ли вы чувствовать то же самое? Это без обработки исключений и без большого количества бизнес-кода. Хорошо, теперь помедленнее и продолжим. Что мы можем наиболее интуитивно увидеть из приведенного выше кода?
Каким бы совершенным ни было выражение, оно не может конкурировать с примером, который заставляет вас чувствовать интуитивно. Далее разберем недостатки Future.
сверху Future
На примере мы отчетливо видим следующие недостатки:
Future извыполнить заставляет нас существовать каждый Future Когда закончите, начните другой Future, что делает код похожим на постоянно вложенные обратные вызовы. Такой подход может затруднить чтение и понимание кода, особенно когда речь идет о сложности и асинхронности. время задачи.
Хотя Future.get()
Можно получить задание по результатам, но это операция блокировки,Это предотвратит выполнение текущего потока,пока асинхронная операция не завершится. Этот дизайн подходит для неблокирующего и асинхронного программирования.,да очень неудовлетворительно из.
в использовании Future При обработке асинхронных задач в цепочке, если в каком-то звене посередине возникает ошибка, сложность обработки ошибок сильно возрастает. Это нужно сделать в каждом Future Обработка добавляется при обработке из исключений кода, что делает код более сложным и трудным в сопровождении.
использовать Future Трудно визуально представить зависимости между задачами. Например, вы не можете использовать Future Чтобы указать, что задачу необходимо запустить до завершения двух других задач или что несколько задач могут выполняться параллельно, но должны быть завершены до выполнения общей задачи. Это ограничение делает Future существоватьсправиться со сложностьюизасинхронныйцепочка Становится очень сложно, когда задача.
поэтому,Чтобы решить эти проблемы,CompletableFuture
был представлен Java 8. Предоставляет более мощные и гибкие инструменты асинхронного программирования.
Аналогично приведенному выше примеру, давайте посмотрим на его код:
CompletableFuture.supplyAsync(() -> searchFlights()) // 1. Проверить авиабилеты
.thenCompose(flights -> { // 2. Поиск отелей по каждому авиабилету
List<CompletableFuture<List<TravelPackage>>> travelPackageFutures = flights.stream()
.map(flight -> CompletableFuture.supplyAsync(() -> searchHotels(flight)) // Поиск отеля
.thenCompose(hotels -> { // 3. Рассчитайте общую стоимость каждого авиабилета и отеля в сочетании
List<CompletableFuture<TravelPackage>> packageFutures = hotels.stream()
.map(hotel -> CompletableFuture.supplyAsync(() -> new TravelPackage(flight, hotel)))
.collect(Collectors.toList());
return CompletableFuture.allOf(packageFutures.toArray(new CompletableFuture[0]))
.thenApply(v -> packageFutures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));
}))
.collect(Collectors.toList());
return CompletableFuture.allOf(travelPackageFutures.toArray(new CompletableFuture[0]))
.thenApply(v -> travelPackageFutures.stream()
.flatMap(future -> future.join().stream())
.collect(Collectors.toList()));
})
.thenApply(travelPackages -> { // 4. Сортировать все турпакеты по цене
return travelPackages.stream()
.sorted(Comparator.comparing(TravelPackage::getPrice))
.collect(Collectors.toList());
})
.exceptionally(e -> { // Обработка всех исключений
// Обработка исключений
return null;
});
Вы можете на первый взгляд,Это кажется более сложным, чем Future. Но даактуальносуществовать бизнес,На самом деле читать легче. каждый шаг,За каждой операцией можно следитьthenCompose
Спуститься。
CompletableFuture
да Java 8 из вводится для решения проблемы в использовании Future
Я столкнулся с некоторыми проблемами. Он восстанавливает Future
и CompletionStage
интерфейс и предоставляет большое количество методов, которые помогут вам лучше контролировать и управлять асинхронными операциями. Давайте проанализируем его преимущества на основе приведенных выше примеров:
насиспользовать CompletableFuture
в supplyAsync
Метод для запуска асинхронного запроса рейсов:
CompletableFuture<List<Flight>> flightsFuture = CompletableFuture.supplyAsync(() ->
searchFlights(source, destination));
Затем мы используем thenCompose
Метод будет запрашивать рейсыи Поиск отеляиз Операционная компаниясуществовать Вместе:
CompletableFuture<List<TravelPackage>> travelPackagesFuture = flightsFuture.thenCompose(flights ->
CompletableFuture.supplyAsync(() -> flights.stream()
.map(flight -> searchHotels(flight))
.collect(Collectors.toList())
));
выше thenCompose
методда Нетблокироватьиз,Прямо сейчас Поиск отеляиз Операция будет немедленной Прямо сейчасначинать,Нет необходимости ждать завершения операции запроса рейса.
насиспользовать exceptionally
метод Обработка запросов на полетыи Поиск отеляможет произойти во времяизаномальный:
CompletableFuture<List<TravelPackage>> travelPackagesFuture = flightsFuture.thenCompose(flights ->
CompletableFuture.supplyAsync(() -> flights.stream()
.map(flight -> searchHotels(flight))
.collect(Collectors.toList())
)).exceptionally(ex -> {
System.out.println("Ошибка: " + ex);
return new ArrayList<>();
});
насиспользовать CompletableFuture.allOf
метод, указывающий, что все задачи по расчету туристических пакетов должны быть выполнены перед началом сортировки:
CompletableFuture<List<TravelPackage>> sortedTravelPackagesFuture = travelPackagesFuture.thenApply(travelPackages ->
travelPackages.stream()
.flatMap(List::stream)
.sorted(Comparator.comparing(TravelPackage::getPrice))
.collect(Collectors.toList())
);
Сделайте паузу на минуту и рассмотрите приведенный выше пример. Давайте сосредоточимся на сравнении двух
Future
Существующий способ получить результаты в какой-то момент в будущем существует, но это основная проблема дасуществовать. При получении результатов, если результаты еще не готовы, это приведет к блокировке. Также используйте isDone()
Метод опроса также не является хорошим выбором, поскольку он потребляет ресурсы процессора.CompletableFuture
предоставил Нетблокироватьиз Получение результатаметод,thenApply
, thenAccept
, thenRun
Метод ожидания может выполняться автоматически, когда результат готов, поэтому нам не нужно вручную проверять и ждать результата.Future
цепная не поддерживается операция,нас Не могущийсуществовать Future
Автоматически запускать другую задачу после завершения.CompletableFuture
предоставил thenApply
, thenAccept
, thenRun
, thenCompose
, thenCombine
Ряд методов используется для автоматического выполнения другой задачи после завершения текущей задачи, образуя цепочку. задач。Future
в, можно только пройти get()
методполучатьаномальный,Но этот метод заблокирует поток,пока задача не будет выполнена. Гибкий контроль выполнения задачCompletableFuture
предоставил exceptionally
, handle
ждатьметод,Мы можем использовать эти методы для обеспечения резервного копирования результатов в случае возникновения исключения.,Или обрабатывать исключения. сочетание задачFuture
Никакой возможности объединения задач не предусмотрено.CompletableFuture
предоставил allOf
, anyOf
, thenCombine
ждатьметод,С помощью этих методов мы можем выразить параллельную связь между задачами.,Или отношения конвергенции.Future
существовать Относительно жесткое выполнение задач,Мы не можем отменить задачу на полпути,Также невозможно выполнить определенные операции после завершения задачи.CompletableFuture
предоставил cancel
, complete
ждатьметод,Используется для отмены задач на полпути,Или закончить задание раньше времени. также,whenComplete
и whenCompleteAsync
методпозволятьнассуществоватьв конце миссии,независимо от успеха или неудачи,Все могут выполнять определенные операции.Если бы интервьюер спросил, в чем разница между ними, вы бы ответили? Дальше давайте проанализируем
Чтобы вам было менее непонятно для понимания, я расскажу вам примеры из жизни:
мы можем поставить CompletableFuture
Думайте об этом как о цехе конвейерного производства. Обработка каждой части (задачи) завершена (Будущее Завершение) может инициировать следующий этап работы (операция следующего шага), а завершение каждого этапа работы будет уведомлять цех (Будущее) для начала следующего этапа производства. Этот процесс подобен сборочной линии, где каждый шаг автоматически переходит к следующему.
Помня об этой сцене, мы продолжаем смотреть вниз.
CompletableFuture
В исходном коде есть внутренний класс Completion
,Представляет собой цепочку задач в задаче. Всякий раз, когда задача выполнена,Он попытается выполнить задачи, которые от него зависят.,Точно так же, как рабочие на сборочной линии выполнили часть своей работы.,Полуфабрикат передается следующему работнику.
abstract static class Completion extends ForkJoinTask<Void> implements Runnable, AsynchronousCompletionTask {
// ...
}
CompletableFuture
Сам по себе контейнер результатов,Он содержит результат выполнения,Включая обычные результаты вычислений или исключения во время выполнения.
volatile Object result; // The outcome of the computation
Все асинхронные задачи будут отправлены в ForkJoinPool.commonPool()
Исполнение в , конечно, можно указать и собственное Executor
для выполнения задач.
static final ForkJoinPool ASYNC_POOL = ForkJoinPool.commonPool();
Когда задача выполнена,CompletableFuture
пройдет tryFire
Метод запускает следующую задачу, связанную с ним. Это похоже на то, что после того, как рабочий выполнил часть работы, он уведомляет следующего рабочего на конвейере, чтобы тот продолжил выполнение следующей работы.
final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) {
// ...
if (a != null && a.stack != null) {
if (mode < 0)
a.cleanStack();
else
a.postComplete();
}
if (b != null && b.stack != null) {
if (mode < 0)
b.cleanStack();
else
b.postComplete();
}
return null;
}
да Не понимаю немного? Могу с уверенностью сказать, что вы превзошли 80% людей!
Будьте осторожны и вы обязательно это заметите,CompletableFutureбольшинствометод Всевыполнитьв одномCompletionStage
интерфейс。конечно,Я существую, могу попробовать все методы для тебя здесь,Но вы определенно почувствуете себя очень уставшим от просмотра. так! Я объясню вам все методы, использованные в приведенных выше требованиях.,В остальном, пожалуйста, объедините это с онлайн-кейсом.
Этот метод используется для асинхронного выполнения функции снабжения.,и возвращаетCompletableFuture
объект。существоватьнасиз В примере,Этот метод используется для запуска асинхронной задачи по поиску рейсов.
CompletableFuture<List<Flight>> flightsFuture = CompletableFuture.supplyAsync(() -> searchFlights(destination));
этотметод Используется для связи несколькихCompletableFuture
объект,Сформируйте цепочку операций. Когда операция завершена,thenCompose()
методбудет действоватьиз Результат передается следующей операции。существоватьнасиз В примере,Этот метод используется для поиска отеля после нахождения рейса.
CompletableFuture<List<Hotel>> hotelsFuture = flightsFuture.thenCompose(flights -> CompletableFuture.supplyAsync(() -> searchHotels(destination)));
этотметодиспользуется для объединения двух независимыхизCompletableFuture
объектиз Результаты объединены в один результат。существоватьнасиз В примере,Этот метод используется для объединения результатов поиска авиабилетов и отелей в туристический пакет.
CompletableFuture<List<TravelPackage>> travelPackagesFuture = flightsFuture.thenCombine(hotelsFuture, (flights, hotels) -> createTravelPackages(flights, hotels));
этотметодсуществоватьCompletableFuture
объект После завершения расчета выполните функцию потребления,Получать результаты расчета в виде параметров,Не возвращает новое рассчитанное значение. существуем из примера,Этот метод используется для распечатки всех туристических пакетов.
travelPackagesFuture.thenAccept(travelPackages -> printTravelPackages(travelPackages));
этотметодиспользуется для преобразованияCompletableFuture
объектиз Массивы объединяются в новыйизCompletableFuture
объект,Этот новыйизCompletableFuture
объектсуществоватьвсе в массивеизCompletableFuture
объект Все完成时完成。существоватьнасиз В примере,Этот метод используется для объединения каждого рейса с каждой существующей комбинацией комбинации отелей (также с туристическим пакетом).
CompletableFuture.allOf(packageFutures.toArray(new CompletableFuture[0]))
.thenApply(v -> packageFutures.stream()
.map(CompletableFuture::join)
.collect(Collectors.toList()));
этотметодиспользуется дляCompletableFuture
из Преобразуйте результат,и возвращаетновыйизCompletableFuture
объект。существоватьнасиз В примере,Этот метод используется для сортировки туристических пакетов по цене.
.thenApply(travelPackages -> { // 4. Сортировать все турпакеты по цене
return travelPackages.stream()
.sorted(Comparator.comparing(TravelPackage::getPrice))
.collect(Collectors.toList());
})
этотметоддля обработкиCompletableFuture
изаномальный Состояние。еслиCompletableFuture
извыброшено во время расчетааномальный,Затем будет вызван этот метод. существуем из примера,Этот метод используется для обработки любых исключений, которые могут возникнуть во время запроса туристических пакетов.
.exceptionally(e -> { // Обработка всех исключений
// Обработка исключений
return null;
});
Конечно, этих методов вам достаточно. Если это требование не сложнее, чем я думал, то вы считаетесь потрясающим. О нет, это считается извращенным требованием. Можете ли вы продолжить писать историю?
JDK 1.5 из Future решено много Параллельное программирование из сложности, но оно все же имеет некоторые ограничения. Будущее могу описать только одинасинхронныйдействовать,Он не может описать асинхронную операцию, состоящую из нескольких шагов. Например,Когда вам необходимо обработать бизнес-процесс, состоящий из нескольких последовательностей асинхронных операций.,Вы можете обнаружить, что ваш код перегружен сложной логикой обратного вызова.,Вот и вседалюди часто говорятизад обратного вызова。также,Future Не существует эффективного способа обработки результатов асинхронных операций. Можно использовать только блокирующие вызовы. get() метод получения результата.
Чтобы решить эти проблемы, Java существовать JDK 1.8 введено в CompletableFuture。CompletableFuture да Future из Расширенная версия, она может не только представлять асинхронную операцию, но и передавать thenCompose(), thenCombine(), allOf() и другие методы для описания асинхронной операции, состоящей из нескольких шагов. С помощью этих методов CompletableFuture может описывать сложные асинхронные бизнес-процессы плавным и связным образом.,Это значительно снижает сложность асинхронного программирования.
Прочитав статью, сможете ли вы ответить на эти вопросы? Я существую, оставляю сообщение, ожидающее вас.
хорошо,Это здесь,нас Давайте рассмотрим。первый,Я познакомлю вас с хроникой мира параллельной обработки Java. Сразу после этого,Я познакомлю вас с древними людьми, которые часто используют это в будущем. После того, как почувствовал что-то плохое по этому поводу,Я возвращаю вас в CompletableFuture . Тогда я глубоко узнал всю картину. такжеиспользоватьметод。наконец,Надеюсь, ты сможешь дочитать до этого места,Не забудьте ответить на вопросы.