引言

众所周知,MapReduce编程框架(以下简称MR)一直是大并发运算以及海量数据读写应用设计的利器。在MR编程体系下,一个job通常会把输入的数据集切分为若干块,由map task以完全并行的方式处理消化这些数据块。框架会对map的输出先进行排序,然后把结果作为输入提交给reduce任务。通常作业的输入和输出都会被存储在文件系统中。整个框架负责任务的调度和监控,以及重新执行已经失败的任务。典型的MR程序有如下重要模块结构构成:

No.

模块

描述

1

InputFormat

定义map输入数据的格式

2

OutputFormat

定义reduce输出数据的格式

3

OutputKey

定义map输出数据中key的类型

4

OutputValue

定义map输出数据中value的类型

5

InputSplit

定义对输入数据进行切分的方式,以分配给Map task

6

configure

传入参数信息给mapreduce task

7

Mapper

定义map task

8

Reducer

定义reduce task

9

Collector

收集map/reduce task的输出数据

10

Reporter

计数器,用于汇报job执行过程一些特定事件的发生次数

类似hive这样的应用,拿到用户一句简单sql查询(假设无需二次执行的简单sql)后,将hdfs上的海量数据进行切分,然后每个map task分别对自己负责的那部分数据执行相同的sql查询,最后将各自获得的结果汇总输出给用户,这便可以保证在海量数据中以较快的速度获得查询结果。

简单介绍完MR编程框架后,我们再来谈谈常规压力测试的特点和需求。

LoadRunnerJMeter为例,这两种工具都可以对web应用进行大并发访问,模拟线上的高并发压力测试,并且也都相应的提供了多机联合产生负载这样的方式进一步模拟现实情况增大被测对象的压力。这是为了解决“如果一台测试机器模拟的虚拟用户数过多,他本身性能的下降也会直接影响到测试效果”这个问题。分析LRJMeter的多机联合产生负载这种测试方式,我们不难发现类似MR框架的一些特点,即测试分作如下几步(以LR为例):

1.      设置测试机,即在多台用于测试的机器上安装Load Generator

2.      设置测试任务,即各种configure

3.      同时调度测试任务,通过agent执行对web应用的访问

4.      Controller负责统一调度运行场景并收集测试信息和执行结果

 

无论是LR还是JMeter都是优秀的压测工具,但是总有一些非常规的压力测试场景无法通过LRJMeter方便的实现,例如对分布式系统做数据读写压力测试,被测目标并非一个单独的节点,而是由很多节点组成的,这样的压力测试场景意味着多机联合对单一节点产生的负载被分担到了很多个节点上。LRJMeter针对这样的场景往往在设置上就很复杂。

此外,对很多特定的压测目标,测试人员在设计了专属测试工具之后,往往也需要有一个类似上述LR测试步骤的过程,即工具分发、调度执行、收集结果和过程信息这样一个测试执行框架,如果自己去实现这一套框架,耗费的人月数都是相当可观的,且复用程度有限。于是在云梯项目中我通过自己的实践,想到了将MR编程框架体系与压力测试需求相结合。

从事例说起

先从简单实现类似LR多机联合负载这样一个压测场景展开。被测目标是这样的:一个web应用服务,用于收集分布式系统的跨机房流量信息,后端采用hbase作为存储数据库,接口为单一节点的http listen端口,需要模拟真实跨机房场景,利用较少的机器数量(约真实系统的50分之一)模拟线上系统的并发度。

测试工具代码是发送http request部分,出于安全考虑,重要部分略过:

while(System.currentTimeMillis() - start <= runtime){

          StringBuffer sb = new StringBuffer();

          List<String> data = new ArrayList<String>();

          HttpURLConnection httpurlconnection = null;

          try{

            URL url = new URL(this.reportAd);

            httpurlconnection = (HttpURLConnection) url.openConnection();

            httpurlconnection.setConnectTimeout(5000);

            httpurlconnection.setReadTimeout(5000);

            httpurlconnection.setDoOutput(true);

            httpurlconnection.setRequestMethod("POST");

            httpurlconnection.setRequestProperty("Content-type", "text/plain");

           

            for(long i=0; i<this.recordnum; i++){

                。。。。。。

                s = Math.abs(R.nextLong())%102400000+1024;

                staticWriteSize += s;

                reporter.incrCounter("TestTool", "Write Size", s);

                staticWriteTime += (endTime - startTime);

                reporter.incrCounter("TestTool", "Write Time", endTime - startTime);

                。。。。。。

              }else{

                。。。。。。

                reporter.incrCounter("TestTool", "Read Size", s);

                staticReadTime += (endTime - startTime);

                reporter.incrCounter("TestTool", "Read Time", endTime - startTime);

                。。。。。。

              }

              Pair p = value.get(R.nextInt(value.size()));

              。。。。。。

              staticCount++;

            }

            reporter.incrCounter("TestTool", "Record num", this.recordnum);

            reporter.setStatus("Record: "+staticCount+"("+staticWrite+"w, "+staticRead+"r), Write Size: "

                +staticWriteSize+", Write Time: "+staticWriteTime

                +", Read Size: "+staticReadSize+", Read Time: "+staticReadTime);

            httpurlconnection.getOutputStream().write(sb.toString().getBytes());

            httpurlconnection.getOutputStream().flush();

            httpurlconnection.getOutputStream().close();

            int code = httpurlconnection.getResponseCode();

            if(code != 200) {

              LOG.warn("send data to master server failed, code=" + code);

            }

            reporter.incrCounter("TestTool", "Http Post num", 1);

            map.staticPost.addAndGet(1);

            Thread.sleep(interval);

          } catch (Exception e) {

            map.staticPost.addAndGet(1);

            reporter.incrCounter("TestTool", e.getClass().toString(), 1);

            LOG.warn(e.getMessage(), e);

          } finally {

            if (httpurlconnection != null) {

              httpurlconnection.disconnect();

            }

          }

有了工具代码之后,我们通过实现Mapper来封装该工具,因此上述代码中我使用了“org.apache.hadoop.mapred.Reporter”的方法“incrCounter(Stringarg0, String arg1, long arg2)”来对测试中的重要过程数据进行计数,该方法会将所有map/reduce task中汇报的arg0|arg1定义的值arg2进行相加,输出到MRjobtracker页面上,通过观察作业执行页面可以实时获取这些测试执行过程信息。此外,我还调度了“setStatus(String arg0)”方法,该方法可以实时更新当前所处task的页面信息,提供更详细的单个task执行情况信息。jobdetails.jsp页面观察结果如下所示:

 

Counter

Map

Reduce

Total

TestTool

Http Post num

xxxx

0

xxxx

Record num

xxxx

yy

zzzz

Read Size

xx

0

xx

jobtasks.jsp页面观察结果如下所示:

Task

Complete

Status

Start Time

Finish Time

Errors

Counters

task_xx_m_1

自定义信息1

26-Oct-2013 22:41:41

 

 

10

task_xx_m_2

自定义信息2

26-Oct-2013 22:41:41

 

 

10

 

更多的执行过程信息,我们则通过“org.apache.commons.logging.Log”来收集,通过tasklog页面可以查阅到这些详细的日志信息。

作为map task的输入,我通过在hdfs上生成的一堆随机数据来实现,InputSplit类读取了hdfs上作为模拟真实数据的输入后,将其根据map数切分成n份(n=自定义的map数量),并将其分发给对应的map taskmap task拿到自己那份数据后,立即启动多个线程执行上述测试工具代码:

public void map(LongWritable key, List<Pair> value,

        OutputCollector<Text, LongWritable> context, Reporter reporter)

        throws IOException {

      。。。。。。

      for(int i=0; i < this.threadnum; i++){

        SCNThread t = new SCNThread(value, reporter, start, this);

        t.start();

        this.alivethread.addAndGet(1);

      }

      。。。。。。

task自行同步线程启动数量,当所有线程都启动之后,输出收集器开始运作:

long now = System.currentTimeMillis();

      if (now > this.start + this.interval) {

        this.start += this.interval;

        context.collect(new LongWritable(this.start), new LongWritable(

            this.writeBlocks));

      }

。。。。。。

可以看到key是时间戳,value是我们想收集的数值,收集器收集到的数据将进一步提供给Reducer来分析,这里有一个压力测试的关键点,即最大并发开始时间点和结束时间点的判断。观察Reducer类的reduce方法:

public void reduce(Text key, Iterator<LongWritable> value,

        OutputCollector<Text, Text> context, Reporter reporter)

由于所有map都以相同的时间戳作为key,因此同一时刻迭代器valuesize代表了有多少个map已经达到了最大并发度,我们判断这个size,当其与我们预期的map总数一致时,则可以将该时间戳作为最大并发压力的开始时间点,当size开始小于预期map总数时,则代表最大并发压力的结束时间点,测试结果分析时可以掐取这一段数据作为测试结果,免去开始准备阶段和快结束阶段压力变小对测试结果的干扰。

更进一步我们可以在hdfs上设计一个标志位,当一个maptask执行完毕之后,通过该标志位通知到其他所有map task,以便快速结束当前的测试。

测试结果被reducer分析汇总后输出到hdfs上,最终我们只需要查看一下这个输出文件的内容就可以得到我们需要的测试结果了。

其实我们不难发现,这种测试框架与DDoS攻击很类似,当手握数千台机器之后,基本上就具备了指哪毁哪的能力,MR框架体系蕴藏的能量的确是非常巨大的。

 

流程图

没有流程图,上述文字描述终归不够直观,因此详细流程请看下图所示:

多语言测试工具的支持

对于java类测试工具,我们可以应用该流程图所示方案进行大并发度的压力测试,对于非java语言类的测试工具,我们一方面可以自行撰写其他编程语言的进程调度和收集器,另一方面也可以使用hadoop streaming这个编程工具来实现。Hadoop Streaming Hadoop提供的一个编程工具,它允许用户使用任何可执行文件或脚本文件作为 Mapper Reducer。这样一来我们用pythonshell编写的测试工具也可以通过streaming简单的调度起来执行。有关streaming编程工具的技术细节请自行搜索脑补,这里不再赘述。

 

第二个例子

第二个例子是关于如何使用MR编程框架压测HDFS文件系统的,该例子涉及到Hadoop更底层的技术细节,以及对性能指标的分析等内容,因此留作《工欲善其事必先利其器》系列的下一弹专门介绍一下,敬请期待。

 

总结

为什么把本篇作为系列文章的首弹,是因为Hadoop相关的测试工具大多不离该篇所使用到的技术。Hadoop是一个生态圈,因此测试工具作为生态圈的一部分也没必要且不应该脱离这个生态圈去独立生存。MR编程框架大大缩减了分布式应用程序的开发周期,其编程思想更值得每个码农去深挖学习。本弹是笔者一个非常粗浅的思考开端,期望能够抛砖引玉与更多码农进行分享和探讨。系列文章之二《HDFS性能压测工具浅析》将进一步分享一下有关HDFS相关的技术细节。


做个广告:

 奔走相告邀伙伴,来往双11,6000万现金红包任你拿 http://t.cn/zR902rV 


登录就有2元红包