Java Concurrency: CountDownLatch vs CyclicBarrier vs Phaser
Ich spreche seit vielen Jahren in meinem Extreme-Java-Concurrency-Performance-Kurs über Phaser
und habe noch keinen Teilnehmer gefunden, der mir sagt: „Oh ja, das ist eine tolle Klasse, wir benutzen sie ständig!“ Die Teilnehmer haben in der Regel schon von CountDownLatch
und vielleicht CyclicBarrier
gehört, aber selten von Phaser
. Wie kann das sein, wenn Phaser seit Java 7 existiert und die Synchronisierung zwischen Threads so viel einfacher macht als andere ähnliche Konstrukte?
CountDownLatch
ist einfach zu verstehen, aber schwierig zu handhaben. Phaser
hingegen ist schwer zu verstehen, aber einfach anzuwenden. Kürzlich unterrichtete ich eine Gruppe cleverer Programmierer in Athen. Eine der vielen klugen Fragen lautete: „Wie können wir eine Reihe von Aufgaben koordinieren, die alle unterschiedlich viel Zeit in Anspruch nehmen?“ Meine erste, spontane Antwort lautete, CompletionStage
zu benutzen. Aber je intensiver wir uns mit dem Problem auseinandersetzen, desto besser schien Phaser
zu passen. Im Kurs habe ich zuerst mittels Phaser
codiert. Jemand fragte dann, ob dasselbe mit CountDownLatch
möglich wäre. Also haben wir auch das kodiert. Hier werde ich es umgekehrt machen. Wir beginnen mit CountDownLatch
und refaktorieren dann, um stattdessen Phaser
zu verwenden.
Erst mal aufteilen
Wir werden fünf Batches ausführen. Jedes Batch besteht aus drei Aufgaben, die zwischen 500 Millisekunden und 3 Sekunden dauern. Aufgaben innerhalb eines Batches sollten alle gleichzeitig beginnen. Um den Code leichter lesbar zu machen, definieren wir eine gemeinsame Oberklasse LockStepExample
:
import java.util.concurrent.*; public abstract class LockStepExample { protected final static int TASKS_PER_BATCH = 3; protected final static int BATCHES = 5; protected final void doTask(int batch) { System.out.printf("Task start %d%n", batch); int ms = ThreadLocalRandom.current().nextInt(500, 3000); try { Thread.sleep(ms); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } System.out.printf("Task in batch %d took %dms%n", batch, ms); } }
Als Nächstes erweitern wir den Code mit unserem LockStepCountDownLatch
. Da das CountDownLatch
nicht zurückgesetzt werden kann, müssen wir für jeden Batch ein neues latch
erstellen. latch
verfügt über ein ziemlich veraltetes Interrupt Handling. Es gibt keine Möglichkeit, den Interrupt so lange zu speichern, bis wir fertig sind, wie wir es mit Semaphore.acquireUninterruptibly()
oder Lock.lock()
tun können. Der Code in task()
ist wichtig. Wir rufen zuerst latch.countDown()
auf, um zu signalisieren, dass unser Thread bereit zur Ausführung ist. Danach rufen wir latch.await()
auf, müssen aber die InterruptedException
selbst verwalten. Wir tun dies, indem wir den unterbrochenen Zustand mit Thread.interrupted()
löschen. Wir rufen dann weiter latch.await()
auf, bis wir kontrolliert beenden können.
Wenn wir während weiterer latch.await()
-Ausführungen erneut unterbrochen werden, merken wir uns den Zustand, warten aber weiter. Sobald wir die while(true)
-Schleife mit dem break
beenden, unterbrechen wir uns letztendlich selbst, wenn wir an einem Punkt in unserem wartenden Code unterbrochen wurden. interrupt()
stoppt den Thread nicht, es ändert lediglich den Zustand zu unterbrochen. Danach rufen wir rufen die Methode doTask()
auf und übergeben ihr die Batch-Nummer.
import java.util.concurrent.*; import static java.util.concurrent.Executors.newFixedThreadPool; public class LockStepCountDownLatch extends LockStepExample { public static void main(String... args) { LockStepCountDownLatch lse = new LockStepCountDownLatch(); ExecutorService pool = newFixedThreadPool(TASKS_PER_BATCH); for (int batch = 0; batch < BATCHES; batch++) { // We need a new CountDownLatch per batch, since they // cannot be reset to their initial value CountDownLatch latch = new CountDownLatch(TASKS_PER_BATCH); for (int i = 0; i < TASKS_PER_BATCH; i++) { int batchNumber = batch + 1; pool.submit(() -> lse.task(latch, batchNumber)); } } pool.shutdown(); } public void task(CountDownLatch latch, int batch) { latch.countDown(); boolean interrupted = Thread.interrupted(); while (true) { try { latch.await(); break; } catch (InterruptedException e) { interrupted = true; } } if (interrupted) Thread.currentThread().interrupt(); doTask(batch); } }
Unsere erste Herausforderung bestand darin, für jedes Batch einen neuen CountDownLatch
zu erstellen. Dies können wir durch den Einsatz einer CyclicBarrier
vermeiden. Das erlaubt es uns, die barrier
wiederzuverwenden, aber das Interrupt Handling ist immer noch aus dem letzten Jahrtausend.
import java.util.concurrent.*; import static java.util.concurrent.Executors.*; public class LockStepCyclicBarrier extends LockStepExample { public static void main(String... args) { LockStepCyclicBarrier lse = new LockStepCyclicBarrier(); ExecutorService pool = newFixedThreadPool(TASKS_PER_BATCH); CyclicBarrier barrier = new CyclicBarrier(TASKS_PER_BATCH); for (int batch = 0; batch < BATCHES; batch++) { for (int i = 0; i < TASKS_PER_BATCH; i++) { int batchNumber = batch + 1; pool.submit(() -> lse.task(barrier, batchNumber)); } } pool.shutdown(); } public void task(CyclicBarrier barrier, int batch) { boolean interrupted = Thread.interrupted(); while (true) { try { barrier.await(); break; } catch (InterruptedException e) { interrupted = true; } catch (BrokenBarrierException e) { throw new AssertionError(e); } } if (interrupted) Thread.currentThread().interrupt(); doTask(batch); } }
Nun wenden wir uns dem LockStepPhaser
zu. Wir können den Phaser
für die Batches wiederverwenden, wie die CyclicBarrier
. Der Phaser
weiß auch, in welchem Batch er sich befindet, sodass wir die Batch-Nummer nicht weitergeben müssen. Und die task()
-Methode? Der gesamte komplizierte Interrupt-Handling-Code wird auf ein einzeiliges phaser.arriveAndAwaitAdvance()
reduziert. Einfach großartig!
import java.util.concurrent.*; import static java.util.concurrent.Executors.newFixedThreadPool; public class LockStepPhaser extends LockStepExample { public static void main(String... args) { LockStepPhaser lse = new LockStepPhaser(); ExecutorService pool = newFixedThreadPool(TASKS_PER_BATCH); Phaser phaser = new Phaser(TASKS_PER_BATCH); for (int batch = 0; batch < BATCHES; batch++) { for (int i = 0; i < TASKS_PER_BATCH; i++) { pool.submit(() -> lse.task(phaser)); } } pool.shutdown(); } public void task(Phaser phaser) { phaser.arriveAndAwaitAdvance(); doTask(phaser.getPhase()); } }
Wer das Rennen macht
Weitere Gründe, warum Phaser
die bevorzugte Lösung gegenüber CountDownLatch
und CyclicBarrier
ist: Er wird mit einem ManagedBlocker
realisiert; das bedeutet, dass, wenn unser Phaser einen Thread im gemeinsamen Fork-Join-Pool blockiert, ein weiterer erstellt wird, um die Parallelität auf dem gewünschten Niveau zu halten. Außerdem können wir Phaser
in einem Baum einrichten, um weniger Konflikte zu erhalten. Das ist ein bisschen kompliziert, zugegeben, aber es ist möglich. Das können wir mit den anderen Synchronisatoren wie CountDownLatch
und CyclicBarrier
nicht machen.