System.out.println("===>start");

// Inserts `doc` through the async client with ReplicateTo.ONE and blocks the
// calling thread for at most 5 seconds until the insert completes or fails.
// Outcome is reported through variables declared outside this snippet:
//   - returnValue receives the emitted document,
//   - errNum is set to 1 on error,
//   - cdl is counted down on completion or error.
// NOTE(review): cdl is presumably a CountDownLatch with count 1, and
// errNum/returnValue atomic holders -- confirm against their declarations.
//
// A retryWhen-based retry was drafted here earlier and left disabled; the
// draft was incomplete (its zip function returned null and the zipped chain
// was discarded), so it has been dropped rather than kept as dead code.

// Keep a handle on the subscriber so it can be detached once nobody is
// waiting for the result any more (timeout or interruption below).
final Subscriber<JsonDocument> insertSubscriber = new Subscriber<JsonDocument>() {
    @Override
    public void onCompleted() {
        cdl.countDown();
    }

    @Override
    public void onError(Throwable e) {
        errNum.set(1);
        // Print before releasing the latch so the error line appears ahead
        // of the waiting thread's summary line.
        System.out.println("onerror===出现异常!" + e.getMessage());
        cdl.countDown();
    }

    @Override
    public void onNext(JsonDocument t) {
        if (t.cas() > 0) {
            System.out.println("成功插入");
        }
        // The latch is released in onCompleted/onError, not here.
        returnValue.set(t);
    }
};

client.async().insert(doc, ReplicateTo.ONE)
        .single()
        .subscribe(insertSubscriber);

try {
    if (!cdl.await(5000, TimeUnit.MILLISECONDS)) {
        // Deadline passed: stop listening so a late callback does not
        // touch returnValue/errNum after this point.
        insertSubscriber.unsubscribe();
        System.out.println("超时; " + errNum.get() + ";" + returnValue.get());
    } else {
        System.out.println("正常" + errNum.get() + ";" + returnValue.get());
    }
} catch (InterruptedException e) {
    // The waiter is giving up; detach the subscriber and restore the
    // interrupt flag for callers further up the stack.
    insertSubscriber.unsubscribe();
    Thread.currentThread().interrupt();
}
System.out.println("===>end");