1. введение
В этом последующем руководстве по использованию Couchbase в приложении Spring мы исследуем асинхронный характер пакета SDK Couchbase и то , как он может использоваться для выполнения операций сохранения в пакетах, что позволяет нашему приложению оптимально использовать ресурсы Couchbase.
1.1. Интерфейс службы Crud
Во-первых, мы расширяем наш общий Crud-сервис интерфейс, чтобы включить пакетные операции:
public interface CrudService{ ... List readBulk(Iterable ids); void createBulk(Iterable items); void updateBulk(Iterable items); void deleteBulk(Iterable ids); boolean exists(String id); }
1.2. Интерфейс сущности Couchbase
Мы определяем интерфейс для сущностей, которые мы хотим сохранить:
public interface CouchbaseEntity { String getId(); void setId(String id); }
1.3. Абстрактный класс обслуживания Crud
Затем мы реализуем каждый из этих методов в общем абстрактном классе. Этот класс является производным от класса Person CrudService , который мы использовали в предыдущем уроке , и начинается следующим образом:
public abstract class AbstractCrudServiceimplements CrudService { private BucketService bucketService; private Bucket bucket; private JsonDocumentConverter converter; public AbstractCrudService(BucketService bucketService, JsonDocumentConverter converter) { this.bucketService = bucketService; this.converter = converter; } protected void loadBucket() { bucket = bucketService.getBucket(); } ... }
2. Интерфейс Асинхронного Ковша
Пакет SDK Couchbase предоставляет интерфейс AsyncBucket для выполнения асинхронных операций. Учитывая экземпляр Bucket , вы можете получить его асинхронную версию с помощью метода async() :
AsyncBucket asyncBucket = bucket.async();
3. Пакетные Операции
Для выполнения пакетных операций с использованием интерфейса AsyncBucket мы используем библиотеку RxJava .
3.1. Пакетное чтение
Здесь мы реализуем метод read Bulk . Сначала мы используем AsyncBucket и механизм flatMap в RxJava для асинхронного извлечения документов в Observable , затем мы используем механизм toBlocking в RxJava для преобразования их в список сущностей:
@Override public ListreadBulk(Iterable ids) { AsyncBucket asyncBucket = bucket.async(); Observable asyncOperation = Observable .from(ids) .flatMap(new Func1 >() { public Observable call(String key) { return asyncBucket.get(key); } }); List items = new ArrayList (); try { asyncOperation.toBlocking() .forEach(new Action1 () { public void call(JsonDocument doc) { T item = converter.fromDocument(doc); items.add(item); } }); } catch (Exception e) { logger.error("Error during bulk get", e); } return items; }
3.2. Пакетная Вставка
Мы снова используем конструкцию flatMap RxJava для реализации метода create Bulk .
Поскольку запросы на массовую мутацию создаются быстрее, чем могут быть сгенерированы их ответы, что иногда приводит к перегрузке, мы инициируем повторную попытку с экспоненциальной задержкой всякий раз, когда встречается исключение BackpressureException :
@Override public void createBulk(Iterableitems) { AsyncBucket asyncBucket = bucket.async(); Observable .from(items) .flatMap(new Func1 >() { @SuppressWarnings("unchecked") @Override public Observable call(final T t) { if(t.getId() == null) { t.setId(UUID.randomUUID().toString()); } JsonDocument doc = converter.toDocument(t); return asyncBucket.insert(doc) .retryWhen(RetryBuilder .anyOf(BackpressureException.class) .delay(Delay.exponential(TimeUnit.MILLISECONDS, 100)) .max(10) .build()); } }) .last() .toBlocking() .single(); }
3.3. Пакетное обновление
Мы используем аналогичный механизм в методе update Bulk :
@Override public void updateBulk(Iterableitems) { AsyncBucket asyncBucket = bucket.async(); Observable .from(items) .flatMap(new Func1 >() { @SuppressWarnings("unchecked") @Override public Observable call(final T t) { JsonDocument doc = converter.toDocument(t); return asyncBucket.upsert(doc) .retryWhen(RetryBuilder .anyOf(BackpressureException.class) .delay(Delay.exponential(TimeUnit.MILLISECONDS, 100)) .max(10) .build()); } }) .last() .toBlocking() .single(); }
3.4. Пакетное удаление
И мы пишем метод delete Bulk следующим образом:
@Override public void deleteBulk(Iterableids) { AsyncBucket asyncBucket = bucket.async(); Observable .from(ids) .flatMap(new Func1 >() { @SuppressWarnings("unchecked") @Override public Observable call(String key) { return asyncBucket.remove(key) .retryWhen(RetryBuilder .anyOf(BackpressureException.class) .delay(Delay.exponential(TimeUnit.MILLISECONDS, 100)) .max(10) .build()); } }) .last() .toBlocking() .single(); }
4. Person CrudService
Наконец, мы пишем весеннюю службу Person CrudService , которая расширяет вашу Абстрактную службу Crud для Person entity.
Поскольку все взаимодействие Couchbase реализовано в абстрактном классе, реализация для класса сущностей тривиальна, так как нам нужно только убедиться, что все наши зависимости введены и наше ведро загружено:
@Service public class PersonCrudService extends AbstractCrudService{ @Autowired public PersonCrudService( @Qualifier("TutorialBucketService") BucketService bucketService, PersonDocumentConverter converter) { super(bucketService, converter); } @PostConstruct private void init() { loadBucket(); } }
5. Заключение
Исходный код, показанный в этом руководстве, доступен в проекте github .
Вы можете узнать больше о Java SDK Couchbase на официальном сайте документации разработчика Couchbase .