Inconsistent result while using bucket.async()

n1ql
java
#1

Hello,

I have the following code

HashMap<String, String> codes = new HashMap<>();

                bucket
                .async()
                .query(Select.select("mappings, id")
                        .fromCurrentBucket()
                        .where("type='X'"))
                .timeout(5, TimeUnit.MINUTES)
                .flatMap(AsyncN1qlQueryResult::rows)
                .subscribe(new Subscriber<AsyncN1qlQueryRow>() {

                    @Override
                    public void onCompleted() {
                        log.info("Populating codes lookup map, size {} - FINISHED", codes.size());
                    }

                    @Override
                    public void onError(Throwable throwable) {
                        log.error("Problem while initializing codes lookup map", throwable);
                    }

                    @Override
                    public void onNext(AsyncN1qlQueryRow asyncN1qlQueryRow) {
                        JsonObject value = asyncN1qlQueryRow.value();
                        String id = value.getString("id");
                        JsonObject providers = value.getObject("mappings");
                        Map<String, Object> providersMap = providers.toMap();

                        for(Map.Entry<String, Object> mapEntry : providersMap.entrySet()) {
                            String providerName = mapEntry.getKey();
                            ArrayList<HashMap> codesList = (ArrayList<HashMap>) mapEntry.getValue();

                            for(HashMap code : codesList) {
                                String key = providerName + Constants.LOOKUP_KEY_SEPERATOR + code.get("code");
                                String val = codes.get(key);

                                codes.put(key, id);
                            }
                        }
                    }
                });

Each time I run this, codes.size() comes out different, and I’m not getting any errors. What could be the reason that I’m missing some records?

Edit:

I discovered why the code yields an inconsistent number of results each time it is run. It’s due to the timeout configured on the environment here:

    DefaultCouchbaseEnvironment defaultCouchbaseEnvironment =
            DefaultCouchbaseEnvironment
                    .builder()
                    .connectTimeout(60000)
                    .kvTimeout(60000)
                    .socketConnectTimeout(60000)
                    .autoreleaseAfter(5000)
                    .queryTimeout(60000)  // <-- this timeout
                    .build();

If I increase it, I start getting consistent results.

I’m confused: if queryTimeout is configured on the environment, is the timeout that I define on the query itself (in the code above) useless, with no effect at all? Can anyone clarify please?

#2

How long does your query take? The .timeout() operator does not change the timeout we send to the server. It looks like the query runs into the timeout on the server, which is when the query ends.

Also, your code is not thread-safe, which can also be a problem. The subscriber callback inserts into the codes HashMap from a different thread, so please use a ConcurrentHashMap instead.
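To illustrate the ConcurrentHashMap suggestion, here is a minimal stdlib-only sketch. It does not use the Couchbase SDK: the thread pool stands in for whatever threads deliver rows to the subscriber, and the class name, separator, and key layout are made up for the example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for the lookup-map population in the question.
class CodesLookupSketch {
    // Assumed separator; the original uses Constants.LOOKUP_KEY_SEPERATOR.
    static final String SEPARATOR = "|";

    // Fills a shared map from several writer threads and returns it.
    static Map<String, String> populate(int rows, int codesPerRow, int threads) {
        // ConcurrentHashMap tolerates concurrent put() calls; a plain HashMap does not.
        Map<String, String> codes = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int r = 0; r < rows; r++) {
            final String id = "doc-" + r;
            // Each task plays the role of one onNext() call handling one row.
            pool.submit(() -> {
                for (int c = 0; c < codesPerRow; c++) {
                    codes.put("provider" + SEPARATOR + id + "-" + c, id);
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(30, TimeUnit.SECONDS)) {
                throw new IllegalStateException("writers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return codes;
    }

    public static void main(String[] args) {
        Map<String, String> codes = populate(1000, 10, 8);
        System.out.println("size = " + codes.size());
    }
}
```

Every key in this sketch is unique, so the final size equals rows times codesPerRow regardless of how the writes interleave. In the original snippet the only change needed would be the declaration of codes.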

#3

Short answer: yes, it’s useless.

That timeout isn’t “on the query itself”… it’s an operator on the RxJava Observable which is independent of the code that executes the query and handles low-level timeouts. The only way the timeout operator on the observable would have any effect is if it were set to a duration shorter than the queryTimeout on the environment.
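The "shorter one wins" point can be sketched with plain CompletableFuture (Java 9+). This is only an analogy, not how the SDK is implemented: the inner limit stands in for the environment's queryTimeout, the outer wait stands in for the Observable's .timeout() operator, and all names below are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

// Two stacked time limits: whichever is shorter decides the outcome.
class TimeoutLayersSketch {

    // Returns "completed" if the work finished, otherwise which limit fired first.
    static String firstToFire(long workMillis, long innerMillis, long outerMillis) {
        // Inner layer: the work itself, cut off after innerMillis
        // (analogous to the query-level timeout configured on the environment).
        CompletableFuture<String> inner = CompletableFuture
                .supplyAsync(() -> {
                    sleep(workMillis);
                    return "completed";
                })
                .completeOnTimeout("inner", innerMillis, TimeUnit.MILLISECONDS);

        // Outer layer: the caller's own wait limit
        // (analogous to the timeout operator applied on top of the result).
        try {
            return inner.get(outerMillis, TimeUnit.MILLISECONDS);
        } catch (TimeoutException e) {
            return "outer";
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        }
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        // Outer limit longer than inner: the outer one never gets a chance to fire.
        System.out.println(firstToFire(1500, 100, 3000));
        // Outer limit shorter than inner: now the outer one takes effect.
        System.out.println(firstToFire(1500, 3000, 100));
    }
}
```

With a 5-minute outer limit and a 60-second inner limit, as in the question, the situation matches the first call: the inner limit always fires first, so raising the outer one changes nothing.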