开源监控平台-cat客户端原理解析
<p>[TOC]</p>
<h1>简介</h1>
<p>参考官网:<a href="https://github.com/dianping/cat">https://github.com/dianping/cat</a></p>
<h1>架构图</h1>
<p><img src="https://www.showdoc.cc/server/api/common/visitfile/sign/db09bc103fd7f95fb976eca3b00a58e6?showdoc=.jpg" alt="" />
(图片来自网络)</p>
<h1>类图</h1>
<p><img src="https://www.showdoc.cc/server/api/common/visitfile/sign/2358cfcd7221dc98a31b8f00329ccdfc?showdoc=.jpg" alt="" />
(图片来自网络)</p>
<h1>原理解析</h1>
<blockquote>
<p>阅读思路:跟着简单的用例debug</p>
</blockquote>
<pre><code class="language-java"> // Obtain the producer through the static accessor and open a Transaction.
// The first call to Cat.getProducer() triggers lazy initialization of the client.
Transaction t = Cat.getProducer().newTransaction("logTransaction", "logTransaction");
// Log an Event; while a transaction is open it is attached to that transaction as a child.
Cat.logEvent("", catMonitor.name(), "0", JSON.toJSONString(joinPoint.getArgs()));
// Run your business code here.
your.business();
// Set the status of the transaction.
t.setStatus("0");
// Complete the transaction, which hands it over for reporting.
t.complete();</code></pre>
<h2>初始化</h2>
<pre><code class="language-java">// Static state of the Cat facade.
// Singleton instance, created when the class is loaded; also used as the lock object
// during lazy initialization.
private static Cat s_instance = new Cat();
// Becomes true once lazy initialization has run. Declared volatile because it is
// read outside the synchronized block in checkAndInitialize().
private static volatile boolean s_init = false;
/**
 * Returns the message producer of the singleton, initializing the client
 * first when that has not happened yet.
 */
public static MessageProducer getProducer() {
    // Lazy initialization happens here on first use.
    checkAndInitialize();

    final MessageProducer producer = s_instance.m_producer;
    return producer;
}
/**
 * Initializes the client from {@code client.xml} under the CAT home directory
 * the first time it is called; later calls return immediately.
 */
private static void checkAndInitialize() {
    // Fast path: already initialized, no locking needed.
    if (s_init) {
        return;
    }

    synchronized (s_instance) {
        // Re-check under the lock: another thread may have finished meanwhile.
        if (s_init) {
            return;
        }

        final File clientConfig = new File(getCatHome(), "client.xml");
        initialize(clientConfig);
        log("WARN", "Cat is lazy initialized!");
        s_init = true;
    }
}
/**
 * Initializes the client with the default Plexus container.
 *
 * Plexus is an IoC container comparable to Spring's. According to the original
 * author's note, the default container is built from
 * /META-INF/plexus/plexus.xml (not verifiable from this excerpt).
 *
 * @param configFile the client configuration file
 */
public static void initialize(File configFile) {
    initialize(ContainerLoader.getDefaultContainer(), configFile);
}
/**
 * Initializes the CAT client module inside the given container, unless the
 * module reports that it is already initialized.
 *
 * @param container  the Plexus IoC container to look components up in
 * @param configFile the client configuration file, published to the module
 *                   context under the key "cat-client-config-file"
 */
public static void initialize(PlexusContainer container, File configFile) {
    // The module context wraps the container; per the original author's note it
    // also carries the logger and the configuration file path.
    final ModuleContext context = new DefaultModuleContext(container);

    // Resolve the concrete client module.
    final Module clientModule = context.lookup(Module.class, CatClientModule.ID);
    if (clientModule.isInitialized()) {
        return;
    }

    final ModuleInitializer moduleInitializer = context.lookup(ModuleInitializer.class);
    context.setAttribute("cat-client-config-file", configFile);
    // Run the actual initialization.
    moduleInitializer.execute(context, clientModule);
}</code></pre>
<p>这里可能大家不太了解Plexus:它相当于Spring的IoC容器,但与Spring不同,它并不是一个拥有各种组件的完整大型框架,而仅仅是一个纯粹的IoC容器。它的开发者与Maven的开发者是同一拨人,最初开发Maven的时候,Spring还不成熟,所以Maven的开发者决定使用自己维护的IoC容器Plexus,它与Spring在语法和描述方式上稍有不同。在Plexus中有ROLE的概念,相当于Spring中的一个Bean;Plexus同样支持组件生命周期管理。</p>
<p><img src="https://www.showdoc.cc/server/api/common/visitfile/sign/29b38b9fb067e4a8019c114cbaede713?showdoc=.jpg" alt="" />
(图片来自网络)</p>
<pre><code>cat的初始化工作完成后会创建四个守护线程:
cat-StatusUpdateTask(心跳上报线程,每分钟上报一次客户端基本信息)
cat-merge-atomic-task(消息合并线程,检查m_atomicTrees队列中的消息是否需要合并,需要时合并为一条消息放入发送队列)
cat-TcpSocketSender-ChannelManager(NIO连接检查线程,每十秒检查一次与服务端的NIO连接情况,并维护可用的channel)
cat-TcpSocketSender(消息发送线程)</code></pre>
<p><img src="https://www.showdoc.cc/server/api/common/visitfile/sign/5b65a8fa9e31402f0bd1db5d6b94dbdd?showdoc=.jpg" alt="" /></p>
<pre><code class="language-java">// Performs the client module initialization.
// The steps below run in a fixed order; the status update (heartbeat) task is only
// started when the client configuration reports CAT as enabled.
protected void execute(final ModuleContext ctx) throws Exception {
ctx.info("Current working directory is " + System.getProperty("user.dir"));
// Initialize the millisecond timer.
MilliSecondTimer.initialize();
// Register a listener on the Threads helper (per the original note it observes
// thread creation and destruction).
Threads.addListener(new CatThreadListener(ctx));
// Hand the container to the Cat singleton so it can look up its manager and producer.
Cat.getInstance().setContainer(((DefaultModuleContext) ctx).getContainer());
// Look up the TransportManager; the result is discarded, so the lookup is presumably
// done only to trigger its initialization.
ctx.lookup(TransportManager.class);
ClientConfigManager clientConfigManager = ctx.lookup(ClientConfigManager.class);
// Start the heartbeat reporting thread.
if (clientConfigManager.isCatEnabled()) {
// start status update task
StatusUpdateTask statusUpdateTask = ctx.lookup(StatusUpdateTask.class);
Threads.forGroup("cat").start(statusUpdateTask);
LockSupport.parkNanos(10 * 1000 * 1000L); // wait 10 ms
// MmapConsumerTask mmapReaderTask = ctx.lookup(MmapConsumerTask.class);
// Threads.forGroup("cat").start(mmapReaderTask);
}
}
// Stores the container and resolves the MessageManager and MessageProducer from it.
// A failed lookup is rethrown as a RuntimeException with the lookup error as cause.
// Fields assigned before the failing lookup keep their new values.
void setContainer(PlexusContainer container) {
try {
m_container = container;
m_manager = container.lookup(MessageManager.class);
m_producer = container.lookup(MessageProducer.class);
} catch (ComponentLookupException e) {
// NOTE(review): the message names only MessageManager, but the MessageProducer
// lookup can end up here as well.
throw new RuntimeException("Unable to get instance of MessageManager, "
+ "please make sure the environment was setup correctly!", e);
}
}</code></pre>
<pre><code class="language-java">/**
 * Configures the TCP sender with the enabled servers from the client
 * configuration and initializes it. When CAT is disabled the sender is
 * dropped instead.
 *
 * @throws RuntimeException if CAT is enabled but no configured server is enabled
 */
public void initialize() throws InitializationException {
    // Servers configured for this client.
    final List<Server> configured = m_configManager.getServers();

    if (!m_configManager.isCatEnabled()) {
        m_tcpSocketSender = null;
        m_logger.warn("CAT was DISABLED due to not initialized yet!");
        return;
    }

    // Keep only the servers that are enabled.
    final List<InetSocketAddress> enabled = new ArrayList<InetSocketAddress>();
    for (Server candidate : configured) {
        if (!candidate.isEnabled()) {
            continue;
        }
        enabled.add(new InetSocketAddress(candidate.getIp(), candidate.getPort()));
    }
    m_logger.info("Remote CAT servers: " + enabled);

    if (enabled.isEmpty()) {
        throw new RuntimeException("All servers in configuration are disabled!\r\n" + configured);
    }

    m_tcpSocketSender.setServerAddresses(enabled);
    // Initializes the sender, which starts the reporting threads.
    m_tcpSocketSender.initialize();
}</code></pre>
<pre><code class="language-java">// Channel manager created in initialize(); supplies the channel used for sending.
private ChannelManager m_manager;
// Queue of message trees waiting to be sent by this sender's run() loop.
private MessageQueue m_queue;
// Queue of message trees that are merged before being moved to m_queue.
private MessageQueue m_atomicTrees;
/**
 * Creates both message queues and the channel manager, then starts the three
 * background tasks of the sender in the "cat" thread group.
 */
public void initialize() {
    final int capacity = getQueueSize();

    m_queue = new DefaultMessageQueue(capacity);
    m_atomicTrees = new DefaultMessageQueue(capacity);

    // The channel manager maintains the connection to the server.
    m_manager = new ChannelManager(m_logger, m_serverAddresses, m_queue, m_configManager, m_factory);

    // Start order: the sender itself, the channel manager, then the merge task.
    Threads.forGroup("cat").start(this);
    Threads.forGroup("cat").start(m_manager);
    Threads.forGroup("cat").start(new MergeAtomicTask());
}</code></pre>
<pre><code class="language-java">/**
 * Sets up the Netty bootstrap and establishes the initial active channel.
 *
 * The server list from the router configuration (see loadServerConfig) is
 * preferred; when that is empty, the addresses passed in are used instead.
 * If no connection can be made, an empty holder that only remembers the
 * candidate addresses becomes the active holder.
 *
 * @param logger          logger for connection diagnostics
 * @param serverAddresses fallback server addresses supplied by the caller
 * @param queue           the outgoing message queue
 * @param configManager   client configuration access
 * @param idFactory       message id factory
 */
public ChannelManager(Logger logger, List<InetSocketAddress> serverAddresses, MessageQueue queue,
        ClientConfigManager configManager, MessageIdFactory idFactory) {
    m_logger = logger;
    m_queue = queue;
    m_configManager = configManager;
    m_idfactory = idFactory;

    // TCP communication goes through Netty NIO with a single daemon event-loop thread.
    final ThreadFactory daemonThreads = new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setDaemon(true);
            return t;
        }
    };
    final EventLoopGroup eventLoop = new NioEventLoopGroup(1, daemonThreads);

    final Bootstrap clientBootstrap = new Bootstrap();
    clientBootstrap.group(eventLoop).channel(NioSocketChannel.class);
    clientBootstrap.option(ChannelOption.SO_KEEPALIVE, true);
    clientBootstrap.handler(new ChannelInitializer<Channel>() {
        @Override
        protected void initChannel(Channel ch) throws Exception {
        }
    });
    m_bootstrap = clientBootstrap;

    // Ask the server side for the router configuration (may come back null or empty).
    final String serverConfig = loadServerConfig();
    final boolean routed = StringUtils.isNotEmpty(serverConfig);

    // Router configuration wins; otherwise fall back to the addresses passed in.
    final List<InetSocketAddress> candidates = routed ? parseSocketAddress(serverConfig) : serverAddresses;
    ChannelHolder holder = initChannel(candidates, routed ? serverConfig : null);

    if (holder == null) {
        // Nothing reachable yet: keep the candidates so a later check can retry.
        holder = new ChannelHolder();
        holder.setServerAddresses(candidates);
        if (!routed) {
            m_logger.error("error when init cat module due to error config xml in /data/appdatas/cat/client.xml");
        }
    }
    m_activeChannelHolder = holder;
}
/**
 * Creates a thread for the task through the thread factory, applies the daemon
 * flag and starts it.
 *
 * @param runnable the task to run
 * @param deamon   whether the thread should be a daemon thread
 * @return the started thread
 */
public Thread start(Runnable runnable, boolean deamon) {
    final Thread worker = m_factory.newThread(runnable);

    worker.setDaemon(deamon);
    worker.start();
    return worker;
}</code></pre>
<h2>构建消息</h2>
<p><img src="https://www.showdoc.cc/server/api/common/visitfile/sign/ad3fd814864b6c2cb8c2e9dd52b86875?showdoc=.jpg" alt="" /></p>
<pre><code class="language-java">// How messages are stored: each thread keeps its own Context in this ThreadLocal.
private ThreadLocal<Context> m_context = new ThreadLocal<Context>();
/**
 * Creates and starts a new transaction for the current thread.
 *
 * @param type transaction type
 * @param name transaction name
 * @return the started transaction, or {@code NullMessage.TRANSACTION} when
 *         messaging is disabled
 */
public Transaction newTransaction(String type, String name) {
    // this enable CAT client logging cat message without explicit setup
    if (!m_manager.hasContext()) {
        // setup() stores a fresh Context for this thread in the manager's ThreadLocal.
        m_manager.setup();
    }

    if (!m_manager.isMessageEnabled()) {
        return NullMessage.TRANSACTION;
    }

    final DefaultTransaction created = new DefaultTransaction(type, name, m_manager);
    // Registers the transaction in the thread's Context (its message tree and stack).
    m_manager.start(created, false);
    return created;
}
/**
 * Creates the Context for the current thread and stores it in the ThreadLocal.
 * Without a configured domain the context uses the domain "Unknown" and an
 * empty IP address.
 */
public void setup() {
    final boolean hasDomain = m_domain != null;
    final String domainId = hasDomain ? m_domain.getId() : "Unknown";
    final String ipAddress = hasDomain ? m_domain.getIp() : "";

    m_context.set(new Context(domainId, m_hostName, ipAddress));
}
/**
 * Starts the transaction in the current thread's Context. Without a Context
 * the call is dropped; the first such drop is logged as a warning.
 *
 * @param transaction the transaction to start
 * @param forked      passed through to the Context
 */
public void start(Transaction transaction, boolean forked) {
    final Context current = getContext();

    if (current == null) {
        // Warn only once.
        if (m_firstMessage) {
            m_firstMessage = false;
            m_logger.warn("CAT client is not enabled because it's not initialized yet");
        }
        return;
    }

    // Delegate to Context.start.
    current.start(transaction, forked);
}
/**
 * Links the transaction into the message tree: as the root message when no
 * transaction is open, otherwise as a child of the innermost open transaction.
 * Non-forked transactions are then pushed onto the stack.
 *
 * @param transaction the transaction being started
 * @param forked      when true the transaction is not pushed onto the stack
 */
public void start(Transaction transaction, boolean forked) {
    if (m_stack.isEmpty()) {
        m_tree.setMessage(transaction);
    } else {
        // Attach to the transaction currently on top of the stack.
        addTransactionChild(transaction, m_stack.peek());
    }

    if (forked) {
        return;
    }
    // Make this transaction the innermost open one.
    m_stack.push(transaction);
}
/**
 * Creates an event, fills in its data and status and completes it.
 *
 * @param type           event type
 * @param name           event name
 * @param status         event status
 * @param nameValuePairs optional data; skipped when null or empty
 */
public void logEvent(String type, String name, String status, String nameValuePairs) {
    final Event created = newEvent(type, name);
    final boolean hasData = nameValuePairs != null && !nameValuePairs.isEmpty();

    if (hasData) {
        created.addData(nameValuePairs);
    }
    created.setStatus(status);
    // Completing the event adds it to the context.
    created.complete();
}
/**
 * Marks this message as completed and, when a manager is attached, hands the
 * message to it.
 */
public void complete() {
    setCompleted(true);

    if (m_manager == null) {
        return;
    }
    m_manager.add(this);
}
/**
 * Adds a completed message to the context. With an open transaction the
 * message becomes its child; otherwise it is flushed on its own in a copy of
 * the message tree.
 *
 * @param message the message to add
 */
public void add(Message message) {
    if (!m_stack.isEmpty()) {
        // There is an open transaction: attach the message beneath it.
        addTransactionChild(message, m_stack.peek());
        return;
    }

    // No open transaction: send the message by itself right away.
    final MessageTree standalone = m_tree.copy();
    standalone.setMessage(message);
    flush(standalone);
}
/**
 * (Transaction class) Appends a child message to this transaction. The child
 * list is created on first use. A null child is not added; it is reported
 * through {@code Cat.logError} instead.
 *
 * @param message the child message
 * @return this transaction
 */
public DefaultTransaction addChild(Message message) {
    if (m_children == null) {
        m_children = new ArrayList<Message>();
    }

    if (message == null) {
        Cat.logError(new Exception("null child message"));
        return this;
    }

    m_children.add(message);
    return this;
}
/**
 * Sends the message tree through the transport sender and resets the thread
 * context. When no sender is available or messaging is disabled, the tree is
 * dropped and counted; the 1st and every 10000th drop are logged.
 *
 * Message ids, according to the original author's note (nextMessageId itself is
 * not shown here), look like "ps-car-resource-local-ac1201e8-425698-2":
 * application name, this machine's IP in hexadecimal, the current time in
 * whole hours, and a per-hour increasing sequence number of this client.
 *
 * @param tree the message tree to send
 */
public void flush(MessageTree tree) {
    // Assign an id only when the tree does not have one yet.
    if (tree.getMessageId() == null) {
        tree.setMessageId(nextMessageId());
    }

    final MessageSender sender = m_transportManager.getSender();
    final boolean canSend = sender != null && isMessageEnabled();

    if (canSend) {
        // Send the message, then reset the thread context.
        sender.send(tree);
        reset();
        return;
    }

    m_throttleTimes++;
    final boolean firstDrop = m_throttleTimes == 1;
    if (firstDrop || m_throttleTimes % 10000 == 0) {
        m_logger.info("Cat Message is throttled! Times:" + m_throttleTimes);
    }
}
/**
 * Cleans up the current thread's context after a flush. The known-exception
 * set is always cleared; the transaction stack is cleared and the ThreadLocal
 * entry removed only when the context's total duration counter is zero.
 */
public void reset() {
    // destroy current thread local data
    final Context current = m_context.get();
    if (current == null) {
        return;
    }

    final boolean discardContext = current.m_totalDurationInMicros == 0;
    if (discardContext) {
        current.m_stack.clear();
    }
    current.m_knownExceptions.clear();
    if (discardContext) {
        m_context.remove();
    }
}</code></pre>
<pre><code class="language-java">// Per-thread message context: holds the message tree being built and the stack of
// transactions that are currently open on the thread.
// NOTE(review): this is an excerpt; fields such as m_length, m_knownExceptions,
// m_validator and m_totalDurationInMicros are used below but declared elsewhere.
class Context {
// Message tree under construction for the current thread.
private MessageTree m_tree;
// Transactions that have been started and not yet ended, innermost on top.
private Stack<Transaction> m_stack;
// Creates an empty tree and stack, and stamps the tree with the current thread's
// group name, id and name plus the given domain, host name and IP address.
public Context(String domain, String hostName, String ipAddress) {
m_tree = new DefaultMessageTree();
m_stack = new Stack<Transaction>();
Thread thread = Thread.currentThread();
String groupName = thread.getThreadGroup().getName();
m_tree.setThreadGroupName(groupName);
m_tree.setThreadId(String.valueOf(thread.getId()));
m_tree.setThreadName(thread.getName());
m_tree.setDomain(domain);
m_tree.setHostName(hostName);
m_tree.setIpAddress(ipAddress);
m_length = 1;
m_knownExceptions = new HashSet<Throwable>();
}
// NOTE(review): this overload passes `this` as the DefaultMessageManager argument and
// uses getContext()/m_context, so it appears to belong to the message manager rather
// than to Context; it is shown here as part of the excerpt.
// Ends a standalone transaction in the current thread's context.
public void end(Transaction transaction) {
Context ctx = getContext();
if (ctx != null && transaction.isStandalone()) {
// The ThreadLocal entry is removed only when ctx.end returns true.
if (ctx.end(this, transaction)) {
m_context.remove();
}
}
}
/**
 * Ends a transaction: pops it from the stack and, once the stack is empty,
 * flushes a copy of the message tree.
 * @param manager the manager used to flush the finished tree
 * @param transaction the transaction being ended
 * @return true when the stack became empty and the tree was flushed, false otherwise
 */
public boolean end(DefaultMessageManager manager, Transaction transaction) {
if (!m_stack.isEmpty()) {
Transaction current = m_stack.pop();
if (transaction == current) {
// Validate the popped transaction against its parent (null when it was the root);
// per the original note this checks that child nodes have completed.
m_validator.validate(m_stack.isEmpty() ? null : m_stack.peek(), current);
} else {
// The ended transaction is not on top: keep popping (validating each popped
// entry against the new top) until it is found or the stack runs empty.
while (transaction != current && !m_stack.empty()) {
m_validator.validate(m_stack.peek(), current);
current = m_stack.pop();
}
}
// An empty stack means the outermost transaction has ended: take a copy of the
// message tree, clear the id and message on the live tree, and flush the copy.
if (m_stack.isEmpty()) {
MessageTree tree = m_tree.copy();
m_tree.setMessageId(null);
m_tree.setMessage(null);
if (m_totalDurationInMicros > 0) {
adjustForTruncatedTransaction((Transaction) tree.getMessage());
}
manager.flush(tree);
return true;
}
}
return false;
}
}
/**
 * Sends the message tree through the transport sender and resets the thread
 * context. When no sender is available or messaging is disabled, the tree is
 * dropped and counted; the 1st and every 10000th drop are logged.
 *
 * @param tree the message tree to send
 */
public void flush(MessageTree tree) {
    // Assign a message id only when the tree does not have one yet.
    if (tree.getMessageId() == null) {
        tree.setMessageId(nextMessageId());
    }

    final MessageSender sender = m_transportManager.getSender();
    final boolean canSend = sender != null && isMessageEnabled();

    if (canSend) {
        sender.send(tree);
        reset();
        return;
    }

    m_throttleTimes++;
    final boolean firstDrop = m_throttleTimes == 1;
    if (firstDrop || m_throttleTimes % 10000 == 0) {
        m_logger.info("Cat Message is throttled! Times:" + m_throttleTimes);
    }
}</code></pre>
<p><img src="https://www.showdoc.cc/server/api/common/visitfile/sign/c330b3fc1216247e0c7cf68ff0aa5ad3?showdoc=.jpg" alt="" /></p>
<h2>消息上报</h2>
<p>消息完成后,将消息放入一个队列中,从而保证异步上报。</p>
<p><code>transaction.complete();</code></p>
<pre><code class="language-java">/**
 * Queues a message tree for asynchronous sending. Atomic messages (see
 * isAtomicMessage) go to m_atomicTrees so they can be merged first; all other
 * messages go straight to m_queue. A rejected offer is reported through
 * logQueueFullInfo.
 *
 * The debug log line now reports the size of the queue that was actually used
 * (the m_queue branch previously printed the size of m_atomicTrees) and no
 * longer contains a literal "{}" placeholder, which string concatenation never
 * filled in.
 *
 * @param tree the message tree to enqueue
 */
public void send(MessageTree tree) {
    final boolean atomic = isAtomicMessage(tree);
    final MessageQueue target = atomic ? m_atomicTrees : m_queue;
    final String queueName = atomic ? "m_atomicTrees" : "m_queue";

    final boolean accepted = target.offer(tree, m_manager.getSample());
    m_logger.info("消息添加到本地队列" + queueName + ":" + tree.getMessage() + "队列大小:" + target.size());

    if (!accepted) {
        logQueueFullInfo(tree);
    }
}
/**
 * Decides whether a message tree goes to the merge queue.
 *
 * Any root message that is not a Transaction is atomic. A Transaction is
 * atomic only when its type starts with "Cache." or equals "SQL".
 *
 * @param tree the message tree to classify
 * @return true when the tree should be queued for merging
 */
private boolean isAtomicMessage(MessageTree tree) {
    final Message root = tree.getMessage();

    if (!(root instanceof Transaction)) {
        return true;
    }

    final String type = root.getType();
    return type.startsWith("Cache.") || "SQL".equals(type);
}
/**
 * Records a message that could not be queued: notifies the statistics
 * collector (when present) and logs the 1st and every 1000th overflow.
 *
 * The former trailing {@code tree = null;} was removed: it only cleared the
 * local parameter and had no effect on the caller's reference.
 *
 * @param tree the message tree that was rejected by the queue
 */
private void logQueueFullInfo(MessageTree tree) {
    if (m_statistics != null) {
        m_statistics.onOverflowed(tree);
    }

    final int count = m_errors.incrementAndGet();
    if (count % 1000 == 0 || count == 1) {
        m_logger.error("Message queue is full in tcp socket sender! Count: " + count);
    }
}
/**
 * Main loop of the sender daemon thread.
 *
 * While a writable channel is available, one message tree is taken from
 * m_queue and sent per iteration. Otherwise, trees at the head of the queue
 * that are more than one HOUR old are discarded and counted as overflowed,
 * and the thread sleeps 5 ms before trying again.
 *
 * Changes from the original: the statistics collector is null-checked before
 * use, as it is elsewhere in this class (previously a missing collector caused
 * an exception that aborted the purge of expired messages), and the interrupt
 * status is restored when the loop exits because of an interrupt.
 */
public void run() {
    m_active = true;

    try {
        while (m_active) {
            // Ask the channel manager for a usable channel.
            ChannelFuture channel = m_manager.channel();

            if (channel != null && checkWritable(channel)) {
                try {
                    // Channel is usable: send the next queued message, if any.
                    MessageTree tree = m_queue.poll();

                    if (tree != null) {
                        sendInternal(tree);
                        tree.setMessage(null);
                    }
                } catch (Throwable t) {
                    m_logger.error("Error when sending message over TCP socket!", t);
                }
            } else {
                // No usable channel: drop messages older than one hour and count them.
                long current = System.currentTimeMillis();
                long oldTimestamp = current - HOUR;

                while (true) {
                    try {
                        MessageTree tree = m_queue.peek();

                        if (tree != null && tree.getMessage().getTimestamp() < oldTimestamp) {
                            MessageTree discardedTree = m_queue.poll();

                            if (discardedTree != null && m_statistics != null) {
                                m_statistics.onOverflowed(discardedTree);
                            }
                        } else {
                            break;
                        }
                    } catch (Exception e) {
                        m_logger.error(e.getMessage(), e);
                        break;
                    }
                }

                TimeUnit.MILLISECONDS.sleep(5);
            }
        }
    } catch (InterruptedException e) {
        // Stop the loop and keep the interrupt visible to whoever owns the thread.
        m_active = false;
        Thread.currentThread().interrupt();
    }
}
/**
 * Encodes one message tree and writes it to the active channel.
 *
 * Changes from the original: the leftover {@code System.out.println(tree)}
 * debug output was removed from the send path; the channel is resolved before
 * the buffer is allocated; and the pooled buffer is released if encoding
 * fails, since in that case it is never handed to the channel.
 *
 * @param tree the message tree to send
 */
private void sendInternal(MessageTree tree) {
    ChannelFuture future = m_manager.channel();
    Channel channel = future.channel();

    // Allocate a pooled buffer with an initial capacity of 10 KB for this message.
    ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(10 * 1024); // 10K
    boolean encoded = false;

    try {
        // Encode the tree into the buffer before sending.
        m_codec.encode(tree, buf);
        encoded = true;
    } finally {
        if (!encoded) {
            buf.release();
        }
    }

    int size = buf.readableBytes();

    // Ownership of the buffer passes to the channel here.
    channel.writeAndFlush(buf);

    if (m_statistics != null) {
        // Track the number of bytes sent.
        m_statistics.onBytes(size);
    }
}</code></pre>
<p><strong>守护线程</strong>
merge-atomic-task</p>
<pre><code class="language-java"> /**
 * Loop of the merge task: whenever shouldMerge says so, the trees queued in
 * m_atomicTrees are merged into one tree that is offered to m_queue for the
 * sender thread; otherwise the thread sleeps 5 ms. An interrupt during the
 * sleep ends the loop.
 */
@Override
public void run() {
    while (true) {
        if (!shouldMerge(m_atomicTrees)) {
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                break;
            }
            continue;
        }

        // Merge the queued atomic messages into a single message tree.
        final MessageTree merged = mergeTree(m_atomicTrees);
        // Hand the merged tree to the send queue.
        final boolean accepted = m_queue.offer(merged);

        if (!accepted) {
            logQueueFullInfo(merged);
        }
    }
}
.....
/**
 * Tells whether the queued atomic messages should be merged now: either the
 * message at the head of the queue is more than 30 seconds old, or the queue
 * holds at least MAX_CHILD_NUMBER trees (200 according to the original note).
 *
 * @param trees the queue of atomic message trees
 * @return true when a merge should happen, false when the queue is empty or
 *         neither condition holds
 */
private boolean shouldMerge(MessageQueue trees) {
    // Look at the head of the queue without removing it.
    final MessageTree head = trees.peek();
    if (head == null) {
        return false;
    }

    final long headTimestamp = head.getMessage().getTimestamp();
    final int maxDuration = 1000 * 30;
    final boolean headTooOld = System.currentTimeMillis() - headTimestamp > maxDuration;

    return headTooOld || trees.size() >= MAX_CHILD_NUMBER;
}
......
// Merges the trees queued in `trees` into a single tree whose root is a synthetic
// "System"/"MergeTree" transaction. The first polled tree is reused as the carrier:
// its message is replaced by the synthetic transaction and it is returned.
// The caller is expected to have checked (via shouldMerge) that the queue is not empty.
private MessageTree mergeTree(MessageQueue trees) {
int max = MAX_CHILD_NUMBER;
// Synthetic transaction that marks and wraps the merged messages.
DefaultTransaction tran = new DefaultTransaction("System", "MergeTree", null);
// Remove the head of the queue; it becomes the first child.
MessageTree first = trees.poll();
tran.setStatus(Transaction.SUCCESS);
tran.setCompleted(true);
tran.addChild(first.getMessage());
tran.setTimestamp(first.getMessage().getTimestamp());
long lastTimestamp = 0;
long lastDuration = 0;
// Keep taking trees from the head of the queue and adding their messages as children
// of the synthetic transaction, so that everything travels under the first tree's
// message id. The loop stops when the queue is empty or the counter runs out.
while (max >= 0) {
// Remove the next tree from the head of the queue.
MessageTree tree = trees.poll();
if (tree == null) {
// Queue drained: duration spans from the first message to the end of the last one.
tran.setDurationInMillis(lastTimestamp - tran.getTimestamp() + lastDuration);
break;
}
lastTimestamp = tree.getMessage().getTimestamp();
if(tree.getMessage() instanceof DefaultTransaction){
lastDuration = ((DefaultTransaction) tree.getMessage()).getDurationInMillis();
} else {
lastDuration = 0;
}
tran.addChild(tree.getMessage());
// Give the merged tree's message id back to the id factory for reuse.
m_factory.reuse(tree.getMessageId());
max--;
}
((DefaultMessageTree) first).setMessage(tran);
return first;
} </code></pre>
<p>TcpSocketSender-ChannelManager 后台线程</p>
<p>这个线程每10s轮询一次:每轮询30次(即m_count % 30 == 0,约300s),或者当前没有可用连接时,就通过服务端的路由配置检查服务端ip是否变动,必要时切换连接,并保证连接正常。</p>
<pre><code class="language-java">/**
 * Loop of the channel manager thread; one pass every 10 seconds while m_active
 * is true.
 *
 * Each pass saves the message id mark, lets checkServerChanged re-read the
 * router configuration when it is due (every 30th pass, i.e. roughly every
 * 300 seconds, or when there is no active connection; see
 * shouldCheckServerConfig), and then verifies or re-establishes the connection.
 *
 * Change from the original: an empty try/catch block between the two holder
 * reads was removed; it guarded no statements.
 */
public void run() {
    while (m_active) {
        // make save message id index async
        m_idfactory.saveMark();

        // Switch to the servers from the router configuration when they changed.
        checkServerChanged();

        // Snapshot the active connection and the known server addresses.
        ChannelFuture activeFuture = m_activeChannelHolder.getActiveFuture();
        List<InetSocketAddress> serverAddresses = m_activeChannelHolder.getServerAddresses();

        // Check that the current connection is still healthy.
        doubleCheckActiveServer(activeFuture);
        // Try to (re)connect using the known server addresses when needed.
        reconnectDefaultServer(activeFuture, serverAddresses);

        try {
            Thread.sleep(10 * 1000L); // check every 10 seconds
        } catch (InterruptedException e) {
            // Deliberately ignored, as in the original: only m_active ends this loop.
        }
    }
}
....
/**
 * Re-reads the router configuration when a check is due and, if it changed,
 * connects to the new server list and makes the resulting holder the active
 * one. When the underlying connection changed, the previous holder is closed
 * after the switch.
 */
private void checkServerChanged() {
    // A check is due on every 30th call or when there is no active connection
    // (see shouldCheckServerConfig). The counter is advanced on every call.
    if (!shouldCheckServerConfig(++m_count)) {
        return;
    }

    final Pair<Boolean, String> change = routerConfigChanged();
    if (!change.getKey()) {
        return;
    }

    final String servers = change.getValue();
    final List<InetSocketAddress> parsed = parseSocketAddress(servers);
    // Try to connect to one of the new addresses.
    final ChannelHolder replacement = initChannel(parsed, servers);
    if (replacement == null) {
        return;
    }

    if (!replacement.isConnectChanged()) {
        // Same connection as before, only the holder metadata is new.
        m_activeChannelHolder = replacement;
        return;
    }

    final ChannelHolder previous = m_activeChannelHolder;
    m_activeChannelHolder = replacement;
    closeChannelHolder(previous);
    m_logger.info("switch active channel to " + m_activeChannelHolder);
}
// Tries the given addresses in order and returns a holder for the first one that
// yields a connection, or null when none does (or when an exception occurs).
// addresses:    candidate server addresses, tried in list order
// serverConfig: the raw server configuration string recorded on the returned holder
private ChannelHolder initChannel(List<InetSocketAddress> addresses, String serverConfig) {
try {
int len = addresses.size();
// Walk the candidates; return as soon as one is connected.
for (int i = 0; i < len; i++) {
InetSocketAddress address = addresses.get(i);
String hostAddress = address.getAddress().getHostAddress();
ChannelHolder holder = null;
// If the active holder already points at this IP, reuse its connection;
// otherwise open a new one (the outdated connection is closed later by the caller).
if (m_activeChannelHolder != null && hostAddress.equals(m_activeChannelHolder.getIp())) {
holder = new ChannelHolder();
holder.setActiveFuture(m_activeChannelHolder.getActiveFuture()).setConnectChanged(false);
} else {
ChannelFuture future = createChannel(address);
if (future != null) {
holder = new ChannelHolder();
// true marks that the previous connection has to be closed.
holder.setActiveFuture(future).setConnectChanged(true);
}
}
if (holder != null) {
holder.setActiveIndex(i).setIp(hostAddress);
holder.setActiveServerConfig(serverConfig).setServerAddresses(addresses);
m_logger.info("success when init CAT server, new active holder" + holder.toString());
return holder;
}
}
} catch (Exception e) {
m_logger.error(e.getMessage(), e);
}
// No candidate could be connected: log the full candidate list.
try {
StringBuilder sb = new StringBuilder();
for (InetSocketAddress address : addresses) {
sb.append(address.toString()).append(";");
}
m_logger.info("Error when init CAT server " + sb.toString());
} catch (Exception e) {
// ignore
}
return null;
}
/**
 * Tells whether the router configuration should be re-read on this pass.
 *
 * @param count the number of passes made so far
 * @return true on every 30th pass, or when the active holder's index is -1
 *         (which, per the original note, means the current connection was closed)
 */
private boolean shouldCheckServerConfig(int count) {
    final int duration = 30;
    final boolean periodicCheckDue = count % duration == 0;

    return periodicCheckDue || m_activeChannelHolder.getActiveIndex() == -1;
}
/**
 * Loads the current router configuration and compares it with the one recorded
 * on the active holder.
 *
 * @return a pair whose key is true when the loaded configuration is non-empty
 *         and differs from the active one, and whose value is the loaded
 *         configuration (for example "ip1:2280;ip2:2280;")
 */
private Pair<Boolean, String> routerConfigChanged() {
    // Server addresses currently published by the router.
    final String current = loadServerConfig();

    final boolean changed = !StringUtils.isEmpty(current)
            && !current.equals(m_activeChannelHolder.getActiveServerConfig());

    return new Pair<Boolean, String>(changed, current);
}
// Fetches the router configuration from the server over HTTP and returns the trimmed
// "routers" value, or null when anything goes wrong.
// Side effect: m_sample is updated from the "sample" value before the result is
// returned, so it may already have changed even when null is returned afterwards.
private String loadServerConfig() {
try {
// Example url: http://ip:port/cat/s/router?domain=someDomain&ip=<client ip>&op=json
// Example content: {"kvs":{"routers":"ip1:2280;ip2:2280;..;","sample":"1.0"}}
String url = m_configManager.getServerConfigUrl();
// Read timeout 2000 and connect timeout 1000 (presumably milliseconds).
InputStream inputstream = Urls.forIO().readTimeout(2000).connectTimeout(1000).openStream(url);
String content = Files.forIO().readFrom(inputstream, "utf-8");
KVConfig routerConfig = (KVConfig) m_jsonBuilder.parse(content.trim(), KVConfig.class);
String current = routerConfig.getValue("routers");
m_sample = Double.valueOf(routerConfig.getValue("sample").trim());
return current.trim();
} catch (Exception e) {
// ignore
// NOTE(review): every failure (network, parsing, missing keys) is swallowed without
// logging and surfaces to callers only as a null result.
}
return null;
}</code></pre>
<p>StatusUpdateTask心跳上报线程</p>
<p>这个线程比较简单:启动后先等待10s,然后上报一个Reboot消息表示应用重启;之后每分钟上报一次关于应用的各种信息(心跳)。</p>
<pre><code class="language-java">// Body of the status update (heartbeat) thread.
// Sequence: wait 10 seconds, avoid the minute boundary, build the classpath info,
// report a one-time "Reboot" transaction, then report a "Status" transaction with a
// heartbeat once per m_interval for as long as m_active is true.
public void run() {
// try to wait cat client init success
try {
Thread.sleep(10 * 1000);
} catch (InterruptedException e) {
return;
}
// Wait, one second at a time, until the current second is within 2..58.
while (true) {
Calendar cal = Calendar.getInstance();
int second = cal.get(Calendar.SECOND);
// try to avoid send heartbeat at 59-01 second
if (second < 2 || second > 58) {
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
// ignore it
}
} else {
break;
}
}
try {
buildClasspath();
} catch (Exception e) {
e.printStackTrace();
}
// Report a "Reboot" transaction once, each time this thread starts.
MessageProducer cat = Cat.getProducer();
Transaction reboot = cat.newTransaction("System", "Reboot");
reboot.setStatus(Message.SUCCESS);
cat.logEvent("Reboot", NetworkInterfaceManager.INSTANCE.getLocalHostAddress(), Message.SUCCESS, null);
reboot.complete();
// Periodic heartbeat loop.
while (m_active) {
long start = MilliSecondTimer.currentTimeMillis();
if (m_manager.isCatEnabled()) {
Transaction t = cat.newTransaction("System", "Status");
Heartbeat h = cat.newHeartbeat("Heartbeat", m_ipAddress);
StatusInfo status = new StatusInfo();
t.addData("dumpLocked", m_manager.isDumpLocked());
try {
// Collect the status information and attach it to the heartbeat as data.
StatusInfoCollector statusInfoCollector = new StatusInfoCollector(m_statistics, m_jars);
status.accept(statusInfoCollector.setDumpLocked(m_manager.isDumpLocked()));
buildExtensionData(status);
h.addData(status.toString());
h.setStatus(Message.SUCCESS);
} catch (Throwable e) {
// A failed collection is recorded on the heartbeat and logged as an error.
h.setStatus(e);
cat.logError(e);
} finally {
h.complete();
}
t.setStatus(Message.SUCCESS);
t.complete();
}
// Sleep for the remainder of the interval; an interrupt ends the loop.
long elapsed = MilliSecondTimer.currentTimeMillis() - start;
if (elapsed < m_interval) {
try {
Thread.sleep(m_interval - elapsed);
} catch (InterruptedException e) {
break;
}
}
}
}</code></pre>