Multithread Document Insert in Couchbase


#1

Hi!
I’ve a program that inserts info about events collected by an event broker (Kafka).
When the system receive an event launch a thread that write info on Couchbase.
To every thread is passed a bucket object. This is the code

public void writeToCouchbase(){
		JsonObject content = JsonObject.empty().put("Timestamp", eventToAnalyze.getTimestamp()).put("Product_Id", eventToAnalyze.getProduct_Id())
				.put("Model_Id", eventToAnalyze.getModel_Id()).put("Serial_Id", eventToAnalyze.getSerial_Id())
				.put("Profile_Id", eventToAnalyze.getProfile_Id()).put("pressure", eventToAnalyze.getPressure())
				.put("Temperature", eventToAnalyze.getTemperature()).put("Laps", eventToAnalyze.getLaps())
				.put("Status1", eventToAnalyze.getStatus1()).put("Status2", eventToAnalyze.getStatus2());
		String documentId = eventToAnalyze.getTimestamp()+eventToAnalyze.getSerial_Id(); 
		JsonDocument doc = JsonDocument.create(documentId,content);
		JsonDocument inserted = bucket.insert(doc);
	}

Nerverthless I have, at some events this same error
Exception in thread “Thread-7” com.couchbase.client.java.error.DocumentAlreadyExistsException
at com.couchbase.client.java.CouchbaseAsyncBucket$8.call(CouchbaseAsyncBucket.java:373)
at com.couchbase.client.java.CouchbaseAsyncBucket$8.call(CouchbaseAsyncBucket.java:357)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
at rx.internal.operators.NotificationLite.accept(NotificationLite.java:150)
at rx.subjects.SubjectSubscriptionManager$SubjectObserver.accept(SubjectSubscriptionManager.java:319)
at rx.subjects.AsyncSubject$1.call(AsyncSubject.java:72)
at rx.subjects.AsyncSubject$1.call(AsyncSubject.java:67)
at rx.subjects.SubjectSubscriptionManager.add(SubjectSubscriptionManager.java:96)
at rx.subjects.SubjectSubscriptionManager.call(SubjectSubscriptionManager.java:61)
at rx.subjects.SubjectSubscriptionManager.call(SubjectSubscriptionManager.java:35)
at rx.Observable$1.call(Observable.java:145)
at rx.Observable$1.call(Observable.java:137)
at rx.Observable$1.call(Observable.java:145)
at rx.Observable$1.call(Observable.java:137)
at rx.Observable.subscribe(Observable.java:7393)
at com.couchbase.client.java.util.Blocking.blockForSingle(Blocking.java:73)
at com.couchbase.client.java.CouchbaseBucket.insert(CouchbaseBucket.java:226)
at com.couchbase.client.java.CouchbaseBucket.insert(CouchbaseBucket.java:221)
at ThreadedEventAnalyzer.writeToCouchbase(ThreadedEventAnalyzer.java:63)
at ThreadedEventAnalyzer.anomalyErrorObserve(ThreadedEventAnalyzer.java:51)
at ThreadedEventAnalyzer.run(ThreadedEventAnalyzer.java:118)
at java.lang.Thread.run(Thread.java:745)
Caused by: rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value: com.couchbase.client.core.message.kv.InsertResponse.class
at rx.exceptions.OnErrorThrowable.addValueAsLastCause(OnErrorThrowable.java:98)
at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:58)
… 19 more

What’s the problem?

Thank you for your support!


#2

I see DocumentAlreadyExistsException
try bucket.upsert(doc); method to either insert or update or check that documentId is unique
I hope it helps

Also good introduction