resolver-handler
<p>[TOC]</p>
<h3>简要描述</h3>
<p>resolver handler 实现具体的处理逻辑,主要完成 <strong>snapshot 排序及组装</strong>和 <strong>snapshot 处理及存储</strong>两方面的工作,具体的:
snapshot 排序及组装:</p>
<ul>
<li>snapshot 在 exporter 处被打包成 <code>data package</code> 的形式传输,<code>data package</code> 本质上是 snapshot 的列表,额外包含了 snapshot 的序列号信息。</li>
<li>序列号信息指的是当前 snapshot 列表在全局所有 snapshot 中所处的位置信息,即 lseq 和 rseq。</li>
<li>由于前置会发生的 http 和 rpc 乱序,依次 handler 的第一项工作就是,完成 snapshot 原始数据的排序,将多个 <code>data package</code> 的 snapshot,还原成按照 <code>timestamp</code> 排好序的 snapshot 流。</li>
<li>然后,再将 snapshot 流组装成 <code>init snapshot</code> 与 <code>final snapshot</code> 互相配对的 <code>snapshot pair</code>。</li>
<li>最后将 <code>snapshot pair</code> 流式地传入 snapshot 处理逻辑中。</li>
</ul>
<p>简单地说,排序与组装地详细流程为:</p>
<ul>
<li><code>data package</code> 内部按照 <code>timestamp</code> 对 snapshot 列表排序。</li>
<li><code>data package</code> 间采用滑动窗口的实现方式,基于序列号信息,对 <code>data package</code> 排序。这种方式只能处理失序地 snapshot 不跨越一个以上 <code>data package</code> 的情况。</li>
<li>将上述过程生成地 snapshot 流,再依次通过 <code>skiplist</code> 来维护,完成跨越多个 <code>data package</code> 情况下 snapshot 的排序,并且完成 <code>init snapshot</code> 与 <code>final snapshot</code> 的组装。</li>
<li>详细逻辑这里略过,见:<a href="https://github.com/BitTraceProject/BitTrace-Resolver/blob/main/common/resolve.go">BitTraceProject/BitTrace-Resolver/common/resolve.go</a></li>
</ul>
<p>snapshot 处理及存储:</p>
<ul>
<li>这方面的工作时比较简单的,因为数据本身在插桩时就是按照指定的数据结构输出并组织起来。snapshot 处理所需要做的工作只有,将 snapshot 还原出来,然后依次写入 collector。</li>
<li>需要注意的目前有两点:对于重复数据的处理,以及对于 <code>sync snapshot</code> 的处理。</li>
</ul>
<h3><code>ResolverHandler</code> 数据结构</h3>
<pre><code class="language-golang">type (
ResolverHandler interface {
// OnReceive 接收到 dp,实现各种重排序任务,然后将排序好的 snapshot 组装成 pair,写入 stream
OnReceive(dp protocol.ReceiverDataPackage)
// OnResolve 从 stream 读取 snapshot pair,进行处理工作,并写入 collector
OnResolve(snapshotPairList []*snapshotPair)
}
DefaultResolverHandler struct {
dataPackageMap *dataPackageMap
snapshotPairMap *snapshotPairMap
snapshotStream chan []*snapshotPair
collectorWriterServerAddr string
collectorWriterClient *rpc.Client
}
dataPackageMap struct {
firstLSeq int64
dataMap map[int64][]*structure.Snapshot // next data lseq = lseq+len(data)
eof bool
}
snapshotPair struct {
snapshotID string
// pair: init+final
initSnapshot *structure.Snapshot
finalSnapshot *structure.Snapshot
hasFinal bool
// sync
syncSnapshot *structure.Snapshot
}
snapshotPairMap struct {
pairSkipList *skiplist.SkipList // timestamp:id,通过 timestamp 维护 id 有序,支持循环读取 timestamp 最小的 id
pairMap map[string]*snapshotPair
}
)</code></pre>
<h3>方法接口列表</h3>
<ul>
<li><code>func (h *DefaultResolverHandler) OnReceive(dp protocol.ReceiverDataPackage)</code></li>
<li><code>func (h *DefaultResolverHandler) OnResolve(snapshotPairList []*snapshotPair)</code></li>
<li><code>func (h *DefaultResolverHandler) daemon()</code></li>
<li><code>func (m *dataPackageMap) PutDataPackage(dp protocol.ReceiverDataPackage) ([]*structure.Snapshot, bool)</code></li>
<li><code>func (m *dataPackageMap) sortRawSnapshotByTimestamp(data [][]byte) []*structure.Snapshot</code></li>
<li><code>func (m *dataPackageMap) sortSnapshotByTimestamp(snapshotList []*structure.Snapshot) []*structure.Snapshot</code></li>
<li><code>func (m *snapshotPairMap) PutSnapshot(snapshot *structure.Snapshot) []*snapshotPair</code></li>
<li><code>func (m *snapshotPairMap) RemoveSnapshotPair(id string) (*snapshotPair, bool)</code></li>
</ul>
<p>详细逻辑这里略过,见:<a href="https://github.com/BitTraceProject/BitTrace-Resolver/blob/main/common/resolve.go">BitTraceProject/BitTrace-Resolver/common/resolve.go</a></p>