HDFS

(Notes from Hadoop: The Definitive Guide, 4th edition)

Hadoop is written in Java.

Good for: very large files, streaming data access, commodity hardware.
Not suitable for: low-latency applications, lots of small files, multiple writers and arbitrary file modifications.

A disk block is different from a filesystem block. A filesystem block (on a single disk) consists of multiple disk blocks; a disk block is typically 512 bytes. HDFS also has the concept of a block, but it is much larger than a disk block or a regular single-disk filesystem block. The HDFS block size is kept large so that the time spent transferring data from disk dominates the time spent seeking to the start of the block (transfer rates are high, while seeks are relatively slow).

Map tasks in MapReduce normally operate on one block at a time.

Why HDFS blocks? A file can be larger than a single disk; blocks simplify storage management (fixed size); and blocks fit well with replication.
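As a quick illustration (a minimal sketch, not from the book; the path is hypothetical), a file's block size and block locations can be inspected through the FileSystem API:

// Sketch: list the blocks of an HDFS file and where their replicas live.
// Assumes a reachable cluster; the path below is hypothetical.
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class ShowBlocks {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    FileStatus status = fs.getFileStatus(new Path("/user/me/big.log")); // hypothetical path
    System.out.println("Block size: " + status.getBlockSize());
    for (BlockLocation loc : fs.getFileBlockLocations(status, 0, status.getLen())) {
      System.out.println(loc.getOffset() + "-" + (loc.getOffset() + loc.getLength())
          + " on " + String.join(",", loc.getHosts()));
    }
    fs.close();
  }
}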

Block cache: for frequently used file blocks; can be administered by adding cache directives to cache pools.

Namenode/datanode => master/slave
Namenode (master): manages the filesystem namespace, maintains the filesystem tree and metadata for all files and directories, and knows the datanodes on which all the blocks of a file are located.

It is important to build redundancy for the namenode. If it fails and there is no fallback, there is no way to make sense of any data on the datanodes. So namenode failures must be accounted for, either by duplicating the metadata to another filesystem or by having a secondary namenode (which keeps checkpoints). Both of these only protect against data loss but don't provide HA; the namenode is still a SPOF.

Hadoop 2 introduced HA: a pair of namenodes in an active-standby configuration, where both use highly available shared storage to share the edit log. The block mapping is not written to disk; it stays in the memory of each namenode (datanodes send block reports to both).

HDFS Federation: To overcome the limitations (especially memory, as the namenode keeps all file and block info in memory) imposed by a single namenode for the entire cluster, federation was introduced in 2.x. In federation, each namenode manages a namespace volume (e.g., /user, /share). These volumes are independent of each other.

Failover & Fencing:  During ungraceful failover , if the previous active name still remains active then it may cause corruption, to stop that from happening HA implementation goes to a great length. This is called fencing.

fs.defaultFS=> This indicates default filesystem and namenode.  set it to hdsf://localhost  if you are trying HDFS on your localhost.
fs.replication => replication factor . set it to 1 if you are just trying HDFS only on your localhost as there is only single datanode.

There is no execute permission for a file in HDFS because, unlike POSIX, you cannot really execute a file.

By default Hadoop runs with security disabled, which means an arbitrary user can access and modify files; permission checking itself is controlled by dfs.permissions.enabled. When permissions are enforced, they work much like POSIX.

superuser: the identity of the namenode process. No permission checks are done for the superuser.
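A small sketch of inspecting and changing permissions and ownership through the FileSystem API; the path, user, and group names here are made up, and changing ownership normally requires superuser privileges:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsPermission;

public class Perms {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    Path p = new Path("/user/me/data.csv");              // hypothetical path
    FileStatus st = fs.getFileStatus(p);
    System.out.println(st.getOwner() + ":" + st.getGroup() + " " + st.getPermission());
    fs.setPermission(p, new FsPermission((short) 0640)); // rw-r----- ; no execute bit for files
    fs.setOwner(p, "me", "analysts");                    // hypothetical owner/group
  }
}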


Filesystems: Hadoop has an abstract notion of filesystems (org.apache.hadoop.fs.FileSystem is the client interface), of which HDFS (hdfs.DistributedFileSystem) is just one implementation. fs.s3a.S3AFileSystem (s3a) is another implementation, which provides support for Amazon S3 and replaces the older s3n implementation; fs.azure.NativeAzureFileSystem (wasb) supports Microsoft Azure.

Hadoop provides many interfaces to its filesystems, and it generally uses the URI scheme to pick the correct filesystem instance to communicate with. 
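For example (a sketch; the URIs are placeholders and the S3A connector must be on the classpath), FileSystem.get() returns a different implementation depending on the scheme of the URI passed in:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class SchemeDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // The scheme selects the implementation: hdfs -> DistributedFileSystem,
    // s3a -> S3AFileSystem, file -> LocalFileSystem, etc.
    FileSystem hdfs  = FileSystem.get(URI.create("hdfs://localhost/"), conf);
    FileSystem s3    = FileSystem.get(URI.create("s3a://some-bucket/"), conf); // bucket name is a placeholder
    FileSystem local = FileSystem.get(URI.create("file:///"), conf);
    System.out.println(hdfs.getClass() + ", " + s3.getClass() + ", " + local.getClass());
  }
}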

S3A: 
S3A allows per-bucket credentials to be configured.

Hadoop Credential Provider allows credentials to be stored outside the XML config files for Hadoop clients. 
AWS credential providers are classes which can be used by the Amazon AWS SDK to obtain an AWS login from a different source in the system, including environment variables, JVM properties, and configuration files.

S3A by default talks to US East when you use the default endpoint, s3.amazonaws.com, also called the central endpoint. This endpoint can be used to talk to any bucket in any region. There are also region-specific endpoints; all endpoints other than the default one only support interaction with buckets local to that region.

You can specify bucket-specific endpoints in the site configuration; accesses to that bucket will then use that endpoint.
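A sketch of what that looks like, set programmatically instead of in the XML; the bucket name and region are made up, and it assumes the fs.s3a.bucket.<bucket>.<option> per-bucket pattern from the S3A documentation:

import org.apache.hadoop.conf.Configuration;

public class BucketEndpoints {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Global default endpoint (the "central" endpoint).
    conf.set("fs.s3a.endpoint", "s3.amazonaws.com");
    // Per-bucket override: fs.s3a.bucket.<bucket>.<option>. Bucket and region are hypothetical.
    conf.set("fs.s3a.bucket.eu-logs.endpoint", "s3.eu-west-1.amazonaws.com");
    System.out.println(conf.get("fs.s3a.bucket.eu-logs.endpoint"));
  }
}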

S3Guard
S3 is eventually consistent, and the delay in consistency creates problems for Hadoop apps working via s3a over S3 data. To enforce at least listing consistency for buckets, you can configure S3Guard with s3a. This causes DynamoDB to store the bucket metadata; when a request to list the bucket contents is made, it is served from the DynamoDB table rather than from S3. S3Guard still cannot make sure that when the data is actually accessed, the client will get the most recent (updated) data.

This becomes a big issue for MapReduce and Spark jobs, which rely on directory renaming for committing the work of the various tasks/workers in a job. To address this, the S3A committers evolved. There are three S3A committers: 1) Directory, 2) Partitioned, 3) Magic. The Directory committer is widely used and more mature, and it does not need S3Guard. The Magic committer needs S3Guard as it relies on consistent listing. All of the committers use the multipart upload feature of S3 buckets. For more info see Working with Cloud (S3, ADLS, GCS) Data.
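A sketch of the relevant settings as I understand them from the S3A documentation (property values may differ across Hadoop versions):

import org.apache.hadoop.conf.Configuration;

public class S3GuardConf {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Back S3A listings with a DynamoDB table (S3Guard).
    conf.set("fs.s3a.metadatastore.impl",
        "org.apache.hadoop.fs.s3a.s3guard.DynamoDBMetadataStore");
    // Pick an S3A committer; "directory" does not require S3Guard, "magic" does.
    conf.set("fs.s3a.committer.name", "directory");
    System.out.println(conf.get("fs.s3a.committer.name"));
  }
}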


Command Line Interface: 
There are three similar command lines (hadoop fs, hadoop dfs, and hdfs dfs). I am still not fully clear on the differences, but both executables (hadoop and hdfs) are in the bin directory of the Hadoop installation.

hadoop fs - can work with multiple filesystems. It infers the filesystem based on URI scheme.

hadoop dfs - same as "hdfs dfs". Deprecated; use "hdfs dfs" instead. This was designed to work mainly with HDFS.

hdfs dfs - mainly for hdfs. 

These command lines  work with different DFS implementations like  HDFS, S3, ADLS and GCS.

hdfs  dfs  -cat  hdfs://hdfsroot/path/file.ext
hdfs  dfs  -cat  s3a://bucketname/path/file.ext
hdfs  dfs  -cat  wasb://adlsroot/path/file.ext
hdfs  dfs -put /localfilepath   s3a://....
hadoop fs -ls gs://<Bucket_You_Want_To_List>/dir/


You can also use different filesystems within the same command:

hadoop fs -cp hdfs://<HOST_NAME>:<PORT>/Data_You_Want_To_Copy gs://<Bucket_name>

hdfs dfs -cp hdfs://      s3a://....

For more details on Google Cloud Storage-related commands and configuration, see the GCS connector documentation.

Interfaces: HTTP, C, NFS, FUSE

JAVA


public class FSDataInputStream extends DataInputStream
    implements Seekable, PositionedReadable {
  // implementation elided
}


public interface Seekable {
  void seek(long pos) throws IOException;
  long getPos() throws IOException;
}


public class FSDataOutputStream extends DataOutputStream implements Syncable {
  public long getPos() throws IOException {
    // implementation elided
  }
  // implementation elided
}


Reading 

In the case of HDFS, DistributedFileSystem's open() returns an FSDataInputStream, which wraps a DFSInputStream. DistributedFileSystem connects to the namenode and gets the block locations and the datanodes that store them. The client then calls read() on the DFSInputStream, which knows which datanode holds each block. DFSInputStream takes care of closing the connection to a datanode when a block has been read, opening a new connection for the next block, and contacting the namenode when it needs the next set of block locations.

One important thing is that after open(), reads go directly to the datanodes. So the namenode does not become a bottleneck even if multiple clients are reading at the same time. This is how HDFS can scale to a large number of clients.
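A minimal read sketch (the path is hypothetical) that also exercises the Seekable behaviour of FSDataInputStream shown earlier:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

public class ReadTwice {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    Path p = new Path("/user/me/sample.txt");         // hypothetical path
    try (FSDataInputStream in = fs.open(p)) {
      IOUtils.copyBytes(in, System.out, 4096, false); // stream the whole file
      in.seek(0);                                     // go back to the start...
      IOUtils.copyBytes(in, System.out, 4096, false); // ...and read it again
    }
  }
}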

Writing

In the case of writing, when the client calls create(), DistributedFileSystem connects to the namenode and requests to create a new file. The namenode does some checks and, if they pass, DistributedFileSystem's create() returns an FSDataOutputStream, which wraps a DFSOutputStream. DFSOutputStream maintains a data queue and an ack queue. The DataStreamer consumes the data queue and asks the namenode to allocate new blocks to be written by picking a set of datanodes (replicas). The list of datanodes forms a pipeline. If a datanode fails during the write, it is removed from the pipeline, the current block being written is given a new identity, and the block is written completely on the good nodes. The namenode later notices the under-replication of the block, finds a suitable datanode, and replicates it. For subsequent blocks the process is as usual. So as long as dfs.namenode.replication.min replicas are written, the write succeeds, and replication to the remaining replicas can happen asynchronously.

Coherency Model 

After creating a file, it is visible in the filesystem namespace; however, any content written to the file is not guaranteed to be visible, even if the stream is flushed. The current block being written may not be visible to other readers.

FSDataOutputStream.hflush(): guarantees that the data written so far has reached all the datanodes in the write pipeline. It does not guarantee that the data has been written to disk on those nodes; for that guarantee, use hsync(). hflush() and hsync() have some overhead (hsync() more than hflush()), so decide when and how often to call them based on application requirements.
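A short write sketch (path and content are hypothetical) showing where hflush() and hsync() fit:

import java.nio.charset.StandardCharsets;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class WriteAndFlush {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    Path p = new Path("/user/me/events.log");            // hypothetical path
    try (FSDataOutputStream out = fs.create(p)) {
      out.write("first record\n".getBytes(StandardCharsets.UTF_8));
      out.hflush();  // data has reached all datanodes in the pipeline, so new readers can see it
      out.write("second record\n".getBytes(StandardCharsets.UTF_8));
      out.hsync();   // stronger (and costlier): data is also synced to disk on the datanodes
    }                // close() flushes remaining data and finalizes the file
  }
}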

distcp - copies files in parallel using MapReduce (only mappers, no reducers). It is used for copying data to and from Hadoop filesystems in parallel.
 

A very common use is to transfer data from one cluster to another.  

distcp can imbalance the cluster, though. If you use only a single map (-m 1), the first replica of every block will be created on the node running that map. So specify more maps than there are nodes in the cluster.

