Я использую класс JdbcTemplate
с версии 1.0, и он хорошо эволюционировал, но я надеялся, что для версии 5 он будет включать некоторые возможности потоковой передачи для запросов с большими результатами. Увы, этого не произошло.
Тем не менее, иногда мне нужно выполнять запросы, которые возвращают миллионы строк, и я не могу использовать методы JdbcTemplate, которые возвращают для этого списки. RowCallbackHandler
идеально подходит для этого, но было бы намного приятнее просто получать поток, не так ли? Особенно, если у вас есть пользовательский Rowmapper…
Итак, я решил написать свой собственный генератор потоков для использования с JdbcTemplate. В процессе я закончил созданием более универсального генератора потоков, который, на мой взгляд, хорош, и поэтому я хочу поделиться им со всеми, кому нужно что-то подобное. Однако я не думаю, что этого материала достаточно для библиотеки, поэтому вместо этого я решил написать об этом пост.
Задача
Прежде всего, нам нужно учитывать, что потоки ленивы, и когда вы получаете поток и определяете операции, которые должны быть выполнены над ним, пока ничего не происходит, пока вы не реализуете заключительную операцию, которая должна фактически проходить элементы и применять операции к нему. Есть операции, которые проходят через весь поток (например, подсчет или сбор элементов в другие коллекции), и есть операции короткого замыкания (например, определение того, проходит ли какой-либо элемент какой-либо фильтр).
Итак, мы хотим получить поток и определить операции над ним, и ничего не происходит, вплоть до того момента, когда поток нужно будет пересечь, затем необходимо выполнить запрос (что подразумевает наличие открытого подключения к базе данных). Если произойдет что-то плохое, запрос необходимо остановить (и JdbcTemplate позаботится об очистке соединения и других ресурсов).
Единственный способ, которым я обнаружил, что могу заставить это работать, – это использовать два потока: поток-производитель, в котором выполняется запрос и строки каким-то образом передаются в поток, и поток-потребитель, который является считывателем потока.
Нам понадобится буфер, в котором производитель будет хранить элементы и из которого потребитель будет брать элементы. LinkedBlockingQueue кажется идеальным для этого.
Итак, без лишних слов, вот оно:
public staticStream streamForQuery(int bufferSize, T endOfStreamMarker, Consumer > query) { final LinkedBlockingQueue queue = new LinkedBlockingQueue<>(bufferSize); //This is the consumer that is usually passed to queries; //it will receive each item from the query and put it in the queue Consumer filler = t -> { try { //Try to add to the queue, waiting up to 1 second //Honestly if after 1 second the queue is still full, either the stream consumer //needs some serious optimization or, more likely, a short-circuit terminal //operation was performed on the stream. if (!queue.offer(t, 1, TimeUnit.SECONDS)) { //If the queue is full after 1 second, time out. //Throw an exception to stop the producer queue. log.error("Timeoud waiting to feed elements to stream"); throw new BufferOverflowException(); } } catch (InterruptedException ex) { System.err.println("Interrupted trying to add item to stream"); ex.printStackTrace(); } }; //For the stream that we return, we use a Spliterator. return StreamSupport.stream(() -> new Spliterators.AbstractSpliterator (Long.MAX_VALUE, Spliterator.ORDERED) { //We need to know if the producer thread has been started private boolean started = false; //If there's an exception in the producer, keep it here private volatile Throwable boom; /** This method is called once, before advancing to the first element. * It will start the producer thread, which runs the query, passing it our * queue filler. */ private void startProducer() { //Get the consumer thread Thread interruptMe = Thread.currentThread(); //First time this is called it will run the query in a separate thread //This is the producer thread new Thread(() -> { try { //Run the query, with our special consumer query.accept(filler); } catch (BufferOverflowException ignore) { //The filler threw this, means the queue is not being consumed fast enough //(or, more likely, not at all) } catch (Throwable thr) { //Something bad happened, store the exception and interrupt the reader boom = thr; interruptMe.interrupt(); } }).start(); started = true; } @Override public boolean tryAdvance(Consumer super T> action) { if (!started) { startProducer(); } try { //Take an item from the queue and if it's not the end of stream maker, pass it //to the action consumer. T t = queue.take(); if (t != endOfStreamMarker) { action.accept(t); return true; } } catch (InterruptedException ex) { if (boom == null) { System.err.println("Interrupted reading from stream"); ex.printStackTrace(); } else { //Throw the exception from the producer on the consumer side throw new RuntimeException(boom); } } return false; } }, Spliterator.IMMUTABLE, false); }
И вот как вы его используете, с помощью JdbcTemplate:
final MyRow marker = new MyRow(); Streamstream = streamForQuery(100, marker, callback -> { //Pass a RowCallbackHandler that passes a MyRow to the callback jdbcTemplate.query("SELECT * FROM really_big_table_with_millions_of_rows", rs -> { callback.accept(myRowMapper.mapRow(rs, 0)); } ); //Pass the marker to the callback, to signal end of stream callback.accept(marker); });
На данный момент запрос не был выполнен. Вы можете делать такие вещи, как:
поток.фильтр(строка -> row.is Симпатичная());
И все равно ничего не происходит. Когда вы делаете что-то подобное, хотя:
Необязательно.пропустить(100_000).ограничить(1000).найти любой();
Затем выполняется запрос, первые сто тысяч строк будут прочитаны (и пропущены), и каждая строка будет проходить через фильтр, пока не будет найдена одна или не будет прочитана тысяча строк.
С огромной силой…
Пожалуйста, пожалуйста, ПОЖАЛУЙСТА, не используйте это в качестве замены хорошего предложения WHERE и правильной индексации ваших таблиц. Я использовал эту штуку в основном для создания отчетов, объединяя потоки непересекающихся типов путем сопоставления элементов с общим типом для дальнейшей обработки (в основном, компенсируя отсутствие типов объединения в Java).
Сказав это, довольно здорово иметь возможность считывать строки из базы данных в потоковом режиме. Я предполагаю, что это может быть интегрировано в JdbcTemplate Spring и/или jOOQ…
Оригинал: “https://dev.to/chochos/streaming-large-queries-in-java-3c9d”