查看: 258|回复: 0

[Java代码] Java使用kafka发送和生产消息的示例

发表于 2018-5-6 09:01:11

1. maven依赖包

  1. <dependency>
  2. <groupId>org.apache.kafka</groupId>
  3. <artifactId>kafka-clients</artifactId>
  4. <version>0.9.0.1</version>
  5. </dependency>
复制代码

2. 生产者代码

  1. package com.lnho.example.kafka;
  2. import org.apache.kafka.clients.producer.KafkaProducer;
  3. import org.apache.kafka.clients.producer.Producer;
  4. import org.apache.kafka.clients.producer.ProducerRecord;
  5. import java.util.Properties;
  6. public class KafkaProducerExample {
  7. public static void main(String[] args) {
  8. Properties props = new Properties();
  9. props.put("bootstrap.servers", "master:9092");
  10. props.put("acks", "all");
  11. props.put("retries", 0);
  12. props.put("batch.size", 16384);
  13. props.put("linger.ms", 1);
  14. props.put("buffer.memory", 33554432);
  15. props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  16. props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  17. Producer<String, String> producer = new KafkaProducer<>(props);
  18. for(int i = 0; i < 100; i++)
  19. producer.send(new ProducerRecord<>("topic1", Integer.toString(i), Integer.toString(i)));
  20. producer.close();
  21. }
  22. }
复制代码

3. 消费者代码

  1. package com.lnho.example.kafka;
  2. import org.apache.kafka.clients.consumer.ConsumerRecord;
  3. import org.apache.kafka.clients.consumer.ConsumerRecords;
  4. import org.apache.kafka.clients.consumer.KafkaConsumer;
  5. import java.util.Arrays;
  6. import java.util.Properties;
  7. public class KafkaConsumerExample {
  8. public static void main(String[] args) {
  9. Properties props = new Properties();
  10. props.put("bootstrap.servers", "master:9092");
  11. props.put("group.id", "test");
  12. props.put("enable.auto.commit", "true");
  13. props.put("auto.commit.interval.ms", "1000");
  14. props.put("session.timeout.ms", "30000");
  15. props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  16. props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
  17. KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
  18. consumer.subscribe(Arrays.asList("topic1"));
  19. while (true) {
  20. ConsumerRecords<String, String> records = consumer.poll(100);
  21. for (ConsumerRecord<String, String> record : records)
  22. System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value());
  23. }
  24. }
  25. }
复制代码

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持程序员之家。



回复

使用道具 举报