一、HDFS的优缺点,对于FSDataset后面会继续讨论

作者: 网络资讯  发布:2019-11-07

DataNode.java主要包含三个类DataNode主类、DataTransfer和BlockRecord辅助类。
一、看主类DataNode:
1、主要的数据成员 

DataXceiverServer是Hadoop分布式文件系统HDFS的从节点--数据节点DataNode上的一个后台工作线程,它类似于一个小型的服务器,被用来接收数据读写请求,并为每个请求创建一个工作线程以进行请求的响应。那么,有以下几个问题:

我们知道,HDFS全称是Hadoop Distribute FileSystem,即Hadoop分布式文件系统。既然它是一个分布式文件系统,那么肯定存在很多物理节点,而这其中,就会有主从节点之分。在HDFS中,主节点是名字节点NameNode,它负责存储整个HDFS中文件元数据信息,保存了名字节点第一关系和名字节点第二关系。名字节点第一关系是文件与数据块的对应关系,在HDFS正常运行期间,保存在NameNode内存和FSImage文件中,并且在NameNode启动时就由FSImage加载,之后的修改则保持在内容和FSEdit日志文件中,而第二关系则是数据块与数据节点的对应关系,它并非由名字节点的FSImage加载而来,而是在从节点DataNode接入集群后,由其发送心跳信息汇报给主节点NameNode。

 HDFS是Hadoop Distribute File System 的简称,也就是Hadoop的一个分布式文件系统。

  1. public DatanodeProtocol namenode = null;  
  2.   public FSDatasetInterface data = null;  
  3.   public DatanodeRegistration dnRegistration = null;  
  4.   volatile boolean shouldRun = true;  
  5.   private LinkedList<Block> receivedBlockList = new LinkedList<Block>();  
  6.   /** list of blocks being recovered */  
  7.   private final Map<Block, Block> ongoingRecovery = new HashMap<Block, Block>();  
  8.   private LinkedList<String> delHints = new LinkedList<String>();  
  9.   public final static String EMPTY_DEL_HINT = "";  
  10.   AtomicInteger xmitsInProgress = new AtomicInteger();  
  11.   Daemon dataXceiverServer = null;  
  12.   ThreadGroup threadGroup = null;  

        1、DataXceiverServer是什么?

        那么,何为心跳呢?心跳就是HDFS中从节点DataNode周期性的向名字节点DataNode做汇报,汇报自己的健康情况、负载状况等,并从NameNode处领取命令在本节点执行,保证NameNode这一HDFS指挥官熟悉HDFS的全部运行情况,并对从节点DataNode发号施令,以完成来自外部的数据读写请求或内部的负载均衡等任务。

一、HDFS的优缺点

 

        2、DataXceiverServer是如何初始化的?

        我们知道,Hadoop2.x版本中,做了两个比较大的改动:一是引入了联邦的概念,允许一个HDFS集群提供多个命名空间服务,第二个是利用HA解决了NameNode单点故障问题,引入了Active NN和Standby NN的概念。

1.HDFS优点:

  1.  DataNodeInstrumentation myMetrics;     
  2.  private Thread dataNodeThread = null;   public DataBlockScanner blockScanner = null;        
  3.  public Daemon blockScannerThread = null;         
  4.  public Server ipcServer;   

        3、DataXceiverServer是如何工作的?

        本篇文章,结合Hadoop2.6.0的源码,我们先来看下心跳汇报的整体结构。

  a.高容错性

namenode :执行NN RPC调用的对象。

        带着这些问题,本文将带着你进入DataNode的DataXceiverServer世界。

        众所周知,心跳汇报是从节点DataNode主动发起的周期性向主节点NameNode汇报的一个动作,所以这个问题的突破口自然而然就落在了数据节点DataNode上了。在DataNode内部,有这么一个成员变量blockPoolManager,定义如下:

    .数据保存多个副本

data :DN上管理存储数据结构的对象,对于FSDataset后面会继续讨论。

 

 

    .数据丢的失后自动恢复

dnRegistration :存储NN需要的所有的DN的信息。

        一、DataXceiverServer是什么?

[java] view plain copy

  b.适合批处理

receivedBlockList :记录DN接受到的数据块的链表,通过该数据结构向NN报告接收到的block的信息

        DataXceiverServer是数据节点DataNode上一个用于接收数据读写请求的后台工作线程,为每个数据读写请求创建一个单独的线程去处理。它的成员变量如下:

 

    .移动计算而非移动数据

ongoingRecovery:记录正在进行recovery的block

 

 图片 1图片 2

    .数据位置暴露给计算框架

delHints :receivedBlockList 的副本,暂时没有弄清楚使用两个相同链表的原因图片 3

[java] view plain copy

  1. // 每个DataNode上都有一个BlockPoolManager实例  
  2. private BlockPoolManager blockPoolManager;  

  c.适合大数据处理

myMetrics:记录DN的运行过程中的一些信息。dataNodeThread :DN的主线程。

 

        它是每个DataNode上都会存在的BlockPoolManager实例。那么这个BlockPoolManager是什么呢?看下它的定义及成员变量就知道了,代码如下:

    .GB、TB、甚至PB级的数据处理

dataXceiverServer:DN上接受数据的线程

 图片 4图片 5

 

    .百万规模以上的文件数据

threadGroup :建立一个线程组,主要管理dataXceiverServer和recoverBlocks的线程,有针对线程组的函数,方便对多个线程进行操作。

  1. // PeerServer是一个接口,实现了它的TcpPeerServer封装饿了一个ServerSocket,提供了Java Socket服务端的功能  
  2. private final PeerServer peerServer;  
  3.   
  4. // 该DataXceiverServer所属DataNode实例datanode  
  5. private final DataNode datanode;  
  6.   
  7. // Peer所在线程的映射集合peers  
  8. private final HashMap<Peer, Thread> peers = new HashMap<Peer, Thread>();  
  9.   
  10. // Peer与DataXceiver的映射集合peersXceiver  
  11. private final HashMap<Peer, DataXceiver> peersXceiver = new HashMap<Peer, DataXceiver>();  
  12.   
  13. // DataXceiverServer是否已关闭的标志位closed  
  14. private boolean closed = false;  
  15.   
  16. /** 
  17.  * Maximal number of concurrent xceivers per node. 
  18.  * Enforcing the limit is required in order to avoid data-node 
  19.  * running out of memory. 
  20.  *  
  21.  * 每个节点并行的最大DataXceivers数目。 
  22.  * 为了避免dataNode运行内存溢出,执行这个限制是必须的。 
  23.  * 定义是默认值为4096. 
  24.  */  
  25. int maxXceiverCount =  
  26.   DFSConfigKeys.DFS_DATANODE_MAX_RECEIVER_THREADS_DEFAULT;  
  27.   
  28. // 集群数据块平衡节流器balanceThrottler  
  29. final BlockBalanceThrottler balanceThrottler;  
  30.   
  31. /** 
  32.  * We need an estimate for block size to check if the disk partition has 
  33.  * enough space. Newer clients pass the expected block size to the DataNode. 
  34.  * For older clients we just use the server-side default block size. 
  35.  *  
  36.  * 我们需要估计块大小以检测磁盘分区是否有足够的空间。 
  37.  * 新客户端传递预期块大小给DataNode。 
  38.  * 对于旧客户端而言我们仅仅使用服务器端默认的块大小。 
  39.  */  
  40. final long estimateBlockSize;  

[java] view plain copy

    .10000 的节点

blockScanner与blockScannerThread:对DN上的block进行扫描处理,具体见BlockScanner类的介绍

        其中,PeerServer类型的peerServer,实际上是DataXceiverServer实现功能最重要的一个类,在DataXceiverServer实例构造时,实际上传入的是实现了PeerServer接口的TcpPeerServer类,该类内部封装了一个ServerSocket,提供了Java Socket服务端的功能,用于监听来自客户端或其他DataNode的数据读写请求。

 

  d.可构建在廉价的机器上

ipcServer:管理不同DN之间进行数据传输

 

 图片 6图片 7

    .通过多副本存储,提高可靠性

图片 8

        DataXceiverServer内部还存在对于其载体DataNode的实例datanode,这样该线程就能随时获得DataNode状态、提供的一些列服务等;

  1. /** 
  2.  * Manages the BPOfferService objects for the data node. 
  3.  * Creation, removal, starting, stopping, shutdown on BPOfferService 
  4.  * objects must be done via APIs in this class. 
  5.  *  
  6.  * 为DataNode节点管理BPOfferService对象。 
  7.  * 对于BPOfferService对象的创建、移除、启动、停止等操作必须通过该类的api来完成。 
  8.  */  
  9. @InterfaceAudience.Private  
  10. class BlockPoolManager {  
  11.   private static final Log LOG = DataNode.LOG;  
  12.     
  13.   // NameserviceId与BPOfferService的对应关系  
  14.   private final Map<String, BPOfferService> bpByNameserviceId =  
  15.     Maps.newHashMap();  
  16.     
  17.   // BlockPoolId与BPOfferService的对应关系  
  18.   private final Map<String, BPOfferService> bpByBlockPoolId =  
  19.     Maps.newHashMap();  
  20.     
  21.   // 所有的BPOfferService  
  22.   private final List<BPOfferService> offerServices =  
  23.     Lists.newArrayList();  
  24.   
  25.   // DataNode实例dn  
  26.   private final DataNode dn;  
  27.   
  28.   //This lock is used only to ensure exclusion of refreshNamenodes  
  29.   // 这个refreshNamenodesLock仅仅在refreshNamenodes()方法中被用作互斥锁  
  30.   private final Object refreshNamenodesLock = new Object();  
  31.     
  32.   // 构造函数  
  33.   BlockPoolManager(DataNode dn) {  
  34.     this.dn = dn;  
  35.   }  
  36. }  

    .提供了容错和恢复机制

        peers和peersXceiver是DataXceiverServer内部关于peer的两个数据结构,一个是Peer与其所在线程映射集合peers,另一个则是Peer与DataXceiver的映射集合peersXceiver,均是HashMap类型。Peer是什么呢?实际上就是对Socket的封装;

        由类的注释我们可以知道,BlockPoolManager为DataNode节点管理BPOfferService对象。对于BPOfferService对象的创建、移除、启动、停止等操作必须通过类BlockPoolManager的API来完成。而且,BlockPoolManager中主要包含如下几个数据结构:

2.HDFS缺点

        closed为DataXceiverServer是否已关闭的标志位;

 

  a.低延迟数据访问处理较弱

        maxXceiverCount为每个DataNode节点并行的最大DataXceivers数目,为了避免dataNode运行内存溢出,执行这个限制是必须的;

        1、保存nameserviceId与BPOfferService的对应关系的HashMap:bpByNameserviceId;

    .毫秒级别的访问响应较慢

        balanceThrottler是DataXceiverServer内部一个关于集群中数据库平衡的节流器的实现,它实现了对于数据块移动时带宽、数量的控制。

        2、保存blockPoolId与BPOfferService的对应关系的HashMap:bpByBlockPoolId;

    .低延迟和高吞吐率的请求处理较弱

 

        3、保存所有BPOfferService的ArrayList:offerServices;

  b.大量小文件存取处理较弱

        二、DataXceiverServer是如何初始化的?

        4、DataNode实例dn;

    .会占用大量NameNode的内存

        在数据节点DataNode进程启动的startDataNode()方法中,会调用initDataXceiver()方法,完成DataXceiverServer的初始化,代码如下:

        5、refreshNamenodes()方法中用于线程间同步或互斥锁的Object:refreshNamenodesLock。

    .寻道时间超过读取时间

 

        由前三个成员变量,我们可以清楚的知道,BlockPoolManager主要维护的就是该DataNode上的BPOfferService对象,及其所属nameserviceId、blockPoolId。nameserviceId我们可以理解为HDFS集群中某一特定命名服务空间的唯一标识,blockPoolId则对应为该命名服务空间中的一个块池,或者说一组数据块的唯一标识,那么,什么是BPOfferService呢?我们继续往下看BPOfferService的源码,看下它类的定义及其成员变量,代码如下:

  c.并发写入、文件随机修改

[java] view plain copy

 

    .一个文件仅有一个写者

 

[java] view plain copy

    .仅支持Append写入

 图片 9图片 10

 

二、HDFS的架构

  1.  private void initDataXceiver(Configuration conf) throws IOException {  
  2.    // find free port or use privileged port provided  
  3. // 找一个自由端口或使用已提供的特权端口  
  4.     
  5. // 构造TcpPeerServer实例tcpPeerServer,它实现了PeerServer接口,提供了ServerSocket的功能  
  6.    TcpPeerServer tcpPeerServer;  
  7.      
  8.    if (secureResources != null) {// 如果secureResources存在,根据secureResources创建tcpPeerServer  
  9.      tcpPeerServer = new TcpPeerServer(secureResources);  
  10.    } else {// 否则,根据配置信息创建tcpPeerServer  
  11.      tcpPeerServer = new TcpPeerServer(dnConf.socketWriteTimeout,  
  12.          DataNode.getStreamingAddr(conf));  
  13.    }  
  14.      
  15.    // 设置数据接收缓冲区大小,默认为128KB  
  16.    tcpPeerServer.setReceiveBufferSize(HdfsConstants.DEFAULT_DATA_SOCKET_SIZE);  
  17.      
  18.    // 获取Socket地址InetSocketAddress,赋值给DataNode成员变量streamingAddr  
  19.    streamingAddr = tcpPeerServer.getStreamingAddr();  
  20.    LOG.info("Opened streaming server at "   streamingAddr);  
  21.      
  22.    // 构造名字为dataXceiverServer的线程组threadGroup  
  23.    this.threadGroup = new ThreadGroup("dataXceiverServer");  
  24.      
  25.    // 构造DataXceiverServer实例xserver,传入tcpPeerServer  
  26.    xserver = new DataXceiverServer(tcpPeerServer, conf, this);  
  27.      
  28.    // 构造dataXceiverServer守护线程,并将xserver加入线程组threadGroup  
  29.    this.dataXceiverServer = new Daemon(threadGroup, xserver);  
  30.      
  31.    // 将线程组里的所有线程设置为设置为守护线程,方便虚拟机退出时自动销毁  
  32.    this.threadGroup.setDaemon(true); // auto destroy when empty  
  33.   
  34.    // 如果系统配置的参数dfs.client.read.shortcircuit为true(默认为false),  
  35.    // 或者配置的参数dfs.client.domain.socket.data.traffic为true(默认为false),  
  36.    //   
  37.    if (conf.getBoolean(DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_KEY,  
  38.              DFSConfigKeys.DFS_CLIENT_READ_SHORTCIRCUIT_DEFAULT) ||  
  39.        conf.getBoolean(DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC,  
  40.              DFSConfigKeys.DFS_CLIENT_DOMAIN_SOCKET_DATA_TRAFFIC_DEFAULT)) {  
  41.      DomainPeerServer domainPeerServer =  
  42.                getDomainPeerServer(conf, streamingAddr.getPort());  
  43.      if (domainPeerServer != null) {  
  44.        this.localDataXceiverServer = new Daemon(threadGroup,  
  45.            new DataXceiverServer(domainPeerServer, conf, this));  
  46.        LOG.info("Listening on UNIX domain socket: "    
  47.            domainPeerServer.getBindPath());  
  48.      }  
  49.    }  
  50.      
  51.    // 构造短路注册实例  
  52.    this.shortCircuitRegistry = new ShortCircuitRegistry(conf);  
  53.  }  

 图片 11图片 12

图片 13

        整个初始化工作很简单:

  1. /** 
  2.  * One instance per block-pool/namespace on the DN, which handles the 
  3.  * heartbeats to the active and standby NNs for that namespace. 
  4.  * This class manages an instance of {@link BPServiceActor} for each NN, 
  5.  * and delegates calls to both NNs.  
  6.  * It also maintains the state about which of the NNs is considered active. 
  7.  *  
  8.  * DataNode上每个块池或命名空间对应的一个实例,它处理该命名空间到对应活跃或备份状态NameNode的心跳。 
  9.  * 这个类管理每个NameNode的一个BPServiceActor实例,在两个NanmeNode之间调用。 
  10.  * 它也保存了哪个NameNode是active状态。 
  11.  */  
  12. @InterfaceAudience.Private  
  13. class BPOfferService {  
  14.   static final Log LOG = DataNode.LOG;  
  15.   
  16.   /** 
  17.    * Information about the namespace that this service 
  18.    * is registering with. This is assigned after 
  19.    * the first phase of the handshake. 
  20.    *  
  21.    * 该服务登记的命名空间信息。第一阶段握手即被分配。 
  22.    * NamespaceInfo中有个blockPoolID变量,显示了NameSpace与blockPool是一对一的关系 
  23.    */  
  24.   NamespaceInfo bpNSInfo;  
  25.   
  26.   /** 
  27.    * The registration information for this block pool. 
  28.    * This is assigned after the second phase of the 
  29.    * handshake. 
  30.    *  
  31.    * 块池的注册信息,在第二阶段握手被分配 
  32.    */  
  33.   volatile DatanodeRegistration bpRegistration;  
  34.     
  35.   /** 
  36.    * 服务所在DataNode节点 
  37.    */  
  38.   private final DataNode dn;  
  39.   
  40.   /** 
  41.    * A reference to the BPServiceActor associated with the currently 
  42.    * ACTIVE NN. In the case that all NameNodes are in STANDBY mode, 
  43.    * this can be null. If non-null, this must always refer to a member 
  44.    * of the {@link #bpServices} list. 
  45.    *  
  46.    * 与当前活跃NameNode相关的BPServiceActor引用 
  47.    */  
  48.   private BPServiceActor bpServiceToActive = null;  
  49.     
  50.   /** 
  51.    * The list of all actors for namenodes in this nameservice, regardless 
  52.    * of their active or standby states. 
  53.    * 该命名服务对应的所有NameNode的BPServiceActor实例列表,不管NameNode是活跃的还是备份的 
  54.    */  
  55.   private final List<BPServiceActor> bpServices =  
  56.     new CopyOnWriteArrayList<BPServiceActor>();  
  57.   
  58.   /** 
  59.    * Each time we receive a heartbeat from a NN claiming to be ACTIVE, 
  60.    * we record that NN's most recent transaction ID here, so long as it 
  61.    * is more recent than the previous value. This allows us to detect 
  62.    * split-brain scenarios in which a prior NN is still asserting its 
  63.    * ACTIVE state but with a too-low transaction ID. See HDFS-2627 
  64.    * for details.  
  65.    *  
  66.    * 每次我们接收到一个NameNode要求成为活跃的心跳,都会在这里记录那个NameNode最近的事务ID,只要它 
  67.    * 比之前的那个值大。这要求我们去检测裂脑的情景,比如一个之前的NameNode主张保持着活跃状态,但还是使用了较低的事务ID。 
  68.    */  
  69.   private long lastActiveClaimTxId = -1;  
  70.   
  71.   // 读写锁mReadWriteLock  
  72.   private final ReentrantReadWriteLock mReadWriteLock =  
  73.       new ReentrantReadWriteLock();  
  74.     
  75.   // mReadWriteLock上的读锁mReadLock  
  76.   private final Lock mReadLock  = mReadWriteLock.readLock();  
  77.     
  78.   // mReadWriteLock上的写锁mWriteLock  
  79.   private final Lock mWriteLock = mReadWriteLock.writeLock();  
  80.   
  81.   // utility methods to acquire and release read lock and write lock  
  82.   void readLock() {  
  83.     mReadLock.lock();  
  84.   }  
  85.   
  86.   void readUnlock() {  
  87.     mReadLock.unlock();  
  88.   }  
  89.   
  90.   void writeLock() {  
  91.     mWriteLock.lock();  
  92.   }  
  93.   
  94.   void writeUnlock() {  
  95.     mWriteLock.unlock();  
  96.   }  
  97.   
  98.   BPOfferService(List<InetSocketAddress> nnAddrs, DataNode dn) {  
  99.     Preconditions.checkArgument(!nnAddrs.isEmpty(),  
  100.         "Must pass at least one NN.");  
  101.     this.dn = dn;  
  102.   
  103.     // 每个namenode一个BPServiceActor  
  104.     for (InetSocketAddress addr : nnAddrs) {  
  105.       this.bpServices.add(new BPServiceActor(addr, this));  
  106.     }  
  107.   }  
  108. }  

  如上图所示,HDFS也是按照Master和Slave的结构。分NameNode、SecondaryNameNode、DataNode这几个角色。

 

        由上述代码我们可以得知,BPOfferService为DataNode上每个块池或命名空间对应的一个实例,它处理该命名空间到对应活跃或备份状态NameNode的心跳。这个类管理每个NameNode的一个BPServiceActor实例,同时它也保存了哪个NameNode是active状态。撇开其他成员变量先不说,该类有两个十分重要的成员变量,分别是:

  NameNode:是Master节点,是大领导。管理数据块映射;处理客户端的读写请求;配置副本策略;管理HDFS的名称空间;

        1、创建构造DataXceiverServer需要的TcpPeerServer实例tcpPeerServer,它内部封装了ServerSocket,是DataXceiverServer功能实现的最主要依托;

 

  SecondaryNameNode:是一个小弟,分担大哥namenode的一部分工作量;是NameNode的冷备份;合并fsimage和fsedits然后再发给namenode。

        2、从tcpPeerServer中获取Socket地址InetSocketAddress,赋值给DataNode成员变量streamingAddr

        1、bpServiceToActive:BPServiceActor类型的,表示与当前活跃NameNode相关的BPServiceActor引用;

*  DataNode:Slave*节点,奴隶,干活的。负责存储client发来的数据块block;执行数据块的读写操作。

        3、然后构造DataXceiverServer实例xserver,传入tcpPeerServer;

        2、bpServices:CopyOnWriteArrayList<BPServiceActor>类型的列表,表示该命名服务对应的所有NameNode的BPServiceActor实例列表,不管NameNode是活跃的还是备份的。

  热备份:b是a的热备份,如果a坏掉。那么b马上运行代替a的工作。

        4、构造dataXceiverServer守护线程,并将xserver加入之前创建的线程组threadGroup;

        由此可以看出,BPOfferService实际上是每个命名服务空间所对应的一组BPServiceActor的管理者,这些BPServiceActor全部存储在bpServices列表中,并且由bpServices表示当前与active NN连接的BPServiceActor对象的引用,而bpServices对应的则是连接到所有NN的BPServiceActor,无论这个NN是active状态还是standby状态。那么,问题又来了?BPServiceActor是什么呢?继续吧!

  冷备份:b是a的冷备份,如果a坏掉。那么b不能马上代替a工作。但是b上存储a的一些信息,减少a坏掉之后的损失。

        5、将线程组里的所有线程设置为设置为守护线程,方便虚拟机退出时自动销毁。

[java] view plain copy

  fsimage:元数据镜像文件(文件系统的目录树。)

        下面,我们再看下DataXceiverServer的构造方法,代码如下:

 

  edits:元数据的操作日志(针对文件系统做的修改操作记录)

 

 图片 14图片 15

  namenode内存中存储的是=fsimage edits。

本文由巴黎人游戏官网发布于网络资讯,转载请注明出处:一、HDFS的优缺点,对于FSDataset后面会继续讨论

关键词:

上一篇:安装及配置,编译及安装
下一篇:没有了