前言
我想,作为开发人员,即便你不是大数据开发工程师,也应该听说过Hadoop。而且我认为,不论你是从事哪个开发岗位,都应该对它有所了解,因为Hadoop的思想已经渗透在众多技术中。
如果你之前对Hadoop还不太熟悉,希望通过本文对Hadoop的介绍、基本原理以及简单的实践案例让你了解它,并且在你之后的工作中有所帮助。
Hadoop :我可以处理海量数据
什么是 Hadoop?怎么处理大数据?在这里给没有概念的小伙伴介绍一下。
如上图,Hadoop 其实就是一个用 Java 编写的项目,包含了 HDFS、MapReduce 以及 Yarn 这几个核心组件。本文重点介绍 HDFS 和 MapReduce,Yarn 作为资源调度组件,本文就不重点说明了。
说起 HDFS 和 MapReduce,还要从Google的几篇论文开始聊起,《花了1个月学大数据,我想说...》中有详细说明,这里长话短说。
HDFS : 存储归我
HDFS 全称为分布式文件系统 (Hadoop Distributed File System),在大数据领域中用来存储海量的数据。
没错,只要是涉及到分布式,任何一个大问题都能分成多个小问题一一解决,即分而治之。
HDFS 的分而治之就是将一个体量巨大的文件切分成多个文件(数据块)分布在不同服务器上存储。这样即便是TB到PB级别的大文件,也不会对存储空间以及IO性能有太大影响。下面是 HDFS 的架构。
简单来说,一个大文件分布在不同节点一定需要统一管理,这里的节点在 HDFS 中就是 DataNode ,而统一管理的就是 NameNode。为了不偏离文章主题,这里不对此深入说明。
MapReduce :计算归我
在 MapReduce 没有出现之前,计算都是单机。如果是对大量数据进行计算,那对内存和CPU都需要特别高的要求。在2000年代主流内存容量最多为几十兆到几百兆,显然在单机处理不了大量数据的计算。
那 MapReduce 是怎么处理的?
既然大文件可以基于 HDFS 分布在不同节点,那计算程序也分布到这些节点计算,最后把结果汇总不就好了?
MapReduce 的核心理念就是这样的:“相比移动数据,移动计算更为经济高效。”,这也影响了后续众多的技术。
MapReduce 的计算流程大概如下图,有分布式系统开发经验的小伙伴应该不难理解。
对 Hadoop 有了基本概念后,下面用一个非常典型的面试题进行实操,看看如何基于Hadoop处理“统计10G大小的文件中字符出现的次数”。
Hadoop 环境搭建
Hadoop本身就是一个Java项目,所以,对于Javaer来说搭建起来也比较容易。下面是搭建步骤:
-
前往Hadoop官方下载二进制包并在本地解压。
-
配置环境变量,其实就是指向到Hadoop的几个目录,大概是这样:
export JAVA_HOME=/xxx export HADOOP_HOME=/xxx export HADOOP_HOME=/xxx export PATH=$PATH:$JAVA_HOME/bin:$JAVA_HOME/lib/tools.jar:$JAVA_HOME/lib/dt.jar:$HADOOP_HOME/bin:$HADOOP_HOME/sbin export HADOOP_MAPRED_HOME=$HADOOP_HOME export HADOOP_COMMON_HOME=$HADOOP_HOME export HADOOP_HDFS_HOME=$HADOOP_HOME export YARN_HOME=$HADOOP_HOME
-
Hadoop是通过SSH进行节点间的通信和管理,所以各节点需要开启远程登录。以Mac系统为例,需要在下图中开启远程登录并且开启免密码登录。
免密码登录需要执行以下命令创建秘钥,并放在ssh授权目录。
ssh-keygen -t rsa -P '' -f ~/.ssh/id_rsa cat ~/.ssh/id_rsa.pub >> ~/.ssh/authorized_keys
-
配置 Hadoop 包下
./etc/hadoop
的几个文件- hadoop-env.sh 配置 JDK 环境
export JAVA_HOME=/xxx
- core-site.xml 配置 HDFS 服务发布地址和临时文件目录
<configuration> <property> <name>fs.defaultFS</name> <value>hdfs://localhost:9000</value> </property> <property> <name>hadoop.tmp.dir</name> <value>file:/xxx/tmp</value> </property> </configuration>
- hdfs-site.xml 配置数据存储相关内容
<configuration> <property> <name>dfs.replication</name> <value>1</value> </property> <property> <name>dfs.permissions</name> <value>false</value> </property> <property> <name>dfs.namenode.name.dir</name> <value>file:/xxx/namenode</value> </property> <property> <name>dfs.datanode.data.dir</name> <value>file:/xxx/datanode</value> </property> </configuration>
- mapred-site.xml 配置MapReduce任务调度平台
<configuration> <property> <name>mapreduce.framework.name</name> <value>yarn</value> </property> <property> <name>mapreduce.application.classpath</name> <value>/xxx</value> </property> <property> <name>mapreduce.shuffle.enable.ssl</name> <value>false</value> </property> </configuration>
- yarn-site.xml 配置
<configuration> <property> <name>yarn.nodemanager.aux-services</name> <value>mapreduce_shuffle</value> </property> </configuration>
-
在 Hadoop 包下的./bin目录下执行下面的命令初始化 hdfs
./hdfs namenode -format
-
在 Hadoop 包下的
./sbin
目录下执行下面的命令启动hdfs和yarn。./start-dfs.sh ./start-yarn.sh
启动成功后分别通过127.0.0.1:9870 、127.0.0.1:8088 访问,Hadoop就算搭建完成了。
统计10G大小的文件中字符出现的次数
准备10G的文件
我这里使用python脚本生成了以空格分割单词的文件,内容大概如下:
然后多次执行命令 cat random_words_in_chunks.txt >> 10G_words.txt
快速将文件扩大至10G。
上传至HDFS
通过执行下面的命令将这10G的文件上传到 HDFS 的test目录。
#创建文件目录
hadoop fs -mkdir -p hdfs://127.0.0.1:9000/test
#上传文件
hadoop fs -put 10G_words.txt /test
执行成功后就可以通过 Web UI界面看到数据。
接下来,就可以编写MapReduce程序,基于HDFS的数据进行分布式计算了。
编写 MapReduce 程序
之所以称之为 MapReduce,是因为 MapReduce 将计算分为两个阶段,一个Map阶段,一个Reduce阶段。
我们要统计字符出现的次数,只需要关注Map和Reduce分别做什么操作。
- 通过Map阶段可以将字符转换成我们熟悉的键值对,key为字符,value为数字,例如(“字符1”,1)。
- 通过Reduce阶段可以将同样key的value进行叠加,从而达到目的。
接下来看代码的具体编写。
创建一个Maven项目,引入相关依赖。
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-jobclient</artifactId>
<version>3.3.6</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.3.6</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs-client</artifactId>
<version>3.3.6</version>
</dependency>
</dependencies>
MapReduce为我们提供了Mapper类来完成Map阶段的计算任务,所以,我们只需要继承Mapper类编写逻辑即可。代码如下
public class TokenizerMapper extends Mapper<Object, Text, Text, IntWritable> {
private final static IntWritable one = new IntWritable(1);
private Text word = new Text();
public void map(Object key, Text value, Context context
) throws IOException, InterruptedException {
StringTokenizer itr = new StringTokenizer(value.toString());
while (itr.hasMoreTokens()) {
word.set(itr.nextToken());
context.write(word, one);
}
}
}
Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 的泛型分别是计算任务的输入的键值对类型以及输出的键值对类型。
Map阶段的输入还只是读取的一行字符,所以需要通过StringTokenizer将这些字符以空格进行分割得到每个字符,然后遍历转化为("字符",1)的键值对作为输出。
本次上传到 HDFS 的10G文件被分割为77个数据块,那么最终会有77个Map计算任务以及结果输出。
同样,MapReduce为我们提供了Reducer类来完成Reduce阶段的计算任务,所以,我们只需要继承Reducer类编写逻辑即可。代码如下
public class IntSumReducer extends Reducer<Text, IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values,
Context context
) throws IOException, InterruptedException {
int sum = 0;
for (IntWritable val : values) {
sum += val.get();
}
result.set(sum);
context.write(key, result);
}
}
Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT> 的泛型同Mapper。
不同Map阶段输出的相同key最终会被同一个Reduce任务作为输入,在编码时需要将相同输出的值进行叠加,最终完成每个字符出现的次数。
除了要编写Mapper和Reducer类,还需要写一个程序入口,即main方法。代码最终会编译为jar包交给yarn,最终会有多少任务,资源的申请以及任务的分发都在这个类完成。代码如下:
public class WordCount {
public static void main(String[] args) throws Exception {
Configuration conf = new Configuration();
String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
if (otherArgs.length < 2) {
System.err.println("Usage: wordcount <in> [<in>...] <out>");
System.exit(2);
}
Job job = Job.getInstance(conf, "word count");
job.setJarByClass(WordCount.class);
job.setMapperClass(TokenizerMapper.class);
job.setCombinerClass(IntSumReducer.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
for (int i = 0; i < otherArgs.length - 1; ++i) {
FileInputFormat.addInputPath(job, new Path(otherArgs[i]));
}
FileOutputFormat.setOutputPath(job,
new Path(otherArgs[otherArgs.length - 1]));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
启动分布式计算
以上的代码编写完后,将程序打成jar包,然后执行下面的命令就可以启动分布式计算。
hadoop jar mr-wordcount-1.0-SNAPSHOT.jar WordCount /test /testresult
命令中的 WordCount
是 main 方法类。后面的两个路径是HDFS的目录,一个代表要计算的数据在哪里,一个代码计算后的结果存在哪里。
命令执行后,可以通过yarn的 web ui 或者控制台看到任务分发和执行。
计算过程就如下图一样,程序被分发到数据所在的节点,然后计算结果最后进行汇总。
MapReduce 任务执行完成后,在 hdfs 的 web ui 界面就可以看到计算结果。
至此,10G文件字符统计就通过Hadoop的分而治之快速完成。而作为开发人员来讲,只需要编写两段极简单的代码,不要考虑资源如何分配、程序如何移动、计算结果如何聚合就完成了一个分布式系统。
总结
- Hadoop是基于Java开发的项目,所以对于Javaer来说容易上手。
- Hadoop 的出现大大降低了分布式编程的难度,它不单单是一个项目,更重要的是它的思想和理念:“分而治之”和“移动计算”。
- Hadoop 的影响不仅仅是大数据领域,在它之后的分布式数据库、微服务等等都受其影响。