基于网关服务监控方案
一、需求背景
为监控微服务接口运行情况如响应时间、成功率、接口请求频次、业务高峰期段等信息,需对接口进行监控,并可视化展示。
二、方案选择
方案选择,通常解决方案采用Metics埋点方式,业务系统基础埋点SDK,将埋点日志输出到本地,通过agent将埋点日志发送到Kafka,由Kafka将埋点日志推送到InfuxDB,并通过Grafan进行展示。
这里由于我们服务尚未基础埋点,集成埋点工作量较大,短期内无法落地。同时现有技术体系中未使用InfuxDB等时序数据库和数据展示工具。所以我们采用简单数据采集方案,基于微服务网关的监控方案,及在网关中,拦截所有请求信息,对请求信息的请求和响应进行拦截,记录接口执行时间和结果。这样在一定时间内,完成简单粗放的服务监控。
如需详细了解微服务系统监控体系,请查看我的文章《完善的微服务系统监控体系,影响整个系统的可靠性和稳定性?》
这里从基础设施层、系统层、应用层、业务层、端用户体验等方面讲述如何建立微服务监控体系。
三、架构模型
整体架构模型如上:通过AccessLogFilter
将拦截到的请求发送到Pulsar消息队列中,通过日志服务消费队列中数据,并将数据推送到Es中,通过kibana将数据展示出来,最终效果如下:
我们可以通过Kibana自带的日期筛选工具,查看指定时间、指定条件的数据,如我统计本周失败接口有哪些,
设定好条件后,我们点击Save,就可以看到哪些接口出现500错误,出现的频次和响应时间是多少,还可以在表格中增加错误信息,这样很直观的可以定位到服务哪些接口出现问题。
还可以通过错误信息查询到本次接口的请求参数和响应参数,这样可以帮助研发快速定位和解决服务问题。
OK,我们已经看到了现在方案已经达到我们目前想要的效果了,我们看下如何实现吧。
四、实现方案
开始我们的重头戏,AccessLogFilter
的实现。
4.1、AccessLogFilter
AccessLogFilter
实现非常简单,在网关中配置拦截器,通过拦截器获取请求的请求参数和响应参数,并记录请求开始时间和响应时间,记录时间差,将记录的信息序列化发送到MQ即可。
我们先创建一个GatewayLog
,日志对象,统计下我们需求记录哪些信息:这里我们暂时记录了访问实例、请求路径、请求方方法、协议、错误信息、状态码、请求IP、请求开始时间、时间戳、响应时间、执行时间、请求体、响应体等信息,后续还可以继续完善traceid等信息。
import lombok.Data;
import org.springframework.data.elasticsearch.annotations.DateFormat;
import org.springframework.data.elasticsearch.annotations.Document;
import org.springframework.data.elasticsearch.annotations.Field;
import org.springframework.data.elasticsearch.annotations.FieldType;
import java.io.Serializable;
import java.util.Date;
/**
* 网关记录日志
*
* @author wanghongjie
*/
@Data
@Document(indexName = "gateway_log_index")
public class GatewayLog implements Serializable {
/**
* 访问实例
*/
private String targetServer;
/**
* 请求路径
*/
private String requestPath;
/**
* 请求方法
*/
private String requestMethod;
/**
* 协议
*/
private String schema;
/**
* 消息
*/
private String message;
/**
* 状态码
*/
private String code;
/**
* 请求ip
*/
private String ip;
/**
* 请求时间
*/
@Field(type = FieldType.Date, format = DateFormat.date_time, name = "@requestTime")
private Date requestTime;
/**
* 时间戳
*/
@Field(type = FieldType.Date, format = DateFormat.date_time, name = "@timestamp")
private Date timestamp = new Date();
/**
* 响应时间
*/
@Field(type = FieldType.Date, format = DateFormat.date_time, name = "@responseTime")
private Date responseTime;
/**
* 执行时间
*/
private long executeTime;
/**
* 请求体
*/
private String requestBody;
/**
* 响应体
*/
private String responseData;
}
我们看下AccessLogFilter的实现:
import cn.hutool.core.util.ObjectUtil;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.jdd.gateway.log.AccessLogService;
import com.jdd.gateway.log.GatewayLog;
import com.jdd.gateway.utils.IpUtils;
import com.jdd.gateway.utils.StringUtils;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import org.springframework.cloud.gateway.filter.GatewayFilterChain;
import org.springframework.cloud.gateway.filter.GlobalFilter;
import org.springframework.cloud.gateway.filter.factory.rewrite.CachedBodyOutputMessage;
import org.springframework.cloud.gateway.route.Route;
import org.springframework.cloud.gateway.support.BodyInserterContext;
import org.springframework.cloud.gateway.support.ServerWebExchangeUtils;
import org.springframework.core.Ordered;
import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DataBufferFactory;
import org.springframework.core.io.buffer.DataBufferUtils;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpStatus;
import org.springframework.http.MediaType;
import org.springframework.http.codec.HttpMessageReader;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpRequestDecorator;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.http.server.reactive.ServerHttpResponseDecorator;
import org.springframework.stereotype.Component;
import org.springframework.util.MultiValueMap;
import org.springframework.web.reactive.function.BodyInserter;
import org.springframework.web.reactive.function.BodyInserters;
import org.springframework.web.reactive.function.server.HandlerStrategies;
import org.springframework.web.reactive.function.server.ServerRequest;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@Slf4j
@Component
public class AccessLogFilter implements GlobalFilter, Ordered {
/**
* 获取响应中状态码
*/
private static final String CODE = "code";
/**
* 记录响应消息
*/
private static final String MESSAGE = "message";
@Resource
private AccessLogService accessLogService;
private final List<HttpMessageReader<?>> messageReaders = HandlerStrategies.withDefaults().messageReaders();
@Override
public int getOrder() {
return -100;
}
@Override
@SuppressWarnings("unchecked")
public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
ServerHttpRequest request = exchange.getRequest();
// 请求路径
String requestPath = request.getPath().pathWithinApplication().value();
Route route = getGatewayRoute(exchange);
//通过IpUtils工具类,获取请求IP,兼容k8s
String ipAddress = IpUtils.getIp(request);
GatewayLog gatewayLog = new GatewayLog();
gatewayLog.setTimestamp(new Date());
gatewayLog.setSchema(request.getURI().getScheme());
gatewayLog.setRequestMethod(request.getMethodValue());
gatewayLog.setRequestPath(requestPath);
gatewayLog.setTargetServer(route.getId());
gatewayLog.setRequestTime(new Date());
gatewayLog.setIp(ipAddress);
MediaType mediaType = request.getHeaders().getContentType();
if (MediaType.APPLICATION_FORM_URLENCODED.isCompatibleWith(mediaType) || MediaType.APPLICATION_JSON.isCompatibleWith(mediaType)) {
return writeBodyLog(exchange, chain, gatewayLog);
} else {
return writeBasicLog(exchange, chain, gatewayLog);
}
}
private Mono<Void> writeBasicLog(ServerWebExchange exchange, GatewayFilterChain chain, GatewayLog accessLog) {
StringBuilder builder = new StringBuilder();
MultiValueMap<String, String> queryParams = exchange.getRequest().getQueryParams();
for (Map.Entry<String, List<String>> entry : queryParams.entrySet()) {
builder.append(entry.getKey()).append("=").append(StringUtils.join(entry.getValue(), ","));
}
accessLog.setRequestBody(builder.toString());
//获取响应体
ServerHttpResponseDecorator decoratedResponse = recordResponseLog(exchange, accessLog);
return chain.filter(exchange.mutate().response(decoratedResponse).build()).then(Mono.fromRunnable(() -> {
// 打印日志
writeAccessLog(accessLog);
}));
}
/**
* 解决 request body 只能读取一次问题,
* 参考: org.springframework.cloud.gateway.filter.factory.rewrite.ModifyRequestBodyGatewayFilterFactory
*
* @param exchange
* @param chain
* @param gatewayLog
* @return
*/
@SuppressWarnings("unchecked")
private Mono writeBodyLog(ServerWebExchange exchange, GatewayFilterChain chain, GatewayLog gatewayLog) {
ServerRequest serverRequest = ServerRequest.create(exchange, messageReaders);
Mono<String> modifiedBody = serverRequest.bodyToMono(String.class).flatMap(body -> {
gatewayLog.setRequestBody(body);
return Mono.just(body);
});
// 通过 BodyInserter 插入 body(支持修改body), 避免 request body 只能获取一次
BodyInserter bodyInserter = BodyInserters.fromPublisher(modifiedBody, String.class);
HttpHeaders headers = new HttpHeaders();
headers.putAll(exchange.getRequest().getHeaders());
headers.remove(HttpHeaders.CONTENT_LENGTH);
CachedBodyOutputMessage outputMessage = new CachedBodyOutputMessage(exchange, headers);
return bodyInserter.insert(outputMessage, new BodyInserterContext()).then(Mono.defer(() -> {
// 重新封装请求
ServerHttpRequest decoratedRequest = requestDecorate(exchange, headers, outputMessage);
// 记录响应日志
ServerHttpResponseDecorator decoratedResponse = recordResponseLog(exchange, gatewayLog);
// 记录普通的
return chain.filter(exchange.mutate().request(decoratedRequest).response(decoratedResponse).build()).then(Mono.fromRunnable(() -> {
// 打印日志
writeAccessLog(gatewayLog);
}));
}));
}
/**
* 打印日志
*
* @param gatewayLog 网关日志
* @author wanghongjie
*/
private void writeAccessLog(GatewayLog gatewayLog) {
accessLogService.sendLogToMq(gatewayLog);
}
private Route getGatewayRoute(ServerWebExchange exchange) {
return exchange.getAttribute(ServerWebExchangeUtils.GATEWAY_ROUTE_ATTR);
}
/**
* 请求装饰器,重新计算 headers
*
* @param exchange
* @param headers
* @param outputMessage
* @return
*/
private ServerHttpRequestDecorator requestDecorate(ServerWebExchange exchange, HttpHeaders headers, CachedBodyOutputMessage outputMessage) {
return new ServerHttpRequestDecorator(exchange.getRequest()) {
@Override
public HttpHeaders getHeaders() {
long contentLength = headers.getContentLength();
HttpHeaders httpHeaders = new HttpHeaders();
httpHeaders.putAll(super.getHeaders());
if (contentLength > 0) {
httpHeaders.setContentLength(contentLength);
} else {
httpHeaders.set(HttpHeaders.TRANSFER_ENCODING, "chunked");
}
return httpHeaders;
}
@Override
public Flux<DataBuffer> getBody() {
return outputMessage.getBody();
}
};
}
/**
* 记录响应日志
* 通过 DataBufferFactory 解决响应体分段传输问题。
*/
private ServerHttpResponseDecorator recordResponseLog(ServerWebExchange exchange, GatewayLog gatewayLog) {
ServerHttpResponse response = exchange.getResponse();
DataBufferFactory bufferFactory = response.bufferFactory();
return new ServerHttpResponseDecorator(response) {
@Override
public Mono<Void> writeWith(Publisher<? extends DataBuffer> body) {
if (body instanceof Flux) {
Date responseTime = new Date();
gatewayLog.setResponseTime(responseTime);
// 计算执行时间
long executeTime = (responseTime.getTime() - gatewayLog.getRequestTime().getTime());
gatewayLog.setExecuteTime(executeTime);
// 获取响应类型,如果是 json 就打印
String originalResponseContentType = exchange.getAttribute(ServerWebExchangeUtils.ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR);
if (ObjectUtil.equal(this.getStatusCode(), HttpStatus.OK) && StringUtils.isNotBlank(originalResponseContentType) && originalResponseContentType.contains("application/json")) {
Flux<? extends DataBuffer> fluxBody = Flux.from(body);
return super.writeWith(fluxBody.buffer().map(dataBuffers -> {
// 合并多个流集合,解决返回体分段传输
DataBufferFactory dataBufferFactory = new DefaultDataBufferFactory();
DataBuffer join = dataBufferFactory.join(dataBuffers);
byte[] content = new byte[join.readableByteCount()];
join.read(content);
// 释放掉内存
DataBufferUtils.release(join);
String responseResult = new String(content, StandardCharsets.UTF_8);
JSONObject jsonObject = JSON.parseObject(responseResult);
gatewayLog.setMessage(jsonObject.getString(MESSAGE));
gatewayLog.setCode(Objects.isNull(jsonObject.getInteger(CODE)) ? "200" : jsonObject.getInteger(CODE).toString());
gatewayLog.setResponseData(responseResult);
return bufferFactory.wrap(content);
}));
} else {
gatewayLog.setCode(String.valueOf(Objects.requireNonNull(this.getStatusCode()).value()));
gatewayLog.setMessage(this.getStatusCode().getReasonPhrase());
}
}
// if body is not a flux. never got there.
return super.writeWith(body);
}
};
}
}
这里的AccessLogService
和IpUtils
工具类,源码暂时不提供,大家可自行百度或者关注公众号《杰子学编程》回复"IP工具类"获取相关源码。AccessLogService源码比较简单,就是将GatewayLog发送到Mq;发送Mq代码可查看官方文档或腾讯云-基于腾讯云tdmq消息队列封装SpringBootStarter。
4.2、消费消息
消费消息,就是将消息存储到ES中,代码如下:
@Resource
private ElasticsearchRestTemplate elasticsearchRestTemplate;
@JddPulsarConsumer(topic = AccessLogService.LOG_TOPIC, clazz = CreateMsgBean.class, subscriptionName = "gateway-log-es-consume")
public void consume(JddMessage<CreateMsgBean> msg) {
CreateMsgBean createMsgBean = msg.getData();
GatewayLog gatewayLog = JSONUtil.toBean(JSONUtil.toJsonStr(createMsgBean.getData()), GatewayLog.class);
elasticsearchRestTemplate.save(gatewayLog);
}
注:JddPulsarConsumer 注解为自定义注解,消费消息使用,因公司保密原因不提供源码。可参考官方Pulsar消费者。
获取到gatewayLog
对象,将其存储到es中。
说明:
/**
* 时间戳
*/
@Field(type = FieldType.Date, format = DateFormat.date_time, name = "@timestamp")
private Date timestamp = new Date();
gatewayLog对象中timestamp的作用是在kibana中设置的时间检索条件,需要指定Es索引类型为Date、并且设置format为DateFormat.date_time
,将字段重命名为@timestamp
是为了遵循ELK中时间戳使其统一。
好了,今天分析的内容就到这里了,谢谢大家,有如不妥之处望大家及时指正。欢迎大家关注我的公众号《杰子学编程》,会不定期分析福利给大家。