Async bulk create/update with incremental retry not working as expected

I have been struggling to create a retry mechanism for my bulk update function.
I want the bulk insert to continue when a single insert fails, instead of returning an error right away,
and to retry on temporary failures according to a retry strategy.
However, the code neither ignores the errors nor retries.
This is my sample code:

public Map<String, JsonDocument> bulkCreate(List<JsonDocument> documents, PersistTo persistTo, ReplicateTo replicateTo) {
    return Observable
            .from(documents)
            .flatMap(doc -> Observable.defer(() -> asyncBucket
                    .insert(doc, persistTo, replicateTo)
                    .onErrorResumeNext(new IgnoreExceptionStrategy())
                    .retryWhen(retryStrategy())
                    .onErrorResumeNext(new LastResortErrorHandler())))
            .toMap(JsonDocument::id, d -> d)
            .toBlocking()
            .single();
}

private static class LastResortErrorHandler implements Func1<Throwable, Observable<? extends JsonDocument>> {
    @Override
    public Observable<? extends JsonDocument> call(Throwable throwable) {
        System.err.println("Could not complete flow because of: " + throwable.getMessage());
        throwable.printStackTrace();
        return Observable.empty();
    }
}

private static class IgnoreExceptionStrategy implements Func1<Throwable, Observable<? extends JsonDocument>> {
    @Override
    public Observable<? extends JsonDocument> call(Throwable error) {
        if (error instanceof CASMismatchException || error instanceof DocumentAlreadyExistsException) {
            return Observable.empty();
        }
        return Observable.error(error);
    }
}

private static RetryWhenFunction retryStrategy() {
    return RetryBuilder
            .anyOf(TemporaryFailureException.class, RequestCancelledException.class, BackpressureException.class)
            .delay(Delay.exponential(TimeUnit.MILLISECONDS, 100))
            .max(3)
            .build();
}

With the same logic, bulkGet works as expected.

public Map<String, JsonDocument> bulkGetStringJsonDocumentMap(List<String> ids) {
    return Observable.from(ids)
            .flatMap(doc -> Observable.defer(() -> asyncBucket.get(doc))
                    .onErrorResumeNext(new IgnoreExceptionStrategy())
                    .retryWhen(retryStrategy())
                    .onErrorResumeNext(new LastResortErrorHandler())
            )
            .toMap(JsonDocument::id, d -> d)
            .toBlocking()
            .single();
}
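
One structural difference between the two methods may be relevant: in bulkCreate the operators are chained inside the Observable.defer() lambda, directly on the Observable returned by insert(), while in bulkGetStringJsonDocumentMap they are chained on the Observable returned by defer(). Whether this explains the behaviour depends on whether the Observable returned by insert() re-runs the operation when it is resubscribed, which is not confirmed here. The plain-Java sketch below only illustrates the general effect, with CompletableFuture standing in for a result that is computed once and then cached; DeferPlacementSketch, flakyOperation, retry and the other names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Illustration only: none of these names come from RxJava or the Couchbase SDK.
final class DeferPlacementSketch {

    // Hypothetical operation: fails on its first two invocations, succeeds on the third.
    static CompletableFuture<String> flakyOperation(AtomicInteger invocations) {
        int n = invocations.incrementAndGet();
        CompletableFuture<String> result = new CompletableFuture<>();
        if (n < 3) {
            result.completeExceptionally(new IllegalStateException("temporary failure #" + n));
        } else {
            result.complete("ok");
        }
        return result;
    }

    // Calls attempt.get() up to maxRetries + 1 times; returns null if every attempt fails.
    static <T> T retry(Supplier<T> attempt, int maxRetries) {
        for (int i = 0; i <= maxRetries; i++) {
            try {
                return attempt.get();
            } catch (CompletionException e) {
                // Failed attempt: loop around and try again.
            }
        }
        return null;
    }

    // Retry wraps an operation that has already run: every attempt reads the same cached failure.
    static String retryCachedResult(AtomicInteger invocations) {
        CompletableFuture<String> started = flakyOperation(invocations);
        return retry(started::join, 3);
    }

    // Retry wraps the factory: every attempt invokes the operation again.
    static String retryFreshInvocation(AtomicInteger invocations) {
        return retry(() -> flakyOperation(invocations).join(), 3);
    }
}
```

With a fresh counter for each call, retryCachedResult invokes the operation once and never succeeds, while retryFreshInvocation invokes it three times and returns "ok". If insert() behaves like the cached case, the bulkCreate chain would correspond to the first method and the bulkGet chain to the second.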

I don’t want to retry the whole batch by moving onErrorResumeNext() and retryWhen() outside of flatMap(). Is there any workaround for this?
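
To make the intended behaviour concrete, here is a minimal plain-Java sketch of the per-document semantics described above: skip ignorable errors, retry temporary failures with a growing delay, and log anything else without aborting the batch. It deliberately avoids RxJava and the Couchbase SDK. BulkRetrySketch, bulkApply, IgnorableFailure and TransientFailure are hypothetical names, and the doubling delay is an assumption, not necessarily the schedule that Delay.exponential produces.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Illustration only: none of these names come from RxJava or the Couchbase SDK.
final class BulkRetrySketch {

    // Hypothetical stand-in for errors that should be skipped (e.g. document already exists).
    static class IgnorableFailure extends RuntimeException {
        IgnorableFailure(String message) { super(message); }
    }

    // Hypothetical stand-in for errors that are worth retrying (e.g. temporary failure).
    static class TransientFailure extends RuntimeException {
        TransientFailure(String message) { super(message); }
    }

    // Applies operation to each key independently; one failing key never aborts the others.
    static <K, V> Map<K, V> bulkApply(List<K> keys, Function<K, V> operation,
                                      int maxRetries, long baseDelayMillis) {
        Map<K, V> results = new LinkedHashMap<>();
        for (K key : keys) {
            int retries = 0;
            boolean done = false;
            while (!done) {
                try {
                    results.put(key, operation.apply(key));
                    done = true;
                } catch (IgnorableFailure e) {
                    done = true; // skip this key silently
                } catch (TransientFailure e) {
                    // Give up after maxRetries retries, or if the wait is interrupted.
                    if (retries >= maxRetries || !sleep(baseDelayMillis << retries)) {
                        System.err.println("Giving up on " + key + ": " + e.getMessage());
                        done = true;
                    }
                    retries++;
                } catch (RuntimeException e) {
                    // Last resort: log and move on to the next key.
                    System.err.println("Could not process " + key + ": " + e.getMessage());
                    done = true;
                }
            }
        }
        return results;
    }

    // Sleeps for the given time; returns false if the thread was interrupted.
    static boolean sleep(long millis) {
        try {
            Thread.sleep(millis);
            return true;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

Calling bulkApply(ids, id -> insertOne(id), 3, 100), where insertOne is a hypothetical blocking insert, would return only the documents that were eventually written; this is the result I expect from the toMap() step in bulkCreate.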