开源监控平台-cat服务端原理解析

概况

cat服务器端分为两大模块:

Cat-consumer: 用于实时分析从客户端提供的数据

Cat-home:作为用户给用户提供展示的控制端 ,并且Cat-home做展示时,通过对Cat-Consumer的调用获取其他节点的数据,将所有数据汇总展示

consumer、home是部署在一起的,每个服务端节点都可以充当任何一个角色

CAT服务端在整个实时处理中,基本上实现了全异步化处理:

  • 消息消费基于Netty的NIO实现(Netty-Server)。
  • 消息消费到服务端就存放内存队列,然后程序开启一个线程会消费这个消息做消息分发(异步消费处理)。
  • 每个消息都会有一批线程并发消费各自队列的数据,以做到消息处理的隔离。(每报表每线程,分别按照自己的规则解析消费这个消息,并且可以动态控制对某种报表类型的处理线程个数)
  • 消息(原始的消息logView)存储是先存入本地磁盘,然后异步上传到HDFS文件,这也避免了强依赖HDFS。

初始化

CAT目前是使用war包放入Servlet容器(tomcat容器)中的方式部署启动。 熟悉servlet容器的同学应该知道,容器启动时会读取每个Context(可理解为web工程)中的web.xml然后启动Servlet等其他组件。在cat-home模块中的web.xml中可以看到,除了容器默认的Servlet之外,tomcat启动时会启动CatServlet、MVC这两个Servlet(因为load-on-startup>0,也就是会调用init方法初始化):

web.xml

<filter>
   <filter-name>cat-filter</filter-name>
   <filter-class>com.dianping.cat.servlet.CatFilter</filter-class>
</filter>
<filter>
   <filter-name>domain-filter</filter-name>
   <filter-class>com.dianping.cat.report.view.DomainFilter</filter-class>
</filter>
<servlet>
   <servlet-name>cat-servlet</servlet-name>
   <servlet-class>com.dianping.cat.servlet.CatServlet</servlet-class>
   <load-on-startup>1</load-on-startup>
</servlet>
<servlet>
   <servlet-name>mvc-servlet</servlet-name>
   <servlet-class>org.unidal.web.MVC</servlet-class>
   <init-param>
      <param-name>cat-client-xml</param-name>
      <param-value>client.xml</param-value>
   </init-param>
   <init-param>
      <param-name>init-modules</param-name>
      <param-value>false</param-value>
   </init-param>
   <load-on-startup>2</load-on-startup>
</servlet>
<filter-mapping>
   <filter-name>cat-filter</filter-name>
   <url-pattern>/r/*</url-pattern>
   <dispatcher>REQUEST</dispatcher>
</filter-mapping>
<filter-mapping>
   <filter-name>domain-filter</filter-name>
   <url-pattern>/r/*</url-pattern>
   <dispatcher>REQUEST</dispatcher>
</filter-mapping>
<filter-mapping>
   <filter-name>cat-filter</filter-name>
   <url-pattern>/s/*</url-pattern>
   <dispatcher>REQUEST</dispatcher>
</filter-mapping>
<filter-mapping>
   <filter-name>cat-filter</filter-name>
   <url-pattern>/jsp/*</url-pattern>
   <dispatcher>FORWARD</dispatcher>
</filter-mapping>
<servlet-mapping>
   <servlet-name>mvc-servlet</servlet-name>
   <url-pattern>/r/*</url-pattern>
</servlet-mapping>
<servlet-mapping>
   <servlet-name>mvc-servlet</servlet-name>
   <url-pattern>/s/*</url-pattern>
</servlet-mapping>
<jsp-config>
   <taglib>
      <taglib-uri>/WEB-INF/app.tld</taglib-uri>
      <taglib-location>/WEB-INF/app.tld</taglib-location>
   </taglib>
</jsp-config>

CatServlet.java

public void init(ServletConfig config) throws ServletException {
        super.init(config);
        try {
            //1.plexus IOC容器初始化(根据components.xml的设定完成IOC初始化)
            if (m_container == null) {
                m_container = ContainerLoader.getDefaultContainer();
            }
             //2.用来打印日志的m_logger对象实例化(根据plexus.xml设定完成实例化)
            m_logger = m_container.getLogger();
            //3.初始化CAT-Server必备的组件模块:cat-home\cat-consumer\cat-core
            initComponents(config);
        } catch (Exception e) {
            if (m_logger != null) {
                m_logger.error("Servlet initializing failed. " + e, e);
            } else {
                System.out.println("Servlet initializing failed. " + e);
                e.printStackTrace(System.out);
            }
            throw new ServletException("Servlet initializing failed. " + e, e);
        }
    }


protected void initComponents(ServletConfig servletConfig) throws ServletException {
   try {
       //ModuleContext ctx这个对象里主要作用:
        //1.持有 plexus IOC 容器的引用;
        //2.持有 logger对象引用,用来打日志。
        //3.持有 需要使用到的配置文件路径。
        //比如:cat-server-config-file=\data\appdatas\cat\server.xml
        //cat-client-config-file=\data\appdatas\cat\client.xml
      ModuleContext ctx = new DefaultModuleContext(getContainer());
      ModuleInitializer initializer = ctx.lookup(ModuleInitializer.class);
      File clientXmlFile = getConfigFile(servletConfig, "cat-client-xml", "client.xml");
      File serverXmlFile = getConfigFile(servletConfig, "cat-server-xml", "server.xml");
      ctx.setAttribute("cat-client-config-file", clientXmlFile);
      ctx.setAttribute("cat-server-config-file", serverXmlFile);
        //通过查找启动cat-home必要的模块,然后依次初始化各个模块。
      initializer.execute(ctx);
   } catch (Exception e) {
      m_exception = e;
      System.err.println(e);
      throw new ServletException(e);
   }
}

DefaultModuleInitializer

public void execute(ModuleContext ctx) {
    //通过配置文件components-cat-home.xml,可以看到我们的topLevelModule是cat-home模块,通过这个模块去查找需要依赖的其他模块并初始化他们。
    Module[] modules = m_manager.getTopLevelModules();
    execute(ctx, modules);
 }

 public void execute(ModuleContext ctx, Module... modules) {
    Set<Module> all = new LinkedHashSet<Module>();
    info(ctx, "Initializing top level modules:");
    for (Module module : modules) {
       info(ctx, "   " + module.getClass().getName());
    }
    try {
        //1.根据顶层Module获取到下层所有依赖到的modules,并分别调用他们的setup方法
       expandAll(ctx, modules, all);
       //2.依次调用module实现类的execute方法
       for (Module module : all) {
          if (!module.isInitialized()) {
             executeModule(ctx, module, m_index++);
          }
       }
    } catch (RuntimeException e) {
       throw e;
    } catch (Error e) {
       throw e;
    } catch (Exception e) {
       throw new RuntimeException("Error when initializing modules! Exception: " + e, e);
    }
 }

 private void expandAll(ModuleContext ctx, Module[] modules, Set<Module> all) throws Exception {
    if (modules != null) {
       for (Module module : modules) {
          expandAll(ctx, module.getDependencies(ctx), all);
          if (!all.contains(module)) {
             if (module instanceof AbstractModule) {
                ((AbstractModule) module).setup(ctx);//调用各个module实现类的setup,除了CatHomeModule,其他的setup方法为空
             }
              //all 最终元素以及顺序:
              //CatClientModule\CatCoreModule\CatConsumerModule\CatHomeModule
             all.add(module);
          }
       }
    }
 }

CatHomeModule.java

protected void setup(ModuleContext ctx) throws Exception {
    if (!isInitialized()) {
         //通过 plexus IOC 初始化一个 ServerConfigManager bean
        File serverConfigFile = ctx.getAttribute("cat-server-config-file");
        ServerConfigManager serverConfigManager = ctx.lookup(ServerConfigManager.class);
         //通过 plexus IOC 初始化一个 TcpSocketReceiver bean
        final TcpSocketReceiver messageReceiver = ctx.lookup(TcpSocketReceiver.class);
         //加载\...\server.xml中的配置
        serverConfigManager.initialize(serverConfigFile);
        //启动TCPSocketReceiver, netty 事件驱动服务器,用来接收客户端的TCP长连接请求
        messageReceiver.init();
        //增加一个钩子,在这个JVM关闭时回调
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                messageReceiver.destory();
            }
        });
    }
}

TcpSocketReceiver.java

public synchronized void startServer(int port) throws InterruptedException {
        boolean linux = getOSMatches("Linux") || getOSMatches("LINUX");
        int threads = 24;
        ServerBootstrap bootstrap = new ServerBootstrap();

        m_bossGroup = linux ? new EpollEventLoopGroup(threads) : new NioEventLoopGroup(threads);
        m_workerGroup = linux ? new EpollEventLoopGroup(threads) : new NioEventLoopGroup(threads);
        bootstrap.group(m_bossGroup, m_workerGroup);
        bootstrap.channel(linux ? EpollServerSocketChannel.class : NioServerSocketChannel.class);

        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();

                pipeline.addLast("decode", new MessageDecoder());
            }
        });

        bootstrap.childOption(ChannelOption.SO_REUSEADDR, true);
        bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
        bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
        bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

        try {
            m_future = bootstrap.bind(port).sync();
            m_logger.info("start netty server!");
        } catch (Exception e) {
            m_logger.error("Started Netty Server Failed:" + port, e);
        }
    }

各个模块的启动,executeModule
各个模块setup就说到这里,setup完成后,会依次调用module.execute(…)用来完成各个模块的启动。

依次调用:
CatClientModule\CatCoreModule\CatConsumerModule\CatHomeModule.其中只有CatClientModule、CatHomeModule实现了有效的execute方法。

CatHomeModule.java

protected void execute(ModuleContext ctx) throws Exception {

        //1,获取服务器配置信息
        ServerConfigManager serverConfigManager = ctx.lookup(ServerConfigManager.class);
        /**初始化MessageConsumer子类RealtimeConsumer,不仅实例化这个类MessageConsumer对象,还会把这个类中的成员全部实例化
              <plexus>
              <components>
                  <component>
                      <role>com.dianping.cat.analysis.MessageConsumer</role>
                      <implementation>com.dianping.cat.analysis.RealtimeConsumer</implementation>
                      <requirements>
                          <requirement>
                              <role>com.dianping.cat.analysis.MessageAnalyzerManager</role>
                          </requirement>
                          <requirement>
                              <role>com.dianping.cat.statistic.ServerStatisticManager</role>
                          </requirement>
                          <requirement>
                              <role>com.dianping.cat.config.server.BlackListManager</role>
                          </requirement>
                      </requirements>
                  </component>
        **/
        ctx.lookup(MessageConsumer.class);

        //启动配置加载线程
        ConfigReloadTask configReloadTask = ctx.lookup(ConfigReloadTask.class);
        Threads.forGroup("cat").start(configReloadTask);
        if (serverConfigManager.isJobMachine()) {
            //如果此服务器是报告工作机,生成DefaultTaskConsumer任务,开启生成汇总报告和统计报告的
            DefaultTaskConsumer taskConsumer = ctx.lookup(DefaultTaskConsumer.class);
            Threads.forGroup("cat").start(taskConsumer);
        }
        if (serverConfigManager.isAlertMachine()) {
            //如果此服务器是报警工作机,开启各类报警监听
            TransactionAlert transactionAlert = ctx.lookup(TransactionAlert.class);
            EventAlert eventAlert = ctx.lookup(EventAlert.class);
            BusinessAlert metricAlert = ctx.lookup(BusinessAlert.class);
            ExceptionAlert exceptionAlert = ctx.lookup(ExceptionAlert.class);
            HeartbeatAlert heartbeatAlert = ctx.lookup(HeartbeatAlert.class);
            //ThirdPartyAlert thirdPartyAlert = ctx.lookup(ThirdPartyAlert.class);
            //ThirdPartyAlertBuilder alertBuildingTask = ctx.lookup(ThirdPartyAlertBuilder.class);
            // DatabaseAlert databaseAlert = ctx.lookup(DatabaseAlert.class);
            // SystemAlert systemAlert = ctx.lookup(SystemAlert.class);
            // NetworkAlert networkAlert = ctx.lookup(NetworkAlert.class);
            // FrontEndExceptionAlert frontEndExceptionAlert = ctx.lookup(FrontEndExceptionAlert.class);
            // AppAlert appAlert = ctx.lookup(AppAlert.class);
            // WebAlert webAlert = ctx.lookup(WebAlert.class);
            // StorageSQLAlert storageDatabaseAlert = ctx.lookup(StorageSQLAlert.class);
            // StorageCacheAlert storageCacheAlert = ctx.lookup(StorageCacheAlert.class);
            Threads.forGroup("cat").start(metricAlert);
            Threads.forGroup("cat").start(exceptionAlert);
            Threads.forGroup("cat").start(heartbeatAlert);
            Threads.forGroup("cat").start(transactionAlert);
            Threads.forGroup("cat").start(eventAlert);
            //Threads.forGroup("cat").start(thirdPartyAlert);
            //Threads.forGroup("cat").start(alertBuildingTask);
            // Threads.forGroup("cat").start(appAlert);
            // Threads.forGroup("cat").start(webAlert);
            // Threads.forGroup("cat").start(storageDatabaseAlert);
            // Threads.forGroup("cat").start(storageCacheAlert);
            // Threads.forGroup("cat").start(frontEndExceptionAlert);
            // Threads.forGroup("cat").start(networkAlert);
            // Threads.forGroup("cat").start(databaseAlert);
            // Threads.forGroup("cat").start(systemAlert);
        }
        final MessageConsumer consumer = ctx.lookup(MessageConsumer.class);
        //定义一个虚拟机关闭钩子
        Runtime.getRuntime().addShutdownHook(new Thread() {
            @Override
            public void run() {
                consumer.doCheckpoint();
            }
        });
    }

至此,CatServlet初始化完成了,接下来会初始化org.unidal.web.MVC这个Servlet。
我们接着看一下另外一个Servlet:mvc-servlet

MVC这个Servlet继承了AbstractContainerServlet,与CatServlet非常类似,均是AbstractContainerServlet 的实现类。这个Servlet顾名思义就是用来处理请求的,类似Spring中的DispatcherServlet,集中分配进入的请求到对应的Controller。

MVC

protected void initComponents(ServletConfig config) throws Exception {
    // /cat
    String contextPath = config.getServletContext().getContextPath();
    // /cat
    String path = contextPath == null || contextPath.length() == 0 ? "/" : contextPath;
    getLogger().info("MVC is starting at " + path);
   //使用client.xml初始化代表CATClient的com.dianping.cat.Cat对象(如果CAT未被初始化)。
    initializeCat(config);
    //如果CatHomeModule没有初始化的话,则初始化
    initializeModules(config);
    m_handler = lookup(RequestLifecycle.class, "mvc");
    m_handler.setServletContext(config.getServletContext());
    config.getServletContext().setAttribute(ID, this);
    getLogger().info("MVC started at " + path);
 }

初始化完成后看看后端的启动的守护线程

消息接收、处理

消息消费、解析是在TcpSocketReceiver完成的

TcpSocketReceiver.java

// 在CatHomeModule启动时被调用
    public void init() {
        try {
            startServer(m_port);
        } catch (Throwable e) {-
            m_logger.error(e.getMessage(), e);
        }
    }
    /**
     * 启动一个netty服务端
     * @param port
     * @throws InterruptedException
     */
    public synchronized void startServer(int port) throws InterruptedException {
        boolean linux = getOSMatches("Linux") || getOSMatches("LINUX");
        int threads = 24;
        ServerBootstrap bootstrap = new ServerBootstrap();
        //linux走epoll的事件驱动模型
        m_bossGroup = linux ? new EpollEventLoopGroup(threads) : new NioEventLoopGroup(threads);//用来做为接受请求的线程池 master线程
        m_workerGroup = linux ? new EpollEventLoopGroup(threads) : new NioEventLoopGroup(threads);//用来做为处理请求的线程池 slave线程
        bootstrap.group(m_bossGroup, m_workerGroup);
        bootstrap.channel(linux ? EpollServerSocketChannel.class : NioServerSocketChannel.class);

        bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {//channel初始化设置
            @Override
            protected void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();

                pipeline.addLast("decode", new MessageDecoder());//增加消息解码器,handle处理
            }
        });
        // 设置channel的参数
        bootstrap.childOption(ChannelOption.SO_REUSEADDR, true);
        bootstrap.childOption(ChannelOption.TCP_NODELAY, true);
        bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true);
        bootstrap.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);

        try {
            m_future = bootstrap.bind(port).sync();//绑定监听端口,并同步等待启动完成
            m_logger.info("start netty server!");
        } catch (Exception e) {
            m_logger.error("Started Netty Server Failed:" + port, e);
        }
    }






   protected void decode(ChannelHandlerContext ctx, ByteBuf buffer, List<Object> out) throws Exception {
      if (buffer.readableBytes() < 4) {
         return;
      }
      buffer.markReaderIndex();
      int length = buffer.readInt();
      buffer.resetReaderIndex();
      if (buffer.readableBytes() < length + 4) {
         return;
      }
      try {
         if (length > 0) {
            ByteBuf readBytes = buffer.readBytes(length + 4);
            readBytes.markReaderIndex();
            readBytes.readInt();

            DefaultMessageTree tree = (DefaultMessageTree) m_codec.decode(readBytes);

            readBytes.resetReaderIndex();
            tree.setBuffer(readBytes);
            //交给handler处理
            m_handler.handle(tree);
            m_processCount++;

            long flag = m_processCount % CatConstants.SUCCESS_COUNT;

            if (flag == 0) {
               m_serverStateManager.addMessageTotal(CatConstants.SUCCESS_COUNT);
            }
         } else {
            // client message is error
            buffer.readBytes(length);
         }
      } catch (Exception e) {
         m_serverStateManager.addMessageTotalLoss(1);
         m_logger.error(e.getMessage(), e);
      }
   }
}

启动netty,对每个客户端上报的消息都会做解码处理,从字节流转换为消息树MessageTree tree,接着交给DefaultMessageHandler处理。

DefaultMessageHandler

public class DefaultMessageHandler extends ContainerHolder implements MessageHandler, LogEnabled {

    /*
     * MessageConsumer按每个period(整小时一个period)组合了多个解析器,用来解析生产多个报表(如:Transaction、
     * Event、Problem等等)。一个解析器对象-一个有界队列-一个整小时时间组合了一个PeriodTask,轮询的处理这个有界队列中的消息
     */
    @Inject
    private MessageConsumer m_consumer;

    private Logger m_logger;

    @Override
    public void enableLogging(Logger logger) {
        m_logger = logger;
    }

    @Override
    public void handle(MessageTree tree) {
        if (m_consumer == null) {
            //从容器中加载MessageConsumer实例
         m_consumer = lookup(MessageConsumer.class);
        }

        try {
            m_consumer.consume(tree);//消息消费
        } catch (Throwable e) {
            m_logger.error("Error when consuming message in " + m_consumer + "! tree: " + tree, e);
        }
    }
}

CAT服务器端的设计是按照每个小时来汇总数据的,这个是一个trade-off,实时内存数据处理的复杂度和内存开销方面的一个折中。在当前小时结束后将生成的报表存储到mysql、本地磁盘(可以异步同步到HDFS)等,当前小时内的数据是存在内存中的。

这块比较乱,大致的依赖关系如下图

RealtimeConsumer.java

public class RealtimeConsumer extends ContainerHolder implements MessageConsumer, Initializable, LogEnabled {
    private PeriodManager m_periodManager;
    private Logger m_logger;
    public static final long MINUTE = 60 * 1000L;
    public static final long HOUR = 60 * MINUTE;
    @Override
    public void consume(MessageTree tree) {
        long timestamp = tree.getMessage().getTimestamp();
        //获取到当前小时的period
        Period period = m_periodManager.findPeriod(timestamp);
        if (period != null) {
            //period具体的逻辑处理
            period.distribute(tree);
        } else {
            m_serverStateManager.addNetworkTimeError(1);
        }
    }
    //初始化
    public void initialize() throws InitializationException {
        //初始化PeriodManager
        m_periodManager = new PeriodManager(HOUR, m_analyzerManager, m_serverStateManager, m_logger);
        m_periodManager.init();
        //启动守护线程PeriodManager
        Threads.forGroup("cat").start(m_periodManager);
    }
}

PeriodManager.java

public class PeriodManager implements Task {
    private PeriodStrategy m_strategy;
    private List<Period> m_periods = new ArrayList<Period>();
    private boolean m_active;
    @Inject
    private Logger m_logger;
    public static long EXTRATIME = 3 * 60 * 1000L;
    public PeriodManager(long duration, MessageAnalyzerManager analyzerManager,ServerStatisticManager serverStateManager, Logger logger) {
        //初始化PeriodStrategy
        m_strategy = new PeriodStrategy(duration, EXTRATIME, EXTRATIME);
        m_active = true;
        m_analyzerManager = analyzerManager;
        m_serverStateManager = serverStateManager;
        m_logger = logger;
    }

     //当这个小时结束后,会异步的调用这个方法,将过期的Period对象移除
    private void endPeriod(long startTime) {
        int len = m_periods.size();
        for (int i = 0; i < len; i++) {
            Period period = m_periods.get(i);
            if (period.isIn(startTime)) {
                period.finish();
                m_periods.remove(i);
                break;
            }
        }
    }
    public Period findPeriod(long timestamp) {
        for (Period period : m_periods) {
            if (period.isIn(timestamp)) {
                return period;
            }
        }
        return null;
    }
    //初始化
    public void init() {
        //当前小时的起始时间
        long startTime = m_strategy.next(System.currentTimeMillis());
        startPeriod(startTime);
    }
    @Override
    public void run() {
         // 每隔1s检查一下当前小时的Period对象是否需要创建(一般都是新的小时需要创建一个Period代表当前小时)
        while (m_active) {
            try {
                long now = System.currentTimeMillis();
                long value = m_strategy.next(now);
                 //value>0表示当前小时的Period不存在,需要创建一个,如果当前线小时的Period存在,那么Value==0
                if (value > 0) {
                    startPeriod(value);
                } else if (value < 0) {
                    //当这个小时结束后,启动守护线程EndTaskThread,会异步的调用endPeriod(..),将过期的Period对象移除
                    Threads.forGroup("cat").start(new EndTaskThread(-value));
                }
            } catch (Throwable e) {
                Cat.logError(e);
            }
            try {
                Thread.sleep(1000L);
            } catch (InterruptedException e) {
                break;
            }
        }
    }
    private void startPeriod(long startTime) {
        long endTime = startTime + m_strategy.getDuration();
        Period period = new Period(startTime, endTime, m_analyzerManager, m_serverStateManager, m_logger);
        m_periods.add(period);
        period.start();
    }
    private class EndTaskThread implements Task {
        private long m_startTime;
        public EndTaskThread(long startTime) {
            m_startTime = startTime;
        }
        @Override
        public String getName() {
            return "End-Consumer-Task";
        }
        @Override
        public void run() {
            endPeriod(m_startTime);
        }
        @Override
        public void shutdown() {
        }
    }
}

周期策略

在创建周期策略对象的时候,会传入3个参数,一个是duration,也就是每个周期的时间长度,默认为1个小时,另外两个extraTimeaheadTime分别表示我提前启动一个周期的时间和延迟结束一个周期的时间,默认都是3分钟,我们并不会卡在整点时间,例如10:00去开启或结束一个周期,因为周期创建是需要消耗一定时间,这样可以避免消息过来周期对象还未创建好,或者消息还没有处理完,就要去结束周期。

当然,即使提前创建了周期对象(Period),并不意味着就会立即被分发消息,只有到了该周期时间才会被分发消息。

下面看看具体的策略方法,我们首先计算当前时间的周期启动时间(startTime),是当前时间的整点时间,比如当前时间是 22:47.123,那么startTime就是 22:00.000,注意这里的时间都是时间戳,单位为毫秒。

接下来判断是否开启当前周期,如果startTime大于上次周期启动时间(m_lastStartTime),说明应该开启新的周期,由于m_lastStartTime初始化为 -1, 所以CAT服务端初始化之后第一个周期会执行到这里,并记录m_lastStartTime。

上面if如果未执行,我们会判断当前时间比起上次周期启动时间是不是已经过了 57 分钟(duration - aheadTime ),即提前3分钟启动下一个周期。

如果上面if还未执行,我们则认为当前周期已经被启动,那么会判断是否需要结束当前周期,即当前时间比起上次周期启动时间是不是已经过了 63 分钟(duration + extraTime),即延迟3分钟关闭上一个周期。

PeriodStrategy.java

package com.dianping.cat.analysis;

public class PeriodStrategy {
   private long m_duration;

   private long m_extraTime;

   private long m_aheadTime;

   private long m_lastStartTime;

   private long m_lastEndTime;

   public PeriodStrategy(long duration, long extraTime, long aheadTime) {
      m_duration = duration;
      m_extraTime = extraTime;
      m_aheadTime = aheadTime;
      m_lastStartTime = -1;
      m_lastEndTime = 0;
   }

   public long getDuration() {
      return m_duration;
   }

   public long next(long now) {
      long startTime = now - now % m_duration;

      // for current period
      if (startTime > m_lastStartTime) {
         m_lastStartTime = startTime;
         return startTime;
      }

      // prepare next period ahead
      if (now - m_lastStartTime >= m_duration - m_aheadTime) {
         m_lastStartTime = startTime + m_duration;
         return startTime + m_duration;
      }

      // last period is over
      if (now - m_lastEndTime >= m_duration + m_extraTime) {
         long lastEndTime = m_lastEndTime;
         m_lastEndTime = startTime;
         return -lastEndTime;
      }

      return 0;
   }
}

Period.java

public class Period {
    private long m_startTime;
    private long m_endTime;
    private Map<String, List<PeriodTask>> m_tasks;
    @Inject
    private Logger m_logger;
    private static int QUEUE_SIZE = 30000;
    /**
     * 将解码后的tree消息依次分发给所有类型解析器
     * @param tree
     */
    public void distribute(MessageTree tree) {
        // 根据domain,统计消息量
        m_serverStateManager.addMessageTotal(tree.getDomain(), 1);
        boolean success = true;
        String domain = tree.getDomain();
        for (Entry<String, List<PeriodTask>> entry : m_tasks.entrySet()) {
            //某种类型报表的解析器
            List<PeriodTask> tasks = entry.getValue();
            int length = tasks.size();
            int index = 0;
            boolean manyTasks = length > 1;
            if (manyTasks) {
                //hashCode的绝对值 % 长度 =0~length-1之间的任一个数
                index = Math.abs(domain.hashCode()) % length;
            }
            PeriodTask task = tasks.get(index);
            //这里会把同一个消息依依放入各个报表解析中的队列中
            boolean enqueue = task.enqueue(tree);
            if (enqueue == false) {
                if (manyTasks) {
                    task = tasks.get((index + 1) % length);
                    //放入队列,异步消费
                    enqueue = task.enqueue(tree);
                    if (enqueue == false) {
                        success = false;
                    }
                } else {
                    success = false;
                }
            }
        }
        if (!success) {
            m_serverStateManager.addMessageTotalLoss(tree.getDomain(), 1);
        }
    }
}

PeriodTask.java

public class PeriodTask implements Task, LogEnabled {
    @Override
    public void run() {
        try {
            //每个periodTask对应一个线程,m_analyzer对应解析器处理m_queue中的消息
            m_analyzer.analyze(m_queue);
        } catch (Exception e) {
            Cat.logError(e);
        }
    }

AbstractMessageAnalyzer.java

public abstract class AbstractMessageAnalyzer<R> extends ContainerHolder implements MessageAnalyzer {
    public static final long MINUTE = 60 * 1000L;
    public static final long ONE_HOUR = 60 * 60 * 1000L;
    public static final long ONE_DAY = 24 * ONE_HOUR;
    public void analyze(MessageQueue queue) {
        /**
        *解析器在当前小时内自旋,不停从队列中拿取消息,然后处理
        *timeOut:当前时间>当前小时的开始时间+一小时+三分钟;
        *isActive默认为true,调用shutdown后为false
        */

        while (!isTimeout() && isActive()) {
            //从队列弹出一个消息处理
            MessageTree tree = queue.poll();
            if (tree != null) {
                try {
                    //处理消息,具体的由子类去实现,典型的模板模式
                    process(tree);
                } catch (Throwable e) {
                    m_errors++
                    if (m_errors == 1 || m_errors % 10000 == 0) {
                        Cat.logError(e);
                    }
                }
            }
        }
        // 如果当前解析器已经超时,那么处理完对应队列内的消息后返回。
        while (true) {
            MessageTree tree = queue.poll();
            if (tree != null) {
                try {
                    process(tree);
                } catch (Throwable e) {
                    m_errors++;
                    if (m_errors == 1 || m_errors % 10000 == 0) {
                        Cat.logError(e);
                    }
                }
            } else {
                break;
            }
        }
    }
}

TransactionAnalyzer

//报表管理器
private ReportManager<TransactionReport> m_reportManager;
public class TransactionAnalyzer extends AbstractMessageAnalyzer<TransactionReport> implements LogEnabled {
    public void process(MessageTree tree) {
            //具体的报表处理逻辑
    String domain = tree.getDomain();
    //生成一个TransactionReport 对象
    TransactionReport report = m_reportManager.getHourlyReport(getStartTime(), domain, true);
    Message message = tree.getMessage();

    report.addIp(tree.getIpAddress());

    if (message instanceof Transaction) {
            Transaction root = (Transaction) message;
            //如果是消息是Transaction,继续处理
            processTransaction(report, tree, root);
            }
        }
    }


public T getHourlyReport(long startTime, String domain, boolean createIfNotExist) {
   Map<String, T> reports = m_reports.get(startTime);

   if (reports == null && createIfNotExist) {
      synchronized (m_reports) {
         reports = m_reports.get(startTime);

         if (reports == null) {
            //新建一个reports,并放入m_reports,m_reports后续会讲到
            reports = new ConcurrentHashMap<String, T>();
            m_reports.put(startTime, reports);
         }
      }
   }

   if (reports == null) {
      reports = new LinkedHashMap<String, T>();
   }

   T report = reports.get(domain);

   if (report == null && createIfNotExist) {
      synchronized (reports) {
         report = m_reportDelegate.makeReport(domain, startTime, HOUR);
         reports.put(domain, report);
      }
   }

   if (report == null) {
      report = m_reportDelegate.makeReport(domain, startTime, HOUR);
   }

   return report;
}

public TransactionReport makeReport(String domain, long startTime, long duration) {
   TransactionReport report = new TransactionReport(domain);

   report.setStartTime(new Date(startTime));
   report.setEndTime(new Date(startTime + duration - 1));

   return report;
}

protected void processTransaction(TransactionReport report, MessageTree tree, Transaction t) {
        String type = t.getType();
        String name = t.getName();
        //根据type和name的确定是否需要处理
        if (m_serverFilterConfigManager.discardTransaction(type, name)) {
            return;
        } else {
            Pair<Boolean, Long> pair = checkForTruncatedMessage(tree, t);
            if (pair.getKey().booleanValue()) {
                String ip = tree.getIpAddress();
                TransactionType transactionType = report.findOrCreateMachine(ip).findOrCreateType(type);
                TransactionName transactionName = transactionType.findOrCreateName(name);
                String messageId = tree.getMessageId();
                //根据type和name处理
                processTypeAndName(t, transactionType, transactionName, messageId, pair.getValue().doubleValue() / 1000d);
            }
            List<Message> children = t.getChildren();
            for (Message child : children) {
                if (child instanceof Transaction) {
                    //递归的去处理
                    processTransaction(report, tree, (Transaction) child);
                }
            }
        }
    }



    protected void processTypeAndName(Transaction t, TransactionType type, TransactionName name, String messageId,
          double duration) {

        //计算
        type.incTotalCount();
        name.incTotalCount();
        if (t.isSuccess()) {
            type.setSuccessMessageUrl(messageId);
            name.setSuccessMessageUrl(messageId);
        } else {
            type.incFailCount();
            name.incFailCount();
            type.setFailMessageUrl(messageId);
            name.setFailMessageUrl(messageId);
        }
        int allDuration = ((int) computeDuration(duration));
        double sum = duration * duration;
        name.setMax(Math.max(name.getMax(), duration));
        name.setMin(Math.min(name.getMin(), duration));
        name.setSum(name.getSum() + duration);
        name.setSum2(name.getSum2() + sum);
        name.findOrCreateAllDuration(allDuration).incCount();
        type.setMax(Math.max(type.getMax(), duration));
        type.setMin(Math.min(type.getMin(), duration));
        type.setSum(type.getSum() + duration);
        type.setSum2(type.getSum2() + sum);
        type.findOrCreateAllDuration(allDuration).incCount();
        long current = t.getTimestamp() / 1000 / 60;
        int min = (int) (current % (60));
        //处理type和name
        processNameGraph(t, name, min, duration);
        processTypeRange(t, type, min, duration);
    }

每个分析器都包含有多个报表,报表交由报表管理器(ReportManage)管理,报表在报表管理器中存储结构如下:

Map<Long, Map<String, T>> m_reports
最外层是个Map, key 为long类型,代表的是当前时间周期的报表,value还是一个Map,key类型为String,代表的是不同的domain,一个domain可以理解为一个 Project,value是不同report对象,在分析器处理报表的时候,我们会通过周期管理器(DefaultReportManage)的getHourlyReport方法根据周期时间和domain获取对应的Report。

分析器分析上报的消息之后,生成相应的报表存于Report对象中,报表实体类XxxReport的结构配置位于 cat-sonsumer/src/main/resources/META-INFO/dal/model/*.xml 中

统计结果存于TransactionReport,依然是以周期时间和domain来划分不同的报表。 相同的domain下的不同IP对应的统计信息是存于不同的Machine对象中,可以在cat-consumer/target/generated-sources/dal-model/com/dianping/cat/consumer/ 目录下去查看这个类。

每台机器下面,不同类型的事务统计信息会存于不同的TransactionType对象里,在管理页面上,我们展开指定Type,会看到该Type下所有Name的统计信息,相同Type下的不同名称的统计信息就是分别存在于不同的TransactionName下面,点开每条记录前面的 [:: show ::], 我们将会看到该周期小时内每分钟的统计信息,每分钟的统计存储在 Type 的 Range2对象、Name的Range对象内,实际上Range2和Range对象的代码结构完全一致,除了类名不同,你可以认为他们就是同一个东西。

Type和Name都会统计总执行次数、失败次数、示例链接、最小时间、最大调用时间、平均值、标准差等等信息,同时分析器会选取最近一条消息作为他的示例链接,将messageId存于m_successMessageUrl或者m_failMessageUrl中。

我们会根据一定规则划分几个执行时间区间,将该区间的事务消息总数统计在 AllDuration 和 Duration 对象中。

总结:

消息发送到服务端,服务端解码为 MessageTree准备消费。期间存在一个demon线程,1s检查一下当前小时的Period对象是否需要创建(一般都是新的小时需要创建一个Period代表当前小时)。

如果当前小时的Period存在,那么我们的MessageTree会被分发给各个PeriodTask,这里其实就是把消息发送到每个PeriodTask中的内存队列里,然后每个Task异步去消费。就是通过使用Queue实现了解耦与延迟异步消费。

每个PeriodTask持有MessageAnalyzer analyzer(Transaction\Event\Problean…每种报表都对应一个解析器的实现类)、MessageQueue queue对象,PeriodTask会不停地解析被分发进来的MessageTree,形成这个解析器所代表的报表。

当前时间进入下个小时,会创建一个新的当前小时的Period,并且异步的remove之前的Period。

存储

存储主要分成两类:一个是 报表(Transaction、Event、Problem….),一个是logview,也是就是原始的MessageTree。

所有原始消息会先存储在本地文件系统,然后上传到HDFS中保存;而对于报表,因其远比原始日志小,则以K/V的方式保存在MySQL中。

报表存储:在每个小时结束后,将内存中的各个XML报表 保存到Mysql、File(\data\appdatas\cat\bucket\report…)中。

EndTaskThread.java

private class EndTaskThread implements Task {

   private long m_startTime;

   public EndTaskThread(long startTime) {
      m_startTime = startTime;
   }

   @Override
   public String getName() {
      return "End-Consumer-Task";
   }

   @Override
   public void run() {
      endPeriod(m_startTime);
   }

   @Override
   public void shutdown() {
   }
}
private void endPeriod(long startTime) {
   int len = m_periods.size();

   for (int i = 0; i < len; i++) {
      Period period = m_periods.get(i);

      if (period.isIn(startTime)) {
        //处理结束小时的逻辑
         period.finish();
         m_periods.remove(i);
         break;
      }
   }
}
public void finish() {
   SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
   Date startDate = new Date(m_startTime);
   Date endDate = new Date(m_endTime - 1);

   m_logger.info(String.format("Finishing %s tasks in period [%s, %s]", m_tasks.size(), df.format(startDate),
         df.format(endDate)));

   try {
      for (Entry<String, List<PeriodTask>> tasks : m_tasks.entrySet()) {
         for (PeriodTask task : tasks.getValue()) {
            //依次调用PeriodTask的finish
            task.finish();
         }
      }
   } catch (Throwable e) {
      Cat.logError(e);
   } finally {
      m_logger.info(String.format("Finished %s tasks in period [%s, %s]", m_tasks.size(), df.format(startDate),
            df.format(endDate)));
   }
}
public void finish() {
   try {
      //调用m_analyzer的doCheckpoint和destroy
      m_analyzer.doCheckpoint(true);
      m_analyzer.destroy();
   } catch (Exception e) {
      Cat.logError(e);
   }
}

我们先详细剖析storeHourlyReports 的过程,然后再看看几个特殊的分析器的结束逻辑。storeHourlyReports 首先将该分析器生成的所有报表都取出,然后我们会校验报表的domain名称是否合法,不合法的报表将被移除,在序列化之前,我们会调用ReportDelegate.beforeSave(…)方法做一些预处理的工作。不同种类的报表,预处理所做的工作是不同的,后续我们分别讲解,做完预处理的工作之后,我们就正式持久化了,支持文件和数据库两种持久化方式,我们会根据传入的序列化策略(StoragePolicy) 来选择需要进行哪种序列化,一般来说,如果是正常的周期结束,数据会持久化到文件和数据库,如果是JVM Shutdown导致的结束,只持久化到文件,两种持久化的细节后续我们也会分别详细讲解。

TransactionAnalyzer

public synchronized void doCheckpoint(boolean atEnd) {
   if (atEnd) {
      m_reportManager.storeHourlyReports(getStartTime(), StoragePolicy.FILE_AND_DB, m_index);
   } else {
      m_reportManager.storeHourlyReports(getStartTime(), StoragePolicy.FILE, m_index);
   }
}

public void storeHourlyReports(long startTime, StoragePolicy policy, int index) {
   Transaction t = Cat.newTransaction("Checkpoint", m_name);
   Map<String, T> reports = m_reports.get(startTime);
   ReportBucket bucket = null;

   try {
      t.addData("reports", reports == null ? 0 : reports.size());

      if (reports != null) {
         Set<String> errorDomains = new HashSet<String>();

         for (String domain : reports.keySet()) {
            if (!m_validator.validate(domain)) {
               errorDomains.add(domain);
            }
         }
         for (String domain : errorDomains) {
            //将不符合规则的domain去除,这里会过滤掉所有的中文
            reports.remove(domain);
         }
         if (!errorDomains.isEmpty()) {
            m_logger.info("error domain:" + errorDomains);
         }

         m_reportDelegate.beforeSave(reports);

         if (policy.forFile()) {
            //初始化一个存放报表的bucket
            bucket = m_bucketManager.getReportBucket(startTime, m_name, index);

            try {
               //存储到文件
               storeFile(reports, bucket);
            } finally {
               //关闭bucket
               m_bucketManager.closeBucket(bucket);
            }
         }

         if (policy.forDatabase()) {
            //将报表存储到DB
            storeDatabase(startTime, reports);
         }
      }
      t.setStatus(Message.SUCCESS);
   } catch (Throwable e) {
      Cat.logError(e);
      t.setStatus(e);
      m_logger.error(String.format("Error when storing %s reports of %s!", m_name, new Date(startTime)), e);
   } finally {
      cleanup(startTime);
      t.complete();

      if (bucket != null) {
         m_bucketManager.closeBucket(bucket);
      }
   }
}

在做完预处理之后,所有报表都将被持久化到文件,在DefaultReportManager调用storeFile存储文件之前,我们先调用 m_bucketManager.getReportBucket(…) 来创建并初始化ReportBucket,文件的读写相关操作都封装于ReportBucket里面。

DefaultReportManager.java

private ReportDelegate<T> m_reportDelegate;
private ReportBucketManager m_bucketManager;
private HourlyReportDao m_reportDao;
private HourlyReportContentDao m_reportContentDao;
//key:时间戳 value:报表内容(key:domain,value:内容)
private Map<Long, Map<String, T>> m_reports = new ConcurrentHashMap<Long, Map<String, T>>();

public void storeHourlyReports(long startTime, StoragePolicy policy, int index) {
   Transaction t = Cat.newTransaction("Checkpoint", m_name);
   Map<String, T> reports = m_reports.get(startTime);
   ReportBucket bucket = null;

   try {
      t.addData("reports", reports == null ? 0 : reports.size());

      if (reports != null) {
         Set<String> errorDomains = new HashSet<String>();

         for (String domain : reports.keySet()) {
            if (!m_validator.validate(domain)) {
               errorDomains.add(domain);
            }
         }
         for (String domain : errorDomains) {
            //将不符合规则的domain去除,这里会过滤掉所有的中文
            reports.remove(domain);
         }
         if (!errorDomains.isEmpty()) {
            m_logger.info("error domain:" + errorDomains);
         }

         m_reportDelegate.beforeSave(reports);

         if (policy.forFile()) {
            //初始化一个存放报表的bucket
            bucket = m_bucketManager.getReportBucket(startTime, m_name, index);

            try {
               //存储到文件
               storeFile(reports, bucket);
            } finally {
               //关闭bucket
               m_bucketManager.closeBucket(bucket);
            }
         }

         if (policy.forDatabase()) {
            //将报表存储到DB
            storeDatabase(startTime, reports);
         }
      }
      t.setStatus(Message.SUCCESS);
   } catch (Throwable e) {
      Cat.logError(e);
      t.setStatus(e);
      m_logger.error(String.format("Error when storing %s reports of %s!", m_name, new Date(startTime)), e);
   } finally {
      cleanup(startTime);
      t.complete();

      if (bucket != null) {
         m_bucketManager.closeBucket(bucket);
      }
   }
}

private void storeFile(Map<String, T> reports, ReportBucket bucket) {
   for (T report : reports.values()) {
      try {
         String domain = m_reportDelegate.getDomain(report);
         String xml = m_reportDelegate.buildXml(report);

         bucket.storeById(domain, xml);
      } catch (Exception e) {
         Cat.logError(e);
      }
   }
}

private void storeFile(Map<String, T> reports, ReportBucket bucket) {
   for (T report : reports.values()) {
      try {
         String domain = m_reportDelegate.getDomain(report);
         String xml = m_reportDelegate.buildXml(report);

         bucket.storeById(domain, xml);
      } catch (Exception e) {
         Cat.logError(e);
      }
   }
}

private void storeDatabase(long startTime, Map<String, T> reports) {
   Date period = new Date(startTime);
   String ip = NetworkInterfaceManager.INSTANCE.getLocalHostAddress();

   for (T report : reports.values()) {
      try {
         String domain = m_reportDelegate.getDomain(report);
         HourlyReport r = m_reportDao.createLocal();

         r.setName(m_name);
         r.setDomain(domain);
         r.setPeriod(period);
         r.setIp(ip);
         r.setType(1);

         m_reportDao.insert(r);

         int id = r.getId();
         byte[] binaryContent = m_reportDelegate.buildBinary(report);
         HourlyReportContent content = m_reportContentDao.createLocal();

         content.setReportId(id);
         content.setContent(binaryContent);
         m_reportContentDao.insert(content);
         m_reportDelegate.createHourlyTask(report);
      } catch (Throwable e) {
         Cat.getProducer().logError(e);
      }
   }
}

public Map<String, T> loadHourlyReports(long startTime, StoragePolicy policy, int index) {
   Transaction t = Cat.newTransaction("Restore", m_name);
   Map<String, T> reports = m_reports.get(startTime);
   Cat.logEvent("Restore", m_name + ":" + index);
   ReportBucket bucket = null;

   if (reports == null) {
      reports = new ConcurrentHashMap<String, T>();
      m_reports.put(startTime, reports);
   }

   try {
      bucket = m_bucketManager.getReportBucket(startTime, m_name, index);

      for (String id : bucket.getIds()) {
         String xml = bucket.findById(id);
         T report = m_reportDelegate.parseXml(xml);

         reports.put(id, report);
      }

      m_reportDelegate.afterLoad(reports);
      t.setStatus(Message.SUCCESS);
   } catch (Throwable e) {
      t.setStatus(e);
      Cat.logError(e);
      m_logger.error(String.format("Error when loading %s reports of %s!", m_name, new Date(startTime)), e);
   } finally {
      t.complete();

      if (bucket != null) {
         m_bucketManager.closeBucket(bucket);
      }
   }
   return reports;
}

报表的表结构如下:

数据库的持久化完成标志着一个完整周期的结束,CAT实时处理报表都是产生小时级别统计,小时级报表中会带有最低分钟级别粒度的统计,在数据库持久化完成之后,我们会调用 m_reportDelegate.createHourlyTask(report) 创建一些定时任务,去创建、天模式、周模式、月模式等等粒度更粗的视图,这里不再展开。

LogView的保存有后台线程(默认20个,Daemon Thread [cat-Message-Gzip-n])轮询处理:会间隔一段时间后从消息队列中拿取MessageTree,并进行编码压缩,保存到\data\appdatas\cat\bucket\dump\年月\日\domain-ip1-ip2-ipn目录下

LocalMessageBucketManager

public void run() {
   try {
      while (true) {
         MessageItem item = m_messageQueue.poll(5, TimeUnit.MILLISECONDS);

         if (item != null) {
            m_count++;
            if (m_count % (10000) == 0) {
               gzipMessageWithMonitor(item);
            } else {
               gzipMessage(item);
            }
         }
      }
   } catch (InterruptedException e) {
      // ignore it
   }
}

DumpAnalyzer — 原始消息LogView存储

DumpAnalyzer 与其它分析器有点不同,它不是为了报表而设计,而是用于原始消息LogView的存储,与报表统计不一样,他的数据量非常大。因为数据量比较大所以存储整体要求就是批量压缩以及随机读,采用队列化、异步化、线程池等技术来保证。

当有客户端消息过来,DumpAnalyzer会调用本地消息处理器管理类(LocalMessageBucketManager) 的 storeMessage 方法存储消息,LocalMessageBucketManager是LogView管理的核心类,我们先看一看 LocalMessageBucketManager 对象的初始化函数 initialize() 的处理逻辑:

1、首先获取消息存储的基础路径(m_baseDir),默认是 /data/appdatas/cat/bucket/dump, 在 server.xml 中可以配置,消息在基础路径之内,会根据domain、机器、时间等元素来分门别类的存储。

2、开启 BlockDumper 线程, 将本地消息处理器(LocalMessageBucket)、阻塞队列(BlockingQueue)以及统计信息的指针传入BlockDumper 对象,当内存消息块到达 64K 的时候, 该线程会异步将内存消息块写入数据文件和索引文件。

3、开启LogviewUploader线程,将自己的指针、本地消息处理器、HDFS上传对象(HdfsUploader)以及配置管理器的指针传入LogviewUploader对象,该用于异步将文件上传到 HDFS, 前提是配置了 hdfs 上传配置。

4、开启20个消息压缩线程,并为每个线程分配一个阻塞队列,当DumpAnalyzer接收到消息请求,会将消息写入该队列,MessageGzip会轮训从队列取消息处理,注意这里虽然有20个队列,然而正常我们只插入前19个队列,只有在前面入队失败了,消息将会被插入最后那个队列,可以认为最后那个队列是前面队列的一个备用队列。

LocalMessageBucketManager.java

public void initialize() throws InitializationException {
   m_baseDir = new File(m_configManager.getHdfsLocalBaseDir(ServerConfigManager.DUMP_DIR));

   //开启守护线程
   Threads.forGroup("cat").start(new BlockDumper(m_buckets, m_messageBlocks, m_serverStateManager));
   Threads.forGroup("cat").start(new LogviewUploader(this, m_buckets, m_logviewUploader, m_configManager));

   for (int i = 0; i < m_gzipThreads; i++) {
      LinkedBlockingQueue<MessageItem> messageQueue = new LinkedBlockingQueue<MessageItem>(m_gzipMessageSize);

      m_messageQueues.put(i, messageQueue);
      //开启守护线程,20个压缩线程
      Threads.forGroup("cat").start(new MessageGzip(messageQueue, i));
   }
   m_last = m_messageQueues.get(m_gzipThreads - 1);
}

public void storeMessage(final MessageTree tree, final MessageId id) {
   boolean errorFlag = true;
   int hash = Math.abs((id.getDomain() + '-' + id.getIpAddress()).hashCode());
   int index = (int) (hash % m_gzipThreads);
   MessageItem item = new MessageItem(tree, id);
   LinkedBlockingQueue<MessageItem> queue = m_messageQueues.get(index % (m_gzipThreads - 1));
   boolean result = queue.offer(item);

   if (result) {
      errorFlag = false;
   } else {
      if (m_last.offer(item)) {
         errorFlag = false;
      }
   }

   if (errorFlag) {
      m_serverStateManager.addMessageDumpLoss(1);
   }
   logStorageState(tree);
}

当DumpAnalyzer接收到消息请求,会调用storeMessage(…) 函数处理消息,如上源码,函数会根据domain和客户端ip将消息均匀分配到那19个阻塞队列(LinkedBlockingQueue)中,然后MessageGzip会轮询从队列获取消息数据,调用gzipMessage(item)函数处理,每处理 10000 条消息,MessageGzip会上报一条Gzip压缩线程监控记录。

我们再看看最核心的 gzipMessage(MessageItem item) 函数的处理逻辑,CAT根据日期,周期小时,domain,客户端地址,服务端地址创建存储路径和文件,包含数据文件和索引文件, 例如 20180611/15/Cat-127.0.01-127.0.01、20180611/15/Cat-127.0.01-127.0.01.idx ,从前面可以看出Message-ID的前3段可以确定唯一的索引文件,每条消息的存储由本地消息处理器(LocalMessageBucket)控制,LocalMessageBucket 的 storeMessage(…)方法会将消息信息写入消息块(MessageBlock)对象存放在内存中,当MessageBlock数据块大小达到 64K 时,将内存数据(MessageBlock) 放入阻塞队列 (m_messageBlocks),异步写入文件,并清空内存MessageBlock。LocalMessageBucket 有个字段 m_blockSize 用于记录消息块总大小,注意这里的 64K 是压缩前的消息块总大小。

MessageGzip.java

public void run() {
   try {
      while (true) {
         MessageItem item = m_messageQueue.poll(5, TimeUnit.MILLISECONDS);

         if (item != null) {
            m_count++;
            if (m_count % (10000) == 0) {
               gzipMessageWithMonitor(item);
            } else {
               gzipMessage(item);
            }
         }
      }
   } catch (InterruptedException e) {
      // ignore it
   }
}
private void gzipMessageWithMonitor(MessageItem item) {
   Transaction t = Cat.newTransaction("Gzip", "Thread-" + m_index);
   t.setStatus(Transaction.SUCCESS);

   gzipMessage(item);
   t.complete();
}
private void gzipMessage(MessageItem item) {
   try {
      MessageId id = item.getMessageId();
      String name = id.getDomain() + '-' + id.getIpAddress() + '-' + m_localIp;
      String path = m_pathBuilder.getLogviewPath(new Date(id.getTimestamp()), name);
      LocalMessageBucket bucket = m_buckets.get(path);

      if (bucket == null) {
            //为空时先初始化
         synchronized (m_buckets) {
            bucket = m_buckets.get(path);
            if (bucket == null) {
               bucket = (LocalMessageBucket) lookup(MessageBucket.class, LocalMessageBucket.ID);
               bucket.setBaseDir(m_baseDir);
               bucket.initialize(path);

               m_buckets.put(path, bucket);
            }
         }
      }

      DefaultMessageTree tree = (DefaultMessageTree) item.getTree();
      ByteBuf buf = tree.getBuffer();
        //当size小鱼64K的时候会返回null
      MessageBlock block = bucket.storeMessage(buf, id);

      if (block != null) {
         if (!m_messageBlocks.offer(block)) {
            m_serverStateManager.addBlockLoss(1);
            Cat.logEvent("DumpError", tree.getDomain());
         }
      }
   } catch (Throwable e) {
      Cat.logError(e);
   }
}



public MessageBlock storeMessage(final ByteBuf buf, final MessageId id) throws IOException {
   synchronized (this) {
      int size = buf.readableBytes();

      m_dirty.set(true);
      m_lastAccessTime = System.currentTimeMillis();
      m_blockSize += size;
      m_block.addIndex(id.getIndex(), size);
      buf.getBytes(0, m_out, size); // write buffer and compress it

      if (m_blockSize >= MAX_BLOCK_SIZE) {
        //大于 64K 时,将内存数据(MessageBlock) 放入阻塞队列 (m_messageBlocks),异步写入文件,并清空内存MessageBlock
         return flushBlock();
      } else {
         return null;
      }
   }
}

从上代码可以看出,当 storeMessage(…) 返回不为空的消息块(MessageBlock)时,则认为内存数据已经达到64K,需要写入文件,MessageGzip将消息块推入阻塞队列m_messageBlocks, BlockDumper线程会对队列进行消费, 它在实例化的时候会创建一个执行线程池 m_executors,然后 BlockDumper 线程轮询从阻塞队列取消息块(MessageBlock),为每个消息块创建一个块写入任务(FlushBlockTask),并将任务提交给执行线程池执行。FlushBlockTask实际会调用BlockDumper的flushBlock(block)函数将MessageBlock写入文件。

最终写入操作,还是得由LocalMessageBucket的MessageBlockWriter来完成,接下来我们介绍下本地消息处理器(LocalMessageBucket),它是一个控制消息数据读写的对象,数据在内存中的载体是消息块(MessageBlock),LocalMessageBucket 在gzipMessage(…)被首次实例化、初始化,初始化过程中会创建一个消息块(MessageBlock)、消息块读处理对象(MessageBlockReader)、消息块写处理对象(MessageBlockWriter)、、缓冲区以及缓冲区压缩流。

MessageBlock 包含4个信息:文件路径、数据缓冲区、每条ID的序列号、每条消息数据的大小(压缩前)。

消息块读处理对象负责消息的读取操作。

消息块写处理对象则负责数据文件、索引文件的写入操作,他会维护一个文件游标偏移量,记录压缩消息块(MessageBlock)在数据文件中的起始位置,即图中块地址,下面是具体的写逻辑,先写索引文件,CAT先获取消息块中消息总条数,为每个Message-ID都写一个索引记录,每条消息的索引记录长度都是48bits,索引根据Message-ID的第四段(序列号)来确定索引的位置,比如消息Message-ID为ShopWeb-0a010680-375030-2,这条消息ID对应的索引位置为2*48bits的位置,48bits索引包含32bits的块地址 和 16bits 的块内偏移地址,前者记录压缩消息块(MessageBlock)在数据文件中的偏移位置,由于消息块包含多条消息,我们需要16bits来记录消息在消息块中的位置,注意这里指解压后的消息块。写完索引文件再写入数据文件,每一段压缩数据,前4位都是压缩块的大小,后面才是消息块的实际数据。

MessageBlockWriter.java

public synchronized void writeBlock(MessageBlock block) throws IOException {
   int len = block.getBlockSize();
   byte[] data = block.getData();
   int blockSize = 0;

   for (int i = 0; i < len; i++) {
      int seq = block.getIndex(i);
      int size = block.getSize(i);

      m_indexFile.seek(seq * 6L);
      m_indexFile.writeInt(m_blockAddress);
      m_indexFile.writeShort(blockSize);
      blockSize += size;
   }

   m_dataFile.writeInt(data.length);
   m_dataFile.write(data);
   m_blockAddress += data.length + 4;
}

CAT在写数据一份是Index文件,一份是Data文件.

Data文件是分段GZIP压缩,每个分段大小小于64K,这样可以用16bits可以表示一个最大分段地址。
一个Message-ID都用需要48bits的大小来存索引,索引根据Message-ID的第四段来确定索引的位置,比如消息Message-ID为ShopWeb-0a010680-375030-2,这条消息ID对应的索引位置为2*48bits的位置。
48bits前面32bits存数据文件的块偏移地址,后面16bits存数据文件解压之后的块内地址偏移。
CAT读取消息的时候,首先根据Message-ID的前面三段确定唯一的索引文件,在根据Message-ID第四段确定此Message-ID索引位置,根据索引文件的48bits读取数据文件的内容,然后将数据文件进行GZIP解压,在根据块内便宜地址读取出真正的消息内容。

一定得注意的是,同一台客户端机器产生的Message-ID的第四段,即当前小时的顺序递增号,在当前小时内一定不能重复,因为在服务端,CAT会为每个客户端IP、每个小时的原始消息存储都创建一个索引文件,每条消息的索引记录在索引文件内的偏移位置是由顺序递增号决定的,一旦顺序号重复生成,那么该小时的重复索引数据将会被覆盖,导致我们无法通过索引找到原始消息数据。

报表展示

服务器在初始化CatServlet 之后, 会初始化 MVC,MVC也是继承自AbstractContainerServlet , 同样也是一个 Servlet 容器,这是一个非常古老的MVC框架,当时Spring MVC 还并不成熟,但是所有MVC框架的核心思想都是一致的。此处不再展开