Java sdk reactive transactions responses

Hi

I am trying to use java sdk (java-client v3.4.3).
Since I need transactions, I am using transactional flavor of cluster object
cluster.reactive().transactions().run((ctx) -> {});
Both sync/ async versions expect lambda and return TransactionResult /
Mono<TransactionResult>

Since java requires local variables to be final or effectively final, how am I supposed to share the response of say ctx.insert() or ctx.replace() outside this transaction? I want to access the cas value for ctx.insert but I am not able to send data out of this transactions().run() function.

Hi @Ankur-Shukl
You can use an AtomicReference, but it won’t help you much as the document will be changed again during the commit, which happens after the lambda exits successfully. This will update the document’s CAS, making your saved CAS outdated.

We are seeing some interest from users in having the post-commit CAS values. It’s not necessarily something that’s off the table, but we do need to be very careful and conservative in how we evolve the API. It would help us if you could broadly outline your use-case for it?

1 Like

Hi @graham.pople

  1. The cas value is needed to let consumer know the max CAS version to process for a document. But this is just one case around cas, my major requirement is transactions with bulk writes.
  2. I have pure transactional use cases for writes. As per documentation of CB (Using Couchbase Transactions | Couchbase Docs), this does not say anything about using async version and uses reactive programming completely, so I am assuming transactions are not even supported for cluster.async() version. I also did not find any method to initiate transaction with cluster.async() versions.
  3. Given that I am using transactions, let me take an example:
ReactiveCollection rc = collection.reactive();
List<TransactionGetResult> getResultsMono = new ArrayList<>();
TransactionResult transactionResult = cluster.reactive().transactions().run((ctx) -> {
            JsonObject js = JsonObject.create().put("name","testName");        
            ctx.insert(rc, "abcd", js);
            return ctx.insert(rc, "pqrs", js);
        }).block();

cluster.reactive().transactions().run() expects a return statement from the lambda of type Mono<?>. In above example, I am doing 2 inserts, and any of these 2 can fail. However, I can return Mono from only 1 of these 2. This is really confusing. Which is the Mono<?> expected by transactions().run(), always the last one?

As long as I did not return a null, and returned a Mono of anything, there was no change in output of TransactionResult. I even returned a Mono<List> and still same response.

  1. All the examples given in documentation of java sdk tell how to chain operations within a transaction. Usually, applications interact with DBs via a data access layer, which primarily would provide bulk APIs (txs with multi writes options), and not too much custom logic to enable long chains. Going by this logic, async() version using Futures should be recommended more, but your documentation has almost nothing on it.

  2. Rather than create a new transactions object for every new transaction, I am creating just once for one cluster object using

ReactiveTransactions txs = cluster.reactive().transactions();

Is new ReactiveTransactions recommended before every transaction? I do not see anything dynamic passed during the creation of ReactiveTransactions, it is just core and environment().jsonSerializer().

Hi @Ankur-Shukl

  1. Correct, there’s isn’t a cluster.async() version of the transactions API. Reactive and blocking are supported as those are the two that are most commonly used. If an async CompletableFuture-based API would be useful to you, then please let us know.

3 & 4. When you’re using reactive programming, it’s crucial to chain the operations together using .concatMap() and .flatMap(). Please take a look at the Project Reactor docs. Because your operations are not in a chain, that is why your transaction is not behaving correctly.

If you’re unfamiliar with this style of programming then I would recommend using the much simpler blocking API. Then your example would be simply:

final JsonObject js = JsonObject.create().put("name","testName");        

TransactionResult transactionResult = cluster.transactions().run(ctx -> {
    ctx.insert(rc, "abcd", js);
    ctx.insert(rc, "pqrs", js);
});
  1. It’s up to you. You can access cluster.transactions().run( on each all call, or cache the result of the cluster.transactions() object. There should be basically no difference in performance, as those objects are very cheap to create.

Hope this helps.

@graham.pople to ensure good performance, I need to use reactive versions.
Can you please provide a sample of how you would run multiple inserts within a transaction using reactive version?
These inserts have no specified ordering and so can be done in parallel once a transaction a started.
The only place where I have confusion is the Mono to be returned to the run() function.

I tried following

        reactiveTransactions
                .run((ctx) -> {
                    Flux<TransactionGetResult> resultFlux = Flux.fromIterable(input)
                            .flatMap(document -> {
                                ReactiveCollection collection =
                                        cbBuckets.get(document.getBucketName()).getBucket().defaultCollection().reactive();
                                return ctx.insert(collection, document.getKey(), document.getDoc())
                                        .onErrorResume(err -> Mono.error(new NonRetryableException("tx_insert:exception:" + document.getKey() + ":" + err.getLocalizedMessage()))
                                        )
                                        .subscribeOn(Schedulers.parallel());
                            }, MAX_CONCURRENCY);

                    return resultFlux.collectList();
                })
                .timeout(Duration.ofSeconds(TXN_TIMEOUT))
                .subscribeOn(Schedulers.parallel())
                .block();

Hi @Ankur-Shukl

Sure, this should do what you need:

    TransactionOptions options = TransactionOptions.transactionOptions().timeout(Duration.ofSeconds(TXN_TIMEOUT));

    reactiveTransactions
      .run(ctx -> Flux.fromIterable(input)
        .parallel(MAX_CONCURRENCY)
        .runOn(Schedulers.parallel())
        .concatMap(document -> {
          ReactiveCollection collection =
            cbBuckets.get(document.getBucketName()).getBucket().defaultCollection().reactive();
          return ctx.insert(collection, document.getKey(), document.getDoc());
        }), options)
      .block();

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.