Near Real Time (NRT) Indexing Tweets Using Flume
The following section describes how to use Flume to index tweets. Before beginning, complete the process of Preparing to Index Sample Tweets with Cloudera Search.
Install Flume
If you have not already done so, install Flume. For Cloudera Manager installations, Flume is included in CDH, and no additional action is required for installation. Add the Flume service to the cluster following the instructions in Adding a Service.
For instructions on installing Flume in an unmanaged environment, see Flume Installation.
Copy Configuration Template Files
Copy the configuration files as follows:
- Parcel-based Installation: For Cloudera Manager environments, the Flume agent is configured in a later section.
- Package-based Installation:
$ sudo cp -r $HOME/twitter_config /etc/flume-ng/conf/tweets
$ sudo cp /usr/share/doc/search*/examples/solr-nrt/twitter-flume.conf \
    /etc/flume-ng/conf/flume.conf
$ sudo cp /usr/share/doc/search*/examples/solr-nrt/test-morphlines/tutorialReadAvroContainer.conf \
    /etc/flume-ng/conf/morphline.conf
Configuring the Flume Solr Sink

- For parcel-based installations, use Cloudera Manager to edit the configuration files, following a process similar to that described in Configuring the Flume Agents.
- For package-based installations, use command-line tools (such as vi) to edit files.
- Modify the Flume configuration for a single agent to specify the Flume source details and configure the flow. You must set the relative or absolute path to the morphline configuration file.
- Parcel-based Installation: In the Cloudera Manager Admin Console, set Agent Name to twitter-stream and modify Configuration File as follows:
twitter-stream.sources = twitterSrc
twitter-stream.channels = memoryChannel
twitter-stream.sinks = solrSink
twitter-stream.sources.twitterSrc.type = org.apache.flume.source.twitter.TwitterSource
twitter-stream.sources.twitterSrc.consumerKey = YOUR_TWITTER_CONSUMER_KEY
twitter-stream.sources.twitterSrc.consumerSecret = YOUR_TWITTER_CONSUMER_SECRET
twitter-stream.sources.twitterSrc.accessToken = YOUR_TWITTER_ACCESS_TOKEN
twitter-stream.sources.twitterSrc.accessTokenSecret = YOUR_TWITTER_ACCESS_TOKEN_SECRET
twitter-stream.sources.twitterSrc.maxBatchDurationMillis = 200
twitter-stream.sources.twitterSrc.channels = memoryChannel
twitter-stream.channels.memoryChannel.type = memory
twitter-stream.channels.memoryChannel.capacity = 10000
twitter-stream.channels.memoryChannel.transactionCapacity = 1000
twitter-stream.sinks.solrSink.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
twitter-stream.sinks.solrSink.channel = memoryChannel
twitter-stream.sinks.solrSink.morphlineFile = morphlines.conf
- Package-based Installation: If you copied the configuration templates as described in Copy Configuration Template Files, no further action is required in this step.
- Edit the Morphline configuration to specify Solr environment details.
- Parcel-based Installation: In the Cloudera Manager Admin Console, edit the Morphlines File as follows. Make sure you are editing the same Flume agent as step 1. Edit the SOLR_LOCATOR entry only. Leave the rest of the configuration unedited.
SOLR_LOCATOR : {
  # Name of solr collection
  collection : tweets
  # ZooKeeper ensemble
  zkHost : "zk01.example.com:2181,zk02.example.com:2181,zk03.example.com:2181/solr"
}
Replace the example ZooKeeper hostnames with the hostnames of your ZooKeeper servers.
- Package-based Installation: Edit the SOLR_LOCATOR section in /etc/flume-ng/conf/morphline.conf as follows:
SOLR_LOCATOR : {
  # Name of solr collection
  collection : tweets
  # ZooKeeper ensemble
  zkHost : "zk01.example.com:2181,zk02.example.com:2181,zk03.example.com:2181/solr"
}
Replace the example ZooKeeper hostnames with the hostnames of your ZooKeeper servers.
- (Unmanaged environments only) Copy flume-env.sh.template to flume-env.sh:
$ sudo cp /etc/flume-ng/conf/flume-env.sh.template \
    /etc/flume-ng/conf/flume-env.sh
- Update the Java heap size.
- Parcel-based Installation: In the Cloudera Manager Admin Console, set Java Heap Size of Agent in Bytes to 500 and choose MiB units, then click Save Changes. Make sure you are editing the same Flume agent as step 1.
- Package-based Installation: Edit /etc/flume-ng/conf/flume-env.sh or /opt/cloudera/parcels/CDH/etc/flume-ng/conf/flume-env.sh, inserting or replacing JAVA_OPTS as follows:
JAVA_OPTS="-Xmx500m"
- (Optional) Modify Flume logging settings to facilitate monitoring and debugging:
- Parcel-based Installation: In the Cloudera Manager Admin Console, modify the Agent Logging Advanced Configuration Snippet (Safety Valve) to include:
log4j.logger.org.apache.flume.sink.solr=DEBUG
log4j.logger.org.kitesdk.morphline=TRACE
- Package-based Installation: Run the following commands:
$ sudo bash -c 'echo "log4j.logger.org.apache.flume.sink.solr=DEBUG" >> \
    /etc/flume-ng/conf/log4j.properties'
$ sudo bash -c 'echo "log4j.logger.org.kitesdk.morphline=TRACE" >> \
    /etc/flume-ng/conf/log4j.properties'
- (Optional) In an unmanaged environment, you can use SEARCH_HOME to configure where Flume finds Cloudera Search dependencies for the Flume Solr Sink. For example, if you installed Flume from a tarball package, you can configure it to find the required files by setting SEARCH_HOME with a command similar to the following:
$ export SEARCH_HOME=/usr/lib/search
Alternatively, you can add the same setting to flume-env.sh.
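As a sketch, the same export can be appended to flume-env.sh from the shell. The snippet below writes to a temporary file so it is safe to try; point FLUME_ENV at the real file (for example, the package-based default /etc/flume-ng/conf/flume-env.sh) to apply the change.

```shell
# Sketch: append SEARCH_HOME to flume-env.sh. A temporary file stands in
# for the real flume-env.sh here so the sketch is safe to run.
FLUME_ENV=$(mktemp)
echo 'export SEARCH_HOME=/usr/lib/search' >> "$FLUME_ENV"
grep 'SEARCH_HOME' "$FLUME_ENV"
```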
In a Cloudera Manager managed environment, Cloudera Manager automatically updates the SOLR_HOME location with any additional required dependencies.
Configuring Flume Solr Sink to Access the Twitter Public Stream
Use the Twitter developer site to generate credentials to access the Twitter public stream:
- Sign in to https://apps.twitter.com with a Twitter account.
- On the Application Management page, click Create New App.
- Fill in the form to represent the Search installation. This can represent multiple clusters, and does not require the callback URL. Because this is not a publicly distributed application, the values you enter for the required name, description, and website fields are not important.
- Read and accept the Developer Agreement, then click Create your Twitter application at the bottom of the page.
- Click the Keys and Access Tokens tab, then click the Create my access token button at the bottom.
The Flume TwitterSource uses the Twitter 1.1 API, which requires authentication of both the consumer (application) and the user (you). Consider this information confidential, just like your regular Twitter credentials. Edit the Flume configuration and replace the following properties with the credentials you generated:
agent.sources.twitterSrc.consumerKey = YOUR_TWITTER_CONSUMER_KEY
agent.sources.twitterSrc.consumerSecret = YOUR_TWITTER_CONSUMER_SECRET
agent.sources.twitterSrc.accessToken = YOUR_TWITTER_ACCESS_TOKEN
agent.sources.twitterSrc.accessTokenSecret = YOUR_TWITTER_ACCESS_TOKEN_SECRET
To edit the Flume configuration:
- Parcel-based Installation: In the Cloudera Manager Admin Console, modify the Configuration File parameter. Make sure you are editing the correct Flume agent.
- Package-based Installation: Edit /etc/flume-ng/conf/flume.conf.
For authentication to succeed, you must make sure the system clock is set correctly on the Flume agent host that connects to Twitter. You can install NTP and keep the host synchronized by running the ntpd service, or manually synchronize by using the command sudo ntpdate pool.ntp.org. To confirm that the time is set correctly, make sure that the output of the command date --utc matches the time shown at http://www.timeanddate.com/worldclock/timezone/utc. You can also set the time manually using the date command.
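As a quick sanity check of the clock, the commands above can be combined as follows (the manual sync line assumes ntpdate is installed):

```shell
# Print the current UTC time; compare it with a reference clock such as
# http://www.timeanddate.com/worldclock/timezone/utc.
date --utc
# One-time manual sync if the clock has drifted (requires ntpdate):
# sudo ntpdate pool.ntp.org
```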
Starting the Flume Agent
- Delete all existing documents in Solr:
$ solrctl collection --deletedocs tweets
- Start or restart the Flume agent configured in Configuring the Flume Solr Sink:
- Parcel-based Installation: Use Cloudera Manager to restart the Flume service.
- Package-based Installation:
$ sudo /etc/init.d/flume-ng-agent restart
- Monitor progress in the Flume log file and watch for errors:
$ tail -f /var/log/flume-ng/flume*.log
After restarting the Flume agent, use the Cloudera Search GUI. For example, if you have a Solr server running on search01.example.com, go to http://search01.example.com:8983/solr/tweets/select?q=*%3A*&sort=created_at+desc&wt=json&indent=true in a browser to verify that new tweets have been ingested into Solr. The query sorts the result set such that the most recently ingested tweets are at the top, based on the created_at timestamp. If you rerun the query, new tweets show up at the top of the result set.
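The verification query can also be issued from the command line instead of a browser. The sketch below assembles the same URL; the hostname and collection name are the examples used in this guide, so substitute your own values.

```shell
# Build the Solr query URL used above (hostname and collection are the
# examples from this guide; substitute your own values).
SOLR_HOST="search01.example.com"
COLLECTION="tweets"
PARAMS='q=*%3A*&sort=created_at+desc&wt=json&indent=true'
URL="http://${SOLR_HOST}:8983/solr/${COLLECTION}/select?${PARAMS}"
echo "$URL"
# To run the query from the shell once Solr is reachable:
# curl -s "$URL"
```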
To print diagnostic information, such as the content of records as they pass through the morphline commands, enable TRACE log level diagnostics by adding the following to your log4j.properties file:
log4j.logger.org.kitesdk.morphline=TRACE
In Cloudera Manager, you can use the safety valve to enable the TRACE log level. After setting this value, restart the service.
Indexing a File Containing Tweets with Flume HTTPSource
HTTPSource lets you ingest data into Solr by POSTing a file over HTTP. HTTPSource sends data using a channel to a sink, in this case a SolrSink. For more information, see Flume Solr BlobHandler Configuration Options.
- Delete all existing documents in Solr:
$ sudo /etc/init.d/flume-ng-agent stop
$ solrctl collection --deletedocs tweets
- Comment out TwitterSource in /etc/flume-ng/conf/flume.conf and uncomment HTTPSource:
# Comment out "agent.sources = twitterSrc"
# Uncomment "agent.sources = httpSrc"
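Under the assumption that the two lines appear in flume.conf exactly as shown, the toggle can also be scripted with sed. This sketch edits a temporary stand-in file so it is safe to run; point CONF at /etc/flume-ng/conf/flume.conf to edit the real file.

```shell
# Sketch: toggle the active Flume source with sed. A temporary copy of the
# two relevant lines stands in for flume.conf so the sketch is safe to run.
CONF=$(mktemp)
printf 'agent.sources = twitterSrc\n# agent.sources = httpSrc\n' > "$CONF"
sed -i \
  -e 's/^agent\.sources = twitterSrc/# agent.sources = twitterSrc/' \
  -e 's/^# *agent\.sources = httpSrc/agent.sources = httpSrc/' \
  "$CONF"
# twitterSrc is now commented out and httpSrc is active.
cat "$CONF"
```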
- Restart the Flume Agent:
$ sudo /etc/init.d/flume-ng-agent restart
- Send a file containing tweets to the HTTPSource:
- Parcel-based Installation:
$ curl --data-binary \
    @/opt/cloudera/parcels/CDH/share/doc/search-*/examples/test-documents/sample-statuses-20120906-141433-medium.avro \
    'http://127.0.0.1:5140?resourceName=sample-statuses-20120906-141433-medium.avro' \
    --header 'Content-Type:application/octet-stream' --verbose
- Package-based Installation:
$ curl --data-binary \
    @/usr/share/doc/search-*/examples/test-documents/sample-statuses-20120906-141433-medium.avro \
    'http://127.0.0.1:5140?resourceName=sample-statuses-20120906-141433-medium.avro' \
    --header 'Content-Type:application/octet-stream' --verbose
- Check the log for status or errors:
$ cat /var/log/flume-ng/flume.log
Use the Cloudera Search GUI at http://search01.example.com:8983/solr/tweets/select?q=*%3A*&wt=json&indent=true to verify that new tweets have been ingested into Solr as expected.
Indexing a File Containing Tweets with Flume SpoolDirectorySource
SpoolDirectorySource specifies a directory on a local disk that Flume monitors. Flume automatically transfers data from files in this directory to Solr. SpoolDirectorySource sends data over a channel to a sink, in this case a SolrSink.
- Delete all existing documents in Solr:
$ sudo /etc/init.d/flume-ng-agent stop
$ solrctl collection --deletedocs tweets
- Comment out TwitterSource and HTTPSource in /etc/flume-ng/conf/flume.conf and uncomment SpoolDirectorySource:
# Comment out "agent.sources = twitterSrc"
# Comment out "agent.sources = httpSrc"
# Uncomment "agent.sources = spoolSrc"
- Delete any old spool directory and create a new spool directory:
$ rm -fr /tmp/myspooldir
$ sudo -u flume mkdir /tmp/myspooldir
- Restart the Flume Agent:
$ sudo /etc/init.d/flume-ng-agent restart
- Send a file containing tweets to the SpoolDirectorySource. To ensure no partial files are ingested, copy and then atomically move files:
- Parcel-based Installation:
$ sudo -u flume cp \
    /opt/cloudera/parcels/CDH/share/doc/search*/examples/test-documents/sample-statuses-20120906-141433-medium.avro \
    /tmp/myspooldir/.sample-statuses-20120906-141433-medium.avro
$ sudo -u flume mv /tmp/myspooldir/.sample-statuses-20120906-141433-medium.avro \
    /tmp/myspooldir/sample-statuses-20120906-141433-medium.avro
- Package-based Installation:
$ sudo -u flume cp \
    /usr/share/doc/search*/examples/test-documents/sample-statuses-20120906-141433-medium.avro \
    /tmp/myspooldir/.sample-statuses-20120906-141433-medium.avro
$ sudo -u flume mv /tmp/myspooldir/.sample-statuses-20120906-141433-medium.avro \
    /tmp/myspooldir/sample-statuses-20120906-141433-medium.avro
- Check the log for status or errors.
$ cat /var/log/flume-ng/flume.log
- Check the completion status.
$ find /tmp/myspooldir
Use the Cloudera Search GUI to verify that the new tweets have been ingested into Solr. For example, access the following URL in a browser:
http://search01.example.com:8983/solr/tweets/select?q=*%3A*&wt=json&indent=true
Replace search01.example.com with a hostname running Solr server.
©2016 Cloudera, Inc. All rights reserved.