Need example code in Java to read and lock a record then update it, releasing the lock


#1

Hello,

I seem to be having trouble finding a simple example that does a getAndLock (or whatever the equivalent CAS related operation is), programmatically changing the document, then updating (upserting?) it and thereby releasing the lock. This is using the java-client 2.1.2.

Would someone be so kind as to direct me to something or otherwise post an example?

Thanks,

Matthew


#2

The following code seems to work…

    Bucket couchbaseBucket = couchbaseCluster.openBucket("matthew");


    try {

        for (int j = 0; j < 2; j++) {

            JsonDocument existingDocument = couchbaseBucket.getAndLock("id1", 30);

            if (existingDocument == null) {

                JsonArray intialArray = JsonArray.empty();

                JsonObject initialData = JsonObject.empty();

                initialData.put("value", "value-" + j + "-" + System.currentTimeMillis());

                intialArray.add(initialData);

                JsonObject arrayData = JsonObject.empty();

                arrayData.put("data", intialArray);

                JsonDocument intialDocument = JsonDocument.create("id1", arrayData);

                couchbaseBucket.insert(intialDocument);

            } else {

                long documentCas = existingDocument.cas();

                JsonArray arrayIn = existingDocument.content().getArray("data");

                JsonObject newData = JsonObject.empty();

                newData.put("value", "value-" + j + "-" + System.currentTimeMillis());

                arrayIn.add(newData);

                if (couchbaseBucket.unlock("id1", documentCas)) {

                    couchbaseBucket.upsert(existingDocument);

                }else {

                    throw new RuntimeException("Document not unlocked");
                }

            }
        }

    } finally {

        couchbaseBucket.close();

        couchbaseCluster.disconnect();

    }

…but I have to unlock right before the upsert during which time another thread could make updates. I do not see a way for the unlock operation to occur at the same time as the upsert. I know one could start ‘synchronizing’ things, but was hoping Couchbase would manage that under the covers.

This seems to work and addresses the above issue, hopefully someone else will find this useful…

    Bucket couchbaseBucket = couchbaseCluster.openBucket("matthew");

    try {
        couchbaseBucket.remove("id1");
    } catch(RuntimeException e) {

    }

    try {

        for (int j = 0; j < 2; j++) {

            JsonDocument existingDocument = couchbaseBucket.getAndLock("id1", 30);

            if (existingDocument == null) {

                JsonArray intialArray = JsonArray.empty();

                JsonObject initialEntry = JsonObject.empty();

                initialEntry.put("value", "value-" + j + "-" + System.currentTimeMillis());

                intialArray.add(initialEntry);

                JsonObject arrayData = JsonObject.empty();

                arrayData.put("data", intialArray);

                JsonDocument intialDocument = JsonDocument.create("id1", arrayData);

                couchbaseBucket.insert(intialDocument);

            } else {

                long documentCas = existingDocument.cas();

                JsonArray arrayIn = existingDocument.content().getArray("data");

                JsonArray arrayOut = JsonArray.empty();

                for (int i = 0; i < arrayIn.size(); i++) {

                    arrayOut.add(arrayIn.getObject(i));

                }

                JsonObject newEntry = JsonObject.empty();

                newEntry.put("value", "value-" + j + "-" + System.currentTimeMillis());

                arrayOut.add(newEntry);

                JsonObject newArrayData = JsonObject.empty();

                newArrayData.put("data", arrayOut);

                JsonDocument newDocument = JsonDocument.create("id1", newArrayData, documentCas);

                couchbaseBucket.replace(newDocument);

            }
        }

    } finally {

        couchbaseBucket.close();

        couchbaseCluster.disconnect();

    }

…seems that the critical item is the need to create a new JsonDocument and provide the existing CAS when doing so.


#3

since existingDocument already bears the latest CAS when obtained via getAndLock, you should be able t just update its content and do a replace on it, the SDK will pick up the CAS and unlock&replace.


#4

Please show an example as I tried just about every combination of operation I could think of (some of which were included in my above comment that has been heavily edited).


#5

Hi, here is a simple example. This is a non-contended lock which updates successfully:

JsonDocument initialDoc = bucket.upsert(JsonDocument.create("doc", JsonObject.empty()));

System.out.println("Initial Write: " + initialDoc);

JsonDocument lockedDoc = bucket.getAndLock("doc", 10);

System.out.println("Locked Get: " + lockedDoc);

lockedDoc.content().put("updated", true);
JsonDocument replacedDoc = bucket.replace(lockedDoc);

System.out.println("Replaced: " + replacedDoc);

This prints:

Initial Write: JsonDocument{id='doc', cas=173337848184832, expiry=0, content={}}
Locked Get: JsonDocument{id='doc', cas=173337851002880, expiry=0, content={}}
Replaced: JsonDocument{id='doc', cas=173337858867200, expiry=0, content={"updated":true}}

… now if you want to simulate a contention, you can create a new doc from the old one, but change the CAS to something invalid:

lockedDoc.content().put("updated", true);
JsonDocument replacedDoc = bucket.replace(JsonDocument.from(lockedDoc, 12345));

If you change the code like that you’ll see this exception raised:

Exception in thread "main" com.couchbase.client.java.error.CASMismatchException
	at com.couchbase.client.java.CouchbaseAsyncBucket$17.call(CouchbaseAsyncBucket.java:608)
	at com.couchbase.client.java.CouchbaseAsyncBucket$17.call(CouchbaseAsyncBucket.java:590)
	at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:55)
	at rx.subjects.SubjectSubscriptionManager$SubjectObserver.onNext(SubjectSubscriptionManager.java:224)
	at rx.subjects.AsyncSubject.onCompleted(AsyncSubject.java:100)
	at com.couchbase.client.core.endpoint.AbstractGenericHandler$1.call(AbstractGenericHandler.java:199)
	at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
	at java.lang.Thread.run(Thread.java:745)
Caused by: rx.exceptions.OnErrorThrowable$OnNextValue: OnError while emitting onNext value: com.couchbase.client.core.message.kv.ReplaceResponse.class
	at rx.exceptions.OnErrorThrowable.addValueAsLastCause(OnErrorThrowable.java:101)
	at rx.internal.operators.OperatorMap$1.onNext(OperatorMap.java:58)
	... 11 more

#6

Thanks, let me take a look at that.