Рубрики
Без рубрики

Асинхронные пакетные операции в Couchbase

Узнайте, как выполнять эффективные пакетные операции в Couchbase с помощью асинхронного API Java Couchbase.

Автор оригинала: baeldung.

1. введение

В этом последующем руководстве по использованию Couchbase в приложении Spring мы исследуем асинхронный характер пакета SDK Couchbase и то , как он может использоваться для выполнения операций сохранения в пакетах, что позволяет нашему приложению оптимально использовать ресурсы Couchbase.

1.1. Интерфейс службы Crud

Во-первых, мы расширяем наш общий Crud-сервис интерфейс, чтобы включить пакетные операции:

public interface CrudService {
    ...
    
    List readBulk(Iterable ids);

    void createBulk(Iterable items);

    void updateBulk(Iterable items);

    void deleteBulk(Iterable ids);

    boolean exists(String id);
}

1.2. Интерфейс сущности Couchbase

Мы определяем интерфейс для сущностей, которые мы хотим сохранить:

public interface CouchbaseEntity {

    String getId();
    
    void setId(String id);
    
}

1.3. Абстрактный класс обслуживания Crud

Затем мы реализуем каждый из этих методов в общем абстрактном классе. Этот класс является производным от класса Person CrudService , который мы использовали в предыдущем уроке , и начинается следующим образом:

public abstract class AbstractCrudService implements CrudService {
    private BucketService bucketService;
    private Bucket bucket;
    private JsonDocumentConverter converter;

    public AbstractCrudService(BucketService bucketService, JsonDocumentConverter converter) {
        this.bucketService = bucketService;
        this.converter = converter;
    }

    protected void loadBucket() {
        bucket = bucketService.getBucket();
    }
    
    ...
}

2. Интерфейс Асинхронного Ковша

Пакет SDK Couchbase предоставляет интерфейс AsyncBucket для выполнения асинхронных операций. Учитывая экземпляр Bucket , вы можете получить его асинхронную версию с помощью метода async() :

AsyncBucket asyncBucket = bucket.async();

3. Пакетные Операции

Для выполнения пакетных операций с использованием интерфейса AsyncBucket мы используем библиотеку RxJava .

3.1. Пакетное чтение

Здесь мы реализуем метод read Bulk . Сначала мы используем AsyncBucket и механизм flatMap в RxJava для асинхронного извлечения документов в Observable , затем мы используем механизм toBlocking в RxJava для преобразования их в список сущностей:

@Override
public List readBulk(Iterable ids) {
    AsyncBucket asyncBucket = bucket.async();
    Observable asyncOperation = Observable
      .from(ids)
      .flatMap(new Func1>() {
          public Observable call(String key) {
              return asyncBucket.get(key);
          }
    });

    List items = new ArrayList();
    try {
        asyncOperation.toBlocking()
          .forEach(new Action1() {
              public void call(JsonDocument doc) {
                  T item = converter.fromDocument(doc);
                  items.add(item);
              }
        });
    } catch (Exception e) {
        logger.error("Error during bulk get", e);
    }

    return items;
}

3.2. Пакетная Вставка

Мы снова используем конструкцию flatMap RxJava для реализации метода create Bulk .

Поскольку запросы на массовую мутацию создаются быстрее, чем могут быть сгенерированы их ответы, что иногда приводит к перегрузке, мы инициируем повторную попытку с экспоненциальной задержкой всякий раз, когда встречается исключение BackpressureException :

@Override
public void createBulk(Iterable items) {
    AsyncBucket asyncBucket = bucket.async();
    Observable
      .from(items)
      .flatMap(new Func1>() {
          @SuppressWarnings("unchecked")
          @Override
          public Observable call(final T t) {
              if(t.getId() == null) {
                  t.setId(UUID.randomUUID().toString());
              }
              JsonDocument doc = converter.toDocument(t);
              return asyncBucket.insert(doc)
                .retryWhen(RetryBuilder
                  .anyOf(BackpressureException.class)
                  .delay(Delay.exponential(TimeUnit.MILLISECONDS, 100))
                  .max(10)
                  .build());
          }
      })
      .last()
      .toBlocking()
      .single();
}

3.3. Пакетное обновление

Мы используем аналогичный механизм в методе update Bulk :

@Override
public void updateBulk(Iterable items) {
    AsyncBucket asyncBucket = bucket.async();
    Observable
      .from(items)
      .flatMap(new Func1>() {
          @SuppressWarnings("unchecked")
          @Override
          public Observable call(final T t) {
              JsonDocument doc = converter.toDocument(t);
              return asyncBucket.upsert(doc)
                .retryWhen(RetryBuilder
                  .anyOf(BackpressureException.class)
                  .delay(Delay.exponential(TimeUnit.MILLISECONDS, 100))
                  .max(10)
                  .build());
          }
      })
      .last()
      .toBlocking()
      .single();
}

3.4. Пакетное удаление

И мы пишем метод delete Bulk следующим образом:

@Override
public void deleteBulk(Iterable ids) {
    AsyncBucket asyncBucket = bucket.async();
    Observable
      .from(ids)
      .flatMap(new Func1>() {
          @SuppressWarnings("unchecked")
          @Override
          public Observable call(String key) {
              return asyncBucket.remove(key)
                .retryWhen(RetryBuilder
                  .anyOf(BackpressureException.class)
                  .delay(Delay.exponential(TimeUnit.MILLISECONDS, 100))
                  .max(10)
                  .build());
          }
      })
      .last()
      .toBlocking()
      .single();
}

4. Person CrudService

Наконец, мы пишем весеннюю службу Person CrudService , которая расширяет вашу Абстрактную службу Crud для Person entity.

Поскольку все взаимодействие Couchbase реализовано в абстрактном классе, реализация для класса сущностей тривиальна, так как нам нужно только убедиться, что все наши зависимости введены и наше ведро загружено:

@Service
public class PersonCrudService extends AbstractCrudService {

    @Autowired
    public PersonCrudService(
      @Qualifier("TutorialBucketService") BucketService bucketService,
      PersonDocumentConverter converter) {
        super(bucketService, converter);
    }

    @PostConstruct
    private void init() {
        loadBucket();
    }
}

5. Заключение

Исходный код, показанный в этом руководстве, доступен в проекте github .

Вы можете узнать больше о Java SDK Couchbase на официальном сайте документации разработчика Couchbase .