Open-Source Monitoring Platform: How the CAT Client Works

<p>[TOC]</p>
<h1>Introduction</h1>
<p>See the official repository: <a href="https://github.com/dianping/cat">https://github.com/dianping/cat</a></p>
<h1>Architecture Diagram</h1>
<p><img src="https://www.showdoc.cc/server/api/common/visitfile/sign/db09bc103fd7f95fb976eca3b00a58e6?showdoc=.jpg" alt="" /> (image from the internet)</p>
<h1>Class Diagram</h1>
<p><img src="https://www.showdoc.cc/server/api/common/visitfile/sign/2358cfcd7221dc98a31b8f00329ccdfc?showdoc=.jpg" alt="" /> (image from the internet)</p>
<h1>How It Works</h1>
<blockquote> <p>Reading approach: step through a simple use case in the debugger.</p> </blockquote>
<pre><code class="language-java">// Obtain a Transaction through the static API;
// the first call also triggers client initialization
Transaction t = Cat.getProducer().newTransaction("logTransaction", "logTransaction");
Cat.logEvent("", catMonitor.name(), "0", JSON.toJSONString(joinPoint.getArgs()));
// Run your business code here
your.business();
t.setStatus("0");
// Report
t.complete();</code></pre>
<h2>Initialization</h2>
<pre><code class="language-java">// Static fields
private static Cat s_instance = new Cat();
private static volatile boolean s_init = false;

public static MessageProducer getProducer() {
    // Lazily initialize the client on first use
    checkAndInitialize();
    return s_instance.m_producer;
}

private static void checkAndInitialize() {
    // Double-checked locking on the volatile s_init flag
    if (!s_init) {
        synchronized (s_instance) {
            if (!s_init) {
                // Initialize from the client.xml file under the CAT home directory
                initialize(new File(getCatHome(), "client.xml"));
                log("WARN", "Cat is lazy initialized!");
                s_init = true;
            }
        }
    }
}

public static void initialize(File configFile) {
    // Plexus is an IoC framework similar to Spring. It loads
    // /META-INF/plexus/plexus.xml to set up the IoC container.
    PlexusContainer container = ContainerLoader.getDefaultContainer();
    initialize(container, configFile);
}

public static void initialize(PlexusContainer container, File configFile) {
    // The module context holds:
    // 1. a reference to the Plexus IoC container;
    // 2. a reference to the logger, used for logging;
    // 3. the path of the configuration file to use.
    ModuleContext ctx = new DefaultModuleContext(container);
    // Look up the concrete module instance
    Module module = ctx.lookup(Module.class, CatClientModule.ID);

    if (!module.isInitialized()) {
        ModuleInitializer initializer = ctx.lookup(ModuleInitializer.class);
        ctx.setAttribute("cat-client-config-file", configFile);
        // Run the initialization
        initializer.execute(ctx, module);
    }
}</code></pre>
<p>You may not be familiar with Plexus. It plays the same role as Spring's IoC container, but unlike Spring it is not a full framework with many components; it is a pure IoC container. It was written by the same people who wrote Maven: when Maven development started, Spring was not yet mature, so the Maven developers chose to use Plexus, an IoC container they maintained themselves. Its syntax and component descriptors differ slightly from Spring's. Plexus has the concept of a ROLE, which corresponds to a bean in Spring, and it supports component lifecycle management.</p>
<p><img src="https://www.showdoc.cc/server/api/common/visitfile/sign/29b38b9fb067e4a8019c114cbaede713?showdoc=.jpg" alt="" /> (image from the internet)</p>
<pre><code>Once CAT finishes initializing, four daemon threads are running:
cat-StatusUpdateTask: reports basic client information (the heartbeat) once per minute
cat-merge-atomic-task: checks whether queued atomic messages should be merged
cat-TcpSocketSender-ChannelManager: checks the NIO connection to the server every ten seconds and provides a usable channel
cat-TcpSocketSender: the message-sending thread</code></pre>
<p><img src="https://www.showdoc.cc/server/api/common/visitfile/sign/5b65a8fa9e31402f0bd1db5d6b94dbdd?showdoc=.jpg" alt="" /></p>
<pre><code class="language-java">// Performs the client initialization
protected void execute(final ModuleContext ctx) throws Exception {
    ctx.info("Current working directory is " + System.getProperty("user.dir"));

    // Initialize the base time of the millisecond timer
    MilliSecondTimer.initialize();

    // Register a listener on CAT's thread helper so that thread creation and shutdown are observed
    Threads.addListener(new CatThreadListener(ctx));

    // Hand the container to the Cat singleton (see setContainer below)
    Cat.getInstance().setContainer(((DefaultModuleContext) ctx).getContainer());

    // Initialize the TransportManager
    ctx.lookup(TransportManager.class);

    ClientConfigManager clientConfigManager = ctx.lookup(ClientConfigManager.class);

    // Start the heartbeat reporting thread
    if (clientConfigManager.isCatEnabled()) {
        // start status update task
        StatusUpdateTask statusUpdateTask = ctx.lookup(StatusUpdateTask.class);

        Threads.forGroup("cat").start(statusUpdateTask);
        LockSupport.parkNanos(10 * 1000 * 1000L); // wait 10 ms

        // MmapConsumerTask mmapReaderTask = ctx.lookup(MmapConsumerTask.class);
        // Threads.forGroup("cat").start(mmapReaderTask);
    }
}

void setContainer(PlexusContainer container) {
    try {
        m_container = container;
        m_manager = container.lookup(MessageManager.class);
        m_producer = container.lookup(MessageProducer.class);
    } catch (ComponentLookupException e) {
        throw new RuntimeException("Unable to get instance of MessageManager, "
                + "please make sure the environment was setup correctly!", e);
    }
}</code></pre>
<pre><code class="language-java">public void initialize() throws InitializationException {
    // Read the configured server addresses
    List&lt;Server&gt; servers = m_configManager.getServers();

    if (!m_configManager.isCatEnabled()) {
        m_tcpSocketSender = null;
        m_logger.warn("CAT was DISABLED due to not initialized yet!");
    } else {
        List&lt;InetSocketAddress&gt; addresses = new ArrayList&lt;InetSocketAddress&gt;();

        for (Server server : servers) {
            if (server.isEnabled()) {
                addresses.add(new InetSocketAddress(server.getIp(), server.getPort()));
            }
        }

        m_logger.info("Remote CAT servers: " + addresses);

        if (addresses.isEmpty()) {
            throw new RuntimeException("All servers in configuration are disabled!\r\n" + servers);
        } else {
            m_tcpSocketSender.setServerAddresses(addresses);
            // Initialize the sender and its threads
            m_tcpSocketSender.initialize();
        }
    }
}</code></pre>
<pre><code class="language-java">private ChannelManager m_manager;
private MessageQueue m_queue;
private MessageQueue m_atomicTrees;

public void initialize() {
    int len = getQueueSize();

    m_queue = new DefaultMessageQueue(len);
    m_atomicTrees = new DefaultMessageQueue(len);
    // Create the channel manager
    m_manager = new ChannelManager(m_logger, m_serverAddresses, m_queue, m_configManager, m_factory);

    // Start the daemon threads: the sender itself, the channel manager, and the merge task
    Threads.forGroup("cat").start(this);
    Threads.forGroup("cat").start(m_manager);
    Threads.forGroup("cat").start(new MergeAtomicTask());
}</code></pre>
<pre><code class="language-java">public ChannelManager(Logger logger, List&lt;InetSocketAddress&gt; serverAddresses, MessageQueue queue,
        ClientConfigManager configManager, MessageIdFactory idFactory) {
    m_logger = logger;
    m_queue = queue;
    m_configManager = configManager;
    m_idfactory = idFactory;

    // TCP communication over NIO (Netty), with a single daemon event-loop thread
    EventLoopGroup group = new NioEventLoopGroup(1, new ThreadFactory() {
        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(r);
            t.setDaemon(true);
            return t;
        }
    });

    Bootstrap bootstrap = new Bootstrap();
    bootstrap.group(group).channel(NioSocketChannel.class);
    bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
    bootstrap.handler(new ChannelInitializer&lt;Channel&gt;() {
        @Override
        protected void initChannel(Channel ch) throws Exception {
        }
    });
    m_bootstrap = bootstrap;

    // Load the router configuration from the remote server
    String serverConfig = loadServerConfig();

    // Open a channel to one of the routed servers, or fall back to the locally configured ones
    if (StringUtils.isNotEmpty(serverConfig)) {
        List&lt;InetSocketAddress&gt; configedAddresses = parseSocketAddress(serverConfig);
        ChannelHolder holder = initChannel(configedAddresses, serverConfig);

        if (holder != null) {
            m_activeChannelHolder = holder;
        } else {
            m_activeChannelHolder = new ChannelHolder();
            m_activeChannelHolder.setServerAddresses(configedAddresses);
        }
    } else {
        // No router configuration available: use the server addresses passed in from client.xml
        ChannelHolder holder = initChannel(serverAddresses, null);

        if (holder != null) {
            m_activeChannelHolder = holder;
        } else {
            m_activeChannelHolder = new ChannelHolder();
            m_activeChannelHolder.setServerAddresses(serverAddresses);
            m_logger.error("error when init cat module due to error config xml in /data/appdatas/cat/client.xml");
        }
    }
}

// Start a thread, as a daemon when requested
public Thread start(Runnable runnable, boolean deamon) {
    Thread thread = m_factory.newThread(runnable);

    thread.setDaemon(deamon);
    thread.start();
    return thread;
}</code></pre>
<h2>Building Messages</h2>
<p><img src="https://www.showdoc.cc/server/api/common/visitfile/sign/ad3fd814864b6c2cb8c2e9dd52b86875?showdoc=.jpg" alt="" /></p>
<pre><code class="language-java">// Where messages are stored: one Context per thread
private ThreadLocal&lt;Context&gt; m_context = new ThreadLocal&lt;Context&gt;();

public Transaction newTransaction(String type, String name) {
    // this enable CAT client logging cat message without explicit setup
    if (!m_manager.hasContext()) {
        // Stores a Context object in the ThreadLocal:
        // ctx = new Context(m_domain.getId(), m_hostName, m_domain.getIp());
        m_manager.setup();
    }

    if (m_manager.isMessageEnabled()) {
        DefaultTransaction transaction = new DefaultTransaction(type, name, m_manager);
        // Put the new message into the Context (Context.m_tree and Context.m_stack);
        // the Context class itself is shown further down
        m_manager.start(transaction, false);
        return transaction;
    } else {
        return
            NullMessage.TRANSACTION;
    }
}

public void setup() {
    // Create the Context for the current thread
    Context ctx;

    if (m_domain != null) {
        ctx = new Context(m_domain.getId(), m_hostName, m_domain.getIp());
    } else {
        ctx = new Context("Unknown", m_hostName, "");
    }

    m_context.set(ctx);
}

public void start(Transaction transaction, boolean forked) {
    Context ctx = getContext();

    if (ctx != null) {
        // Delegate to Context.start
        ctx.start(transaction, forked);
    } else if (m_firstMessage) {
        m_firstMessage = false;
        m_logger.warn("CAT client is not enabled because it's not initialized yet");
    }
}

// Context.start
public void start(Transaction transaction, boolean forked) {
    // If the stack is not empty, attach the new transaction as a child of the transaction on top of the stack;
    // otherwise it becomes the root message of the tree
    if (!m_stack.isEmpty()) {
        Transaction parent = m_stack.peek();
        addTransactionChild(transaction, parent);
    } else {
        m_tree.setMessage(transaction);
    }

    if (!forked) {
        // Push the transaction onto the stack
        m_stack.push(transaction);
    }
}

// Create an event
public void logEvent(String type, String name, String status, String nameValuePairs) {
    Event event = newEvent(type, name);

    if (nameValuePairs != null &amp;&amp; nameValuePairs.length() &gt; 0) {
        event.addData(nameValuePairs);
    }

    event.setStatus(status);
    // Completing the event adds it to the Context
    event.complete();
}

public void complete() {
    setCompleted(true);

    if (m_manager != null) {
        m_manager.add(this);
    }
}

public void add(Message message) {
    // An empty stack means there is no enclosing transaction: flush the message on its own
    if (m_stack.isEmpty()) {
        MessageTree tree = m_tree.copy();
        tree.setMessage(message);
        flush(tree);
    } else {
        // Otherwise add the message as a child of the transaction on top of the stack
        Transaction parent = m_stack.peek();
        addTransactionChild(message, parent);
    }
}

// In the Transaction class
public DefaultTransaction addChild(Message message) {
    if (m_children == null) {
        m_children = new ArrayList&lt;Message&gt;();
    }

    if (message != null) {
        m_children.add(message);
    } else {
        Cat.logError(new Exception("null child message"));
    }
    return this;
}

public void flush(MessageTree tree) {
    if (tree.getMessageId() == null) {
        // Generate a unique message ID, e.g. ps-car-resource-local-ac1201e8-425698-2
        /**
         * A message ID has four segments:
         * Segment 1 is the application name: ps-car-resource-local
         * Segment 2 is the IP address of this machine in hexadecimal: ac1201e8
         * Segment 3, 425698, is the current system time divided by one hour, i.e. the number of whole hours since the epoch
         * Segment 4, 2, is a sequence number that increases for this client within the current hour
         * (implemented with volatile and AtomicInteger)
         */
        tree.setMessageId(nextMessageId());
    }

    MessageSender sender = m_transportManager.getSender();

    if (sender != null &amp;&amp; isMessageEnabled()) {
        // Send the message
        sender.send(tree);
        // Reset the thread-local state
        reset();
    } else {
        m_throttleTimes++;

        if (m_throttleTimes % 10000 == 0 || m_throttleTimes == 1) {
            m_logger.info("Cat Message is throttled! Times:" + m_throttleTimes);
        }
    }
}

public void reset() {
    // destroy current thread local data
    Context ctx = m_context.get();

    if (ctx != null) {
        if (ctx.m_totalDurationInMicros == 0) {
            ctx.m_stack.clear();
            ctx.m_knownExceptions.clear();
            m_context.remove();
        } else {
            ctx.m_knownExceptions.clear();
        }
    }
}</code></pre>
<pre><code class="language-java">class Context {
    private MessageTree m_tree;
    private Stack&lt;Transaction&gt; m_stack;

    // Initialization
    public Context(String domain, String hostName, String ipAddress) {
        m_tree = new DefaultMessageTree();
        m_stack = new Stack&lt;Transaction&gt;();

        Thread thread = Thread.currentThread();
        String groupName = thread.getThreadGroup().getName();

        m_tree.setThreadGroupName(groupName);
        m_tree.setThreadId(String.valueOf(thread.getId()));
        m_tree.setThreadName(thread.getName());

        m_tree.setDomain(domain);
        m_tree.setHostName(hostName);
        m_tree.setIpAddress(ipAddress);
        m_length = 1;
        m_knownExceptions = new HashSet&lt;Throwable&gt;();
    }

    // This overload belongs to DefaultMessageManager (it passes itself to ctx.end);
    // it is shown here next to the Context method it delegates to
    public void end(Transaction transaction) {
        Context ctx = getContext();

        if (ctx != null &amp;&amp; transaction.isStandalone()) {
            // The ThreadLocal is cleared only when ctx.end returns true
            if (ctx.end(this, transaction)) {
                m_context.remove();
            }
        }
    }

    /**
     * Ends (commits) a transaction.
     * @param manager
     * @param transaction the current transaction
     * @return true when the stack is empty afterwards and the message tree was flushed
     */
    public boolean end(DefaultMessageManager manager, Transaction transaction) {
        if (!m_stack.isEmpty()) {
            Transaction current = m_stack.pop();

            if (transaction == current) {
                // Validate that the child nodes have completed
                m_validator.validate(m_stack.isEmpty() ?
                        null : m_stack.peek(), current);
            } else {
                while (transaction != current &amp;&amp; !m_stack.empty()) {
                    m_validator.validate(m_stack.peek(), current);
                    current = m_stack.pop();
                }
            }

            // An empty stack means this was the outermost transaction:
            // build the message tree and flush it
            if (m_stack.isEmpty()) {
                MessageTree tree = m_tree.copy();

                m_tree.setMessageId(null);
                m_tree.setMessage(null);

                if (m_totalDurationInMicros &gt; 0) {
                    adjustForTruncatedTransaction((Transaction) tree.getMessage());
                }

                manager.flush(tree);
                return true;
            }
        }

        return false;
    }
}

// DefaultMessageManager.flush
public void flush(MessageTree tree) {
    if (tree.getMessageId() == null) {
        // Assign the message ID
        tree.setMessageId(nextMessageId());
    }

    MessageSender sender = m_transportManager.getSender();

    if (sender != null &amp;&amp; isMessageEnabled()) {
        sender.send(tree);
        reset();
    } else {
        m_throttleTimes++;

        if (m_throttleTimes % 10000 == 0 || m_throttleTimes == 1) {
            m_logger.info("Cat Message is throttled! Times:" + m_throttleTimes);
        }
    }
}</code></pre>
<p><img src="https://www.showdoc.cc/server/api/common/visitfile/sign/c330b3fc1216247e0c7cf68ff0aa5ad3?showdoc=.jpg" alt="" /></p>
<h2>Reporting Messages</h2>
<p>When a message completes, it is placed on a queue so that reporting happens asynchronously.</p>
<p><code>transaction.complete();</code></p>
<pre><code class="language-java">public void send(MessageTree tree) {
    // Atomic messages go to m_atomicTrees, everything else to m_queue.
    // logQueueFullInfo handles the case where the queue is full.
    // The m_logger.info calls are debug output added while tracing.
    if (isAtomicMessage(tree)) {
        boolean result = m_atomicTrees.offer(tree, m_manager.getSample());
        m_logger.info("Message added to local queue m_atomicTrees: " + tree.getMessage()
                + ", queue size: " + m_atomicTrees.size());

        if (!result) {
            logQueueFullInfo(tree);
        }
    } else {
        boolean result = m_queue.offer(tree, m_manager.getSample());
        m_logger.info("Message added to local queue m_queue: " + tree.getMessage()
                + ", queue size: " + m_queue.size());

        if (!result) {
            logQueueFullInfo(tree);
        }
    }
}

private boolean isAtomicMessage(MessageTree tree) {
    Message message = tree.getMessage();

    if (message instanceof Transaction) {
        String type = message.getType();
        // Among Transaction messages only "Cache.*" and "SQL" types are atomic;
        // every non-Transaction message is atomic
        if (type.startsWith("Cache.") || "SQL".equals(type)) {
            return true;
        } else {
            return false;
        }
    } else {
        return true;
    }
}

private void logQueueFullInfo(MessageTree tree) {
    // Count the overflow, log it, and drop the message (the local reference is set to null)
    if (m_statistics != null) {
        m_statistics.onOverflowed(tree);
    }

    int count = m_errors.incrementAndGet();

    if (count % 1000 == 0 || count == 1) {
        m_logger.error("Message queue is full in tcp socket sender! Count: " + count);
    }

    tree = null;
}

// Body of the cat-TcpSocketSender daemon thread
public void run() {
    m_active = true;

    try {
        while (m_active) {
            // Ask the ChannelManager for a usable channel
            ChannelFuture channel = m_manager.channel();

            if (channel != null &amp;&amp; checkWritable(channel)) {
                try {
                    // The channel is writable: send the next queued message
                    MessageTree tree = m_queue.poll();

                    if (tree != null) {
                        sendInternal(tree);
                        tree.setMessage(null);
                    }
                } catch (Throwable t) {
                    m_logger.error("Error when sending message over TCP socket!", t);
                }
            } else {
                // No writable channel: discard queued messages older than one hour
                // and count them as overflowed
                long current = System.currentTimeMillis();
                long oldTimestamp = current - HOUR;

                while (true) {
                    try {
                        MessageTree tree = m_queue.peek();

                        if (tree != null &amp;&amp; tree.getMessage().getTimestamp() &lt; oldTimestamp) {
                            MessageTree discradTree = m_queue.poll();

                            if (discradTree != null) {
                                m_statistics.onOverflowed(discradTree);
                            }
                        } else {
                            break;
                        }
                    } catch (Exception e) {
                        m_logger.error(e.getMessage(), e);
                        break;
                    }
                }

                TimeUnit.MILLISECONDS.sleep(5);
            }
        }
    } catch (InterruptedException e) {
        // ignore it
        m_active = false;
    }
}

private void sendInternal(MessageTree tree) {
    ChannelFuture future = m_manager.channel();
    // Allocate a pooled buffer with an initial capacity of 10 KB for each message
    ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(10 * 1024); // 10K
    System.out.println(tree); // debug output
    m_codec.encode(tree, buf); // encode, then send

    int size = buf.readableBytes();
    Channel channel = future.channel();

    channel.writeAndFlush(buf);

    if (m_statistics != null) {
        // Record the size of the message that was sent
        m_statistics.onBytes(size);
    }
}</code></pre>
<p><strong>Daemon thread</strong> merge-atomic-task</p>
<pre><code class="language-java">@Override
public void run() {
    while (true) {
        if (shouldMerge(m_atomicTrees)) {
            MessageTree tree =
                    mergeTree(m_atomicTrees); // merge the messages queued in m_atomicTrees into one message tree
            boolean result = m_queue.offer(tree); // put it on m_queue, where the cat-TcpSocketSender thread consumes it as usual

            if (!result) {
                logQueueFullInfo(tree);
            }
        } else {
            try {
                Thread.sleep(5);
            } catch (InterruptedException e) {
                break;
            }
        }
    }
}
.....
private boolean shouldMerge(MessageQueue trees) {
    MessageTree tree = trees.peek(); // look at the head of the queue without removing it

    if (tree != null) {
        long firstTime = tree.getMessage().getTimestamp();
        int maxDuration = 1000 * 30;
        // Merge when the oldest queued message is more than 30 s old,
        // or when the backlog reaches MAX_CHILD_NUMBER (200)
        if (System.currentTimeMillis() - firstTime &gt; maxDuration || trees.size() &gt;= MAX_CHILD_NUMBER) {
            return true;
        }
    }
    return false;
}
......
private MessageTree mergeTree(MessageQueue trees) {
    int max = MAX_CHILD_NUMBER;
    DefaultTransaction tran = new DefaultTransaction("System", "MergeTree", null); // wrapper transaction that marks the merge
    MessageTree first = trees.poll(); // remove the head of the queue

    tran.setStatus(Transaction.SUCCESS);
    tran.setCompleted(true);
    tran.addChild(first.getMessage());
    tran.setTimestamp(first.getMessage().getTimestamp());
    long lastTimestamp = 0;
    long lastDuration = 0;

    // Keep polling messages from the head of m_atomicTrees and add them as children of one
    // Transaction that keeps the first tree's message ID. The loop ends when the queue is empty
    // or the child limit is reached; the IDs of the merged trees are handed back for reuse.
    while (max &gt;= 0) {
        MessageTree tree = trees.poll(); // remove the next head of the queue

        if (tree == null) {
            tran.setDurationInMillis(lastTimestamp - tran.getTimestamp() + lastDuration);
            break;
        }
        lastTimestamp = tree.getMessage().getTimestamp();
        if (tree.getMessage() instanceof DefaultTransaction) {
            lastDuration = ((DefaultTransaction) tree.getMessage()).getDurationInMillis();
        } else {
            lastDuration = 0;
        }
        tran.addChild(tree.getMessage());
        m_factory.reuse(tree.getMessageId());
        max--;
    }
    ((DefaultMessageTree) first).setMessage(tran);
    return first;
}</code></pre>
<p>The TcpSocketSender-ChannelManager background thread</p>
<p>This thread works from the router IPs configured on the server. It loops every 10 s. On every 30th iteration (m_count % 30 == 0), or whenever there is no active connection, it checks whether the routed server IPs have changed; on every iteration it also verifies that the current connection is healthy.</p>
<pre><code class="language-java">public void run() {
    while (m_active) {
        // make save message id index async
        m_idfactory.saveMark();
        // Roughly every 300 s (30 iterations of 10 s, see shouldCheckServerConfig) reload the router
        // configuration and reconnect if it changed; connections are long-lived TCP connections
        checkServerChanged();

        // The future of the currently active connection to one of the routed servers
        ChannelFuture activeFuture = m_activeChannelHolder.getActiveFuture();
        try {
        } catch (Exception e) {
            e.printStackTrace();
        }
        List&lt;InetSocketAddress&gt; serverAddresses = m_activeChannelHolder.getServerAddresses();

        doubleCheckActiveServer(activeFuture); // check that the current connection is healthy
        // If it is not, keep trying the other servers. When none of the default-server IPs
        // can be reached, the backup server IP is used.
        reconnectDefaultServer(activeFuture, serverAddresses);

        try {
            Thread.sleep(10 * 1000L); // check every 10 seconds
        } catch (InterruptedException e) {
        }
    }
}
....
private void checkServerChanged() {
    // Decide whether the connection needs to change:
    // true on every 30th call, or when there is no successful connection
    if (shouldCheckServerConfig(++m_count)) {
        Pair&lt;Boolean, String&gt; pair = routerConfigChanged();

        if (pair.getKey()) {
            String servers = pair.getValue();
            List&lt;InetSocketAddress&gt; serverAddresses = parseSocketAddress(servers);
            ChannelHolder newHolder = initChannel(serverAddresses, servers); // connect

            if (newHolder != null) {
                if (newHolder.isConnectChanged()) {
                    ChannelHolder last = m_activeChannelHolder;

                    m_activeChannelHolder = newHolder;
                    closeChannelHolder(last);
                    m_logger.info("switch active channel to " + m_activeChannelHolder);
                } else {
                    m_activeChannelHolder = newHolder;
                }
            }
        }
    }
}

private ChannelHolder initChannel(List&lt;InetSocketAddress&gt; addresses, String serverConfig) {
    try {
        int len = addresses.size();

        for (int i = 0; i &lt; len; i++) { // try each address in order and return on the first success
            InetSocketAddress address = addresses.get(i);
            String hostAddress = address.getAddress().getHostAddress();
            ChannelHolder holder = null;

            // If the active connection already points at this address, reuse it;
            // otherwise open a new connection (the stale one is closed later)
            if (m_activeChannelHolder != null &amp;&amp; hostAddress.equals(m_activeChannelHolder.getIp())) {
                holder = new ChannelHolder();
                holder.setActiveFuture(m_activeChannelHolder.getActiveFuture()).setConnectChanged(false);
            } else {
                ChannelFuture future = createChannel(address);

                if (future != null) {
                    holder = new ChannelHolder();
                    holder.setActiveFuture(future).setConnectChanged(true); // true: the previous connection must be closed
                }
            }
            if (holder != null) {
                holder.setActiveIndex(i).setIp(hostAddress);
                holder.setActiveServerConfig(serverConfig).setServerAddresses(addresses);

                m_logger.info("success when init CAT server, new active holder" + holder.toString());
                return holder;
            }
        }
    } catch (Exception e) {
        m_logger.error(e.getMessage(), e);
    }

    try {
        StringBuilder sb = new StringBuilder();

        for (InetSocketAddress address : addresses) {
            sb.append(address.toString()).append(";");
        }
        m_logger.info("Error when init CAT server " + sb.toString());
    } catch (Exception e) {
        // ignore
    }
    return null;
}

private boolean shouldCheckServerConfig(int count) {
    int duration = 30;
    // getActiveIndex() == -1 means the current connection has been closed
    if (count % duration == 0 || m_activeChannelHolder.getActiveIndex() == -1) {
        return true;
    } else {
        return false;
    }
}

private Pair&lt;Boolean, String&gt; routerConfigChanged() {
    // The server addresses currently in the router table, e.g. ip1:2280;ip2:2280...;
    String current = loadServerConfig();

    // current is not empty &amp;&amp; it differs from the configuration of the active holder, i.e. the router table changed
    if (!StringUtils.isEmpty(current) &amp;&amp; !current.equals(m_activeChannelHolder.getActiveServerConfig())) {
        return new Pair&lt;Boolean, String&gt;(true, current);
    } else {
        return new Pair&lt;Boolean, String&gt;(false, current);
    }
}

private String loadServerConfig() {
    try {
        // Fetch the router table over HTTP.
        // Example URL: http://ip:port/cat/s/router?domain=someDomain&amp;ip=<client ip>&amp;op=json
        // Example response: {"kvs":{"routers":"ip1:2280;ip2:2280;..;","sample":"1.0"}}
        String url = m_configManager.getServerConfigUrl();
        InputStream inputstream = Urls.forIO().readTimeout(2000).connectTimeout(1000).openStream(url);
        String content = Files.forIO().readFrom(inputstream, "utf-8");

        KVConfig routerConfig = (KVConfig) m_jsonBuilder.parse(content.trim(), KVConfig.class);
        String current = routerConfig.getValue("routers");
        m_sample = Double.valueOf(routerConfig.getValue("sample").trim());

        return current.trim();
    } catch (Exception e) {
        // ignore
    }
    return null;
}</code></pre>
<p>The StatusUpdateTask heartbeat thread</p>
<p>This thread is fairly simple: once per minute it reports assorted information about the application. In addition, each time the thread starts it reports a Reboot message to signal a restart.</p>
<pre><code class="language-java">public void run() {
    // try to wait cat client init success
    try {
        Thread.sleep(10 * 1000);
    } catch (InterruptedException e) {
        return;
    }

    while (true) {
        Calendar cal = Calendar.getInstance();
        int second = cal.get(Calendar.SECOND);

        // try to avoid send heartbeat at 59-01 second
        if (second &lt; 2 || second &gt; 58) {
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // ignore it
            }
        } else {
            break;
        }
    }

    try {
        buildClasspath();
    } catch (Exception e) {
        e.printStackTrace();
    }

    // Report the Reboot message once at startup
    MessageProducer cat = Cat.getProducer();
    Transaction reboot = cat.newTransaction("System", "Reboot");

    reboot.setStatus(Message.SUCCESS);
    cat.logEvent("Reboot", NetworkInterfaceManager.INSTANCE.getLocalHostAddress(), Message.SUCCESS, null);
    reboot.complete();

    // Heartbeat loop: one report per m_interval
    while (m_active) {
        long start = MilliSecondTimer.currentTimeMillis();

        if (m_manager.isCatEnabled()) {
            Transaction t = cat.newTransaction("System", "Status");
            Heartbeat h = cat.newHeartbeat("Heartbeat", m_ipAddress);
            StatusInfo status = new StatusInfo();

            t.addData("dumpLocked", m_manager.isDumpLocked());
            try {
                StatusInfoCollector statusInfoCollector = new StatusInfoCollector(m_statistics, m_jars);

                status.accept(statusInfoCollector.setDumpLocked(m_manager.isDumpLocked()));

                buildExtensionData(status);
                h.addData(status.toString());
                h.setStatus(Message.SUCCESS);
            } catch (Throwable e) {
                h.setStatus(e);
                cat.logError(e);
            } finally {
                h.complete();
            }
            t.setStatus(Message.SUCCESS);
            t.complete();
        }

        // Sleep for whatever remains of the interval
        long elapsed = MilliSecondTimer.currentTimeMillis() - start;

        if (elapsed &lt; m_interval) {
            try {
                Thread.sleep(m_interval - elapsed);
            } catch (InterruptedException e) {
                break;
            }
        }
    }
}</code></pre>
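<p>The four-segment message ID described in the comments of <code>flush</code> in the message-building section can be illustrated with a small standalone sketch. The class and method names here (<code>MessageIdSketch</code>, <code>toHex</code>, <code>build</code>) are hypothetical and are not CAT's own <code>MessageIdFactory</code>; the sketch only assumes an IPv4 address and does not model the per-hour counter that the real client maintains.</p>

```java
// Hypothetical illustration of the message ID layout:
// <domain>-<IPv4 in hex>-<hours since epoch>-<sequence number>
final class MessageIdSketch {
    private static final long HOUR_MILLIS = 60 * 60 * 1000L;

    // Converts a dotted IPv4 address into 8 hex digits, e.g. 172.18.1.232 -> ac1201e8
    static String toHex(String ipv4) {
        StringBuilder sb = new StringBuilder();
        for (String part : ipv4.split("\\.")) {
            sb.append(String.format("%02x", Integer.parseInt(part)));
        }
        return sb.toString();
    }

    // Joins the four segments; the hour segment is the timestamp divided by one hour
    static String build(String domain, String ipv4, long timestampMillis, int index) {
        long hour = timestampMillis / HOUR_MILLIS;
        return domain + "-" + toHex(ipv4) + "-" + hour + "-" + index;
    }

    public static void main(String[] args) {
        // 1532512800000 ms is exactly 425698 hours after the epoch
        System.out.println(build("ps-car-resource-local", "172.18.1.232", 1532512800000L, 2));
    }
}
```

<p>With these inputs the sketch reproduces the example ID from the comment, <code>ps-car-resource-local-ac1201e8-425698-2</code>.</p>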

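<p>The way <code>Context.start</code>, <code>add</code> and <code>end</code> cooperate in the message-building section can be reduced to a stack plus a tree. The following is a simplified sketch under stated assumptions, not CAT's implementation: <code>ContextSketch</code>, <code>Node</code> and the <code>flushed</code> list are hypothetical names, and validation, forked transactions and truncation are left out.</p>

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Hypothetical, simplified model of the per-thread Context
final class ContextSketch {
    static final class Node {
        final String name;
        final List<Node> children = new ArrayList<>();

        Node(String name) {
            this.name = name;
        }
    }

    private final Deque<Node> stack = new ArrayDeque<>();
    private Node root;
    // Stands in for handing a finished tree to the sender queue
    final List<Node> flushed = new ArrayList<>();

    // Like Context.start: nest under the transaction on top of the stack, or become the root
    Node start(String name) {
        Node node = new Node(name);
        if (stack.isEmpty()) {
            root = node;
        } else {
            stack.peek().children.add(node);
        }
        stack.push(node);
        return node;
    }

    // Like add: an event outside any transaction is flushed alone, otherwise it becomes a child
    void add(String name) {
        Node event = new Node(name);
        if (stack.isEmpty()) {
            flushed.add(event);
        } else {
            stack.peek().children.add(event);
        }
    }

    // Like Context.end: pop until the given transaction is found; flush when the stack is empty
    boolean end(Node transaction) {
        if (stack.isEmpty()) {
            return false;
        }
        Node current = stack.pop();
        while (current != transaction && !stack.isEmpty()) {
            current = stack.pop();
        }
        if (stack.isEmpty()) {
            flushed.add(root);
            root = null;
            return true;
        }
        return false;
    }
}
```

<p>Ending an inner transaction returns <code>false</code> and flushes nothing; only ending the outermost transaction empties the stack and hands over the whole tree, which is why a nested call chain produces a single message tree.</p>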