
Near Real Time (NRT) Indexing Tweets Using Flume

The following section describes how to use Flume to index tweets. Before beginning, complete the process of Preparing to Index Sample Tweets with Cloudera Search.

Install Flume

If you have not already done so, install Flume. For Cloudera Manager installations, Flume is included in CDH, and no additional action is required for installation. Add the Flume service to the cluster following the instructions in Adding a Service.

For instructions on installing Flume in an unmanaged environment, see Flume Installation.

Copy Configuration Template Files

Copy the configuration files as follows:

  • Parcel-based Installation: For Cloudera Manager environments, the Flume agent is configured in a later section.
  • Package-based Installation:
    $ sudo cp -r $HOME/twitter_config /etc/flume-ng/conf/tweets
    $ sudo cp /usr/share/doc/search*/examples/solr-nrt/twitter-flume.conf \
    /etc/flume-ng/conf/flume.conf
    $ sudo cp /usr/share/doc/search*/examples/solr-nrt/test-morphlines/tutorialReadAvroContainer.conf \
    /etc/flume-ng/conf/morphline.conf
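
To confirm that the templates are in place after a package-based copy, you can list the target paths; this quick check assumes the default locations used above:

$ ls /etc/flume-ng/conf/tweets /etc/flume-ng/conf/flume.conf /etc/flume-ng/conf/morphline.conf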

Configuring the Flume Solr Sink

  Warning: Using more than one Flume agent for this tutorial can result in blocked access to the Twitter public streams. When you configure the Flume agent as described in this section, make sure that you are configuring a single agent on a single host, and not the entire Flume service.
This topic describes how to configure the Flume Solr Sink for both parcel-based and package-based installations:
  • For parcel-based installations, use Cloudera Manager to edit the configuration files, following a process similar to the one described in Configuring the Flume Agents.
  • For package-based installations, use command-line tools (such as vi) to edit files.
  1. Modify the Flume configuration for a single agent to specify the Flume source details and configure the flow. You must set the relative or absolute path to the morphline configuration file.
    • Parcel-based Installation: In the Cloudera Manager Admin Console, go to Flume service > Instances > Agent (select one) > Configuration. Set Agent Name to twitter-stream and modify Configuration File as follows:
      twitter-stream.sources = twitterSrc
      twitter-stream.channels = memoryChannel
      twitter-stream.sinks = solrSink
      
      twitter-stream.sources.twitterSrc.type = org.apache.flume.source.twitter.TwitterSource
      twitter-stream.sources.twitterSrc.consumerKey = YOUR_TWITTER_CONSUMER_KEY
      twitter-stream.sources.twitterSrc.consumerSecret = YOUR_TWITTER_CONSUMER_SECRET
      twitter-stream.sources.twitterSrc.accessToken = YOUR_TWITTER_ACCESS_TOKEN
      twitter-stream.sources.twitterSrc.accessTokenSecret = YOUR_TWITTER_ACCESS_TOKEN_SECRET
      twitter-stream.sources.twitterSrc.maxBatchDurationMillis = 200
      twitter-stream.sources.twitterSrc.channels = memoryChannel
      
      twitter-stream.channels.memoryChannel.type = memory
      twitter-stream.channels.memoryChannel.capacity = 10000
      twitter-stream.channels.memoryChannel.transactionCapacity = 1000
      
      twitter-stream.sinks.solrSink.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
      twitter-stream.sinks.solrSink.channel = memoryChannel
      twitter-stream.sinks.solrSink.morphlineFile = morphlines.conf
    • Package-based Installation: If you copied the configuration templates as described in Copy Configuration Template Files, no further action is required in this step.
  2. Edit the Morphline configuration to specify Solr environment details.
    • Parcel-based Installation: In the Cloudera Manager Admin Console, go to Flume service > Instances > Agent (select one) > Configuration and edit the SOLR_LOCATOR directive in the Morphlines File as follows. Make sure you are editing the same Flume agent as in step 1, and edit only the SOLR_LOCATOR entry; leave the rest of the configuration unchanged.
      SOLR_LOCATOR : {
        # Name of solr collection
        collection : tweets
      
        # ZooKeeper ensemble
        zkHost : "zk01.example.com:2181,zk02.example.com:2181,zk03.example.com:2181/solr"
      }

      Replace the example ZooKeeper hostnames with the hostnames of your ZooKeeper servers.

    • Package-based Installation: Edit the SOLR_LOCATOR section in /etc/flume-ng/conf/morphline.conf as follows:
      SOLR_LOCATOR : {
        # Name of solr collection
        collection : tweets
      
        # ZooKeeper ensemble
        zkHost : "zk01.example.com:2181,zk02.example.com:2181,zk03.example.com:2181/solr"
      }

      Replace the example ZooKeeper hostnames with the hostnames of your ZooKeeper servers.

  3. (Unmanaged environments only) Copy flume-env.sh.template to flume-env.sh:
    $ sudo cp /etc/flume-ng/conf/flume-env.sh.template \
    /etc/flume-ng/conf/flume-env.sh
  4. Update the Java heap size.
    • Parcel-based Installation: In the Cloudera Manager Admin Console, go to Flume service > Instances > Agent (select one) > Configuration > Category > Resource Management. Make sure you are editing the same Flume agent as in step 1. Set Java Heap Size of Agent in Bytes to 500 and select MiB as the unit. Click Save Changes.
    • Package-based Installation: Edit /etc/flume-ng/conf/flume-env.sh or /opt/cloudera/parcels/CDH/etc/flume-ng/conf/flume-env.sh, inserting or replacing JAVA_OPTS as follows:
      JAVA_OPTS="-Xmx500m"
  5. (Optional) Modify Flume logging settings to facilitate monitoring and debugging:
    • Parcel-based Installation: In the Cloudera Manager Admin Console, go to Flume service > Instances > Agent (select one) > Configuration > Advanced and modify Agent Logging Advanced Configuration Snippet (Safety Valve) to include:
      log4j.logger.org.apache.flume.sink.solr=DEBUG
      log4j.logger.org.kitesdk.morphline=TRACE
    • Package-based Installation: Run the following commands:
      $ sudo bash -c 'echo "log4j.logger.org.apache.flume.sink.solr=DEBUG" >> \
      /etc/flume-ng/conf/log4j.properties'
      $ sudo bash -c 'echo "log4j.logger.org.kitesdk.morphline=TRACE" >> \
      /etc/flume-ng/conf/log4j.properties'
  6. (Optional) In an unmanaged environment, you can use SEARCH_HOME to configure where Flume finds the Cloudera Search dependencies for the Flume Solr Sink. For example, if you installed Flume from a tarball package, set SEARCH_HOME so that Flume can find the required files. To set SEARCH_HOME, use a command similar to the following:
    $ export SEARCH_HOME=/usr/lib/search

    Alternatively, you can add the same setting to flume-env.sh, as shown in the example after this list.

    In a Cloudera Manager managed environment, Cloudera Manager automatically updates the SOLR_HOME location with any additional required dependencies.
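
For example, a minimal sketch of persisting the SEARCH_HOME setting in flume-env.sh, assuming the package-based path used earlier:

$ sudo bash -c 'echo "export SEARCH_HOME=/usr/lib/search" >> /etc/flume-ng/conf/flume-env.sh'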

Configuring Flume Solr Sink to Access the Twitter Public Stream

Use the Twitter developer site to generate credentials to access the Twitter public stream:

  1. Sign in to https://apps.twitter.com with a Twitter account.
  2. On the Application Management page, click Create New App.
  3. Fill in the form to represent the Search installation. The application can represent multiple clusters and does not require a callback URL. Because this is not a publicly distributed application, the values you enter for the required name, description, and website fields are not important.
  4. Read and accept the Developer Agreement, then click Create your Twitter application at the bottom of the page.
  5. Click the Keys and Access Tokens tab, then click the Create my access token button at the bottom of the page.

The Flume TwitterSource uses the Twitter 1.1 API, which requires authentication of both the consumer (application) and the user (you). Consider this information confidential, just like your regular Twitter credentials. Edit the Flume configuration and replace the following properties with the credentials you generated (the property prefix matches your agent name; for example, with the parcel-based configuration above, the properties begin with twitter-stream.sources rather than agent.sources):

agent.sources.twitterSrc.consumerKey = YOUR_TWITTER_CONSUMER_KEY
agent.sources.twitterSrc.consumerSecret = YOUR_TWITTER_CONSUMER_SECRET
agent.sources.twitterSrc.accessToken = YOUR_TWITTER_ACCESS_TOKEN
agent.sources.twitterSrc.accessTokenSecret = YOUR_TWITTER_ACCESS_TOKEN_SECRET

To edit the Flume configuration:

  • Parcel-based Installation: In the Cloudera Manager Admin Console, go to Flume service > Instances > Agent (select one) > Configuration. Make sure you are editing the correct Flume agent. Modify the Configuration File parameter.
  • Package-based Installation: Edit /etc/flume-ng/conf/flume.conf.

For authentication to succeed, you must make sure the system clock is set correctly on the Flume agent host that connects to Twitter. You can install NTP and keep the host synchronized by running the ntpd service, or manually synchronize by using the command sudo ntpdate pool.ntp.org. To confirm that the time is set correctly, make sure that the output of the command date --utc matches the time shown at http://www.timeanddate.com/worldclock/timezone/utc. You can also set the time manually using the date command.
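
For example, to synchronize the clock once and then verify the reported UTC time:

$ sudo ntpdate pool.ntp.org
$ date --utc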

Starting the Flume Agent

  1. Delete all existing documents in Solr:
    $ solrctl collection --deletedocs tweets
  2. Start or restart the Flume agent configured in Configuring the Flume Solr Sink:
    • Parcel-based Installation: Flume service > Actions > Restart
    • Package-based Installation:
      $ sudo /etc/init.d/flume-ng-agent restart
  3. Monitor progress in the Flume log file and watch for errors:
    $ tail -f /var/log/flume-ng/flume*.log

After restarting the Flume agent, use the Cloudera Search GUI to verify that new tweets have been ingested into Solr. For example, if you have a Solr server running on search01.example.com, go to http://search01.example.com:8983/solr/tweets/select?q=*%3A*&sort=created_at+desc&wt=json&indent=true in a browser. The query sorts the result set so that the most recently ingested tweets appear at the top, based on the created_at timestamp. If you rerun the query, new tweets show up at the top of the result set.
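
You can run the same query from the command line with curl; this sketch assumes the tweets collection and the search01.example.com hostname used above, and adds rows=5 only to keep the output short:

$ curl 'http://search01.example.com:8983/solr/tweets/select?q=*%3A*&sort=created_at+desc&rows=5&wt=json&indent=true'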

To print diagnostic information, such as the content of records as they pass through the morphline commands, enable TRACE log level diagnostics by adding the following to your log4j.properties file:

log4j.logger.org.kitesdk.morphline=TRACE

In Cloudera Manager, you can use the safety valve to enable the TRACE log level.

Go to Flume service > Instances > Agent (select one) > Configuration > Advanced > Agent Logging Advanced Configuration Snippet (Safety Valve) and add the line above. After setting this value, restart the service.

Indexing a File Containing Tweets with Flume HTTPSource

HTTPSource lets you ingest data into Solr by POSTing a file over HTTP. HTTPSource sends data using a channel to a sink, in this case a SolrSink. For more information, see Flume Solr BlobHandler Configuration Options.
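
The flume.conf template copied earlier already defines an HTTP source that the steps below enable. As an illustrative sketch only (the property names are standard Flume and morphline-sink options, but the values here are examples rather than text copied from the template), such a stanza typically looks like this, listening on the port 5140 that the curl commands below target:

agent.sources.httpSrc.type = org.apache.flume.source.http.HTTPSource
agent.sources.httpSrc.port = 5140
agent.sources.httpSrc.handler = org.apache.flume.sink.solr.morphline.BlobHandler
agent.sources.httpSrc.handler.maxBlobLength = 2000000000
agent.sources.httpSrc.channels = memoryChannel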

  1. Stop the Flume agent and delete all existing documents in Solr:
    $ sudo /etc/init.d/flume-ng-agent stop
    $ solrctl collection --deletedocs tweets
  2. Comment out TwitterSource in /etc/flume-ng/conf/flume.conf and uncomment HTTPSource:
    # Comment out "agent.sources = twitterSrc"
    # Uncomment "agent.sources = httpSrc"
  3. Restart the Flume Agent:
    $ sudo /etc/init.d/flume-ng-agent restart
  4. Send a file containing tweets to the HTTPSource:
    • Parcel-based Installation:
      $ curl --data-binary \
      @/opt/cloudera/parcels/CDH/share/doc/search-*/examples/test-documents/sample-statuses-20120906-141433-medium.avro \
      'http://127.0.0.1:5140?resourceName=sample-statuses-20120906-141433-medium.avro' \
      --header 'Content-Type:application/octet-stream' --verbose
    • Package-based Installation:
      $ curl --data-binary \
      @/usr/share/doc/search-*/examples/test-documents/sample-statuses-20120906-141433-medium.avro \
      'http://127.0.0.1:5140?resourceName=sample-statuses-20120906-141433-medium.avro' \
      --header 'Content-Type:application/octet-stream' --verbose
  5. Check the log for status or errors:
    $ cat /var/log/flume-ng/flume.log 

Use the Cloudera Search GUI at http://search01.example.com:8983/solr/tweets/select?q=*%3A*&wt=json&indent=true to verify that new tweets have been ingested into Solr as expected.

Indexing a File Containing Tweets with Flume SpoolDirectorySource

SpoolDirectorySource specifies a directory on a local disk that Flume monitors. Flume automatically transfers data from files in this directory to Solr. SpoolDirectorySource sends data over a channel to a sink, in this case a SolrSink.
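
As with the HTTP source, the flume.conf template already defines a spooling directory source that the steps below enable. The following is only an illustrative sketch (the property names are standard Flume and morphline-sink options; the values align with the /tmp/myspooldir directory created below but are not copied verbatim from the template):

agent.sources.spoolSrc.type = spooldir
agent.sources.spoolSrc.spoolDir = /tmp/myspooldir
agent.sources.spoolSrc.deserializer = org.apache.flume.sink.solr.morphline.BlobDeserializer$Builder
agent.sources.spoolSrc.deserializer.maxBlobLength = 2000000000
agent.sources.spoolSrc.channels = memoryChannel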

  1. Stop the Flume agent and delete all existing documents in Solr:
    $ sudo /etc/init.d/flume-ng-agent stop
    $ solrctl collection --deletedocs tweets
  2. Comment out TwitterSource and HTTPSource in /etc/flume-ng/conf/flume.conf and uncomment SpoolDirectorySource:
    # Comment out "agent.sources = twitterSrc"
    # Comment out "agent.sources = httpSrc"
    # Uncomment "agent.sources = spoolSrc"
  3. Delete any old spool directory and create a new spool directory:
    $ rm -fr /tmp/myspooldir
    $ sudo -u flume mkdir /tmp/myspooldir
  4. Restart the Flume Agent:
    $ sudo /etc/init.d/flume-ng-agent restart
  5. Send a file containing tweets to the SpoolDirectorySource. To ensure no partial files are ingested, copy and then atomically move files:
    • Parcel-based Installation:
      $ sudo -u flume cp \
      /opt/cloudera/parcels/CDH/share/doc/search*/examples/test-documents/sample-statuses-20120906-141433-medium.avro \
      /tmp/myspooldir/.sample-statuses-20120906-141433-medium.avro
      $ sudo -u flume mv /tmp/myspooldir/.sample-statuses-20120906-141433-medium.avro \
      /tmp/myspooldir/sample-statuses-20120906-141433-medium.avro
    • Package-based Installation:
      $ sudo -u flume cp \
      /usr/share/doc/search*/examples/test-documents/sample-statuses-20120906-141433-medium.avro \
      /tmp/myspooldir/.sample-statuses-20120906-141433-medium.avro
      $ sudo -u flume mv /tmp/myspooldir/.sample-statuses-20120906-141433-medium.avro \
      /tmp/myspooldir/sample-statuses-20120906-141433-medium.avro
  6. Check the log for status or errors.
    $ cat /var/log/flume-ng/flume.log
  7. Check the completion status.
    $ find /tmp/myspooldir
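
With the default SpoolDirectorySource settings, Flume renames a file after it has been fully ingested, so the find output typically looks similar to the following (the .COMPLETED suffix is the Flume default and may differ if the template overrides it):

/tmp/myspooldir
/tmp/myspooldir/sample-statuses-20120906-141433-medium.avro.COMPLETED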

Use the Cloudera Search GUI to verify that the new tweets have been ingested into Solr. For example, access the following URL in a browser:

http://search01.example.com:8983/solr/tweets/select?q=*%3A*&wt=json&indent=true

Replace search01.example.com with the hostname of a host running Solr Server.
