Couchbase Java bulk load


#1

I am reading bulk data from one Couchbase bucket and inserting it into another bucket. I am using Couchbase Java SDK version 1.4.4 and reading the whole data set through views.

I am using the set API to insert the data. After inserting all the data into Couchbase, I close the CouchbaseClient.

The data gets inserted into Couchbase, but as soon as it is inserted, Couchbase starts indexing it, which I can see in the Couchbase web console. According to the Couchbase documentation, indexing should start when we read the data, depending on the stale parameter.

The other issue I am facing is that my Java application console shows lots of warnings until indexing is done:

| WARN [OperationFuture] Exception thrown wile executing com.couchbase.client.CouchbaseClient$15.operationComplete()
| java.lang.IllegalStateException: Shutting down
| at net.spy.memcached.MemcachedClient.broadcastOp(MemcachedClient.java:298) ~[spymemcached-2.11.4.jar:2.11.4]
| at net.spy.memcached.MemcachedClient.broadcastOp(MemcachedClient.java:292) ~[spymemcached-2.11.4.jar:2.11.4]
| at com.couchbase.client.CouchbaseClient.observe(CouchbaseClient.java:1670) ~[couchbase-client-1.4.4.jar:1.4.4]
| at com.couchbase.client.CouchbaseClient.observePoll(CouchbaseClient.java:1803) ~[couchbase-client-1.4.4.jar:1.4.4
| at com.couchbase.client.CouchbaseClient$15.onComplete(CouchbaseClient.java:1443) ~[couchbase-client-1.4.4.jar:1.4
| at com.couchbase.client.CouchbaseClient$15.onComplete(CouchbaseClient.java:1412) ~[couchbase-client-1.4.4.jar:1.4
| at net.spy.memcached.internal.AbstractListenableFuture$1.run(AbstractListenableFuture.java:117) ~[spymemcached-2.
| at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) [na:1.7.0_21]


#2

You need to make sure all operations have completed before shutting the client down. Can you please share your code? If you are using the async API, use latches to orchestrate between the callbacks and the shutdown.
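
A minimal sketch of the latch approach, using only the JDK. A thread pool stands in for the SDK's asynchronous callbacks, so `fakeClient` and `writeAllThenShutdown` are hypothetical names for illustration and not part of the Couchbase SDK. The point it shows is the ordering: count the latch down in every callback, wait on it, and only then shut down.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for an async client: each "write" completes on a worker thread.
class LatchShutdownSketch {

    /** Submits count fake async writes and returns how many completed before shutdown. */
    static int writeAllThenShutdown(int count) {
        ExecutorService fakeClient = Executors.newFixedThreadPool(4);
        final CountDownLatch latch = new CountDownLatch(count);
        final AtomicInteger completed = new AtomicInteger();

        for (int i = 0; i < count; i++) {
            fakeClient.submit(new Runnable() {
                @Override
                public void run() {
                    try {
                        completed.incrementAndGet(); // the "write" finishes here
                    } finally {
                        latch.countDown();           // always release the latch, even on failure
                    }
                }
            });
        }

        try {
            // Block until every callback has fired, or give up after a bound.
            if (!latch.await(30, TimeUnit.SECONDS)) {
                System.err.println("Timed out; " + latch.getCount() + " writes still pending");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            fakeClient.shutdown(); // only now is it safe to shut the client down
        }
        return completed.get();
    }

    public static void main(String[] args) {
        System.out.println(writeAllThenShutdown(1000));
    }
}
```

Waiting with a timeout rather than a bare `await()` is a design choice here: if a callback is ever lost, the application reports the pending count instead of hanging forever.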


#3

Thanks daschl… I understand now: it happens because I shut down the Couchbase client before making sure, by using countdown latches, that all data has been inserted through the asynchronous API.

I also found the answer to my other question about indexing. Couchbase has default settings for automatic index updates:

{
"updateInterval":5000,
"updateMinChanges":5000,
"replicaUpdateMinChanges":5000
}


#4

Hi daschl,

The strategy I followed for inserting bulk data into Couchbase is this:
I have 100,000 records, which I split into batches of 5,000 and insert asynchronously. The first few times I run the application, it inserts all 100,000 records, but on the 3rd or 4th run I see that it does not insert all the data.

Code which I am using for inserting the data:

final CountDownLatch latch = new CountDownLatch(listJsonObjects.size());
for (JsonObject jsonObject : listJsonObjects)
{
  try
  {
    // The key is fetched from jsonObject itself; shown here simply as key.
    OperationFuture<Boolean> setFuture = this.couchbaseClient.set(key, jsonObject);

    setFuture.addListener(new OperationCompletionListener()
    {
      @Override
      public void onComplete(OperationFuture<?> future) throws Exception
      {
        latch.countDown();
      }
    });
  }
  catch (Exception e)
  {
    e.printStackTrace();
    // No listener was registered for this record, so release the latch here;
    // otherwise latch.await() below would never return.
    latch.countDown();
  }
}
try
{
  latch.await();
}
catch (InterruptedException e)
{
  Thread.currentThread().interrupt(); // preserve the interrupt status
  e.printStackTrace();
}

Each time, listJsonObjects holds 5,000 records. This code is called from a place that builds lists of 5,000 records until the whole set of records has been inserted.


#5

What you should probably do is look at the future result in the callback and see if it was successful. If it wasn’t, the status on the future will probably tell you what went wrong, and then you can adapt your code to handle that accordingly.


#6

Thanks Michal,

I have made the suggested changes. Based on the status, I collect the failed documents into a new list and retry them RECURSIVELY. Failed documents are identified as follows:

if (!setFuture.getStatus().isSuccess()
    && ("timed out".equals(setFuture.getStatus().getMessage())
        || "Temporary failure".equals(setFuture.getStatus().getMessage())))
{
  failedRecords.add(jsonObject);
}

This ensures that records that fail due to ‘timed out’ or ‘temporary failure’ are retried again and again until they are inserted. Is that correct?
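
One caveat with unbounded recursive retries is that a persistent failure never terminates, and deep recursion can exhaust the stack. The following is a hedged sketch of a bounded, iterative alternative using only the JDK. The `Writer` interface, `writeWithRetry`, and the backoff values are hypothetical and stand in for the real set call and status check; they are not SDK APIs.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class BoundedRetrySketch {

    /** Hypothetical stand-in for one write attempt; returns true on success. */
    interface Writer {
        boolean write(String record);
    }

    /** Retries failed records in a loop; returns whatever still failed after maxAttempts passes. */
    static List<String> writeWithRetry(List<String> records, Writer writer, int maxAttempts) {
        List<String> pending = new ArrayList<String>(records);
        for (int attempt = 1; attempt <= maxAttempts && !pending.isEmpty(); attempt++) {
            List<String> failed = new ArrayList<String>();
            for (String record : pending) {
                if (!writer.write(record)) {
                    failed.add(record); // keep for the next pass
                }
            }
            pending = failed;
            if (!pending.isEmpty() && attempt < maxAttempts) {
                try {
                    Thread.sleep(10L * attempt); // simple linear backoff between passes
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        return pending;
    }

    /** Demo: every record fails on its first attempt and succeeds on the second. */
    static List<String> demo() {
        final List<String> seen = new ArrayList<String>();
        Writer flaky = new Writer() {
            @Override
            public boolean write(String record) {
                if (seen.contains(record)) {
                    return true;
                }
                seen.add(record);
                return false;
            }
        };
        return writeWithRetry(Arrays.asList("a", "b", "c"), flaky, 3);
    }

    public static void main(String[] args) {
        System.out.println("Still failed: " + demo());
    }
}
```

Records that remain in the returned list after the last pass can be logged or written to a dead-letter store instead of being retried indefinitely.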


#7

With the above approach, I was able to insert 4M records into Couchbase without any data loss. The problem I am facing now is reading the data: whenever I read the inserted data, I get a timeout error.
I tried the default Couchbase client settings as well as the settings below:

CouchbaseConnectionFactoryBuilder cfb = new CouchbaseConnectionFactoryBuilder();
cfb.setFailureMode(FailureMode.Retry);
cfb.setMaxReconnectDelay(5000); 
cfb.setOpTimeout(15000);
cfb.setOpQueueMaxBlockTime(10000);
CouchbaseConnectionFactory cf =  cfb.buildCouchbaseConnection(uris, bucketName,pwd);

I am reading from view:

  view = couchbaseClient.getView(designDocumentName, viewName);
  Query query = new Query();
  query.setIncludeDocs(true);
  query.setStale(Stale.FALSE); // none of the stale values worked for me
  ViewResponse result = couchbaseClient.query(view, query);
  viewIterator = result.iterator();

After inserting the data, I waited for indexing to finish, but even then I was not able to read the data. The exception I am getting is as follows:
java.lang.RuntimeException: Timed out waiting for operation
at com.couchbase.client.internal.HttpFuture.get(HttpFuture.java:75)
at com.couchbase.client.CouchbaseClient.query(CouchbaseClient.java:781)

… at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:601)
at org.apache.cxf.service.invoker.AbstractInvoker.performInvocation(AbstractInvoker.java:181)
at org.apache.cxf.service.invoker.AbstractInvoker.invoke(AbstractInvoker.java:97)
at org.apache.cxf.jaxws.AbstractJAXWSMethodInvoker.invoke(AbstractJAXWSMethodInvoker.java:232)
at org.apache.cxf.jaxws.JAXWSMethodInvoker.invoke(JAXWSMethodInvoker.java:69)
at org.apache.cxf.service.invoker.AbstractInvoker.invoke(AbstractInvoker.java:75)
at org.apache.cxf.interceptor.ServiceInvokerInterceptor$1.run(ServiceInvokerInterceptor.java:59)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
at java.util.concurrent.FutureTask$Sync.innerRun(FutureTask.java:334)
at java.util.concurrent.FutureTask.run(FutureTask.java:166)
at org.apache.cxf.interceptor.ServiceInvokerInterceptor$2.run(ServiceInvokerInterceptor.java:126)
at org.apache.cxf.workqueue.SynchronousExecutor.execute(SynchronousExecutor.java:37)
at org.apache.cxf.interceptor.ServiceInvokerInterceptor.handleMessage(ServiceInvokerInterceptor.java:131)
at org.apache.cxf.phase.PhaseInterceptorChain.doIntercept(PhaseInterceptorChain.java:307)
at org.apache.cxf.transport.ChainInitiationObserver.onMessage(ChainInitiationObserver.java:121)
at org.apache.cxf.transport.http.AbstractHTTPDestination.invoke(AbstractHTTPDestination.java:243)
at org.apache.cxf.transport.servlet.ServletController.invokeDestination(ServletController.java:223)
at org.apache.cxf.transport.servlet.ServletController.invoke(ServletController.java:197)
at org.apache.cxf.transport.servlet.ServletController.invoke(ServletController.java:149)
at org.apache.cxf.transport.servlet.CXFNonSpringServlet.invoke(CXFNonSpringServlet.java:171)
at org.apache.cxf.transport.servlet.AbstractHTTPServlet.handleRequest(AbstractHTTPServlet.java:286)
at org.apache.cxf.transport.servlet.AbstractHTTPServlet.doPost(AbstractHTTPServlet.java:206)
at javax.servlet.http.HttpServlet.service(HttpServlet.java:707)
at org.apache.cxf.transport.servlet.AbstractHTTPServlet.service(AbstractHTTPServlet.java:262)
at …
Caused by: java.util.concurrent.TimeoutException: Timed out waiting for operation
at com.couchbase.client.internal.HttpFuture.waitForAndCheckOperation(HttpFuture.java:93)
at com.couchbase.client.internal.ViewFuture.get(ViewFuture.java:66)
at com.couchbase.client.internal.ViewFuture.get(ViewFuture.java:50)
at com.couchbase.client.internal.HttpFuture.get(HttpFuture.java:72)


#8

The query you are performing does not set a limit, so are you trying to load all records in one batch? The old SDK does not do proper streaming, so this is more or less expected to time out.

Please use the paginator (available on the client object) to get better batch loading of your responses or limit the clauses upfront. With the new SDK it should be doable since we stream it from the server in a more efficient manner.
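
To illustrate the general idea of reading in bounded pages rather than one unbounded result, here is a small JDK-only sketch of key-based pagination. It is not the SDK's paginator: a `TreeMap` stands in for the sorted view index, keys are assumed to be unique, and `nextPage`, `readAll`, and the `doc-NNN` keys are hypothetical names chosen for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

class KeyPaginationSketch {

    /** Returns up to pageSize entries with keys strictly after lastKey (null means start). */
    static List<Map.Entry<String, String>> nextPage(
            NavigableMap<String, String> index, String lastKey, int pageSize) {
        NavigableMap<String, String> rest =
                (lastKey == null) ? index : index.tailMap(lastKey, false);
        List<Map.Entry<String, String>> page = new ArrayList<Map.Entry<String, String>>();
        for (Map.Entry<String, String> entry : rest.entrySet()) {
            if (page.size() == pageSize) {
                break; // page is full; the rest is fetched on the next call
            }
            page.add(entry);
        }
        return page;
    }

    /** Walks the whole index page by page and returns the number of rows seen. */
    static int readAll(NavigableMap<String, String> index, int pageSize) {
        int total = 0;
        String lastKey = null;
        while (true) {
            List<Map.Entry<String, String>> page = nextPage(index, lastKey, pageSize);
            if (page.isEmpty()) {
                break;
            }
            total += page.size();
            lastKey = page.get(page.size() - 1).getKey(); // resume after the last key seen
        }
        return total;
    }

    /** Demo: 25 rows read in pages of 10 (10 + 10 + 5). */
    static int demo() {
        NavigableMap<String, String> index = new TreeMap<String, String>();
        for (int i = 0; i < 25; i++) {
            index.put(String.format("doc-%03d", i), "{}");
        }
        return readAll(index, 10);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Each call touches at most `pageSize` rows, so no single request has to materialize the full result set.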


#9

In this context, “old” means 1.1.x through 1.4.x releases. New means 2.0.0 or later.

(for other future readers to understand)


#10

I checked with the paginator. It works fine without any exceptions, but it is not as fast as expected.
For 4 million records, with a page size of 20,000, reading takes 10-15 seconds per 100,000 records.


#11

I’d rather use smaller batch counts, since getBulks are synchronous and also rather expensive on a single thread.
You can try smaller batch counts, and Stale TRUE as well if that fits your requirements. If that is still too slow, I’d recommend either hand-rolling some multithreaded view querying where batches are loaded from a thread pool in parallel, or trying out the new SDK and using its asynchronous workflows.
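
A rough sketch of the hand-rolled, thread-pool variant, using only the JDK. `loadBatch` is a hypothetical placeholder that just generates integers; in a real application it would be replaced by whatever loads one batch of rows. The batch size and thread count are assumptions to tune, not recommendations.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class ParallelBatchSketch {

    /** Hypothetical stand-in for loading one batch, e.g. one page of view rows. */
    static List<Integer> loadBatch(int start, int size, int total) {
        List<Integer> rows = new ArrayList<Integer>();
        for (int i = start; i < Math.min(start + size, total); i++) {
            rows.add(i);
        }
        return rows;
    }

    /** Loads all rows in batches on a fixed thread pool; returns the row count. */
    static int loadAll(final int total, final int batchSize, int threads) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            List<Future<List<Integer>>> futures = new ArrayList<Future<List<Integer>>>();
            for (int start = 0; start < total; start += batchSize) {
                final int from = start;
                futures.add(pool.submit(new Callable<List<Integer>>() {
                    @Override
                    public List<Integer> call() {
                        return loadBatch(from, batchSize, total);
                    }
                }));
            }
            int count = 0;
            for (Future<List<Integer>> future : futures) {
                count += future.get().size(); // blocks until that batch is done
            }
            return count;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while loading batches", e);
        } catch (ExecutionException e) {
            throw new IllegalStateException("A batch failed to load", e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(loadAll(100000, 1000, 8));
    }
}
```

Collecting the futures in submission order keeps the results ordered by batch even though the batches themselves run in parallel.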