Hadoop集群(二)——WordCount(单词统计)详解

MapReduce理论简介

MapReduce编程模型

『MapReduce』是一种可用于数据处理的编程模型。它的任务过程分为两个处理阶段: map 阶段和 reduce 阶段。每阶段都以 键-值对 作为输入和输出,其类型由我们按需选择。我们还需要写两个函数: map 函数和 reduce 函数。

map 函数由『Mapper』类来表示,后者声明一个抽象的 map() 方法。『Mapper』类是一个泛型类型,它有四个形参类型,分别指定 map 函数的输入键、输入值、输出键、输出值的类型。

同样, reduce 函数也有四个形式参数类型用于指定输入和输出类型。 reduce 函数的输入类型必须匹配 map 函数的输出类型

MapReduce处理过程

处理过程

『MapReduce』作业(job)对象指定作业执行规范,是客户端需要执行的一个工作单元:包括输入数据、『MapReduce』程序和配置信息。

Configuration conf= new Configuration();
Job job= Job.getInstance(conf);

我们可以用它来控制整个作业的运行。我们在『Hadoop』集群上运行这个作业时,要把代码打包成一个『JAR』文件(『Hadoop』在集群上发布这个文件)。不必明确指定『JAR』文件的名称,在『Job』对象的『setJarByClass()』方法中传递一个类即可,『Hadoop』利用这个类来查找包含它的『JAR』文件,进而找到相关的『JAR』文件。

job.setJarByClass(MyWordCount.class);

对『Job』进行合理的命名有助于更快地找到『Job』,以便在『JobTracker』和『Tasktracker』的页面中对其进行监视。

job.setJobName("myjob");

构造『Job』对象后,需要指定输入和输出数据的路径。调用『FileInputFormat』类的静态方法『addInputPath()』来定义输入数据的路径,这个路径可以是单个的文件、一个目录(此时,将目录下的所有文件当作输入)或符合特定文件模式的一系列文件。由函数名可知,可以多次调用『addInputPath()』来实现多路径的输入。

Path inPath= new Path("/user/root/test.txt");
FileInputFormat.addInputPath(job, inPath);

调用『FileOutputFormat』类中的静态方法,『setOutputPath()』来指定输出路径(只能有一个输出路径)。这个方法指定的是『reduce』函数输出文件的写入目录。在运行作业前该目录是不应该存在的(或者是空目录),否则『Hadoop』会报错并拒绝运行作业。这种预防措施的目的是防止数据丢失(长时间运行的作业如果结果被意外覆盖,肯定是非常恼人的)。

Path outPath= new Path("/output/wordcount");
// 如果输出路径存在,则先删除
if(outPath.getFileSystem(conf).exists(outPath)) {
    outPath.getFileSystem(conf).delete(outPath, true);
}
FileOutputFormat.setOutputPath(job, outPath);

接着,通过『setMapperClass()』和『setReducerClass()』方法指定要用的『map』类型和『reduce』类型。

job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);

『setOutputKeyClass()』和『setOutputValueClass()』方法控制『reduce』函数的输出类型,并且必须和『Reduce』类产生的相匹配。『map』函数的输出类型默认情况下和『reduce』函数是相同的,因此如果『mapper』产生出和『reducer』相同的类型时,不需要单独设置。但是,如果不同,则必须通过『setMapOutputKeyClass()』和『setMapOutputValueClass()』方法来设置map函数的输出类型。

job.setMapOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);

在设置定义『map』和『reduce』函数的类之后,可以开始运行作业。『Job』中的『waitForCompletion()』方法提交作业并等待执行完成。该方法唯一的参数是一个标识,指示是否已生成详细输出。当标识位『true』(成功)时,作业会把其进度信息写到控制台。

job.waitForCompletion(true);

『waitForCompletion()』方法返回一个布尔值,表示执行的成败,这个布尔值被转换成程序的退出代码『0』或者『1』。

打包并运行WordCount程序

Eclipse环境搭建

将下载好的『hadoop-2.9.2.tar.gz』和『hadoop-2.9.2-src.tar.gz』解压到英文路径下,并创建『hadoop-2.9.2-lib』目录。

解压路径

将『E:\caroly\hadoop-2.9.2\share\hadoop』目录下文件夹的根目录(除『httpfs』、『kms』)中的『jar』包和『lib』中的『jar』包复制到『hadoop-2.9.2-lib』目录。

配置Windows环境变量:

HADOOP_HOME:E:\caroly\hadoop-2.9.2

HADOOP_USER_NAME:root

Path:%HADOOP_HOME%\bin;


解压此文件夹并将里面的所有文件替换到『E:\caroly\hadoop-2.9.2\bin』中。

将『E:\caroly\hadoop-2.9.2\bin』中的『hadoop.dll』放到『C:\Windows\System32』中。

将『hadoop-eclipse-plugin-2.6.0.jar』放到『Eclipse』的安装目录『D:\eclipse\plugins』中。


打开『Eclipse』,在『Project Explorer』中会出现『DFS Locations』。

如果没有,点击右上角『Java EE』按钮,切换到『Java EE』项目下。

点击:Windows –> Preferences –> Hadoop Map/Reduce 。右边选择Hadoop路径。

![Hadoop Installation](http://caroly.site/caroly_img/Hadoop Installation.png)

依次点击:Windows –> Show View –> Other… –> MapReduce Tools –> Map/Reduce Locations 。

![DFS Locations](http://caroly.site/caroly_img/DFS Location.png)

在『Map/Reduce Locations』面板中右键,选择『New Hadoop location…』。

New Hadoop

在定位器中输入名称、去掉『DFS Master』面板中『Use M/R Master host』前面的勾,填入状态为『active』的主机名以及端口。

定位器


创建环境共享『jar』包:

依次点击:Windows –> Preferences –> Java –> Build Path –> User Libraries 。

环境共享jar包

点击『New…』,填入自定义名称。注:System library(added to the boot class path)前面不要勾选。

![new jars](http://caroly.site/caroly_img/new jars.png)

点击确定后面板中会出现『hadoop507_jars』。点击右侧的『Add External JARs…』,选择『E:\caroly\hadoop-2.9.2-lib』中的所有『jar』包。点击『Apply and Close』退出。

![Apply and Close](http://caroly.site/caroly_img/apply and close.png)


新建『Java』项目,右键项目选择Build Path –> Configure Build Path… 。

依次点击:Add Library… –> User Library 。选择刚刚创建的自定义共享包。

依次点击:Add Library… –> JUnit 。导入『Junit 4』。

所导入的包

将『caroly01』中的『Hadoop』配置文件(core-site.xml、hdfs-site.xml、mapred-site.xml、yarn-site.xml)复制到『src』下。


代码逻辑

MyWordCount.java

package com.caroly.mr.wc;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class MyWordCount {

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration conf= new Configuration();
        Job job= Job.getInstance(conf);

        job.setJarByClass(MyWordCount.class);

        job.setJobName("myjob");

        Path inPath= new Path("/user/root/test.txt");
        FileInputFormat.addInputPath(job, inPath);

        Path outPath= new Path("/output/wordcount");
        //如果输出路径存在,则先删除
        if(outPath.getFileSystem(conf).exists(outPath)) {
            outPath.getFileSystem(conf).delete(outPath, true);
        }

        FileOutputFormat.setOutputPath(job, outPath);

        job.setMapperClass(MyMapper.class);
        job.setReducerClass(MyReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        job.waitForCompletion(true);
    }
}

MyMapper.java

package com.caroly.mr.wc;

import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends Mapper<Object, Text, Text, IntWritable>{

    private final static IntWritable one = new IntWritable(1);
    private Text word= new Text();

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException{
        StringTokenizer itr= new StringTokenizer(value.toString());
        while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
        }
    }
}

MyReducer.java

package com.caroly.mr.wc;

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Reducer;

public class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    //迭代计算
    private IntWritable result= new IntWritable();

    public void reduce(Text key, Iterable<IntWritable> values, 
            Context context) throws IOException, InterruptedException {
        int sum= 0;
        for (IntWritable val : values) {
            sum+= val.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}

运行并查看结果

在『caroly01』中运行如下『shell』

for i in `seq 100000`;do  echo "hello caroly $i" >> test.txt;done

会在当前目录生成一个大小为『1.9M』的『txt』文件。

测试文件

将这个文件上传到HDFS中:

hdfs dfs -mkdir -p /user/root
hdfs dfs -D dfs.blocksize=1048576 -put ~/software/test.txt /user/root

上传的测试文件分为两块,第一块大小是『1M』,第二块的大小为剩余字节大小。


将写好的单词统计代码打包成『jar』包。

右键此包–> Export… –> Java –> JAR file 。

生成jar包

将生成的『jar』包放到『caroly01』中。

运行如下命令进行单词统计:

hadoop jar ~/software/mywordcount.jar com.caroly.mr.wc.MyWordCount

运行如下命令查看结果:

hdfs dfs -cat /output/wordcount/part-r-00000

WordCount处理过程

一、将文件拆分成 splits,并将文件按行分割形成 形式。这一步由 MapReduce 框架自动完成,其中偏移量(即 key 的值)包括了回车所占的字符数( Windows 和 Linux 环境会不同)。

按行分割


二、将分割好的 交给我们定义的 map 方法进行处理,生成新的 对。

map处理


三、得到map方法输出的 对后,Mapper 会将它们按照 key 值进行排序,并执行 Combine 过程,将 key 值相同的的value累加,得到 Mapper 的最终输出结果。

Map端排序及Combine过程


四、Reducer 先对从 Mapper 接收的数据进行排序,再交由我们自定义的 reduce 方法进行处理,得到新的 对,并作为 WordCount 的输出结果。

Reduce端排序及输出结果


SQL操作

将数据文件放到『HDFS』上:

hdfs dfs -put wc /usr/

确保该目录下只有『wc』这一个文件。


创建两张表:

create external table wc
(
line string
)
location '/usr/';

create table wc_result
(
word string,
ct int
);

第一张表存放原始数据,第二张表存放结果数据。


执行如下 SQL:

from (select explode(split(line,' ')) word from wc) t
insert into wc_result
select word,count(word) group by word;

首先进行切割操作,对字段『line』按照空格进行切割,会形成数组;使用『explode』转成一列;再从这个结果中进行查询并写入结果表。该操作走『MapReduce』。

最后查询一下结果表:

select * from wc_result;