1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
| local json = loadMod("cjson")
local redis = loadMod("resty.redis")
local util = loadMod("core.util")
local exception = loadMod("core.exception")
local counter = loadMod("core.counter")
local dbConf = loadMod("config.redis")
--- Lookup table of error messages that warrant another attempt.
-- Keys are the exact error strings compared against the message returned
-- by the connection / pipeline calls; a truthy value means "retry".
local RETRY_ERRMSG_MAP = {
    timeout = true,
    closed = true,
    ["broken pipe"] = true,
}
--- Redis utility class.
-- Holds the query methods; it also serves as the key under which the
-- current connection is stored in ngx.ctx.
local Redis = {}
--- Get the Redis connection, creating it when necessary.
--
-- The connection is cached in ngx.ctx under the Redis table as key, so
-- repeated calls reuse the same client unless a reset is requested.
--
-- @param boolean reset Force a reconnect: close any cached connection and open a new one
-- @return resty.redis Redis connection, or nil on failure
-- @return string Error message when the connection could not be established
local function getClient(reset)
    local client = ngx.ctx[Redis]
    if client and not reset then
        return client
    end

    if client then
        -- Close the old connection and forget it immediately; otherwise a
        -- failed reconnect below would leave a closed handle cached and
        -- later calls without reset would hand it out again.
        client:close()
        ngx.ctx[Redis] = nil
    end

    -- Create a new client object
    local errmsg
    client, errmsg = redis:new()
    if not client then
        return nil, errmsg
    end

    -- Apply the configured timeout before connecting
    client:set_timeout(dbConf.TIMEOUT)

    -- Connect through the unix socket when configured, otherwise via host/port
    local ok
    local options = {}
    if dbConf.SOCK then
        ok, errmsg = client:connect("unix:" .. dbConf.SOCK, options)
    else
        ok, errmsg = client:connect(dbConf.HOST, dbConf.PORT, options)
    end
    if not ok then
        return nil, errmsg
    end

    ngx.ctx[Redis] = client
    return client
end
--- Release the cached connection, if any.
-- Calls set_keepalive on the client with dbConf.TIMEOUT and
-- dbConf.POOL_SIZE, then removes it from ngx.ctx.
local function closeClient()
    local conn = ngx.ctx[Redis]
    if not conn then
        return
    end

    conn:set_keepalive(dbConf.TIMEOUT, dbConf.POOL_SIZE)
    ngx.ctx[Redis] = nil
end
--- Map the ngx.null placeholder to a real nil.
--
-- @param mixed value Any value
-- @return mixed nil when value is ngx.null, otherwise value unchanged
local function nul2nil(value)
    if value ~= ngx.null then
        return value
    end
    return nil
end
--- Encode an arbitrary value into its storage form.
-- Values accepted by util:isNumber are returned unchanged; everything
-- else is JSON-encoded and prefixed with "*" so decode can recognise it.
--
-- @param mixed value Value to encode
-- @return mixed The value itself for numbers, otherwise the "*"-prefixed JSON string
local function encode(value)
    if not util:isNumber(value) then
        -- Enable sparse-array encoding before serialising
        json.encode_sparse_array(true)
        return "*" .. json.encode(value)
    end
    return value
end
--- Decode a stored value back into a Lua value.
-- nil and ngx.null yield nil, values accepted by util:isNumber are
-- returned unchanged, strings starting with "*" are JSON-decoded from the
-- second character on, and any other string is returned as is.
--
-- @param string value Stored value
-- @return mixed Decoded value
local function decode(value)
    if nul2nil(value) == nil then
        return nil
    end

    if util:isNumber(value) then
        return value
    end

    -- A leading "*" marks a JSON payload
    if value:sub(1, 1) == "*" then
        return json.decode(value:sub(2))
    end

    return value
end
--- Execute a Redis command against this instance's database.
--
-- Each attempt pipelines a `select` of self.dbIndex followed by the
-- command itself. Attempts are repeated up to dbConf.RETRY_TIMES while
-- the failure message is listed in RETRY_ERRMSG_MAP; every retry forces
-- a fresh connection.
--
-- Raises "core.badCall" (via exception:raise) when cmd is "select" or is
-- not available on the client, and "core.queryFailed" when the pipeline
-- fails or either reply is an error.
--
-- @param string cmd Command name
-- @param mixed ... Command arguments
-- @return mixed Command result
function Redis:execute(cmd, ...)
    counter:set(counter.COUNTER_REDIS_COMMAND)

    local client, results, errmsg
    for attempt = 1, dbConf.RETRY_TIMES do
        -- Every attempt after the first forces a reconnect
        client, errmsg = getClient(attempt > 1)

        if client then
            -- `select` is issued internally, so callers may not send it themselves
            if cmd == "select" or not client[cmd] then
                exception:raise("core.badCall", { cmd = cmd, args = { ... } })
            end

            client:init_pipeline()
            client:select(self.dbIndex)
            client[cmd](client, ...)
            results, errmsg = client:commit_pipeline()
        end

        if results or not RETRY_ERRMSG_MAP[errmsg] then
            break
        end
    end

    -- Expect exactly two replies: one for select, one for the command
    if not results or not util:isTable(results) or #results ~= 2 then
        exception:raise("core.queryFailed", { cmd = cmd, args = { ... }, message = errmsg })
    end

    local selectRet, cmdRet = results[1], results[2]

    -- A successful select reply is not a table; a table reply whose first
    -- element is falsy carries the error message in its second element.
    if not selectRet or (util:isTable(selectRet) and not selectRet[1]) then
        local message
        if util:isTable(selectRet) then
            message = selectRet[2]
        end
        exception:raise("core.queryFailed", { cmd = "select", args = { self.dbIndex }, message = message })
    end

    -- Only index the reply when it is a table: a falsy reply has no message
    -- to extract. The explicit `== false` test keeps legitimate array
    -- replies (including empty ones) from being mistaken for errors.
    if not cmdRet then
        exception:raise("core.queryFailed", { cmd = cmd, args = { ... } })
    elseif util:isTable(cmdRet) and cmdRet[1] == false then
        exception:raise("core.queryFailed", { cmd = cmd, args = { ... }, message = cmdRet[2] })
    end

    return cmdRet
end
--- Instance factory.
-- `instances` caches the query objects created so far, keyed by database index.
local Module = { instances = {} }
--- Fetch the query object for a database index.
-- The object is built on first request via util:inherit from the Redis
-- table and cached in self.instances for later calls.
--
-- @param number dbIndex Database index
-- @return table Query object
function Module:getInstance(dbIndex)
    local cache = self.instances
    local instance = cache[dbIndex]

    if not instance then
        instance = util:inherit({ dbIndex = dbIndex }, Redis)
        cache[dbIndex] = instance
    end

    return instance
end
--- Release the connection held for the current context.
-- Public entry point that delegates to the module-local closeClient.
function Module:close()
    return closeClient()
end
return Module
|