Рубрики
Без рубрики

Использование InfluxDB с Java

Узнайте, как использовать InfluxDB – высокопроизводительное хранилище данных временных рядов.

Автор оригинала: baeldung.

1. Обзор

InfluxDB -это высокопроизводительное хранилище данных временных рядов. Он поддерживает вставку и запрос данных в режиме реального времени с помощью SQL-подобного языка запросов.

В этой вводной статье мы продемонстрируем, как подключиться к серверу InfluxDB, создать базу данных, записать информацию о временных рядах и затем запросить базу данных.

2. Настройка

Чтобы подключиться к базе данных, нам нужно добавить запись в ваш pom.xml файл:


    org.influxdb
    influxdb-java
    2.8

Последнюю версию этой зависимости можно найти на Maven Central .

Нам также понадобится экземпляр InfluxDB. Инструкции по загрузке и установке базы данных можно найти на веб-сайте Influence Data .

3. Подключение к серверу

3.1. Создание соединения

Создание соединения с базой данных требует передачи URL-адреса String и учетных данных пользователя на фабрику соединений:

InfluxDB influxDB = InfluxDBFactory.connect(databaseURL, userName, password);

3.2. Проверка подключения

Связь с базой данных осуществляется через RESTful API , поэтому они не являются постоянными.

API предлагает специальную службу “ping” для подтверждения работоспособности соединения. Если соединение хорошее, ответ содержит версию базы данных. Если нет, то он содержит “неизвестно”.

Поэтому после создания соединения мы можем проверить его, выполнив:

Pong response = this.influxDB.ping();
if (response.getVersion().equalsIgnoreCase("unknown")) {
    log.error("Error pinging server.");
    return;
} 

3.3. Создание базы данных

Создание базы данных InfluxDB аналогично созданию базы данных на большинстве платформ. Но нам нужно создать хотя бы одну политику хранения, прежде чем использовать ее.

Политика хранения указывает базе данных, как долго должен храниться фрагмент данных. Временные ряды, такие как статистика процессора или памяти, имеют тенденцию накапливаться в больших наборах данных.

Типичной стратегией управления размером баз данных временных рядов является понижающая дискретизация . “Необработанные” данные хранятся с высокой скоростью, суммируются, а затем удаляются через короткое время.

Политики хранения упрощают это, связывая часть данных со временем истечения срока действия. Данные о притоке имеют подробное объяснение на своем сайте.

После создания базы данных мы добавим единую политику с именем Политика по умолчанию. Он просто сохранит данные в течение 30 дней:

influxDB.createDatabase("baeldung");
influxDB.createRetentionPolicy(
  "defaultPolicy", "baeldung", "30d", 1, true);

Чтобы создать политику хранения, нам понадобится имя, база данных, интервал, коэффициент репликации (который должен быть равен 1 для базы данных с одним экземпляром) и логическое значение , указывающее, что это политика по умолчанию.

3.4. Установка уровня ведения журнала

Внутренне API InfluxDB использует модернизацию и предоставляет интерфейс для модернизации средства ведения журнала с помощью перехватчика ведения журнала.

Таким образом, мы можем установить уровень ведения журнала с помощью:

influxDB.setLogLevel(InfluxDB.LogLevel.BASIC);

И теперь мы можем видеть сообщения, когда мы открываем соединение и пингуем его:

Dec 20, 2017 5:38:10 PM okhttp3.internal.platform.Platform log
INFO: --> GET http://127.0.0.1:8086/ping

Доступные уровни: БАЗОВЫЙ , ПОЛНЫЙ , ЗАГОЛОВКИ и НЕТ.

4. Добавление и извлечение данных

4.1. Баллы

Итак, теперь мы готовы начать вставку и извлечение данных.

Основной единицей информации в базе данных InfluxDB является Точка, которая по сути является меткой времени и картой ключевых значений.

Давайте посмотрим на точку, содержащую данные об использовании памяти:

Point point = Point.measurement("memory")
  .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
  .addField("name", "server1")
  .addField("free", 4743656L)
  .addField("used", 1015096L)
  .addField("buffer", 1010467L)
  .build();

Мы создали запись, которая содержит три Лонга в качестве статистики памяти, имя хоста и метку времени.

Давайте посмотрим, как добавить это в базу данных.

4.2. Написание пакетов

Данные временных рядов, как правило, состоят из множества небольших точек, и запись этих записей по одной за раз была бы очень неэффективной. Предпочтительным методом является сбор записей в пакеты.

API InfluxDB предоставляет объект Batch Point :

BatchPoints batchPoints = BatchPoints
  .database(dbName)
  .retentionPolicy("defaultPolicy")
  .build();

Point point1 = Point.measurement("memory")
  .time(System.currentTimeMillis(), TimeUnit.MILLISECONDS)
  .addField("name", "server1") 
  .addField("free", 4743656L)
  .addField("used", 1015096L) 
  .addField("buffer", 1010467L)
  .build();

Point point2 = Point.measurement("memory")
  .time(System.currentTimeMillis() - 100, TimeUnit.MILLISECONDS)
  .addField("name", "server1")
  .addField("free", 4743696L)
  .addField("used", 1016096L)
  .addField("buffer", 1008467L)
  .build();

batchPoints.point(point1);
batchPoints.point(point2);
influxDB.write(batchPoints);

Мы создаем Пакетную точку , а затем добавляем к ней Точки . Мы установили метку времени для нашей второй записи на 100 миллисекунд в прошлом, так как метки времени являются основным индексом. Если мы отправим две точки с одной и той же меткой времени, будет сохранена только одна.

Обратите внимание, что мы должны связать Пакетные точки с базой данных и политикой хранения.

4.3. Писать по одному

Дозирование может быть непрактичным для некоторых случаев использования.

Давайте включим пакетный режим с одним вызовом подключения InfluxDB:

influxDB.enableBatch(100, 200, TimeUnit.MILLISECONDS);

Мы включили пакетирование 100 для вставки на сервер или отправки того, что у него есть, каждые 200 миллисекунд.

При включенном пакетном режиме мы все еще можем писать по одному за раз. Однако требуется дополнительная настройка:

influxDB.setRetentionPolicy("defaultPolicy");
influxDB.setDatabase(dbName);

Более того, теперь мы можем записывать отдельные точки, и они собираются пакетами в фоновом потоке:

influxDB.write(point);

Прежде чем ставить в очередь отдельные точки, нам нужно установить базу данных (аналогично команде use в SQL) и установить политику хранения по умолчанию. Поэтому, если мы хотим воспользоваться преимуществами понижающей выборки с несколькими политиками хранения, создание пакетов-это правильный путь.

В пакетном режиме используется отдельный пул потоков. Поэтому неплохо отключить его, когда он больше не нужен:

influxDB.disableBatch();

Закрытие соединения также приведет к закрытию пула потоков:

influxDB.close();

4.4. Результаты картографического запроса

Запросы возвращают a Результат запроса , который мы можем сопоставить с POJOs.

Прежде чем мы рассмотрим синтаксис запроса, давайте создадим класс для хранения нашей статистики памяти:

@Measurement(name = "memory")
public class MemoryPoint {

    @Column(name = "time")
    private Instant time;

    @Column(name = "name")
    private String name;

    @Column(name = "free")
    private Long free;

    @Column(name = "used")
    private Long used;

    @Column(name = "buffer")
    private Long buffer;
}

Класс аннотируется @Measurement(имя) , соответствующим Point.measurement(“память”) , который мы использовали для создания наших Точек .

Для каждого поля в нашем Результате запроса мы добавляем @Столбец(имя) аннотацию с именем соответствующего поля.

Результаты запроса сопоставляются с POJOs с помощью InfluxDBResultMapper.

4.5. Запрос базы данных InfluxDB

Итак, давайте использовать наш POJO с точками, которые мы добавили в базу данных в нашем двухточечном пакете:

QueryResult queryResult = connection
  .performQuery("Select * from memory", "baeldung");

InfluxDBResultMapper resultMapper = new InfluxDBResultMapper();
List memoryPointList = resultMapper
  .toPOJO(queryResult, MemoryPoint.class);

assertEquals(2, memoryPointList.size());
assertTrue(4743696L == memoryPointList.get(0).getFree());

Запрос иллюстрирует, как наше измерение с именем memory хранится в виде таблицы Точек , из которой мы можем выбрать .

InfluxDBResultMapper принимает ссылку на MemoryPoint.class с результатом Запроса и возвращает список точек.

После сопоставления результатов мы проверяем, что получили два, проверяя длину Списка , полученного из запроса. Затем мы смотрим на первую запись в списке и видим размер свободной памяти второй точки, которую мы вставили. По умолчанию порядок результатов запроса из InfluxDB увеличивается по метке времени.

Давайте изменим это:

queryResult = connection.performQuery(
  "Select * from memory order by time desc", "baeldung");
memoryPointList = resultMapper
  .toPOJO(queryResult, MemoryPoint.class);

assertEquals(2, memoryPointList.size());
assertTrue(4743656L == memoryPointList.get(0).getFree());

Добавление заказ по времени меняет порядок наших результатов.

Запросы InfluxDB очень похожи на SQL. На их сайте есть обширное справочное руководство .

5. Заключение

Мы подключились к серверу InfluxDB, создали базу данных с политикой хранения, а затем вставили и извлекли данные с сервера.

Полный исходный код примеров находится на GitHub .