使用spring+springMVC+mybatis+kafka做了两个web项目,一个是生产者,一个是消费者。
通过JMeter测试工具模拟100个用户并发访问生产者项目,发送json数据给生产者的接口,生产者将json数据发送到kafka集群,
消费者监听到kafka集群中的消息就开始消费,并将json解析成对象存到MySQL数据库。
下面是使用JMeter测试工具模拟100个并发的线程设置截图:
请求所发送的数据:
下面是100个用户10000个请求的聚合报告:
下面是生产者截图生产完10000条消息的时间截图:
下面是消费者项目消费入库的结束时间截图:
可见,10000条消息从生产完成到入库(消费完10000条消息的时间只是比生产完成的时间落后了几十秒,但是消费端真正入库完成所需要的时间很长)完成时间相差了10几分钟。
下面是MySQL数据库截图,数据全部入库成功:
下面是消息对应的POJO:
1 package com.xuebusi.pojo; 2 3 public class TbPerson { 4 private Long id; 5 6 private String name; 7 8 private Integer age; 9 10 public Long getId() {11 return id;12 }13 14 public void setId(Long id) {15 this.id = id;16 }17 18 public String getName() {19 return name;20 }21 22 public void setName(String name) {23 this.name = name == null ? null : name.trim();24 }25 26 public Integer getAge() {27 return age;28 }29 30 public void setAge(Integer age) {31 this.age = age;32 }33 34 @Override35 public String toString() {36 return "TbPerson [id=" + id + ", name=" + name + ", age=" + age + "]";37 }38 }
下面是生产端的逻辑:
1 package com.xuebusi.controller; 2 3 import com.alibaba.fastjson.JSON; 4 import com.xuebusi.pojo.TbPerson; 5 import com.xuebusi.service.KafkaService; 6 import org.slf4j.Logger; 7 import org.slf4j.LoggerFactory; 8 import org.springframework.stereotype.Controller; 9 import org.springframework.web.bind.annotation.RequestBody;10 import org.springframework.web.bind.annotation.RequestMapping;11 import org.springframework.web.bind.annotation.RequestMethod;12 import org.springframework.web.bind.annotation.ResponseBody;13 14 import javax.annotation.Resource;15 16 @Controller17 @RequestMapping("/producer")18 public class KafkaController {19 20 private static final Logger logger = LoggerFactory.getLogger(KafkaController.class);21 22 @Resource23 private KafkaService kafkaService;24 25 /**26 * 发消息到ssmk这个topic27 * @param person28 * @return29 */30 @RequestMapping(value = "/person", method = RequestMethod.POST)31 @ResponseBody32 public String createPerson(@RequestBody TbPerson person) {33 if (person == null){34 return "fail, data can not be null.";35 }36 String json = JSON.toJSONString(person);37 boolean result = kafkaService.sendInfo("ssmk", json);38 logger.info("生产者发送消息[" + result + "]:" + json);39 return "success";40 }41 }
下面是消费端的逻辑:
1 package com.xuebusi.consumer; 2 3 import com.alibaba.fastjson.JSON; 4 import com.xuebusi.pojo.TbPerson; 5 import com.xuebusi.service.PersonService; 6 import org.slf4j.Logger; 7 import org.slf4j.LoggerFactory; 8 import org.springframework.beans.factory.annotation.Autowired; 9 import org.springframework.stereotype.Service;10 11 import java.util.List;12 import java.util.Map;13 14 @Service15 public class KafkaConsumerService {16 private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerService.class);17 18 @Autowired19 private PersonService personService;20 21 public void processMessage(Map> msgs) {22 /*for (Map.Entry > entry : msgs.entrySet()) {23 String topic = entry.getKey();24 Map value = entry.getValue();25 for (Map.Entry entrySet : value.entrySet()) {26 Integer partiton = entrySet.getKey();27 String msg = entrySet.getValue();28 logger.info("消费的主题:" + topic + ",消费的分区:" + partiton + ",消费的消息:" + msg);29 logger.info("=======使用JSON解析对象=========");30 TbPerson person = JSON.parseObject(msg, TbPerson.class);31 logger.info("=======对象开始入库=========");32 personService.insert(person);33 logger.info("=======对象入库成功=========");34 }35 }*/36 37 for (Map.Entry > entry : msgs.entrySet()) {38 String topic = entry.getKey();39 Map value = entry.getValue();40 for (Map.Entry entrySet : value.entrySet()) {41 Integer partiton = entrySet.getKey();42 String msg = entrySet.getValue();43 logger.info("消费的主题:" + topic + ",消费的分区:" + partiton + ",消费的消息:" + msg);44 msg = "[" + msg + "]";//注意这里要在前后都加上中括号,否则下面在解析json成对象的时候会报json格式不对的异常(spring会对多条json数据用逗号分隔)45 logger.info("=======使用JSON解析对象=========");46 List personList = JSON.parseArray(msg, TbPerson.class);47 //TbPerson person = JSON.parseObject(msg, TbPerson.class);48 if (personList != null && personList.size() > 0) {49 logger.info("消息中包含[" + personList.size() + "]个对象");50 for (TbPerson person : personList) {51 logger.info("=======对象开始入库=========");52 personService.insert(person);53 logger.info("=======对象入库成功=========");54 }55 }56 57 }58 }59 }60 }
如果觉得本文对您有帮助,不妨扫描下方微信二维码打赏点,您的鼓励是我前进最大的动力: