Рубрики
Без рубрики

Логика обработки блокировки заказчиком

Автор оригинала: Vlad Mihalcea.

Вступление

В текущем приложении, которое мы разрабатываем, был один случай использования, когда мы хотели синхронизировать обработку сообщений поставщиком сообщений (клиентом, генерирующим эти сообщения). Поток выглядит примерно так:

Таким образом, сообщения могут приходить случайным образом, поскольку параллельно выполняется больше заданий для клиентов, но мы хотим гарантировать, что сообщения, принадлежащие одному и тому же клиенту, обрабатываются одно за другим (аналогично уровню изоляции сериализуемой базы данных), позволяя при этом обрабатывать сообщения, поступающие от разных клиентов параллельно.

Синхронизация доступа

Итак, вот как выглядит механизм блокировки клиента:

/**
 * CustomerLockedExecution - Lock execution based for a given customer
 */
public class CustomerLockedExecution {

    private Map lockMap = new HashMap();

    private Lock getLock(K customerId) {
        ReentrantLock lock = lockMap.get(customerId);
        if (lock == null) {
            synchronized (this) {
                lock = lockMap.get(customerId);

                if (lock == null) {
                    lock = new ReentrantLock();
                    lockMap.put(customerId, lock);
                }
            }
        }
        return lock;
    }

    /**
     * Lock on the customer and execute the specific logic
     *
     * @param customerId customer id
     * @param callable   custom logic callback
     */
    public  void lockExecution(K customerId, Callable callable) {
        Lock lock = getLock(customerId);
        try {
            lock.lockInterruptibly();
            callable.call();
        } catch (Exception e) {
            throw new CallableException(e, callable);
        } finally {
            lock.unlock();
        }
    }
}

Время тестирования

Модульный тест запустит 10 потоков, каждый из которых имеет одинаковое значение CustomerID , поэтому все они совпадают для выполнения своей логики, которая состоит из добавления 3 последовательных чисел (начиная с начального индекса) в общий буфер.

private CustomerLockedExecution execution = new CustomerLockedExecution<>();

private CopyOnWriteArrayList buffer = new CopyOnWriteArrayList<>();

private static final int appendTries = 3;

private final int threadCount = 10;

private ExecutorService executorService = Executors.newFixedThreadPool(threadCount);

@Test
public void testAwaitExecutionForSameIntegratedSource() 
        throws InterruptedException {
    final CountDownLatch startLatch = new CountDownLatch(threadCount + 1);
    final CountDownLatch endLatch = new CountDownLatch(threadCount + 1);

    for (long i = 0; i < threadCount; i++) {
        final long index = i * threadCount;

        LOG.info("Scheduling thread index {}", index);

        executorService.submit(() -> {
            try {
                startLatch.countDown();
                startLatch.await();
                execution.lockExecution(
                    0L,
                    () -> {
                        LOG.info("Running thread index {}", index);
                        for (int j = 0; j < appendTries; j++) {
                            long number = index + j;
                            LOG.info("Adding {}", number);
                            buffer.add(number);
                        }

                        return null;
                    }
                );
                endLatch.countDown();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
    }

    startLatch.countDown();

    LOG.info("Waiting for threads to be done");

    endLatch.countDown();
    endLatch.await();

    LOG.info("Threads are done processing");

    for (int i = 0; i < threadCount; i += appendTries) {
        long reference = buffer.get(i);
        for (int j = 0; j < appendTries; j++) {
            assertEquals(reference + j, (long) buffer.get(i + j));
        }
    }
}

При выполнении приведенного выше тестового примера мы получаем следующий результат:

Scheduling thread index 0
Scheduling thread index 10
Scheduling thread index 20
Scheduling thread index 30
Scheduling thread index 40
Scheduling thread index 50
Scheduling thread index 60
Scheduling thread index 70
Scheduling thread index 80
Scheduling thread index 90

Waiting for threads to be done

Running thread index 0
Adding 0
Adding 1
Adding 2

Running thread index 80
Adding 80
Adding 81
Adding 82

Running thread index 30
Adding 30
Adding 31
Adding 32

Running thread index 40
Adding 40
Adding 41
Adding 42

Running thread index 60
Adding 60
Adding 61
Adding 62

Running thread index 50
Adding 50
Adding 51
Adding 52

Running thread index 10
Adding 10
Adding 11
Adding 12

Running thread index 90
Adding 90
Adding 91
Adding 92

Running thread index 20
Adding 20
Adding 21
Adding 22

Running thread index 70
Adding 70
Adding 71
Adding 72

Threads are done processing

Как вы можете видеть, каждый поток выполняется случайным образом, даже если все они запланированы для запуска одновременно, и между ними нет перекрывающихся чисел, поэтому каждый поток добавляет свои три числа, не чередуя добавление с каким-либо другим потоком.

Предотвращение взаимоблокировок

Вы должны знать о взаимоблокировках, так как мы удерживаем блокировку при выполнении определенной логики, вызывающей какой-либо не закрытый метод, и эта конкретная вызываемая логика также может получить какую-либо другую блокировку.

К счастью, это не наш случай, так как наш конвейер сообщений идет от одного конца к другому, поэтому существует только один способ ввода этой логики обработки.

В любом случае, когда приобретается несколько блокировок (например, A, B и C), обязательно всегда приобретать эти блокировки в одном и том же порядке:

  • A -> B -> C и A -> B

Комбинации, такие как:

  • A -> B и B -> A
  • A -> B -> C и C -> B -> A

запрещены, так как они могут оказаться в тупике.

Вывод

Кроме того, я всегда стараюсь избегать вызова внешнего API при удержании блокировки, так как они могут оказаться медленными (длительный вызов веб-службы обработки), что может повлиять на масштабируемость нашей обработки, поскольку блокировка будет сохраняться в течение длительного времени.

Но внешние вызовы API также могут получать блокировки, о которых мы не знаем, что увеличивает вероятность взаимоблокировки, если мы случайно блокируем те же объекты, что и внешний API.