解析
<p><br></p>
<h4>redis 解析规则</h4>
<p>可以参考 <a href="https://blog.csdn.net/shuningzhang/article/details/89445417">https://blog.csdn.net/shuningzhang/article/details/89445417</a></p>
<pre><code>1、对于固定的字符串,回复的第一个字节是“+”
比如 "OK", "PONG" 这种固定的
2、对于错误,回复的第一个字节是“-”
3、对于整数,回复的第一个字节是“:”
4、对于字符串,回复的第一个字节是“$”
5、对于数组,回复的第一个字节是“*”
</code></pre>
<p>每一行以 \r\n 结尾, 字符串一般是两行, 数组一般是多行</p>
<ul>
<li>
<p>字符串 ( $ 说明是字符串,6 说明下一行的字符数,然后下一行就放了真正的字符串)</p>
<pre><code>$6 \r\n
foobar \r\n </code></pre>
</li>
<li>字符串数组( * 说明是数组,2说明接下来有两个字符串,然后根据字符串的解析规则,分成 4 行)
<pre><code>*2 \r\n
$3 \r\n
foo \r\n
$2 \r\n
bb \r\n </code></pre></li>
</ul>
<p>特殊情况说明:</p>
<ul>
<li>
<p>空字符串</p>
<pre><code>$0 \r\n </code></pre>
</li>
<li>
<p>查不到字符串</p>
<pre><code>$-1 \r\n </code></pre>
</li>
<li>
<p>空数组(比如 hgetall 一个不存在的key)</p>
<pre><code>*0 \r\n </code></pre>
</li>
<li>null 数组 (这种情况比较特殊,一般来说只有BLPOP超时等情况才会返回 null 数组)
<pre><code>*-1 \r\n </code></pre></li>
</ul>
<p><br></p>
<h4>redisgo 原本的解析方式</h4>
<pre><code>
func readReply() (interface{}, error) {
// 1. 从缓冲区读取一行(就是上面说的一行)
line, err := c.readLine()
--------------------------------------------------------
// 2. 进行参数校验
if err != nil {
return nil, err
}
if len(line) == 0 {
return nil, protocolError("short response line")
}
--------------------------------------------------------
// 3. 如果是固定的字符串, 错误,数字,一般只有一行,处理起来很简单
switch line[0] {
case '+':
switch string(line[1:]) {
case "OK":
// Avoid allocation for frequent "+OK" response.
return okReply, nil
case "PONG":
// Avoid allocation in PING command benchmarks :)
return pongReply, nil
default:
return string(line[1:]), nil
}
case '-':
return Error(string(line[1:])), nil
case ':':
return parseInt(line[1:])
--------------------------------------------------------
// 4. 如果是字符串,按开始说的解析规则,要读两行
case '$':
n, err := parseLen(line[1:])
if n < 0 || err != nil {
return nil, err
}
p := make([]byte, n)
_, err = io.ReadFull(c.br, p)
if err != nil {
return nil, err
}
if line, err := c.readLine(); err != nil {
return nil, err
} else if len(line) != 0 {
return nil, protocolError("bad bulk string format")
}
return p, nil
---------------------------------------------------------
// 5. 如果是数组,根据不同类型再进行一次调用,将结果塞进一个 interface 数组
case '*':
n, err := parseLen(line[1:])
if n < 0 || err != nil {
return nil, err
}
r := make([]interface{}, n)
for i := range r {
r[i], err = c.readReply()
if err != nil {
return nil, err
}
}
return r, nil
}
return nil, protocolError("unexpected response line")
}
</code></pre>
<p><br></p>
<h4>我们自己拆分的解析方式</h4>
<p>很明显上面的是边读取边解析,但是我们需要先读取出字节,存入 cache, 再解析成最终的形式</p>
<p>步骤一:读取字节</p>
<pre><code>func (c *conn) readReply() ([]byte, error) {
// 1. 老规矩,读取一行(这里的 readLine 经过改写,返回的一行包括 \r\n)
line, err := c.readLine()
---------------------------------------------------------
// 2. 老规矩,参数校验
if err != nil {
return nil, err
}
if len(line) == 0 {
return nil, ErrShortLine
}
---------------------------------------------------------
// 3. 根据三种情况读取,然后不解析,返回一个整个字节数组
switch RESP(line[0]) {
case TypeSimpleString, TypeSimpleError, TypeNumber: // 如果是固定的字符串, 错误,数字
break
case TypeBlobString:
line, err = c.decodeBlobString(line) // 字符串
case TypeArray:
line, err = c.decodeArray(line) // 数组
default:
err = ErrUnknown
}
return line, err
}
</code></pre>
<p>步骤二,解析字节</p>
<pre><code>func (c *conn) decodeReply(b []byte) (interface{}, error) {
// 将字节数组用 \r\n 分隔
splits := bytes.Split(b, []byte("\r\n"))
if len(splits) < 2 {
return nil, errors.New("cache splits is err")
}
splits = splits[0 : len(splits)-1] // 去掉最后一个空数组
bHelp := cache.InitByteHelp(splits)
return c.decodeUseByteHelp(bHelp)
}
--------------------------------------------------
func (c *conn) decodeUseByteHelp(bHelp *cache.ByteHelp) (interface{}, error) {
// 这里基本和 redisgo 原本的步骤一样了
// 读出一行
line, err := bHelp.GetLine()
// 参数校验
if err != nil {
return nil, err
}
if len(line) == 0 {
return nil, protocolError("short response line")
}
// 如果是固定的字符串, 错误,数字
switch line[0] {
case '+':
switch string(line[1:]) {
case "OK":
// Avoid allocation for frequent "+OK" response.
return okReply, nil
case "PONG":
// Avoid allocation in PING command benchmarks :)
return pongReply, nil
default:
return string(line[1:]), nil
}
case '-':
return Error(string(line[1:])), nil
case ':':
return parseInt(line[1:])
// 字符串
case '$':
n, err := parseLen(line[1:])
if n < 0 || err != nil {
return nil, err
}
// 下一行(可以拿到空 byte,代表存入的是空字符串)
return bHelp.GetLine()
// 数组解析
case '*':
n, err := parseLen(line[1:])
if n < 0 || err != nil {
return nil, err
}
r := make([]interface{}, n)
for i := range r {
r[i], err = c.decodeUseByteHelp(bHelp)
if err != nil {
return nil, err
}
}
return r, nil
}
return nil, protocolError("unexpected response line")
}
</code></pre>
<p><br></p>
<h4>PipeLine</h4>
<p>大体思路:
redigo的pipeline由Send(发送命令到缓冲区)、Flush(发送到redis服务器)、Receive(接收消息),由于使用了程序缓存,对于读的操作可以直接从bigCache获取结果,也就不需要经过redigo的pipeline
通过队列形式(先进先出)存储每次Send的命令,检查是否可以从bigCache取到数据,如果取到就保存到命令的同层位置,取不到就保存空数据。Receive时先从队列获取数据,如果取到空数据,走redigo接收字节的逻辑,否则直接使用队列里取到的数据</p>
<p>问题思考:
1.什么时候初始化队列,怎么保证队列在pipeline中的独立性?
在初始化连接池的时候为每一个连接都初始化一条队列,在Get之后的首次Send时cleanQueue当前队列的数据为空</p>