Monday, December 8, 2014

Writing an Elasticsearch river


I've recently been working a lot with Elasticsearch, the enterprise search engine based on Lucene. It is a very versatile tool. Although originally conceived to index documents or web sites and provide full-text search, it is widely used for other purposes such as log aggregation or structured data indexing, to leverage the flexibility of its query language and aggregation engine.

An interesting feature of Elasticsearch is the possibility of writing custom plugins in Java and deploying them in the cluster. A particular type of plugin, the river, lets us move the data indexing process into the cluster using a polling approach: the river runs inside the Elasticsearch node, retrieving and indexing the data from our repository, instead of relying on an external client in a separate process that pushes the data to the cluster.

Writing a river is not a very difficult task. I recently wrote a river for indexing Oracle Coherence caches; you can find it in my repository. I will try to highlight the main steps I took, using it as an example.

A river is a type of plugin and must be loaded by the node when it starts, so our entry point, the CoherenceRiverPlugin class, needs to extend the AbstractPlugin class. We need to provide a name and a description of the plugin and, to let Elasticsearch know there is a new plugin, we must provide the plugin implementation class name in a mandatory file named es-plugin.properties:


plugin=org.elasticsearch.plugin.river.coherence.CoherenceRiverPlugin


When the node starts, it loads the plugins using reflection from the plugins directory (which can be configured) and, if enabled, from the classpath. It also looks for onModule methods on each plugin implementation and invokes them, injecting a module parameter in the call. In our case we provide an implementation that expects the RiversModule to be injected so we can register our river:

import org.elasticsearch.plugins.AbstractPlugin;
import org.elasticsearch.river.RiversModule;

public class CoherenceRiverPlugin extends AbstractPlugin
{

 @Override
 public String name()
 {
  return "river-coherence";
 }

 @Override
 public String description()
 {
  return "Coherence river plugin";
 }

 public void onModule(RiversModule module)
 {
  module.registerRiver("coherence", CoherenceRiverModule.class);
 }

}
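
To make the onModule mechanism a bit less magical, here is a minimal stand-alone sketch of how a host could instantiate a plugin from its class name and call its onModule methods by reflection. This is not Elasticsearch's actual loader: FakeRiversModule, DemoPlugin and ModuleDispatcher are hypothetical names I made up for the illustration, and only JDK classes are used.

```java
import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for a framework module that collects river registrations.
class FakeRiversModule {
    final Map<String, Class<?>> rivers = new LinkedHashMap<>();

    void registerRiver(String type, Class<?> moduleClass) {
        rivers.put(type, moduleClass);
    }
}

// Hypothetical plugin. onModule is not declared by any interface; it is found by name.
class DemoPlugin {
    public void onModule(FakeRiversModule module) {
        module.registerRiver("coherence", DemoPlugin.class);
    }
}

class ModuleDispatcher {
    // Instantiates a plugin from its class name, e.g. a value read from a properties file.
    static Object load(String className) {
        try {
            Constructor<?> ctor = Class.forName(className).getDeclaredConstructor();
            ctor.setAccessible(true); // the demo classes are not public
            return ctor.newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot load plugin " + className, e);
        }
    }

    // Invokes every public one-argument onModule method whose parameter type
    // accepts the given module, and returns how many methods were called.
    static int dispatch(Object plugin, Object module) {
        int calls = 0;
        try {
            for (Method m : plugin.getClass().getMethods()) {
                boolean matches = m.getName().equals("onModule")
                        && m.getParameterTypes().length == 1
                        && m.getParameterTypes()[0].isInstance(module);
                if (matches) {
                    m.setAccessible(true); // the demo classes are not public
                    m.invoke(plugin, module);
                    calls++;
                }
            }
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("onModule call failed", e);
        }
        return calls;
    }
}
```

Matching on the parameter type is what would let a single plugin expose several onModule overloads, each one picked up only when the corresponding module is being configured.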

In fact we could register several types of modules in our plugin. This "module" step is there because of the way Guice, the underlying dependency injection framework, works. The module binds our custom implementation, CoherenceRiver, to the River interface:

import org.elasticsearch.common.inject.AbstractModule;
import org.elasticsearch.river.River;

public class CoherenceRiverModule extends AbstractModule
{

 @Override
 protected void configure()
 {
  bind(River.class).to(CoherenceRiver.class).asEagerSingleton();
 }

}

Now we arrive at the actual river implementation: CoherenceRiver. This class implements the River interface, extends the AbstractRiverComponent class and is instantiated by dependency injection when we register our river or, if it is already registered, when the node starts. The framework looks for a constructor annotated with Inject, to which it passes the requested context parameters (a client entry point to the node, the river settings ...), or for a public parameterless constructor. In our constructor we gather and store the initialization settings.


@Inject
protected CoherenceRiver(RiverName riverName, RiverSettings settings, Client client)
{
 super(riverName, settings);
 this.client = client;
...
}

Since we are implementing the River interface we must implement the start and stop methods. The start method is also called when registering the river or, if it was previously registered, when the node starts. This method contains the logic of our river: it registers a listener on the configured cache and performs an initial query to get the current values, coalescing the results by key in an intermediate structure. Through this listener we receive all the currently stored keys plus any new insertions or deletions, then request the actual values and index them using a configurable BulkProcessor to throttle our inputs. Don't forget that with a river the data loading process runs inside the Elasticsearch node's virtual machine.
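
The "coalesce by key, then index in bounded batches" idea can be sketched with plain JDK collections. This is only an illustration under my own assumptions, not the code from the river: CoalescingBuffer is a hypothetical name, a deletion is represented here by a null value, and in the real river the batching is delegated to the BulkProcessor.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper: keeps only the latest pending event per key.
class CoalescingBuffer<K, V> {
    private final Map<K, V> pending = new LinkedHashMap<>();

    // Records an insert or update; a later event for the same key overwrites the earlier one.
    synchronized void put(K key, V value) {
        pending.put(key, value);
    }

    // Records a deletion as a null value (an assumption of this sketch).
    synchronized void delete(K key) {
        pending.put(key, null);
    }

    // Removes and returns at most maxSize entries, ordered by when each key first arrived.
    synchronized List<Map.Entry<K, V>> drain(int maxSize) {
        List<Map.Entry<K, V>> batch = new ArrayList<>();
        Iterator<Map.Entry<K, V>> it = pending.entrySet().iterator();
        while (it.hasNext() && batch.size() < maxSize) {
            Map.Entry<K, V> e = it.next();
            // Copy the entry before removing it from the backing map.
            batch.add(new AbstractMap.SimpleEntry<>(e.getKey(), e.getValue()));
            it.remove();
        }
        return batch;
    }

    synchronized int size() {
        return pending.size();
    }
}
```

The listener thread would call put or delete, while the indexing side repeatedly calls drain with the configured batch size, so a key that changes many times between two flushes is indexed only once.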

In this case the river keeps running and updating the index with the changes to the cache. Other rivers perform an initial import and stop, or can even be removed automatically to avoid subsequent executions.

Similarly, we implement the stop method to release the river's resources when the river is removed or the node is stopped.
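
As a rough picture of the start/stop contract, the following stand-alone sketch runs a piece of work on a background thread from start and shuts it down from stop. LifecycleSketch is a hypothetical class that does not implement the real River interface, and the 10 ms pause between iterations is an arbitrary value chosen for the example.

```java
// Hypothetical stand-in for a river's lifecycle; JDK only.
class LifecycleSketch {
    private final Runnable work;
    private volatile boolean closed = true;
    private Thread worker;

    LifecycleSketch(Runnable work) {
        this.work = work;
    }

    // Starts the background loop; calling start on a running instance does nothing.
    synchronized void start() {
        if (!closed) {
            return;
        }
        closed = false;
        worker = new Thread(() -> {
            while (!closed) {
                work.run();
                try {
                    Thread.sleep(10); // arbitrary pause between iterations
                } catch (InterruptedException e) {
                    return; // stop() interrupts the sleep to exit quickly
                }
            }
        }, "river-sketch");
        worker.setDaemon(true); // never keep the JVM alive just for this thread
        worker.start();
    }

    // Signals the loop to end and waits up to one second for the thread to finish.
    synchronized void stop() {
        if (closed) {
            return;
        }
        closed = true;
        worker.interrupt();
        try {
            worker.join(1000);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    boolean isRunning() {
        return worker != null && worker.isAlive();
    }
}
```

Making both methods idempotent is a defensive choice in this sketch, so that a repeated start or stop call is harmless.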

Each river is a singleton inside the cluster: if the node executing the river fails, the river is started on another one. It's up to you to provide a resume mechanism or to re-synchronize the index if needed. Note that this makes the river a fault-tolerant component but not a scalable one: since only one instance exists in the cluster, the load cannot be distributed. Nonetheless, you can create several river instances to index independent stores inside a repository, like different tables in a relational database or key/value caches in an in-memory data grid.

To register our river we create a document in the special system index "_river", using a custom label as the type and the string "_meta" as the id, and we pass the parameters in JSON format in the request body. In particular, the type field tells Elasticsearch which river plugin we are registering. We can create different rivers of the same type with different initial parameters by using different index types:


curl -XPUT localhost:9200/_river/coherence_river/_meta -d '{
  "type":"coherence",
  "coherence":
    {
     "cache":"TEST_CACHE",
     "query":"key() between 500 and 800"
    }
 }'
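
The same registration could be issued from Java. The sketch below only mirrors the curl call above using JDK classes: RiverRegistration and its methods are hypothetical helpers, the JSON is built by hand with a deliberately minimal escape routine, and the settings are nested under a key named after the river type, as in the example request.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URI;
import java.nio.charset.StandardCharsets;
import java.util.Map;

// Hypothetical helper that builds and sends a river registration request.
class RiverRegistration {
    // Path of the _meta document; riverName is the custom label used as the type.
    static String metaPath(String riverName) {
        return "/_river/" + riverName + "/_meta";
    }

    // Builds {"type":"<type>","<type>":{...settings...}} from string settings.
    static String metaBody(String type, Map<String, String> settings) {
        StringBuilder json = new StringBuilder();
        json.append("{\"type\":\"").append(escape(type)).append("\",\"")
            .append(escape(type)).append("\":{");
        boolean first = true;
        for (Map.Entry<String, String> e : settings.entrySet()) {
            if (!first) {
                json.append(',');
            }
            json.append('"').append(escape(e.getKey())).append("\":\"")
                .append(escape(e.getValue())).append('"');
            first = false;
        }
        return json.append("}}").toString();
    }

    // Minimal escaping: handles backslashes and double quotes only, not control characters.
    static String escape(String s) {
        return s.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Sends the PUT request to a node such as http://localhost:9200 and returns the HTTP status.
    static int register(String baseUrl, String riverName, String type,
                        Map<String, String> settings) throws IOException {
        HttpURLConnection con = (HttpURLConnection)
                URI.create(baseUrl + metaPath(riverName)).toURL().openConnection();
        con.setRequestMethod("PUT");
        con.setDoOutput(true);
        con.setRequestProperty("Content-Type", "application/json");
        try (OutputStream out = con.getOutputStream()) {
            out.write(metaBody(type, settings).getBytes(StandardCharsets.UTF_8));
        }
        return con.getResponseCode();
    }
}
```

Calling register twice with the same type but different river names and settings would create two independent rivers backed by the same plugin.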

Testing our River

 

Elasticsearch provides a powerful test framework that we can use simply by extending the ElasticsearchIntegrationTest class. Doing so starts a local cluster that we can access in our test through an inherited client instance. Some testing dependencies, described in the documentation, are needed in our pom.xml file. In my experience I had to add additional artifacts; you can find the pom file in the repository.

I faced some difficulties when developing the unit tests:

To test the river I needed it to be loaded from the classpath, and I found that by default the nodes started by the test framework only search for plugins in the default plugins folder, ignoring the classpath.

I had to set the plugins.load_classpath_plugins property to true using the framework's nodeSettings method:


@Override
protected Settings nodeSettings(int nodeOrdinal)
{
 return ImmutableSettings.builder().put("plugins.load_classpath_plugins", "true").build();
}

Then I found out that with the default global cluster scope this method was not invoked, so I set the cluster scope to suite and it started working:


@ClusterScope(scope = Scope.SUITE, numDataNodes = 2)
public class CoherenceRiverTest extends ElasticsearchIntegrationTest

Lastly, to make sure that the river index is created as a precondition for your test and assertion logic, don't forget to use the ensureGreen method:

ensureGreen(index);

Final Thoughts


All in all, and despite some minor difficulties, implementing an Elasticsearch river is pretty straightforward, and the powerful Elasticsearch test framework greatly simplifies automated testing. To summarize the main points:

Advantages:
  • Rivers are simple to embed and test in the Elasticsearch framework. The business logic is similar to the stand-alone client case.
  • By leveraging the Elasticsearch cluster infrastructure you get fault tolerance for free. If the node fails, the river is started on another one.
  • You avoid monitoring and managing a separate process.
  • Rivers connect directly to the repository to index, so you usually save a network hop compared with an external client, which first has to retrieve the data from the repository and then push it to the cluster.
Disadvantages:
  • They are less flexible than an external client: you have to use Java.
  • Rivers are deployed as singletons in the cluster, so you cannot distribute the load across nodes; rivers don't scale. On the other hand, a separate pushing client could be made to scale, but that is a complex task, and in most use cases a single node is enough.
  • You are running your custom data loading process inside the Elasticsearch node, which can penalize the node's performance. In most cases the CPU-demanding part is the JSON serialization inside the river, so make sure you use a well-performing library. It is also advisable to implement a configurable throttling mechanism like the BulkProcessor.

I hope this post gives you some insight into the implications, pros and cons of using a river.