博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
spring整合kafka项目生产和消费测试结果记录(一)
阅读量:5822 次
发布时间:2019-06-18

本文共 4979 字,大约阅读时间需要 16 分钟。

使用spring+springMVC+mybatis+kafka做了两个web项目,一个是生产者,一个是消费者。

通过JMeter测试工具模拟100个用户并发访问生产者项目,发送json数据给生产者的接口,生产者将json数据发送到kafka集群,

消费者监听到kafka集群中的消息就开始消费,并将json解析成对象存到MySQL数据库。

下面是使用JMeter测试工具模拟100个并发的线程设置截图:

请求所发送的数据:

 

下面是100个用户10000个请求的聚合报告:

 

下面是生产者截图生产完10000条消息的时间截图:

 

下面是消费者项目消费入库的结束时间截图:

 

可见,10000条消息从生产完成到入库(消费完10000条消息的时间只是比生产完成的时间落后了几十秒,但是消费端真正入库完成所需要的时间很长)完成时间相差了10几分钟。

 

下面是MySQL数据库截图,数据全部入库成功:

下面是消息对应的POJO:

1 package com.xuebusi.pojo; 2  3 public class TbPerson { 4     private Long id; 5  6     private String name; 7  8     private Integer age; 9 10     public Long getId() {11         return id;12     }13 14     public void setId(Long id) {15         this.id = id;16     }17 18     public String getName() {19         return name;20     }21 22     public void setName(String name) {23         this.name = name == null ? null : name.trim();24     }25 26     public Integer getAge() {27         return age;28     }29 30     public void setAge(Integer age) {31         this.age = age;32     }33 34     @Override35     public String toString() {36         return "TbPerson [id=" + id + ", name=" + name + ", age=" + age + "]";37     }38 }

 

下面是生产端的逻辑:

1 package com.xuebusi.controller; 2  3 import com.alibaba.fastjson.JSON; 4 import com.xuebusi.pojo.TbPerson; 5 import com.xuebusi.service.KafkaService; 6 import org.slf4j.Logger; 7 import org.slf4j.LoggerFactory; 8 import org.springframework.stereotype.Controller; 9 import org.springframework.web.bind.annotation.RequestBody;10 import org.springframework.web.bind.annotation.RequestMapping;11 import org.springframework.web.bind.annotation.RequestMethod;12 import org.springframework.web.bind.annotation.ResponseBody;13 14 import javax.annotation.Resource;15 16 @Controller17 @RequestMapping("/producer")18 public class KafkaController {19 20     private static final Logger logger = LoggerFactory.getLogger(KafkaController.class);21 22     @Resource23     private KafkaService kafkaService;24 25     /**26      * 发消息到ssmk这个topic27      * @param person28      * @return29      */30     @RequestMapping(value = "/person", method = RequestMethod.POST)31     @ResponseBody32     public String createPerson(@RequestBody TbPerson person) {33         if (person == null){34             return "fail, data can not be null.";35         }36         String json = JSON.toJSONString(person);37         boolean result = kafkaService.sendInfo("ssmk", json);38         logger.info("生产者发送消息[" + result + "]:" + json);39         return "success";40     }41 }

 

下面是消费端的逻辑:

1 package com.xuebusi.consumer; 2  3 import com.alibaba.fastjson.JSON; 4 import com.xuebusi.pojo.TbPerson; 5 import com.xuebusi.service.PersonService; 6 import org.slf4j.Logger; 7 import org.slf4j.LoggerFactory; 8 import org.springframework.beans.factory.annotation.Autowired; 9 import org.springframework.stereotype.Service;10 11 import java.util.List;12 import java.util.Map;13 14 @Service15 public class KafkaConsumerService {16     private static final Logger logger = LoggerFactory.getLogger(KafkaConsumerService.class);17 18     @Autowired19     private PersonService personService;20 21     public void processMessage(Map
> msgs) {22 /*for (Map.Entry
> entry : msgs.entrySet()) {23 String topic = entry.getKey();24 Map
value = entry.getValue();25 for (Map.Entry
entrySet : value.entrySet()) {26 Integer partiton = entrySet.getKey();27 String msg = entrySet.getValue();28 logger.info("消费的主题:" + topic + ",消费的分区:" + partiton + ",消费的消息:" + msg);29 logger.info("=======使用JSON解析对象=========");30 TbPerson person = JSON.parseObject(msg, TbPerson.class);31 logger.info("=======对象开始入库=========");32 personService.insert(person);33 logger.info("=======对象入库成功=========");34 }35 }*/36 37 for (Map.Entry
> entry : msgs.entrySet()) {38 String topic = entry.getKey();39 Map
value = entry.getValue();40 for (Map.Entry
entrySet : value.entrySet()) {41 Integer partiton = entrySet.getKey();42 String msg = entrySet.getValue();43 logger.info("消费的主题:" + topic + ",消费的分区:" + partiton + ",消费的消息:" + msg);44 msg = "[" + msg + "]";//注意这里要在前后都加上中括号,否则下面在解析json成对象的时候会报json格式不对的异常(spring会对多条json数据用逗号分隔)45 logger.info("=======使用JSON解析对象=========");46 List
personList = JSON.parseArray(msg, TbPerson.class);47 //TbPerson person = JSON.parseObject(msg, TbPerson.class);48 if (personList != null && personList.size() > 0) {49 logger.info("消息中包含[" + personList.size() + "]个对象");50 for (TbPerson person : personList) {51 logger.info("=======对象开始入库=========");52 personService.insert(person);53 logger.info("=======对象入库成功=========");54 }55 }56 57 }58 }59 }60 }

 

如果觉得本文对您有帮助,不妨扫描下方微信二维码打赏点,您的鼓励是我前进最大的动力:

 

转载于:https://www.cnblogs.com/jun1019/p/6580371.html

你可能感兴趣的文章
Java内存 -JVM 内存管理
查看>>
我在51testing客串专家时候的互动问题
查看>>
docker环境elasticSearch5.5 head 插件安装步骤
查看>>
Report viewer 水平滚动条
查看>>
关于AbstractCollection
查看>>
Centos 6.5 mkisofs kickstart 制作自动安装iso镜像 光盘
查看>>
Nginx完整配置说明
查看>>
Java核心技术卷1: Java基础知识汇总
查看>>
web服务器的ddos***
查看>>
WebView使用中存在的问题
查看>>
Tomcat服务器SSL报错问题
查看>>
Python实例:毛玻璃效果
查看>>
Android开发中出现的问题及解决(一)
查看>>
6 个重构方法可帮你提升 80% 的代码质量
查看>>
陆上行舟,现在还是理想
查看>>
SaltStack安装与配置
查看>>
惰性载入函数
查看>>
一张图学会数据库迁云最佳路径
查看>>
阿里云MaxCompute被Forrester评为全球云端数据仓库领导者
查看>>
生产场景NFS共享存储优化及实战
查看>>