Bucket insert behaves asynchronously

java
n1ql

#1

Hi,
I am using JsonDocument inserted = bucket.insert(doc); on couchbase-server-enterprise_4.1.0-ubuntu14.04_amd64.

What is happening is this: I trigger the insert, and on success I run a Select on the same document. The Select doesn’t return the updated entries; it returns the old ones. When I pause in the debugger before the Select (wait a few seconds after the insert, then call Select), it works fine. So I think insert behaves asynchronously, but I’m not sure.

I also tried upsert instead of insert, and it doesn’t work either. However, when I do bucket.replace(doc) and then call Select, it returns the updated results. I have also tried explicitly using bucket.async().insert(doc) with toBlocking().single() on it; this fails too.

Is the issue with insert/upsert, or am I doing something wrong?

Below is a snippet of my code:

@Override
public T save(T entity, String username) {
    String id = generateId(entity);

    JsonObject data = JsonObject.fromJson(getContent(entity));
    data.removeKey(ID);
    data.put(TYPE, klass.getSimpleName());
    data.put(CREATED_AT, new Date().getTime());
    data.put(CREATED_BY, username);
    data.put(MODIFIED_AT, new Date().getTime());
    data.put(MODIFIED_BY, username);
    data.put(ACTIVE, true);

    T persistedEntity = getEntity(bucket.insert(JsonDocument.create(id, data)));

    return persistedEntity;
}

@Override
public List findAll() {
    Statement query = selectAll().where(typeExpression()).orderBy(Sort.desc(x(CREATED_AT)));
    return getEntities(query);
}

protected AsPath selectAll() {
    return Select.select("meta().id, *").from(i(bucket.name()));
}

#2

N1QL queries can run with varying degrees of consistency, unlike key/value operations, which are always consistent (you “read-your-own-writes”).

So there is a slight delay between an insertion and the indexer catching up to it. By default, a N1QL query returns whatever the indexer currently holds, so if it is still indexing your document you won’t see the update.

Try executing the query with a ScanConsistency of REQUEST_PLUS where you call bucket.query(...):

N1qlQuery n1qlQuery = N1qlQuery.simple(
    selectStatement, //that's the Statement query in your example
    N1qlParams
        .build()
        .consistency(ScanConsistency.REQUEST_PLUS)
);

This will instruct N1QL to wait for the indexer to finish indexing pending mutations.


#3

This works fine, thanks. Still, is there any other way to get the results directly from the documents instead of going through the indexer?


#4

No. If you don’t have a list of document IDs known in advance, you cannot access the documents directly; you have to go through an index one way or the other…


#5

Hi, is there any way to make the insert operation not return until the indexer has finished indexing the pending mutations, so that every Select gets the updated results without any delay?


#6

No, it is an architectural design decision that secondary indexing shouldn’t delay key/value responsiveness.

Plus, saying “the select should let the indexer process all pending mutations and catch up rather than returning stale results” achieves virtually the same goal, so why not use that?

If you’re concerned that there might be too many mutations waiting to be indexed when you’re only interested in one, there’s another finer level of consistency in N1qlParams in the latest versions of the SDK: AT_PLUS.

AT_PLUS requires Couchbase Server 4.0 or above, and a little bit of configuration when initializing the SDK:

CouchbaseEnvironment env = DefaultCouchbaseEnvironment.builder()
    .mutationTokensEnabled(true) //this is needed for AT_PLUS
    .build();
Cluster cluster = CouchbaseCluster.create(env, "localhost"); //replace "localhost" with your IP(s) or hostname(s)

Now when you do a mutation, you must keep track of the returned document(s):

JsonDocument inserted1 = bucket.insert(JsonDocument.create(id1, data1));

//do not track the document you use as input...
JsonDocument toInsert2 = JsonDocument.create(id2, data2);
//...but rather the one returned by the insert method:
JsonDocument inserted2 = bucket.insert(toInsert2);

Finally, when you do the query, use consistentWith rather than consistency in the parameters:

N1qlQuery n1qlQuery = N1qlQuery.simple(
    selectStatement,
    N1qlParams
        .build()
        .consistentWith(inserted1, inserted2)
);

Here the query engine will be able to use the index as soon as the indexer has caught up with the two documents. If there are more documents pending to be indexed in the queue, the query engine won’t necessarily wait for them to be indexed.
That is only useful if there is a heavy write workload between the moment you insert your document and the moment you SELECT for it.
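
To make the difference between the default behaviour, REQUEST_PLUS and AT_PLUS a bit more concrete, here is a toy model in plain Java. It is only a sketch: it does not use the Couchbase SDK and it is not how the indexer is really implemented. The class ConsistencySketch, its methods and the sequence numbers are all made up for illustration, and "waiting for the indexer" is simulated by simply letting the fake indexer advance.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model only: hypothetical names, not the Couchbase SDK or its internals.
final class ConsistencySketch {
    // Mutations accepted by the key/value side, in arrival order.
    private final List<String> mutations = new ArrayList<>();
    // How many of those mutations the (asynchronous) indexer has processed.
    private int indexedUpTo = 0;

    // Key/value write: returns at once with the mutation's sequence number,
    // without waiting for the indexer.
    long insert(String id) {
        mutations.add(id);
        return mutations.size();
    }

    // Default behaviour: return whatever is indexed right now, possibly stale.
    List<String> queryNotBounded() {
        return new ArrayList<>(mutations.subList(0, indexedUpTo));
    }

    // REQUEST_PLUS-like: wait for every mutation pending at query time.
    List<String> queryRequestPlus() {
        return queryAtLeast(mutations.size());
    }

    // AT_PLUS-like: wait only until the given sequence number is indexed.
    List<String> queryAtPlus(long seqno) {
        return queryAtLeast(seqno);
    }

    // Simulates waiting by letting the indexer catch up to the target.
    private List<String> queryAtLeast(long seqno) {
        long target = Math.min(seqno, mutations.size());
        while (indexedUpTo < target) {
            indexedUpTo++;
        }
        return queryNotBounded();
    }

    public static void main(String[] args) {
        ConsistencySketch store = new ConsistencySketch();
        long mine = store.insert("mine");
        store.insert("other-1");
        store.insert("other-2");

        System.out.println(store.queryNotBounded());  // indexer has not run yet
        System.out.println(store.queryAtPlus(mine));  // waits only for "mine"
        System.out.println(store.queryRequestPlus()); // waits for everything pending
    }
}
```

In this sketch the AT_PLUS-like query can answer once "mine" is indexed, even though two later writes are still pending, while the REQUEST_PLUS-like query waits for all three. In the real SDK you don't pass a sequence number yourself; you pass the documents returned by the mutation to consistentWith, as in the snippet above.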


#7

Is this the case for the primary index too or would we get consistent results with just the primary index?