开源监控平台-cat客户端原理解析

简介

参考官网:https://github.com/dianping/cat

架构图


(图片来自网络)

类图


(图片来自网络)

原理解析

阅读思路:跟着简单的用例debug

        //静态方法获取Transaction对象
        //初始化
         Transaction t = Cat.getProducer().newTransaction("logTransaction", "logTransaction");
         Cat.logEvent("", catMonitor.name(), "0", JSON.toJSONString(joinPoint.getArgs()));
         //这里运行你的业务代码
         your.business();
         t.setStatus("0");
        //上报
        t.complete();

初始化

//静态变量初始化 
 private static Cat s_instance = new Cat();
 private static volatile boolean s_init = false;

public static MessageProducer getProducer() {
    //初始化工作
   checkAndInitialize();
   return s_instance.m_producer;
}

private static void checkAndInitialize() {
   if (!s_init) {
      synchronized (s_instance) {
         if (!s_init) {
            //根据路径下的配置文件初始化
            initialize(new File(getCatHome(), "client.xml"));
            log("WARN", "Cat is lazy initialized!");
            s_init = true;
         }
      }
   }
}

public static void initialize(File configFile) {
    //这里用到了Plexus这个框架,类似于spring的一个IOC框架,加载/META-INF/plexus/plexus.xml配置文件,完成IOC容器的初始化。
   PlexusContainer container = ContainerLoader.getDefaultContainer();
   initialize(container, configFile);
} 

public static void initialize(PlexusContainer container, File configFile) {
     //1.持有 plexus IOC 容器的引用;
     //2.持有 logger对象引用,用来打日志。
     //3.持有 需要使用到的配置文件路径。
   ModuleContext ctx = new DefaultModuleContext(container);
   //获取到具体的module实例 
   Module module = ctx.lookup(Module.class, CatClientModule.ID);

   if (!module.isInitialized()) {
      ModuleInitializer initializer = ctx.lookup(ModuleInitializer.class);

      ctx.setAttribute("cat-client-config-file", configFile);
     //初始化工作
      initializer.execute(ctx, module);
   }
}

这里可能大家不太了解plexus,它相当于Spring的IoC容器,但是它和Spring框架不同,它并不是一个完整的,拥有各种组件的大型框架,仅仅是一个纯粹的IoC容器,它的开发者与Maven的开发者是同一拨人,最初开发Maven的时候,Spring并不成熟,所以Maven的开发者决定使用自己维护的IoC容器Plexus,它与Spring在语法和描述方式稍有不同。在Plexus中,有ROLE的概念,相当于Spring中的一个Bean。支持组件生命周期管理


(图片来自网络)

cat的初始化工作完成后会创建四个守护线程:

cat-StatusUpdateTask  用来每十秒钟上报客户端基本信息(心跳信息)

cat-merge-atomic-task   消息合并检查,检查消息是否合并

cat-TcpSocketSender-ChannelManager(NIO 连接服务端检查,会每十秒去检查一下NIO的链接情况,并且返回可用的channel)

cat-TcpSocketSender(消息发送服务线程)

//完成初始化工作
protected void execute(final ModuleContext ctx) throws Exception {
   ctx.info("Current working directory is " + System.getProperty("user.dir"));

   // 初始化基本时间
   MilliSecondTimer.initialize();

   // 自定义线程池,并实现线程创建、销毁的监听
   Threads.addListener(new CatThreadListener(ctx));

   // 初始化
   Cat.getInstance().setContainer(((DefaultModuleContext) ctx).getContainer());

   // 初始化 TransportManager
   ctx.lookup(TransportManager.class);

   ClientConfigManager clientConfigManager = ctx.lookup(ClientConfigManager.class);

   //初始化心跳上报线程
   if (clientConfigManager.isCatEnabled()) {
      // start status update task
      StatusUpdateTask statusUpdateTask = ctx.lookup(StatusUpdateTask.class);

      Threads.forGroup("cat").start(statusUpdateTask);
      LockSupport.parkNanos(10 * 1000 * 1000L); // wait 10 ms

      // MmapConsumerTask mmapReaderTask = ctx.lookup(MmapConsumerTask.class);
      // Threads.forGroup("cat").start(mmapReaderTask);
   }
}


void setContainer(PlexusContainer container) {
   try {
      m_container = container;
      m_manager = container.lookup(MessageManager.class);
      m_producer = container.lookup(MessageProducer.class);
   } catch (ComponentLookupException e) {
      throw new RuntimeException("Unable to get instance of MessageManager, "
            + "please make sure the environment was setup correctly!", e);
   }
}
public void initialize() throws InitializationException {
    //获取配置的服务器地址
   List<Server> servers = m_configManager.getServers();

   if (!m_configManager.isCatEnabled()) {
      m_tcpSocketSender = null;
      m_logger.warn("CAT was DISABLED due to not initialized yet!");
   } else {
      List<InetSocketAddress> addresses = new ArrayList<InetSocketAddress>();

      for (Server server : servers) {
         if (server.isEnabled()) {
            addresses.add(new InetSocketAddress(server.getIp(), server.getPort()));
         }
      }

      m_logger.info("Remote CAT servers: " + addresses);

      if (addresses.isEmpty()) {
         throw new RuntimeException("All servers in configuration are disabled!\r\n" + servers);
      } else {
         m_tcpSocketSender.setServerAddresses(addresses);
        //初始化上报线程
         m_tcpSocketSender.initialize();
      }
   }
}
private ChannelManager m_manager;
private MessageQueue m_queue;
private MessageQueue m_atomicTrees;

public void initialize() {
   int len = getQueueSize();

   m_queue = new DefaultMessageQueue(len);
   m_atomicTrees = new DefaultMessageQueue(len);
    //初始化channel管理线程
   m_manager = new ChannelManager(m_logger, m_serverAddresses, m_queue, m_configManager, m_factory);

    //初始化守护线程
   Threads.forGroup("cat").start(this);
   Threads.forGroup("cat").start(m_manager);
   Threads.forGroup("cat").start(new MergeAtomicTask());
}
public ChannelManager(Logger logger, List<InetSocketAddress> serverAddresses, MessageQueue queue,
      ClientConfigManager configManager, MessageIdFactory idFactory) {
   m_logger = logger;
   m_queue = queue;
   m_configManager = configManager;
   m_idfactory = idFactory;

    //使用NIO进行TCP通信
   EventLoopGroup group = new NioEventLoopGroup(1, new ThreadFactory() {
      @Override
      public Thread newThread(Runnable r) {
         Thread t = new Thread(r);
         t.setDaemon(true);
         return t;
      }
   });

   Bootstrap bootstrap = new Bootstrap();
   bootstrap.group(group).channel(NioSocketChannel.class);
   bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
   bootstrap.handler(new ChannelInitializer<Channel>() {
      @Override
      protected void initChannel(Channel ch) throws Exception {
      }
   });
   m_bootstrap = bootstrap;

     //检测远端服务器是够正常
   String serverConfig = loadServerConfig();

    //初始化本地和远端的channel
   if (StringUtils.isNotEmpty(serverConfig)) {
      List<InetSocketAddress> configedAddresses = parseSocketAddress(serverConfig);
      ChannelHolder holder = initChannel(configedAddresses, serverConfig);

      if (holder != null) {
         m_activeChannelHolder = holder;
      } else {
         m_activeChannelHolder = new ChannelHolder();
         m_activeChannelHolder.setServerAddresses(configedAddresses);
      }
   } else {
       //不能创建连接
      ChannelHolder holder = initChannel(serverAddresses, null);

      if (holder != null) {
         m_activeChannelHolder = holder;
      } else {
         m_activeChannelHolder = new ChannelHolder();
         m_activeChannelHolder.setServerAddresses(serverAddresses);
         m_logger.error("error when init cat module due to error config xml in /data/appdatas/cat/client.xml");
      }
   }
}

//启动守护线程
public Thread start(Runnable runnable, boolean deamon) {
   Thread thread = m_factory.newThread(runnable);
   thread.setDaemon(deamon);
   thread.start();
   return thread;
}

构建消息

//消息的存储方式
private ThreadLocal<Context> m_context = new ThreadLocal<Context>(); 

public Transaction newTransaction(String type, String name) {
        // this enable CAT client logging cat message without explicit setup
        if (!m_manager.hasContext()) {          
            //此处就是用ThreadLocal存储一个Context对象:ctx = new Context(m_domain.getId(), m_hostName,           .getIp());
            m_manager.setup();

        }

        if (m_manager.isMessageEnabled()) {
            DefaultTransaction transaction = new DefaultTransaction(type, name, m_manager);

         //向Context中填充构造的消息体:Context.m_tree;Context.m_stack;稍后看看Context这个对象
            m_manager.start(transaction, false);
            return transaction;
        } else {
            return NullMessage.TRANSACTION;
        }
    }

 public void setup() {
  //初始化Context 
   Context ctx;
   if (m_domain != null) {
      ctx = new Context(m_domain.getId(), m_hostName, m_domain.getIp());
   } else {
      ctx = new Context("Unknown", m_hostName, "");
   }    
   m_context.set(ctx);
}
public void start(Transaction transaction, boolean forked) {
        Context ctx = getContext();
        if (ctx != null) {
            //Context的start方法
            ctx.start(transaction, forked);
        } else if (m_firstMessage) {
            m_firstMessage = false;
            m_logger.warn("CAT client is not enabled because it's not initialized yet");
        }
    }


            public void start(Transaction transaction, boolean forked) {
            //如果栈不为空,将消息添加到Transaction的叶子节点
            if (!m_stack.isEmpty()) {
                Transaction parent = m_stack.peek();
                addTransactionChild(transaction, parent);
            } else {
                m_tree.setMessage(transaction);
            }
            if (!forked) {
                //将Transaction节点放入到栈中
                m_stack.push(transaction);
            }
        }


//创建event
 public void logEvent(String type, String name, String status, String nameValuePairs) {
   Event event = newEvent(type, name);

   if (nameValuePairs != null && nameValuePairs.length() > 0) {
      event.addData(nameValuePairs);
   }

   event.setStatus(status);
   //添加到context中    
   event.complete();
}
public void complete() {
   setCompleted(true);

   if (m_manager != null) {
      m_manager.add(this);
   }
}

public void add(Message message) {
            //栈为空,说明没有transaction节点,直接flush
            if (m_stack.isEmpty()) {
                MessageTree tree = m_tree.copy();
                tree.setMessage(message);
                flush(tree);
            } else {
                //栈不为空,将此节点添加到transaction的子节点
                Transaction parent = m_stack.peek();
                addTransactionChild(message, parent);
            }
        }

//Transaction类
public DefaultTransaction addChild(Message message) {
        if (m_children == null) {
            m_children = new ArrayList<Message>();
        }
        if (message != null) {
            m_children.add(message);
        } else {
            Cat.logError(new Exception("null child message"));
        }
        return this;
    }

public void flush(MessageTree tree) {
        if (tree.getMessageId() == null) {
            //生成唯一的一个messageID:ps-car-resource-local-ac1201e8-425698-2   
            /**
            *messageID分为四段:
            *第一段是应用名ps-car-resource-local
            *第二段是当前这台机器的IP的16进制格式,ac1201e8
            *第三段的425698,是系统当前时间除以小时得到的整点数。
            *第四段的2,是表示当前这个客户端在当前小时的顺序递增号(这里用了volatile和AtomicInteger实现)
            */
            tree.setMessageId(nextMessageId());
        }


        MessageSender sender = m_transportManager.getSender();

        if (sender != null && isMessageEnabled()) {

            //发送消息
            sender.send(tree);
            //重置
            reset();
        } else {
            m_throttleTimes++;

            if (m_throttleTimes % 10000 == 0 || m_throttleTimes == 1) {
                m_logger.info("Cat Message is throttled! Times:" + m_throttleTimes);
            }
        }
    }


    public void reset() {
        // destroy current thread local data
        Context ctx = m_context.get();

        if (ctx != null) {
            if (ctx.m_totalDurationInMicros == 0) {
                ctx.m_stack.clear();
                ctx.m_knownExceptions.clear();
                m_context.remove();
            } else {
                ctx.m_knownExceptions.clear();
            }
        }
    }
class Context {
   private MessageTree m_tree;
   private Stack<Transaction> m_stack;
    //初始化
   public Context(String domain, String hostName, String ipAddress) {
      m_tree = new DefaultMessageTree();
      m_stack = new Stack<Transaction>();
      Thread thread = Thread.currentThread();
      String groupName = thread.getThreadGroup().getName();
      m_tree.setThreadGroupName(groupName);
      m_tree.setThreadId(String.valueOf(thread.getId()));
      m_tree.setThreadName(thread.getName());
      m_tree.setDomain(domain);
      m_tree.setHostName(hostName);
      m_tree.setIpAddress(ipAddress);
      m_length = 1;
      m_knownExceptions = new HashSet<Throwable>();
   }
   public void end(Transaction transaction) {
        Context ctx = getContext();
        if (ctx != null && transaction.isStandalone()) {
            //ctx.end 返回ftrue才会清空threadlocal
            if (ctx.end(this, transaction)) {
                m_context.remove();
            }
        }
    }

        /**
         * //事物结束提交
         * @param manager
         * @param transaction 当前事物
         * @return 
         */
   public boolean end(DefaultMessageManager manager, Transaction transaction) {
      if (!m_stack.isEmpty()) {

         Transaction current = m_stack.pop();
         if (transaction == current) {
             //校验子节点是否已经完成
            m_validator.validate(m_stack.isEmpty() ? null : m_stack.peek(), current);
         } else {
            while (transaction != current && !m_stack.empty()) {
               m_validator.validate(m_stack.peek(), current);
               current = m_stack.pop();
            }
         }
         //如果栈为空走此段逻辑,说明当前的transaction是最顶层的transaction,构建消息数,然后flush
         if (m_stack.isEmpty()) {
            MessageTree tree = m_tree.copy();
            m_tree.setMessageId(null);
            m_tree.setMessage(null);
            if (m_totalDurationInMicros > 0) {
               adjustForTruncatedTransaction((Transaction) tree.getMessage());
            }
            manager.flush(tree);
            return true;
         }
      }
      return false;
   }

}

public void flush(MessageTree tree) {
   if (tree.getMessageId() == null) {
    //设置消息ID
      tree.setMessageId(nextMessageId());
   }

   MessageSender sender = m_transportManager.getSender();

   if (sender != null && isMessageEnabled()) {
      sender.send(tree);
      reset();
   } else {
      m_throttleTimes++;

      if (m_throttleTimes % 10000 == 0 || m_throttleTimes == 1) {
         m_logger.info("Cat Message is throttled! Times:" + m_throttleTimes);
      }
   }
}

消息上报

消息完成后,将消息放入一个队列中,从而保证异步上报。

transaction.complete();

public void send(MessageTree tree) {
        //对消息做校验,isAtomicMessage的话放到m_atomicTrees,否则放到m_queue。logQueueFullInfo方法对队列溢出做处理
        if (isAtomicMessage(tree)) {
            boolean result = m_atomicTrees.offer(tree, m_manager.getSample());
            m_logger.info("消息添加到本地队列m_atomicTrees:" + tree.getMessage() + "队列大小:{}" + m_atomicTrees.size());

            if (!result) {
                logQueueFullInfo(tree);
            }
        } else {
            boolean result = m_queue.offer(tree, m_manager.getSample());
            m_logger.info("消息添加到本地队列m_queue:" + tree.getMessage() + "队列大小:{}" + m_atomicTrees.size());
            if (!result) {
                logQueueFullInfo(tree);
            }
        }
    }
private boolean isAtomicMessage(MessageTree tree) {   
        Message message = tree.getMessage();
        if (message instanceof Transaction) {
            String type = message.getType();
            //Transaction类型的消息中,除Cache和SQL都是false。其他类型消息都是true
            if (type.startsWith("Cache.") || "SQL".equals(type)) {
                return true;
            } else {
                return false;
            }
        } else {
            return true;
        }
    }

 private void logQueueFullInfo(MessageTree tree) {
  //统计溢出的数量,打印日志,并直接将当前消息置为空
 if (m_statistics != null) {
      m_statistics.onOverflowed(tree);
   }

   int count = m_errors.incrementAndGet();

   if (count % 1000 == 0 || count == 1) {
      m_logger.error("Message queue is full in tcp socket sender! Count: " + count);
   }

   tree = null;
}

    //守护线程run
 public void run() {

        m_active = true;
        try {
            while (m_active) {
                //从channelmanager拿到一个可用的channel
                ChannelFuture channel = m_manager.channel();
                if (channel != null && checkWritable(channel)) {
                    try {
                        //channel可用的话发送数据
                        MessageTree tree = m_queue.poll();
                        if (tree != null) {
                            sendInternal(tree);
                            tree.setMessage(null);
                        }
                    } catch (Throwable t) {
                        m_logger.error("Error when sending message over TCP socket!", t);
                    }
                } else {
                    //统计当前小时外丢失的消息数量
                    long current = System.currentTimeMillis();
                    long oldTimestamp = current - HOUR;
                    while (true) {
                        try {
                            MessageTree tree = m_queue.peek();
                            if (tree != null && tree.getMessage().getTimestamp() < oldTimestamp) {
                                MessageTree discradTree = m_queue.poll();
                                if (discradTree != null) {
                                    m_statistics.onOverflowed(discradTree);
                                }
                            } else {
                                break;
                            }
                        } catch (Exception e) {
                            m_logger.error(e.getMessage(), e);
                            break;
                        }
                    }
                    TimeUnit.MILLISECONDS.sleep(5);
                }
            }
        } catch (InterruptedException e) {
            // ignore it
            m_active = false;
        }
    }

  private void sendInternal(MessageTree tree) {
        ChannelFuture future = m_manager.channel();
        //每次开辟一个10K的内存去
         ByteBuf buf = PooledByteBufAllocator.DEFAULT.buffer(10 * 1024); // 10K

        System.out.println(tree);

        m_codec.encode(tree, buf);//编码后发送

        int size = buf.readableBytes();
        Channel channel = future.channel();

        channel.writeAndFlush(buf);
        if (m_statistics != null) {
            //统计发送的消息大小 
         m_statistics.onBytes(size);
        }
    }

守护线程
merge-atomic-task

 @Override
        public void run() {
            while (true) {
                if (shouldMerge(m_atomicTrees)) {
                    MessageTree tree = mergeTree(m_atomicTrees);//把m_atomicTrees队列中的消息merge为一条消息树
                    boolean result = m_queue.offer(tree);//放入m_queue队列,等待cat-TcpSocketSender线程正常消费

                    if (!result) {
                        logQueueFullInfo(tree);
                    }
                } else {
                    try {
                        Thread.sleep(5);
                    } catch (InterruptedException e) {
                        break;
                    }
                }
            }
        }

.....

private boolean shouldMerge(MessageQueue trees) {
        MessageTree tree = trees.peek();//获取对头元素,非移除

        if (tree != null) {
            long firstTime = tree.getMessage().getTimestamp();
            int maxDuration = 1000 * 30;
            //消息在30s内生成,或者队列挤压消息超过200,则需要merge
            if (System.currentTimeMillis() - firstTime > maxDuration || trees.size() >= MAX_CHILD_NUMBER) {
                return true;
            }
        }
        return false;
    }

......


 private MessageTree mergeTree(MessageQueue trees) {
        int max = MAX_CHILD_NUMBER;
        DefaultTransaction tran = new DefaultTransaction("System", "MergeTree", null);//增加merge处理埋点
        MessageTree first = trees.poll();//从队列头部移除

        tran.setStatus(Transaction.SUCCESS);
        tran.setCompleted(true);
        tran.addChild(first.getMessage());
        tran.setTimestamp(first.getMessage().getTimestamp());
        long lastTimestamp = 0;
        long lastDuration = 0;

        //这段逻辑就是不停从这个m_atomicTrees队列头部拿去messsage,并使用同一个messageId,把队列中所有的消息合并为一条Transaction消息。
        while (max >= 0) {
            MessageTree tree = trees.poll();//接着 从队列头部移除

            if (tree == null) {
                tran.setDurationInMillis(lastTimestamp - tran.getTimestamp() + lastDuration);
                break;
            }
            lastTimestamp = tree.getMessage().getTimestamp();
            if(tree.getMessage() instanceof DefaultTransaction){
                lastDuration = ((DefaultTransaction) tree.getMessage()).getDurationInMillis();
            } else {
                lastDuration = 0;
            }
            tran.addChild(tree.getMessage());
            m_factory.reuse(tree.getMessageId());
            max--;
        }
        ((DefaultMessageTree) first).setMessage(tran);
        return first;
    }

TcpSocketSender-ChannelManager 后台线程

这个线程是通过服务端配置的路由ip,10s轮询一次,当满足自旋n(n=m_count%30)次,去检查路由服务端ip是否变动,并保证连接正常。

public void run() {
        while (m_active) {
            // make save message id index asyc
            m_idfactory.saveMark();
            checkServerChanged();// 每100s检查连接信息(shouldCheckServerConfig),并进行连接,使用TCP协议建立长连接

            ChannelFuture activeFuture = m_activeChannelHolder.getActiveFuture();//根据服务端配置的路由,获取其中一个服务端ip并建立连接.
            try {
            } catch (Exception e) {
                e.printStackTrace();
            }
            List<InetSocketAddress> serverAddresses = m_activeChannelHolder.getServerAddresses();

            doubleCheckActiveServer(activeFuture);//检查当前连接是否正常
            reconnectDefaultServer(activeFuture, serverAddresses);//如果不正常,则继续尝试建立其他连接。当所有default-server ip都无法连接时,默认会走backServer的Ip进行连接。

             try {
            Thread.sleep(10 * 1000L); // check every 10 seconds
             } catch (InterruptedException e) {
             }
        }
    }
....

    private void checkServerChanged() {
      //检查是否需要改变连接
       if (shouldCheckServerConfig(++m_count)) {//每遍历监听n(n=m_count%30)次或者没有成功的连接,则检查连接信息
            Pair<Boolean, String> pair = routerConfigChanged();

            if (pair.getKey()) {
                String servers = pair.getValue();
                List<InetSocketAddress> serverAddresses = parseSocketAddress(servers);
                ChannelHolder newHolder = initChannel(serverAddresses, servers);//建立连接

                if (newHolder != null) {
                    if (newHolder.isConnectChanged()) {
                        ChannelHolder last = m_activeChannelHolder;

                        m_activeChannelHolder = newHolder;
                        closeChannelHolder(last);
                        m_logger.info("switch active channel to " + m_activeChannelHolder);
                    } else {
                        m_activeChannelHolder = newHolder;
                    }
                }
            }
        }
    }

private ChannelHolder initChannel(List<InetSocketAddress> addresses, String serverConfig) {
        try {
            int len = addresses.size();

            for (int i = 0; i < len; i++) {//遍历,连接成功返回
                InetSocketAddress address = addresses.get(i);
                String hostAddress = address.getAddress().getHostAddress();
                ChannelHolder holder = null;

                if (m_activeChannelHolder != null && hostAddress.equals(m_activeChannelHolder.getIp())) {//当前的链接ip和address一致,那么就复用,否则新建立连接。(稍后关闭之前过期的连接。)
                    holder = new ChannelHolder();
                    holder.setActiveFuture(m_activeChannelHolder.getActiveFuture()).setConnectChanged(false);
                } else {
                    ChannelFuture future = createChannel(address);

                    if (future != null) {
                        holder = new ChannelHolder();
                        holder.setActiveFuture(future).setConnectChanged(true);//true表示需要关闭之前的链接
                    }
                }
                if (holder != null) {
                    holder.setActiveIndex(i).setIp(hostAddress);
                    holder.setActiveServerConfig(serverConfig).setServerAddresses(addresses);

                    m_logger.info("success when init CAT server, new active holder" + holder.toString());
                    return holder;
                }
            }
        } catch (Exception e) {
            m_logger.error(e.getMessage(), e);
        }

        try {
            StringBuilder sb = new StringBuilder();

            for (InetSocketAddress address : addresses) {
                sb.append(address.toString()).append(";");
            }
            m_logger.info("Error when init CAT server " + sb.toString());
        } catch (Exception e) {
            // ignore
        }
        return null;
    }

private boolean shouldCheckServerConfig(int count) {
        int duration = 30;
//m_activeChannelHolder.getActiveIndex() == -1表示关闭了当前连接
        if (count % duration == 0 || m_activeChannelHolder.getActiveIndex() == -1) {
            return true;
        } else {
            return false;
        }
    }

private Pair<Boolean, String> routerConfigChanged() {
        String current = loadServerConfig();//获取当前路由表中的服务地址信息。示例:ip1:2280;ip2:2280...;

//current不为空 && 路由表中的配置没有任何变化
        if (!StringUtils.isEmpty(current) && !current.equals(m_activeChannelHolder.getActiveServerConfig())) {

            return new Pair<Boolean, String>(true, current);
        } else {
            return new Pair<Boolean, String>(false, current);
        }
    }

private String loadServerConfig() {
        try {
        //使用http请求获取路由表配置信息
        //示例url:http://ip:port/cat/s/router?domain=someDomain&ip=当前客户端ip&op=json
        //返回的content :{"kvs":{"routers":"ip1:2280;ip2:2280;..;","sample":"1.0"}}

            String url = m_configManager.getServerConfigUrl();
            InputStream inputstream = Urls.forIO().readTimeout(2000).connectTimeout(1000).openStream(url);
            String content = Files.forIO().readFrom(inputstream, "utf-8");

            KVConfig routerConfig = (KVConfig) m_jsonBuilder.parse(content.trim(), KVConfig.class);
            String current = routerConfig.getValue("routers");
            m_sample = Double.valueOf(routerConfig.getValue("sample").trim());

            return current.trim();
        } catch (Exception e) {
            // ignore
        }
        return null;
    }

StatusUpdateTask心跳上报线程

这个线程比较简单,每分钟上报关于应用的各种信息。而且,在每次线程启动时上报一个Reboot消息表示重启动

public void run() {
   // try to wait cat client init success
   try {
      Thread.sleep(10 * 1000);
   } catch (InterruptedException e) {
      return;
   }

   while (true) {
      Calendar cal = Calendar.getInstance();
      int second = cal.get(Calendar.SECOND);

      // try to avoid send heartbeat at 59-01 second
      if (second < 2 || second > 58) {
         try {
            Thread.sleep(1000);
         } catch (InterruptedException e) {
            // ignore it
         }
      } else {
         break;
      }
   }

   try {
      buildClasspath();
   } catch (Exception e) {
      e.printStackTrace();
   }
   MessageProducer cat = Cat.getProducer();
   Transaction reboot = cat.newTransaction("System", "Reboot");

   reboot.setStatus(Message.SUCCESS);
   cat.logEvent("Reboot", NetworkInterfaceManager.INSTANCE.getLocalHostAddress(), Message.SUCCESS, null);
   reboot.complete();

   while (m_active) {
      long start = MilliSecondTimer.currentTimeMillis();

      if (m_manager.isCatEnabled()) {
         Transaction t = cat.newTransaction("System", "Status");
         Heartbeat h = cat.newHeartbeat("Heartbeat", m_ipAddress);
         StatusInfo status = new StatusInfo();

         t.addData("dumpLocked", m_manager.isDumpLocked());
         try {
            StatusInfoCollector statusInfoCollector = new StatusInfoCollector(m_statistics, m_jars);

            status.accept(statusInfoCollector.setDumpLocked(m_manager.isDumpLocked()));

            buildExtensionData(status);
            h.addData(status.toString());
            h.setStatus(Message.SUCCESS);
         } catch (Throwable e) {
            h.setStatus(e);
            cat.logError(e);
         } finally {
            h.complete();
         }
         t.setStatus(Message.SUCCESS);
         t.complete();
      }
      long elapsed = MilliSecondTimer.currentTimeMillis() - start;

      if (elapsed < m_interval) {
         try {
            Thread.sleep(m_interval - elapsed);
         } catch (InterruptedException e) {
            break;
         }
      }
   }
}