Рубрики
Без рубрики

Сложный поток данных ep.2: Импорт документов из представлений MongoDB

Это второй эпизод моей серии “Хитрый поток данных”, в которой я представляю некоторые из самых хитрых iss… С тегами google cloud, dataflow, mongodb, java.

Это второй эпизод моей серии “Сложный поток данных”, в которой я представляю некоторые из самых сложных проблем, с которыми я столкнулся при внедрении конвейеров с помощью Google Cloud Dataflow, и как я их преодолел.

В последнем эпизоде рассматривались некоторые проблемы с большими запросами . На этот раз давайте поговорим о совершенно другом типе базы данных: MongoDB

MongoDB в настоящее время довольно широко распространен в мире баз данных и, возможно, является самой известной базой данных NoSQL на рынке. Итак, как и следовало ожидать, в SDK Dataflow есть готовый соединитель MongoDB, позволяющий упростить использование MongoDB в качестве источника данных .

Он предлагает возможность чтения из коллекций MongoDB и записи в них, скажем (наивный я, который в то время не был так хорошо знаком с MongoDB), думал, что это все, что требуется для реализации такого простого конвейера:

Но, конечно, – иначе не было бы смысла писать пост в блоге – все прошло не так гладко, как я ожидал.

Итак, вы хотите запросить представление, да?

В первой версии конвейера, которую я сделал в качестве разминки, я читал документы непосредственно из коллекции с помощью MongoDB IO.read().Withuri(...).withDatabase(... ).).с коллекцией(...) и не столкнулся ни с какой реальной проблемой. Однако был один тонкий момент, важность которого я в то время не осознавал: Поскольку исходный экземпляр MongoDB был размещен в Atlas, MongoDbIO не разрешалось запускать команду splitVector() по умолчанию и поэтому было обязательно добавить с помощью Bucket Auto(true) предложение для загрузки коллекции.

Я не ожидал трудностей, которые возникли, когда я наивно попытался использовать имя представления вместо коллекции:

[ПРЕДУПРЕЖДЕНИЕ] org.apache.beam.sdk. Конвейер$PipelineExecutionException: com.mongodb. Исключение MongoCommandException: Сбой команды с ошибкой 166 (Команда не поддерживается в представлении): “Пространство имен [myview] – это представление, а не коллекция” на сервере [***]

Так что, очевидно, MongoDB знает о моем представлении, поймите, я хотел бы запросить это представление но нет, это не позволит мне извлечь из него документы. На самом деле оказывается, что не было простого способа просто получить документы из представления. Этому, безусловно, есть хорошее объяснение, но я не смог его найти. Так неприятно…

Тебе знакомо это чувство… (Фото Википедия/Nlan86)

На самом деле представление в MongoDB не так просто, как обычное представление в мире SQL: представление MongoDB – это результат сбора документов, обработанных конвейером агрегации . И MongoDB IO способен выполнять запросы агрегации для коллекции чтения благодаря Агрегационный запрос , который может быть передан в .withQueryFn () . Решение начало появляться:

  • читать из сборника
  • извлеките определение агрегации из параметров представления
  • передайте конвейер агрегации в withQueryFn
  • MongoDB обработает документ через предоставленный конвейер, в результате чего будут получены те же документы, что и в представлении

Давайте следовать плану!

Извлеките конвейер агрегации представления

Чтобы получить конвейер, нам нужно напрямую использовать mongo-java-client и getcollectioninfos вместе с ним. Это довольно многословно:

static List retrieveViewPipeline(Options options) {
        if (Strings.isNullOrEmpty(options.getView())) {
            LOG.debug("No view in options");
            return new ArrayList<>();
        }
        com.mongodb.MongoClientOptions.Builder optionsBuilder = new com.mongodb.MongoClientOptions.Builder();
        optionsBuilder.maxConnectionIdleTime(60000);
        MongoClient mongoClient = new MongoClient(new MongoClientURI("mongodb+srv://" + options.getMongoDBUri(),
                optionsBuilder));

        List viewPipeline = null;
        for (Document collecInfosDoc : mongoClient.getDatabase(options.getDatabase()).listCollections()) {
            if (collecInfosDoc.getString("name").equalsIgnoreCase(options.getView())) {
                viewPipeline = collecInfosDoc.get("options", Document.class).getList("pipeline", Document.class);
                break;
            }
        }
        checkArgument(viewPipeline != null, String.format("%s view not found", options.getView()));

        return viewPipeline.stream().map((doc) -> doc.toBsonDocument(BsonDocument.class,
                MongoClient.getDefaultCodecRegistry())).collect(Collectors.toList());
    }

Передайте конвейер в MongoDB IO

Как уже упоминалось, MongoDB IO имеет метод для обработки агрегаций: с помощью запроса Fn . Однако этот метод на самом деле имеет небольшую ошибку в текущей версии (2.27) когда конвейер состоит из нескольких этапов:

Строка 71: Суровое время для последнего этапа конвейера: ( (скриншот с Github)

Конечно, для этого есть простой обходной путь: просто добавьте бесполезный элемент в список конвейера, который будет заменен bucket() stage:

if (viewPipeline.size() > 1) {
    viewPipeline.add(new BsonDocument());
}

Вот так, с помощью source connector, настроенного таким образом, теперь вы можете извлекать документы для просмотра:

PCollectionTuple mongoDocs =
    pipeline.apply("Read from MongoDB",
        MongoDbIO.read()
        .withUri("mongodb+srv://" + options.getMongoDBUri())         
        .withDatabase(options.getDatabase())                        
        .withCollection(options.getCollection())
        .withBucketAuto(true) 
        .withQueryFn(
            AggregationQuery.create()
                .withMongoDbPipeline(viewPipeline))
    )

Но подождите! Работает ли это с ОГРОМНЫМИ коллекциями?

Наконец-то! Теперь вы можете извлекать документы из своего тестового набора данных, теперь вы чувствуете, что готовы протестировать свой новый блестящий конвейер в вашем реальном, огромном представлении MongoDB. И затем…

com.mongodb. Исключение MongoCommandException: Команда завершилась ошибкой 16819 (Местоположение 16819): “Сортировка превысила лимит памяти в 104857600 байт, но не переключилась на внешнюю сортировку. Прерывание операции. Передайте allowDiskUse:true, чтобы зарегистрироваться.’

… оказывается, ты еще не закончил. По крайней мере, сообщение об ошибке довольно ясно: при обработке конвейера агрегации в экземпляре MongoDB был превышен лимит памяти (RAM). К сожалению, это ограничение не настраивается. Единственный способ обойти это разрешить MongoDB использовать файл подкачки, который вы можете принудительно установить, установив параметр allowDiskUse: true рядом с конвейером агрегации. Этот параметр легко доступен через mongo-java-client благодаря AggregateIterable.allowDiskUse() . Проблема в том, что, к сожалению, этот метод еще не представлен в MongoDB I O. Для этого есть запрос на функцию но на данный момент этого нет в дорожной карте.

К сожалению, allowDiskUse() необходим в двух местах соединителя MongoDB Beam, и переопределить их невозможно:

  • Ввод-вывод MongoDB.buildAutoBuckets
AggregateIterable buckets = mongoCollection.aggregate(aggregates).allowDiskUse(true);
return collection.aggregate(mongoDbPipeline()).allowDiskUse(true).iterator();

Итак, единственный способ отредактировать эти классы на данный момент – это разветвить или дублировать их. Не идеально, но, по крайней мере, вы можете выполнить некоторую очистку в своих зависимостях конвейера:

    
    
    
    
    
      org.mongodb
      mongo-java-driver
      3.12.7
    

Все, что вам нужно, это монго-java-драйвер

У этой длинной истории счастливый конец: благодаря allowDiskUse и файлу подкачки ваш пользовательский соединитель ввода-вывода MongoDB теперь может запрашивать представления MongoDB любого размера!

Вот и все для этого второго эпизода. Следите за обновлениями для следующего, я расскажу о рабочих процессах GCP, удобном способе организации ваших конвейеров потоков данных

Оригинал: “https://dev.to/stack-labs/tricky-dataflow-ep-2-import-documents-from-mongodb-views-lpf”