Hey guys,
I’ve been struggling to get my simple C++ wrapper around libcouchbase to work consistently. I’m using libcouchbase-2.3.1_x86_vc11 against a “2.5.1 enterprise edition (build-1083)” server. At the bottom of this post is my example code, built with VC11. It just wraps libcouchbase using the default IOPS and calls run_event_loop from a std::thread. Gets and Stores can then be scheduled with callbacks to get the results.
However, when running the example, either I get an access violation somewhere inside libcouchbase (usually at different places), or after about 50 loops the callbacks stop getting called altogether while the application keeps running and continually eats up more and more memory. (I assume the read/write buffers are growing as more Gets and Stores are scheduled.)
Because of the access violations, I assume that some sort of race condition is occurring. Am I supposed to be using some kind of locking/synchronization mechanism that I’m unaware of?
Is it safe for me to start the event loop this way (see bottom of CouchbaseCache constructor)?
Any help will be greatly appreciated.
The code listing below can be copied and pasted directly and should work, provided you have the libcouchbase include and lib paths set correctly.
#include <chrono>
#include <cstring>
#include <functional>
#include <iostream>
#include <memory>
#include <sstream>
#include <stdexcept>
#include <string>
#include <thread>

#include "libcouchbase/couchbase.h"
/// Callback-based wrapper around one libcouchbase connection.
/// Get and Store schedule an operation and report the outcome through the
/// handlers supplied by the caller.
class CouchbaseCache
{
public:
    /// Receives errors that are not tied to a particular get or store.
    typedef std::function<void(const std::string&)> ErrorHandler;
    /// Invoked when a store completes successfully.
    typedef std::function<void()> StoreSuccessHandler;
    /// Invoked with the retrieved value when a get completes successfully.
    typedef std::function<void(const std::string& value)> GetSuccessHandler;
    /// Invoked with a description of what went wrong.
    typedef std::function<void(const std::string& errorMessage)> FailureHandler;

    /// Conditions under which Store writes the value.
    enum class StoreMode
    {
        Add = 1,     ///< Add the item to the cache, but fail if the object already exists.
        Replace = 2, ///< Replace the existing object in the cache.
        Set = 3,     ///< Unconditionally set the object in the cache.
    };

    /// Creates a connection to a couchbase cluster.
    /// \param host The ip and port of a host in the cluster, e.g. "w.x.y.z:8080".
    /// \param bucket The bucket which will be used to store and retrieve documents.
    /// \param username The username required to login to the couchbase cluster.
    /// \param password The password required to login to the couchbase cluster.
    /// \param errorHandler A handler that will be called for errors not related
    ///        to get or store events.
    CouchbaseCache(const std::string& host, const std::string& bucket,
                   const std::string& username, const std::string& password,
                   const ErrorHandler& errorHandler);

    ~CouchbaseCache();

    /// Stores the specified data in couchbase at the specified key.
    /// The StoreMode argument selects the storage conditions.
    void Store(const std::string& key, const std::string& value, StoreMode,
               const StoreSuccessHandler& successHandler,
               const FailureHandler& failureHandler);

    /// Retrieves the value for the specified key, if one exists, and passes it
    /// to successHandler; failures are reported through failureHandler.
    void Get(const std::string& key,
             const GetSuccessHandler& successHandler,
             const FailureHandler& failureHandler);

private:
    // Declaration order matters: members are destroyed in reverse order, so the
    // connection is destroyed before the IO object it was created with.
    ErrorHandler m_errorHandler;
    std::unique_ptr<lcb_io_opt_st, std::function<void(lcb_io_opt_st*)>> m_io;
    std::unique_ptr<lcb_create_st> m_connectionOptions;
    std::unique_ptr<lcb_st, std::function<void(lcb_st*)>> m_connection;
    std::thread m_eventThread;
};
/// Pair of completion callbacks belonging to one scheduled store operation.
struct StoreHandlers
{
    typedef std::function<void()> SuccessFn;
    typedef std::function<void(const std::string& errorMessage)> FailureFn;

    SuccessFn successHandler; ///< Called with no arguments when the store succeeds.
    FailureFn failureHandler; ///< Called with an error description when the store fails.
};
/// Pair of completion callbacks belonging to one scheduled get operation.
struct GetHandlers
{
    typedef std::function<void(const std::string& value)> SuccessFn;
    typedef std::function<void(const std::string& errorMessage)> FailureFn;

    SuccessFn successHandler; ///< Called with the retrieved value when the get succeeds.
    FailureFn failureHandler; ///< Called with an error description when the get fails.
};
/// Formats a libcouchbase status code as
/// "<errorPrefix>: (<numeric code>) <text from lcb_strerror>".
/// \param connection  Handle forwarded to lcb_strerror; callers in this file
///                    pass nullptr when no connection exists yet.
/// \param errorPrefix Context text placed in front of the code.
/// \param error       The libcouchbase status code to describe.
/// \return The formatted message.
std::string CreateErrorMessage(lcb_st* connection, const char* errorPrefix, lcb_error_t error)
{
    std::ostringstream message;
    message << errorPrefix << ": (" << error << ") " << lcb_strerror(connection, error);
    return message.str();
}
/// libcouchbase error callback: forwards connection-level errors to the
/// std::function whose address was stored as the connection cookie.
/// \param connection The connection the error was reported on.
/// \param error      The libcouchbase status code.
/// \param errorInfo  Optional extra text; appended to the message when non-null.
void ErrorCallback(lcb_t connection, lcb_error_t error, const char* errorInfo)
{
    const auto errorHandler =
        static_cast<const std::function<void(const std::string&)>*>(lcb_get_cookie(connection));

    // Nothing to do without a cookie, or when the stored std::function is empty:
    // calling an empty std::function throws std::bad_function_call, and that
    // exception must not escape into the C library that invoked this callback.
    if (!errorHandler || !*errorHandler)
    {
        return;
    }

    std::ostringstream errorMessage;
    errorMessage << "General failure (" << error << "):";
    if (errorInfo)
    {
        errorMessage << " " << errorInfo;
    }
    (*errorHandler)(errorMessage.str());
}
void StoreCallback(lcb_t connection, const void cookie,
lcb_storage_t operation, lcb_error_t error, const lcb_store_resp_t resp)
{
std::unique_ptr storeHandlers(static_cast<StoreHandlers>(const_cast<void>(cookie)));
if (error == LCB_SUCCESS && storeHandlers->successHandler)
{
storeHandlers->successHandler();
}
else if (storeHandlers->failureHandler)
{
storeHandlers->failureHandler(CreateErrorMessage(connection, “Store failure”, error));
}
}
void GetCallback(lcb_t connection, const void cookie, lcb_error_t error, const lcb_get_resp_t item)
{
std::unique_ptr getHandlers(static_cast<GetHandlers>(const_cast<void>(cookie)));
if (error == LCB_SUCCESS && getHandlers->successHandler)
{
std::string returnVal(static_cast<const char*>(item->v.v0.bytes), item->v.v0.nbytes);
getHandlers->successHandler(returnVal);
}
else if (getHandlers->failureHandler)
{
getHandlers->failureHandler(CreateErrorMessage(connection, “Get failure”, error));
}
}
/// Creates an IO object by calling lcb_create_io_ops with null options.
/// \return The new IO object; the caller is responsible for destroying it.
/// \throws std::runtime_error if lcb_create_io_ops does not return LCB_SUCCESS.
lcb_io_opt_st* CreateIo()
{
    lcb_io_opt_st* io = nullptr;
    const auto error = lcb_create_io_ops(&io, nullptr);
    if (error != LCB_SUCCESS)
    {
        throw std::runtime_error(CreateErrorMessage(nullptr, "Failed to create couchbase IO backend", error));
    }
    return io;
}
/// Creates a libcouchbase connection object from the given options.
/// \param connectionOptions Options forwarded to lcb_create.
/// \return The new connection object; the caller is responsible for destroying it.
/// \throws std::runtime_error if lcb_create does not return LCB_SUCCESS.
lcb_st* CreateConnection(lcb_create_st* connectionOptions)
{
    lcb_st* connection = nullptr;
    const auto error = lcb_create(&connection, connectionOptions);
    if (error != LCB_SUCCESS)
    {
        throw std::runtime_error(CreateErrorMessage(nullptr, "Failed to create couchbase connection object", error));
    }
    return connection;
}
/// Creates the IO object and connection, installs the callbacks, connects and
/// waits for the connection to complete, then starts the event-loop thread.
/// \param ipPort       Host (and port) of a node in the cluster.
/// \param bucket       Bucket to store to and retrieve from.
/// \param username     Login user name.
/// \param password     Login password.
/// \param errorHandler Called for errors not tied to a get or store.
/// \throws std::runtime_error if creating the IO object or connection,
///         scheduling the connect, or waiting for it fails.
CouchbaseCache::CouchbaseCache(const std::string& ipPort, const std::string& bucket,
                               const std::string& username, const std::string& password,
                               const std::function<void(const std::string&)>& errorHandler)
    : m_errorHandler(errorHandler),
      m_io(CreateIo(), [](lcb_io_opt_st* io) { if (io) lcb_destroy_io_ops(io); }),
      m_connectionOptions(new lcb_create_st),
      m_connection(nullptr, [](lcb_st* connection) { if (connection) lcb_destroy(connection); })
{
    m_connectionOptions->version = 0;
    m_connectionOptions->v.v0.host = ipPort.c_str();
    m_connectionOptions->v.v0.bucket = bucket.c_str();
    m_connectionOptions->v.v0.user = username.c_str();
    m_connectionOptions->v.v0.passwd = password.c_str();
    m_connectionOptions->v.v0.io = m_io.get(); // Safe because m_io is destroyed after m_connection.

    m_connection.reset(CreateConnection(m_connectionOptions.get()));

    // The cookie is the address of the member copy, which lives as long as this object.
    lcb_set_cookie(m_connection.get(), &m_errorHandler);
    lcb_set_error_callback(m_connection.get(), ErrorCallback);
    lcb_set_store_callback(m_connection.get(), StoreCallback);
    lcb_set_get_callback(m_connection.get(), GetCallback);

    auto error = lcb_connect(m_connection.get());
    if (error != LCB_SUCCESS)
    {
        throw std::runtime_error(CreateErrorMessage(m_connection.get(), "Failed to schedule connection to couchbase", error));
    }

    // Wait until the connection is complete before leaving the constructor.
    error = lcb_wait(m_connection.get());
    if (error != LCB_SUCCESS)
    {
        throw std::runtime_error(CreateErrorMessage(m_connection.get(), "Failed to connect to couchbase", error));
    }

    // NOTE(review): Store/Get are called from other threads while this thread
    // drives the same connection and no lock is taken anywhere in this file.
    // Confirm against the libcouchbase thread-safety documentation that
    // concurrent use of one lcb_t is permitted.
    m_eventThread = std::thread([this]()
    {
        m_io->v.v0.run_event_loop(m_io.get());
    });
}
/// Stops the event loop, joins the event thread, then makes one final
/// lcb_wait call on the connection before the members are destroyed.
CouchbaseCache::~CouchbaseCache()
{
    lcb_io_opt_st* const io = m_io.get();

    // Ask the loop to stop, then wait for the thread that runs it to finish.
    io->v.v0.stop_event_loop(io);
    m_eventThread.join();

    // Process any events the loop did not get to.
    lcb_wait(m_connection.get());
}
void CouchbaseCache::Store(const std::string& key, const std::string& value, CouchbaseCache::StoreMode mode,
const std::function<void()>& successHandler,
const std::function<void(const std::string& errorMessage)>& failureHandler)
{
lcb_store_cmd_t cmd;
const lcb_store_cmd_t *cmds[] = {&cmd};
memset(&cmd, 0, sizeof(cmd));
cmd.version = 0;
cmd.v.v0.key = key.c_str();
cmd.v.v0.nkey = key.size();
cmd.v.v0.bytes = value.c_str();
cmd.v.v0.nbytes = value.size();
cmd.v.v0.operation = static_cast<lcb_storage_t>(mode);
std::unique_ptr storeHandlers(new StoreHandlers);
storeHandlers->successHandler = successHandler;
storeHandlers->failureHandler = failureHandler;
auto error = lcb_store(m_connection.get(), storeHandlers.release(), 1, cmds);
if (error != LCB_SUCCESS)
{
std::ostringstream errorMessage;
errorMessage << "Failed to schedule store operation for key " << key;
failureHandler(CreateErrorMessage(m_connection.get(), errorMessage.str().c_str(), error));
}
}
void CouchbaseCache::Get(const std::string& key,
const std::function<void(const std::string& value)>& successHandler,
const std::function<void(const std::string& errorMessage)>& failureHandler)
{
lcb_get_cmd_t cmd;
const lcb_get_cmd_t *cmds[] = {&cmd};
cmd.version = 0;
cmd.v.v0.key = key.c_str();
cmd.v.v0.nkey = key.size();
std::unique_ptr getHandlers(new GetHandlers);
getHandlers->successHandler = successHandler;
getHandlers->failureHandler = failureHandler;
auto error = lcb_get(m_connection.get(), getHandlers.release(), 1, cmds);
if (error != LCB_SUCCESS)
{
std::ostringstream errorMessage;
errorMessage << "Failed to schedule get operation for key " << key;
failureHandler(CreateErrorMessage(m_connection.get(), errorMessage.str().c_str(), error));
}
}
/// Example driver: connects to the cluster, then loops forever, alternating a
/// Store and a Get of the same key with a 200 ms pause after each Store.
/// Any std::exception escaping the loop or the connection setup is printed.
int main()
{
    try
    {
        CouchbaseCache cache("http://dercachetier01:8091", "default", "default", "",
            [](const std::string& errorMessage)
            {
                // General errors are deliberately ignored in this example.
                (void)errorMessage;
            });

        int loopCount = 0;
        std::string key = "Test1";
        std::string value = "SomeData";
        while (true)
        {
            cache.Store(key, value, CouchbaseCache::StoreMode::Set,
                []()
                {
                    std::cout << "Store success" << std::endl;
                },
                [](const std::string& errorMessage)
                {
                    std::cout << "Store failure " << errorMessage << std::endl;
                });

            loopCount++;
            std::cout << loopCount << std::endl;
            std::this_thread::sleep_for(std::chrono::milliseconds(200));

            cache.Get(key,
                [](const std::string&)
                {
                    std::cout << "Get success" << std::endl;
                },
                [](const std::string& errorMessage)
                {
                    std::cout << "Get failure " << errorMessage << std::endl;
                });
        }
    }
    catch (const std::exception& e)
    {
        std::cout << e.what() << std::endl;
    }
}