Modify the hash algorithm so some keys will always end up on the same node

Hi

I need to modify the hash algorithm so that some keys always end up on the same node.
For example, if my document ID is XYZ:123, we want the hash to be computed on XYZ only, not on the whole document ID.

Is it possible to tweak the Java SDK somehow to achieve this?

Regards,
Venkat

@sri_ram this is not something the SDK supports out of the box, since Couchbase is designed so that all your partitions hold a roughly equal share of the documents (i.e. your cluster is balanced).
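For reference, here is a minimal sketch of how a document key maps to a partition (vBucket). It assumes the CRC32-based scheme used by the Java SDK's KeyValueLocator and a cluster with 1024 vBuckets; treat both as assumptions to check against your SDK version and cluster.

```java
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.zip.CRC32;

public class VBucketMapping {

    // Assumption: 1024 vBuckets (the usual count on Linux/Windows clusters).
    static final int NUM_PARTITIONS = 1024;

    // Assumed scheme: CRC32 over the whole key, keep bits 16..30,
    // then mask to the vBucket range (numPartitions must be a power of two).
    static int partitionForKey(String key, int numPartitions) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        CRC32 crc = new CRC32();
        crc.update(bytes, 0, bytes.length);
        long rv = (crc.getValue() >> 16) & 0x7fff;
        return (int) rv & (numPartitions - 1);
    }

    public static void main(String[] args) {
        List<String> ids = List.of("XYZ:123", "XYZ:345", "XYZ:789", "ABC:111", "ABC:898", "ABC:900");
        for (String id : ids) {
            // The whole ID is hashed, so a shared "XYZ" prefix does not keep docs together
            System.out.println(id + " -> vBucket " + partitionForKey(id, NUM_PARTITIONS));
        }
    }
}
```

Because the full key feeds the hash, keys that share a prefix scatter across unrelated vBuckets, which is exactly what keeps partitions evenly loaded.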

There are ways to do this, but they involve copying some code, figuring out document IDs that map to the partitions you want, and then saving the documents under those IDs. Again, I advise against it in general. May I ask what the use case is?
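The "figure out document IDs that match the partitions" approach can be sketched without changing how the SDK routes: generate candidate IDs and keep only those whose full key already hashes to the vBucket you want. This assumes the same CRC32 mapping as the Java SDK's KeyValueLocator and 1024 vBuckets; findIdsInPartition is a hypothetical helper. Note that the resulting IDs are not the ones you would naturally choose, which is part of why this is discouraged.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.zip.CRC32;

public class PartitionIdFinder {

    // Assumed CRC32-based key-to-vBucket mapping (numPartitions must be a power of two).
    static int partitionForKey(String key, int numPartitions) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        CRC32 crc = new CRC32();
        crc.update(bytes, 0, bytes.length);
        long rv = (crc.getValue() >> 16) & 0x7fff;
        return (int) rv & (numPartitions - 1);
    }

    // Hypothetical helper: tries "<prefix>:0", "<prefix>:1", ... and keeps the
    // first `count` IDs whose full key hashes to `targetPartition`.
    static List<String> findIdsInPartition(String prefix, int targetPartition, int count, int numPartitions) {
        List<String> found = new ArrayList<>();
        for (long n = 0; found.size() < count; n++) {
            String candidate = prefix + ":" + n;
            if (partitionForKey(candidate, numPartitions) == targetPartition) {
                found.add(candidate);
            }
        }
        return found;
    }

    public static void main(String[] args) {
        int target = partitionForKey("XYZ:123", 1024);
        // About 1 candidate in 1024 matches, so each hit costs roughly a thousand tries
        System.out.println("vBucket " + target + ": " + findIdsInPartition("XYZ", target, 3, 1024));
    }
}
```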

Thanks for your response. We have a requirement to insert around 5k documents in a single transaction.
Although we can achieve this using N1QL transactions, it is slower.
We wanted to at least ensure some level of atomicity by making sure all documents of XYZ (as in the example above) land on the same data node.

Example docIds:
XYZ:123, XYZ:345, XYZ:789…etc
ABC:111, ABC:898, ABC:900…etc

I was able to modify KeyValueLocator.java to route these documents to the same partition (vbucketId); however, the document is getting saved with its meta information missing.

Can you let me know why this is happening, or which changes or files I need to look at to fix it?

Hey @sri_ram,

Note that just going to a single node doesn’t give you all-or-nothing transaction behavior.
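A toy illustration of why co-location alone is not atomicity: a single in-memory map stands in for one data node (this is not Couchbase code), and a failure partway through a batch leaves the earlier writes in place with nothing to undo them.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class SingleNodeNotAtomic {

    // Toy stand-in for one data node: each document write is independent.
    static final Map<String, String> node = new LinkedHashMap<>();

    // Writes docs one at a time and throws after `failAfter` writes, simulating
    // a crash, timeout or application error partway through the batch.
    static void writeAll(List<String> ids, int failAfter) {
        int written = 0;
        for (String id : ids) {
            if (written == failAfter) {
                throw new IllegalStateException("failed after " + written + " writes");
            }
            node.put(id, "{}");
            written++;
        }
    }

    public static void main(String[] args) {
        try {
            writeAll(List.of("XYZ:123", "XYZ:345", "XYZ:789"), 2);
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage()); // failed after 2 writes
        }
        // Same node, yet the batch is half-applied and nothing rolls it back
        System.out.println(node.keySet()); // [XYZ:123, XYZ:345]
    }
}
```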

Also note that you don’t need to go through N1QL statements for transactions: Couchbase Transactions can be used directly from Java. For a bulk add of 5k documents, I think that API will likely perform better.

The concept you’re following there, putting the related documents in a single vbucket, is what we call setting a shard key for the doc. Back in the memcached days, it was common for some clients to do this. We did allow for this in the early design of the system.

At some point, though, when some of the UI, query, XDCR and replication components were added, we didn’t carry that shard key through the system. So when you go to the UI, there are a couple of ways it may retrieve the document, and one is through its own calculation of the vbucket ID based on the full key.
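To make the mismatch concrete, here is a toy model: plain Java maps stand in for vBuckets, so this is not real Couchbase code. It assumes the CRC32 key-to-vBucket mapping used by the Java SDK's KeyValueLocator, 1024 vBuckets, and a hypothetical client change that hashes only the part of the ID before the first ':'. A writer that routes by shard key and a reader that recomputes the vBucket from the full key generally look in different places.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;
import java.util.zip.CRC32;

public class ShardKeyMismatch {

    static final int NUM_PARTITIONS = 1024; // assumption

    // Assumed CRC32-based key-to-vBucket mapping, applied to whatever key it is given.
    static int partitionForKey(String key) {
        byte[] bytes = key.getBytes(StandardCharsets.UTF_8);
        CRC32 crc = new CRC32();
        crc.update(bytes, 0, bytes.length);
        long rv = (crc.getValue() >> 16) & 0x7fff;
        return (int) rv & (NUM_PARTITIONS - 1);
    }

    // Hypothetical modified client: hashes only the shard key before the first ':'.
    static int partitionForShardKey(String docId) {
        int sep = docId.indexOf(':');
        return partitionForKey(sep >= 0 ? docId.substring(0, sep) : docId);
    }

    // Toy cluster: one map per vBucket.
    static final Map<Integer, Map<String, String>> vbuckets = new HashMap<>();

    static void write(int vb, String id, String value) {
        vbuckets.computeIfAbsent(vb, k -> new HashMap<>()).put(id, value);
    }

    static String read(int vb, String id) {
        return vbuckets.getOrDefault(vb, Map.of()).get(id);
    }

    public static void main(String[] args) {
        String id = "XYZ:345";
        int shardVb = partitionForShardKey(id);
        int fullVb = partitionForKey(id);
        write(shardVb, id, "{\"value\":1}");
        System.out.println("modified client reads vBucket " + shardVb + ": " + read(shardVb, id));
        // Unless the two hashes happen to coincide, this lookup finds nothing
        System.out.println("full-key lookup reads vBucket " + fullVb + ": " + read(fullVb, id));
    }
}
```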

Most of that is background to say that while you could, by modifying your client and working with the KV service, coerce a number of documents into the same vbucket and thus onto the same node, other parts of the system may not honor that placement.

If what you really need is all-or-nothing behavior with read-committed visibility of the docs, try the Java transactions API. We designed features specifically for this use case.
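As a toy model of the all-or-nothing, read-committed contract (not how Couchbase Transactions is implemented internally): writes are staged privately and only published if every one of them succeeds, so readers never observe a half-applied batch.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class StagedCommitToy {

    // Committed state: the only thing other readers ever see in this toy.
    static final Map<String, String> committed = new HashMap<>();

    // Stages every write privately and publishes them only if all succeed.
    // `failAfter` simulates an error after that many staged writes.
    static boolean runAsUnit(List<String> ids, int failAfter) {
        Map<String, String> staged = new HashMap<>();
        for (String id : ids) {
            if (staged.size() == failAfter) {
                return false; // "rollback": staged writes are discarded, nothing was published
            }
            staged.put(id, "{}");
        }
        committed.putAll(staged); // "commit": single-threaded here, so readers see all or none
        return true;
    }

    public static void main(String[] args) {
        List<String> ids = List.of("XYZ:123", "XYZ:345", "XYZ:789");
        System.out.println(runAsUnit(ids, 2) + " " + committed.size());                 // false 0
        System.out.println(runAsUnit(ids, Integer.MAX_VALUE) + " " + committed.size()); // true 3
    }
}
```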


Thanks for the explanation. I will run a perf test with 5k documents in a transaction and provide an update.

@sri_ram
For 5k documents you’ll likely want to use the reactive API, which allows parallelising the staging work. There’s an example here. You’ll also benefit from future performance improvements we have planned, which will parallelise the commit portion.

Note that currently the API has to be used in a specific way to be thread-safe, as detailed in that documentation link. The next release of the Java transactions library will remove those restrictions, making parallel operations even simpler: e.g. you won’t need to perform the first operation serially anymore, or track failed operations in order to roll back the transaction. The ETA for the next release is the middle of next month.

Also, that example performs all operations in parallel at once, whereas with 5k documents you may need something more sophisticated that limits how many operations are in flight. Something like the below should work:

        List<String> docIds = Arrays.asList("doc1", "doc2", "doc3", "doc4", "doc5");
        int concurrency = 100; // This many operations will be in-flight at once
        
        TransactionResult result = transactions.reactive((ctx) -> {
            return Flux.fromIterable(docIds)
                    .parallel(concurrency)
                    .runOn(Schedulers.boundedElastic())
                    .concatMap(docId -> ctx.get(collection.reactive(), docId)
                            .flatMap(doc -> {
                                JsonObject content = doc.contentAsObject();
                                content.put("value", "updated");
                                return ctx.replace(doc, content);
                            }))
                    .sequential()
                    .then();
        }).block();

This example requires the upcoming release and will not work with the current version of the library. For the current version, you will need something like this instead:

        TransactionResult result = transactions.reactive((ctx) -> {

            // Tracks whether all operations were successful
            AtomicBoolean allOpsSucceeded = new AtomicBoolean(true);

            // The first mutation must be done in serial, as it also creates a metadata entry
            return ctx.get(collection.reactive(), docIds.get(0))
                    .flatMap(doc -> {
                        JsonObject content = doc.contentAsObject();
                        content.put("value", "updated");
                        return ctx.replace(doc, content);
                    })

                    // Do all other docs in parallel
                    .thenMany(Flux.fromIterable(docIds.subList(1, docIds.size()))
                            .parallel(concurrency)
                            .runOn(Schedulers.boundedElastic())
                            .concatMap(docId -> ctx.get(collection.reactive(), docId)
                                    .flatMap(doc -> {
                                        JsonObject content = doc.contentAsObject();
                                        content.put("value", "updated");
                                        return ctx.replace(doc, content);
                                    })
                                    .onErrorResume(err -> {
                                        allOpsSucceeded.set(false);
                                        // App should replace this with logging
                                        err.printStackTrace();

                                        // Allow other ops to finish
                                        return Mono.empty();
                                    })))

                    // The commit or rollback must also be done in serial
                    .then(Mono.defer(() -> {
                        // Commit iff all ops succeeded
                        if (allOpsSucceeded.get()) {
                            return ctx.commit();
                        } else {
                            throw new RetryTransaction();
                        }
                    }));
        }).block();
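Both Reactor snippets above cap how many operations are in flight via `concurrency`, and the second one records failures instead of aborting so that the commit-or-rollback decision is made once, at the end. The same shape can be sketched with plain java.util.concurrent, e.g. for a quick local harness while preparing the perf test. This is not a replacement for the transactions API: simulatedOperation is a hypothetical stand-in for ctx.get + ctx.replace, and a thread pool has no transactional semantics of its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

public class BoundedConcurrencySketch {

    // Hypothetical stand-in for one ctx.get(...) + ctx.replace(...) pair.
    static void simulatedOperation(String id) {
        if (id.endsWith("-bad")) {
            throw new RuntimeException("failed: " + id);
        }
        try {
            Thread.sleep(1); // pretend network latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    // Runs every op with at most `concurrency` in flight, records failures instead of
    // stopping, and reports whether all succeeded (the "commit iff all ops succeeded" check).
    static boolean processAll(List<String> ids, int concurrency, AtomicInteger maxInFlight)
            throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(concurrency); // caps in-flight ops
        AtomicInteger inFlight = new AtomicInteger();
        AtomicBoolean allOpsSucceeded = new AtomicBoolean(true);
        for (String id : ids) {
            pool.submit(() -> {
                maxInFlight.accumulateAndGet(inFlight.incrementAndGet(), Math::max);
                try {
                    simulatedOperation(id);
                } catch (RuntimeException e) {
                    allOpsSucceeded.set(false); // let the other ops finish
                } finally {
                    inFlight.decrementAndGet();
                }
            });
        }
        pool.shutdown();
        if (!pool.awaitTermination(5, TimeUnit.MINUTES)) {
            return false; // treat a timeout as a failure
        }
        return allOpsSucceeded.get();
    }

    public static void main(String[] args) throws InterruptedException {
        List<String> ids = new ArrayList<>();
        for (int i = 0; i < 5000; i++) {
            ids.add("doc" + i);
        }
        AtomicInteger maxInFlight = new AtomicInteger();
        boolean ok = processAll(ids, 100, maxInFlight);
        System.out.println("all succeeded: " + ok + ", max in flight: " + maxInFlight.get());
    }
}
```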