Рубрики
Без рубрики

Интеграция Spring с AWS Kinesis

Узнайте, как интегрировать Spring с двумя библиотеками AWS для взаимодействия с потоками данных Kinesis

Автор оригинала: baeldung.

1. введение

Kinesis-это инструмент для сбора, обработки и анализа потоков данных в режиме реального времени, разработанный в Amazon. Одним из его главных преимуществ является то, что он помогает в разработке приложений, управляемых событиями.

В этом уроке мы рассмотрим несколько библиотек, которые позволяют нашему приложению Spring создавать и использовать записи из потока Kinesis . Примеры кода покажут основные функциональные возможности, но не будут представлять готовый к производству код.

2. Обязательное условие

Прежде чем мы пойдем дальше, нам нужно сделать две вещи.

Первый – создать проект Spring , так как цель здесь-взаимодействовать с Kinesis из проекта Spring.

Второй – создать поток данных Kinesis. Мы можем сделать это из веб-браузера в вашей учетной записи AWS. Одной из альтернатив для поклонников AWS CLI среди нас является использование командной строки . Поскольку мы будем взаимодействовать с ним из кода, у нас также должны быть под рукой учетные данные AWS IAM , ключ доступа и секретный ключ, а также регион.

Все наши производители будут создавать фиктивные записи IP-адресов, в то время как потребители будут считывать эти значения и перечислять их в консоли приложения.

3. AWS SDK для Java

Самая первая библиотека, которую мы будем использовать, – это AWS SDK для Java. Его преимущество заключается в том, что он позволяет нам управлять многими частями работы с потоками данных Kinesis. Мы можем считывать данные, создавать данные, создавать потоки данных и изменять потоки данных|/. Недостатком является то, что для того, чтобы иметь готовый к производству код, нам придется кодировать такие аспекты, как управление, обработка ошибок или демон, чтобы поддерживать жизнь потребителя.

3.1. Зависимость от Maven

Зависимость amazon-kinesis-client Maven предоставит все необходимое для того, чтобы у нас были рабочие примеры. Теперь мы добавим его в ваш pom.xml файл:


    com.amazonaws
    amazon-kinesis-client
    1.11.2

3.2. Установка пружины

Давайте повторно используем объект Amazon Kinesis , необходимый для взаимодействия с вашим потоком Kinesis. Мы создадим его как @Bean внутри нашего @SpringBootApplication класса:

@Bean
public AmazonKinesis buildAmazonKinesis() {
    BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey);
    return AmazonKinesisClientBuilder.standard()
      .withCredentials(new AWSStaticCredentialsProvider(awsCredentials))
      .withRegion(Regions.EU_CENTRAL_1)
      .build();
}

Далее давайте определим aws.access.ключ и aws.secret.ключ , необходимый для локальной машины, в application.properties :

aws.access.key=my-aws-access-key-goes-here
aws.secret.key=my-aws-secret-key-goes-here

И мы будем читать их, используя аннотацию @Value :

@Value("${aws.access.key}")
private String accessKey;

@Value("${aws.secret.key}")
private String secretKey;

Для простоты мы будем полагаться на @Запланированные методы для создания и использования записей.

3.3. Потребитель

Потребитель AWS SDK Kinesis использует модель извлечения , что означает, что наш код будет извлекать записи из фрагментов потока данных Kinesis:

GetRecordsRequest recordsRequest = new GetRecordsRequest();
recordsRequest.setShardIterator(shardIterator.getShardIterator());
recordsRequest.setLimit(25);

GetRecordsResult recordsResult = kinesis.getRecords(recordsRequest);
while (!recordsResult.getRecords().isEmpty()) {
    recordsResult.getRecords().stream()
      .map(record -> new String(record.getData().array()))
      .forEach(System.out::println);

    recordsRequest.setShardIterator(recordsResult.getNextShardIterator());
    recordsResult = kinesis.getRecords(recordsRequest);
}

Объект GetRecordsRequest создает запрос на потоковые данные . В нашем примере мы определили ограничение в 25 записей на запрос, и мы продолжаем читать, пока больше нечего читать.

Мы также можем заметить, что для нашей итерации мы использовали объект GetShardIteratorResult . Мы создали этот объект внутри метода @Postconstruct t, чтобы сразу же начать отслеживать записи:

private GetShardIteratorResult shardIterator;

@PostConstruct
private void buildShardIterator() {
    GetShardIteratorRequest readShardsRequest = new GetShardIteratorRequest();
    readShardsRequest.setStreamName(IPS_STREAM);
    readShardsRequest.setShardIteratorType(ShardIteratorType.LATEST);
    readShardsRequest.setShardId(IPS_SHARD_ID);

    this.shardIterator = kinesis.getShardIterator(readShardsRequest);
}

3.4. Производитель

Давайте теперь посмотрим, как обрабатывать создание записей для нашего потока данных Kinesis .

Мы вставляем данные с помощью PutRecordsRequest объекта . Для этого нового объекта мы добавляем список, содержащий несколько объектов PutRecordsRequestEntry :

List entries = IntStream.range(1, 200).mapToObj(ipSuffix -> {
    PutRecordsRequestEntry entry = new PutRecordsRequestEntry();
    entry.setData(ByteBuffer.wrap(("192.168.0." + ipSuffix).getBytes()));
    entry.setPartitionKey(IPS_PARTITION_KEY);
    return entry;
}).collect(Collectors.toList());

PutRecordsRequest createRecordsRequest = new PutRecordsRequest();
createRecordsRequest.setStreamName(IPS_STREAM);
createRecordsRequest.setRecords(entries);

kinesis.putRecords(createRecordsRequest);

Мы создали базового потребителя и производителя смоделированных IP-записей. Все, что осталось сделать сейчас, это запустить наш весенний проект и просмотреть IP-адреса, перечисленные в нашей консоли приложений.

4. KCL и KPL

Клиентская библиотека Kinesis (KCL) – это библиотека, которая упрощает использование записей|/. Это также уровень абстракции над API Java AWS SDK для потоков данных Kinesis. За кулисами библиотека выполняет балансировку нагрузки во многих экземплярах, реагируя на сбои экземпляров, отмечая обработанные записи и реагируя на изменение конфигурации.

Библиотека производителя Kinesis (KPL) – это библиотека, полезная для записи в поток данных Kinesis . Он также обеспечивает уровень абстракции, который находится поверх API Java AWS SDK для потоков данных Kinesis. Для повышения производительности библиотека автоматически обрабатывает логику пакетной обработки, многопоточности и повторных попыток.

У KCL и KPL есть главное преимущество в том, что они просты в использовании, так что мы можем сосредоточиться на производстве и потреблении записей.

4.1. Зависимости Maven

При необходимости две библиотеки могут быть включены в наш проект отдельно. Чтобы включить KPI и KCL в наш проект Maven, нам необходимо обновить наш pom.xml файл:


    com.amazonaws
    amazon-kinesis-producer
    0.13.1


    com.amazonaws
    amazon-kinesis-client
    1.11.2

4.2. Установка Пружины

Единственная подготовка к весне, которая нам нужна, – это убедиться, что у нас под рукой есть учетные данные IAM. Значения для aws.access.ключ и aws.secret.ключ определены в нашем файле application.properties , поэтому при необходимости мы можем прочитать их с помощью @Value .

4.3. Потребитель

Сначала мы создадим класс, реализующий интерфейс IRecordProcessor и определяющий нашу логику обработки записей потока данных Kinesis , которая заключается в их печати в консоли:

public class IpProcessor implements IRecordProcessor {
    @Override
    public void initialize(InitializationInput initializationInput) { }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        processRecordsInput.getRecords()
          .forEach(record -> System.out.println(new String(record.getData().array())));
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) { }
}

Следующим шагом является определение заводского класса, который реализует IRecordProcessorFactory интерфейс и возвращает ранее созданный IpProcessor объект:

public class IpProcessorFactory implements IRecordProcessorFactory {
    @Override
    public IRecordProcessor createProcessor() {
        return new IpProcessor();
    }
}

А теперь для заключительного шага мы будем использовать Рабочий объект для определения нашего потребительского конвейера . Нам нужен объект KinesisClientLibConfiguration , который при необходимости определит учетные данные IAM и регион AWS.

Мы передадим конфигурацию KinesisClientLibConfiguration и наш объект Фабрика Ip-процессоров нашему Рабочему , а затем запустим его в отдельном потоке. Мы сохраняем эту логику потребления записей всегда живой с помощью класса Worker , поэтому сейчас мы постоянно читаем новые записи:

BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey);
KinesisClientLibConfiguration consumerConfig = new KinesisClientLibConfiguration(
  APP_NAME, 
  IPS_STREAM,
  new AWSStaticCredentialsProvider(awsCredentials), 
  IPS_WORKER)
    .withRegionName(Regions.EU_CENTRAL_1.getName());

final Worker worker = new Worker.Builder()
  .recordProcessorFactory(new IpProcessorFactory())
  .config(consumerConfig)
  .build();
CompletableFuture.runAsync(worker.run());

4.4. Производитель

Давайте теперь определим Конфигурацию производителя Kinesis объект, добавив учетные данные IAM и регион AWS:

BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey);
KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration()
  .setCredentialsProvider(new AWSStaticCredentialsProvider(awsCredentials))
  .setVerifyCertificate(false)
  .setRegion(Regions.EU_CENTRAL_1.getName());

this.kinesisProducer = new KinesisProducer(producerConfig);

Мы включим Производителя кинезиса объект, ранее созданный в @Запланированное задание, и будем непрерывно создавать записи для нашего потока кинетических данных:

IntStream.range(1, 200).mapToObj(ipSuffix -> ByteBuffer.wrap(("192.168.0." + ipSuffix).getBytes()))
  .forEach(entry -> kinesisProducer.addUserRecord(IPS_STREAM, IPS_PARTITION_KEY, entry));

5. Кинезис связующего потока Весеннего Облака

Мы уже видели две библиотеки, обе созданные за пределами экосистемы Spring. Сейчас мы посмотрим, как кинезис связующего потока Spring Cloud может еще больше упростить нашу жизнь при создании поверх Потока Spring Cloud .

5.1. Зависимость от Maven

Зависимость Maven, которую нам нужно определить в нашем приложении для кинезиса Spring Cloud Stream Binder , такова:


    org.springframework.cloud
    spring-cloud-stream-binder-kinesis
    1.2.1.RELEASE

5.2. Установка Пружины

При работе на EC2 необходимые свойства AWS обнаруживаются автоматически, поэтому их нет необходимости определять. Поскольку мы запускаем наши примеры на локальном компьютере, нам необходимо определить ключ доступа IAM, секретный ключ и регион для нашей учетной записи AWS. Мы также отключили автоматическое определение имени стека CloudFormation для приложения:

cloud.aws.credentials.access-key=my-aws-access-key
cloud.aws.credentials.secret-key=my-aws-secret-key
cloud.aws.region.static=eu-central-1
cloud.aws.stack.auto=false

Spring Cloud Stream поставляется в комплекте с тремя интерфейсами, которые мы можем использовать в привязке потока:

  • Приемник предназначен для приема данных
  • Источник используется для публикации записей
  • Процессор представляет собой комбинацию обоих

При необходимости мы также можем определить ваши собственные интерфейсы.

5.3. Потребитель

Определение потребителя – это работа, состоящая из двух частей. Сначала мы определим в application.properties поток данных, из которого мы будем потреблять:

spring.cloud.stream.bindings.input.destination=live-ips
spring.cloud.stream.bindings.input.group=live-ips-group
spring.cloud.stream.bindings.input.content-type=text/plain

А затем давайте определим класс Spring @Component . Аннотация @EnableBinding(Sink.class) позволит нам читать из потока Kinesis, используя метод, аннотированный @StreamListener(Раковина.ВВОД) :

@EnableBinding(Sink.class)
public class IpConsumer {

    @StreamListener(Sink.INPUT)
    public void consume(String ip) {
        System.out.println(ip);
    }
}

5.4. Производитель

Производитель также может быть разделен на две части. Во-первых, мы должны определить наши свойства потока внутри application.properties :

spring.cloud.stream.bindings.output.destination=live-ips
spring.cloud.stream.bindings.output.content-type=text/plain

А затем мы добавляем @EnableBinding(Source.class) на пружине @Компонент и создавать новые тестовые сообщения каждые несколько секунд:

@Component
@EnableBinding(Source.class)
public class IpProducer {

    @Autowired
    private Source source;

    @Scheduled(fixedDelay = 3000L)
    private void produce() {
        IntStream.range(1, 200).mapToObj(ipSuffix -> "192.168.0." + ipSuffix)
          .forEach(entry -> source.output().send(MessageBuilder.withPayload(entry).build()));
    }
}

Это все, что нам нужно для работы Spring Cloud Stream Binder Kinesis. Мы можем просто запустить приложение прямо сейчас.

6. Заключение

В этой статье мы рассмотрели, как интегрировать наш проект Spring с двумя библиотеками AWS для взаимодействия с потоком данных Kinesis. Мы также видели, как использовать библиотеку Spring Cloud Stream Binder Kinesis, чтобы сделать реализацию еще проще.

Исходный код этой статьи можно найти на Github .