Co to są egzekutory – pule wątków?
Egzektory, czyli pule wątków (ang. Executors, Thread Pools) to mechanizm w Javie, który ułatwia zarządzanie wątkami poprzez tworzenie i wykorzystywanie pul wątków do wykonywania zadań. Pozwalają one na kontrolowane uruchamianie wielu zadań asynchronicznie w ramach ustalonej puli wątków, co może poprawić wydajność i responsywność aplikacji.
Podstawowym celem egzekutorów jest minimalizacja kosztów przełączania kontekstu (ang. context switching) oraz ograniczenie liczby utworzonych wątków, co może być przydatne w aplikacjach, gdzie wykonanie dużej ilości zadań asynchronicznych jest wymagane.
Główne zalety egzekutorów/puli wątków to:
-
Reużywalność wątków: Istniejące wątki w puli mogą być używane wielokrotnie do wykonywania różnych zadań, co eliminuje konieczność ciągłego tworzenia i usuwania wątków.
-
Zarządzanie zasobami: Egzekutory pozwalają na kontrolowane zarządzanie zasobami, takimi jak maksymalna liczba wątków w puli, czas życia wątków, opóźnienia itp.
-
Uproszczenie kodu: Użycie egzekutorów może uprościć kod, ponieważ programiści nie muszą samodzielnie zarządzać wątkami i synchronizacją dostępu do współdzielonych zasobów.
-
Monitorowanie postępu: Egzekutory często oferują mechanizmy monitorowania postępu wykonywania zadań, takie jak obiekty Future, które umożliwiają kontrolę statusu i wyników zadań.
-
Zabezpieczenie przed przepełnieniem zadań: Egzekutory mogą obsługiwać sytuacje, w których liczba zadań przekracza dostępną pulę wątków, poprzez kolejkowanie zadań do wykonania w przyszłości.
ThreadPoolExecutor: Jest to jedna z najbardziej elastycznych implementacji ExecutorService. Pozwala ona na tworzenie puli wątków o stałej liczbie wątków lub o zmiennej liczbie wątków, w zależności od potrzeb. Można dostosować wiele parametrów, takich jak maksymalna liczba wątków w puli, czas życia wątków, strategia kolejkowania zadań itp.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Main {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(3); // Tworzenie puli wątków o stałej liczbie (3) wątków
// Dodawanie zadań do wykonania
for (int i = 0; i < 5; i++) {
executor.submit(() -> {
System.out.println("Wykonywanie zadania w wątku: " + Thread.currentThread().getName());
});
}
executor.shutdown(); // Zamknięcie egzekutora po zakończeniu dodawania zadań
}
}
ScheduledThreadPoolExecutor: Ta implementacja jest rozszerzeniem ThreadPoolExecutor i dodaje możliwość planowania zadań do wykonania w przyszłości lub cyklicznego wykonywania zadań w określonych odstępach czasu.
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
public class Main {
public static void main(String[] args) {
ScheduledExecutorService executor = Executors.newScheduledThreadPool(1); // Tworzenie puli wątków z planowaniem (1 wątek)
// Planowanie zadania do wykonania po 3 sekundach
executor.schedule(() -> {
System.out.println("Zadanie wykonane po 3 sekundach w wątku: " + Thread.currentThread().getName());
}, 3, TimeUnit.SECONDS);
executor.shutdown(); // Zamknięcie egzekutora po zakończeniu planowanego zadania
}
}
FixedThreadPool: Jest to uproszczona wersja ThreadPoolExecutor, która tworzy pulę wątków o stałej liczbie wątków. Wszystkie wątki w puli są utworzone, gdy jest tworzona instancja FixedThreadPool, i nie są usuwane z puli, chyba że jest to konieczne ze względów na awarię.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Main {
public static void main(String[] args) {
ExecutorService executor = Executors.newFixedThreadPool(3); // Tworzenie puli wątków o stałej liczbie (3) wątków
// Dodawanie zadań do wykonania
for (int i = 0; i < 5; i++) {
executor.submit(() -> {
System.out.println("Wykonywanie zadania w wątku: " + Thread.currentThread().getName());
});
}
executor.shutdown(); // Zamknięcie egzekutora po zakończeniu dodawania zadań
}
}
CachedThreadPool: Ta implementacja tworzy pulę wątków, które są tworzone i usuwane dynamicznie w zależności od obciążenia. Jeśli wątek w puli przez jakiś czas nie jest używany, zostanie usunięty. Jeśli natomiast istniejące wątki nie są wystarczające do obsługi nowych zadań, będą tworzone nowe wątki.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Main {
public static void main(String[] args) {
ExecutorService executor = Executors.newCachedThreadPool(); // Tworzenie puli wątków z dynamiczną liczbą wątków
// Dodawanie zadań do wykonania
for (int i = 0; i < 5; i++) {
executor.submit(() -> {
System.out.println("Wykonywanie zadania w wątku: " + Thread.currentThread().getName());
});
}
executor.shutdown(); // Zamknięcie egzekutora po zakończeniu dodawania zadań
}
}
SingleThreadExecutor: Jest to implementacja, która tworzy pulę wątków zawierającą tylko jeden wątek. Jest to przydatne, gdy chcemy upewnić się, że wszystkie zadania są wykonywane sekwencyjnie, w jednym wątku.
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
public class Main {
public static void main(String[] args) {
ExecutorService executor = Executors.newSingleThreadExecutor(); // Tworzenie puli wątków zawierającej tylko jeden wątek
// Dodawanie zadań do wykonania
for (int i = 0; i < 5; i++) {
executor.submit(() -> {
System.out.println("Wykonywanie zadania w wątku: " + Thread.currentThread().getName());
});
}
executor.shutdown(); // Zamknięcie egzekutora po zakończeniu dodawania zadań
}
}
ForkJoinPool jest specjalną implementacją ExecutorService
wprowadzoną w Javie w wersji 7. Jest używana głównie do wykonywania zadań, które mogą być podzielone na mniejsze podzadania, a następnie wykonywane równolegle. Jest to szczególnie przydatne w algorytmach dziel i zwyciężaj, takich jak sortowanie, przeszukiwanie, przetwarzanie rekursywne itp.
import java.util.concurrent.RecursiveTask;
import java.util.concurrent.ForkJoinPool;
public class Main {
public static void main(String[] args) {
// Przykładowa tablica danych
int[] array = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
// Tworzenie obiektu zadania ForkJoinTask
SumTask task = new SumTask(array, 0, array.length);
// Tworzenie puli wątków ForkJoinPool
ForkJoinPool pool = new ForkJoinPool();
// Wywołanie zadania w puli wątków
int result = pool.invoke(task);
System.out.println("Suma elementów tablicy: " + result);
}
}
// Klasa reprezentująca zadanie do zsumowania elementów tablicy
class SumTask extends RecursiveTask {
private int[] array;
private int start;
private int end;
public SumTask(int[] array, int start, int end) {
this.array = array;
this.start = start;
this.end = end;
}
@Override
protected Integer compute() {
// Jeśli liczba elementów w tablicy jest mniejsza niż pewna wartość graniczna,
// wykonaj obliczenia sekwencyjnie
if (end - start <= 3) {
int sum = 0;
for (int i = start; i < end; i++) {
sum += array[i];
}
return sum;
} else {
// Podziel zadanie na mniejsze podzadania
int mid = (start + end) / 2;
SumTask leftTask = new SumTask(array, start, mid);
SumTask rightTask = new SumTask(array, mid, end);
// Wykonaj podzadania równolegle
leftTask.fork();
int rightResult = rightTask.compute();
// Pobierz wynik z lewego podzadania i zsumuj z wynikiem z prawego podzadania
int leftResult = leftTask.join();
return leftResult + rightResult;
}
}
}
W tym przykładzie tworzymy pulę wątków ForkJoinPool
, a następnie używamy jej do wywołania zadania SumTask
, które zsumuje elementy tablicy. Kluczową klasą jest SumTask
, która rozszerza RecursiveTask<Integer>
i rekursywnie dzieli zadanie na mniejsze podzadania, a następnie wykonuje je równolegle. W rezultacie uzyskujemy równoległe sumowanie elementów tablicy, co może poprawić wydajność w przypadku dużych zestawów danych.