Multithreading in the Python SDK for N1QL Queries

I want to use multiple threads to run N1QL queries and have encountered a weird problem (I am using lockmode=LockMode.WAIT):

  1. When each thread has its own Cluster object, a StopIteration or segmentation fault occurs shortly after starting.
  2. When all threads share a single Cluster object, everything works, but I found that there is only one DB connection to the Couchbase Server.
  3. Everything works when using multiple processes.

What’s wrong with each thread having its own Cluster object?
And what should I do to increase the number of DB connections in a multithreaded scenario?

My Code: https://gist.github.com/0x0400/0898047e13cc345d8ec275cebaf239a5

OS: Ubuntu 20.04.1 LTS
Python: 3.8.5
SDK: couchbase==3.2.6

Hi @JerryZhang - Have you tried using LockMode.NONE? If you are NOT sharing the cluster object across threads, that option should work.
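As a rough analogy for what the lock mode changes, here is a sketch with a hypothetical FakeClient class (not the SDK's implementation). It assumes WAIT means "block until the other thread is done with the object" and NONE means "no locking at all". With a per-object lock, threads sharing one object take turns, so their calls never overlap; dropping the lock removes that serialization, which is only safe when each thread uses its own object.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class FakeClient:
    """Hypothetical stand-in for a client object; NOT the Couchbase SDK."""

    def __init__(self, use_lock):
        # use_lock=True behaves like "wait for the other thread";
        # use_lock=False does no locking at all.
        self._call_lock = threading.Lock() if use_lock else None
        self._stats_lock = threading.Lock()  # guards the counters below
        self._active = 0
        self.max_concurrent = 0

    def _do_query(self):
        with self._stats_lock:
            self._active += 1
            self.max_concurrent = max(self.max_concurrent, self._active)
        time.sleep(0.01)  # pretend to wait on the network
        with self._stats_lock:
            self._active -= 1
        return "row"

    def query(self):
        if self._call_lock is None:
            return self._do_query()
        with self._call_lock:  # other threads block here until it is free
            return self._do_query()


def hammer(client, threads=4, calls_per_thread=5):
    """Run many queries against one shared client from several threads."""
    with ThreadPoolExecutor(threads) as pool:
        futures = [pool.submit(client.query)
                   for _ in range(threads * calls_per_thread)]
        return [f.result() for f in futures]


waiting = FakeClient(use_lock=True)
hammer(waiting)
print("with lock:", waiting.max_concurrent)  # with lock: 1

unlocked = FakeClient(use_lock=False)
hammer(unlocked)
print("without lock:", unlocked.max_concurrent)  # usually > 1, timing-dependent
```

In this analogy, a shared object behind a wait-style lock can only run one call at a time, no matter how many threads use it.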

Hi @jcasey, I tried LockMode.NONE and it doesn’t work. A StopIteration or segmentation fault still occurs shortly after starting.

Hi @JerryZhang - I will try to look into the sample code you provided a bit more before the end of the week. However, after a couple of quick runs, I did not see any exceptions. What do you mean by “will occur soon”? Also, what are some example inputs you provided to the sample code (number of threads, number of queries, etc.)?

Do you have any exception outputs with tracebacks you can provide?

@jcasey For example, I executed the script with 4 threads to run the N1QL query 1000 times.
The console shows output like the following:

$ python test_n1ql.py 4 1000 1
139773822669104
139773820009824
139773820009104
139773820009584
sql => SELECT * FROM `travel-sample`.inventory.airport WHERE airportname = $1 LIMIT 10, args => Calais Dunkerque
Traceback (most recent call last):
  File "test_n1ql.py", line 40, in run_query_loop
    for row in result:
  File "/home/xxx/tmp/cb_venv/lib/python3.8/site-packages/couchbase_core/__init__.py", line 281, in __iter__
    next_item = next(parent_iter)
SystemError: <built-in function next> returned NULL without setting an error


$ python test_n1ql.py 4 1000 1
139632115597616
139632112937616
139632112938096
139632112938336
sql => SELECT * FROM `travel-sample`.inventory.airport WHERE airportname = $1 LIMIT 10, args => hjk
StopIteration

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "test_n1ql.py", line 39, in run_query_loop
    result = cb.query(sql, QueryOptions(metrics=True, positional_parameters=(airportname,)))
  File "/home/xxx/tmp/cb_venv/lib/python3.8/site-packages/couchbase/cluster.py", line 759, in query
    opt.to_query_object(statement, *opts, **kwargs),
SystemError: PyEval_EvalFrameEx returned a result with an error set
future result ===>  success
future error ===>  PyEval_EvalFrameEx returned a result with an error set
future result ===>  success
future result ===>  success
main end:  6.533238172531128

Hi @JerryZhang - Thanks for the feedback. I was able to reproduce the issue and have created a JIRA ticket to track it.

Could you use an asynchronous approach (i.e., Python’s asyncio) instead of a multithreaded one? The acouchbase API is based on asyncio. Docs can be found here.
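With asyncio, concurrency comes from many coroutines sharing one event loop rather than from many threads. A minimal sketch, assuming a hypothetical fake_query() coroutine as a stand-in for an acouchbase query (it only sleeps) and an arbitrary cap of 10 in-flight requests:

```python
import asyncio
import random


async def fake_query(airportname):
    """Hypothetical stand-in for an acouchbase N1QL query; it only sleeps."""
    await asyncio.sleep(random.uniform(0.001, 0.005))
    return [{"airportname": airportname}]


async def run_many(names, limit=10):
    # At most `limit` queries are in flight at any moment.
    sem = asyncio.Semaphore(limit)

    async def one(name):
        async with sem:
            return await fake_query(name)

    # gather() runs the coroutines concurrently on one thread and returns
    # their results in the same order as `names`.
    return await asyncio.gather(*(one(n) for n in names))


names = ["Calais Dunkerque", "Les Loges", "Bray"] * 100
results = asyncio.run(run_many(names))
print(len(results))  # 300
```

The semaphore is a design choice rather than a requirement: without it, all 300 coroutines would start at once.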

It’s a lot of work to migrate our Flask-based services from multithreading to asyncio :sweat_smile:

I tried the acouchbase async API and found that a NotImplementedError is sometimes raised:

Traceback (most recent call last):
  File "test_async.py", line 41, in <module>
    loop.run_until_complete(n1ql_query(cluster))
  File "/usr/lib/python3.8/asyncio/base_events.py", line 616, in run_until_complete
    return future.result()
  File "test_async.py", line 27, in n1ql_query
    result = cluster.query(
  File "/home/ubuntu/cb_bench/venv/lib/python3.8/site-packages/acouchbase/cluster.py", line 267, in query
    return super(ACluster, self).query(*args, **kwargs)
  File "/home/ubuntu/cb_bench/venv/lib/python3.8/site-packages/couchbase/cluster.py", line 756, in query
    return self._maybe_operate_on_an_open_bucket(
  File "/home/ubuntu/cb_bench/venv/lib/python3.8/site-packages/couchbase/cluster.py", line 774, in _maybe_operate_on_an_open_bucket
    if self._is_6_5_plus():
  File "/home/ubuntu/cb_bench/venv/lib/python3.8/site-packages/couchbase/cluster.py", line 706, in _is_6_5_plus
    raise NotImplementedError(
NotImplementedError: Cannot execute synchronous HTTP request with asynchronous Admin cluster.

My code:

from couchbase.exceptions import (
    CouchbaseException, ParsingFailedException)


from acouchbase.cluster import get_event_loop

from acouchbase.cluster import Cluster
from couchbase.cluster import ClusterOptions
from couchbase.auth import PasswordAuthenticator


async def get_couchbase():
    uri = "couchbase://localhost"
    user = "YourUsername"
    password = "YourPassword"
    
    cluster = Cluster(
        uri,
        ClusterOptions(PasswordAuthenticator(user, password)))
    bucket = cluster.bucket("travel-sample")
    await bucket.on_connect()

    return cluster, bucket


async def n1ql_query(cluster):
    try:
        result = cluster.query(
            "SELECT * FROM `travel-sample` LIMIT 10")

        async for row in result:
            print("Found row: {}".format(row))
    except ParsingFailedException as ex:
        print(ex)
    except CouchbaseException as ex:
        print(ex)



loop = get_event_loop()
cluster, _ = loop.run_until_complete(get_couchbase())
loop.run_until_complete(n1ql_query(cluster))

Hi @JerryZhang - Sorry, I meant to get you a reply sooner. I have tweaked the sample code you provided to demonstrate one way to integrate asyncio with threading. The basic idea is to create a separate thread that runs the event loop; whenever you want to execute async code, you pass the coroutine to that thread using asyncio.run_coroutine_threadsafe(). Alternatively, you could pass the event loop to the threads and call loop.run_until_complete(), but that would be slower because every call would block, and you would also need a lock to avoid race conditions.

I hope this helps.

import asyncio
import random
from concurrent.futures import ThreadPoolExecutor, as_completed
import time
import traceback
import threading


from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import ClusterOptions, QueryOptions
from acouchbase.cluster import Cluster, get_event_loop


class CouchbaseConfig:
    uri = "couchbase://localhost"
    user = "Administrator"
    password = "password"
    cert_path = None


async def get_couchbase():
    cb = Cluster(CouchbaseConfig.uri, ClusterOptions(
        PasswordAuthenticator(
            CouchbaseConfig.user, CouchbaseConfig.password, CouchbaseConfig.cert_path)
    )
    )

    await cb.on_connect()

    return cb


async def run_query_loop(sql, cnt=1000):
    cb = await get_couchbase()
    airportnames = ["Calais Dunkerque", "Les Loges", "Bray", "Glisy", "Cazaux"]
    for i in range(cnt):
        try:
            airportname = random.choice(airportnames)
            result = cb.query(sql, QueryOptions(
                metrics=True, positional_parameters=(airportname,)))
            num_results = len([r async for r in result])
            if cnt < 100:
                print(f"got {num_results} results")
        except Exception:
            print(f"sql => {sql}, args => {airportname}")
            traceback.print_exc()
            traceback.print_stack()
            raise
    return "success"


def run_loop(loop):
    asyncio.set_event_loop(loop)
    loop.run_forever()


def run_query_in_thread(sql, count, loop):
    return asyncio.run_coroutine_threadsafe(run_query_loop(sql, count), loop)


def main():
    loop = get_event_loop()
    loop_thread = threading.Thread(target=lambda: run_loop(loop))
    loop_thread.start()

    bt = time.time()
    workers = 10
    loop_count = 1000
    futs = []
    sql = 'SELECT * FROM `travel-sample`.inventory.airport WHERE airportname = $1 LIMIT 10'
    with ThreadPoolExecutor(workers) as pool:
        for _ in range(workers):
            f = pool.submit(run_query_in_thread, sql, loop_count, loop)
            # pool.submit() returns a future wrapping the concurrent.futures.Future
            # that asyncio.run_coroutine_threadsafe() returns. Scheduling is fast,
            # so .result() here should hand back that inner future almost
            # immediately (worth verifying in your environment).
            futs.append(f.result())

    for fut in as_completed(futs):
        exc = fut.exception()
        if exc:
            print("future error ===> ", exc)
        else:
            print("future result ===> ", fut.result())

    print("main end: ", time.time() - bt)

    loop.call_soon_threadsafe(loop.stop)
    loop_thread.join()


if __name__ == "__main__":
    main()
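The core of this pattern, stripped of Couchbase so it can run on its own: one event loop runs forever in a dedicated thread, and ordinary threads (for example, Flask worker threads) hand coroutines to it with asyncio.run_coroutine_threadsafe() and block on the returned concurrent.futures.Future. fake_query() is a hypothetical stand-in for an acouchbase call, and the 5-second timeout is an arbitrary assumption. The script above creates its loop with acouchbase’s get_event_loop(), which may matter for the SDK; asyncio.new_event_loop() is used here only to keep the sketch dependency-free.

```python
import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor
from functools import partial


async def fake_query(n):
    """Hypothetical stand-in for an acouchbase query coroutine."""
    await asyncio.sleep(0.001)
    return n * 2


def start_background_loop():
    # One event loop, running forever in a dedicated daemon thread.
    loop = asyncio.new_event_loop()
    thread = threading.Thread(target=loop.run_forever, daemon=True)
    thread.start()
    return loop, thread


def blocking_query(loop, n):
    # Called from any ordinary thread: hand the coroutine to the loop
    # thread and block until its result is ready.
    fut = asyncio.run_coroutine_threadsafe(fake_query(n), loop)
    return fut.result(timeout=5)


loop, loop_thread = start_background_loop()
with ThreadPoolExecutor(4) as pool:
    results = list(pool.map(partial(blocking_query, loop), range(10)))
print(results)  # [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

# Shut down: ask the loop to stop from outside its thread, then clean up.
loop.call_soon_threadsafe(loop.stop)
loop_thread.join()
loop.close()
```

Unlike calling loop.run_until_complete() from each thread, no extra lock is needed here: only the loop thread runs the loop, and other threads interact with it solely through the thread-safe run_coroutine_threadsafe() and call_soon_threadsafe().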


Hi @JerryZhang - Trying to keep up with follow-ups on the forums! We have released the 4.0 SDK, and I tested the script you originally provided against the new version without any issues. While the original script should work, I have provided a slightly modified version below that is more representative of what you can do with 4.0 (namely, lockmode is not needed).

import argparse
import random
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor, as_completed
import time
import traceback


from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.options import ClusterOptions, QueryOptions


class CouchbaseConfig:
    uri = "couchbase://localhost"
    user = "YourUsername"
    password = "YourPassword"
    cert_path = None


sql = 'SELECT * FROM `travel-sample`.inventory.airport WHERE airportname = $1 LIMIT 10'


def get_couchbase():
    cb = Cluster(
        CouchbaseConfig.uri,
        ClusterOptions(
            PasswordAuthenticator(
                CouchbaseConfig.user,
                CouchbaseConfig.password,
                CouchbaseConfig.cert_path),
        ))

    return cb


cb = get_couchbase()


def run_query_loop(cnt=1000):
    print(f'Using Couchbase cluster id: {id(cb)}')
    airportnames = [
        "Calais Dunkerque",
        "Les Loges",
        "Bray",
        "abc",
        "efg",
        "hjk"]
    for _ in range(cnt):
        try:
            airportname = random.choice(airportnames)
            result = cb.query(
                sql, QueryOptions(
                    metrics=True, positional_parameters=(
                        airportname,)))
            rows = result.execute()
            # print(f'Found {len(rows)} in {result.metadata().metrics().execution_time()}')
        except BaseException:
            print(f"sql => {sql}, args => {airportname}")
            traceback.print_exc()
            raise
    return "success"


def get_args():
    parser = argparse.ArgumentParser()
    parser.add_argument(
        'workers',
        metavar='int',
        type=int,
        help='number of threads/processes')
    parser.add_argument(
        'q_count',
        metavar='int',
        type=int,
        help='number of times each worker runs the N1QL query')
    parser.add_argument(
        'use_thread',
        metavar='int',
        type=int,
        nargs='?',  # makes the positional optional so default=0 can apply
        default=0,
        help='1 to run tasks in threads, 0 (default) to use processes')
    return parser.parse_args()


if __name__ == '__main__':
    args = get_args()
    bt = time.time()
    futs = []
    if args.use_thread:
        with ThreadPoolExecutor(args.workers) as pool:
            for _ in range(args.workers):
                futs.append(pool.submit(run_query_loop, args.q_count))
    else:
        with ProcessPoolExecutor(args.workers) as pool:
            for _ in range(args.workers):
                futs.append(pool.submit(run_query_loop, args.q_count))

    for fut in as_completed(futs):
        exc = fut.exception()
        if exc:
            print("future error ===> ", exc)
        else:
            print("future result ===> ", fut.result())

    print("main end: ", time.time() - bt)

    # run example:
    # python test_n1ql.py 4 1000 1