Blog

Extreme Java Concurrency: CountDownLatch vs CyclicBarrier vs Phaser

6 Jul 2018

Java 7 brachte uns eine neue Klasse namens Phaser, mit der wir Aktivitäten zwischen Threads koordinieren können. Sie ersetzt sowohl CountDownLatch als auch CyclicBarrier, die zwar einfacher zu verstehen, aber schwieriger zu bedienen sind.

Java Concurrency: CountDownLatch vs CyclicBarrier vs Phaser

Ich spreche seit vielen Jahren in meinem Extreme-Java-Concurrency-Performance-Kurs über Phaser und habe noch keinen Teilnehmer gefunden, der mir sagt: „Oh ja, das ist eine tolle Klasse, wir benutzen sie ständig!“ Die Teilnehmer haben in der Regel schon von CountDownLatch und vielleicht CyclicBarrier gehört, aber selten von Phaser. Wie kann das sein, wenn Phaser seit Java 7 existiert und die Synchronisierung zwischen Threads so viel einfacher macht als andere ähnliche Konstrukte?
CountDownLatch ist einfach zu verstehen, aber schwierig zu handhaben. Phaser hingegen ist schwer zu verstehen, aber einfach anzuwenden. Kürzlich unterrichtete ich eine Gruppe cleverer Programmierer in Athen. Eine der vielen klugen Fragen lautete: „Wie können wir eine Reihe von Aufgaben koordinieren, die alle unterschiedlich viel Zeit in Anspruch nehmen?“ Meine erste, spontane Antwort lautete, CompletionStage zu benutzen. Aber je intensiver wir uns mit dem Problem auseinandersetzen, desto besser schien Phaser zu passen. Im Kurs habe ich zuerst mittels Phaser codiert. Jemand fragte dann, ob dasselbe mit CountDownLatch möglich wäre. Also haben wir auch das kodiert. Hier werde ich es umgekehrt machen. Wir beginnen mit CountDownLatch und refaktorieren dann, um stattdessen Phaser zu verwenden.

Erst mal aufteilen

Wir werden fünf Batches ausführen. Jedes Batch besteht aus drei Aufgaben, die zwischen 500 Millisekunden und 3 Sekunden dauern. Aufgaben innerhalb eines Batches sollten alle gleichzeitig beginnen. Um den Code leichter lesbar zu machen, definieren wir eine gemeinsame Oberklasse LockStepExample:

import java.util.concurrent.*;
public abstract class LockStepExample {
  protected final static int TASKS_PER_BATCH = 3;
  protected final static int BATCHES = 5;
  protected final void doTask(int batch) {
    System.out.printf("Task start %d%n", batch);
    int ms = ThreadLocalRandom.current().nextInt(500, 3000);
    try {
      Thread.sleep(ms);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    System.out.printf("Task in batch %d took %dms%n", batch, ms);
  }
}

Als Nächstes erweitern wir den Code mit unserem LockStepCountDownLatch. Da das CountDownLatch nicht zurückgesetzt werden kann, müssen wir für jeden Batch ein neues latch erstellen. latch verfügt über ein ziemlich veraltetes Interrupt Handling. Es gibt keine Möglichkeit, den Interrupt so lange zu speichern, bis wir fertig sind, wie wir es mit Semaphore.acquireUninterruptibly() oder Lock.lock() tun können. Der Code in task() ist wichtig. Wir rufen zuerst latch.countDown() auf, um zu signalisieren, dass unser Thread bereit zur Ausführung ist. Danach rufen wir latch.await() auf, müssen aber die InterruptedException selbst verwalten. Wir tun dies, indem wir den unterbrochenen Zustand mit Thread.interrupted() löschen. Wir rufen dann weiter latch.await() auf, bis wir kontrolliert beenden können.
Wenn wir während weiterer latch.await()-Ausführungen erneut unterbrochen werden, merken wir uns den Zustand, warten aber weiter. Sobald wir die while(true)-Schleife mit dem break beenden, unterbrechen wir uns letztendlich selbst, wenn wir an einem Punkt in unserem wartenden Code unterbrochen wurden. interrupt() stoppt den Thread nicht, es ändert lediglich den Zustand zu unterbrochen. Danach rufen wir rufen die Methode doTask() auf und übergeben ihr die Batch-Nummer.

import java.util.concurrent.*;
import static java.util.concurrent.Executors.newFixedThreadPool;
public class LockStepCountDownLatch extends LockStepExample {
  public static void main(String... args) {
    LockStepCountDownLatch lse = new LockStepCountDownLatch();
    ExecutorService pool = newFixedThreadPool(TASKS_PER_BATCH);
    for (int batch = 0; batch < BATCHES; batch++) {
      // We need a new CountDownLatch per batch, since they
      // cannot be reset to their initial value
      CountDownLatch latch = new CountDownLatch(TASKS_PER_BATCH);
      for (int i = 0; i < TASKS_PER_BATCH; i++) { int batchNumber = batch + 1; pool.submit(() -> lse.task(latch, batchNumber));
      }
    }
    pool.shutdown();
  }
  public void task(CountDownLatch latch, int batch) {
    latch.countDown();
    boolean interrupted = Thread.interrupted();
    while (true) {
      try {
        latch.await();
        break;
      } catch (InterruptedException e) {
        interrupted = true;
      }
    }
    if (interrupted) Thread.currentThread().interrupt();
    doTask(batch);
  }
}

Unsere erste Herausforderung bestand darin, für jedes Batch einen neuen CountDownLatch zu erstellen. Dies können wir durch den Einsatz einer CyclicBarrier vermeiden. Das erlaubt es uns, die barrier wiederzuverwenden, aber das Interrupt Handling ist immer noch aus dem letzten Jahrtausend.

import java.util.concurrent.*;
import static java.util.concurrent.Executors.*;
public class LockStepCyclicBarrier extends LockStepExample {
  public static void main(String... args) {
    LockStepCyclicBarrier lse = new LockStepCyclicBarrier();
    ExecutorService pool = newFixedThreadPool(TASKS_PER_BATCH);
    CyclicBarrier barrier = new CyclicBarrier(TASKS_PER_BATCH);
    for (int batch = 0; batch < BATCHES; batch++) {
      for (int i = 0; i < TASKS_PER_BATCH; i++) { int batchNumber = batch + 1; pool.submit(() -> lse.task(barrier, batchNumber));
      }
    }
    pool.shutdown();
  }
  public void task(CyclicBarrier barrier, int batch) {
    boolean interrupted = Thread.interrupted();
    while (true) {
      try {
        barrier.await();
        break;
      } catch (InterruptedException e) {
        interrupted = true;
      } catch (BrokenBarrierException e) {
        throw new AssertionError(e);
      }
    }
    if (interrupted) Thread.currentThread().interrupt();
    doTask(batch);
  }
}

Nun wenden wir uns dem LockStepPhaser zu. Wir können den Phaser für die Batches wiederverwenden, wie die CyclicBarrier. Der Phaser weiß auch, in welchem Batch er sich befindet, sodass wir die Batch-Nummer nicht weitergeben müssen. Und die task()-Methode? Der gesamte komplizierte Interrupt-Handling-Code wird auf ein einzeiliges phaser.arriveAndAwaitAdvance() reduziert. Einfach großartig!

import java.util.concurrent.*;
import static java.util.concurrent.Executors.newFixedThreadPool;
public class LockStepPhaser extends LockStepExample {
  public static void main(String... args) {
    LockStepPhaser lse = new LockStepPhaser();
    ExecutorService pool = newFixedThreadPool(TASKS_PER_BATCH);
    Phaser phaser = new Phaser(TASKS_PER_BATCH);
    for (int batch = 0; batch < BATCHES; batch++) {
      for (int i = 0; i < TASKS_PER_BATCH; i++) { pool.submit(() -> lse.task(phaser));
      }
    }
    pool.shutdown();
  }
  public void task(Phaser phaser) {
    phaser.arriveAndAwaitAdvance();
    doTask(phaser.getPhase());
  }
}

Wer das Rennen macht

Weitere Gründe, warum Phaser die bevorzugte Lösung gegenüber CountDownLatch und CyclicBarrier ist: Er wird mit einem ManagedBlocker realisiert; das bedeutet, dass, wenn unser Phaser einen Thread im gemeinsamen Fork-Join-Pool blockiert, ein weiterer erstellt wird, um die Parallelität auf dem gewünschten Niveau zu halten. Außerdem können wir Phaser in einem Baum einrichten, um weniger Konflikte zu erhalten. Das ist ein bisschen kompliziert, zugegeben, aber es ist möglich. Das können wir mit den anderen Synchronisatoren wie CountDownLatch und CyclicBarrier nicht machen.

 

Keine Infos mehr verpassen!