Рубрики
Без рубрики

Как реализовать очередь заданий базы данных с помощью блокировки ПРОПУСКА

Узнайте, как реализовать очередь заданий в системе реляционной базы данных с помощью директивы блокировки чтения или записи на уровне строк с блокировкой ПРОПУСКА.

Автор оригинала: Vlad Mihalcea.

Вступление

В этой статье мы рассмотрим, как мы можем реализовать очередь заданий базы данных с помощью функции ПРОПУСТИТЬ ЗАБЛОКИРОВАНО.

I decided to write this article while answering this Stack Overflow question asked by Rafael Winterhalter .

Поскольку блокировка ПРОПУСКА является менее известной функцией SQL, это хорошая возможность показать вам, как ее использовать и почему ее следует использовать, особенно при реализации задачи очереди заданий.

Как реализовать очередь заданий базы данных с помощью функции ПРОПУСТИТЬ ЗАБЛОКИРОВАНО. @vlad_mihalcea https://t.co/sX2bkCXeHk https://t.co/sX2bkCXeHk

Модель предметной области

Давайте предположим, что у нас есть следующее Должность юридическое лицо, имеющее статус Перечисление свойств выглядит следующим образом:

Статус поста Перечисление инкапсулирует статус модератора данной Записи сущности. Поэтому, когда Сообщение создается впервые, статус находится В ОЖИДАНИИ . Модераторы сайта собираются рассмотреть отложенные Опубликуйте записи и измените состояние на ОДОБРЕНО или СПАМ .

Статус записи класс выглядит следующим образом:

public enum PostStatus {
    PENDING,
    APPROVED,
    SPAM
}

И сущность Post также тривиальна для отображения в качестве сущности JPA:

@Entity(name = "Post")
@Table(name = "post")
public class Post {

    @Id
    private Long id;

    private String title;

    private String body;

    @Enumerated
    private PostStatus status;

    //Getters and setters omitted for brevity
}

Очередь заданий

Таким образом, связанная таблица post действует как очередь заданий, поскольку строки должны быть модерированы перед отображением пользователю. Если у нас есть несколько одновременных пользователей, пытающихся модерировать объекты Post , нам нужен способ координации их усилий, чтобы избежать того, чтобы два модератора просматривали один и тот же Сообщение запись.

Давайте рассмотрим, что у нас есть следующее Публикация записи для модерации:

for (long i = 0; i < 10; i++) {
    Post post = new Post();
    post.setId(i);
    post.setTitle("High-Performance Java Persistence");
    post.setBody(String.format("Chapter %d summary", i));
    post.setStatus(PostStatus.PENDING);
    
    entityManager.persist(post);
}

Первой наивной реализацией было бы получение первых N Post строк, а также их блокировка:

public List getAndLockPosts(
            EntityManager entityManager,
            PostStatus status,
            int postCount) {
    return entityManager.createQuery(
        "select p " +
        "from Post p " +
        "where p.status = :status " +
        "order by p.id", Post.class)
    .setParameter("status", status)
    .setMaxResults(postCount)
    .setLockMode(LockModeType.PESSIMISTIC_WRITE)
    .setHint(
        "javax.persistence.lock.timeout",
        LockOptions.NO_WAIT
    )
    .getResultList();
}

Обратите внимание, что мы используем PESSIMISTIC_WRITE JPA LockModeType для указания Hibernate применить эксклюзивную блокировку базовый выбранный Сообщение записи.

Подсказка запроса javax.persistence.lock.timeout JPA указывает Hibernate на выдачу НЕТ опции ОЖИДАНИЯ при применении эксклюзивной блокировки. Без использования функции “НЕ ЖДАТЬ” получение блокировки будет блокироваться до тех пор, пока она либо не получит блокировку на уровне строк, либо не истечет время ожидания блокировки.

Теперь, если мы вызовем метод getAndLockPost из двух параллельных потоков Java:

final int postCount = 2;

doInJPA(entityManager -> {
    assertEquals(
            postCount,
            getAndLockPosts(
                entityManager,
                PostStatus.PENDING,
                postCount
            ).size()
    );

    try {
        executeSync(() -> {
            doInJPA(_entityManager -> {
                assertEquals(
                    postCount,
                    getAndLockPosts(
                        _entityManager,
                        PostStatus.PENDING,
                        postCount
                    ).size()
                );
            });
        });
    } catch (Exception e) {
        assertEquals(
            1,
            Arrays.stream(ExceptionUtils.getThrowables(e))
            .map(Throwable::getClass)
            .filter(clazz -> clazz.equals(PessimisticLockException.class))
            .count()
        );
    }
});

Мы видим, что PessimisticLockException действительно выбрасывается:

[Alice]:
SELECT 
    p.id AS id1_0_,
    p.body AS body2_0_,
    p.status AS status3_0_,
    p.title AS title4_0_
FROM 
    post p
WHERE 
    p.status=0
ORDER BY 
    p.id
LIMIT 2
FOR UPDATE OF p NOWAIT

[Bob]: 
SELECT 
    p.id AS id1_0_,
    p.body AS body2_0_,
    p.status AS status3_0_,
    p.title AS title4_0_
FROM 
    post p
WHERE 
    p.status=0
ORDER BY 
    p.id
LIMIT 2
FOR UPDATE OF p NOWAIT

-- SQL Error: 0, SQLState: 55P03
-- ERROR: could not obtain lock on row in relation "post"

Причина, по которой возникает исключение PessimisticLockException , заключается в том, что обе параллельные транзакции пытаются заблокировать одни и те же записи, поскольку вторая транзакция не может узнать, какие записи уже заблокированы.

Использование БЛОКИРОВКИ ПРОПУСКА

Чтобы устранить эту проблему, нам нужно использовать параметры Блокировки.ПРОПУСК ЗАБЛОКИРОВАН Подсказка для запроса в режиме гибернации:

public List getAndLockPostsWithSkipLocked(
            EntityManager entityManager,
            PostStatus status,
            int postCount) {
    return entityManager
    .createQuery(
        "select p " +
        "from Post p " +
        "where p.status = :status " +
        "order by p.id", Post.class)
    .setParameter("status", status)
    .setMaxResults(postCount)
    .setLockMode(LockModeType.PESSIMISTIC_WRITE)
    .setHint(
        "javax.persistence.lock.timeout", 
        LockOptions.SKIP_LOCKED
    )
    .getResultList();
}

Теперь при извлечении записей Post с использованием двух параллельных потоков Java:

final int postCount = 2;

doInJPA(entityManager -> {
    
    List pendingPosts = getAndLockPostsWithSkipLocked(
        entityManager, 
        PostStatus.PENDING, 
        postCount
    );
    
    List ids = pendingPosts
    .stream()
    .map(Post::getId)
    .collect(toList());
        
    assertTrue(
        ids.size() == 2 && 
        ids.contains(0L) && 
        ids.contains(1L)
    );

    executeSync(() -> {
        doInJPA(_entityManager -> {
            List _pendingPosts = getAndLockPostsWithSkipLocked(
                _entityManager, 
                PostStatus.PENDING, 
                postCount
            );
            
            List _ids = _pendingPosts
            .stream()
            .map(Post::getId)
            .collect(toList());
            
            assertTrue(
                _ids.size() == 2 && 
                _ids.contains(2L) && 
                _ids.contains(3L)
            );
        });
    });
});

Все будет работать просто отлично, так как вторая транзакция пропустит строки, которые ранее были заблокированы первой транзакцией:

[Alice]:
SELECT 
    p.id AS id1_0_,
    p.body AS body2_0_,
    p.status AS status3_0_,
    p.title AS title4_0_
FROM 
    post p
WHERE 
    p.status = 0
ORDER BY 
    p.id
LIMIT 2
FOR UPDATE OF p SKIP LOCKED

[Bob]:                                                                                                                                                                                                               
SELECT 
    p.id AS id1_0_,
    p.body AS body2_0_,
    p.status AS status3_0_,
    p.title AS title4_0_
FROM 
    post p
WHERE 
    p.status = 0
ORDER BY 
    p.id
LIMIT 2
FOR UPDATE OF p SKIP LOCKED

Обратите внимание на опцию ПРОПУСТИТЬ БЛОКИРОВКУ, добавленную Hibernate в предложение ДЛЯ ОБНОВЛЕНИЯ. Опция ПРОПУСТИТЬ БЛОКИРОВКУ позволит нам заблокировать строки, которые ранее не были заблокированы. В нашем примере вы можете видеть, что Алиса выбрала и заблокировала Сообщение объекты со значениями идентификаторов 0 и 1 в то время как Боб выбирает и блокирует в Опубликовать записи со значениями идентификаторов 3 и 4 .

Без этой опции реализация очереди заданий с использованием реляционной базы данных была бы очень сложной задачей.

Опция ПРОПУСТИТЬ БЛОКИРОВКУ в настоящее время поддерживается большинством систем реляционных баз данных. В следующем списке указана первая версия базы данных, в которой появилась поддержка БЛОКИРОВКИ ПРОПУСКА.

  • Oracle 10g
  • PostgreSQL 9.5
  • SQL Server 2005
  • MySQL 8.0

Вывод

БЛОКИРОВКА ПРОПУСКА-очень удобная опция при реализации алгоритмов управления параллелизмом с использованием реляционной базы данных. Теперь, когда SKIP LOCKED широко поддерживается, вы определенно должны использовать его, если вам нужно реализовать задание очереди в используемой вами системе реляционных баз данных.