Рубрики
Без рубрики

Руководство по CountDownLatch в Java

Руководство по java.util.concurrent.Обратный отсчет с конкретными примерами

Автор оригинала: baeldung.

1. введение

В этой статье мы дадим руководство по классу CountDownLatch | и продемонстрируем, как его можно использовать на нескольких практических примерах.

По сути, используя CountDownLatch , мы можем заставить поток блокироваться до тех пор, пока другие потоки не выполнят заданную задачу.

2. Использование в параллельном программировании

Проще говоря, CountDownLatch имеет поле counter , которое вы можете уменьшить по мере необходимости. Затем мы можем использовать его для блокировки вызывающего потока до тех пор, пока он не будет отсчитан до нуля.

Если бы мы выполняли некоторую параллельную обработку, мы могли бы создать экземпляр CountDownLatch с тем же значением для счетчика, что и количество потоков, с которыми мы хотим работать. Затем мы могли бы просто вызвать countdown() после завершения каждого потока, гарантируя, что зависимый поток, вызывающий wait () , будет блокироваться до тех пор, пока рабочие потоки не будут завершены.

3. Ожидание завершения пула потоков

Давайте попробуем этот паттерн, создав Worker и используя поле CountDownLatch для сигнала о его завершении:

public class Worker implements Runnable {
    private List outputScraper;
    private CountDownLatch countDownLatch;

    public Worker(List outputScraper, CountDownLatch countDownLatch) {
        this.outputScraper = outputScraper;
        this.countDownLatch = countDownLatch;
    }

    @Override
    public void run() {
        doSomeWork();
        outputScraper.add("Counted down");
        countDownLatch.countDown();
    }
}

Затем давайте создадим тест, чтобы доказать, что мы можем заставить CountDownLatch ждать завершения экземпляров Worker :

@Test
public void whenParallelProcessing_thenMainThreadWillBlockUntilCompletion()
  throws InterruptedException {

    List outputScraper = Collections.synchronizedList(new ArrayList<>());
    CountDownLatch countDownLatch = new CountDownLatch(5);
    List workers = Stream
      .generate(() -> new Thread(new Worker(outputScraper, countDownLatch)))
      .limit(5)
      .collect(toList());

      workers.forEach(Thread::start);
      countDownLatch.await(); 
      outputScraper.add("Latch released");

      assertThat(outputScraper)
        .containsExactly(
          "Counted down",
          "Counted down",
          "Counted down",
          "Counted down",
          "Counted down",
          "Latch released"
        );
    }

Естественно, “Защелка выпущена” всегда будет последним выходом – так как она зависит от CountDownLatch releasing.

Обратите внимание , что если бы мы не вызывали await () , мы не смогли бы гарантировать порядок выполнения потоков, поэтому тест случайно провалился бы.

4. Пул потоков, ожидающих начала

Если мы возьмем предыдущий пример, но на этот раз запустили тысячи потоков вместо пяти, то вполне вероятно, что многие из более ранних завершат обработку еще до того, как мы вызовем start() на более поздних. Это может затруднить попытку воспроизвести проблему параллелизма, так как мы не сможем заставить все наши потоки работать параллельно.

Чтобы обойти это, давайте заставим CountDownLatch работать иначе, чем в предыдущем примере. Вместо того чтобы блокировать родительский поток до тех пор, пока не закончатся некоторые дочерние потоки, мы можем блокировать каждый дочерний поток до тех пор, пока не начнутся все остальные.

Давайте изменим наш метод run() так, чтобы он блокировался перед обработкой:

public class WaitingWorker implements Runnable {

    private List outputScraper;
    private CountDownLatch readyThreadCounter;
    private CountDownLatch callingThreadBlocker;
    private CountDownLatch completedThreadCounter;

    public WaitingWorker(
      List outputScraper,
      CountDownLatch readyThreadCounter,
      CountDownLatch callingThreadBlocker,
      CountDownLatch completedThreadCounter) {

        this.outputScraper = outputScraper;
        this.readyThreadCounter = readyThreadCounter;
        this.callingThreadBlocker = callingThreadBlocker;
        this.completedThreadCounter = completedThreadCounter;
    }

    @Override
    public void run() {
        readyThreadCounter.countDown();
        try {
            callingThreadBlocker.await();
            doSomeWork();
            outputScraper.add("Counted down");
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            completedThreadCounter.countDown();
        }
    }
}

Теперь давайте изменим наш тест так, чтобы он блокировался до тех пор, пока все Рабочие не запустятся, разблокирует Рабочих , а затем блокируется до тех пор, пока Рабочие не закончат:

@Test
public void whenDoingLotsOfThreadsInParallel_thenStartThemAtTheSameTime()
 throws InterruptedException {
 
    List outputScraper = Collections.synchronizedList(new ArrayList<>());
    CountDownLatch readyThreadCounter = new CountDownLatch(5);
    CountDownLatch callingThreadBlocker = new CountDownLatch(1);
    CountDownLatch completedThreadCounter = new CountDownLatch(5);
    List workers = Stream
      .generate(() -> new Thread(new WaitingWorker(
        outputScraper, readyThreadCounter, callingThreadBlocker, completedThreadCounter)))
      .limit(5)
      .collect(toList());

    workers.forEach(Thread::start);
    readyThreadCounter.await(); 
    outputScraper.add("Workers ready");
    callingThreadBlocker.countDown(); 
    completedThreadCounter.await(); 
    outputScraper.add("Workers complete");

    assertThat(outputScraper)
      .containsExactly(
        "Workers ready",
        "Counted down",
        "Counted down",
        "Counted down",
        "Counted down",
        "Counted down",
        "Workers complete"
      );
}

Этот шаблон действительно полезен для воспроизведения ошибок параллелизма, так как может быть использован для того, чтобы заставить тысячи потоков пытаться выполнять некоторую логику параллельно.

5. Досрочное завершение обратного отсчета

Иногда мы можем столкнуться с ситуацией, когда Workers завершаются по ошибке перед отсчетом CountDownLatch. Это может привести к тому, что он никогда не достигнет нуля и await() никогда не завершится:

@Override
public void run() {
    if (true) {
        throw new RuntimeException("Oh dear, I'm a BrokenWorker");
    }
    countDownLatch.countDown();
    outputScraper.add("Counted down");
}

Давайте изменим наш предыдущий тест, чтобы использовать Сломанный рабочий, чтобы показать, как wait() заблокируется навсегда:

@Test
public void whenFailingToParallelProcess_thenMainThreadShouldGetNotGetStuck()
  throws InterruptedException {
 
    List outputScraper = Collections.synchronizedList(new ArrayList<>());
    CountDownLatch countDownLatch = new CountDownLatch(5);
    List workers = Stream
      .generate(() -> new Thread(new BrokenWorker(outputScraper, countDownLatch)))
      .limit(5)
      .collect(toList());

    workers.forEach(Thread::start);
    countDownLatch.await();
}

Очевидно, что это не то поведение, которое мы хотим – для приложения было бы гораздо лучше продолжать работу, чем бесконечно блокировать.

Чтобы обойти это, давайте добавим аргумент timeout к нашему вызову await().

boolean completed = countDownLatch.await(3L, TimeUnit.SECONDS);
assertThat(completed).isFalse();

Как мы видим, тест в конечном итоге выйдет из таймаута и await() вернет false .

6. Заключение

В этом кратком руководстве мы продемонстрировали, как мы можем использовать CountDownLatch для блокировки потока до тех пор, пока другие потоки не закончат некоторую обработку.

Мы также показали, как его можно использовать для отладки проблем параллелизма, убедившись, что потоки работают параллельно.

Реализацию этих примеров можно найти на GitHub ; это проект на основе Maven, поэтому его должно быть легко запустить как есть.