Spark Couchbase write does not work


#1

Couchbase version 5.1
spark connector version : 2.2.0
and my Couchbase environment is as below

{
sslEnabled = false, sslKeystoreFile = 'null', sslKeystorePassword = false, sslKeystore = null, bootstrapHttpEnabled = true, bootstrapCarrierEnabled = true, bootstrapHttpDirectPort = 8091, bootstrapHttpSslPort = 18091, bootstrapCarrierDirectPort = 11210, bootstrapCarrierSslPort = 11207, ioPoolSize = 4, computationPoolSize = 4, responseBufferSize = 16384, requestBufferSize = 16384, kvServiceEndpoints = 1, viewServiceEndpoints = 12, queryServiceEndpoints = 12, searchServiceEndpoints = 12, configPollInterval = 2500, ioPool = NioEventLoopGroup, kvIoPool = null, viewIoPool = null, searchIoPool = null, queryIoPool = null, coreScheduler = CoreScheduler, memcachedHashingStrategy = DefaultMemcachedHashingStrategy, eventBus = DefaultEventBus, packageNameAndVersion = couchbase - java - client / 2.5 .0(git: 2.5 .0, core: 1.5 .0), retryStrategy = BestEffort, maxRequestLifetime = 75000, retryDelay = ExponentialDelay {
	growBy 1.0 MICROSECONDS, powers of 2;
	lower = 100, upper = 100000
}, reconnectDelay = ExponentialDelay {
	growBy 1.0 MILLISECONDS, powers of 2;
	lower = 32, upper = 4096
}, observeIntervalDelay = ExponentialDelay {
	growBy 1.0 MICROSECONDS, powers of 2;
	lower = 10, upper = 100000
}, keepAliveInterval = 3000000, continuousKeepAliveEnabled = true, keepAliveErrorThreshold = 4, keepAliveTimeout = 3000000, autoreleaseAfter = 2000, bufferPoolingEnabled = true, tcpNodelayEnabled = true, mutationTokensEnabled = false, socketConnectTimeout = 3000000, callbacksOnIoPool = false, disconnectTimeout = 25000, requestBufferWaitStrategy = com.couchbase.client.core.env.DefaultCoreEnvironment$2 @1c7e9fcd, certAuthEnabled = false, coreSendHook = null, queryTimeout = 75000, viewTimeout = 75000, searchTimeout = 75000, analyticsTimeout = 75000, kvTimeout = 1000000, connectTimeout = 3000000, dnsSrvEnabled = false

}

but I still face this error: com.couchbase.client.core.RequestCancelledException: Request cancelled in-flight.


#2

I am facing the same problem; I had tried setting all the timeouts, but it didn’t work.


#3

Hi @wenjiezhang2013. Can you provide your code, and, assuming you’re running a query, an estimate of how much data it would be returning? Also, are you able to run the query in the Couchbase query workbench UI?


#4

I am actually not running any query in my Spark job. My Spark job loads data from a table in Oracle which has about 9M records and writes all the data into Couchbase.

This is the log I have for my task.

timestamp="2018-08-09T18:17:31,019+0000",level="INFO",threadName="Executor task launch worker for task 1",logger="com.couchbase.client.core.CouchbaseCore",jobName="jdbc-poc",workspace="spark-sample-app",jobInstanceId="1533837911293-196",sparkEnvironment="databridge",instanceName="SparkExecutor",jobRepo="spark-sample-app",version="0.0.1-45",attemptId="1",sslEnabled="false,",sslKeystoreFile="null",sslKeystorePassword="false,",sslKeystore="null,",bootstrapHttpEnabled="true,",bootstrapCarrierEnabled="true,",bootstrapHttpDirectPort="8091,",bootstrapHttpSslPort="18091,",bootstrapCarrierDirectPort="11210,",bootstrapCarrierSslPort="11207,",ioPoolSize="32,",computationPoolSize="32,",responseBufferSize="16384,",requestBufferSize="16384,",kvServiceEndpoints="1,",viewServiceEndpoints="12,",queryServiceEndpoints="12,",searchServiceEndpoints="12,",configPollInterval="2500,",ioPool="NioEventLoopGroup,",kvIoPool="null,",viewIoPool="null,",searchIoPool="null,",queryIoPool="null,",coreScheduler="CoreScheduler,",memcachedHashingStrategy="DefaultMemcachedHashingStrategy,",eventBus="DefaultEventBus,",packageNameAndVersion="couchbase-java-client/2.5.0",retryStrategy="BestEffort,",maxRequestLifetime="75000,",retryDelay="ExponentialDelay{growBy",lower="100,",upper="100000},",reconnectDelay="ExponentialDelay{growBy",lower="32,",upper="4096},",observeIntervalDelay="ExponentialDelay{growBy",lower="10,",upper="100000},",keepAliveInterval="30000,",continuousKeepAliveEnabled="true,",keepAliveErrorThreshold="4,",keepAliveTimeout="2500,",autoreleaseAfter="2000,",bufferPoolingEnabled="true,",tcpNodelayEnabled="true,",mutationTokensEnabled="false,",socketConnectTimeout="1000,",callbacksOnIoPool="false,",disconnectTimeout="25000,",requestBufferWaitStrategy="com.couchbase.client.core.env.DefaultCoreEnvironment$2@1cc17b3d,",certAuthEnabled="false,",coreSendHook="null,",queryTimeout="75000,",viewTimeout="75000,",searchTimeout="75000,",analyticsTimeout="75000,",kvTimeout="2147483647,",connectTimeout="214
7483647,",dnsSrvEnabled="false}",message="CouchbaseEnvironment: {, (git: 2.5.0, core: 1.5.0), 1.0 MICROSECONDS, powers of 2; 1.0 MILLISECONDS, powers of 2; 1.0 MICROSECONDS, powers of 2;"

And I am getting

com.couchbase.client.core.RequestCancelledException: Could not dispatch request, cancelling instead of     retrying.
    	at com.couchbase.client.core.retry.RetryHelper.retryOrCancel(RetryHelper.java:51)
    	at com.couchbase.client.core.endpoint.AbstractGenericHandler.write(AbstractGenericHandler.java:245)
    	at com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeWrite0(AbstractChannelHandlerContext.java:732)
    	at com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:724)
    	at com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext.access$1900(AbstractChannelHandlerContext.java:35)
    	at com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:1061)
    	at com.couchbase.client.deps.io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:1050)
    	at com.couchbase.client.deps.io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:399)
    	at com.couchbase.client.deps.io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:464)
    	at com.couchbase.client.deps.io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:131)
    	at com.couchbase.client.deps.io.netty.util.concurrent.DefaultThreadFactory$DefaultRunnableDecorator.run(DefaultThreadFactory.java:138)
    	at java.lang.Thread.run(Thread.java:748)"

#5

Ah ok. Could you possibly share the important parts of the code? Or perhaps a simplified version of the core writing to Couchbase part?


#6
 {
        // Build the Spark session, passing the Couchbase connector settings
        // (nodes, credentials, target bucket) through the Spark config.
        final SparkSession spark = SparkSession.builder()
            .appName("databridge-poc")
            .config("com.couchbase.nodes", couchbaseNodes)
            .config("com.couchbase.username", couchbaseUsername)
            .config("com.couchbase.password", couchbasePassword)
            .config("com.couchbase.bucket.spark-poc", "")
            .sparkContext(context)
            .getOrCreate();

        printLog("couchbaseNodes: "
            + couchbaseNodes);

    final DataFrameReader oracleReader = getOracleReader(spark.sqlContext());

    printLog("Oracle connection connected.");

    // Project the Oracle columns into a nested JSON structure
    // ({address: {...}, localizedAddress: {...}}) and wrap each row in a
    // JsonDocument keyed by a random UUID.
    // FIX: the original paste was missing the three closing parentheses
    // for struct(...), to_json(...) and select(...) before .toJavaRDD(),
    // so it did not compile.
    final JavaRDD<JsonDocument> couchbaseRDD = oracleReader.load()
        .select(to_json(struct(
            struct(col("COL1").alias("addressLine1"),
                col("COL2").alias("addressLine2"),
                col("COL3").alias("city")).alias("address"),
            struct(col("ALT_COL1").alias("addressLine1"),
                col("ALT_COL2").alias("addressLine2"),
                col("ALT_COL3").alias("city")).alias("localizedAddress"))))
        .toJavaRDD()
        // Anonymous class (not a lambda) so Spark can serialize it with a
        // stable serialVersionUID across executors.
        .map(new Function<Row, JsonDocument>() {

            /**  */
            private static final long serialVersionUID = -6423991529277672360L;

            @Override
            public JsonDocument call(final Row v1) throws Exception {
                // v1 holds a single JSON-string column, so mkString() yields
                // exactly the serialized document.
                return JsonDocument.create(UUID.randomUUID().toString(), JsonObject.fromJson(v1.mkString()));
            }
        });

    final String timeout = String.valueOf(Integer.MAX_VALUE);

    // NOTE(review): these system properties are set AFTER the SparkSession
    // (and therefore the Couchbase environment) may already have been
    // created on the driver — they likely never reach the executors'
    // CouchbaseEnvironment. Prefer passing them via the Spark config before
    // getOrCreate(); verify against the connector docs.
    System.setProperty("com.couchbase.kvTimeout", timeout);
    System.setProperty("com.couchbase.socketConnectTimeout", timeout);
    System.setProperty("com.couchbase.connectTimeout", timeout);

    // Duplicate the connection settings on the SparkConf so the executors
    // that perform the actual writes pick them up.
    couchbaseRDD.context().conf().set("com.couchbase.nodes", couchbaseNodes);
    couchbaseRDD.context().conf().set("com.couchbase.username", couchbaseUsername);
    couchbaseRDD.context().conf().set("com.couchbase.password", couchbasePassword);
    couchbaseRDD.context().conf().set("com.couchbase.bucket.spark-poc", "");
    couchbaseRDD.context().conf().set("com.couchbase.kvTimeout", timeout);
    couchbaseRDD.context().conf().set("com.couchbase.connectTimeout", timeout);
    couchbaseRDD.context().conf().set("com.couchbase.socketConnectTimeout", timeout);

    // Bulk-write every document; the argument is the per-operation timeout.
    CouchbaseDocumentRDD.couchbaseDocumentRDD(couchbaseRDD).saveToCouchbase(Integer.MAX_VALUE);

    printLog("Finished writing to CB");
    spark.close();
}

/**
 * Builds a JDBC {@link DataFrameReader} over the Oracle {@code LOCATION}
 * table. Credentials are taken from the process environment; the JDBC URL
 * from the Spark SQL configuration.
 */
private static DataFrameReader getOracleReader(final SQLContext context) {

    printLog("Opening oracle connection.");

    // Oracle credentials injected via environment variables.
    final String user = System.getenv("database_username");
    final String password = System.getenv("database_password");

    // Connection URL supplied through the Spark conf at submit time.
    final String url = context.getConf("jdbc_url");
    printLog("jdbc_url: "
        + url);

    return context.read()
        .format("jdbc")
        .option("driver", "oracle.jdbc.driver.OracleDriver")
        .option("url", url)
        .option("dbtable", "LOCATION")
        .option("user", user)
        .option("password", password);
}