HDFS1.0源代码解析—数据
本次主要介绍DN端进行数据传输和接受的类BlockSender和BlockReceiver,其中BlockSender是读取DN本地block数据传送回数据请求端,BlockReceiver是接受存储数据,写入到本地的block中。首选介绍BlockSender的主要函数的作用:
构造函数
[java]
94 this.blockLength = datanode.data.getVisibleLength(block);
99 checksumIn = new DataInputStream(
100 new BufferedInputStream(datanode.data.getMetaDataInputStream(block),
101 BUFFER_SIZE));
123 bytesPerChecksum = checksum.getBytesPerChecksum();
129 checksumSize = checksum.getChecksumSize();
145 offset = (startOffset - (startOffset % bytesPerChecksum));
146 if (length >= 0) {
147 // Make sure endOffset points to end of a checksumed chunk.
148 long tmpLen = startOffset + length;
149 if (tmpLen % bytesPerChecksum != 0) {
150 tmpLen += (bytesPerChecksum - tmpLen % bytesPerChecksum);
151 }
152 if (tmpLen < endOffset) {
153 endOffset = tmpLen;
154 }
155 }
156
157 // seek to the right offsets
158 if (offset > 0) {
159 long checksumSkip = (offset / bytesPerChecksum) * checksumSize;
160 // note blockInStream is seeked when created below
161 if (checksumSkip > 0) {
162 // Should we use seek() for checksum file as well?
163 IOUtils.skipFully(checksumIn, checksumSkip);
164 }
165 }
166 seqno = 0;
167
168 blockIn = datanode.data.getBlockInputStream(block, offset); // seek to offset
在开始介绍构造函数之前,先说一下DN上每个block的存放形式,block以blk_3910275445015356807和blk_3910275445015356807_1001.meta这样的形式存在,第一个文件存储内容,第二个存储元信息,所以才读取block的时候需要读取这两个文件。
开始看一下构造函数的主要作用,94行获取block文件的长度,99行打开block对应的元信息文件,123行获取每个chunk的大小,129行获取chunksum的大小。那么chunk和chunksum是什么呢,这里需要介绍下DN传送和接受数据的特点,每次传输和接受一个package,每个package包含若干个chunk,每个chunk对应的元信息就是chunksum。有了这个背景知识就知道145行的含义了,我们要求读取的数据可以从任意位置开始,但是DN传送时却要从一个整的chunk开始,因为元信息就是按照chunk为单位计算和存储的,这个ooffset就是我们要求读取的位置和实际读取位置的一个偏移,这个需要传回客户端,根据这个值用户才不会多介绍数据。149和150行对应读取的末尾做同样的处理,但是这个地方为什么没有把多读取数据的偏移量返回呢,是因为我们既然已经知道了起始位置有知道了读取的长度,所以我们知道实际的末尾,即使传送的数据有多余信息也不会造成影响。163行是使读取的元信息的文件流移动到要读取的位置,168行是打开block文件,并将文件流偏移到读取的位置。
构造函数初始化完毕以后,再来看传送数据的主函数sendBlock
[java]
383 long sendBlock(DataOutputStream out, OutputStream baseStream,
384 BlockTransferThrottler throttler) throws IOException {
396 try {
397 checksum.writeHeader(out);
398 if ( chunkOffsetOK ) {
399 out.writeLong( offset );
400 }
433 ByteBuffer pktBuf = ByteBuffer.allocate(pktSize);
434
435 while (endOffset > offset) {
436 long len = sendChunks(pktBuf, maxChunksPerPacket,
437 streamForSendChunks);
438 offset += len;
439 totalRead += len + ((len + bytesPerChecksum - 1)/bytesPerChecksum*
440 checksumSize);
441 seqno++;
442 }
443 try {
444 out.writeInt(0); // mark the end of block
445 out.flush();
397行是首先传送一些元数据信息,399传送读取数据时多余的偏移,433行声明读取数据的缓冲区,436调用另外一个函数真正进行数据的传输,每次传送一个package,444行传送一个0表示传送结束。
作者:zhangchunminggucas
补充:软件开发 , Java ,