Need example code in Java to read and lock a record then update it, releasing the lock

Hello,

I am having trouble finding a simple example that does a getAndLock (or whatever the equivalent CAS-related operation is), programmatically changes the document, then updates (upserts?) it and thereby releases the lock. This is using java-client 2.1.2.

Would someone be so kind as to direct me to something or otherwise post an example?

Thanks,

Matthew

The following code seems to work…

    Bucket couchbaseBucket = couchbaseCluster.openBucket("matthew");

    try {
        for (int j = 0; j < 2; j++) {
            JsonDocument existingDocument = couchbaseBucket.getAndLock("id1", 30);

            if (existingDocument == null) {
                JsonArray initialArray = JsonArray.empty();
                JsonObject initialData = JsonObject.empty();
                initialData.put("value", "value-" + j + "-" + System.currentTimeMillis());
                initialArray.add(initialData);

                JsonObject arrayData = JsonObject.empty();
                arrayData.put("data", initialArray);

                JsonDocument initialDocument = JsonDocument.create("id1", arrayData);
                couchbaseBucket.insert(initialDocument);
            } else {
                long documentCas = existingDocument.cas();

                JsonArray arrayIn = existingDocument.content().getArray("data");
                JsonObject newData = JsonObject.empty();
                newData.put("value", "value-" + j + "-" + System.currentTimeMillis());
                arrayIn.add(newData);

                if (couchbaseBucket.unlock("id1", documentCas)) {
                    couchbaseBucket.upsert(existingDocument);
                } else {
                    throw new RuntimeException("Document not unlocked");
                }
            }
        }
    } finally {
        couchbaseBucket.close();
        couchbaseCluster.disconnect();
    }

…but I have to unlock right before the upsert, and in that window another thread could make updates. I do not see a way for the unlock to happen atomically with the upsert. I know one could start ‘synchronizing’ things, but I was hoping Couchbase would manage that under the covers.
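
To make that window concrete, here is a toy, single-threaded model in plain Java. It is not the Couchbase SDK: `ToyBucket` and its methods are hypothetical stand-ins, and it assumes only what the code above does, namely that unlock releases the lock and a later upsert writes unconditionally. A second writer is replayed by hand in the gap between the two calls.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of one document holding a list; a hypothetical stand-in, not the Couchbase SDK. */
class ToyBucket {
    private List<String> data = new ArrayList<>();
    private boolean locked = false;

    /** Returns a private copy of the list and marks the document as locked. */
    List<String> getAndLock() {
        if (locked) throw new IllegalStateException("document is locked");
        locked = true;
        return new ArrayList<>(data);
    }

    /** Releases the lock without writing anything. */
    void unlock() {
        locked = false;
    }

    /** Overwrites the stored list unless the document is locked. */
    void upsert(List<String> newData) {
        if (locked) throw new IllegalStateException("document is locked");
        data = new ArrayList<>(newData);
    }

    /** Returns a copy of what is currently stored. */
    List<String> current() {
        return new ArrayList<>(data);
    }

    /** Replays the unlock-then-upsert sequence with a second writer in the gap. */
    static List<String> lostUpdateDemo() {
        ToyBucket bucket = new ToyBucket();

        // Writer A: lock, modify a local copy, then unlock before writing.
        List<String> copyA = bucket.getAndLock();
        copyA.add("from-A");
        bucket.unlock();

        // Writer B slips into the gap: the lock is gone, so its write succeeds.
        List<String> copyB = bucket.current();
        copyB.add("from-B");
        bucket.upsert(copyB);

        // Writer A now upserts its stale copy and silently discards B's entry.
        bucket.upsert(copyA);
        return bucket.current();
    }
}
```

In this model `ToyBucket.lostUpdateDemo()` ends with only `from-A` in the list, so writer B's entry is lost without any error being raised.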

This seems to work and addresses the above issue; hopefully someone else will find it useful…

    Bucket couchbaseBucket = couchbaseCluster.openBucket("matthew");

    try {
        couchbaseBucket.remove("id1");
    } catch (RuntimeException e) {
        // Ignored: the document may not exist yet.
    }

    try {
        for (int j = 0; j < 2; j++) {
            JsonDocument existingDocument = couchbaseBucket.getAndLock("id1", 30);

            if (existingDocument == null) {
                JsonArray initialArray = JsonArray.empty();
                JsonObject initialEntry = JsonObject.empty();
                initialEntry.put("value", "value-" + j + "-" + System.currentTimeMillis());
                initialArray.add(initialEntry);

                JsonObject arrayData = JsonObject.empty();
                arrayData.put("data", initialArray);

                JsonDocument initialDocument = JsonDocument.create("id1", arrayData);
                couchbaseBucket.insert(initialDocument);
            } else {
                long documentCas = existingDocument.cas();

                // Copy the existing entries into a fresh array.
                JsonArray arrayIn = existingDocument.content().getArray("data");
                JsonArray arrayOut = JsonArray.empty();
                for (int i = 0; i < arrayIn.size(); i++) {
                    arrayOut.add(arrayIn.getObject(i));
                }

                JsonObject newEntry = JsonObject.empty();
                newEntry.put("value", "value-" + j + "-" + System.currentTimeMillis());
                arrayOut.add(newEntry);

                JsonObject newArrayData = JsonObject.empty();
                newArrayData.put("data", arrayOut);

                // Build a new document carrying the CAS returned by getAndLock.
                JsonDocument newDocument = JsonDocument.create("id1", newArrayData, documentCas);
                couchbaseBucket.replace(newDocument);
            }
        }
    } finally {
        couchbaseBucket.close();
        couchbaseCluster.disconnect();
    }

…it seems the critical step is to create a new JsonDocument and pass the existing CAS when doing so.

Since existingDocument already carries the latest CAS when obtained via getAndLock, you should be able to just update its content and call replace on it; the SDK will pick up the CAS and perform the unlock and replace together.

Please show an example, as I tried just about every combination of operations I could think of (some of which were included in my comment above, which has been heavily edited).

Hi, here is a simple example. This is a non-contended lock which updates successfully:

    JsonDocument initialDoc = bucket.upsert(JsonDocument.create("doc", JsonObject.empty()));
    System.out.println("Initial Write: " + initialDoc);

    JsonDocument lockedDoc = bucket.getAndLock("doc", 10);
    System.out.println("Locked Get: " + lockedDoc);

    lockedDoc.content().put("updated", true);
    JsonDocument replacedDoc = bucket.replace(lockedDoc);
    System.out.println("Replaced: " + replacedDoc);

This prints:

    Initial Write: JsonDocument{id='doc', cas=173337848184832, expiry=0, content={}}
    Locked Get: JsonDocument{id='doc', cas=173337851002880, expiry=0, content={}}
    Replaced: JsonDocument{id='doc', cas=173337858867200, expiry=0, content={"updated":true}}

…now, if you want to simulate contention, you can create a new doc from the old one but change the CAS to something invalid:

    lockedDoc.content().put("updated", true);
    JsonDocument replacedDoc = bucket.replace(JsonDocument.from(lockedDoc, 12345));

If you change the code like that you’ll see this exception raised:

Exception in thread "main" com.couchbase.client.java.error.CASMismatchException
	at com.couchbase.client.java.CouchbaseAsyncBucket$17.call(CouchbaseAsyncBucket.java:608)
	at com.couchbase.client.java.CouchbaseAsyncBucket$17.call(CouchbaseAsyncBucket.java:590)
	at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
	at rx.subjects.SubjectSubscriptionManager$SubjectObserver.onNext(SubjectSubscriptionManager.java:224)
	at rx.subjects.AsyncSubject.onCompleted(AsyncSubject.java:100)
	at com.couchbase.client.core.endpoint.AbstractGenericHandler$1.call(AbstractGenericHandler.java:199)
	at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value: com.couchbase.client.core.message.kv.ReplaceResponse.class
	at rx.exceptions.OnErrorThrowable.addValueAsLastCause(OnErrorThrowable.java:101)
	at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:58)
	... 11 more
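
When a mismatch like this comes from real contention rather than a deliberately wrong CAS, one common response is to re-read the document and try again. Below is a minimal sketch of that loop in plain Java. It is not SDK code: `CasStore`, `CasMismatch` and `updateWithRetry` are hypothetical stand-ins for the bucket, the `CASMismatchException` and application logic, and it uses a plain read plus CAS check rather than a lock.

```java
import java.util.function.UnaryOperator;

/** Toy single-document store with a CAS counter; a hypothetical stand-in, not the Couchbase SDK. */
class CasStore {
    /** Stand-in for the SDK's CASMismatchException. */
    static class CasMismatch extends RuntimeException {}

    /** A value together with the CAS it was read at. */
    static class Versioned {
        final String content;
        final long cas;

        Versioned(String content, long cas) {
            this.content = content;
            this.cas = cas;
        }
    }

    private String content = "";
    private long cas = 1;

    /** Reads the current content and its CAS. */
    synchronized Versioned get() {
        return new Versioned(content, cas);
    }

    /** Writes only if the caller's CAS is still current, then bumps the CAS. */
    synchronized void replace(String newContent, long expectedCas) {
        if (expectedCas != cas) throw new CasMismatch();
        content = newContent;
        cas++;
    }

    /** Read, modify, replace; on a mismatch, re-read and retry up to maxAttempts times. */
    static int updateWithRetry(CasStore store, UnaryOperator<String> change, int maxAttempts) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            Versioned seen = store.get();
            try {
                store.replace(change.apply(seen.content), seen.cas);
                return attempt; // how many attempts it took
            } catch (CasMismatch e) {
                // Someone else wrote in between; loop to pick up the fresh content and CAS.
            }
        }
        throw new IllegalStateException("gave up after " + maxAttempts + " attempts");
    }
}
```

A call such as `CasStore.updateWithRetry(store, c -> c + "+mine", 3)` applies the change to whatever content is current at the time the write finally succeeds. Capping the attempts keeps a heavily contended document from spinning forever.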

Thanks, let me take a look at that.

Hi,

I also have a question on this example:
JsonDocument replacedDoc = bucket.replace(lockedDoc); - works fine
JsonDocument replacedDoc = bucket.upsert(lockedDoc); - CASMismatchException

The CAS is correct, so why does it produce a CASMismatchException?
How should I handle it?

The only way upsert works is when the lock has expired (adding Thread.sleep(10s) before it).

@SiriusDour upsert does not look at the CAS; only replace and remove do. So please use replace if you want to replace a document with the proper CAS.
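
The behaviour reported in this thread can be summed up in a small in-memory model. This is a sketch in plain Java, not the SDK: `LockModel` and `CasMismatch` are hypothetical names, lock timeouts are ignored, and for simplicity replace always requires a matching CAS. It only encodes what was observed above: getAndLock hands back a fresh CAS, an upsert against a locked document fails, and a replace carrying that CAS succeeds and releases the lock.

```java
import java.util.HashMap;
import java.util.Map;

/** Toy in-memory model of the lock rules seen in this thread; NOT the Couchbase SDK. */
class LockModel {
    /** Hypothetical stand-in for the SDK's CASMismatchException. */
    static class CasMismatch extends RuntimeException {}

    private static class Entry {
        String content;
        long cas;
        boolean locked;
    }

    private final Map<String, Entry> store = new HashMap<>();
    private long nextCas = 1;

    /** Sends no CAS: writes unconditionally, but is rejected while the document is locked. */
    long upsert(String id, String content) {
        Entry e = store.get(id);
        if (e != null && e.locked) throw new CasMismatch();
        if (e == null) {
            e = new Entry();
            store.put(id, e);
        }
        e.content = content;
        e.cas = nextCas++;
        return e.cas;
    }

    /** Locks the document and returns a new CAS that acts as the key to the lock. */
    long getAndLock(String id) {
        Entry e = store.get(id);
        if (e == null) throw new IllegalStateException("no such document: " + id);
        if (e.locked) throw new IllegalStateException("already locked: " + id);
        e.locked = true;
        e.cas = nextCas++;
        return e.cas;
    }

    /** Writes only when the CAS matches; a successful replace also releases the lock. */
    long replace(String id, String content, long cas) {
        Entry e = store.get(id);
        if (e == null) throw new IllegalStateException("no such document: " + id);
        if (e.cas != cas) throw new CasMismatch();
        e.content = content;
        e.locked = false;
        e.cas = nextCas++;
        return e.cas;
    }

    /** Returns the stored content, or null if the document does not exist. */
    String read(String id) {
        Entry e = store.get(id);
        return e == null ? null : e.content;
    }
}
```

In this model, calling `upsert` between `getAndLock` and `replace` throws `CasMismatch`, while `replace` with the CAS from `getAndLock` goes through and leaves the document unlocked for later upserts.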

OK, thank you.

I wanted to use only get/upsert/remove,
but when I started to implement concurrent modification of documents (using getAndLock in some cases) I ran into this problem with upsert.
It looks like I now need to rework my application to use get/getAndLock/insert/replace/remove.