查看: 233|回复: 0

[Java学习] spring boot 与kafka集成的示例代码

发表于 2018-5-5 09:30:12

新建spring boot项目

这里使用intellij IDEA

这里写图片描述

这里写图片描述

这里写图片描述

这里写图片描述

添加kafka集成maven

  1. <?xml version="1.0" encoding="UTF-8"?>
  2. <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  3. xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  4. <modelVersion>4.0.0</modelVersion>
  5. <groupId>com.example</groupId>
  6. <artifactId>demo</artifactId>
  7. <version>0.0.1-SNAPSHOT</version>
  8. <packaging>jar</packaging>
  9. <name>demo</name>
  10. <description>Demo project for Spring Boot</description>
  11. <parent>
  12. <groupId>org.springframework.boot</groupId>
  13. <artifactId>spring-boot-starter-parent</artifactId>
  14. <version>1.5.8.RELEASE</version>
  15. <relativePath/> <!-- lookup parent from repository -->
  16. </parent>
  17. <properties>
  18. <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  19. <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
  20. <java.version>1.8</java.version>
  21. </properties>
  22. <dependencies>
  23. <dependency>
  24. <groupId>org.springframework.boot</groupId>
  25. <artifactId>spring-boot-starter-web</artifactId>
  26. </dependency>
  27. <dependency>
  28. <groupId>org.springframework.kafka</groupId>
  29. <artifactId>spring-kafka</artifactId>
  30. </dependency>
  31. <dependency>
  32. <groupId>org.springframework.boot</groupId>
  33. <artifactId>spring-boot-starter-test</artifactId>
  34. <scope>test</scope>
  35. </dependency>
  36. </dependencies>
  37. <build>
  38. <plugins>
  39. <plugin>
  40. <groupId>org.springframework.boot</groupId>
  41. <artifactId>spring-boot-maven-plugin</artifactId>
  42. </plugin>
  43. </plugins>
  44. </build>
  45. </project>
复制代码

项目中application.properties 添加

  1. spring.kafka.bootstrap-servers=vm208:9092,vm:9092,vm50:9092
  2. spring.kafka.consumer.auto-offset-reset=latest
  3. spring.kafka.consumer.group-id=local_test
  4. spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
  5. spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
  6. spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
  7. spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
  8. spring.kafka.producer.acks=1
复制代码

新建KafkaConsumer消费类

  1. package com.example.demo.consumer;
  2. import org.apache.kafka.clients.consumer.ConsumerRecord;
  3. import org.slf4j.Logger;
  4. import org.slf4j.LoggerFactory;
  5. import org.springframework.kafka.annotation.KafkaListener;
  6. import org.springframework.stereotype.Component;
  7. @Component
  8. public class KafkaConsumer {
  9. private Logger logger = LoggerFactory.getLogger(this.getClass());
  10. @KafkaListener(topics = {"test"})
  11. public void listen(ConsumerRecord<?, ?> record) {
  12. System.out.printf("offset = %d,key =%s,value=%s\n", record.offset(), record.key(), record.value());
  13. }
  14. }
复制代码

启动spring-boot程序,在kafka集群,模拟发送topic,检验接收

代码如下:bin/kafka-console-producer.sh --broker-list vm208:9092,vm210:9092,vm50:9092 --topic test

编写producer代码

  1. package com.example.demo.producer;
  2. import org.apache.kafka.clients.producer.ProducerRecord;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.kafka.core.KafkaTemplate;
  5. import org.springframework.stereotype.Component;
  6. @Component
  7. public class KafkaProducer {
  8. @Autowired
  9. private KafkaTemplate kafkaTemplate;
  10. String topic="test";
  11. public void sendMessage(String key,String data){
  12. kafkaTemplate.send(new ProducerRecord(topic,key,data));
  13. }
  14. }
复制代码

建立一个restful模拟发送( //http://localhost:8080/kafka/send.do?key=2&data=allen-test-message)

  1. package com.example.demo.controller;
  2. import com.example.demo.producer.KafkaProducer;
  3. import org.springframework.beans.factory.annotation.Autowired;
  4. import org.springframework.web.bind.annotation.RequestMapping;
  5. import org.springframework.web.bind.annotation.RequestMethod;
  6. import org.springframework.web.bind.annotation.RequestParam;
  7. import org.springframework.web.bind.annotation.RestController;
  8. @RestController
  9. public class ProducerController {
  10. @Autowired
  11. private KafkaProducer kafkaProducer;
  12. @RequestMapping(value = "/kafka/send.do", method = RequestMethod.GET)
  13. public String sendMessage(@RequestParam(value = "key") String key, @RequestParam(value = "data") String data) {
  14. kafkaProducer.sendMessage(key, data);
  15. return "sucess";
  16. }
  17. }
复制代码

可以发现 spring-kafka大大减少了代码工作量.

官方文档: https://docs.spring.io/spring-ka ... ASE/reference/html/

以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持程序员之家。



回复

使用道具 举报