Data import options with Elasticsearch
Architecturally there are two approaches to loading data. At the outset you have to decide between a "push" and a "pull" model based on your requirements and performance goals. In this article we will explore the ES data load options in both of these categories.
I have sourced much of this information from the ES mailing list. In fact, this is a compilation of everything I found there while researching this topic, since I did not find any tutorial or article that covers it comprehensively.
Before we dive into the topic, there are a few basic things to remember about indexing data in ES. As a rule of thumb, load performance is best with more shards and query performance is best with more replicas, so you need to find the sweet spot for your setup. In ES all indexing goes through the primary shards. Follow an iterative approach to arrive at that sweet spot: don't start with tuning; instead, let tuning decisions follow from what you learn about your own setup. Also remember that it takes significantly more time to index into an existing index than into an empty one.
One of the simplest and most effective strategies is to start with an index that has no replicas and, once indexing is done, increase the number of replicas to the number you want. This reduces the load while indexing.
Pull Model
River plugin
River plugins are custom plugin code that is deployed within ES and runs inside the ES node. They are a good fit when you expect a constant flow of data that needs to be indexed and you don't want to write another external application to push data into ES. A very good use case is indexing analytics and server logs, or data coming out of a NoSQL store such as Cassandra or MongoDB.
River plugins also support import through the Bulk API. This is useful when the river plugin can accumulate data up to a certain threshold before performing an import. Since the client runs within the ES node, it is cluster aware.
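The accumulate-then-import behaviour described above can be sketched roughly as follows. This is not river plugin code: `ThresholdBuffer` and its flush callback are hypothetical names used only for illustration, and the callback stands in for whatever actually issues the bulk request.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper, not part of Elasticsearch: collects items and hands
// them to a callback in batches once a size threshold is reached.
final class ThresholdBuffer<T> {
    private final int threshold;
    private final Consumer<List<T>> flushAction; // stands in for a bulk index call
    private final List<T> pending = new ArrayList<>();

    ThresholdBuffer(int threshold, Consumer<List<T>> flushAction) {
        if (threshold < 1) {
            throw new IllegalArgumentException("threshold must be at least 1");
        }
        this.threshold = threshold;
        this.flushAction = flushAction;
    }

    // Adds one item and flushes as soon as the threshold is reached.
    synchronized void add(T item) {
        pending.add(item);
        if (pending.size() >= threshold) {
            flush();
        }
    }

    // Sends whatever is pending; call it once more at the end of the load.
    synchronized void flush() {
        if (pending.isEmpty()) {
            return;
        }
        flushAction.accept(new ArrayList<>(pending)); // pass a copy, then reset
        pending.clear();
    }
}
```

A time-based flush could be added in the same way if the data arrives in bursts; the size threshold alone is the simplest variant.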
Push Model
curl -XPUT
This is perhaps the simplest way to index a document: you perform a PUT on a REST endpoint of the form /index/type/id. It works best during the development phase, for indexing a few documents from the command line to perform quick validations. In the example below the type product and the ID 1 are placeholder values.
curl -XPUT 'http://127.0.0.1:9200/test/product/1' -d '{"partnumber":"HLG028_281201","name":"Modern Houseware Hanging Lamp","shortdescription":"A red hanging lamp in a triangular shape.","longdescription":"A hanging lamp with red ambient shades to add a romantic mood to your room. Perfect for your bedroom or your children'\''s room. Easy set up so you do not have to pay electricians to set it up."}'
UDP Bulk API
UDP is a connectionless datagram protocol. This option is faster but less reliable, as you get no acknowledgement of success or failure.
E.g. cat bulk.txt | nc -w 0 -u localhost 9700
HTTP Bulk API
Use this if you have an external application that consolidates the data in a timely manner and then formats it as JSON to be indexed. It is much more reliable than UDP bulk import, because you get an acknowledgement of each index operation and can take corrective steps based on the response.
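As an illustration of the formatting step, here is a minimal sketch of building a request body for the HTTP bulk endpoint (POST /_bulk). The bulk format pairs one action line with one source line per document, each terminated by a newline. The class name `BulkPayload`, the index `test` and the type `product` are assumptions made for the example, and the sketch assumes that IDs need no JSON escaping and that every source document is already single-line JSON.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper: builds a newline-delimited body for POST /_bulk.
final class BulkPayload {
    // jsonById maps a document ID to its single-line JSON source.
    static String build(String index, String type, Map<String, String> jsonById) {
        StringBuilder body = new StringBuilder();
        for (Map.Entry<String, String> doc : jsonById.entrySet()) {
            // Action line: tells ES to index the next line under this index/type/id.
            body.append("{\"index\":{\"_index\":\"").append(index)
                .append("\",\"_type\":\"").append(type)
                .append("\",\"_id\":\"").append(doc.getKey()).append("\"}}\n");
            // Source line: the document itself; the trailing newline is required.
            body.append(doc.getValue()).append('\n');
        }
        return body.toString();
    }

    public static void main(String[] args) {
        Map<String, String> docs = new LinkedHashMap<>();
        docs.put("1", "{\"name\":\"Modern Houseware Hanging Lamp\"}");
        docs.put("2", "{\"name\":\"Second sample document\"}");
        System.out.print(build("test", "product", docs));
    }
}
```

The resulting text can be saved to a file and sent with curl using --data-binary, which preserves the newlines that the bulk format depends on.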
Java TransportClient bulk indexing
The TransportClient can be used within a custom ETL load that runs outside the ES nodes: you connect to an ES node from a remote host and can index with multiple threads. It saves a bit of HTTP overhead by using the native ES protocol. Bulk is usually the best choice, as it groups the requests per shard and minimizes the network round trips.
The TransportClient is thread safe and built to be reused by several threads; internally it sends each request asynchronously. When coding a bulk load, make sure you do not create a TransportClient in a loop. Instead, send all the requests through one TransportClient instance per JVM, perhaps by creating it as a singleton.
Another nice thing about the TransportClient is that it automatically round-robins requests across the ES nodes it knows about, and the receiving node then spreads the bulk requests to the respective "shard bulks".
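The pattern of several threads sharing one client can be sketched with plain JDK classes. In this sketch `ParallelLoader` is a hypothetical name, and the `sharedSender` callback stands in for a bulk call made through the single shared TransportClient; no Elasticsearch classes are used so that the threading part stays visible.

```java
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical helper: fans batches out to worker threads that all use the
// same sender, mirroring "one client instance per JVM, many threads".
final class ParallelLoader {
    static void run(List<List<String>> batches, int threads,
                    Consumer<List<String>> sharedSender) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (List<String> batch : batches) {
            // Every task calls the same sender; nothing is created per batch.
            pool.execute(() -> sharedSender.accept(batch));
        }
        pool.shutdown(); // accept no new tasks, let the queued ones finish
        try {
            if (!pool.awaitTermination(1, TimeUnit.HOURS)) {
                throw new IllegalStateException("bulk load did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the bulk load", e);
        }
    }
}
```

The sender must itself be thread safe, which is the property the TransportClient is described as having above.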
Here is a sample snippet that can be used for connecting to the ES cluster.
import java.util.LinkedList;
import java.util.List;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.transport.TransportAddress;

// esHosts and esClusterName are supplied by the surrounding application
ImmutableSettings.Builder clientSettings = ImmutableSettings.settingsBuilder()
    .put("http.enabled", "false")
    .put("discovery.zen.minimum_master_nodes", 1)
    .put("discovery.zen.ping.multicast.ttl", 4)
    .put("discovery.zen.ping_timeout", 100)
    .put("discovery.zen.fd.ping_timeout", 300)
    .put("discovery.zen.fd.ping_interval", 5)
    .put("discovery.zen.fd.ping_retries", 5)
    .put("client.transport.ping_timeout", "10s")
    .put("multicast.enabled", false)
    .put("discovery.zen.ping.unicast.hosts", esHosts)
    .put("cluster.name", esClusterName)
    .put("index.refresh_interval", "10") // change refresh interval to a higher value
    .put("index.merge.async", true);     // change index merge to async
TransportClient client = new TransportClient(clientSettings.build());
List<TransportAddress> addresses = new LinkedList<TransportAddress>();
// Add one or more ES addresses and ports
InetSocketTransportAddress address = new InetSocketTransportAddress("<ES_IP>", Integer.parseInt("<ES_PORT>"));
addresses.add(address);
TransportAddress[] taddresses = addresses.toArray(new TransportAddress[addresses.size()]);
client.addTransportAddresses(taddresses);
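Here is sample code that uses the bulk load API for indexing through the client.
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;

// Create initial bulk request builder
BulkRequestBuilder bulkRequest = client.prepareBulk();
bulkRequest.setRefresh(false);
IndexRequestBuilder indexRequestBuilder = client.prepareIndex("<ES_INDEX_NAME>", "regular");
// Build the JSON content using XContentBuilder (the field shown is a placeholder)
XContentBuilder source = XContentFactory.jsonBuilder().startObject().field("name", "<VALUE>").endObject();
indexRequestBuilder.setSource(source);
bulkRequest.add(indexRequestBuilder);
BulkResponse bulkResponse = bulkRequest.execute().actionGet();
if (bulkResponse.hasFailures()) {
    log.error("Failed to send all requests in bulk " + bulkResponse.buildFailureMessage());
    return true;
} else {
    log.info("Elasticsearch Index updated in {} ms.", bulkResponse.getTookInMillis());
}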
Performance Tuning
1. Start by tuning the index refresh rate at the time of bulk indexing. While importing a large amount of data it is recommended to disable the refresh interval by setting it to -1; you can then refresh the index programmatically towards the end of the load.
You can define the index refresh rate at the global level in config/elasticsearch.yml or at the index level. A value of -1 suppresses refresh, or you can set a positive interval (for example 30s) based on how often you need the index refreshed.
curl -XPUT localhost:9200/test/_settings -d '{
"index" : {
"refresh_interval" : "-1"
} }'
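The full cycle around a load (disable refresh, load, restore the interval, refresh once) might look like the sketch below, written against the REST API with only JDK classes. `RefreshControl`, the host `localhost:9200` and the index `test` are assumptions for the example; `1s` is used as the value to restore because it is the Elasticsearch default.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for toggling refresh_interval around a bulk load.
final class RefreshControl {
    // JSON body for PUT /<index>/_settings; "-1" disables periodic refresh.
    static String settingsBody(String interval) {
        return "{\"index\":{\"refresh_interval\":\"" + interval + "\"}}";
    }

    // Sends a request with an optional JSON body and returns the HTTP status.
    static int send(String method, String url, String body) throws IOException {
        HttpURLConnection conn = (HttpURLConnection) new URL(url).openConnection();
        conn.setRequestMethod(method);
        if (body != null) {
            conn.setDoOutput(true);
            conn.setRequestProperty("Content-Type", "application/json");
            try (OutputStream out = conn.getOutputStream()) {
                out.write(body.getBytes(StandardCharsets.UTF_8));
            }
        }
        try {
            return conn.getResponseCode();
        } finally {
            conn.disconnect();
        }
    }

    public static void main(String[] args) throws IOException {
        String base = "http://localhost:9200/test"; // assumed host and index
        send("PUT", base + "/_settings", settingsBody("-1")); // before the load
        // ... run the bulk load here ...
        send("PUT", base + "/_settings", settingsBody("1s")); // restore the interval
        send("POST", base + "/_refresh", null);               // make new documents searchable
    }
}
```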
2. You can tune the bulk thread pool size, but do so carefully. Under most circumstances the defaults are good enough, but you can adjust them based on your application requirements; for instance, if you expect data to flow into the index all the time, you can consider adding more threads to the bulk pool. Always remember this rule of thumb: every thread eats up system resources, so try to match the pool size to the number of cores.
# Search pool
threadpool.search.type: fixed
threadpool.search.size: 3
threadpool.search.queue_size: 100
# Bulk pool
threadpool.bulk.type: fixed
threadpool.bulk.size: 2
threadpool.bulk.queue_size: 300
# Index pool
threadpool.index.type: fixed
threadpool.index.size: 2
threadpool.index.queue_size: 100
3. If you want both maximum performance on load and maximum performance on search, use two indexes, one for the old generation and one for the new generation, and connect them with an index alias. Distribute the indexes over the nodes so that they form two separate groups on different machines (for example, by shard moving or shard allocation). Set the replica level to 0 (no replicas) for the new-generation index, and forward searches only to the nodes holding the old generation. After the bulk load is complete, raise the replica level on the new generation and switch from old to new with the help of the index alias (or by just dropping the old generation). You may see a performance hit while the replicas are building up, but it is small compared to the bulk load.
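For the old-generation/new-generation approach in item 3, the alias switch itself can be done in one request to the _aliases endpoint, which applies a remove and an add action together. The sketch below only builds that request body; `AliasSwitch` and the alias and index names are hypothetical, and the names are assumed to need no JSON escaping.

```java
// Hypothetical helper: builds the body for POST /_aliases that moves an
// alias from the old-generation index to the new-generation index.
final class AliasSwitch {
    static String body(String alias, String oldIndex, String newIndex) {
        return "{\"actions\":["
            + "{\"remove\":{\"index\":\"" + oldIndex + "\",\"alias\":\"" + alias + "\"}},"
            + "{\"add\":{\"index\":\"" + newIndex + "\",\"alias\":\"" + alias + "\"}}"
            + "]}";
    }

    public static void main(String[] args) {
        // Example names only; substitute your own alias and indexes.
        System.out.println(body("products", "products_old", "products_new"));
    }
}
```

Because searches go through the alias, clients do not need to change anything when the switch happens.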
Further Reading.
http://www.elasticsearch.org/guide/en/elasticsearch/reference/current/modules-plugins.html