Tuesday, June 29, 2010
America’s Most Wanted – a metric to detect faulty machines in Hadoop
I have been asked many, many questions about the failure rates of machines in our Hadoop cluster. These questions vary from the innocuous ("how much time do you spend every day fixing bad machines in your cluster?") to the more involved ("does the failure rate of Hadoop machines depend on the heat-map of the data center?"). I do not have answers to all of these questions, and this is an area of research that has been receiving attention lately. But I have seen that the efficiency of a Hadoop cluster is directly dependent on the amount of manpower needed to operate such a cluster.
Common Types of Failures
The three most common categories of failures I have observed are:
- System errors – hardware, OS, JVM, Hadoop, compiler, etc. – Hadoop aims to reduce the effect of this broad category of errors
- User application errors – bad code written by a user – bloated memory usage
- Anomalous behaviour – not working according to expectation – slow nodes – these cause the most harm to a Hadoop cluster because they go undetected for long periods of time
America's Most Wanted (AMW)
I have the privilege of working with one of the most experienced Hadoop cluster administrators, Andrew Ryan. He observed that a few machines in the Hadoop cluster are always repeat offenders: they land in trouble, get incarcerated and fixed, and then, when put back online, they create trouble again. He came up with a metric to determine when to throw a machine out of the Hadoop cluster. The following chart shows that 3% of our machines cause 43% of all manual repair events.
This clearly shows the need for a three-strikes-you-are-out law: if a machine goes into repair three times it is better to take it permanently out of the Hadoop cluster.
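To make the bookkeeping concrete, here is a minimal sketch (not the actual tool we use; the repair-event log and hostnames are made up) of how such a three-strikes report could be computed from a list of manual repair events:

import java.util.*;

// Hypothetical sketch: count manual repair events per host and flag
// "America's Most Wanted" machines that have been repaired 3 or more times.
public class AmwReport {
    public static void main(String[] args) {
        // Each entry is the hostname recorded for one manual repair event.
        List<String> repairEvents = Arrays.asList(
                "node-017", "node-243", "node-017", "node-512", "node-017");

        Map<String, Integer> repairsPerHost = new HashMap<>();
        for (String host : repairEvents) {
            repairsPerHost.merge(host, 1, Integer::sum);
        }

        // Three strikes and you are out: candidates for permanent removal.
        for (Map.Entry<String, Integer> e : repairsPerHost.entrySet()) {
            if (e.getValue() >= 3) {
                System.out.println(e.getKey() + " repaired " + e.getValue()
                        + " times -- decommission candidate");
            }
        }
    }
}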
An exotic question
Another exotic question that I am frequently asked is whether map-reduce jobs written in Python have a higher probability of failure. Here is a chart that tries to answer this question:
- 5% of all jobs in the cluster are written in Python
- 15% of cluster CPU is consumed by Python jobs
- 20% of all failed jobs are written in Python
These numbers do show that jobs written in Python consume more CPU on average than jobs written in Java. They also show that a greater percentage of these jobs are likely to fail. Why is this? I do not have a definite answer, but my guess is that a developer is more likely to write the first few versions of an experimental query in Python because it is an easy-to-prototype language.
I presented a more detailed version of this material at an IFIP Working Group on Dependable Computing workshop at DSN 2010. The aim of this workshop is to understand more about failure patterns on Hadoop nodes, automatic ways to analyze and handle these failures, and how the research community can help Hadoop become more fault-tolerant.
Sunday, May 9, 2010
Facebook has the world's largest Hadoop cluster!
It is not a secret anymore!
The Datawarehouse Hadoop cluster at Facebook has become the largest known Hadoop storage cluster in the world. Here are some of the details about this single HDFS cluster:
- 21 PB of storage in a single HDFS cluster
- 2000 machines
- 12 TB per machine (a few machines have 24 TB each)
- 1200 machines with 8 cores each + 800 machines with 16 cores each
- 32 GB of RAM per machine
- 15 map-reduce tasks per machine
That's a total of more than 21 PB of configured storage capacity! This is larger than the previously largest known cluster, Yahoo!'s 14 PB cluster. Here are the cluster statistics from the HDFS cluster at Facebook:
Hadoop started at Yahoo! and full marks to Yahoo! for developing such critical infrastructure technology in the open. I started working with Hadoop when I joined Yahoo! in 2006. Hadoop was in its infancy at that time and I was fortunate to be part of the core set of Hadoop engineers at Yahoo!. Many thanks to Doug Cutting for creating Hadoop and to Eric14 for convincing the executive management at Yahoo! to develop Hadoop as open source software.
Facebook engineers work closely with the Hadoop engineering team at Yahoo! to push Hadoop to greater scalability and performance. Facebook has many Hadoop clusters, the largest of which is the one used for data warehousing. Here are some statistics that describe a few characteristics of Facebook's data warehousing Hadoop cluster:
- 12 TB of compressed data added per day
- 800 TB of compressed data scanned per day
- 25,000 map-reduce jobs per day
- 65 million files in HDFS
- 30,000 simultaneous clients to the HDFS NameNode
A majority of this data arrives via Scribe, as described in the scribe-hdfs integration post. This data is loaded into Hive. Hive provides a very elegant way to query the data stored in Hadoop. Almost 99.9% of Hadoop jobs at Facebook are generated by a Hive front-end system. We provide lots more details about our scale of operations in our SIGMOD paper titled Data Warehousing and Analytics Infrastructure at Facebook.
Here are two pictorial representations of the rate of growth of the Hadoop cluster:
Details about our Hadoop configuration
I have fielded many questions from developers and system administrators about the Hadoop configuration that is deployed in the Facebook Hadoop Datawarehouse. Some of these questions are from Linux kernel developers who would like to make Linux swapping work better with Hadoop workloads; other questions are from JVM developers who may attempt to make Hadoop run faster for processes with large heap sizes; yet others are from GPU architects who would like to port Hadoop workloads to run on GPUs. To enable this type of outside research, here are the details of Facebook's Hadoop warehouse configuration. I hope this open sharing of infrastructure details from Facebook jumpstarts the research community to design ways and means to optimize systems for Hadoop usage.
Sunday, April 25, 2010
The Curse of the Singletons! The Vertical Scalability of Hadoop NameNode
Introduction
What are the bottlenecks of the NameNode?
Network: We have around 2000 nodes in our cluster, and each node runs 9 mappers and 6 reducers simultaneously. This means that there are around 30K simultaneous clients requesting service from the NameNode. The Hive Metastore and the HDFS RaidNode impose additional load on the NameNode. The Hadoop RPC Server has a singleton Listener Thread that pulls data from all incoming RPCs and hands it to a bunch of NameNode handler threads. Only after all the incoming parameters of an RPC are copied and deserialized by the Listener Thread do the NameNode handler threads get to process the RPC. One CPU core on our NameNode machine is completely consumed by the Listener Thread. This means that during times of high load, the Listener Thread is unable to copy and deserialize all incoming RPC data in time, leading to clients encountering RPC socket errors. This is one big bottleneck to the vertical scalability of the NameNode.
CPU: The second bottleneck to scalability is the fact that most critical sections of the NameNode are protected by a singleton lock called the FSNamesystem lock. I did some major restructuring of this code about three years ago via HADOOP-1269, but even that is not enough for supporting current workloads. Our NameNode machine has 8 cores, but a fully loaded system can use at most 2 cores simultaneously on average; the reason is that most NameNode handler threads encounter serialization via the FSNamesystem lock.
Memory: The NameNode stores all its metadata in the main memory of the singleton machine on which it is deployed. In our cluster, we have about 60 million files and 80 million blocks; this requires the NameNode to have a heap size of about 58 GB. This is huge! There isn't any more memory left to grow the NameNode's heap! What can we do to support an even greater number of files and blocks in our system?
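As a rough back-of-envelope check (my own approximation, not an official sizing formula): 60 million files plus 80 million blocks is about 140 million namespace objects, and 58 GB divided by 140 million works out to roughly 400 bytes of NameNode heap per file or block. In other words, every object added to the namespace eats directly into a heap that has already reached its practical ceiling.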
Can we break the impasse?
RPC Server: We enhanced the Hadoop RPC Server to have a pool of Reader Threads that work in conjunction with the Listener Thread. The Listener Thread accepts a new connection from a client and then hands over the work of RPC-parameter deserialization to one of the Reader Threads. In our case, we configured the Reader Thread pool to have 8 threads. This change has doubled the number of RPCs that the NameNode can process at full throttle. This change has been contributed to the Apache code via HADOOP-6713.
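For readers who want a mental model of this change, here is a highly simplified sketch (illustrative only, with invented class and method names; it is not the HADOOP-6713 code) of a single listener that accepts connections and hands the per-connection RPC reading off to a pool of reader threads:

import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Sketch: the accept loop (the "Listener") does nothing but accept, while a
// small pool of reader threads copies and deserializes incoming RPC data,
// so a single thread no longer caps the RPC intake rate.
public class ListenerWithReaders {
    private static final int NUM_READERS = 8;   // matches the pool size we configured

    public static void main(String[] args) throws Exception {
        ExecutorService readers = Executors.newFixedThreadPool(NUM_READERS);
        try (ServerSocket server = new ServerSocket(9090)) {    // illustrative port
            while (true) {
                Socket conn = server.accept();                  // listener: accept only
                readers.submit(() -> readAndQueueRpcs(conn));   // readers: parse RPCs
            }
        }
    }

    private static void readAndQueueRpcs(Socket conn) {
        // In the real server this would read the RPC header, deserialize the
        // call parameters, and enqueue the call for the NameNode handler threads.
    }
}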
The above change allowed a simulated workload to be able to consume 4 CPU cores out of a total of 8 CPU cores in the NameNode machine. Sadly enough, we still cannot get it to use all the 8 CPU cores!
FSNamesystem lock: A review of our workload showed that our NameNode typically has the following distribution of requests:
- stat a file or directory 47%
- open a file for read 42%
- create a new file 3%
- create a new directory 3%
- rename a file 2%
- delete a file 1%
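The distribution above is dominated by read-only operations: stat and open together account for almost 90% of all requests. That suggests the natural fix for the FSNamesystem bottleneck: replace the exclusive lock with a readers-writer lock, so that read-only requests can be served in parallel while namespace mutations still serialize. A minimal sketch of the idea (illustrative class and method names, not the real FSNamesystem code):

import java.util.concurrent.locks.ReentrantReadWriteLock;

// Sketch of the locking idea only.
public class NamespaceLockDemo {
    private final ReentrantReadWriteLock fsLock = new ReentrantReadWriteLock();

    // stat/open: many of these can run concurrently under the read lock.
    public long getFileLength(String path) {
        fsLock.readLock().lock();
        try {
            return lookupLength(path);
        } finally {
            fsLock.readLock().unlock();
        }
    }

    // create/rename/delete: mutations still take the exclusive write lock.
    public void createFile(String path) {
        fsLock.writeLock().lock();
        try {
            insertIntoNamespace(path);
        } finally {
            fsLock.writeLock().unlock();
        }
    }

    // Placeholders standing in for real namespace operations.
    private long lookupLength(String path) { return 0L; }
    private void insertIntoNamespace(String path) { }
}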
The memory bottleneck issue is still unresolved. People have asked me if the NameNode can keep some portion of its metadata on disk, but this would require a change in the locking model design first. One cannot hold the FSNamesystem lock while reading data from disk: this would cause all other threads to block, thus throttling the performance of the NameNode. Could one use flash memory effectively here? Maybe an LRU cache of file-system metadata will work well with current metadata access patterns? If anybody has good ideas here, please share them with the Apache Hadoop community.
In a Nutshell
The two proposed enhancements have improved NameNode scalability by a factor of 8. Sweet, isn't it?
Labels:
hadoop,
hadoop distributed file system,
hdfs,
hdfs scalability
Monday, November 9, 2009
HDFS High Availability
I have encountered plenty of questions about the single point of failure for the HDFS NameNode. The most common concern is that if the NameNode dies, then the whole cluster is unavailable, which means that HDFS is unsuitable for applications that need a high degree of uptime. This is not a problem when you run map-reduce jobs on HDFS, especially because a map-reduce system is a batch system and the uptime requirements of a batch system are typically not that stringent.
In recent times, I have started to see lots of distributed applications that use HDFS as a general-purpose storage system. These applications range from multimedia servers that store mails, photos, videos, etc. to systems that store updates from thousands of distributed sensors. These applications prefer HDFS because it can store a large volume of data. These applications do not use map-reduce to mine any of the information stored in HDFS. However, they do need consistency and availability of data. Brewer's CAP Theorem states that most distributed systems need to make tradeoffs among Consistency, Availability and Partition tolerance. HDFS does an excellent job of providing consistency of data at all times. Traditionally, it did not address availability and partition tolerance. That could change to a certain extent with the HDFS 0.21 release. HDFS 0.21 has a new entity called the BackupNode that receives real-time updates about transactions from the NameNode. This is the first step in making HDFS highly available.
Here is a slide deck that I wanted to present at ApacheCon 2009 about the current state of affairs regarding High Availability with HDFS.
Monday, October 19, 2009
Hadoop discussions at Microsoft Research
I was invited to present a talk about the Hadoop File System Architecture at Microsoft Research in Seattle. This is a research group focussed on long-term research, so it is no surprise that they are interested in knowing how a growing company like Facebook is using Hadoop to its advantage.
I met a few folks who chatted with me about how Microsoft SQL Server is being modified to handle large-scale databases. These folks heartily agreed with a comment I made in my presentation that Dr. DeWitt and Dr. Stonebraker are missing the point when they compare performance numbers between Hadoop and traditional database systems rather than comparing the scalability and fault-tolerance of these systems. I had learned some of the fundamentals of database systems from Professor DeWitt during my graduate studies at the University of Wisconsin-Madison, but Dr. DeWitt is a Microsoft employee now!
The fact that Facebook uses the SQL interface of Hive layered over Hadoop makes it even more interesting to Microsoft. They wanted to know the performance difference between Hive and Pig and would like to compare them to their distributed SQL Server software.
Here are the slides I used for my presentation.
Friday, October 2, 2009
I presented a set of slides that describes the Hadoop development at Facebook at the HadoopWorld conference in New York today. It was well received by more than 100 people. I have presented at many a conference on the west coast, but this is the first time I have presented at a conference in New York... there are more Hadoop users here versus mostly Hadoop developers on the west coast. There were plenty of questions, especially about Hadoop Archives and realtime Hadoop. There were people asking me questions about HDFS symbolic links and the HDFS-scribe copier.
Earlier, I visited the University of Notre Dame to conduct a department seminar and present a guest lecture for the graduate students at the Department of Computer Science. There is plenty of interesting research being led by Prof. Douglas Thain. One interesting research idea that came up was to place HDFS block replicas by analyzing HDFS access patterns. It is possible that we can provide HDFS datanode/namenode logs to researchers who can analyze these logs to come up with better algorithms for HDFS block replica placement.
Monday, September 14, 2009
HDFS block replica placement in your hands now!
Most Hadoop administrators set the default replication factor for their files to be three. The main assumption here is that if you keep three copies of the data, your data is safe. I have observed this to be true in the big clusters that we manage and operate. In actuality, administrators are managing two failure aspects: data corruption and data availability.
If all the datanodes on which the replicas of a block exist catch fire at the same time, then that data is lost and cannot be recovered. Or if an administrative error causes all the existing replicas of a block to be deleted, then it is a catastrophic failure. This is data corruption. On the other hand, if a rack switch goes down for some time, the datanodes on that rack are inaccessible during that time. When that faulty rack switch is fixed, the data on the rack rejoins the HDFS cluster and life goes on as usual. This is a data availability issue; in this case data was not corrupted or lost, it was just unavailable for some time. HDFS keeps three copies of a block on three different datanodes to protect against true data corruption. HDFS also tries to distribute these three replicas across more than one rack to protect against data availability issues. The fact that HDFS actively monitors for failed datanodes and, upon failure detection, immediately schedules re-replication of blocks (if needed) means that three copies of data on three different nodes are sufficient to avoid corrupted files.
HDFS uses a simple but highly effective policy to allocate replicas for a block. If a process running on any of the HDFS cluster nodes opens a file for writing a block, then one replica of that block is allocated on the same machine on which the client is running. The second replica is allocated on a randomly chosen rack that is different from the rack on which the first replica was allocated. The third replica is allocated on a randomly chosen machine on the same remote rack that was chosen in the earlier step. This means that a block is present on two unique racks. One point to note is that there is no relationship between replicas of different blocks of the same file as far as their location is concerned. Each block is allocated independently.
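Here is an illustrative sketch of that default policy (the Node abstraction is invented for this example; the real implementation in the NameNode handles many more corner cases, such as excluding overloaded or decommissioned nodes):

import java.util.List;
import java.util.Random;
import java.util.function.Predicate;

// Illustrative only: choose three targets for a block written by a client
// running on 'writer', following the rack rules described above.
public class DefaultPlacementSketch {
    interface Node { String rack(); String name(); }

    private final Random rand = new Random();

    public Node[] chooseThreeReplicas(Node writer, List<Node> liveNodes) {
        Node first = writer;                                                  // replica 1: the local node
        Node second = pick(liveNodes, n -> !n.rack().equals(first.rack()));   // replica 2: a different rack
        Node third = pick(liveNodes, n -> n.rack().equals(second.rack())
                && !n.name().equals(second.name()));                          // replica 3: same remote rack, different node
        return new Node[] { first, second, third };                           // the block lives on exactly two racks
    }

    private Node pick(List<Node> nodes, Predicate<Node> ok) {
        while (true) {
            Node n = nodes.get(rand.nextInt(nodes.size()));
            if (ok.test(n)) return n;
        }
    }
}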
The above algorithm is great for availability and scalability. However, there are scenarios where co-locating many blocks of the same file on the same set of datanodes or racks is beneficial for performance reasons. For example, if many blocks of the same file are present on the same datanode, a single mapper instance could process all these blocks using the CombineFileInputFormat. Similarly, if a dataset contains many small files that are co-located on the same datanodes or racks, one can use CombineFileInputFormat to process all these files together with fewer mapper instances. If an application always uses one dataset with another dataset (think Hive or Pig join), then co-locating these two datasets on the same set of datanodes is beneficial.
Another reason one might want to allocate replicas using a different policy is to ensure that replicas and their parity blocks truly reside in different failure domains. The erasure code work in HDFS could effectively bring down the physical replication factor of a file to about 1.5 (while keeping the logical replication factor at 3) if it can place replicas of all blocks in a stripe more intelligently.
Yet another reason, however exotic, is to allow HDFS to place replicas based on the HeatMap of your cluster. If one of the nodes in the cluster is at a higher temperature than another, then it might be better to prefer the cooler node while allocating a new replica. If you want to experiment with HDFS across two data centers, you might want to try out new policies for replica placement.
Well, now you can finally get your hands wet! HDFS-385 is part of the Hadoop trunk and will be part of the next major HDFS 0.21 release. This feature provides a way for an adventurous developer to write Java code that specifies how HDFS should allocate replicas of the blocks of a file. The API is experimental in nature and could change in the near future if we discover any inefficiencies in it. Please let the Hadoop community know if you need any changes in this API or if you come across novel uses of it.
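As a rough illustration of what such a policy could look like: the hook introduced by HDFS-385 is an abstract Java class that the NameNode instantiates based on configuration, and its exact method signatures have evolved across releases, so the class and method below are invented purely to show the kind of decision a custom policy gets to make. This toy example implements the heat-map idea mentioned earlier by preferring the coolest datanodes:

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Hypothetical sketch, NOT the real HDFS-385 API: a real policy extends the
// pluggable placement class shipped with HDFS and is wired in via
// configuration; this method is invented for illustration only.
public class HeatAwarePlacement {

    // Pick the 'replicas' coolest datanodes from a heat map (hostname -> temperature).
    public List<String> chooseTargets(Map<String, Double> heatMap, int replicas) {
        return heatMap.entrySet().stream()
                .sorted(Map.Entry.comparingByValue())
                .limit(replicas)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }
}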
Friday, August 28, 2009
HDFS and Erasure Codes (HDFS-RAID)
The Hadoop Distributed File System has been great in providing a cloud-type file system. It is robust (when administered correctly :-)) and highly scalable. However, one of the main drawbacks of HDFS is that each piece of data is replicated in three places. This is acceptable because disk storage is cheap and is becoming cheaper by the day; it isn't a problem if you have a relatively small to medium size cluster. The price difference (in absolute terms) is not much whether you use 15 disks or 10 disks. If we consider a cost of 1ドル per GByte, the price difference between fifteen 1 TB disks and ten 1 TB disks is only 5,000ドル. But when the total size of your cluster is 10 PBytes, the cost savings of storing the data in two places versus three is a huge ten million dollars!
The reason HDFS stores disk blocks in triplicate is that it uses commodity hardware, and there is a non-negligible probability of disk failure. It has been observed that a replication factor of 3, together with the fact that HDFS aggressively detects failures and immediately re-replicates failed block replicas, is sufficient to never lose any data in practice. The challenge now is to achieve an effective replication factor of 3 while keeping the real physical replication factor at close to 2! How better to do that than by using erasure codes?
I heard about this idea, called DiskReduce, from the folks at CMU. The CMU PDL Lab has been a powerhouse of research in file systems, and it is no surprise that they proposed an elegant way of implementing erasure codes in HDFS. I borrowed heavily from their idea in my implementation of erasure codes in HDFS, described in HDFS-503. One of the main motivations of my design is to keep HDFS erasure coding as a software layer above HDFS rather than intertwining it inside the HDFS code. The HDFS code is complex by itself, and it is really nice to not have to make it more complex and heavyweight.
The Distributed Raid File System consists of two main software components. The first component is the RaidNode, a daemon that creates parity files from specified HDFS files. The second component, "raidfs", is software that is layered over the HDFS client and intercepts all calls that an application makes to the HDFS client. If the HDFS client encounters corrupted data while reading a file, the raidfs client detects it, uses the relevant parity blocks to recover the corrupted data (if possible) and returns the data to the application. The use of parity data to satisfy the read request is completely transparent to the application. The Distributed Raid File System can be configured in such a way that a set of data blocks of a file are combined together to form one or more parity blocks. This allows one to reduce the replication factor of an HDFS file from 3 to 2 while keeping the failure probability roughly the same as before.
I have seen that using a stripe size of 10 blocks decreases the physical replication factor of a file to 2.2 while keeping the effective replication factor of a file at 3. This typically results in saving 25% to 30% of storage space in a HDFS cluster.
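To see where the 2.2 comes from, here is my back-of-envelope reconstruction (assuming one parity block per stripe of 10 data blocks, with both the data file and the parity file kept at replication 2): each 10-block stripe costs 10 x 2 = 20 physical data blocks plus 1 x 2 = 2 physical parity blocks, i.e. 22 physical blocks for 10 logical blocks, which is a physical replication factor of 2.2. Compared with plain 3-way replication that is a saving of (3.0 - 2.2) / 3.0, or roughly 27%, consistent with the 25% to 30% quoted above.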
One of the shortcomings of this implementation is that we need a parity file for every file in HDFS. This potentially increases the number of files in the NameNode. To alleviate this problem, I will enhance this implementation (in the future) to use the Hadoop Archive feature to archive all the parity files together in larger containers, so that the NameNode does not have to support additional files when HDFS erasure coding is switched on. This works reasonably well because it is a very, very rare case that the parity files are ever used to satisfy a read request.
I am hoping that this feature becomes part of Hadoop 0.21 release scheduled for September 2009!
Labels:
erasure code,
hadoop distributed file system,
hdfs,
raid
Saturday, June 6, 2009
HDFS Scribe Integration
It is finally here: you can configure the open source log-aggregator, scribe, to log data directly into the Hadoop distributed file system.
The challenge was to make HDFS real-timeish in behaviour. Scribe uses libhdfs, which is the C interface to the HDFS client. There were various bugs in libhdfs that needed to be solved first. Then came the FileSystem API. One of the major issues was that the FileSystem API caches FileSystem handles and always returns the same FileSystem handle when called from multiple threads. There was no reference counting of the handle. This caused problems with Scribe, because Scribe is highly multi-threaded. A new API, FileSystem.newInstance(), was introduced to support Scribe.
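For the curious, here is a minimal sketch of the difference (the NameNode URI and file path are made up): FileSystem.get() hands back a shared, cached handle, while FileSystem.newInstance() gives the caller its own handle that can be closed independently, which is exactly what a highly multi-threaded writer like Scribe needs:

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class NewInstanceDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        URI hdfs = new URI("hdfs://namenode.example.com:8020/");  // hypothetical cluster URI

        // Cached handle: every thread calling get() with the same URI and conf
        // shares this object, and closing it affects all of them.
        FileSystem shared = FileSystem.get(hdfs, conf);

        // Private handle: each newInstance() call returns a fresh FileSystem,
        // so one thread closing its handle does not break the others.
        FileSystem mine = FileSystem.newInstance(hdfs, conf);
        try (FSDataOutputStream out = mine.create(new Path("/tmp/scribe-demo.log"))) {
            out.writeBytes("hello from a private FileSystem handle\n");
        } finally {
            mine.close();                       // safe: only this instance is closed
        }
        shared.exists(new Path("/"));           // the cached handle is still usable
    }
}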
Many Web 2.0 companies have to deploy a bunch of costly filers to capture weblogs being generated by their application. Currently, there is no option other than a costly filer because the write-rate for this stream is huge. The Hadoop-Scribe integration allows this write-load to be distributed among a bunch of commodity machines, thus reducing the total cost of this infrastructure.
Making the HDFS write code path more real-time was painful. There are various timeouts/settings in HDFS that were hardcoded and needed to be changed to allow the application to fail fast. At the bottom of this blog post, I am attaching the settings that we have currently configured to make HDFS writes very real-timeish. The last of the JIRAs, HADOOP-2757, is in the pipeline to be committed to Hadoop trunk very soon.
What about the NameNode being a single point of failure? This is acceptable in a warehouse type of application but cannot be tolerated by a realtime application. Scribe typically aggregates click logs from a bunch of webservers, and losing *all* click-log data of a website for 10 minutes or so (the minimum time for a namenode restart) cannot be tolerated. The solution is to configure two overlapping clusters on the same hardware. Run two separate namenodes N1 and N2 on two different machines. Run one set of datanode software on all slave machines that reports to N1 and another set of datanode software on the same set of slave machines that reports to N2. The two datanode instances on a single slave machine share the same data directories. This configuration allows HDFS to be highly available for writes!
The highly-available-for-writes-HDFS configuration is also required for software upgrades on the cluster. We can shutdown one of the overlapping HDFS clusters, upgrade it to new hadoop software, and then put it back online before starting the same process for the second HDFS cluster.
What are the main changes to Scribe that were needed? Scribe already had the feature that it buffers data when it is unable to write to the configured storage. The default Scribe behaviour is to replay this buffer back to the storage when the storage is back online. Scribe is configured to support no-buffer-replay when the primary storage is back online. Scribe-hdfs is configured to write data to cluster N1, and if N1 fails, it writes data to cluster N2. Scribe treats N1 and N2 as two equivalent primary stores. The scribe configuration should have fs_type=hdfs. For scribe compilation, you can use ./configure --enable-hdfs LDFLAGS="-ljvm -lhdfs". A good example for configuring scribe-hdfs is in a file called hdfs_example2.conf in the scribe code base.
Here are the settings for the Hadoop 0.17 configuration that is needed by an application doing writes in realtime:
- ipc.client.idlethreshold = 10000 : Defines the threshold number of connections after which connections will be inspected for idleness.
- ipc.client.connection.maxidletime = 10000 : The maximum time in msec after which a client will bring down the connection to the server.
- ipc.client.connect.max.retries = 2 : Indicates the number of retries a client will make to establish a server connection.
- ipc.server.listen.queue.size = 128 : Indicates the length of the listen queue for servers accepting client connections.
- ipc.server.tcpnodelay = true : Turn on/off Nagle's algorithm for the TCP socket connection on the server. Setting to true disables the algorithm and may decrease latency with a cost of more/smaller packets.
- ipc.client.tcpnodelay = true : Turn on/off Nagle's algorithm for the TCP socket connection on the client. Setting to true disables the algorithm and may decrease latency with a cost of more/smaller packets.
- ipc.ping.interval = 5000 : The Client sends a ping message to the server every period. This is helpful to detect socket connections that were idle and have been terminated by a failed server.
- ipc.client.connect.maxwaittime = 5000 : The Client waits for this much time for a socket connect call to be established with the server.
- dfs.datanode.socket.write.timeout = 20000 : The dfs Client waits for this much time for a socket write call to the datanode.
- ipc.client.ping = false : See HADOOP-2757.
Labels:
hadoop,
hadoop distributed file system,
hdfs,
scribe
Sunday, May 24, 2009
Better Late than Never
For quite a while, I have been thinking on blogging about Hadoop in general and Hadoop distributed file system (HDFS) in particular. Why, you may ask?
Firstly, I have been contacted by students from as far away as Bangladesh and Fiji asking me questions about HDFS via email. This made me think that disseminating internal details about HDFS to the whole wide world would really benefit a lot of people. I like interacting with these budding engineers, and their questions, though elementary in nature, sometimes make me really ruminate on why we adopted a particular design and not another. I will sprinkle in a few of these examples next week.
Secondly, I visited a few universities last month, among them Carnegie Mellon University and my alma mater, the University of Wisconsin. On my flight, I was getting bored to death, because I really did not like the movie that was playing and I did not carry any material to read. (Usually I like to read and re-read Sherlock Holmes over and over again.) But like they say, "an idle mind is the devil's workshop"... I started to jot down some exotic design ideas about HDFS... And lo and behold, I have a list of ideas that I would like to share! I will post them next week as well.
Labels:
hadoop,
hadoop distributed file system,
hdfs