Tuesday, May 17, 2011
Realtime Hadoop usage at Facebook -- Part 1
Facebook recently deployed Facebook Messages, its first ever user-facing application built on the Apache Hadoop platform. It uses HDFS and HBase as the core technologies for this solution. Since then, many more applications have started to use HBase. We have gained some experience in deploying and operating HDFS and HBase at petabyte scale for realtime workloads and decided to write a paper detailing some of these insights. This paper will be published in SIGMOD 2011.
The full paper will be available here later, but here are some highlights:
WHY HADOOP AND HBASE
The requirements for the storage system for our workloads can be summarized as follows:
1. Elasticity: We need to be able to add incremental capacity to our storage systems with minimal overhead and no downtime. In some cases we may want to add capacity rapidly and the system should automatically balance load and utilization across new hardware.
2. High write throughput: Most of the applications store (and optionally index) tremendous amounts of data and require high aggregate write throughput.
3. Efficient and low-latency strong consistency semantics within a data center: There are important applications like Messages that require strong consistency within a data center. This requirement often arises directly from user expectations. For example, "unread" message counts displayed on the home page and the messages shown in the inbox page view should be consistent with respect to each other. While a globally distributed strongly consistent system is practically impossible, a system that could at least provide strong consistency within a data center would make it possible to provide a good user experience. We also knew that (unlike other Facebook applications) Messages was easy to federate so that a particular user could be served entirely out of a single data center, making strong consistency within a single data center a critical requirement for the Messages project. Similarly, other projects, like realtime log aggregation, may be deployed entirely within one data center and are much easier to program if the system provides strong consistency guarantees.
4. Efficient random reads from disk: In spite of the widespread use of application level caches (whether embedded or via memcached), at Facebook scale, a lot of accesses miss the cache and hit the back-end storage system. MySQL is very efficient at performing random reads from disk and any new system would have to be comparable.
5. High Availability and Disaster Recovery: We need to provide a service with very high uptime to users that covers both planned and unplanned events (examples of the former being events like software upgrades and addition of hardware/capacity and the latter exemplified by failures of hardware components). We also need to be able to tolerate the loss of a data center with minimal data loss and be able to serve data out of another data center in a reasonable time frame.
6. Fault Isolation: Our long experience running large farms of MySQL databases has shown us that fault isolation is critical. Individual databases can and do go down, but only a small fraction of users are affected by any such event. Similarly, in our warehouse usage of Hadoop, individual disk failures affect only a small part of the data and the system quickly recovers from such faults.
7. Atomic read-modify-write primitives: Atomic increments and compare-and-swap APIs have been very useful in building lockless concurrent applications and are a must have from the underlying storage system.
8. Range Scans: Several applications require efficient retrieval of a set of rows in a particular range. For example, the last 100 messages for a given user, or the hourly impression counts over the last 24 hours for a given advertiser.
It is also worth pointing out non-requirements:
1. Tolerance of network partitions within a single data center: Different system components are often inherently centralized. For example, MySQL servers may all be located within a few racks, and network partitions within a data center would cause major loss in serving capabilities therein. Hence every effort is made to eliminate the possibility of such events at the hardware level by having a highly redundant network design.
2. Zero Downtime in case of individual data center failure: In our experience such failures are very rare, though not impossible. In a less than ideal world where the choice of system design boils down to the choice of compromises that are acceptable, this is one compromise that we are willing to make given the low occurrence rate of such events. We might revise this non-requirement at a later time.
3. Active-active serving capability across different data centers: As mentioned before, we were comfortable making the assumption that user data could be federated across different data centers (based ideally on user locality). Latency (when user and data locality did not match up) could be masked by using an application cache close to the user.
Some less tangible factors were also at work. Systems with existing production experience for Facebook and in-house expertise were greatly preferred. When considering open-source projects, the strength of the community was an important factor. Given the level of engineering investment in building and maintaining systems like these, it also made sense to choose a solution that was broadly applicable (rather than adopt point solutions based on differing architectures and codebases for each workload).
After considerable research and experimentation, we chose Hadoop and HBase as the foundational storage technology for these next generation applications. The decision was based on the state of HBase at the point of evaluation as well as our confidence in addressing the features that were lacking at that point via in-house engineering. HBase already provided a highly consistent, high write-throughput key-value store. The HDFS NameNode stood out as a central point of failure, but we were confident that our HDFS team could build a highly-available NameNode (AvatarNode) in a reasonable time-frame, and this would be useful for our warehouse operations as well. Good disk read-efficiency seemed to be within striking reach (pending adding Bloom filters to HBase's version of LSM Trees, making local DataNode reads efficient and caching NameNode metadata). Based on our experience operating the Hive/Hadoop warehouse, we knew HDFS was stellar in tolerating and isolating faults in the disk subsystem. The failure of entire large HBase/HDFS clusters was a scenario that ran against the goal of fault-isolation, but could be considerably mitigated by storing data in smaller HBase clusters. Wide area replication projects, both in-house and within the HBase community, seemed to provide a promising path to achieving disaster recovery.
HBase is massively scalable and delivers fast random writes as well as random and streaming reads. It also provides row-level atomicity guarantees, but no native cross-row transactional support. From a data model perspective, column-orientation gives extreme flexibility in storing data and wide rows allow the creation of billions of indexed values within a single table. HBase is ideal for workloads that are write-intensive, maintain a large amount of data and large indices, and need the flexibility to scale out quickly.
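To make the data-model discussion a little more concrete, here is a minimal sketch against the HBase 0.90-era client API of the primitives referenced above: an atomic increment, a compare-and-swap, and a range scan. The table name, column families and row-key layout below are hypothetical illustrations, not the actual Messages schema.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.client.HTable;
    import org.apache.hadoop.hbase.client.Put;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.util.Bytes;

    public class MessagesStoreSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        HTable table = new HTable(conf, "messages");          // hypothetical table name

        byte[] userRow = Bytes.toBytes("user12345");           // hypothetical row key
        byte[] counters = Bytes.toBytes("counters");

        // Atomic read-modify-write (requirement 7): bump the unread-message counter.
        table.incrementColumnValue(userRow, counters, Bytes.toBytes("unread"), 1L);

        // Compare-and-swap: only apply the Put if the current value matches the expected one.
        Put put = new Put(userRow);
        put.add(Bytes.toBytes("meta"), Bytes.toBytes("state"), Bytes.toBytes("archived"));
        table.checkAndPut(userRow, Bytes.toBytes("meta"), Bytes.toBytes("state"),
                          Bytes.toBytes("active"), put);

        // Range scan (requirement 8): hypothetical layout of one row per message,
        // keyed by userid + message id, so a [start, stop) scan fetches one user's messages.
        Scan scan = new Scan(Bytes.toBytes("user12345"), Bytes.toBytes("user12346"));
        ResultScanner scanner = table.getScanner(scan);
        for (Result r : scanner) {
          // process one message row
        }
        scanner.close();
        table.close();
      }
    }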
HBase is now being used by many other workloads internally at Facebook. I will describe these different workloads in a later post.
(Credit to the authors of the paper: Dhruba Borthakur, Kannan Muthukkaruppan, Karthik Ranganathan, Samuel Rash, Joydeep Sen Sarma, Jonathan Gray, Nicolas Spiegelberg, Hairong Kuang, Dmytro Molkov, Aravind Menon, Rodrigo Schmidt and Amitanand Aiyer)
Labels: hadoop, hadoop and facebook, hbase, hdfs, sigmod and hbase, sigmod and hdfs
Sunday, April 25, 2010
The Curse of the Singletons! The Vertical Scalability of Hadoop NameNode
Introduction
What are the bottlenecks of the NameNode?
Network: We have around 2000 nodes in our cluster and each node is running 9 mappers and 6 reducers simultaneously. This means that there are around 30K simultaneous clients requesting service from the NameNode. The Hive Metastore and the HDFS RaidNode impose additional load on the NameNode. The Hadoop RPCServer has a singleton Listener Thread that pulls data from all incoming RPCs and hands it to a bunch of NameNode handler threads. Only after all the incoming parameters of the RPC are copied and deserialized by the Listener Thread do the NameNode handler threads get to process the RPC. One CPU core on our NameNode machine is completely consumed by the Listener Thread. This means that during times of high load, the Listener Thread is unable to copy and deserialize all incoming RPC data in time, thus leading to clients encountering RPC socket errors. This is one big bottleneck to vertical scaling of the NameNode.
CPU: The second bottleneck to scalability is the fact that most critical sections of the NameNode are protected by a singleton lock called the FSNamesystem lock. I had done some major restructuring of this code about three years ago via HADOOP-1269, but even that is not enough for supporting current workloads. Our NameNode machine has 8 cores but a fully loaded system can use at most only 2 cores simultaneously on average; the reason being that most NameNode handler threads encounter serialization via the FSNamesystem lock.
Memory: The NameNode stores all its metadata in the main memory of the singleton machine on which it is deployed. In our cluster, we have about 60 million files and 80 million blocks; this requires the NameNode to have a heap size of about 58GB. This is huge! There isn't any more memory left to grow the NameNode's heap size! What can we do to support an even greater number of files and blocks in our system?
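As a rough back-of-the-envelope check (my estimate, not a measured figure): 60 million files plus 80 million blocks is about 140 million metadata objects, so a 58GB heap works out to roughly 58 GB / 140 million, or about 400 bytes of heap per object once Java object overheads and indirection are included. The heap therefore grows linearly with the number of files and blocks, and the capacity of the namespace is ultimately bounded by the memory of this one machine.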
Can we break the impasse?
RPC Server: We enhanced the Hadoop RPC Server to have a pool of Reader Threads that work in conjunction with the Listener Thread. The Listener Thread accepts a new connection from a client and then hands over the work of RPC-parameter-deserialization to one of the Reader Threads. In our case, we configured the pool to have 8 Reader Threads. This change has doubled the number of RPCs that the NameNode can process at full throttle. This change has been contributed to the Apache code via HADOOP-6713.
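If you want to try this on your own cluster, the size of the Reader Thread pool is controlled by a configuration key; to the best of my recollection HADOOP-6713 named it ipc.server.read.threadpool.size, but please verify the key against your Hadoop build before relying on it. A minimal sketch of setting it programmatically:

    import org.apache.hadoop.conf.Configuration;

    public class ReaderPoolConfigSketch {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // Number of Reader Threads that deserialize incoming RPC parameters on
        // behalf of the single Listener Thread (key name per HADOOP-6713;
        // verify against your Hadoop version).
        conf.setInt("ipc.server.read.threadpool.size", 8);
        // This Configuration is then used by the RPC server (e.g. the NameNode)
        // at startup; the same value can be set in core-site.xml instead.
      }
    }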
The above change allowed a simulated workload to be able to consume 4 CPU cores out of a total of 8 CPU cores in the NameNode machine. Sadly enough, we still cannot get it to use all the 8 CPU cores!
FSNamesystem lock: A review of our workload showed that our NameNode typically has the following distribution of requests:
- stat a file or directory 47%
- open a file for read 42%
- create a new file 3%
- create a new directory 3%
- rename a file 2%
- delete a file 1%
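Since roughly 89% of these operations (stat and open) only read the namespace, the natural remedy is to let read-only requests share the FSNamesystem lock while mutations continue to take it exclusively, so that reads can run in parallel on all cores. Here is a minimal standalone sketch of that idea using java.util.concurrent; the class and method names below are hypothetical and are not the actual NameNode code:

    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    // Hypothetical namespace wrapper; not the real FSNamesystem code.
    public class NamespaceSketch {
      private final ReadWriteLock fsLock = new ReentrantReadWriteLock();

      // stat/open style operations only need the shared (read) lock,
      // so many handler threads can execute them concurrently.
      public long getFileLength(String path) {
        fsLock.readLock().lock();
        try {
          return lookupLength(path);
        } finally {
          fsLock.readLock().unlock();
        }
      }

      // create/rename/delete style operations take the exclusive (write) lock.
      public void createFile(String path) {
        fsLock.writeLock().lock();
        try {
          insertIntoNamespace(path);
        } finally {
          fsLock.writeLock().unlock();
        }
      }

      private long lookupLength(String path) { return 0L; }   // placeholder
      private void insertIntoNamespace(String path) { }       // placeholder
    }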
The memory bottleneck issue is still unresolved. People have asked me if the NameNode can keep some portion of its metadata on disk, but this will require a change in locking model design first. One cannot keep the FSNamesystem lock while reading in data from the disk: this would cause all other threads to block, thus throttling the performance of the NameNode. Could one use flash memory effectively here? Maybe an LRU cache of file system metadata will work well with current metadata access patterns? If anybody has good ideas here, please share them with the Apache Hadoop community.
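As one illustration of the LRU idea mentioned above, Java's LinkedHashMap in access order already gives a serviceable skeleton for an in-memory cache of metadata entries whose evictions could be spilled to flash or disk. This is a sketch only, with a hypothetical placeholder for the real inode metadata:

    import java.util.LinkedHashMap;
    import java.util.Map;

    // Sketch of an LRU cache of namespace metadata keyed by path.
    // "InodeStub" is a hypothetical stand-in for real file metadata.
    public class MetadataLruCache extends LinkedHashMap<String, InodeStub> {
      private final int capacity;

      public MetadataLruCache(int capacity) {
        super(16, 0.75f, true);   // accessOrder = true gives LRU ordering
        this.capacity = capacity;
      }

      @Override
      protected boolean removeEldestEntry(Map.Entry<String, InodeStub> eldest) {
        // When the cache is full, the eldest entry is the spill candidate;
        // a real implementation would write it to flash or disk here.
        return size() > capacity;
      }
    }

    class InodeStub { /* file length, block list, permissions, ... */ }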
In a Nutshell
The two proposed enhancements have improved NameNode scalability by a factor of 8. Sweet, isn't it?
Labels: hadoop, hadoop distributed file system, hdfs, hdfs scalability
Friday, October 2, 2009
I presented a set of slides that describes the Hadoop development at Facebook at the HadoopWorld conference in New York today. It was well received by more than 100 people. I have presented at many a conference on the West Coast, but this is the first time I have presented at a conference in New York... there are more Hadoop users here, versus mostly Hadoop developers on the West Coast. There were plenty of questions, especially about Hadoop-Archive and Realtime-Hadoop. There were people asking me questions about HDFS symbolic links and the HDFS-scribe copier.
Earlier, I visited the University of Notre Dame to conduct a department seminar and present a guest lecture for the graduate students at the Department of Computer Science. There is plenty of interesting research being led by Prof. Douglas Thain. One interesting research idea that came up was to place HDFS block replicas by analyzing HDFS access patterns. It is possible that we can provide HDFS datanode/namenode logs to researchers who can analyze these logs to come up with better algorithms for HDFS block replica placement.
Monday, September 14, 2009
HDFS block replica placement in your hands now!
Most Hadoop administrators set the default replication factor for their files to be three. The main assumption here is that if you keep three copies of the data, your data is safe. I have observed this to be true in the big clusters that we manage and operate. In actuality, administrators are managing two failure aspects: data corruption and data availability.
If all the datanodes on which the replicas of a block exist catch fire at the same time, then that data is lost and cannot be recovered. Or if an administrative error causes all the existing replicas of a block to be deleted, then it is a catastrophic failure. This is data corruption. On the other hand, if a rack switch goes down for some time, the datanodes on that rack are inaccessible during that time. When that faulty rack switch is fixed, the data on the rack rejoins the HDFS cluster and life goes on as usual. This is a data availability issue; in this case data was not corrupted or lost, it was just unavailable for some time. HDFS keeps three copies of a block on three different datanodes to protect against true data corruption. HDFS also tries to distribute these three replicas on more than one rack to protect against data availability issues. The fact that HDFS actively monitors any failed datanode(s) and upon failure detection immediately schedules re-replication of blocks (if needed) implies that three copies of data on three different nodes is sufficient to avoid corrupted files.
HDFS uses a simple but highly effective policy to allocate replicas for a block. If a process that is running on any of the HDFS cluster nodes opens a file for writing a block, then one replica of that block is allocated on the same machine on which the client is running. The second replica is allocated on a randomly chosen rack that is different from the rack on which the first replica was allocated. The third replica is allocated on a randomly chosen machine on the same remote rack that was chosen in the earlier step. This means that a block is present on two unique racks. One point to note is that there is no relationship between replicas of different blocks of the same file as far as their location is concerned. Each block is allocated independently.
The above algorithm is great for availability and scalability. However, there are scenarios where co-locating many blocks of the same file on the same set of datanode(s) or rack(s) is beneficial for performance reasons. For example, if many blocks of the same file are present on the same datanode(s), a single mapper instance could process all these blocks using the CombineFileInputFormat. Similarly, if a dataset contains many small files that are co-located on the same datanode(s) or rack(s), one can use CombineFileInputFormat to process all these files together with fewer mapper instances. If an application always uses one dataset with another dataset (think Hive or Pig join), then co-locating these two datasets on the same set of datanodes is beneficial.
Another case where one might want to allocate replicas using a different policy is to ensure that replicas and their parity blocks truly reside in different failure domains. The erasure code work in HDFS could effectively bring down the physical replication factor of a file to about 1.5 (while keeping the logical replication factor at 3) if it can place replicas of all blocks in a stripe more intelligently.
Yet another reason, however exotic, is to allow HDFS to place replicas based on the HeatMap of your cluster. If one node in the cluster is running at a higher temperature than another, then it might be better to prefer the cooler node while allocating a new replica. If you want to experiment with HDFS across two data centers, you might want to try out new policies for replica placement.
Well, now you can finally get your hands dirty! HDFS-385 is part of the Hadoop trunk and will be part of the next major HDFS 0.21 release. This feature provides a way for the adventurous developer to write Java code that specifies how HDFS should allocate replicas of blocks of a file. The API is experimental in nature, and could change in the near future if we discover any inefficiencies in it. Please let the Hadoop community know if you need any changes in this API or if you come across novel uses of this API.
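To give a flavour of what such a pluggable policy can do, here is a small standalone sketch of placement logic that mimics the default local-node/remote-rack layout but prefers "cooler" nodes on the remote rack. The classes and method below are hypothetical illustrations of the kind of logic one would adapt to the experimental HDFS-385 interface; they are not the actual HDFS API.

    import java.util.ArrayList;
    import java.util.Comparator;
    import java.util.List;

    // Hypothetical node descriptor; the real API hands you datanode descriptors.
    class Node {
      String host;
      String rack;
      double temperature;   // e.g. from a cluster HeatMap
      Node(String host, String rack, double temperature) {
        this.host = host; this.rack = rack; this.temperature = temperature;
      }
    }

    public class HeatAwarePlacementSketch {
      // Choose three targets: the writer-local node, then the two coolest nodes
      // on one remote rack (mirroring the default two-rack layout).
      public List<Node> chooseTargets(Node writer, List<Node> cluster) {
        List<Node> targets = new ArrayList<Node>();
        targets.add(writer);                                 // replica 1: local node

        List<Node> remote = new ArrayList<Node>();
        for (Node n : cluster) {
          if (!n.rack.equals(writer.rack)) {
            remote.add(n);
          }
        }
        remote.sort(Comparator.comparingDouble((Node n) -> n.temperature));

        String chosenRack = remote.isEmpty() ? null : remote.get(0).rack;
        for (Node n : remote) {                              // replicas 2 and 3:
          if (n.rack.equals(chosenRack) && targets.size() < 3) {
            targets.add(n);                                  // coolest nodes on one remote rack
          }
        }
        return targets;
      }
    }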
Friday, August 28, 2009
HDFS and Erasure Codes (HDFS-RAID)
The Hadoop Distributed File System has been great in providing a cloud-type file system. It is robust (when administered correctly :-)) and highly scalable. However, one of the main drawbacks of HDFS is that each piece of data is replicated in three places. This is acceptable because disk storage is cheap and is becoming cheaper by the day; this isn't a problem if you have a relatively small to medium size cluster. The price difference (in absolute terms) is not much whether you use 15 disks or 10 disks. At a cost of 1ドル per GByte, the price difference between fifteen 1 TB disks and ten 1 TB disks is only 5ドルK. But when the total size of your cluster is 10 PBytes, then the cost savings of storing the data in two places versus three is a huge ten million dollars (the 10 PB of extra copies saved, at 1ドル per GByte)!
The reason HDFS stores disk blocks in triplicate is that it uses commodity hardware and there is a non-negligible probability of disk failure. It has been observed that a replication factor of 3, together with the fact that HDFS aggressively detects failures and immediately re-replicates lost block replicas, is sufficient to never lose any data in practice. The challenge now is to achieve an effective replication factor of 3 while keeping the real physical replication factor at close to 2! What better way to do it than by using erasure codes?
I heard about this idea called DiskReduce from the folks at CMU. The CMU PDL Labs has been a powerhouse of research in file systems and it is no surprise that they proposed an elegant way of implementing erasure codes in HDFS. I borrowed heavily from their idea in my implementation of Erasure Codes in HDFS described in HDFS-503. One of the main motivations of my design is to keep the HDFS Erasure Coding as a software layer above HDFS rather than intertwining it inside the HDFS code. The HDFS code is complex by itself and it is really nice to not have to make it more complex and heavyweight.
The Distributed Raid File System consists of two main software components. The first component is the RaidNode, a daemon that creates parity files from specified HDFS files. The second component, "raidfs", is software layered over the HDFS client that intercepts all calls an application makes to the HDFS client. If the HDFS client encounters corrupted data while reading a file, the raidfs client detects it; it uses the relevant parity blocks to recover the corrupted data (if possible) and returns the data to the application. The fact that parity data was used to satisfy the read request is completely transparent to the application. The Distributed Raid File System can be configured in such a way that a set of data blocks of a file are combined together to form one or more parity blocks. This allows one to reduce the replication factor of an HDFS file from 3 to 2 while keeping the probability of data loss about the same as before.
I have seen that using a stripe size of 10 blocks decreases the physical replication factor of a file to 2.2 while keeping the effective replication factor of a file at 3. This typically results in saving 25% to 30% of storage space in an HDFS cluster.
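The arithmetic behind the 2.2 figure, as I understand it: with a stripe of 10 data blocks kept at replication 2, plus one XOR parity block for the stripe also kept at replication 2, the 10 logical blocks occupy 22 physical block replicas, i.e. a physical replication factor of 2.2. The parity computation itself is just a byte-wise XOR across the blocks of a stripe. Here is a toy sketch of generating the parity and recovering one lost block; this is illustrative only, not the RaidNode code, and it assumes all blocks in the stripe have the same length.

    public class XorParitySketch {
      // Parity block = XOR of all data blocks in the stripe.
      public static byte[] computeParity(byte[][] stripe) {
        byte[] parity = new byte[stripe[0].length];
        for (byte[] block : stripe) {
          for (int i = 0; i < parity.length; i++) {
            parity[i] ^= block[i];
          }
        }
        return parity;
      }

      // A single missing block is recovered by XOR-ing the parity with the
      // surviving blocks of the stripe.
      public static byte[] recoverBlock(byte[][] survivingBlocks, byte[] parity) {
        byte[] recovered = parity.clone();
        for (byte[] block : survivingBlocks) {
          for (int i = 0; i < recovered.length; i++) {
            recovered[i] ^= block[i];
          }
        }
        return recovered;
      }
    }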
One of the shortcomings of this implementation is that we need a parity file for every file in HDFS. This potentially increases the number of files in the NameNode. To alleviate this problem, I will enhance this implementation (in the future) to use the Hadoop Archive feature to archive all the parity files together in larger containers, so that the NameNode does not have to support additional files when HDFS Erasure Coding is switched on. This works reasonably well because it is a very rare case that the parity files are ever used to satisfy a read request.
I am hoping that this feature becomes part of the Hadoop 0.21 release scheduled for September 2009!
Labels: erasure code, hadoop distributed file system, hdfs, raid
Saturday, June 6, 2009
HDFS Scribe Integration
It is finally here: you can configure the open source log-aggregator, scribe, to log data directly into the Hadoop distributed file system.
Many Web 2.0 companies have to deploy a bunch of costly filers to capture weblogs being generated by their application. Currently, there is no option other than a costly filer because the write-rate for this stream is huge. The Hadoop-Scribe integration allows this write-load to be distributed among a bunch of commodity machines, thus reducing the total cost of this infrastructure.
The challenge was to make HDFS real-timeish in behaviour. Scribe uses libhdfs, the C interface to the HDFS client. There were various bugs in libhdfs that needed to be solved first. Then came the FileSystem API. One of the major issues was that the FileSystem API caches FileSystem handles and always returns the same FileSystem handle when called from multiple threads. There was no reference counting of the handle. This caused problems with Scribe, because Scribe is highly multi-threaded. A new API, FileSystem.newInstance(), was introduced to support Scribe.
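A minimal sketch of the difference between the two calls (the NameNode URI below is hypothetical); the per-thread, uncached handle is the part that matters for a heavily multi-threaded writer like Scribe:

    import java.net.URI;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;

    public class PerThreadHandleSketch {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        URI uri = new URI("hdfs://namenode:9000/");   // hypothetical NameNode address

        // FileSystem.get() returns a cached, shared handle: every thread that asks
        // for this URI gets the same object, and a close() in one thread affects all.
        FileSystem shared = FileSystem.get(uri, conf);

        // FileSystem.newInstance() returns a private handle, so each writer thread
        // can open, write and close independently of the others.
        FileSystem mine = FileSystem.newInstance(uri, conf);

        mine.close();
        shared.close();
      }
    }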
Making the HDFS write code path more real-time was painful. There are various timeouts/settings in HDFS that were hardcoded and needed to be changed to allow the application to fail fast. At the bottom of this blog post, I am attaching the settings that we have currently configured to make the HDFS write very real-timeish. The last of these JIRAs, HADOOP-2757, is in the pipeline to be committed to Hadoop trunk very soon.
What about the Namenode being a single point of failure? This is acceptable in a warehouse type of application but cannot be tolerated by a realtime application. Scribe typically aggregates click-logs from a bunch of webservers, and losing *all* click log data of a website for 10 minutes or so (the minimum time for a namenode restart) cannot be tolerated. The solution is to configure two overlapping clusters on the same hardware. Run two separate namenodes N1 and N2 on two different machines. Run one set of datanode software on all slave machines that report to N1 and another set of datanode software on the same set of slave machines that report to N2. The two datanode instances on a single slave machine share the same data directories. This configuration allows HDFS to be highly available for writes!
The highly-available-for-writes HDFS configuration is also required for software upgrades on the cluster. We can shut down one of the overlapping HDFS clusters, upgrade it to new Hadoop software, and then put it back online before starting the same process for the second HDFS cluster.
What are the main changes to Scribe that were needed? Scribe already had the feature that it buffers data when it is unable to write to the configured storage. The default Scribe behaviour is to replay this buffer back to the storage when the storage is back online. Scribe is configured to support no-buffer-replay when the primary storage is back online. Scribe-hdfs is configured to write data to cluster N1, and if N1 fails then it writes data to cluster N2. Scribe treats N1 and N2 as two equivalent primary stores. The Scribe configuration should have fs_type=hdfs. For Scribe compilation, you can use ./configure --enable-hdfs LDFLAGS="-ljvm -lhdfs". A good example for configuring scribe-hdfs is in a file called hdfs_example2.conf in the Scribe code base.
Here are the settings for the Hadoop 0.17 configuration that is needed by an application doing writes in realtime:
- ipc.client.idlethreshold (10000): Defines the threshold number of connections after which connections will be inspected for idleness.
- ipc.client.connection.maxidletime (10000): The maximum time in msec after which a client will bring down the connection to the server.
- ipc.client.connect.max.retries (2): Indicates the number of retries a client will make to establish a server connection.
- ipc.server.listen.queue.size (128): Indicates the length of the listen queue for servers accepting client connections.
- ipc.server.tcpnodelay (true): Turn on/off Nagle's algorithm for the TCP socket connection on the server. Setting to true disables the algorithm and may decrease latency with a cost of more/smaller packets.
- ipc.client.tcpnodelay (true): Turn on/off Nagle's algorithm for the TCP socket connection on the client. Setting to true disables the algorithm and may decrease latency with a cost of more/smaller packets.
- ipc.ping.interval (5000): The client sends a ping message to the server every period. This is helpful to detect socket connections that were idle and have been terminated by a failed server.
- ipc.client.connect.maxwaittime (5000): The client waits for this much time for a socket connect call to be established with the server.
- dfs.datanode.socket.write.timeout (20000): The DFS client waits for this much time for a socket write call to the datanode.
- ipc.client.ping (false): See HADOOP-2757.
Labels: hadoop, hadoop distributed file system, hdfs, scribe
Sunday, May 24, 2009
Better Late than Never
For quite a while, I have been thinking on blogging about Hadoop in general and Hadoop distributed file system (HDFS) in particular. Why, you may ask?
Firstly, I have been contacted by students from as far away as Bangladesh and Fiji asking me questions about HDFS via email. This made me think that disseminating internal details about HDFS to the whole wide world would really benefit a lot of people. I like to interact with these budding engineers; and their questions, though elementary in nature, sometimes make me really ruminate on why we adopted a particular design and not another. I will sprinkle a few of these examples next week.
Secondly, I visited a few universities last month, among them Carnegie Mellon University and my alma mater, the University of Wisconsin. On my flight, I was getting bored to death, because I really did not like the movie that was playing and I did not carry any material to read. (Usually I like to read and re-read Sherlock Holmes over and over again.) But like they say, "an idle mind is the devil's workshop"... I started to jot down some exotic design ideas about HDFS... And lo and behold, I have a list of ideas that I would like to share! I will post them next week as well.
Labels: hadoop, hadoop distributed file system, hdfs