可配置的通用 Prometheus 监控模块

Prometheus

  Prometheus 是目前最流行的开源系统监控报警框架,易于配置和使用,基于 PromQL 可以很方便的在 Grafana 上展示各种需要的图表。

  Prometheus 的生态丰富各类扩展和支持都非常齐全,基本不需要二次开发,就可以找到你所使用的语言和模块的支持,很方便的嵌入。

指标类型

  • Counter

    Counter 类型的指标其工作方式和计数器一样,只增不减(除非系统发生重置)。常见的监控指标,如http_requests_totalnode_cpu 都是 Counter 类型的监控指标。 一般在定义 Counter 类型指标的名称时推荐使用 _total 作为后缀。

  • Gauge

    Gauge 类型的指标侧重于反应系统的当前状态。因此这类指标的样本数据可增可减。常见指标如:node_memory_MemFree(主机当前空闲的内容大小)、node_memory_MemAvailable(可用内存大小)都是 Gauge 类型的监控指标。

  • Histogram

    Histogram<basename>_bucket{le="<upper inclusive bound>"}<basename>_bucket{le="+Inf"}, <basename>_sum<basename>_count 组成,主要用于表示一段时间范围内对数据进行采样(通常是请求持续时间或响应大小),并能够对其指定区间以及总数进行统计,通常它采集的数据展示为直方图。如:请求持续时间,响应大小。

  • Summary

    Histogram 类似,由 <basename>{quantile="<φ>"}<basename>_sum<basename>_count 组成,主要用于表示一段时间内数据采样结果(通常是请求持续时间或响应大小),它直接存储了 quantile 数据,而不是根据统计区间计算出来的。

造轮子

  对于复杂的微服务架构,完善的监控和报警是十分重要的。我们项目主要使用 Prometheus 收集监控数据, Grafana 负责展示监控图表和报警(钉钉插件),目前工作良好。

  最主要的监控是错误异常、HTTP 响应、GRPC 响应、GRCP 调用四个方面,错误异常使用 Counter 统计次数,HTTP & GRPC 响应和 GRPC 调用使用 Histogram 统计次数和耗时。

  在查看了一些常用的 Prometheus 开源工具库项目之后,我最后还是决定自己造轮子,因为那些项目都感觉太重了,也不太符合我的需求,我大概只需要一个简简单单、可配置的高效轻量级 lib 就可以了。

轮子

  仔细思考了小半天,写了 300 行代码来实现了具体需求,项目中实际使用了半个月,工作良好,基本够用了。

代码

  主要实现了以下特性:

  • 使用 toml 文件配置,可配置 const labels,自定义 vector
  • HTTP & GRPCInterceptor 模式;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
package prometheus

import (
stdCtx "context"
"errors"
"fmt"
"github.com/kataras/iris/v12"
"github.com/kataras/iris/v12/context"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promhttp"
"google.golang.org/grpc"
"google.golang.org/grpc/peer"
"net"
"otrade/lib/golib/logger"
"strconv"
"strings"
"time"
)

const (
TypeCounter = iota
TypeGauge
TypeHistogram
TypeSummary
)

var (
// DefaultBuckets prometheus buckets in seconds.
DefaultBuckets = []float64{0.1, 0.3, 0.5, 1.0, 3.0, 5.0}
)

type VectorConfig struct {
Name string `toml:"name" json:"name"`
Desc string `toml:"desc" json:"desc"`
Type int `toml:"type" json:"type"`
Labels []string `toml:"labels" json:"labels"`
IgnoreConstLabel bool `toml:"ignore_const_label" json:"ignore_const_label"`
}

type Config struct {
ConstLabels map[string]string `toml:"const_labels" json:"const_labels"`
Vectors []*VectorConfig `toml:"vectors" json:"vectors"`
}

type Vector struct {
config *VectorConfig
vec prometheus.Collector
logger *logger.Logger
}

func (v *Vector) Trigger(value float64, labels ...string) {
if len(labels) != len(v.config.Labels) {
v.logger.Errorf("invalid vector labels | name: %s | labels: %+v", v.config.Name, labels)
return
}

switch v.config.Type {
case TypeGauge:
v.vec.(*prometheus.GaugeVec).WithLabelValues(labels...).Set(value)
case TypeHistogram:
v.vec.(*prometheus.HistogramVec).WithLabelValues(labels...).Observe(value)
case TypeSummary:
v.vec.(*prometheus.SummaryVec).WithLabelValues(labels...).Observe(value)
case TypeCounter:
v.vec.(*prometheus.CounterVec).WithLabelValues(labels...).Inc()
}
}

// NOTE: vector type must is Histogram or Summary
func (v *Vector) HttpInterceptor(ctx context.Context) {
start := time.Now()
ctx.Next()

r := ctx.Request()
statusCode := strconv.Itoa(ctx.GetStatusCode())
duration := float64(time.Since(start).Nanoseconds()) / 1000000000
labels := []string{statusCode, r.Method, r.URL.Path}

v.Trigger(duration, labels...)
}

func getClietIP(ctx stdCtx.Context) (ip string, err error) {
pr, ok := peer.FromContext(ctx)

if !ok {
err = fmt.Errorf("invoke FromContext() failed")
return
}

if pr.Addr == net.Addr(nil) {
err = fmt.Errorf("peer.Addr is nil")
return
}

ip = strings.Split(pr.Addr.String(), ":")[0]
return
}

// NOTE: vector type must is Histogram or Summary
func (v *Vector) GrpcServerUnaryInterceptor(ctx stdCtx.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) {
start := time.Now()
clientIp := "unknown"

if ip, e := getClietIP(ctx); e == nil {
clientIp = ip
}

resp, err = handler(ctx, req)
duration := float64(time.Since(start).Nanoseconds()) / 1000000000
labels := []string{info.FullMethod, clientIp}

v.Trigger(duration, labels...)
return
}

// NOTE: vector type must is Histogram or Summary
func (v *Vector) GrpcServerStreamInterceptor(srv interface{}, stream grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) (err error) {
start := time.Now()
clientIp := "unknown"

if ip, e := getClietIP(stream.Context()); e == nil {
clientIp = ip
}

err = handler(srv, stream)
duration := float64(time.Since(start).Nanoseconds()) / 1000000000
labels := []string{info.FullMethod, clientIp}

v.Trigger(duration, labels...)
return
}

// NOTE: vector type must is Histogram or Summary
func (v *Vector) GrpcClientUnaryInterceptor(ctx stdCtx.Context, method string, req, resp interface{}, conn *grpc.ClientConn, invoker grpc.UnaryInvoker, options ...grpc.CallOption) (err error) {
start := time.Now()
err = invoker(ctx, method, req, resp, conn, options...)
duration := float64(time.Since(start).Nanoseconds()) / 1000000000
labels := []string{method, conn.Target()}

v.Trigger(duration, labels...)
return
}

// NOTE: vector type must is Histogram or Summary
func (v *Vector) GrpcClientStreamInterceptor(ctx stdCtx.Context, desc *grpc.StreamDesc, conn *grpc.ClientConn, method string, streamer grpc.Streamer, options ...grpc.CallOption) (stream grpc.ClientStream, err error) {
start := time.Now()
stream, err = streamer(ctx, desc, conn, method, options...)
duration := float64(time.Since(start).Nanoseconds()) / 1000000000
labels := []string{method, conn.Target()}

v.Trigger(duration, labels...)
return
}

func (v *Vector) Config() *VectorConfig {
return v.config
}

type Monitor struct {
config *Config
vectors map[string]*Vector
logger *logger.Logger
}

func (m *Monitor) Register(config *VectorConfig) (err error) {
constLabels := m.config.ConstLabels

if config.IgnoreConstLabel {
constLabels = nil
}

var vec prometheus.Collector

switch config.Type {
case TypeHistogram:
vec = prometheus.NewHistogramVec(
prometheus.HistogramOpts{
Name: config.Name,
Help: config.Desc,
ConstLabels: constLabels,
Buckets: DefaultBuckets,
},
config.Labels,
)
case TypeGauge:
vec = prometheus.NewGaugeVec(
prometheus.GaugeOpts{
Name: config.Name,
Help: config.Desc,
ConstLabels: constLabels,
},
config.Labels,
)
case TypeSummary:
vec = prometheus.NewSummaryVec(
prometheus.SummaryOpts{
Name: config.Name,
Help: config.Desc,
ConstLabels: constLabels,
},
config.Labels,
)
case TypeCounter:
vec = prometheus.NewCounterVec(
prometheus.CounterOpts{
Name: config.Name,
Help: config.Desc,
ConstLabels: constLabels,
},
config.Labels,
)
default:
err = errors.New("invalid monitor type")
return
}

m.vectors[config.Name] = &Vector{config: config, vec: vec, logger: m.logger}
prometheus.MustRegister(vec)
return
}

func (m *Monitor) Trigger(name string, value float64, labels ...string) {
vector, ok := m.vectors[name]

if !ok {
m.logger.Errorf("unknown monitor vector | name: %s", name)
return
}

vector.Trigger(value, labels...)
}

func (m *Monitor) Vector(name string) (vector *Vector) {
vec, ok := m.vectors[name]

if !ok {
m.logger.Errorf("unknown monitor vector | name: %s", name)
return
}

vector = vec
return
}

func (m *Monitor) Metrics() context.Handler {
return iris.FromStd(promhttp.Handler())
}

func New(config *Config, logger *logger.Logger) (monitor *Monitor, err error) {
monitor = &Monitor{
config: config,
vectors: make(map[string]*Vector, len(config.Vectors)),
logger: logger,
}

for _, c := range config.Vectors {
if err = monitor.Register(c); err != nil {
break
}
}

return
}

配置文件

  项目中错误异常、HTTP 响应、GRPC 响应、GRCP 调用的 vector 配置文件示例如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
[monitor]
const_labels = { env: "dev", service: "test", host: "10.100.38.218" }

[[monitor.vectors]]
name = "http_server_requests_seconds"
desc = "How long it took to process the HTTP request, partitioned by status code, method and HTTP path."
type = 2
labels = ["code", "method", "path"]

[[monitor.vectors]]
name = "grpc_server_requests_seconds"
desc = "How long it took to process the GRPC request, partitioned by method and client ip."
type = 2
labels = ["method", "client_ip"]

[[monitor.vectors]]
name = "grpc_client_requests_seconds"
desc = "How long it took to call the GRPC request, partitioned by method and server address."
type = 2
labels = ["method", "server_addr"]

[[monitor.vectors]]
name = "error_code_total"
desc = "How many error raised, partitioned by error code and critical flag."
type = 0
labels = ["error_code", "critical"]

初始化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package monitor

import (
"github.com/BurntSushi/toml"
"github.com/opay-o2o/golib/logger"
"github.com/opay-o2o/golib/prometheus"
)

const (
VectorErrorName = "error_code_total"
VectorHttpServerRequestName = "http_server_requests_seconds"
VectorGrpcClientRequestName = "grpc_client_requests_seconds"
VectorGrpcServerRequestName = "grpc_server_requests_seconds"
)

type Config struct {
Monitor *prometheus.Config `toml:"monitor"`
}

type Monitor struct {
Monitor *prometheus.Monitor
ErrorVec *prometheus.Vector
HttpServerVec *prometheus.Vector
GrpcClientVec *prometheus.Vector
GrpcServerVec *prometheus.Vector
}

func Init(confPath string) (monitor *Monitor, err error) {
l, err := logger.NewLogger(logger.DefaultConfig())

if err != nil {
return
}

c := &Config{}

if _, err := toml.DecodeFile(confPath, c); err != nil {
return
}

m, err := prometheus.New(c.Monitor, l)

if err == nil {
return
}

monitor = &Monitor{
Monitor: m,
ErrorVec: m.Vector(VectorErrorName),
HttpServerVec: m.Vector(VectorHttpServerRequestName),
GrpcClientVec: m.Vector(VectorGrpcClientRequestName),
GrpcServerVec: m.Vector(VectorGrpcServerRequestName),
}
return
}

调用

错误异常

  我们项目中的错误都是转化为自定义异常统一处理的,所以调用非常容易:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package exception

import (
"fmt"
"github.com/opay-o2o/golib/logger"
"github.com/opay-o2o/golib/strings2"
"oride/common"
"oride/common/monitor"
"strconv"
)

type Exception struct {
code int64
message string
}

func (e *Exception) GetCode() int64 {
return e.code
}

func (e *Exception) GetMessage() string {
return e.message
}

func (e *Exception) String() string {
return fmt.Sprintf("(%d) %s", e.code, e.message)
}

func Desc(code int64) string {
if e, ok := Desces[code]; ok {
return e
}

return "server internal error"
}

func New(code int64, args ...interface{}) *Exception {
critical := false

if len(args) > 0 {
if err, ok := args[0].(error); ok {
critical = true
common.Logger.Log(logger.ErrorLevel, "Error: %s | Args: %+v", err, args[1:])
}
}

if monitor.ErrorVec != nil {
monitor.ErrorVec.Trigger(0, strconv.FormatInt(code, 10), strings2.IIf(critical, "1", "0"))
}

return &Exception{code: code, message: Desc(code)}
}

HTTP 响应

  HTTP Server 基于 iris 改造,lib 已经提供了 Interceptor,直接 Use 就好,另外也需要添加 /metricsroutePrometheus 采集数据使用。

1
2
3
4
5
if monitor.HttpServerVec != nil {
app.Use(monitor.HttpServerVec.HttpInterceptor)
}

app.Get("/metrics", middleware.CheckWhiteIp, monitor.Monitor.Metrics())

GRPC 响应

  Grpc Server 的嵌入也很简单,只需要把 GrpcServerUnaryInterceptorGrpcServerStreamInterceptor 注册到 grpc.NewServeroptions 中;

1
2
3
4
5
options := []grpc.ServerOption{
grpc.UnaryInterceptor(monitor.GrpcServerVec.GrpcServerUnaryInterceptor),
grpc.StreamInterceptor(monitor.GrpcServerVec.GrpcServerStreamInterceptor),
}
server := grpc.NewServer(options...)

GRCP 调用

  Grpc 调用的嵌入和 Grpc Server 类似,只需要把 GrpcClientUnaryInterceptorGrpcClientStreamInterceptor 放到 Dial 函数的 options 中;

1
2
3
4
5
6
options := []grpc.DialOption{
grpc.WithInsecure(),
grpc.UnaryInterceptor(monitor.GrpcClientVec.GrpcClientUnaryInterceptor),
grpc.StreamInterceptor(monitor.GrpcClientVec.GrpcClientStreamInterceptor),
}
client := grpc.Dial(addr, options...)

效果

错误异常

  错误异常主要是统计关键错误和普通异常的次数,图表查看错误的速率,如果速率过高触发报警;

1
2
3
4
5
# critical errors per second
sum(rate(error_code_total{env="dev",critical="1"}[5m])) by (error_code)

# normal exceptions per second
sum(rate(error_code_total{env="dev",critical="0"}[5m])) by (error_code)

CleanShot15.32.26@2x

HTTP 响应

  HTTP 响应主要是统计响应次数和耗时,图表查看响应速度、QPS 和调用次数,如果响应速度过慢触发效率报警,如果 QPS 过低触发服务宕机报警;

1
2
3
4
5
6
7
8
# http speed
sum(rate(http_server_requests_seconds_sum{env="dev"}[5m])) by (path) / sum(rate(http_server_requests_seconds_count{env="dev""}[5m])) by (path)

# http QPS
sum(rate(http_server_requests_seconds_count{env="dev"}[5m])) by (path, code)

# http calls
sum(increase(http_server_requests_seconds_count{env="dev"}[$__range])) by (path, code)

CleanShot 2020-01-07 at 17.55.46@2x

GRPC 响应

  GRPC 响应主要是统计响应次数和耗时,图表查看响应速度、QPS 和调用次数,如果响应速度过慢触发效率报警,如果 QPS 过低触发服务宕机报警;

1
2
3
4
5
6
7
8
# grpc speed
sum(rate(grpc_server_requests_seconds_sum{env="dev"}[5m])) by (method) / sum(rate(grpc_server_requests_seconds_count{env="dev"}[5m])) by (method)

# grpc QPS
sum(rate(grpc_server_requests_seconds_count{env="dev"}[5m])) by (method)

# grpc calls
sum(increase(grpc_server_requests_seconds_count{env="dev"}[$__range])) by (method)

CleanShot 2020-01-07 at 17.57.21@2x

GRCP 调用

  GRPC 调用主要是统计调用次数和耗时,图表查看调用速度、QPS 和调用次数,如果调用速度过慢触发效率报警;

1
2
3
4
5
6
7
8
# grpc speed
sum(rate(grpc_call_duration_seconds_sum{env="dev"}[5m])) by (method) / sum(rate(grpc_call_duration_seconds_count{env="dev"}[5m])) by (method)

# grpc QPS
sum(rate(grpc_call_duration_seconds_count{env="prod"}[5m])) by (method)

# grpc calls
sum(increase(grpc_call_duration_seconds_count{env="dev"}[$__range])) by (method)

CleanShot 2020-01-07 at 17.57.49@2x

作者

Zivn

发布于

2020-01-07

更新于

2020-10-30

许可协议

评论