Рубрики
Без рубрики

Клиент MQTT на Java

Узнайте, как использовать библиотеку Paho для отправки и получения сообщений от брокера MQTT в Java-приложении,

Автор оригинала: baeldung.

1. Обзор

В этом уроке мы увидим, как мы можем добавить сообщения MQTT в проект Java, используя библиотеки, предоставляемые проектом Eclipse Paho .

2. Грунтовка MQTT

MQTT (MQ Telemetry Transport)-это протокол обмена сообщениями, созданный для удовлетворения потребности в простом и легком способе передачи данных на | с маломощных устройств, таких как устройства, используемые в промышленных приложениях.

С ростом популярности устройств IoT (Internet of Things) MQTT стал широко использоваться, что привело к его стандартизации OASIS и ISO.

Протокол поддерживает единый шаблон обмена сообщениями, а именно шаблон публикации-подписки: каждое сообщение, отправленное клиентом, содержит соответствующую “тему”, которая используется брокером для маршрутизации его подписанным клиентам. Имена тем могут быть простыми строками, такими как ” температура масла “или похожая на путь строка” двигатель/1/об/мин “.

Чтобы получать сообщения, клиент подписывается на одну или несколько тем, используя свое точное имя или строку, содержащую один из поддерживаемых подстановочных знаков (“#” для многоуровневых тем и “+” для одноуровневых”).

3. Настройка проекта

Чтобы включить библиотеку Пахо в проект Maven, мы должны добавить следующую зависимость:


  org.eclipse.paho
  org.eclipse.paho.client.mqttv3
  1.2.0

Последнюю версию модуля библиотеки Java Eclipse Paho можно загрузить с Maven Central.

4. Настройка клиента

При использовании библиотеки Paho первое, что нам нужно сделать для отправки и/или получения сообщений от брокера MQTT, – это получить реализацию интерфейса IMqttClient . Этот интерфейс содержит все методы, необходимые приложению для установления соединения с сервером, отправки и получения сообщений.

Paho выходит из коробки с двумя реализациями этого интерфейса: асинхронной ( MqttAsyncClient ) и синхронной ( MqttClient ). В нашем случае мы сосредоточимся на синхронной версии, которая имеет более простую семантику.

Сама настройка представляет собой двухэтапный процесс: сначала мы создаем экземпляр класса MqttClient , а затем подключаем его к нашему серверу. В следующем подразделе подробно описаны эти шаги.

4.1. Создание нового экземпляра Mqttclient

В следующем фрагменте кода показано, как создать новый Mqttclient синхронный экземпляр:

String publisherId = UUID.randomUUID().toString();
IMqttClient publisher = new MqttClient("tcp://iot.eclipse.org:1883",publisherId);

В этом случае мы используем самый простой из доступных конструкторов, который принимает адрес конечной точки нашего брокера MQTT и идентификатор клиента , который однозначно идентифицирует нашего клиента.

В нашем случае мы использовали случайный UUID, поэтому при каждом запуске будет генерироваться новый идентификатор клиента.

Paho также предоставляет дополнительные конструкторы, которые мы можем использовать для настройки механизма сохранения, используемого для хранения неподтвержденных сообщений и/или ScheduledExecutorService , используемого для выполнения фоновых задач, требуемых реализацией механизма протокола.

Конечная точка сервера , которую мы используем, является общедоступным брокером MQTT, размещенным в проекте Paho , который позволяет любому пользователю с подключением к Интернету тестировать клиентов без необходимости какой-либо аутентификации.

4.2. Подключение к Серверу

Наш недавно созданный Mqtt-клиент экземпляр не подключен к серверу. Мы делаем это, вызывая его connect() метод , при необходимости передавая экземпляр MqttConnectOptions , который позволяет нам настраивать некоторые аспекты протокола.

В частности, мы можем использовать эти параметры для передачи дополнительной информации, такой как учетные данные безопасности, режим восстановления сеанса, режим повторного подключения и так далее.

Класс Mqtt Connection Options предоставляет эти параметры как простые свойства, которые мы можем задать с помощью обычных методов настройки. Нам нужно только установить свойства, необходимые для нашего сценария – остальные будут принимать значения по умолчанию.

Код, используемый для установления соединения с сервером, обычно выглядит следующим образом:

MqttConnectOptions options = new MqttConnectOptions();
options.setAutomaticReconnect(true);
options.setCleanSession(true);
options.setConnectionTimeout(10);
publisher.connect(options);

Здесь мы определяем наши параметры подключения таким образом, чтобы:

  • Библиотека автоматически попытается повторно подключиться к серверу в случае сбоя сети
  • Он будет отбрасывать неотправленные сообщения из предыдущего запуска
  • Тайм-аут подключения установлен на 10 секунд

5. Отправка Сообщений

Отправка сообщений с помощью уже подключенного клиента Mqtt очень проста. Мы используем один из вариантов метода publish() для отправки полезной нагрузки, которая всегда представляет собой массив байтов, в заданную тему , используя один из следующих параметров качества обслуживания:

  • 0 – семантика “самое большее один раз”, также известная как “огонь и забудь”. Используйте эту опцию, когда потеря сообщения приемлема, так как она не требует какого-либо подтверждения или сохранения
  • 1 – семантика “по крайней мере один раз”. Используйте эту опцию, когда потеря сообщений неприемлема и ваши подписчики могут обрабатывать дубликаты
  • 2 – семантика “ровно один раз”. Используйте эту опцию, если потеря сообщений неприемлема и ваши подписчики не могут обрабатывать дубликаты

В нашем примере проекта класс Датчик температуры двигателя играет роль имитационного датчика, который выдает новое значение температуры каждый раз, когда мы вызываем его метод call () .

Этот класс реализует интерфейс Вызываемый , поэтому мы можем легко использовать его с одной из реализаций ExecutorService , доступных в пакете java.util.concurrent :

public class EngineTemperatureSensor implements Callable {

    // ... private members omitted
    
    public EngineTemperatureSensor(IMqttClient client) {
        this.client = client;
    }

    @Override
    public Void call() throws Exception {        
        if ( !client.isConnected()) {
            return null;
        }           
        MqttMessage msg = readEngineTemp();
        msg.setQos(0);
        msg.setRetained(true);
        client.publish(TOPIC,msg);        
        return null;        
    }

    private MqttMessage readEngineTemp() {             
        double temp =  80 + rnd.nextDouble() * 20.0;        
        byte[] payload = String.format("T:%04.2f",temp)
          .getBytes();        
        return new MqttMessage(payload);           
    }
}

Сообщение Mqtt инкапсулирует саму полезную нагрузку, запрошенное качество обслуживания, а также флаг сохраненный для сообщения. Этот флаг указывает брокеру, что он должен сохранять это сообщение до тех пор, пока оно не будет использовано подписчиком.

Мы можем использовать эту функцию для реализации “последнего известного хорошего” поведения, поэтому, когда новый подписчик подключается к серверу, он сразу же получит сохраненное сообщение.

6. Получение Сообщений

Для получения сообщений от брокера MQTT нам необходимо использовать один из вариантов метода subscribe () , который позволяет нам указать:

  • Один или несколько фильтров тем для сообщений, которые мы хотим получить
  • Связанные QoS
  • Обработчик обратного вызова для обработки полученных сообщений

В следующем примере мы покажем, как добавить прослушиватель сообщений в существующий экземпляр IMqttClient для получения сообщений из данной темы. Мы используем CountDownLatch в качестве механизма синхронизации между нашим обратным вызовом и основным потоком выполнения, уменьшая его каждый раз, когда приходит новое сообщение.

В примере кода мы использовали другой экземпляр Mqtt-клиента для получения сообщений. Мы сделали это просто для того, чтобы прояснить, какой клиент что делает, но это не ограничение Paho – если вы хотите, вы можете использовать один и тот же клиент для публикации и получения сообщений:

CountDownLatch receivedSignal = new CountDownLatch(10);
subscriber.subscribe(EngineTemperatureSensor.TOPIC, (topic, msg) -> {
    byte[] payload = msg.getPayload();
    // ... payload handling omitted
    receivedSignal.countDown();
});    
receivedSignal.await(1, TimeUnit.MINUTES);

Используемый выше вариант subscribe() принимает экземпляр IMqttMessageListener в качестве второго аргумента.

В нашем случае мы используем простую лямбда-функцию, которая обрабатывает полезную нагрузку и уменьшает счетчик. Если в указанное время (1 минута) поступит недостаточно сообщений, метод await() выдаст исключение.

При использовании Paho нам не нужно явно подтверждать получение сообщения. Если обратный вызов возвращается нормально, Paho предполагает, что это успешное потребление и отправляет подтверждение на сервер.

Если обратный вызов вызовет Исключение , клиент будет закрыт. Обратите внимание, что это приведет к потере всех сообщений, отправленных с уровнем QoS 0 .

Сообщения, отправленные с уровнем QoS 1 или 2, будут отправлены сервером после повторного подключения клиента и повторной подписки на эту тему.

7. Заключение

В этой статье мы продемонстрировали, как мы можем добавить поддержку протокола MQTT в наши Java-приложения, используя библиотеку, предоставленную проектом Eclipse Paho.

Эта библиотека обрабатывает все детали протокола низкого уровня, позволяя нам сосредоточиться на других аспектах нашего решения, оставляя при этом достаточно места для настройки важных аспектов его внутренних функций, таких как сохранение сообщений.

Код, показанный в этой статье, доступен на GitHub .