IncrementRequest TIMEOUT

I use Community Edition 6.0.0 cluster with two cocuhbase server and Java SDK ver 3.4.0 and Spring Webflux.
I call saveSomeObject function at Service layer

public Mono<SomeObject> saveSomeObject(SomeObject campaign){
      return basicFactorsBucket.reactive()
                .viewQuery(CouchbaseDesign.ZONE, CouchbaseView.BY_COUNTRY_CODE, viewOptions)
                .flatMapMany(ReactiveViewResult::rows)
                .filter(viewRow -> viewRow.id().isPresent())
                .flatMap(viewRow -> basicFactorsBucket.reactive().defaultCollection()
                    .get(viewRow.id().get()))
                .map(GetResult::contentAsObject)
                .map(json -> {
                    //.. make some Object
                    SomeObject obj = ...;
                    
                    return obj;
                })
                .collect(ConcurrentHashMap<String, SomeObject>::new, (map, task) -> {
                    String key = task.time();

                    if (map.containsKey(key)) {
                        map.get(key).zones().add(task.zones().getFirst());
                    } else {
                        map.put(key, task);
                    }
                })
                .map(tasks -> {
                    // DO some process...
                    
                    Long counter = campaignBucket.defaultCollection().binary()
                            .increment("campaignCounter",
                                IncrementOptions.incrementOptions().delta(1).initial(1).timeout(
                                    Duration.ofMillis(2000))).content();
                                    
                    campaignId = counter;
                    campaignBucket.defaultCollection()
                        .upsert(String.format("campaign::%d", campaignId), jsonObject);

                    return campaign;
                })
                .doOnError(error -> log.error(error.getMessage()))
                .onErrorResume(error -> {
                    log.error(error.getMessage());
                    return Mono.error(new InternalServerErrorException(error.getMessage()));
                });
}

Then some times occure under error log.
It doesn’t happen every time.

 IncrementRequest, Reason: TIMEOUT {"cancelled":true,"completed":true,"coreId":"0x56a0a28600000001","idempotent":false,"lastChannelId":"56A0A28600000001/00000000FE2E258E","lastDispatchedFrom":"SERVER IP:27434","lastDispatchedTo":"SERVER IP:11210","reason":"TIMEOUT","requestId":59414,"requestType":"IncrementRequest","retried":0,"service":{"bucket":"Campaigns","collection":"_default","documentId":"campaignCounter","opaque":"0xe7e4","scope":"_default","type":"kv","vbucket":751},"timeoutMs":2000,"timings":{"totalMicros":2010700}}

and Really sometimes the below error also occurs. But there is no problem.

2023:03:30 10:01:02.856 ERROR 2515106 --- [reactor-http-server-epoll-5] reactor.core.publisher.Operators[error:324] : Operator called default onErrorDropped
io.netty.channel.unix.Errors$NativeIoException: syscall:read(..) failed: Connection reset by peer
        at io.netty.channel.unix.FileDescriptor.readAddress(..)(Unknown Source)

When calling Increment on the third map, it is suspected that the above phenomenon occurs because it does not bucket reactive().
plz help me why some times occure Increment TIMEOUT Error and Operator called default onErrorDropped.

Hi there, apologies for the delay in getting back to you.

From one of those log messages, it says that the connection reset by peer which may indicate a networking issue or that the process has crashed.

This may help you investigate the specific cause further: Handling Errors | Couchbase Docs

1 Like

Do you mean you’re making a blocking call inside a reactive chain? That’s not good, and can lead to timeouts. For a bad example:


Bucket bucket = ...

bucket.reactive().get("foo")
    .map(it -> bucket.upsert("bar", it.contentAsObject)
    //                ^-- Blocking call in reactive chain.
    //                    Never do this!

Thanks,
David

1 Like

Just to add what to David said - the call to upsert should be bucket.reactive().upsert(…)

It might be more straight-forward obvious just to have a reactiveBucket variable and use that throughout, instead of having to .reactive() everywhere.

Also - when intercepting an error message, if only the getMessage() is propagated, much useful information is lost. Mono.error(error) would not lose that information.

                .onErrorResume(error -> {
                    log.error(error.getMessage());
                    return Mono.error(new InternalServerErrorException(error.getMessage()));

Also - since you already log.errorin doOnError() there’s no need to do it again in onErrorResume().

Also - since the only other thing that the onErrorResume() does is return the error - which would happen anyway without the onErrorResume, I think you would be better off just not to have the onErrorResume().

1 Like

This topic was automatically closed 90 days after the last reply. New replies are no longer allowed.