Thursday, February 26, 2015

Considerations on a Hadoop ETL

The first step in processing data with Hadoop is loading it into HDFS. HDFS is a robust, fault-tolerant distributed file system. It gives you unified, seamless access to a set of distributed hard disk partitions, plus automatic replication, all running on commodity and even heterogeneous hardware. For all the capabilities it provides, it has its caveats:

- It is written in Java on top of the OS file system, so it demands much more CPU and memory than a traditional file system.
- It needs an orchestrator master process running on one machine, the NameNode. This is both a single point of failure and a bottleneck.

Keep these points in mind when loading data into HDFS.

Don't stream the files directly to HDFS

This is generally a bad idea, whatever system you are loading into. You need some basic ETL proxy application to at least keep track of the data already streamed. The process has to be monitored so it can be resumed, and a fault-tolerant implementation is also desirable.

In addition, with HDFS you have to avoid storing a lot of small files. The NameNode has to keep track of every file in the system, and it is memory-limited: the more files you create in HDFS, the more memory the NameNode uses.

On top of that, the mapper tasks of a Hadoop job are spawned per file split, and every file has at least one split, so at least one mapper task is spawned per file. Processing 10 files of 10 megabytes each takes 10 mapper tasks, instead of 1 for a single 100-megabyte file.
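The arithmetic above can be sketched in a few lines of Python. This is a simplified model, not Hadoop's actual split computation: it assumes one mapper per split, at least one split per file, and a split size equal to a 128 MB block. The function name `estimate_mappers` is made up for the illustration.

```python
import math

MB = 1024 * 1024


def estimate_mappers(file_sizes, split_size=128 * MB):
    """Rough mapper count: one mapper per split, at least one split per file.

    Simplified model for illustration; the real input format applies
    extra rules when it computes splits.
    """
    return sum(max(1, math.ceil(size / split_size)) for size in file_sizes)


small_files = [10 * MB] * 10   # ten files of 10 MB
one_big_file = [100 * MB]      # the same data in a single file

print(estimate_mappers(small_files))   # 10
print(estimate_mappers(one_big_file))  # 1
```

The same volume of data costs ten task start-ups in the first case and one in the second, which is why aggregating small inputs before they reach HDFS pays off.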

The current standard for this proxy application is Apache Flume, a project originally developed by Cloudera.

Use some proxy streaming tool like Apache Flume

Out of the box, Flume seems to cover all our ingestion needs, and specifically two critical points:

- The input files are processed line by line and grouped into new files in HDFS. We can configure the size of these target files to match the default HDFS block size of 128 MB, which will also be the file split size for the mapper tasks.

- We can also configure the tool to persist to disk a buffer of lines read (events, in Flume parlance) that are still to be processed, in case of sudden network slowness or failures, NameNode maintenance downtime, etc.

On top of that, Flume supports some degree of HA and load balancing by allowing the deployment of different "sinks" that write to HDFS on different machines. The data is sent from the source serialized with Avro.

Some Flume pitfalls. Introducing Apache Kafka.

In our experience working with Apache Flume, we found a number of problems.

- The tool seems a bit immature in several aspects:

Duplicated files are not handled very well. Imagine that you have no control over your input source directory, your client leaves there the same file you processed an hour ago, and you are keeping your processed files (auto-renamed with the .PROCESSED suffix). That causes Flume to enter an irrecoverable state, and you literally have to restart the offending agent:
 https://issues.apache.org/jira/browse/FLUME-2119

Zero-byte files are also a problem. When processing a zero-byte file, the agent also goes into an irrecoverable state. To resume, you have to delete the offending file and restart the agent. This seems to be resolved in the latest version:
https://issues.apache.org/jira/browse/FLUME-2525

Spurious "java.io.IOException: Not a data file" exceptions. We did not find the exact cause. It was related to the metadata generated for the Avro serialization; removing that metadata solved the issue.

- Performance was not good, in two aspects in particular:
    Storing to the persistent channel
    File aggregation in HDFS.

So, how do we address these issues? Part of the solution is using a fast, reliable, highly available persistent event store like Apache Kafka as the Flume sink. It is fast, provides robust high availability, and the HDFS ingestion can be parallelized using Camus.

Flume can then be configured as a plain file streamer (no HA, no Avro, no persistent channel) that inserts the file lines into Kafka, and Kafka in turn is polled by the Camus job to retrieve the data.

Not all is covered out of the box, not even common cases


One of our ETL cases was fairly common: a client FTPs log files to a server, and those files are the input for the ETL. As you can imagine, in these cases you have little control over the file format, delivery policy or frequency. At least that was our case, and we found several important issues we had to deal with:

  • Zero-byte files. Since Flume did not like this type of file, we set up a staging folder where the files were FTPed to, and there we periodically cleaned out these files before moving the rest to the Flume input folder.
  • Duplicate files. Again, Flume will choke on these. The same application that removes the zero-byte files checks for and removes duplicates.
  • Compressed files. Flume does not support compressed formats (rar in our case), so we also had to decompress in the staging folder.
  • Determining whether a file transfer is finished. We found no reliable mechanism in Linux to indicate that an FTP transfer for a file was finished. In the end the solution was to check the last time the file was modified in the staging folder, and process only the files not touched for a significant period (for example 30 minutes).
  • File content overlap. In our experience, some log files can contain entries already present in previously processed ones. More on this in the next section.
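As an illustration of the staging-folder checks listed above, here is a minimal Python sketch. It is not the application we ran, and several things in it are assumptions: the function names, detecting duplicates by content hash (SHA-256) rather than by file name, and the 30-minute quiet period as a default. Decompression is left out.

```python
import hashlib
import os
import time


def file_digest(path, chunk_size=1 << 20):
    """SHA-256 of the file content, read in chunks to bound memory use."""
    digest = hashlib.sha256()
    with open(path, "rb") as handle:
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def select_ready_files(staging_dir, seen_digests, quiet_seconds=30 * 60, now=None):
    """Return the staged files that are safe to hand over to the ingestion tool.

    Files modified within the last `quiet_seconds` are skipped, since the
    transfer may still be running. Zero-byte files and files whose content
    was already seen are deleted. `seen_digests` is updated in place.
    """
    now = time.time() if now is None else now
    ready = []
    for name in sorted(os.listdir(staging_dir)):
        path = os.path.join(staging_dir, name)
        if not os.path.isfile(path):
            continue
        if now - os.path.getmtime(path) < quiet_seconds:
            continue  # touched recently: the upload may not be finished
        if os.path.getsize(path) == 0:
            os.remove(path)  # zero-byte file
            continue
        digest = file_digest(path)
        if digest in seen_digests:
            os.remove(path)  # same content as a file already accepted
            continue
        seen_digests.add(digest)
        ready.append(path)
    return ready
```

The quiet-period check runs first on purpose: an empty file that was touched a few seconds ago may simply be a transfer that has just started, so it is left alone until it has been idle long enough. In a real deployment the set of seen digests would have to be persisted between runs.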

Hive can help


One of the points of a Hadoop ETL is parallelization. We have already seen how to leverage Hadoop by using the Camus MapReduce job.

We can also do some parallel filtering using Hadoop. In our case we use it to remove the data redundancy caused by file overlap. We use Hive to do that:

  • Set up two staging directories in HDFS, one with the finished files of the current ingestion batch and the other with those of the previous batch.
  • Create two external tables on these directories using a regex SerDe.
  • Periodically execute a Hive query on these two tables to output the rows present in the current table and not in the previous one. This output is then stored as a columnar RCFile binary file compressed with Snappy. Then we remove the files in the old batch folder and move the files from the current folder there.

We have now completed the HDFS storage process. Our files are compressed by column and have the size of an HDFS block, ready for optimal MapReduce job processing.
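The filtering step is essentially an anti-join between the current batch and the previous one. The following Python sketch shows only that logic on in-memory lists; it is not the Hive query itself, and the function name and the sample log lines are invented for the example.

```python
def rows_only_in_current(current_batch, previous_batch):
    """Keep the rows of the current batch that do not appear in the previous one."""
    previous = set(previous_batch)
    return [row for row in current_batch if row not in previous]


previous_batch = [
    "2015-02-25 23:58:01 GET /index.html",
    "2015-02-25 23:59:40 GET /about.html",
]
current_batch = [
    "2015-02-25 23:59:40 GET /about.html",  # overlaps with the previous file
    "2015-02-26 00:00:12 GET /index.html",
]

print(rows_only_in_current(current_batch, previous_batch))
# ['2015-02-26 00:00:12 GET /index.html']
```

Note that this only removes overlap between two consecutive batches. Rows repeated inside the current batch are kept as they are.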

A word on monitoring


It's essential to monitor your ETL process. This works at two levels:

  • Monitor the processes involved (Flume, Kafka, custom processes) with a proper tool, and also implement an alert system. For example, Ganglia + Nagios.
  • Monitor the data itself to check whether there are problems in the ingestion. The difficulty here is setting thresholds for the alerts. You can base them on previously gathered statistics, like the average data ingested per day, or, if your data is a time series like a periodic log, on detecting time gaps.
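Both data-level checks can be sketched in Python. This is only an illustration under assumed names and thresholds: `volume_alert` flags a day whose ingested volume deviates from the historical average by more than a chosen fraction, and `find_time_gaps` reports consecutive timestamps that are further apart than expected. The 50% tolerance and the 5-minute gap are arbitrary example values.

```python
from datetime import datetime, timedelta
from statistics import mean


def volume_alert(history, today, tolerance=0.5):
    """True if today's volume deviates from the historical mean by more than `tolerance`."""
    baseline = mean(history)
    return abs(today - baseline) > tolerance * baseline


def find_time_gaps(timestamps, max_gap):
    """Return (before, after) pairs of consecutive timestamps further apart than `max_gap`."""
    ordered = sorted(timestamps)
    return [(a, b) for a, b in zip(ordered, ordered[1:]) if b - a > max_gap]


daily_rows = [100, 110, 90]          # rows ingested on previous days
print(volume_alert(daily_rows, 40))  # True: far below the average of 100
print(volume_alert(daily_rows, 95))  # False

start = datetime(2015, 2, 26, 0, 0)
log_times = [start + timedelta(minutes=m) for m in (0, 1, 10, 11)]
for before, after in find_time_gaps(log_times, timedelta(minutes=5)):
    print("gap between", before, "and", after)
```

A fixed tolerance around the mean is the simplest possible baseline. Data with strong weekly or seasonal patterns would need a comparison against the matching period instead.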

Conclusions


  • Know your weaknesses to overcome them: HDFS is not good at dealing with a lot of small files.
  • Use your strong points: Parallelize everything you can.
  • Watch the performance and use an HA persistent buffered proxy.
  • Don't neglect monitoring.
