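Distributed Log Collection with ELK and Kafka
1. Detailed ELK setup tutorials (reference)
ELK detailed tutorial, link 1
ELK detailed tutorial, link 2
2. ELK-Kafka distributed log collection: architecture
Spring AOP collects the logs, and Kafka delivers them to Logstash. Logstash writes the logs into Elasticsearch, so Elasticsearch holds the log data. Finally, Kibana displays the logs stored in Elasticsearch and can be used for real-time charts and analysis.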
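As a rough mental model of this flow, the sketch below simulates the pipeline inside a single JVM. It does not use Kafka, Logstash, or Elasticsearch: a `BlockingQueue` stands in for the Kafka topic and a `List` stands in for the Elasticsearch index. The class and method names (`PipelineSketch`, `produce`, `drainToIndex`, `indexed`) are hypothetical and exist only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

// Toy model of the pipeline: application -> topic -> shipper -> index.
// Nothing here talks to Kafka, Logstash, or Elasticsearch.
class PipelineSketch {
    // Stand-in for the Kafka topic that buffers log messages.
    private final BlockingQueue<String> topic = new LinkedBlockingQueue<>();
    // Stand-in for the Elasticsearch index that Kibana reads from.
    private final List<String> index = new ArrayList<>();

    // Application side: the AOP aspect publishes one log message.
    void produce(String logJson) {
        topic.add(logJson);
    }

    // Logstash side: consume everything currently buffered and index it.
    int drainToIndex() {
        List<String> batch = new ArrayList<>();
        topic.drainTo(batch);
        index.addAll(batch);
        return batch.size();
    }

    // Kibana side: read a copy of what has been indexed so far.
    List<String> indexed() {
        return new ArrayList<>(index);
    }

    public static void main(String[] args) {
        PipelineSketch pipeline = new PipelineSketch();
        pipeline.produce("{\"request\":{\"request_method\":\"POST\"}}");
        pipeline.produce("{\"response\":{\"response_content\":\"[]\"}}");
        System.out.println("moved " + pipeline.drainToIndex() + " messages");
        System.out.println(pipeline.indexed());
    }
}
```

The point of the model is the decoupling: the application only writes to the buffer, and the consumer side picks messages up at its own pace.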
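3. Environment setup and deployment
3.1 Environment preparation
Service | Docker IP address | Host IP address | Exposed ports | Role
elasticsearch-one | 172.20.0.2 | 192.168.80.130 | 9200, 9300 | Search
elasticsearch-two | 172.20.0.3 | 192.168.80.130 | 9202, 9303 | Search
logstash | 172.20.0.4 | 192.168.80.130 | 5044 | Log collection
kibana | 172.20.0.5 | 192.168.80.130 | 5601 | Visualization and monitoring
zookeeper | 172.20.0.6 | 192.168.80.130 | 2181 | Registry and configuration center
kafka | 172.20.0.7 | 192.168.80.130 | 9092 | Message broker providing publish/subscribe
kafka-manager | 172.20.0.8 | 192.168.80.130 | 9000 | Web UI for managing Kafka
Note: because the Spring Boot application is integrated with it, Elasticsearch must run as a cluster of at least two nodes in this setup; otherwise the application reports an error on startup.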
3.2 Create the network

    $ docker network create --driver=bridge --subnet=172.20.0.0/16 elk-kafka-network

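Every container in this guide is given a fixed `ipv4_address`, and each of those addresses has to fall inside the /16 range passed to `--subnet`. The following is a small sketch, using only the JDK, of how such a membership check works. The class `SubnetCheck` and its methods are hypothetical helpers, not part of Docker or of this project.

```java
// Checks whether an IPv4 address falls inside a CIDR block such as 172.20.0.0/16.
class SubnetCheck {
    // Converts a dotted-quad IPv4 literal to its 32-bit value (held in a long).
    static long toLong(String ip) {
        String[] parts = ip.split("\\.");
        if (parts.length != 4) {
            throw new IllegalArgumentException("not an IPv4 address: " + ip);
        }
        long value = 0;
        for (String part : parts) {
            int octet = Integer.parseInt(part);
            if (octet < 0 || octet > 255) {
                throw new IllegalArgumentException("bad octet in " + ip);
            }
            value = (value << 8) | octet;
        }
        return value;
    }

    // Returns true when ip shares the network prefix of the given CIDR block.
    static boolean contains(String cidr, String ip) {
        String[] parts = cidr.split("/");
        int prefix = Integer.parseInt(parts[1]);
        long mask = prefix == 0 ? 0 : (0xFFFFFFFFL << (32 - prefix)) & 0xFFFFFFFFL;
        return (toLong(parts[0]) & mask) == (toLong(ip) & mask);
    }

    public static void main(String[] args) {
        String subnet = "172.20.0.0/16";
        // Container addresses from the environment table.
        String[] containers = {"172.20.0.2", "172.20.0.3", "172.20.0.4", "172.20.0.5",
                               "172.20.0.6", "172.20.0.7", "172.20.0.8"};
        for (String ip : containers) {
            System.out.println(ip + " in " + subnet + ": " + contains(subnet, ip));
        }
    }
}
```

The host address 192.168.80.130 is deliberately outside this range: it belongs to the host network, while the 172.20.x.x addresses exist only on the Docker bridge.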
3.3 elasticsearch
3.3.1 elasticsearch configuration files

    $ vim es-one.yml

    cluster.name: elasticsearch-cluster
    node.name: es-node-one
    network.bind_host: 0.0.0.0
    network.publish_host: 172.20.0.2
    http.port: 9200
    transport.tcp.port: 9300
    http.cors.enabled: true
    http.cors.allow-origin: "*"
    node.master: true
    node.data: true
    discovery.zen.ping.unicast.hosts: ["172.20.0.2:9300","172.20.0.3:9303"]
    discovery.zen.minimum_master_nodes: 1

    $ vim es-two.yml

    cluster.name: elasticsearch-cluster
    node.name: es-node-two
    network.bind_host: 0.0.0.0
    network.publish_host: 172.20.0.3
    http.port: 9202
    transport.tcp.port: 9303
    http.cors.enabled: true
    http.cors.allow-origin: "*"
    node.master: true
    node.data: true
    discovery.zen.ping.unicast.hosts: ["172.20.0.2:9300","172.20.0.3:9303"]
    discovery.zen.minimum_master_nodes: 1

3.3.2 docker-compose configuration files

    $ vim elasticsearch-one.yml

    version: '3'
    services:
      elasticsearch-one:
        image: elasticsearch
        container_name: elasticsearch-one
        restart: always
        networks:
          default:
            ipv4_address: 172.20.0.2
        ports:
          - 9200:9200
          - 9300:9300
        volumes:
          - ./es-one.yml:/usr/share/elasticsearch/config/elasticsearch.yml
          - ./plugins-one:/usr/share/elasticsearch/plugins

    networks:
      default:
        external:
          name: elk-kafka-network

    $ vim elasticsearch-two.yml

    version: '3'
    services:
      elasticsearch-two:
        image: elasticsearch
        container_name: elasticsearch-two
        restart: always
        networks:
          default:
            ipv4_address: 172.20.0.3
        ports:
          - 9202:9202
          - 9303:9303
        volumes:
          - ./es-two.yml:/usr/share/elasticsearch/config/elasticsearch.yml
          - ./plugins-two:/usr/share/elasticsearch/plugins

    networks:
      default:
        external:
          name: elk-kafka-network

3.3.3 Build scripts

    $ vim build-one.sh

    docker-compose -f elasticsearch-one.yml stop
    docker-compose -f elasticsearch-one.yml rm --force
    docker-compose -f elasticsearch-one.yml up -d

    $ vim build-two.sh

    docker-compose -f elasticsearch-two.yml stop
    docker-compose -f elasticsearch-two.yml rm --force
    docker-compose -f elasticsearch-two.yml up -d

Run the build scripts:

    $ chmod +x build-one.sh build-two.sh && ./build-one.sh && ./build-two.sh

3.4 logstash
3.4.1 Pipeline configuration: subscribing to Kafka as the data source
Edit logstash_kafka.conf:

    input {
      kafka {
        bootstrap_servers => "192.168.80.130:9092"
        topics => ["my_log"]
      }
    }
    output {
      stdout { codec => rubydebug }
      elasticsearch {
        hosts => ["192.168.80.130:9200","192.168.80.130:9202"]
        index => "my_log"
      }
    }

3.4.2 docker-compose configuration file
Edit logstash.yml:

    version: '3'
    services:
      logstash:
        image: logstash
        container_name: logstash
        restart: always
        networks:
          default:
            ipv4_address: 172.20.0.4
        ports:
          - 5044:5044
          - 4560:4560
          - 8080:8080
        volumes:
          - ./data:/data
          - ./config:/config
          - ./logs/tomcat.logs:/tomcat.logs
          - ./patterns:/opt/logstash/patterns
          - ./mysql/mysql-connector-java-5.1.46.jar:/mysql-connector-java-5.1.46.jar
        external_links:
          - elasticsearch:elasticsearch
        command: bash -c "chmod +x /data && logstash -f logstash_kafka.conf --path.data=/data"

    networks:
      default:
        external:
          name: elk-kafka-network

3.4.3 Build script
Edit build.sh:

    docker-compose -f logstash.yml stop
    docker-compose -f logstash.yml rm --force
    docker-compose -f logstash.yml up -d

Run the build script:

    $ chmod +x build.sh && ./build.sh

3.5 kibana
3.5.1 docker-compose configuration file
Edit kibana.yml:

    version: '3'
    services:
      kibana:
        image: kibana
        container_name: kibana
        restart: always
        networks:
          default:
            ipv4_address: 172.20.0.5
        environment:
          - ELASTICSEARCH_URL=http://172.20.0.2:9200
        ports:
          - 5601:5601
        external_links:
          - elasticsearch-one:elasticsearch-one
          - elasticsearch-two:elasticsearch-two

    networks:
      default:
        external:
          name: elk-kafka-network

3.5.2 Build script
Edit build.sh:

    docker-compose -f kibana.yml stop
    docker-compose -f kibana.yml rm --force
    docker-compose -f kibana.yml up -d

Run the build script:

    $ chmod +x build.sh && ./build.sh

3.6 zookeeper
3.6.1 docker-compose configuration file
Edit zookeeper.yml:

    version: '3'
    services:
      zookeeper:
        image: wurstmeister/zookeeper
        container_name: zookeeper
        restart: always
        networks:
          default:
            ipv4_address: 172.20.0.6
        ports:
          - 2181:2181

    networks:
      default:
        external:
          name: elk-kafka-network

3.6.2 Build script
Edit build.sh:

    docker-compose -f zookeeper.yml stop
    docker-compose -f zookeeper.yml rm --force
    docker-compose -f zookeeper.yml up -d

Run the build script:

    $ chmod +x build.sh && ./build.sh

3.7 kafka
3.7.1 docker-compose configuration file
Edit kafka.yml:

    version: '3'
    services:
      kafka:
        image: wurstmeister/kafka
        container_name: kafka
        restart: always
        environment:
          #- KAFKA_BROKER_ID=0
          #- KAFKA_ZOOKEEPER_CONNECT=192.168.80.130:2181
          #- KAFKA_ADVERTISED_LISTENERS=PLAINTEXT:
          #- KAFKA_LISTENERS=PLAINTEXT:
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT:
          KAFKA_ZOOKEEPER_CONNECT: 192.168.80.130:2181
          KAFKA_LISTENERS: PLAINTEXT:
          KAFKA_DELETE_TOPIC_ENABLE: "true"
          KAFKA_BROKER_ID: 0
        networks:
          default:
            ipv4_address: 172.20.0.7
        ports:
          - 9092:9092
        external_links:
          - zookeeper:zookeeper

    networks:
      default:
        external:
          name: elk-kafka-network

3.7.2 Build script
Edit build.sh:

    docker-compose -f kafka.yml stop
    docker-compose -f kafka.yml rm --force
    docker-compose -f kafka.yml up -d

Run the build script:

    $ chmod +x build.sh && ./build.sh

3.8 kafka-manager
3.8.1 docker-compose configuration file
Edit kafka-manager.yml:

    version: '3'
    services:
      kafka-manager:
        image: sheepkiller/kafka-manager
        container_name: kafka-manager
        restart: always
        environment:
          ZK_HOSTS: 172.20.0.6:2181
        networks:
          default:
            ipv4_address: 172.20.0.8
        ports:
          - 9000:9000

    networks:
      default:
        external:
          name: elk-kafka-network

3.8.2 Build script
Edit build-manager.sh:

    docker-compose -f kafka-manager.yml stop
    docker-compose -f kafka-manager.yml rm --force
    docker-compose -f kafka-manager.yml up -d

Run the build script:

    $ chmod +x build-manager.sh && ./build-manager.sh

3.9 Deployment complete
- Check that the Docker containers are running normally
- Check that the Elasticsearch cluster is healthy

    $ curl http://localhost:9200 && curl http://localhost:9202

Verify that both nodes have joined the cluster:
http://192.168.80.130:9200/_cat/nodes?pretty
- Create the my_log topic in Kafka

    $ docker exec -it kafka /bin/bash

    /opt/kafka/bin/kafka-topics.sh --create --zookeeper 192.168.80.130:2181 --replication-factor 1 --partitions 1 --topic my_log
    /opt/kafka/bin/kafka-topics.sh --list --zookeeper 192.168.80.130:2181

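The Elasticsearch cluster check above can also be scripted. The sketch below is one possible approach rather than part of the original setup: it reads the standard `_cluster/health` endpoint and extracts the `status` and `number_of_nodes` fields with regular expressions, so it needs nothing beyond the JDK. The class name `ClusterHealthCheck`, the 3-second timeouts, and the rule "green or yellow with the expected node count" are assumptions made for illustration.

```java
import java.io.InputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Scanner;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Reads the _cluster/health response and checks status and node count.
class ClusterHealthCheck {
    private static final Pattern STATUS =
            Pattern.compile("\"status\"\\s*:\\s*\"(\\w+)\"");
    private static final Pattern NODES =
            Pattern.compile("\"number_of_nodes\"\\s*:\\s*(\\d+)");

    // Returns the cluster status (green, yellow, red), or "unknown" if absent.
    static String status(String healthJson) {
        Matcher m = STATUS.matcher(healthJson);
        return m.find() ? m.group(1) : "unknown";
    }

    // Returns the number of nodes in the cluster, or -1 if the field is absent.
    static int nodeCount(String healthJson) {
        Matcher m = NODES.matcher(healthJson);
        return m.find() ? Integer.parseInt(m.group(1)) : -1;
    }

    // Assumed rule: the expected node count and a status other than red/unknown.
    static boolean looksHealthy(String healthJson, int expectedNodes) {
        String s = status(healthJson);
        return nodeCount(healthJson) == expectedNodes
                && (s.equals("green") || s.equals("yellow"));
    }

    // Fetches the response body of a GET request as a UTF-8 string.
    static String fetch(String url) throws Exception {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setConnectTimeout(3000);
        conn.setReadTimeout(3000);
        try (InputStream in = conn.getInputStream();
             Scanner sc = new Scanner(in, StandardCharsets.UTF_8.name()).useDelimiter("\\A")) {
            return sc.hasNext() ? sc.next() : "";
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) throws Exception {
        String url = args.length > 0 ? args[0]
                : "http://192.168.80.130:9200/_cluster/health";
        String body = fetch(url);
        System.out.println(body);
        System.out.println("healthy with 2 nodes: " + looksHealthy(body, 2));
    }
}
```

A regex is enough here because only two flat fields are read; a real JSON parser would be the safer choice for anything more involved.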
4. Integrating Spring Boot with ELK and Kafka
4.1 Elasticsearch query module
Add the Maven dependencies:

    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
        <version>2.2.6.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>com.querydsl</groupId>
        <artifactId>querydsl-apt</artifactId>
    </dependency>
    <dependency>
        <groupId>com.querydsl</groupId>
        <artifactId>querydsl-jpa</artifactId>
    </dependency>

Add the entity class ProductEntity:

    import java.util.Date;

    import lombok.Data;
    import org.springframework.data.elasticsearch.annotations.Document;

    // Maps to documents of type "product" in the "product" index.
    @Document(indexName = "product", type = "product")
    @Data
    public class ProductEntity {

        private Integer id;

        private Integer categoryId;

        private String name;

        private String subtitle;

        private String mainImage;

        private String subImages;

        private String detail;

        private String attributeList;

        private Double price;

        private Integer stock;

        private Integer status;

        private String createdBy;

        private Date createdTime;

        private Date updatedTime;
    }

Add the Elasticsearch repository interface ProductReposiory:

    import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;

    // The ID type matches the Integer id field of ProductEntity.
    public interface ProductReposiory extends ElasticsearchRepository<ProductEntity, Integer> {
    }

Add the search service interface and its implementation:

    public interface ProductSearchService {

        List<ProductEntity> search(String name, @PageableDefault(page = 0, value = 10) Pageable pageable);

    }

    @Service
    public class ProductSearchServiceImpl implements ProductSearchService {

        @Autowired
        private ProductReposiory productReposiory;

        @Override
        public List<ProductEntity> search(String name, @PageableDefault(page = 0, value = 10) Pageable pageable) {
            // Match the keyword against the name, subtitle, and detail fields.
            BoolQueryBuilder builder = QueryBuilders.boolQuery();
            builder.must(QueryBuilders.multiMatchQuery(name, "name", "subtitle", "detail"));
            Page<ProductEntity> page = productReposiory.search(builder, pageable);
            List<ProductEntity> content = page.getContent();
            return content;
        }

    }

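To make the query built by ProductSearchServiceImpl more concrete, the sketch below assembles a roughly equivalent Elasticsearch Query DSL body by hand: a `bool` query with one `multi_match` clause over `name`, `subtitle`, and `detail`, plus `from`/`size` paging. It is an approximation under stated assumptions; the request that Spring Data actually sends may include additional default parameters. The class `MultiMatchBody` is hypothetical, and such a body would typically be sent to `POST /product/_search`, where `product` is the index name from the entity's `@Document` annotation.

```java
// Builds a search body similar to what the service's query expresses.
class MultiMatchBody {
    // Escapes the characters that must be escaped inside a JSON string.
    static String escape(String s) {
        StringBuilder sb = new StringBuilder();
        for (char c : s.toCharArray()) {
            switch (c) {
                case '"':  sb.append("\\\""); break;
                case '\\': sb.append("\\\\"); break;
                case '\n': sb.append("\\n");  break;
                case '\r': sb.append("\\r");  break;
                case '\t': sb.append("\\t");  break;
                default:
                    if (c < 0x20) {
                        // Remaining control characters use the 4-digit hex form.
                        sb.append(String.format("\\u%04x", (int) c));
                    } else {
                        sb.append(c);
                    }
            }
        }
        return sb.toString();
    }

    // page is zero-based, mirroring PageRequest.of(page, size).
    static String build(String keyword, int page, int size) {
        return "{\"from\":" + (page * size)
                + ",\"size\":" + size
                + ",\"query\":{\"bool\":{\"must\":[{\"multi_match\":{\"query\":\""
                + escape(keyword)
                + "\",\"fields\":[\"name\",\"subtitle\",\"detail\"]}}]}}}";
    }

    public static void main(String[] args) {
        // First page of ten results, as in the controller's PageRequest.of(0, 10).
        System.out.println(build("apple", 0, 10));
    }
}
```

Seeing the raw body also makes it easier to reproduce a search in the Kibana console when debugging unexpected results.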
Add the controller ProductController:

    @RestController
    @RequestMapping("elk")
    public class ProductController {

        @Autowired
        private ProductSearchService productSearchService;

        @Cache(key = "product")
        @PostMapping("search")
        public List<ProductEntity> search(@RequestParam("name") String name) {
            Pageable pageable = PageRequest.of(0, 10);
            return productSearchService.search(name, pageable);
        }
    }

Add the configuration file application.yml:

    server:
      port: 9000

    spring:
      application:
        name: elk-es
      data:
        elasticsearch:
          cluster-name: elasticsearch-cluster
          cluster-nodes: 192.168.80.130:9300
      kafka:
        bootstrap-servers: 192.168.80.130:9092

    redis:
      host: 114.55.34.44
      port: 6379
      password: root
      timeout: 2000
      jedis:
        pool:
          maxActive: 300
          maxIdle: 100
          maxWait: 1000
      sysName: admin
      enable: true
      database: 0

    kafka:
      topic: my_log

4.2 Kafka log collection module
Add the Maven dependencies:

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <version>2.1.5.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.aspectj</groupId>
        <artifactId>aspectjrt</artifactId>
        <version>1.8.6</version>
    </dependency>

Add the Kafka message sender class KafkaSender:

    @Component
    @Slf4j
    public class KafkaSender<T> {

        @Autowired
        private KafkaTemplate<String, Object> kafkaTemplate;

        // Target topic, read from kafka.topic in application.yml
        @Value("${kafka.topic}")
        public String kafkaTopic;

        // Serializes the object to JSON and sends it to the topic asynchronously
        public void send(T obj) {
            String jsonObj = JSON.toJSONString(obj);
            log.info("------------ message = {}", jsonObj);

            ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(kafkaTopic, jsonObj);
            future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
                @Override
                public void onFailure(Throwable throwable) {
                    log.error("Produce: The message failed to be sent: " + throwable.getMessage());
                }

                @Override
                public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
                    log.info("Produce: The message was sent successfully:");
                    log.info("Produce: _+_+_+_+_+_+_+ result: " + stringObjectSendResult.toString());
                }
            });
        }

    }

Add AopLogAspect, an AOP aspect that intercepts method calls and collects request logs, and GlobalExceptionHandler, which collects error logs globally:

    @Aspect
    @Slf4j
    @Component
    public class AopLogAspect {

        @Autowired
        private KafkaSender<JSONObject> kafkaSender;

        // Matches every method of the controller classes under com.elk.*.controller
        @Pointcut("execution(* com.elk.*.controller.*.*(..))")
        private void serviceAspect() {
        }

        // Logs request details for methods annotated with @MonitorRequest
        @Before(value = "@annotation(com.elk.elkkafka.annotation.MonitorRequest)")
        public void doBefore(JoinPoint joinPoint) {
            ServletRequestAttributes attributes = (ServletRequestAttributes) RequestContextHolder.getRequestAttributes();
            HttpServletRequest request = attributes.getRequest();
            log.info("url=" + request.getRequestURL());
            log.info("method=" + request.getMethod());
            log.info("ip=" + request.getRemoteAddr());
            log.info("class=" + joinPoint.getSignature().getDeclaringTypeName()
                    + " and method name=" + joinPoint.getSignature().getName());
            // Arrays.toString prints the argument values rather than the array's identity
            log.info("args=" + Arrays.toString(joinPoint.getArgs()));
        }

        // Before each controller method: log the request and send it to Kafka
        @Before(value = "serviceAspect()")
        public void methodBefore(JoinPoint joinPoint) {
            ServletRequestAttributes requestAttributes = (ServletRequestAttributes) RequestContextHolder
                    .getRequestAttributes();
            HttpServletRequest request = requestAttributes.getRequest();

            log.info("===============Request===============");
            log.info("Request URL: " + request.getRequestURL().toString());
            log.info("Request method: " + request.getMethod());
            log.info("Handler method: " + joinPoint.getSignature());
            log.info("Handler arguments: " + Arrays.toString(joinPoint.getArgs()));
            log.info("===============Request===============");

            JSONObject jsonObject = new JSONObject();
            SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            jsonObject.put("request_time", df.format(new Date()));
            jsonObject.put("request_url", request.getRequestURL().toString());
            jsonObject.put("request_method", request.getMethod());
            jsonObject.put("signature", joinPoint.getSignature().toString());
            jsonObject.put("request_args", Arrays.toString(joinPoint.getArgs()));
            try {
                jsonObject.put("request_ip", WebToolUtils.getLocalIP());
            } catch (UnknownHostException | SocketException e) {
                log.warn("Failed to resolve the local IP address", e);
            }
            JSONObject requestJsonObject = new JSONObject();
            requestJsonObject.put("request", jsonObject);
            kafkaSender.send(requestJsonObject);
        }

        // After each controller method returns: log the response and send it to Kafka
        @AfterReturning(returning = "o", pointcut = "serviceAspect()")
        public void methodAfterReturing(Object o) {

            log.info("--------------Response----------------");
            // String.valueOf avoids a NullPointerException when the method returns null
            log.info("Response content: " + String.valueOf(o));
            log.info("--------------Response----------------");
            JSONObject respJSONObject = new JSONObject();
            JSONObject jsonObject = new JSONObject();
            SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            jsonObject.put("response_time", df.format(new Date()));
            jsonObject.put("response_content", JSONObject.toJSONString(o));
            respJSONObject.put("response", jsonObject);
            kafkaSender.send(respJSONObject);
        }

    }

    @ControllerAdvice
    @Slf4j
    public class GlobalExceptionHandler {

        @Autowired
        private KafkaSender<JSONObject> kafkaSender;

        // Catches runtime exceptions, sends them to Kafka, and returns a generic error
        @ExceptionHandler(RuntimeException.class)
        @ResponseBody
        public JSONObject exceptionHandler(Exception e) {
            log.error("### Global exception caught ###", e);

            JSONObject errorJson = new JSONObject();
            JSONObject logJson = new JSONObject();
            SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
            logJson.put("request_time", df.format(new Date()));
            logJson.put("error_info", e);

            errorJson.put("request_error", logJson);
            kafkaSender.send(errorJson);
            JSONObject result = new JSONObject();
            result.put("code", 500);
            result.put("msg", "System error");

            return result;
        }

    }

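The handler above stores the exception object itself under `error_info`, so how it ends up in the log depends on how the JSON library serializes a `Throwable`. One alternative, sketched here with JDK classes only, is to convert the exception to a stack-trace string first so the indexed field is plain text. The class `LogEnvelopeSketch` and its methods are hypothetical; the keys `request_error`, `request_time`, and `error_info` are the ones used by GlobalExceptionHandler.

```java
import java.io.PrintWriter;
import java.io.StringWriter;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.Map;

// Builds the error-log envelope with the stack trace rendered as text.
class LogEnvelopeSketch {
    // Renders the full stack trace of a throwable into a string.
    static String stackTraceOf(Throwable t) {
        StringWriter sw = new StringWriter();
        PrintWriter pw = new PrintWriter(sw);
        t.printStackTrace(pw);
        pw.flush();
        return sw.toString();
    }

    // Mirrors the handler's structure: {"request_error": {"request_time", "error_info"}}.
    static Map<String, Object> errorEnvelope(Throwable t, Date when) {
        SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        Map<String, Object> log = new LinkedHashMap<>();
        log.put("request_time", df.format(when));
        log.put("error_info", stackTraceOf(t));

        Map<String, Object> envelope = new LinkedHashMap<>();
        envelope.put("request_error", log);
        return envelope;
    }

    public static void main(String[] args) {
        Map<String, Object> envelope =
                errorEnvelope(new IllegalStateException("boom"), new Date());
        System.out.println(envelope);
    }
}
```

In the Spring application, the resulting map could be passed to the JSON library in place of the raw exception; whether the full trace or only the message is worth indexing depends on how the logs are searched.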
4.3 Test log collection
The search endpoint is mapped with @PostMapping, so the request is sent as a POST:

    $ curl -X POST http://localhost:9000/elk/search --data-urlencode "name=苹果"

    [
      {
        "id": 1,
        "categoryId": null,
        "name": "Pad平板电脑无线局域网",
        "subtitle": "Pad平板电脑",
        "mainImage": null,
        "subImages": null,
        "detail": "官方授权Pad苹果电脑",
        "attributeList": null,
        "price": null,
        "stock": null,
        "status": 0,
        "createdBy": null,
        "createdTime": null,
        "updatedTime": null
      }
    ]

Checking Kibana shows that the log data has been synchronized.