Установка
Чтобы добавить зависимость с помощью Maven, используйте следующее:
com.clivern kafka-sdk 0.1.0
Чтобы добавить зависимость с помощью Gradle, используйте следующее:
dependencies { compile 'com.clivern:kafka-sdk:0.1.0' }
Чтобы добавить зависимость с помощью Scala SBT, используйте следующее:
libraryDependencies += "com.clivern" % "kafka-sdk" % "0.1.0"
Чтобы создать тему Кафки:
import java.util.HashMap; import com.clivern.kafka.Configs; import com.clivern.kafka.Utils; HashMapmap = new HashMap (); map.put("bootstrap.servers", "localhost:9092"); Utils.createTopic("clivern", Configs.fromMap(map));
Продюсер Кафки:
import com.clivern.kafka.Configs; import com.clivern.kafka.Producer; import com.clivern.kafka.Kafka; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.StringSerializer; Configs configs = new Configs(); configs.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); configs.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configs.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); Producer producer = (new Kafka()).newProducer(configs); for (int i = 0; i < 10; i++) { ProducerRecordrecord = new ProducerRecord<>("clivern", null, "Hello World " + i); producer.send(record).flush(); } producer.close();
Потребитель Кафки:
import com.clivern.kafka.Configs; import com.clivern.kafka.Consumer; import com.clivern.kafka.Kafka; import com.clivern.kafka.HandlerCallbackInterface; import com.clivern.kafka.FailureCallbackInterface; import com.clivern.kafka.SuccessCallbackInterface; import com.clivern.kafka.exception.MissingHandler; import org.apache.kafka.clients.consumer.ConsumerConfig; import org.apache.kafka.common.serialization.StringDeserializer; import org.apache.kafka.clients.consumer.ConsumerRecord; Configs configs = new Configs(); configs.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); configs.put(ConsumerConfig.GROUP_ID_CONFIG, "clivern"); configs.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configs.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); configs.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); configs.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); Consumer consumer = (new Kafka()).newConsumer(configs); HandlerCallbackInterface> handler = (record) -> { System.out.println("Message Received: " + record.value()); // Throw error if message has error if (record.value().equals("error")) { throw new Exception("Error!"); } }; SuccessCallbackInterface > onSuccess = (record) -> { System.out.println("Message Succeeded: " + record.value()); }; FailureCallbackInterface > onFailure = (record, exception) -> { System.out.println( "Message " + record.value() + " Failed: " + exception.getMessage()); }; consumer.subscribe("clivern") .handler(handler) .onSuccess(onSuccess) .onFailure(onFailure) .run();
Пожалуйста, не забудьте заменить localhost
на хост кафки.
Для начала, не пропустите этот урок https://dev.to/clivern/getting-started-with-kafka-3mbi
Оригинал: “https://dev.to/clivern/getting-started-with-apache-kafka-and-java-40p5”