Something unexpected makes bucket unable to fetch doc with get()

Java SDK 2.3.5

Trying to debug a problem with arrayAppend, I wrote a simple test. I did not find what I was looking for, but I did find a different possible bug. The scenario is simple: perform arrayAppend with multiple Observables, then fetch the doc with bucket.get() and check the number of elements in the array.

The problem seems to come into play when you use more (perhaps many more) Observables than there are threads in the ioPool. (UPDATE: strictly speaking I should be counting computationPool threads, but switching to that changes nothing; both pools seem to get the same default size, which is 4 on my test machine.)
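For reference, both pool sizes can be made explicit on the environment builder instead of relying on the defaults. A minimal sketch, assuming the SDK 2.x builder settings ioPoolSize and computationPoolSize (the values here are arbitrary):

import com.couchbase.client.java.env.CouchbaseEnvironment;
import com.couchbase.client.java.env.DefaultCouchbaseEnvironment;

// Sketch: pin the pool sizes explicitly instead of using the core-count defaults.
CouchbaseEnvironment env = DefaultCouchbaseEnvironment
    .builder()
    .ioPoolSize(4)            // netty I/O event-loop threads
    .computationPoolSize(4)   // RxJava computation-scheduler threads
    .kvTimeout(10000)         // key/value timeout in ms, as in the test below
    .build();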

Here is the code (I removed "package arrayappend"):

import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.CouchbaseCluster;
import com.couchbase.client.java.document.JsonDocument;
import com.couchbase.client.java.document.json.JsonArray;
import com.couchbase.client.java.document.json.JsonObject;
import com.couchbase.client.java.env.CouchbaseEnvironment;
import com.couchbase.client.java.env.DefaultCouchbaseEnvironment;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Phaser;
import java.util.concurrent.atomic.AtomicInteger;
public class ArrayAppend {
// number of simultaneous "reactive" calls
public static final int MAX_SIMULTENEOUS_OBSERVABLES = 16;
// total number of insertions
public static final int MAX_INSERTIONS_NUM = 1024;

private static final AtomicInteger totalArrayAppendCalls;
private static final Bucket bucket;
private static final Phaser doneSync;
private static final CouchbaseEnvironment ce;
// init couchbase
static {
    ce = DefaultCouchbaseEnvironment
        .builder()
            .kvTimeout(10000)
        .build();
    final List<String> nodes = new ArrayList<>();
    nodes.add("192.168.254.80");
    nodes.add("192.168.254.81");
    nodes.add("192.168.254.82");
    
    final Cluster cluster = CouchbaseCluster.create(ce, nodes);
    bucket = cluster.openBucket(
        "storage",
        "password"    
    );
    totalArrayAppendCalls = new AtomicInteger(MAX_INSERTIONS_NUM);
    doneSync = new Phaser(MAX_SIMULTENEOUS_OBSERVABLES > ce.ioPoolSize() ? ce.ioPoolSize() + 1 : MAX_SIMULTENEOUS_OBSERVABLES + 1) ;
}
public static final String DOCID = "doc";
public static final String PATH = "array";
public static final void next() {
    try {
        final int calls = totalArrayAppendCalls.decrementAndGet();
        if(calls >= 0) {
            bucket.async()
                .mutateIn(DOCID)
                .arrayAppend(PATH, System.currentTimeMillis(), false)
                .execute()
                .subscribe(
                    (d) -> { }, 
                    (e) -> { System.out.println(String.format("error for %d", calls)); next(); }, 
                    ()  -> { next(); }
                );
        } else doneSync.arriveAndAwaitAdvance();
    } catch(final Exception e) {
        System.out.println("... unexpected oops");
    }    
}
public static void main(String[] args) {
    bucket.upsert(JsonDocument.create(DOCID, JsonObject.empty().put(PATH, JsonArray.empty())));
    for(int i = 0; i < MAX_SIMULTENEOUS_OBSERVABLES; i++) next();
    doneSync.arriveAndAwaitAdvance();
    System.out.println("ok, append completed");
    int max = 10, imax = 0, size = 0;
    while(max > 0) {
        imax = 3;
        while(imax > 0) {
            try {
                size = bucket.get(DOCID).content().getArray(PATH).size();
                break;
            } catch(RuntimeException e) {
                System.out.println("... hm, timed out, let's retry / " +  imax + " / " + e);
            }
            imax--;
        }    
        if(imax <= 0) {
            System.out.println("unable to fetch document / " + imax);
            break;
        }
        if(size != MAX_INSERTIONS_NUM) {
            System.out.println("not formed yet => " + size + " / " + MAX_INSERTIONS_NUM);
            try { 
                Thread.sleep(1000); 
            } catch(final Exception e) { 
                System.out.println("...sleep oops"); 
            }
        } else {
            System.out.println("doc formation completed => " + size + " / " + MAX_INSERTIONS_NUM);
            break;
        }
        max--;
    }
    System.out.println("done");
}
}

Just set MAX_SIMULTENEOUS_OBSERVABLES >= 2 * ioPoolSize and you'll see that it is unable to fetch the doc with bucket.get() after the multi-Observable calls:

ok, append completed
… hm, timed out, let’s retry / 3 / java.lang.RuntimeException: java.util.concurrent.TimeoutException
… hm, timed out, let’s retry / 2 / java.lang.RuntimeException: java.util.concurrent.TimeoutException
… hm, timed out, let’s retry / 1 / java.lang.RuntimeException: java.util.concurrent.TimeoutException
unable to fetch document / 0
done

It works fine with 1 Observable (we can watch the doc growing asynchronously):

ok, append completed
not formed yet => 1 / 1024
not formed yet => 159 / 1024
not formed yet => 340 / 1024
not formed yet => 514 / 1024
not formed yet => 710 / 1024
not formed yet => 915 / 1024
doc formation completed => 1024 / 1024
done

It is also fine with 4, 6, and even 8 Observables. But with 16+ Observables it blocks AFTER the appends, on a simple bucket.get().

What is going on here?

Depending on the full call stack for your timeouts, this could be similar to:

and:

Also, make sure your JVM has enough memory allocated.

Are you perhaps suggesting a recursion problem by pointing to those? There should be none.
For example, 12 Observables and 100,000 appends (I also added count logging to the output):

time java -jar -Xmx8M -Xms8M ArrayAppend.jar
… couchbase env init log skipped
99000
98000
… and counting down, and down, and down …
2000
1000
0
ok, append completed
java.lang.RuntimeException: java.util.concurrent.TimeoutException
at com.couchbase.client.java.util.Blocking.blockForSingle(Blocking.java:73)
at com.couchbase.client.java.CouchbaseBucket.get(CouchbaseBucket.java:119)
at com.couchbase.client.java.CouchbaseBucket.get(CouchbaseBucket.java:114)
at arrayappend.ArrayAppend.main(ArrayAppend.java:81)
Caused by: java.util.concurrent.TimeoutException
… 4 more
… hm, timed out, let’s retry / 3 / java.lang.RuntimeException: java.util.concurrent.TimeoutException
java.lang.RuntimeException: java.util.concurrent.TimeoutException
at com.couchbase.client.java.util.Blocking.blockForSingle(Blocking.java:73)
at com.couchbase.client.java.CouchbaseBucket.get(CouchbaseBucket.java:119)
at com.couchbase.client.java.CouchbaseBucket.get(CouchbaseBucket.java:114)
at arrayappend.ArrayAppend.main(ArrayAppend.java:81)
Caused by: java.util.concurrent.TimeoutException
… 4 more
… hm, timed out, let’s retry / 2 / java.lang.RuntimeException: java.util.concurrent.TimeoutException
java.lang.RuntimeException: java.util.concurrent.TimeoutException
at com.couchbase.client.java.util.Blocking.blockForSingle(Blocking.java:73)
at com.couchbase.client.java.CouchbaseBucket.get(CouchbaseBucket.java:119)
at com.couchbase.client.java.CouchbaseBucket.get(CouchbaseBucket.java:114)
at arrayappend.ArrayAppend.main(ArrayAppend.java:81)
Caused by: java.util.concurrent.TimeoutException
… 4 more
… hm, timed out, let’s retry / 1 / java.lang.RuntimeException: java.util.concurrent.TimeoutException
unable to fetch document / 0
done

real 6m49.029s
user 0m42.882s
sys 0m7.748s

8 MB heap, 6m49s (minus ~30 s for the 3 timeouts) ≈ 6 minutes for 100,000 appends; slow, but my CB server is under load right now.

The trace is completely generic for the blocking API and leads to the generic timeout wrapper for Observables:

And, as you can see from Line73 (together with Line102, "… class TrackingSubscriber …"), the surface of the problem is "an observable that did not return a result in time". The reason why lies deeper, and a deeper investigation is needed (and we are all lazy and love shortcuts, that's why we are here :slight_smile:
… except @daschl, @subhashni :wink: ( … but they are busy ones :slight_smile: )
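For anyone curious, here is a rough, simplified sketch (not the SDK source, just the general shape, with my own naming) of what a blocking wrapper like Blocking.blockForSingle does: subscribe to the Observable, park the calling thread on a latch, and turn "no emission before the timeout" into the TimeoutException seen in the trace above.

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicReference;
import rx.Observable;
import rx.Subscription;

public class BlockForSingleSketch {
    // Illustration only: block the caller until the Observable emits, errors, or times out.
    public static <T> T blockForSingle(final Observable<T> observable, final long timeout, final TimeUnit unit) {
        final CountDownLatch latch = new CountDownLatch(1);
        final AtomicReference<T> value = new AtomicReference<>();
        final AtomicReference<Throwable> failure = new AtomicReference<>();
        final Subscription sub = observable.single().subscribe(
            value::set,                                   // onNext: remember the item
            e -> { failure.set(e); latch.countDown(); },  // onError: remember and release the caller
            latch::countDown                              // onCompleted: release the caller
        );
        try {
            if (!latch.await(timeout, unit)) {            // nothing delivered in time...
                sub.unsubscribe();
                throw new RuntimeException(new TimeoutException()); // ...the exception from the trace
            }
        } catch (final InterruptedException ie) {
            Thread.currentThread().interrupt();
            throw new RuntimeException(ie);
        }
        if (failure.get() != null) {
            throw new RuntimeException(failure.get());
        }
        return value.get();
    }
}

If nothing is ever delivered, the latch never counts down and the blocking get() can only time out.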

Hi @egrep, I haven't used Phaser before; it seems like a complex synchronizer :). I tried changing the code a bit to await the phase before proceeding to the get, and it works as expected.

You can give it a try. Here it is:


import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.CouchbaseCluster;
import com.couchbase.client.java.document.JsonDocument;
import com.couchbase.client.java.document.json.JsonArray;
import com.couchbase.client.java.document.json.JsonObject;
import com.couchbase.client.java.env.CouchbaseEnvironment;
import com.couchbase.client.java.env.DefaultCouchbaseEnvironment;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ArrayAppend {

	// number of simultaneous "reactive" calls
	public static final int MAX_SIMULTENEOUS_OBSERVABLES = 16;
	// total number of insertions
	public static final int MAX_INSERTIONS_NUM = 1024;

	private static final AtomicInteger totalArrayAppendCalls;
	private static final Bucket bucket;
	private static final Phaser doneSync;
	private static final CouchbaseEnvironment ce;

	// init couchbase
	static {
		ce = DefaultCouchbaseEnvironment
				.builder()
				.kvTimeout(10000)
				.build();

		final List<String> nodes = new ArrayList<>();
		nodes.add("127.0.0.1");

		final Cluster cluster = CouchbaseCluster.create(ce, nodes);
		bucket = cluster.openBucket(
				"storage",
				"password"
		);
		totalArrayAppendCalls = new AtomicInteger(MAX_INSERTIONS_NUM);
		doneSync = new Phaser(MAX_SIMULTENEOUS_OBSERVABLES);
	}

	public static final String DOCID = "doc";
	public static final String PATH = "array";

	public static final void next() {
		try {
			final int calls = totalArrayAppendCalls.decrementAndGet();
			if (calls >= 0) {
				bucket.async()
						.mutateIn(DOCID)
						.arrayAppend(PATH, System.currentTimeMillis(), false)
						.execute()
						.subscribe(
								(d) -> {
								},
								(e) -> {
									System.out.println(String.format("error for %d", calls));
									next();
								},
								() -> {
									next();
								}
						);
			} else doneSync.arrive();
		} catch (final Exception e) {
			System.out.println("... unexpected oops");
		}
	}

	public static void main(String[] args) {
		bucket.upsert(JsonDocument.create(DOCID, JsonObject.empty().put(PATH, JsonArray.empty())));

		for (int i = 0; i < MAX_SIMULTENEOUS_OBSERVABLES; i++) next();

		doneSync.awaitAdvance(0);

		System.out.println("ok, append completed");
		int max = 10, imax = 0, size = 0;
		while (max > 0) {
			imax = 3;
			while (imax > 0) {
				try {
					size = bucket.get(DOCID, 3, TimeUnit.SECONDS).content().getArray(PATH).size();
					break;
				} catch (RuntimeException e) {
					System.out.println("... hm, timed out, let's retry / " + imax + " / " + e);
				}
				imax--;
			}
			if (imax <= 0) {
				System.out.println("unable to fetch document / " + imax);
				break;
			}
			if (size != MAX_INSERTIONS_NUM) {
				System.out.println("not formed yet => " + size + " / " + MAX_INSERTIONS_NUM);
				try {
					Thread.sleep(1000);
				} catch (final Exception e) {
					System.out.println("...sleep oops");
				}
			} else {
				System.out.println("doc formation completed => " + size + " / " + MAX_INSERTIONS_NUM);
				break;
			}
			max--;
		}
		System.out.println("done");
	}
}

it seems like a complex synchronizer :).

:wink: sounds like "dude, please, next time RTFM before using a non-deterministic reverse-emissive synchrophasotron"

Thanks, @subhashni, I do see the problem now (the post-sync Observables are the ones blocking the computation-pool threads, leaving no one to unblock them), and in fact your sync solution is very elegant (and mine is just wrong).
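For anyone else puzzled by the Phaser semantics, here is a minimal stand-alone illustration (plain Java, no Couchbase; the pool size and worker count are arbitrary) of the pattern @subhashni used: each worker calls arrive(), which never blocks the pool thread, and only the main thread waits via awaitAdvance().

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Phaser;
import java.util.concurrent.TimeUnit;

public class PhaserPattern {
    public static void main(final String[] args) throws InterruptedException {
        final int workers = 16;
        // one registered party per worker; the main thread is NOT registered,
        // it only observes the phase via awaitAdvance()
        final Phaser done = new Phaser(workers);
        final ExecutorService pool = Executors.newFixedThreadPool(4); // small pool, like the SDK schedulers
        for (int i = 0; i < workers; i++) {
            final int id = i;
            pool.submit(() -> {
                // ... the actual work would go here ...
                System.out.println("worker " + id + " finished");
                done.arrive(); // non-blocking: the pool thread is immediately free for the next task
            });
        }
        done.awaitAdvance(0); // blocks only the main thread until all 16 parties have arrived
        System.out.println("all workers arrived, safe to do the blocking get() now");
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}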

So, even though I once asked one of your colleagues whether it is okay, in US IT culture, to say something pleasant to girls without any special occasion, just because they are girls, and the answer was "no, it's not a good idea, at least in IT", now we do have an occasion, so
thank you, and kinda "pretty-cutie" :kissing_heart: :kissing_heart: :kissing_heart:
And a :trophy: of course :wink:
