Is it possible to execute MERGE INTO query statements using the Python SDK?

Hi All,

I have a MERGE INTO ... WHEN MATCHED THEN UPDATE SET ... WHEN NOT MATCHED THEN INSERT-style query that works when I execute it in the Couchbase web UI. However, when I execute the same query using the Python SDK, the inserted documents don't appear in the document lookup page of the UI.

My script looks like this:

from couchbase.cluster import Cluster, ClusterOptions
from couchbase_core.cluster import PasswordAuthenticator
from some.internal.lib import (
    create_my_connection_string,
    get_my_merge_into_query_statement
)
from couchbase_core import __version__
print(__version__)  # 3.0.10

connection_string = create_my_connection_string()
auth = PasswordAuthenticator(self._username, self._password)
cluster = Cluster(connection_string, ClusterOptions(auth))

query_statement = get_my_merge_into_query_statement()
cluster.query(query_statement)

I can see that in order to update or insert documents with the Python SDK, one would normally use the key-value insert or upsert methods (which, as far as I can tell, live on the collection object rather than on the cluster), but what I want to do can be done very naturally with a MERGE INTO statement, which AFAIK I can only execute using couchbase.cluster.Cluster.query.

Is there a solution to the problem above?

Found the solution. The query is evaluated lazily: calling cluster.query() on its own does not send anything to the server.

I just have to iterate through the query results.
Something like this:

res = cluster.query(some_query_statement) # this doesn't do anything.
res.rows() # this line actually posts the query HTTP request and evaluates the query.
# the console log after this line:
# 6699ms [Idc6b6e608960ef0e] {93549} [TRACE] (http-io - L:390) <localhost:8093> POST  http://localhost:8093/query/service. Body=279 bytes
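
To make the lazy behaviour described above easier to see without a running cluster, here is a minimal sketch. FakeCluster and FakeLazyResult are hypothetical stand-ins written only for this illustration (they are not part of the Couchbase SDK), and run_statement is a hypothetical helper name. It assumes the object returned by cluster.query() is iterable, as it appears to be in the 3.0.x SDK used in this thread.

```python
class FakeLazyResult:
    """Hypothetical stand-in for a query result: does no work until iterated."""

    def __init__(self, statement, sent_log):
        self._statement = statement
        self._sent_log = sent_log

    def __iter__(self):
        # The "request" is only recorded once iteration starts.
        self._sent_log.append(self._statement)
        return iter([])  # this fake never returns any rows


class FakeCluster:
    """Hypothetical stand-in for a cluster object, for illustration only."""

    def __init__(self):
        self.sent = []  # statements that were actually "sent"

    def query(self, statement):
        # Mirrors the observed behaviour: build the result, send nothing yet.
        return FakeLazyResult(statement, self.sent)


def run_statement(cluster, statement):
    """Run a statement and force it to execute by consuming all rows."""
    result = cluster.query(statement)
    return list(result)  # iterating is what triggers the request


fake = FakeCluster()
pending = fake.query("MERGE INTO ...")  # placeholder text; nothing is sent here
sent_before = list(fake.sent)

rows = run_statement(fake, "MERGE INTO ...")  # consuming the rows sends it
sent_after = list(fake.sent)
```

With a real cluster object, the same run_statement helper should work unchanged, since it only relies on query() returning something iterable.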

Hello @mtannerman, great, and thanks for posting back your solution.

Hi @mtannerman - another approach could be to use the .execute() method, which runs the query and acts as a pass/fail check. The following sample code should provide the necessary info. Hope this helps.

from couchbase.cluster import Cluster, ClusterOptions, QueryOptions
from couchbase.auth import PasswordAuthenticator
from couchbase.exceptions import CouchbaseException
import traceback

db_info = {
    'host': 'couchbase://localhost',
    'secure': False,
    'bucket_name': 'beer-sample',
    'username': 'Administrator',
    'password': 'password'
}

def run_sample_code():
    try:
        auth = PasswordAuthenticator(db_info['username'], db_info['password'])
        cluster = Cluster(db_info['host'], authenticator=auth)

        query_str = """
UPDATE `beer-sample` b0
SET beers = (
    SELECT RAW b3.name
    FROM `beer-sample` b1 USE KEYS META(b0).id
        JOIN (
        SELECT b2.name,
               b2.brewery_id
        FROM `beer-sample` b2
        WHERE b2.type='beer') AS b3 ON b3.brewery_id = META(b1).id)
WHERE b0.type='brewery'
AND META(b0).id = '21st_amendment_brewery_cafe'
        """

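        # execute() runs the statement to completion and raises on failure,
        # so it acts as a pass/fail check when the rows are not needed.
        cluster.query(query_str).execute()

    except CouchbaseException:
        # Errors raised by the SDK (for example, a failed query).
        traceback.print_exc()
    except Exception:
        # Anything else that went wrong.
        traceback.print_exc()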


if __name__ == '__main__':
    run_sample_code()