Reactive code issue

java

#1

Hi,

I am trying to use bulk get using reactive API but the performance I am getting is not that good. Can you look at code and suggest what I am doing wrong.

TestReactive.java

import java.text.MessageFormat;
import java.util.Arrays;
import java.util.List;
import java.util.logging.ConsoleHandler;
import java.util.logging.FileHandler;
import java.util.logging.Handler;
import java.util.logging.Level;
import java.util.logging.Logger;

import com.couchbase.client.java.AsyncBucket;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.CouchbaseCluster;
import com.couchbase.client.java.document.JsonDocument;
import com.couchbase.client.java.env.CouchbaseEnvironment;
import com.couchbase.client.java.env.DefaultCouchbaseEnvironment;
import com.couchbase.client.java.query.AsyncN1qlQueryResult;
import com.couchbase.client.java.query.AsyncN1qlQueryRow;
import com.couchbase.client.java.query.N1qlQuery;

import rx.Observable;
import rx.functions.Func1;

public class TestReactive {
        public static void main(String[] args) {
                /*Logger logger = Logger.getLogger("com.couchbase");
                logger.setLevel(Level.FINEST);
                for (Handler h : logger.getParent().getHandlers()) {
                        if (h instanceof ConsoleHandler) {
                                h.setLevel(Level.FINEST);
                        }
                }*/
                TestReactive testReactive = new TestReactive();
                testReactive.testCode();
        }
        public void testCode() {
                CouchbaseEnvironment couchbaseEnvironment = DefaultCouchbaseEnvironment.builder()
                                .queryTimeout(10000000)
                                .kvTimeout(10000000)
                                .searchTimeout(1000000)
                                .socketConnectTimeout(1000000000)
                                .managementTimeout(1000000000)
                                .connectTimeout(1000000000)
                                .viewTimeout(1000000000)
                                .autoreleaseAfter(1000000000)
                                .build();
                Cluster cluster = CouchbaseCluster.create(couchbaseEnvironment, Arrays.asList("pdcbcl00149.ind.test.com"));
                Bucket bucket = cluster.openBucket("gpc", "G1@b@l123");
                AsyncBucket asyncBucket = bucket.async();
                String n1ql = MessageFormat.format("select raw meta().id from `{0}` LIMIT 500", asyncBucket.name());

                //Get 500 meta() ids
                long startTime = System.currentTimeMillis();
                List<String> metaIds = asyncBucket.query(N1qlQuery.simple(n1ql))
                                .doOnNext(res -> res.info().forEach(t -> {
                                        System.out.println("N1QlMetrics: " + t);;
                                }))
                                .flatMap(AsyncN1qlQueryResult::rows)
                                .flatMap(new Func1 <AsyncN1qlQueryRow, Observable <String>>() {
                                        @Override
                                        public Observable<String> call(AsyncN1qlQueryRow asyncN1qlQueryRow) {
                                                return Observable.just(new String(asyncN1qlQueryRow.byteValue()).replace("\"", "").replace("[", "").replace("]", ""));
                                        }
                                })
                                .toList()
                                .toBlocking()
                                .single();
                System.out.println("Time taken in getting meta().id: " + (System.currentTimeMillis() - startTime));

                // JVM warm-up
                Observable.from(metaIds.subList(0, 100))
                .flatMap(new Func1<String, Observable<JsonDocument>>() {
                        @Override
                        public Observable<JsonDocument> call(String metaId) {
                                return asyncBucket.get(metaId).singleOrDefault(JsonDocument.create(metaId));
                        }
                })
                .toList()
                .toBlocking()
                .single();

                // Get the actual 500 JsonDocuments
                startTime = System.currentTimeMillis();
                List<JsonDocument> documents = Observable.from(metaIds).flatMap(new Func1<String, Observable<JsonDocument>>() {
                        @Override
                        public Observable<JsonDocument> call(String metaId) {
                                return asyncBucket.get(metaId).singleOrDefault(JsonDocument.create(metaId));
                        }
                })
                                .toList()
                                .toBlocking()
                                .single();
                System.out.println("Time taken in getting 500 documents: " + (System.currentTimeMillis() - startTime));
                bucket.close();
                System.out.println("Completed.....");

        }
}

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>
  <groupId>com.couchbase.test</groupId>
  <artifactId>standAloneApplication</artifactId>
  <version>0.0.1-SNAPSHOT</version>
    <dependencies>
        <dependency>
            <groupId>com.couchbase.client</groupId>
            <artifactId>java-client</artifactId>
            <version>2.6.0</version>
        </dependency>
    </dependencies>
</project>

Logs:

sh-4.2$java -cp .:/opt/app-root/src/target/lib/* TestReactive
Jul 20, 2018 7:39:37 AM com.couchbase.client.java.env.DefaultCouchbaseEnvironment <init>
WARNING: The configured query timeout is greater than the maximum request lifetime. This can lead to falsely cancelled requests.
Jul 20, 2018 7:39:38 AM com.couchbase.client.java.env.DefaultCouchbaseEnvironment <init>
WARNING: The configured key/value timeout is greater than the maximum request lifetime.This can lead to falsely cancelled requests.
Jul 20, 2018 7:39:38 AM com.couchbase.client.java.env.DefaultCouchbaseEnvironment <init>
WARNING: The configured view timeout is greater than the maximum request lifetime.This can lead to falsely cancelled requests.
Jul 20, 2018 7:39:38 AM com.couchbase.client.java.env.DefaultCouchbaseEnvironment <init>
WARNING: The configured search timeout is greater than the maximum request lifetime.This can lead to falsely cancelled requests.
Jul 20, 2018 7:39:38 AM com.couchbase.client.java.env.DefaultCouchbaseEnvironment <init>
WARNING: The configured management timeout is greater than the maximum request lifetime.This can lead to falsely cancelled requests.
Jul 20, 2018 7:39:38 AM com.couchbase.client.core.CouchbaseCore <init>
INFO: CouchbaseEnvironment: {sslEnabled=false, sslKeystoreFile='null', sslTruststoreFile='null', sslKeystorePassword=false, sslTruststorePassword=false, sslKeystore=null, sslTruststore=null, bootstrapHttpEnabled=true, bootstrapCarrierEnabled=true, bootstrapHttpDirectPort=8091, bootstrapHttpSslPort=18091, bootstrapCarrierDirectPort=11210, bootstrapCarrierSslPort=11207, ioPoolSize=12, computationPoolSize=12, responseBufferSize=16384, requestBufferSize=16384, kvServiceEndpoints=1, viewServiceEndpoints=12, queryServiceEndpoints=12, searchServiceEndpoints=12, configPollInterval=2500, configPollFloorInterval=50, networkResolution=NetworkResolution{name='auto'}, ioPool=NioEventLoopGroup, kvIoPool=null, viewIoPool=null, searchIoPool=null, queryIoPool=null, analyticsIoPool=null, coreScheduler=CoreScheduler, memcachedHashingStrategy=DefaultMemcachedHashingStrategy, eventBus=DefaultEventBus, packageNameAndVersion=couchbase-java-client/2.6.0 (git: 2.6.0, core: 1.6.0), retryStrategy=BestEffort, maxRequestLifetime=75000, retryDelay=ExponentialDelay{growBy 1.0 MICROSECONDS, powers of 2; lower=100, upper=100000}, reconnectDelay=ExponentialDelay{growBy 1.0 MILLISECONDS, powers of 2; lower=32, upper=4096}, observeIntervalDelay=ExponentialDelay{growBy 1.0 MICROSECONDS, powers of 2; lower=10, upper=100000}, keepAliveInterval=30000, continuousKeepAliveEnabled=true, keepAliveErrorThreshold=4, keepAliveTimeout=2500, autoreleaseAfter=1000000000, bufferPoolingEnabled=true, tcpNodelayEnabled=true, mutationTokensEnabled=false, socketConnectTimeout=1000000000, callbacksOnIoPool=false, disconnectTimeout=25000, requestBufferWaitStrategy=com.couchbase.client.core.env.DefaultCoreEnvironment$4@5dfcfece, certAuthEnabled=false, coreSendHook=null, forceSaslPlain=false, compressionMinRatio=0.83, compressionMinSize=32, operationTracingEnabled=true, operationTracingServerDurationEnabled=true, tracer=ThresholdLogTracer, orphanResponseReportingEnabled=true, orphanResponseReporter=DefaultOrphanResponseReporter, keyValueServiceConfig=KeyValueServiceConfig{minEndpoints=1, maxEndpoints=1, pipelined=true, idleTime=0}, queryServiceConfig=QueryServiceConfig{minEndpoints=0, maxEndpoints=12, pipelined=false, idleTime=300}, searchServiceConfig=SearchServiceConfig{minEndpoints=0, maxEndpoints=12, pipelined=false, idleTime=300}, viewServiceConfig=ViewServiceConfig{minEndpoints=0, maxEndpoints=12, pipelined=false, idleTime=300}, analyticsServiceConfig=AnalyticsServiceConfig{minEndpoints=0, maxEndpoints=12, pipelined=false, idleTime=300}, queryTimeout=10000000, viewTimeout=1000000000, searchTimeout=1000000, analyticsTimeout=75000, kvTimeout=10000000, connectTimeout=1000000000, dnsSrvEnabled=false}
Jul 20, 2018 7:39:43 AM com.couchbase.client.core.node.CouchbaseNode signalConnected
INFO: Connected to Node 10.22.69.133/pdcbcl00149ecp1.ind.test.com
Jul 20, 2018 7:39:44 AM com.couchbase.client.core.config.DefaultConfigurationProvider upsertBucketConfig
INFO: Selected network configuration: default
Jul 20, 2018 7:39:44 AM com.couchbase.client.core.config.DefaultConfigurationProvider$8 call
INFO: Opened bucket gpc
Jul 20, 2018 7:39:44 AM com.couchbase.client.core.node.CouchbaseNode signalDisconnected
INFO: Disconnected from Node 10.22.69.133/pdcbcl00149ecp1.ind.test.com
Jul 20, 2018 7:39:44 AM com.couchbase.client.core.node.CouchbaseNode signalConnected
INFO: Connected to Node 10.0.213.91/lpdospdb50750.ind.test.com
Jul 20, 2018 7:39:44 AM com.couchbase.client.core.node.CouchbaseNode signalConnected
INFO: Connected to Node 10.0.213.92/lpdospdb50751.ind.test.com
Jul 20, 2018 7:39:45 AM com.couchbase.client.core.node.CouchbaseNode signalConnected
INFO: Connected to Node 10.0.213.93/lpdospdb50752.ind.test.com

N1QlMetrics: N1qlMetrics{resultCount=500, errorCount=0, warningCount=0, mutationCount=0, sortCount=0, resultSize=7346, elapsedTime='6.654785ms', executionTime='6.618172ms'}

Time taken in getting meta().id: 1505

Time taken in getting 500 documents: 3600

Jul 20, 2018 7:39:51 AM com.couchbase.client.core.config.DefaultConfigurationProvider$11 call
INFO: Closed bucket gpc
Completed.....


Jul 20, 2018 7:39:51 AM com.couchbase.client.core.node.CouchbaseNode signalDisconnected
INFO: Disconnected from Node 10.0.213.93/lpdospdb50752.ind.test.com
Jul 20, 2018 7:39:51 AM com.couchbase.client.core.node.CouchbaseNode signalDisconnected
INFO: Disconnected from Node 10.0.213.91/lpdospdb50750.ind.test.com
Jul 20, 2018 7:39:51 AM com.couchbase.client.core.node.CouchbaseNode signalDisconnected
INFO: Disconnected from Node 10.0.213.92/lpdospdb50751.ind.test.com
sh-4.2$ 

Any guidance would be of great help.


#2

Any suggestions?
Am I missing something or it is issue with couchbase server?


#3

Is there anything I am doing wrong in code. Please suggest.