Рубрики
Без рубрики

Потоковая передача больших запросов на Java

Как использовать потоковый API Java вместе с JDBC для потоковой передачи запросов с большим количеством строк без нехватки памяти. Помеченный java, jdbc, spring, java streams.

Я использую класс JdbcTemplate с версии 1.0, и он хорошо эволюционировал, но я надеялся, что для версии 5 он будет включать некоторые возможности потоковой передачи для запросов с большими результатами. Увы, этого не произошло.

Тем не менее, иногда мне нужно выполнять запросы, которые возвращают миллионы строк, и я не могу использовать методы JdbcTemplate, которые возвращают для этого списки. RowCallbackHandler идеально подходит для этого, но было бы намного приятнее просто получать поток, не так ли? Особенно, если у вас есть пользовательский Rowmapper…

Итак, я решил написать свой собственный генератор потоков для использования с JdbcTemplate. В процессе я закончил созданием более универсального генератора потоков, который, на мой взгляд, хорош, и поэтому я хочу поделиться им со всеми, кому нужно что-то подобное. Однако я не думаю, что этого материала достаточно для библиотеки, поэтому вместо этого я решил написать об этом пост.

Задача

Прежде всего, нам нужно учитывать, что потоки ленивы, и когда вы получаете поток и определяете операции, которые должны быть выполнены над ним, пока ничего не происходит, пока вы не реализуете заключительную операцию, которая должна фактически проходить элементы и применять операции к нему. Есть операции, которые проходят через весь поток (например, подсчет или сбор элементов в другие коллекции), и есть операции короткого замыкания (например, определение того, проходит ли какой-либо элемент какой-либо фильтр).

Итак, мы хотим получить поток и определить операции над ним, и ничего не происходит, вплоть до того момента, когда поток нужно будет пересечь, затем необходимо выполнить запрос (что подразумевает наличие открытого подключения к базе данных). Если произойдет что-то плохое, запрос необходимо остановить (и JdbcTemplate позаботится об очистке соединения и других ресурсов).

Единственный способ, которым я обнаружил, что могу заставить это работать, – это использовать два потока: поток-производитель, в котором выполняется запрос и строки каким-то образом передаются в поток, и поток-потребитель, который является считывателем потока.

Нам понадобится буфер, в котором производитель будет хранить элементы и из которого потребитель будет брать элементы. LinkedBlockingQueue кажется идеальным для этого.

Итак, без лишних слов, вот оно:

    public static  Stream streamForQuery(int bufferSize, T endOfStreamMarker,
                                               Consumer> query) {
        final LinkedBlockingQueue queue = new LinkedBlockingQueue<>(bufferSize);
        //This is the consumer that is usually passed to queries;
        //it will receive each item from the query and put it in the queue
        Consumer filler = t -> {
            try {
                //Try to add to the queue, waiting up to 1 second
                //Honestly if after 1 second the queue is still full, either the stream consumer
                //needs some serious optimization or, more likely, a short-circuit terminal
                //operation was performed on the stream.
                if (!queue.offer(t, 1, TimeUnit.SECONDS)) {
                    //If the queue is full after 1 second, time out.
                    //Throw an exception to stop the producer queue.
                    log.error("Timeoud waiting to feed elements to stream");
                    throw new BufferOverflowException();
                }
            } catch (InterruptedException ex) {
                System.err.println("Interrupted trying to add item to stream");
                ex.printStackTrace();
            }
        };
        //For the stream that we return, we use a Spliterator.
        return StreamSupport.stream(() -> new Spliterators.AbstractSpliterator(Long.MAX_VALUE, Spliterator.ORDERED) {
            //We need to know if the producer thread has been started
            private boolean started = false;
            //If there's an exception in the producer, keep it here
            private volatile Throwable boom;
            /** This method is called once, before advancing to the first element.
             * It will start the producer thread, which runs the query, passing it our
             * queue filler.
             */
            private void startProducer() {
                //Get the consumer thread
                Thread interruptMe = Thread.currentThread();
                //First time this is called it will run the query in a separate thread
                //This is the producer thread
                new Thread(() -> {
                    try {
                        //Run the query, with our special consumer
                        query.accept(filler);
                    } catch (BufferOverflowException ignore) {
                        //The filler threw this, means the queue is not being consumed fast enough
                        //(or, more likely, not at all)
                    } catch (Throwable thr) {
                        //Something bad happened, store the exception and interrupt the reader
                        boom = thr;
                        interruptMe.interrupt();
                    }
                }).start();
                started = true;
            }
            @Override
            public boolean tryAdvance(Consumer action) {
                if (!started) {
                    startProducer();
                }
                try {
                    //Take an item from the queue and if it's not the end of stream maker, pass it
                    //to the action consumer.
                    T t = queue.take();
                    if (t != endOfStreamMarker) {
                        action.accept(t);
                        return true;
                    }
                } catch (InterruptedException ex) {
                    if (boom == null) {
                        System.err.println("Interrupted reading from stream");
                        ex.printStackTrace();
                    } else {
                        //Throw the exception from the producer on the consumer side
                        throw new RuntimeException(boom);
                    }
                }
                return false;
            }
        }, Spliterator.IMMUTABLE, false);
    }

И вот как вы его используете, с помощью JdbcTemplate:

final MyRow marker = new MyRow();
Stream stream = streamForQuery(100, marker, callback -> {
    //Pass a RowCallbackHandler that passes a MyRow to the callback
    jdbcTemplate.query("SELECT * FROM really_big_table_with_millions_of_rows",
                       rs -> { callback.accept(myRowMapper.mapRow(rs, 0)); }
    );
    //Pass the marker to the callback, to signal end of stream
    callback.accept(marker);
});

На данный момент запрос не был выполнен. Вы можете делать такие вещи, как:

поток.фильтр(строка -> row.is Симпатичная());

И все равно ничего не происходит. Когда вы делаете что-то подобное, хотя:

Необязательно.пропустить(100_000).ограничить(1000).найти любой();

Затем выполняется запрос, первые сто тысяч строк будут прочитаны (и пропущены), и каждая строка будет проходить через фильтр, пока не будет найдена одна или не будет прочитана тысяча строк.

С огромной силой…

Пожалуйста, пожалуйста, ПОЖАЛУЙСТА, не используйте это в качестве замены хорошего предложения WHERE и правильной индексации ваших таблиц. Я использовал эту штуку в основном для создания отчетов, объединяя потоки непересекающихся типов путем сопоставления элементов с общим типом для дальнейшей обработки (в основном, компенсируя отсутствие типов объединения в Java).

Сказав это, довольно здорово иметь возможность считывать строки из базы данных в потоковом режиме. Я предполагаю, что это может быть интегрировано в JdbcTemplate Spring и/или jOOQ…

Оригинал: “https://dev.to/chochos/streaming-large-queries-in-java-3c9d”