Showing posts with label hadoop. Show all posts
Showing posts with label hadoop. Show all posts
Saturday, May 28, 2011
Realtime Hadoop usage at Facebook -- Part 2 - Workload Types
This is the second part of our SIGMOD-2011 paper that describes our use case for Apache Hadoop and Apache HBase in realtime workloads. You can find the first part here. We describe why Hadoop and HBase fits the requirements of each of these applications.
OUR WORKLOADS
1. Facebook Messaging
The latest generation of Facebook Messaging combines existing Facebook messages with e-mail, chat, and SMS. In addition to persisting all of these messages, a new threading model also requires messages to be stored for each participating user. As part of the application server requirements, each user will be sticky to a single data center.
1.2 Large Tables
As part of the product requirements, messages would not be deleted unless explicitly done so by the user, so each mailbox would grow indefinitely. As is typical with most messaging applications, messages are read only a handful of times when they are recent, and then are rarely looked at again. As such, a vast majority would not be read from the database but must be available at all times and with low latency, so archiving would be difficult. Storing all of a user’’s thousands of messages meant that we’’d have a database schema that was indexed by the user with an ever-growing list of threads and messages. With this type of random write workload, write performance will typically degrade in a system like MySQL as the number of rows in the table increases. The sheer number of new messages would also mean a heavy write workload, which could translate to a high number of random IO operations in this type of system.
1.3 Data Migration
One of the most challenging aspects of the new Messaging product was the new data model. This meant that all existing user’’s messages needed to be manipulated and joined for the new threading paradigm and then migrated to the new system. The ability to perform large scans, random access, and fast bulk imports would help to reduce the time spent migrating users to the new system.
2 Facebook Insights
Facebook Insights provides developers and website owners with access to real-time analytics related to Facebook activity across websites with social plugins, Facebook Pages, and Facebook Ads. Using anonymized data, Facebook surfaces activity such as impressions, click through rates and website visits. These analytics can help everyone from businesses to bloggers gain insights into how people are interacting with their content so they can optimize their services. Domain and URL analytics were previously generated in a periodic, offline fashion through our Hadoop and Hive analytics data warehouse. However, this does not yield a rich user experience as the data is only available several hours after it has occurred.
2.1 Realtime Analytics
The insights teams wanted to make statistics available to their users within seconds of user actions rather than the hours previously supported. This would require a large-scale, asynchronous queuing system for user actions as well as systems to process, aggregate, and persist these events. All of these systems need to be fault-tolerant and support more than a million events per second.
2.2 High Throughput Increments
To support the existing insights functionality, time and demographic-based aggregations would be necessary. However, these aggregations must be kept up-to-date and thus processed on the fly, one event at a time, through numeric counters. With millions of unique aggregates and billions of events, this meant a very large number of counters with an even larger number of operations against them.
3. Facebook Metrics System
At Facebook, all hardware and software feed statistics into a metrics collection system called ODS (Operations Data Store). For example, we may collect the amount of CPU usage on a given server or tier of servers, or we may track the number of write operations to an HBase cluster. For each node or group of nodes we track hundreds or thousands of different metrics, and engineers will ask to plot them over time at various granularities. While this application has hefty requirements for write throughput, some of the bigger pain points with the existing MySQL-based system are around the resharding of data and the ability to do table scans for analysis and time roll-ups. This use-case is gearing up to be in production very shortly.
3.2 Fast Reads of Recent Data and Table Scans
A vast majority of reads to the metrics system is for very recent, raw data, however all historical data must also be available. Recently written data should be available quickly, but the entire dataset will also be periodically scanned in order to perform time- based rollups.
OUR WORKLOADS
Before deciding on a particular software stack and whether or not to move away from our MySQL-based architecture, we looked at a few specific applications where existing solutions may be problematic. These use cases would have workloads that are challenging to scale because of very high write throughput, massive datasets, unpredictable growth, or other patterns that may be difficult or suboptimal in a sharded RDBMS environment.
1. Facebook Messaging
The latest generation of Facebook Messaging combines existing Facebook messages with e-mail, chat, and SMS. In addition to persisting all of these messages, a new threading model also requires messages to be stored for each participating user. As part of the application server requirements, each user will be sticky to a single data center.
1.1 High Write Throughput
With an existing rate of millions of messages and billions of instant messages every day, the volume of ingested data would be very large from day one and only continue to grow. The denormalized requirement would further increase the number of writes to the system as each message could be written several times.
1.2 Large Tables
As part of the product requirements, messages would not be deleted unless explicitly done so by the user, so each mailbox would grow indefinitely. As is typical with most messaging applications, messages are read only a handful of times when they are recent, and then are rarely looked at again. As such, a vast majority would not be read from the database but must be available at all times and with low latency, so archiving would be difficult. Storing all of a user’’s thousands of messages meant that we’’d have a database schema that was indexed by the user with an ever-growing list of threads and messages. With this type of random write workload, write performance will typically degrade in a system like MySQL as the number of rows in the table increases. The sheer number of new messages would also mean a heavy write workload, which could translate to a high number of random IO operations in this type of system.
1.3 Data Migration
One of the most challenging aspects of the new Messaging product was the new data model. This meant that all existing user’’s messages needed to be manipulated and joined for the new threading paradigm and then migrated to the new system. The ability to perform large scans, random access, and fast bulk imports would help to reduce the time spent migrating users to the new system.
2 Facebook Insights
Facebook Insights provides developers and website owners with access to real-time analytics related to Facebook activity across websites with social plugins, Facebook Pages, and Facebook Ads. Using anonymized data, Facebook surfaces activity such as impressions, click through rates and website visits. These analytics can help everyone from businesses to bloggers gain insights into how people are interacting with their content so they can optimize their services. Domain and URL analytics were previously generated in a periodic, offline fashion through our Hadoop and Hive analytics data warehouse. However, this does not yield a rich user experience as the data is only available several hours after it has occurred.
2.1 Realtime Analytics
The insights teams wanted to make statistics available to their users within seconds of user actions rather than the hours previously supported. This would require a large-scale, asynchronous queuing system for user actions as well as systems to process, aggregate, and persist these events. All of these systems need to be fault-tolerant and support more than a million events per second.
2.2 High Throughput Increments
To support the existing insights functionality, time and demographic-based aggregations would be necessary. However, these aggregations must be kept up-to-date and thus processed on the fly, one event at a time, through numeric counters. With millions of unique aggregates and billions of events, this meant a very large number of counters with an even larger number of operations against them.
3. Facebook Metrics System
At Facebook, all hardware and software feed statistics into a metrics collection system called ODS (Operations Data Store). For example, we may collect the amount of CPU usage on a given server or tier of servers, or we may track the number of write operations to an HBase cluster. For each node or group of nodes we track hundreds or thousands of different metrics, and engineers will ask to plot them over time at various granularities. While this application has hefty requirements for write throughput, some of the bigger pain points with the existing MySQL-based system are around the resharding of data and the ability to do table scans for analysis and time roll-ups. This use-case is gearing up to be in production very shortly.
3.1 Automatic Sharding
The massive number of indexed and time-series writes and the unpredictable growth patterns are difficult to reconcile on a sharded MySQL setup. For example, a given product may only collect ten metrics over a long period of time, but following a large rollout or product launch, the same product may produce thousands of metrics. With the existing system, a single MySQL server may suddenly be handling much more load than it can handle, forcing the team to manually re-shard data from this server onto multiple servers.
3.2 Fast Reads of Recent Data and Table Scans
A vast majority of reads to the metrics system is for very recent, raw data, however all historical data must also be available. Recently written data should be available quickly, but the entire dataset will also be periodically scanned in order to perform time- based rollups.
(Credit to the authors of the paper: Dhruba Borthakur Kannan Muthukkaruppan Karthik Ranganathan Samuel Rash Joydeep Sen Sarma Jonathan Gray Nicolas Spiegelberg Hairong Kuang Dmytro Molkov Aravind Menon Rodrigo Schmidt Amitanand Aiyer)
[フレーム]
Labels:
facebook and hdfs,
hadoop,
hadoop and facebook,
hbase,
sigmod and hbase
Tuesday, May 17, 2011
Realtime Hadoop usage at Facebook -- Part 1
Facebook recently deployed Facebook Messages, its first ever user-facing application built on the Apache Hadoop platform. It uses HDFS and HBase as core technologies for this solution. Since then, there are many more applications that have started to used HBase. We have gained some experience in deploying and operating HDFS and HBase at peta-byte scale for realtime-workloads and decided to write a paper detailing some of these insights. This paper will be published in SIGMOD 2011.
The requirements for the storage system for our workloads can be summarized as follows:
1. Elasticity: We need to be able to add incremental capacity to our storage systems with minimal overhead and no downtime. In some cases we may want to add capacity rapidly and the system should automatically balance load and utilization across new hardware.
2. High write throughput: Most of the applications store (and optionally index) tremendous amounts of data and require high aggregate write throughput.
3. Efficient and low-latency strong consistency semantics within a data center: There are important applications like Messages that require strong consistency within a data center. This requirement often arises directly from user expectations. For example ‘‘unread’’ message counts displayed on the home page and the messages shown in the inbox page view should be consistent with respect to each other. While a globally distributed strongly consistent system is practically impossible, a system that could at least provide strong consistency within a data center would make it possible to provide a good user experience. We also knew that (unlike other Facebook applications), Messages was easy to federate so that a particular user could be served entirely out of a single data center making strong consistency within a single data center a critical requirement for the Messages project. Similarly, other projects, like realtime log aggregation, may be deployed entirely within one data center and are much easier to program if the system provides strong consistency guarantees.
4. Efficient random reads from disk: In spite of the widespread use of application level caches (whether embedded or via memcached), at Facebook scale, a lot of accesses miss the cache and hit the back-end storage system. MySQL is very efficient at performing random reads from disk and any new system would have to be comparable.
5. High Availability and Disaster Recovery: We need to provide a service with very high uptime to users that covers both planned and unplanned events (examples of the former being events like software upgrades and addition of hardware/capacity and the latter exemplified by failures of hardware components). We also need to be able to tolerate the loss of a data center with minimal data loss and be able to serve data out of another data center in a reasonable time frame.
6. Fault Isolation: Our long experience running large farms of MySQL databases has shown us that fault isolation is critical. Individual databases can and do go down, but only a small fraction of users are affected by any such event. Similarly, in our warehouse usage of Hadoop, individual disk failures affect only a small part of the data and the system quickly recovers from such faults.
7. Atomic read-modify-write primitives: Atomic increments and compare-and-swap APIs have been very useful in building lockless concurrent applications and are a must have from the underlying storage system.
8. Range Scans: Several applications require efficient retrieval of a set of rows in a particular range. For example all the last 100 messages for a given user or the hourly impression counts over the last 24 hours for a given advertiser.
Some less tangible factors were also at work. Systems with existing production experience for Facebook and in-house expertise were greatly preferred. When considering open-source projects, the strength of the community was an important factor. Given the level of engineering investment in building and maintaining systems like these –– it also made sense to choose a solution that was broadly applicable (rather than adopt point solutions based on differing architecture and codebases for each workload).
You can find the full paper here later, but here are some highlights:
WHY HADOOP AND HBASE
The requirements for the storage system for our workloads can be summarized as follows:
1. Elasticity: We need to be able to add incremental capacity to our storage systems with minimal overhead and no downtime. In some cases we may want to add capacity rapidly and the system should automatically balance load and utilization across new hardware.
2. High write throughput: Most of the applications store (and optionally index) tremendous amounts of data and require high aggregate write throughput.
3. Efficient and low-latency strong consistency semantics within a data center: There are important applications like Messages that require strong consistency within a data center. This requirement often arises directly from user expectations. For example ‘‘unread’’ message counts displayed on the home page and the messages shown in the inbox page view should be consistent with respect to each other. While a globally distributed strongly consistent system is practically impossible, a system that could at least provide strong consistency within a data center would make it possible to provide a good user experience. We also knew that (unlike other Facebook applications), Messages was easy to federate so that a particular user could be served entirely out of a single data center making strong consistency within a single data center a critical requirement for the Messages project. Similarly, other projects, like realtime log aggregation, may be deployed entirely within one data center and are much easier to program if the system provides strong consistency guarantees.
4. Efficient random reads from disk: In spite of the widespread use of application level caches (whether embedded or via memcached), at Facebook scale, a lot of accesses miss the cache and hit the back-end storage system. MySQL is very efficient at performing random reads from disk and any new system would have to be comparable.
5. High Availability and Disaster Recovery: We need to provide a service with very high uptime to users that covers both planned and unplanned events (examples of the former being events like software upgrades and addition of hardware/capacity and the latter exemplified by failures of hardware components). We also need to be able to tolerate the loss of a data center with minimal data loss and be able to serve data out of another data center in a reasonable time frame.
6. Fault Isolation: Our long experience running large farms of MySQL databases has shown us that fault isolation is critical. Individual databases can and do go down, but only a small fraction of users are affected by any such event. Similarly, in our warehouse usage of Hadoop, individual disk failures affect only a small part of the data and the system quickly recovers from such faults.
7. Atomic read-modify-write primitives: Atomic increments and compare-and-swap APIs have been very useful in building lockless concurrent applications and are a must have from the underlying storage system.
8. Range Scans: Several applications require efficient retrieval of a set of rows in a particular range. For example all the last 100 messages for a given user or the hourly impression counts over the last 24 hours for a given advertiser.
It is also worth pointing out non-requirements:
1. Tolerance of network partitions within a single data center: Different system components are often inherently centralized. For example, MySQL servers may all be located within a few racks, and network partitions within a data center would cause major loss in serving capabilities therein. Hence every effort is made to eliminate the possibility of such events at the hardware level by having a highly redundant network design.
2. Zero Downtime in case of individual data center failure: In our experience such failures are very rare, though not impossible. In a less than ideal world where the choice of system design boils down to the choice of compromises that are acceptable, this is one compromise that we are willing to make given the low occurrence rate of such events. We might revise this non-requirement at a later time.
3. Active-active serving capability across different data centers: As mentioned before, we were comfortable making the assumption that user data could be federated across different data centers (based ideally on user locality). Latency (when user and data locality did not match up) could be masked by using an application cache close to the user.
Some less tangible factors were also at work. Systems with existing production experience for Facebook and in-house expertise were greatly preferred. When considering open-source projects, the strength of the community was an important factor. Given the level of engineering investment in building and maintaining systems like these –– it also made sense to choose a solution that was broadly applicable (rather than adopt point solutions based on differing architecture and codebases for each workload).
After considerable research and experimentation, we chose Hadoop and HBase as the foundational storage technology for these next generation applications. The decision was based on the state of HBase at the point of evaluation as well as our confidence in addressing the features that were lacking at that point via in- house engineering. HBase already provided a highly consistent, high write-throughput key-value store. The HDFS NameNode stood out as a central point of failure, but we were confident that our HDFS team could build a highly-available NameNode (AvatarNode) in a reasonable time-frame, and this would be useful for our warehouse operations as well. Good disk read-efficiency seemed to be within striking reach (pending adding Bloom filters to HBase’’s version of LSM Trees, making local DataNode reads efficient and caching NameNode metadata). Based on our experience operating the Hive/Hadoop warehouse, we knew HDFS was stellar in tolerating and isolating faults in the disk subsystem. The failure of entire large HBase/HDFS clusters was a scenario that ran against the goal of fault-isolation, but could be considerably mitigated by storing data in smaller HBase clusters. Wide area replication projects, both in-house and within the HBase community, seemed to provide a promising path to achieving disaster recovery.
HBase is massively scalable and delivers fast random writes as well as random and streaming reads. It also provides row-level atomicity guarantees, but no native cross-row transactional support. From a data model perspective, column-orientation gives extreme flexibility in storing data and wide rows allow the creation of billions of indexed values within a single table. HBase is ideal for workloads that are write-intensive, need to maintain a large amount of data, large indices, and maintain the flexibility to scale out quickly.
HBase is now being used by many other workloads internally at Facebook . I will describe these different workloads in a later post.
(Credit to the authors of the paper: Dhruba Borthakur Kannan Muthukkaruppan Karthik Ranganathan Samuel Rash Joydeep Sen Sarma Jonathan Gray Nicolas Spiegelberg Hairong Kuang Dmytro Molkov Aravind Menon Rodrigo Schmidt Amitanand Aiyer)
[フレーム]
Labels:
hadoop,
hadoop and facebook,
hbase,
hdfs,
sigmod and hbase,
sigmod and hdfs
Sunday, April 25, 2010
The Curse of the Singletons! The Vertical Scalability of Hadoop NameNode
Introduction
What are the bottlenecks of the NameNode?
Network: We have around 2000 nodes in our cluster and each node is running 9 mappers and 6 reducers simultaneously. This means that there are around 30K simultaneous clients requesting service from the NameNode. The Hive Metastore and the HDFS RaidNode imposes additional load on the NameNode. The Hadoop RPCServer has a singleton Listener Thread that pulls data from all incoming RPCs and hands it to a bunch of NameNode handler threads. Only after all the incoming parameters of the RPC are copied and deserialized by the Listener Thread does the NameNode handler threads get to process the RPC. One CPU core on our NameNode machine is completely consumed by the Listener Thread. This means that during times of high load, the Listener Thread is unable to copy and deserialize all incoming RPC data in time, thus leading to clients encountering RPC socket errors. This is one big bottleneck to vertically scalabiling of the NameNode.
CPU: The second bottleneck to scalability is the fact that most critical sections of the NameNode is protected by a singleton lock called the FSNamesystem lock. I had done some major restructuring of this code about three years ago via HADOOP-1269 but even that is not enough for supporting current workloads. Our NameNode machine has 8 cores but a fully loaded system can use at most only 2 cores simultaneously on the average; the reason being that most NameNode handler threads encounter serialization via the FSNamesystem lock.
Memory: The NameNode stores all its metadata in the main memory of the singleton machine on which it is deployed. In our cluster, we have about 60 million files and 80 million blocks; this requires the NameNode to have a heap size of about 58GB. This is huge! There isn't any more memory left to grow the NameNode's heap size! What can we do to support even greater number of files and blocks in our system?
Can we break the impasse?
RPC Server: We enhanced the Hadoop RPC Server to have a pool of Reader Threads that work in conjunction with the Listener Thread. The Listener Thread accepts a new connection from a client and then hands over the work of RPC-parameter-deserialization to one of the Reader Threads. In our case, we configured our system so that the Reader Threads consist of 8 threads. This change has doubled the number of RPCs that the NameNode can process at full throttle. This change has been contributed to the Apache code via HADOOP-6713.
The above change allowed a simulated workload to be able to consume 4 CPU cores out of a total of 8 CPU cores in the NameNode machine. Sadly enough, we still cannot get it to use all the 8 CPU cores!
FSNamesystem lock: A review of our workload showed that our NameNode typically has the following distribution of requests:
- stat a file or directory 47%
- open a file for read 42%
- create a new file 3%
- create a new directory 3%
- rename a file 2%
- delete a file 1%
The memory bottleneck issue is still unresolved. People have asked me if the NameNode can keep some portion of its metadata in disk, but this will require a change in locking model design first. One cannot keep the FSNamesystem lock while reading in data from the disk: this will cause all other threads to block thus throttling the performance of the NameNode. Could one use flash memory effectively here? Maybe an LRU cache of file system metadata will work well with current metadata access patterns? If anybody has good ideas here, please share it with the Apache Hadoop community.
In a Nutshell
The two proposed enhancements have improved NameNode scalability by a factor of 8. Sweet, isn't it?
Labels:
hadoop,
hadoop distributed file system,
hdfs,
hdfs scalability
Monday, October 19, 2009
Hadoop discussions at Microsoft Research
I was invited to present a talk about Hadoop File System Architecture at Microsoft Research at Seattle. This is a research group and is focussed on long-term research, so it is no surprise that they are interested in knowing how a growing company like Facebook is using Hadoop to its advantage.
I met a few folks who chatted with me about how Microsoft SQL Server is being modified to handle large scala databases. These folks heartily agreed with a comment I made in my presentation that Dr. Dewitt and Dr. Stonebraker is missing the point when they are comparing performance numbers between Hadoop with traditional Database systems.... rather than comparing the scalability and fault-tolerance of these systems. I had learned some of the fundamentals of Database systems from Professor Dewitt during my graduate studies at Uiversity of Wisconsin Madison, but Dr Dewitt is a Microsoft employee now!
The fact that Facebook uses the SQL interface of Hive layered over Hadoop makes it even more interesting to Microsoft. They wanted to know the performance difference between Hive and PIG and would like to compare them to their distributed-SQL-Server software.
Here are the slides I used for my presentation. [フレーム]
I met a few folks who chatted with me about how Microsoft SQL Server is being modified to handle large scala databases. These folks heartily agreed with a comment I made in my presentation that Dr. Dewitt and Dr. Stonebraker is missing the point when they are comparing performance numbers between Hadoop with traditional Database systems.... rather than comparing the scalability and fault-tolerance of these systems. I had learned some of the fundamentals of Database systems from Professor Dewitt during my graduate studies at Uiversity of Wisconsin Madison, but Dr Dewitt is a Microsoft employee now!
The fact that Facebook uses the SQL interface of Hive layered over Hadoop makes it even more interesting to Microsoft. They wanted to know the performance difference between Hive and PIG and would like to compare them to their distributed-SQL-Server software.
Here are the slides I used for my presentation. [フレーム]
Monday, September 14, 2009
HDFS block replica placement in your hands now!
Most Hadoop administrators set the default replication factor for their files to be three. The main assumption here is that if you keep three copies of the data, your data is safe. I have observed this to be true in the big clusters that we manage and operate. In actuality, administrators are managing two failure aspects: data corruption and data availability.
If all the datanodes on which the replicas of a block exist catch fire at the same time, then that data is lost and cannot be recovered. Or if an administrative error causes all the existing replicas of a block to be deleted, then it is a catastrophic failure. This is data corruption. On the other hand, if a rack switch goes down for sometime, the datanodes on that rack are in-accessible during that time. When that faulty rack switch is fixed, the data on the rack rejoins the HDFS cluster and life goes on as usual. This is a data avilability issue; in this case data was not corrupted or lost, it was just unavailable for some time. HDFS keeps three copies of a block on three different datanodes to protect against true data corruption. HDFS also tries to distribute these three replicas on more than one rack to protect against data availability issues. The fact that HDFS actively monitors any failed datanode(s) and upon failure detection immediately schedules re-replication of blocks (if needed) implies that three copies of data on three different nodes is sufficient to avoid corrupted files.
HDFS uses a simple but highly effective policy to allocate replicas for a block. If a process that is running on any of the HDFS cluster nodes open a file for writing a block, then one replica of that block is allocated on the same machine on which the client is running. The second replica is allocated on a randomly chosen rack that is different from the rack on which the first replica was allocated. The third replica is allocated on a randomly chosen machine on the same remote rack that was chosen in the earlier step. This means that a block is present on two unique racks. One point to note is that there is no relationship between replicas of different blocks of the same file as far as their location is concerned. Each block is allocated independently.
The above algorithm is great for availability and scalability. However, there are scenarios where co-locating many block of the same file on the same set of datanode(s) or rack(s) is beneficial for performance reasons. For example, if many blocks of the same file are present on the same datanode(s), a single mapper instance could process all these blocks using the CombineFileInputFormat. Similarly, if a dataset contains many small files that are co-located on the same datanode(s) or rack(s), one can use CombineFileInputFormat to process all these file together by using fewer mapper instances via CombineFileInputFormat. If an application always uses one dataset with another dataset (think Hive or Pig join), then co-locating these two datasets on the same set of datanodes is beneficial.
Another reason when one might want to allocate replicas using a different policy is to ensure that replicas and their parity blocks truly reside in different failure domains. The erasure code work in HDFS could effectively bring down the physical replication factor of a file to about 1.5 (while keeping the logical replication factor at 3) if it can place replicas of all blocks in a stripe more intelligently.
Yet another reason, however exotic, is to allow HDFS to place replicas based on the HeatMap of your cluster. If one of of the node in the cluster is at a higher temperature than that of another, then it might be better to prefer the cooler node while allocating a new replica. If you want to experiment with HDFS across two data centers, you might want to try out new policies for replica placement.
Well, now you can finally get your hands wet! HDFS-385 is part of the Hadoop trunk and will be part of the next major HDFS 0.21 release. This feature provides a way for the adventurous developer to write Java code that specifies how HDFS should allocate replicas of blocks of a file. The API is experimental in nature, and could change in the near future if we discover any in-efficiencies in it. Please let the Hadoop community know if you need any changes in this API or if you come across novel uses of this API. [フレーム]
If all the datanodes on which the replicas of a block exist catch fire at the same time, then that data is lost and cannot be recovered. Or if an administrative error causes all the existing replicas of a block to be deleted, then it is a catastrophic failure. This is data corruption. On the other hand, if a rack switch goes down for sometime, the datanodes on that rack are in-accessible during that time. When that faulty rack switch is fixed, the data on the rack rejoins the HDFS cluster and life goes on as usual. This is a data avilability issue; in this case data was not corrupted or lost, it was just unavailable for some time. HDFS keeps three copies of a block on three different datanodes to protect against true data corruption. HDFS also tries to distribute these three replicas on more than one rack to protect against data availability issues. The fact that HDFS actively monitors any failed datanode(s) and upon failure detection immediately schedules re-replication of blocks (if needed) implies that three copies of data on three different nodes is sufficient to avoid corrupted files.
HDFS uses a simple but highly effective policy to allocate replicas for a block. If a process that is running on any of the HDFS cluster nodes open a file for writing a block, then one replica of that block is allocated on the same machine on which the client is running. The second replica is allocated on a randomly chosen rack that is different from the rack on which the first replica was allocated. The third replica is allocated on a randomly chosen machine on the same remote rack that was chosen in the earlier step. This means that a block is present on two unique racks. One point to note is that there is no relationship between replicas of different blocks of the same file as far as their location is concerned. Each block is allocated independently.
The above algorithm is great for availability and scalability. However, there are scenarios where co-locating many block of the same file on the same set of datanode(s) or rack(s) is beneficial for performance reasons. For example, if many blocks of the same file are present on the same datanode(s), a single mapper instance could process all these blocks using the CombineFileInputFormat. Similarly, if a dataset contains many small files that are co-located on the same datanode(s) or rack(s), one can use CombineFileInputFormat to process all these file together by using fewer mapper instances via CombineFileInputFormat. If an application always uses one dataset with another dataset (think Hive or Pig join), then co-locating these two datasets on the same set of datanodes is beneficial.
Another reason when one might want to allocate replicas using a different policy is to ensure that replicas and their parity blocks truly reside in different failure domains. The erasure code work in HDFS could effectively bring down the physical replication factor of a file to about 1.5 (while keeping the logical replication factor at 3) if it can place replicas of all blocks in a stripe more intelligently.
Yet another reason, however exotic, is to allow HDFS to place replicas based on the HeatMap of your cluster. If one of of the node in the cluster is at a higher temperature than that of another, then it might be better to prefer the cooler node while allocating a new replica. If you want to experiment with HDFS across two data centers, you might want to try out new policies for replica placement.
Well, now you can finally get your hands wet! HDFS-385 is part of the Hadoop trunk and will be part of the next major HDFS 0.21 release. This feature provides a way for the adventurous developer to write Java code that specifies how HDFS should allocate replicas of blocks of a file. The API is experimental in nature, and could change in the near future if we discover any in-efficiencies in it. Please let the Hadoop community know if you need any changes in this API or if you come across novel uses of this API. [フレーム]
Saturday, June 6, 2009
HDFS Scribe Integration
It is finally here: you can configure the open source log-aggregator, scribe, to log data directly into the Hadoop distributed file system.
The challenge was to make HDFS be real-timeish in behaviour. Scribe uses libhdfs which is the C-interface to the HDFs client. There were various bugs in libhdfs that needed to be solved first. Then came the FileSystem API. One of the major issues was that the FileSystem API caches FileSystem handles and always returned the same FileSystem handle when called from multiple threads. There was no reference counting of the handle. This caused problems with scribe, because Scribe is highly multi-threaded. A new API FileSystem.newInstance() was introduced to support Scribe.
ipc.client.idlethreshold 10000 Defines the threshold number of connections after which ipc.client.connection.maxidletime 10000 The maximum time in msec after which a client will bring down the ipc.client.connect.max.retries 2 Indicates the number of retries a client will make to establish ipc.server.listen.queue.size 128 Indicates the length of the listen queue for servers accepting ipc.server.tcpnodelay true Turn on/off Nagle's algorithm for the TCP socket connection on ipc.client.tcpnodelay true Turn on/off Nagle's algorithm for the TCP socket connection on ipc.ping.interval 5000 The Client sends a ping message to server every period. This is helpful ipc.client.connect.maxwaittime 5000 The Client waits for this much time for a socket connect call to be establised dfs.datanode.socket.write.timeout 20000 The dfs Client waits for this much time for a socket write call to the datanode. ipc.client.ping false HADOOP-2757
[フレーム]
Many Web 2.0 companies have to deploy a bunch of costly filers to capture weblogs being generated by their application. Currently, there is no option other than a costly filer because the write-rate for this stream is huge. The Hadoop-Scribe integration allows this write-load to be distributed among a bunch of commodity machines, thus reducing the total cost of this infrastructure.
The challenge was to make HDFS be real-timeish in behaviour. Scribe uses libhdfs which is the C-interface to the HDFs client. There were various bugs in libhdfs that needed to be solved first. Then came the FileSystem API. One of the major issues was that the FileSystem API caches FileSystem handles and always returned the same FileSystem handle when called from multiple threads. There was no reference counting of the handle. This caused problems with scribe, because Scribe is highly multi-threaded. A new API FileSystem.newInstance() was introduced to support Scribe.
Making the HDFS write code path more real-time was painful. There are various timeouts/settings in HDFS that were hardcoded and needed to be changed to allow the application to fail fast. At the bottom of this blog-post, I am attaching the settings that we have currently configured to make the HDFS-write very real-timeish. The last of the JIRAS, HADOOP-2757 is in the pipeline to be committed to Hadoop trunk very soon.
What about Namenode being the single point of failure? This is acceptable in a warehouse type of application but cannot be tolerated by a realtime application. Scribe typically aggregates click-logs from a bunch of webservers, and losing *all* click log data of a website for a 10 minutes or so (minimum time for a namenode restart) cannot be tolerated. The solution is to configure two overlapping clusters on the same hardware. Run two separate namenodes N1 and N2 on two different machines. Run one set of datanode software on all slave machines that report to N1 and the other set of datanode software on the same set of slave machines that report to N2. The two datanode instances on a single slave machine share the same data directories. This configuration allows HDFS to be highly available for writes!
The highly-available-for-writes-HDFS configuration is also required for software upgrades on the cluster. We can shutdown one of the overlapping HDFS clusters, upgrade it to new hadoop software, and then put it back online before starting the same process for the second HDFS cluster.
What are the main changes to scribe that were needed? Scribe already had the feature that it buffers data when it is unable to write to the configured storage. The default scribe behaviour is to replay this buffer back to the storage when the storage is back online. Scribe is configured to support no-buffer-replay when the primary storage is back online. Scribe-hdfs is configured to write data to a cluster N1 and if N1 fails then it writes data to cluster N2. Scribe treats N1 and N2 as two equivalent primary stores. The scribe configuration should have fs_type=hdfs. For scribe compilation, you can use ./configure --enable-hdfs LDFLAGS="-ljvm -lhdfs". A good example for configuring scribe-hdfs is in a file called hdfs_example2.conf in the scribe code base.
Here are the settings for the Hadoop 0.17 configuration that is needed by an application doing writes in realtime:
connections will be inspected for idleness.
connection to the server.
a server connection.
client connections.
the server. Setting to true disables the algorithm and may decrease latency
with a cost of more/smaller packets.
the client. Setting to true disables the algorithm and may decrease latency
with a cost of more/smaller packets.
to detect socket connections that were idle and have been terminated by a failed server.
with the server.
Labels:
hadoop,
hadoop distributed file system,
hdfs,
scribe
Sunday, May 24, 2009
Better Late than Never
For quite a while, I have been thinking on blogging about Hadoop in general and Hadoop distributed file system (HDFS) in particular. Why, you may ask?
Firstly, I have been contacted by students from as far as Bangladesh and Fiji asking me questions about HDFS via email. This made me think that disseminating internal details about HDFS to the whole wide world would really benefit a lot of people. I like to interact with these budding engineers; and their questions, though elementary in nature, sometimes makes me really ruminate on why we adopted a particular design and not another. I will sprinkle a few of these examples next week.
Secondly, I visited a few Universities last month, among them Carnegie Mellon University and my alma-mater Univ of Wisconsin. On my flight, I was getting bored to death, because I really did not like the movie that was playing and I did not carry any material to read. (Usually I like to read and re-read Sherlock Holmes over and over again.) But like they say, " an idle mind is the devil's workhop".... I started to jot down some exotic design ideas about HDFS.... And lo behold, I have a list of ideas that I would like to share! I will post them next week as well. [フレーム]
Firstly, I have been contacted by students from as far as Bangladesh and Fiji asking me questions about HDFS via email. This made me think that disseminating internal details about HDFS to the whole wide world would really benefit a lot of people. I like to interact with these budding engineers; and their questions, though elementary in nature, sometimes makes me really ruminate on why we adopted a particular design and not another. I will sprinkle a few of these examples next week.
Secondly, I visited a few Universities last month, among them Carnegie Mellon University and my alma-mater Univ of Wisconsin. On my flight, I was getting bored to death, because I really did not like the movie that was playing and I did not carry any material to read. (Usually I like to read and re-read Sherlock Holmes over and over again.) But like they say, " an idle mind is the devil's workhop".... I started to jot down some exotic design ideas about HDFS.... And lo behold, I have a list of ideas that I would like to share! I will post them next week as well. [フレーム]
Labels:
hadoop,
hadoop distributed file system,
hdfs
Subscribe to:
Posts (Atom)