该文为个人学习笔记,仅供参考。 更多内容关注本人Halo^0博客

Hadoop概述

概述

Hadoop是一个由Apache基金会所开发的分布式系统基础架构。

用户可以在不了解分布式底层细节的情况下,开发分布式程序。充分利用集群的威力进行高速运算和存储。

Apache Hadoop 原本来源于 Google 一款名为MapReduce的编程模型包。

​ GFS -> HDFS ​ MapReduce -> MapReduce ​ BigTable -> HBase

Hadoop主要组成

  • Hadoop Common:为其他Hadoop模块提供基础设施。

  • Hadoop HDFS:一个高可靠、高吞吐量的分布式文件系统。

    • 分布式
    • 安全性
    • 副本数据
  • Hadoop MapReduce:一个分布式的离线并行计算框架。

    • 分布式

    • 思想

      • 分而治之

        大数据集分为小的数据集

        每个数据集,进行逻辑业务处理(map)

        合并统计数据结果(reduce)

  • Hadoop YARN:一个新的MapReduce框架,任务调度与资源管理。

    分布式资源管理框架

    • 管理整个集群的资源(内存、CPU核数)
    • 分配调度集群的资源

HDFS 分布式文件系统

HDFS架构

NameNode 用于保存元数据,处理客户端请求 管理节点 ​ 管理文件系统的命名空间。维护着文件系统树及整棵树内所有的文件和目录。这些信息以命名空间镜像文件和编辑日志文件的形式永久保存在本地磁盘上。

Client 发起请求 ​ 代表用户通过namenode和datanode交互来访问整个文件系统。提供文件系统接口。

DataNode 保存具体数据 ​ 文件系统的工作节点。他们根据需要存储并检索数据块(受客户端或namenode调度)。

SecondaryNode 用于同步元数据, ​ HDFS没有namenode将无法运行。所以secondarynode作为第二名称节点,用来监控HDFS状态的辅助后台程序,每隔一段时间获取HDFS元数据的快照。

Hadoop读写流程

hdfs读写流程

HDFS写操作

HDFS写操作

  1. 客户端向Namenode发起写操作请求,Namenode检查文件是否存在并检查权限,如果存在,则返回exist,不能上传,不可覆盖,如果不存在,那么Namenode会创建一个临时文件用于存储。

  2. Namenode放置副本并遵循就近原则,即如果客户端在该集群且有Datanode节点,那么优先在该节点放置第一块副本,再将另外两块副本放置到另外两台Datanode(文件副本数通常为3,可根据需求及集群性能自定义),并根据距离排序,将Datanode列表返回给客户端。

    如果为多集群,假定3个集群,则每个集群中一台Datanode节点保存一份副本

  3. 客户端根据Namenode返回的Datanode列表连接第一个节点,且第一个节点与其余两个节点根据顺序保持串联连接关系,这种并联连接关系被成为Pipeline(管道)连接。

  4. 客户端将文件切分成block(block默认128MB)

    在写入过程中,block又被切分成package(默认为64KB),以流式向DataNode写入block文件。

  5. Datanode各节点每传输完一个block后,会返回验证信息

    每次验证是在写完一个block后,并非在传输package完成时

  6. 完成文件写入,关闭输出流。

  7. 发送完成信号给Namenode

    在传输过程中,其中一个Datanode挂掉了,Namenode会在整个流程结束,收集并验证Datanode的信息,发现此文件的副本数没有达到要求,则将挂掉的datanode移出Pipeline,然后再寻找另一个可用Datanode节点保存副本

HDFS读操作

HDFS读操作

  1. 客户端发起向Namenode读请求并获取元数据信息。

  2. 客户端根据Namenode返回的信息,就近选择Datanode节点并与各个节点建立输入流。

  3. Datanode向输入流中写数据。

  4. 客户端下载完成block后验证,保证块数据的完整性。

    就近原则:本地,最近的距离;同机架,次之的距离;other(数据中心),最远的距离;

HDFS启动流程

心跳机制

namenode从集群中的每个datanode接收心跳信号和快状态报告(Blockreport)。

  1. 接收心跳信号

    默认每3秒一次,表示改datanode节点工作正常

    心跳返回结果带有namenode给该datanode的命令,如复制删除等等。

    超过10分钟namenode没有收到某个datanode的心跳,则认为该节点不可用。

  2. 块状态报告

    datanode启动后,向namenode注册,周期性(1小时)的向namenode报告块信息。

  • 数据损坏处理

​ block创建时会产生checksum,当datanode读取block的时候,它会重新计算得到checksum,如果与初始checksum不同,则说明该block已经损坏,Client读取其他datanode上的block。那么弄得标记该块已损坏,复制block以达到预期的副本数量。

  • Namenode元数据合并同步

NameNode的元数据操作先往edits文件中写, 当edits文件达到一定的阈值(3600秒或一定的事务数量)的时候,会开启合并的流程。

合并流程:

1.当开始合并的时候,SecondaryNameNode会把edits和fsimage拷贝到所在服务器所在内存中,合并生成名为fsimage.ckpt的文件。

2.将fsimage.ckpt文件拷贝到NameNode上,删除原有的fsimage,并将fsimage.ckpt重命名为fsimage。

3.当SecondaryNameNode将edits和fsimage拷贝走之后, NameNode会立刻生成一个edits.new文件,用于记录新来的元数据,当合并完成之后,原有的edits文件才会被删除,并将edits.new文件重命名为edits文件。

  1. 在达到一定阈值 开启下一轮合并,只拷贝edits文件。

启动流程

  1. 启动HDFS相关进程,进入安全模式(只读不写)
  2. 加载元数据,namenode等待datanode注册
  3. datanode周期性的向namenode发送心跳
  4. 离开安全模式

Yarn集群资源管理系统

Yarn架构

ResourceManger 任务的调度和资源的管理(CPU、内存)管理集群上资源使用的资源管理器。

Container 对环境的抽象,封装了CPU,内存,环境变量等等多维的资源。运行在集群中所有节点上且能够启动和监控的容器。

NodeManger 单个节点的资源管理,节点管理器

application master 为任务程序申请资源,任务的监控和容错

Yarn运行机制

任务提交流程

Yarn工作流程

MapReduce编程模型

概述

MapReduce应用广泛的原因之一就是其易用性,提供了一个高度抽象画二变得非常简单的编程模型,他是总结大量应用的共同特点的基础上抽象出来的分布式计算框架,在其编程模型中,任务可以被分解成相互独立的子问题。任务过程分为两个处理阶段:map阶段和reduce阶段。每阶段都以键-值对作为输入和输出,其数据类型可自定义,需要编写map和reduce函数。

MapReduce作业(job)是客户端需要执行的一个工作单元:它包括输入数据、MapReduce程序和配置信息。Hadoop将作业分成若干个任务(task)来执行,其中包括两类任务:map任务和reduce任务。

MapReduce编程模型给出分布式编程方法的5个步骤:

  1. 迭代,遍历输入数据,将其解析成key/value对;
  2. 将输入key/value对映射map成另外一些key/value对;
  3. 根据key对中间结果进行分组(grouping);
  4. 以组为单位对数据进行归约;
  5. 迭代,将最终产生的key/value对保存到输出文件中。

过程及作用

执行顺序:InputFormat - OutputFormat - Map - Shuffle - Reduce

MapReduce

InputFormat

OutputFormat

Map

Reduce

Shuffle

Map Shuffle Phase
  • 进入环形缓冲区(默认100MB)

    默认情况下,当达到环形缓冲区内存的80%,将会将缓冲区中的数据spill到本地磁盘中(溢出到MapTask所运行的NodeManager机器的本地磁盘中)

  • 溢写 并不是立即将缓冲区中的数据溢写到本地磁盘,而是需要经历一些操作

    • 分区paritioner

分区决定Map Task输出的数据进入哪个Reduce,且分区数量等同于Reduce数量。

默认情况下,key采用HashParitioner

  • 自定义Paritioner用途

    解决数据倾斜:另一种需要我们自己定义一个Partitioner的情况是各个Reduce task处理的键值对数量极不平衡。对于某些数据集,由于很多不同的key的hash值都一样,导致这些键值对都被分给同一个Reducer处理,而其他的Reducer处理的键值对很少,从而拖延整个任务的进度。当然,编写自己的Partitioner必须要保证具有相同key值的键值对分发到同一个Reducer。

  • 对于map输出的每一个键值对,系统都会给定一个partition,partition值默认通过计算key的hash值后对Reduce task的数量取模获得。如果一个键值对的partition值为1,意味着这个键值对会交给第一个Reducer处理。

    //HashPartitioner
    int getParitition(key, value, numreducetask) {
    	return ( key.hashCode&Integer.maxValue)%numreducetask;
    }
    
  • 排序sorter

    会对每个分区中的数据进行排序,默认情况下依据key进行排序。

    • spill溢写

      将分区排序后的数据写到本地磁盘的一个文件中,重复上述操作,产生多个小文件。

  • 当溢写结束后

    [可选]combiner:Map端的reduce,在分区前

    [可选]compress:减少数据量,减少网络IO处理,但压缩消耗CPU性能,也需要时间。

Reduce Shuffle Phase

Reduce端的shuffle主要包括三个阶段,copy,sort(merge),reduce

  • Copy
  • 排序(merge)
  • 分组group

Map-side tuning properties

Property nameTypeDefault valueDescription
mapreduce.task.io.sort.mbint100The size, in megabytes, of the memory buffer to use while sorting map output.
mapreduce.map.sort.spill.percentfloat0.80The threshold usage proportion for both the map output memory buffer and the record boundaries index to start the process of spilling to disk.
mapreduce.task.io.sort.factoint10The maximum number of streams to merge at once when sorting files. This property is also used in the reduce. It's fairly common to increase this to 100.
mapreduce.map.combine.minspillsint3The minimum number of spill files needed for the combiner to run (if a combiner is specified).
mapreduce.map.output.compressbooleanfalseWhether to compress map outputs.
mapreduce.map.output.compress.codecClass nameorg.apache.hadoop.io.compress.DefaultCodecThe compression codec to use for map outputs.
mapreduce.shuffle.max.threadsint0The number of worker threads per node manager for serving the map outputs to reducers. This is a clusterwide setting and cannot be by individual jobs. 0 means use the Netty default of twice the number of available processors.

Reduce-side tuning properties

Property nameTypeDefault valueDescription
mapreduce.reduce.shuffle.parallelcopiesint5The number of threads used to copy map outputs to the reducer.
mapreduce.reduce.shuffle.maxfetchfailuresint10The number of times a reducer tries to fetch a map output before reporting the error.
mapreduce.task.io.sort.factorint10The maximum number of streams to merge at once when sorting files. This property is also used in the map.
mapreduce.reduce.shuffle.input.buffer.percentfloat0.70The proportion of total heap size to be allocated to the map outputs buffer during the copy phase of the shuffle.
mapreduce.reduce.shuffle.merge.percentfloat0.66The threshold usage proportion for the map outputs buffer (defined by mapred.job.shuffle.input.buffer.percent)for starting the process of merging the outputs and spilling to disk.
mapreduce.reduce.merge.inmem.thresholint1000The threshold number of map outputs for starting the process of merging the outputs and spilling to disk. A value of 0 or less means there is no threshold, and the spill behavior is governed solely by mapreduce.reduce.shuffle.merge.percent.
mapreduce.reduce.input.buffer.percentfloat0.0The proportion of total heap size to be used for retaining map outputs in memory during the reduce. For the reduce phase to begin, the size of map outputs in memory must be no more than this size. By default, all map outputs are merged to disk before the reduce begins, to give the reducers as much memory as possible. However, if your reducers require less memory,this value may be increased to minimize the number of trips to disk

附录

集群架构设计示例:

masterslave1slave2slave3
HDFS datanodedatanodedatanode
namenode 9820secondaryNamenode
namenode web 9870secondaryNamenode web 9868
Yarn nodemanagernodemanagernodemanager
resourcemanager resourcemanager
resourcemanager web 8088
历史服务 HistroryServer 10020
HistroryServer web 19888

服务端口

2.x、3.x版本端口差别

摘自网络

分类应用Haddop 2.x portHaddop 3 port
NNPortsNamenode80209820
NNPortsNN HTTP UI500709870
NNPortsNN HTTPS UI504709871
SNN portsSNN HTTP500919869
SNN portsSNN HTTP UI500909868
DN portsDN IPC500209867
DN portsDN500109866
DN portsDN HTTP UI500759864
DN portsNamenode504759865
组件节点默认端口配置用途说明
HDFSDataNode50010dfs.datanode.addressdatanode服务端口,用于数据传输
HDFSDataNode50075dfs.datanode.http.addresshttp服务的端口
HDFSDataNode50475dfs.datanode.https.addresshttps服务的端口
HDFSDataNode50020dfs.datanode.ipc.addressipc服务的端口
HDFSNameNode50070dfs.namenode.http-addresshttp服务的端口
HDFSNameNode50470dfs.namenode.https-addresshttps服务的端口
HDFSNameNode8020fs.defaultFS接收Client连接的RPC端口,用于获取文件系统metadata信息。
HDFSjournalnode8485dfs.journalnode.rpc-addressRPC服务
HDFSjournalnode8480dfs.journalnode.http-addressHTTP服务
HDFSZKFC8019dfs.ha.zkfc.portZooKeeper FailoverController,用于NN HA
YARNResourceManager8032yarn.resourcemanager.addressRM的applications manager(ASM)端口
YARNResourceManager8030yarn.resourcemanager.scheduler.addressscheduler组件的IPC端口
YARNResourceManager8031yarn.resourcemanager.resource-tracker.addressIPC
YARNResourceManager8033yarn.resourcemanager.admin.addressIPC
YARNResourceManager8088yarn.resourcemanager.webapp.addresshttp服务端口
YARNNodeManager8040yarn.nodemanager.localizer.addresslocalizer IPC
YARNNodeManager8042yarn.nodemanager.webapp.addresshttp服务端口
YARNNodeManager8041yarn.nodemanager.addressNM中container manager的端口
YARNJobHistory Server10020mapreduce.jobhistory.addressIPC
YARNJobHistory Server19888mapreduce.jobhistory.webapp.addresshttp服务端口
HBaseMaster60000hbase.master.portIPC
HBaseMaster60010hbase.master.info.porthttp服务端口
HBaseRegionServer60020hbase.regionserver.portIPC
HBaseRegionServer60030hbase.regionserver.info.porthttp服务端口
HBaseHQuorumPeer2181hbase.zookeeper.property.clientPortHBase-managed ZK mode,使用独立的ZooKeeper集群则不会启用该端口。
HBaseHQuorumPeer2888hbase.zookeeper.peerportHBase-managed ZK mode,使用独立的ZooKeeper集群则不会启用该端口。
HBaseHQuorumPeer3888hbase.zookeeper.leaderportHBase-managed ZK mode,使用独立的ZooKeeper集群则不会启用该端口。
HiveMetastore9083/etc/default/hive-metastore中export PORT=来更新默认端口
HiveHiveServer10000/etc/hive/conf/hive-env.sh中export HIVE_SERVER2_THRIFT_PORT=来更新默认端口
ZooKeeperServer2181/etc/zookeeper/conf/zoo.cfg中clientPort=对客户端提供服务的端口
ZooKeeperServer2888/etc/zookeeper/conf/zoo.cfg中server.x=[hostname]:nnnnn[:nnnnn],标蓝部分follower用来连接到leader,只在leader上监听该端口。
ZooKeeperServer3888/etc/zookeeper/conf/zoo.cfg中server.x=[hostname]:nnnnn[:nnnnn],标蓝部分用于leader选举的。只在electionAlg是1,2或3(默认)时需要。