Could not publish Event because the queue is full. RequestRetryScheduledEvent

Hi Team,

When trying to push data from nifi to bucket getting the below error.

Could not publish Event because the queue is full. RequestRetryScheduledEvent

connections were established using jdk.

bucket memory : 16 GB
KV connection : (tried with 8 & 16 ).
Enabled full ejection policy for the bucket.

This issues occur only when trying to insert data by asynchronous mode.

when trying by using synchronous mode it retries and update with huge delay and i could see some data failed.

Can someone help on this.

Thanks and Regards,
Suresh

Hi @suresh2112,

At this point all we know is you are using Apache NiFi and tried some unknown code (both async and sync) to push data into couchbase.

Without seeing your actual code it is very hard to give you advice about your issue and how to solve it. IMHO it does sound like a back pressure issue. So we would need more information to advise.

Best

Jon Strabala

HI jon,

We will be processing more than 10 lakh record at one time…

Please find the below code FYR…

PersistTo persistTo = PersistTo.NONE;
ReplicateTo replicateTo = ReplicateTo.NONE;
Duration kvTimeOut = Duration.ofSeconds(30);

		AsyncBucket asyncBucket = openBucket(context).async();
		AsyncCollection asyncBucketCollection = asyncBucket.defaultCollection();
		UpsertOptions upsertOptions = UpsertOptions.upsertOptions().durability(persistTo, replicateTo).timeout(kvTimeOut);
		
		for (JsonObject jsonObject : listOfObjects) {
			
			CompletableFuture<MutationResult> result = asyncBucketCollection.upsert(UUID.randomUUID().toString(), jsonObject, upsertOptions);
			result.exceptionally(ex -> {
				throw new ProcessException("Unable to save the documents");
			});
		}

Thanks and Regards,
Suresh

Hi Suresh,

Thanks for the further details. Before I get into a possible solution I recommend you review the following document. Choosing an API | Couchbase Docs it specifically states (emphasis added):

We recommend using the reactive API over the CompletableFuture counterpart because it provides all the bells and whistles you need to build scalable asynchronous stacks.

We recommend using this API [CompletableFuture] only if you are either writing integration code for higher level concurrency mechanisms or you really need the last drop of performance. In all other cases, the blocking API (for simplicity) or the reactive API (for richness in operators) is likely the better choice.`

I’ve played with the CompletableFuture API before it works but it takes effort to get right (note I am no expert as my focus is on Eventing Service - serverless JavaScript compute).

Note, you might add your own RetryStrategy as per Handling Errors | Couchbase Docs although it is not absoulutly nessesary.

I think what you are seeing is that when the KV system becomes busy for the first time you then overwhelm the KV system by stacking up 1000’s of async requests. Unlike doing CompletableFuture in pure Java there is a limit to the number concurrent async requests that the Couchbase Java SDK can handle.

In my quick solution, I just put in a bit of flow control so as not to overwhelm KV by keeping the CompletableFuture “queue” under some sane bound (I used 512).

I am sure that the Couchbase Java SDK experts like @daschl and @ingenthr would be able to come up with a more elegant solution for implementing flow control or callbacks (so take my solution with a grain of salt). I just wanted to play with the Java SDK3 a bit and learn about Java’s CompletableFuture (which I am still a novice at) thus my implementation may not be elegant or optimal.

Please try this standalone code I provided and adopt the technique for your own use ( it is set to 1,000,000 inserts) you can play with the flow control in “while (queuedRequests.get() >= 512)” try 4, 16, 64, 1024, 2048, 8192 and see what happens. Note hopefully you have a seperate insertion thread and don’t do your loading directly from main.

import com.couchbase.client.java.AsyncCollection;
import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.Cluster;
import com.couchbase.client.java.ClusterOptions;
import com.couchbase.client.java.env.ClusterEnvironment;
import com.couchbase.client.java.json.JsonObject;
import com.couchbase.client.java.kv.MutationResult;
import com.couchbase.client.core.env.IoConfig;
import com.couchbase.client.core.env.IoEnvironment;
import com.couchbase.client.core.env.TimeoutConfig;
import com.couchbase.client.core.retry.BestEffortRetryStrategy;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Logger;


public class CompletableUpsertTest {
	private final static Logger LOGGER = Logger.getLogger(/* MyLogger.class.getName() */ "com");
	
	public static void main(String[] args) throws InterruptedException {
	
	    AtomicLong inputDocCount = new AtomicLong(0);
	    AtomicLong bucketWrSucc = new AtomicLong(0);
	    AtomicLong queuedRequests = new AtomicLong(0);
	    
	    BestEffortRetryStrategy mrs = BestEffortRetryStrategy.INSTANCE.withExponentialBackoff(Duration.ofMillis(1200), Duration.ofSeconds(29), 2);
	    ClusterEnvironment env = ClusterEnvironment.builder()
	        	.retryStrategy(mrs) 
	        	
	    		.ioEnvironment(IoEnvironment
	    		        .eventLoopThreadCount(2) 
	    		)	    		
	    		.ioConfig(IoConfig
	    		        .numKvConnections(6)  
	    		        .maxHttpConnections(32) 
	    		)
	    		.timeoutConfig(TimeoutConfig.kvTimeout(Duration.ofSeconds(30)))
	    	    .build();
	    
	    Cluster cluster = Cluster
	    		.connect("192.168.3.150", ClusterOptions
	    	    .clusterOptions("Administrator","password")
	    	    .environment(env));
	    cluster.waitUntilReady(Duration.ofSeconds(10));
	    
		Bucket bucket = cluster.bucket("destination");
		bucket.waitUntilReady(Duration.ofSeconds(10));
		AsyncCollection asyncBucketCollection = bucket.defaultCollection().async();
		
	    long beg = System.currentTimeMillis();
		for (int i = 0; i < 1000000; i++) {
			inputDocCount.getAndIncrement();
			queuedRequests.getAndIncrement();

			String key = "tstkey:" + i;
			// Make arandom number between 1 ... 100K
			int random_int = (int) (Math.random() * (100000 - 1 + 1) + 1);
			JsonObject data =JsonObject.create().put("id", i).put("type", "tstkey").put("data", random_int);
			
			// Only allow 512 outstanding async requests (respond to back pressure)
			while (queuedRequests.get() >= 512) {
				// wait until we are < 512
				try { Thread.sleep(1); } catch (InterruptedException e) {};
			}

			CompletableFuture<MutationResult> result = asyncBucketCollection.upsert(key, data /* ,upsertOptions */)
				.whenComplete((res, ex) -> {
					// Decrement to free up the flow control for KV writes via the SDK
					queuedRequests.getAndDecrement();
					if (ex == null) {
						// no exception we did our upsert
						bucketWrSucc.incrementAndGet();
					} else {
						// something went wrong
						System.out.println("Had Exception: " + ex);
					}
				});
		}
		long end = System.currentTimeMillis();
		
		// stop and clean up the Java SDK
	    cluster.disconnect(Duration.ofSeconds(10));
		
	    // emit some stats my single node non-MDS hits 50K ops.sec.
		System.out.println("exiting duration in seconds " + (end-beg)/1000 + 
				", count " + inputDocCount.get() + 
				", succ " + bucketWrSucc.get() + 
				", fail " + (inputDocCount.get() - bucketWrSucc.get() ) +
				", ops/sec. " + inputDocCount.get() /((end-beg)/1000.0 + 0.000000001) );

    }
}

Upserting 1M docs

exiting duration in seconds 19, count 1000000, succ 1000000, fail 0, ops/sec. 51674.245553344655

Upserting 10M docs

exiting duration in seconds 145, count 10000000, succ 10000000, fail 0, ops/sec. 68717.6597509073

Upserting 50M docs

exiting duration in seconds 729, count 50000000, succ 50000000, fail 0, ops/sec. 68584.84768667618

1 Like

Hi Jon,

Thanks for your response…

Can you help us in getting the below details

What is default publish event size?? How do we increase the publish event count??

Regards,
Suresh

Hi @suresh2112

I am not sure what you are asking here (something about Couchbase or NiFi), my sample code doesn’t care about “publish event size” or a " publish event count" - I’ll try my best though.

For Couchbase every document must be less than or equal to 20MB and the documents KEY less than or equal to 1MB. I don’t know what you mean by “increase the publish event count”. If you have the disk space Couchbase will accept it as long as your residency ration doesn’t drop too low (I like 100% RR but couchbase should work fine at 10% RR).

Maybe you asking about my small test program if so:

  • I just make a lot Json doc in a loop controlled by the variable “i” in the for loop in this case I guess the " publish event count" is 1000000 or 1M. To increase it just put a larger number in like 50000000 or 50M.

  • The “publish event size” would merely be the size of the Json document that the code makes in couchbase it might look like the following: { “id”: 999999, “type”: “tstkey”, “data”: 27865 } with a KEY of “tstkey:999999”. To increase the size of each doc add more fields or as a few large fields.

Maybe you asking about NiFI if so:

  • In NiFi, the messages that you want published will be based off the content of the flow file.

  • If you specify a “Message Demarcator” in the processor properties, then the content of your flow file will be separated based on the demarcator.

  • If you don’t specify a demarcator then it will take the entire content of the flow file and make a single publish call for the whole content.

Thus with demarcator set in NiFi the result is you will sending each separate piece of data as a unique message to Couchbase via an Async upsert call (no need to batch thing because you are async). You just can not have a single document greater than 20MB.

Best

Jon