We will discuss collecting data into HBase directly through a Flume agent. In our previous posts under the Flume category, we covered the setup of Flume agents for the file roll, logger, and HDFS sink types. In this post, we will explore the details of the HBase sink and its setup with a live example.
As we have already covered the File, Memory, and JDBC channels, we will use the Spillable Memory Channel in this agent setup so that all Flume-supported channel types are covered. This does not mean that the HBase sink requires a Spillable Memory Channel; it works equally well with other channel types.
Among source types, we have already covered the Netcat, Exec, Avro, and Sequence Generator sources, so we will explain one more source (the Spooling Directory Source) in this agent setup.
Now let's create our agent Agent5 in the flume.conf properties file under the <FLUME_HOME>/conf directory.
Flume data collection into Hbase – Spooling Directory Source, HBase Sink and Spillable Memory channel:
flume.conf file creation:
Add the below configuration properties to the flume.conf file to create Agent5 with a Spooling Directory source, Spillable Memory channel, and HBase sink.
### Agent5-Spooling Directory Source, Spillable Memory Channel and HBase Sink ###
Agent5.sources = spooldir-source
Agent5.channels = spillmem-channel
Agent5.sinks = hbase-sink
Agent5.sources.spooldir-source.type = spooldir
Agent5.sources.spooldir-source.spoolDir = /usr/lib/flume/spooldir
Agent5.sources.spooldir-source.fileHeader = false
Agent5.sinks.hbase-sink.type = hbase
Agent5.sinks.hbase-sink.table = test_table
Agent5.sinks.hbase-sink.columnFamily = test_cf
# Use a channel that buffers events in memory and spills to disk on overflow
Agent5.channels.spillmem-channel.type = SPILLABLEMEMORY
Agent5.channels.spillmem-channel.memoryCapacity = 10000
Agent5.channels.spillmem-channel.overflowCapacity = 1000000
Agent5.channels.spillmem-channel.byteCapacity = 80000
Agent5.channels.spillmem-channel.checkpointDir = /var/log/flume/checkpoint/
# Bind the source and sink to the channel
Agent5.sources.spooldir-source.channels = spillmem-channel
Agent5.sinks.hbase-sink.channel = spillmem-channel
Configuration Before Agent Startup:
Before starting this agent, we need to make sure the below things are ready.
- Start the Hadoop and YARN daemons, and also the HBase daemons. Make sure all the daemons have started and are running properly; otherwise we will run into a lot of issues. For help with Hadoop installation and running its daemons, refer to our previous posts under the Hadoop category, and for the same on HBase, refer to the HBase installation post. The below command is helpful for verifying these daemons.
$ jps -lm
7726 org.apache.hadoop.hbase.master.HMaster start
27400 sun.tools.jps.Jps -lm
- In HBase, create the table with the column family specified in the flume.conf file.
$ hbase shell
> create 'test_table', 'test_cf'
Below is a screenshot of the terminal for creating the HBase table through the hbase shell after starting all daemons. In our agent, test_table and test_cf are the table and column family, respectively.
- Create the folder specified as the spooling directory path, and make sure that the flume user has read, write, and execute access to it. In our agent, it is the /usr/lib/flume/spooldir directory.
$ sudo mkdir -p /usr/lib/flume/spooldir
$ sudo chmod -R 777 /usr/lib/flume/spooldir/
- We will copy our input files into the spool directory, from which Flume will write each line as a new row into the HBase table. First we copy the input file wordcount.hql into the spooling directory; below are its contents.
CREATE TABLE docs (line STRING);
LOAD DATA INPATH '/input/word_in.txt' OVERWRITE INTO TABLE docs;
CREATE TABLE word_counts AS
SELECT word, count(1) AS count FROM
(SELECT explode(split(line, '\\s')) AS word FROM docs) w
GROUP BY word
ORDER BY word;
We will also copy one more input file, id.pig, into the spooling directory; below are its contents.
A = load '/passwd' using PigStorage(':');
B = foreach A generate $0 as id;
store B into '/id.out';
So there is a total of 10 (7 + 3) lines of input from the two files, and each line will be treated as one event in Flume.
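As a quick sanity check (a minimal sketch; the two files are recreated locally here rather than read from the spool directory), we can confirm the line counts that Flume will turn into events:

```shell
# Recreate the two input files and count their lines: 7 + 3 = 10,
# matching the 10 events (one per line) we expect to land in HBase.
cat > wordcount.hql <<'EOF'
CREATE TABLE docs (line STRING);
LOAD DATA INPATH '/input/word_in.txt' OVERWRITE INTO TABLE docs;
CREATE TABLE word_counts AS
SELECT word, count(1) AS count FROM
(SELECT explode(split(line, '\\s')) AS word FROM docs) w
GROUP BY word
ORDER BY word;
EOF
cat > id.pig <<'EOF'
A = load '/passwd' using PigStorage(':');
B = foreach A generate $0 as id;
store B into '/id.out';
EOF
cat wordcount.hql id.pig | wc -l   # total lines across both files: 10
```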
Below is the snapshot of spool directory before starting the agent:
- Make sure that the flume user has full access to the directories specified in the checkpointDir and dataDirs properties of the Spillable Memory Channel configuration.
Start Agent:
After confirming that all the above configuration is in place and there are no issues, we are ready to start the agent; otherwise we will end up with undesired error messages or Java exceptions.
Start the agent with below command.
$ flume-ng agent --conf $FLUME_CONF_DIR --conf-file $FLUME_CONF_DIR/flume.conf --name Agent5
After a few seconds (as there are only two small input files, a few seconds are sufficient to copy them into HBase), stop the agent by pressing Ctrl+C.
Verify the Output:
We can verify this agent's processing at both the source level and the sink level.
Verification at source:
If we check the spool directory after the agent has started and stopped, the files placed there as input will have been renamed with a .COMPLETED suffix, once all the events in each file were successfully copied into the channel. Below is a snapshot of the spool directory after the agent is stopped.
In the above screen we can see the id.pig.COMPLETED and wordcount.hql.COMPLETED files created in place of the original input files.
Verification at sink:
We can verify the events at sink level by connecting to hbase shell and scanning test_table contents.
$ hbase shell
hbase(main):001:0> scan 'test_table'
In the below screenshot we can see all 10 lines of input inserted into the pCol column of the test_cf column family in test_table.
As all 10 input lines were inserted as 10 rows into our destination table in HBase, we have successfully configured the Flume agent to capture data into an HBase sink.
In below sections we will go into detailed descriptions of each component used to configure our above flume agent.
Spooling Directory Source:
With the Spooling Directory Source, a spool directory is configured in which users/applications are allowed to place files that need to be processed by the Flume agent. This source watches the specified directory for new files and parses events out of new files as they appear. Once a given file is successfully read into the channel, it is either renamed with a .COMPLETED suffix or deleted.
Only uniquely-named files must be dropped into the spooling directory. This source will report problems and stop processing in the below scenarios:
- If a file is written to after it is placed into the spooling directory.
- If a file name is reused at a later time.
To avoid these issues, it is useful to add a unique identifier, such as a timestamp, to log file names when they are moved into the spooling directory.
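For example, a finished log file can be renamed with an epoch-timestamp suffix as it is moved in (a minimal sketch; the temporary directory stands in for /usr/lib/flume/spooldir, and app.log is a hypothetical file name):

```shell
# Move a finished log into the spool directory under a unique name,
# so the same base name can be ingested again later without a clash.
spooldir=$(mktemp -d)                       # stand-in for the real spool dir
echo "sample event line" > app.log
mv app.log "$spooldir/app.log.$(date +%s)"  # e.g. app.log.1700000000
ls "$spooldir"
```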
Below is the property table for spooling directory source and required properties are in bold.
| Property Name | Default | Description |
| --- | --- | --- |
| **type** | – | The component type name; needs to be spooldir. |
| **spoolDir** | – | The directory from which to read files. |
| fileSuffix | .COMPLETED | Suffix to append to completely ingested files. |
| deletePolicy | never | When to delete completed files: never or immediate. |
| fileHeader | false | Whether to add a header storing the absolute path filename. |
| fileHeaderKey | file | Header key to use when appending the absolute path filename to the event header. |
| basenameHeader | false | Whether to add a header storing the basename of the file. |
| basenameHeaderKey | basename | Header key to use when appending the basename of the file to the event header. |
| ignorePattern | ^$ | Regular expression specifying which files to ignore (skip). |
| trackerDir | .flumespool | Directory to store metadata related to processing of files. If this path is not absolute, it is interpreted as relative to spoolDir. |
| consumeOrder | oldest | The order in which files in the spooling directory will be consumed: oldest, youngest, or random. For oldest and youngest, the last modified time of the files is used to compare them; in case of a tie, the file with the smallest lexicographical order is consumed first. For random, any file is picked randomly. |
| maxBackoff | 4000 | The maximum time (in millis) to wait between consecutive attempts to write to the channel(s) if the channel is full. |
| batchSize | 100 | Granularity at which to batch transfer to the channel. |
| inputCharset | UTF-8 | Character set used by deserializers that treat the input file as text. |
| decodeErrorPolicy | FAIL | What to do when a non-decodable character is seen in the input file. FAIL: throw an exception and fail to parse the file. REPLACE: replace the unparseable character with the "replacement character", typically Unicode U+FFFD. IGNORE: drop the unparseable character sequence. |
| deserializer | LINE | The deserializer used to parse the file into events. Defaults to parsing each line as an event. The class specified must implement EventDeserializer.Builder. |
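For instance, a few of these defaults could be overridden on our Agent5 source like so (the values below are illustrative, not part of the agent we built above):

```properties
Agent5.sources.spooldir-source.fileSuffix = .DONE
Agent5.sources.spooldir-source.ignorePattern = ^.*\.tmp$
Agent5.sources.spooldir-source.consumeOrder = youngest
Agent5.sources.spooldir-source.deletePolicy = never
```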
Spillable Memory Channel:
The Spillable Memory Channel can be treated as a combination of the memory channel and the file channel. It was introduced to overcome the memory channel's limitation of losing events when its memory queue is full. It uses the below two storage mechanisms:
- An in-memory queue
- An overflow disk (backed by a file channel)
It stores events primarily in the in-memory queue, and once the queue is full, additional incoming events are stored on disk backed by the file channel.
This channel is ideal for flows that need the high throughput of a memory channel during normal operation, but at the same time need the larger capacity of the file channel for better tolerance of intermittent sink-side outages.
Below is the property table for spillable memory channel and required properties are in bold.
| Property Name | Default | Description |
| --- | --- | --- |
| **type** | – | The component type name; needs to be SPILLABLEMEMORY. |
| memoryCapacity | 10000 | Maximum number of events stored in the memory queue. To disable use of the in-memory queue, set this to zero. |
| overflowCapacity | 100000000 | Maximum number of events stored on the overflow disk (i.e. the file channel). To disable use of overflow, set this to zero. |
| overflowTimeout | 3 | The number of seconds to wait before enabling disk overflow when memory fills up. |
| byteCapacityBufferPercentage | 20 | Defines the percent of buffer between byteCapacity and the estimated total size of all events in the channel, to account for data in headers. |
| byteCapacity | see description | Maximum bytes of memory allowed as a sum of all events in the memory queue. |
| avgEventSize | 500 | Estimated average size of events going into the channel, in bytes. |
| \<file channel properties\> | see file channel | File channel properties can be supplied here; use overflowCapacity to set the file channel's capacity. |
- To disable the use of the in-memory queue and make the channel function like a file channel, set memoryCapacity = 0 and provide the overflowCapacity, checkpointDir, and dataDirs properties of the channel.
- To disable the use of the overflow disk and make the channel function as an in-memory channel, set overflowCapacity = 0; the checkpointDir and dataDirs properties can then be omitted, but memoryCapacity must be set to a non-zero value.
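As a sketch, a Spillable Memory Channel configured to behave like a pure file channel might look like this (the dataDirs path is illustrative; the other names follow our Agent5 setup):

```properties
Agent5.channels.spillmem-channel.type = SPILLABLEMEMORY
Agent5.channels.spillmem-channel.memoryCapacity = 0
Agent5.channels.spillmem-channel.overflowCapacity = 1000000
Agent5.channels.spillmem-channel.checkpointDir = /var/log/flume/checkpoint/
Agent5.channels.spillmem-channel.dataDirs = /var/log/flume/data/
```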
HBase Sink:
This sink reads events from a channel and writes them to HBase. The HBase configuration is picked up from the first hbase-site.xml encountered in the classpath. A class implementing HbaseEventSerializer, specified by the configuration, is used to convert the events into HBase puts and/or increments, which are then written to HBase.
This sink supports batch reading of events from the channel, to minimize the number of flushes on the HBase tables. To use this sink, it has to be configured with certain mandatory parameters:
- table: the name of the table in HBase to write to.
- columnFamily: the column family in HBase to write to.
This sink will commit each transaction if the table’s write buffer size is reached or if the number of events in the current transaction reaches the batch size, whichever comes first.
Flume provides two serializers for the HBase sink. The SimpleHbaseEventSerializer (org.apache.flume.sink.hbase.SimpleHbaseEventSerializer) writes the event body as-is to HBase and optionally increments a column in HBase; it is primarily an example implementation. The RegexHbaseEventSerializer (org.apache.flume.sink.hbase.RegexHbaseEventSerializer) breaks the event body up based on the given regex and writes each part into a different column.
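For example, if each input line were of the form key:value, the RegexHbaseEventSerializer could split it into two columns (the regex and column names below are illustrative, not part of our Agent5 setup):

```properties
Agent5.sinks.hbase-sink.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
Agent5.sinks.hbase-sink.serializer.regex = ^([^:]+):(.*)$
Agent5.sinks.hbase-sink.serializer.colNames = key,value
```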
Below is the property table for HBase sink and required properties are in bold.
| Property Name | Default | Description |
| --- | --- | --- |
| **type** | – | The component type name; needs to be hbase. |
| **table** | – | The name of the table in HBase to write to. |
| **columnFamily** | – | The column family in HBase to write to. |
| zookeeperQuorum | – | The quorum spec. This is the value of the property hbase.zookeeper.quorum in hbase-site.xml. |
| znodeParent | /hbase | The base path for the znode for the -ROOT- region. Value of zookeeper.znode.parent in hbase-site.xml. |
| batchSize | 100 | Number of events to be written per transaction. |
| coalesceIncrements | false | Whether the sink should coalesce multiple increments to a cell per batch. |
| serializer | org.apache.flume.sink.hbase.SimpleHbaseEventSerializer | Default increment column = "iCol", payload column = "pCol". |