Create Bucket failed - need some guidance

I’m trying to simply create a bucket via python SDK ,
far as i search couldn’t find a working example ,
even on SDK documentation page i could find it:
https://docs.couchbase.com/sdk-api/couchbase-python-client/api/couchbase.html#module-couchbase.bucket

Im sure this is the wrong way -
as ‘Cluster’ object has no attribute ‘bucket_create’
but i couldn’t find something else:

    def _ConnectToBucket(self):
        try:
          bucket = self.curr_con.bucket(self.BUCKET_TARGET)
          print("bucket exist")        
        except Exception as e:
          
          test= self.curr_con.bucket_create('new-bucket-name', bucket_type = 'couchbase', ram_quota=994)
          print("here" ,test)

getting:


During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "./main.py", line 74, in <module>
    main()
  File "./main.py", line 47, in main
    name,uuid = obj._CheckRefernece()
  File "/xs/mig1/aaa/xdcr_lib.py", line 38, in _CheckRefernece
  File "/xs/mig1/aaa/xdcr_lib.py", line 97, in _ConnectToBucket
AttributeError: 'Cluster' object has no attribute 'bucket_create'

Hello @BgOKcomp you are right that cluster object doesnt have create bucket

You will have to use bucket manager, Unfortunately the docs is not updated yet, however while we wait on it here is the tests that you can use as reference to get some help

Thanks Arun,

I manage to create a bucket using the link you mention ,
Once call bucket_create -I’m getting an http code 202 ,
I’d like to confirm the bucket created before continue to insert data ,
meaning need to retry requests till get confirmation bucket created.

I find some retry functions which all working with an url based http/s
but in this case i rather use the couchbase bucket method:
Any ideas?

confirm bucket exist:

    def _ConnectToBucket(self):
        try:
          bucket = self.trg_cb_connention.bucket(self.trg_bucket_name)
          print("Bucket:",self.trg_bucket_name, " Exist")
          return True

        except Exception as e:
            print(e)
            return 1

Create bucket:

    def _CreateBucket(self):
          try:
             self.bm = self.trg_cb_connention.buckets()
             response = self.bm.create_bucket(CreateBucketSettings(name=self.trg_bucket_name, bucket_type="couchbase", ram_quota_mb=100))
             if response.http_status == 202: #acepected
                print("accepted:",response)

# here i want to send the request to confirm the bucket created successfully

                self._retry_request(  self.trg_cb_connention.bucket(self.trg_bucket_name))
             else:
                print("to check:",response)
           
                
          except Exception as e:
              print(e)

Hello @BgOKcomp if all you are trying to do is check if the bucket is created you can simply try to get the bucket

BTW the python management API can be found here. see get_bucket

https://docs.couchbase.com/sdk-api/couchbase-python-client/api/management.html

Hope this helps !

@BgOKcomp - to add on to what @AV25242 mentioned, here is a quick snippet that should provide some context. Hope this helps.

from couchbase.cluster import Cluster
from couchbase.management.buckets import CreateBucketSettings
from couchbase.auth import PasswordAuthenticator
import time

db_info = {
    'host': 'couchbase://localhost',
    'secure': False,
    'bucket_name': 'default',
    'username': 'Administrator',
    'password': 'password'
}

def retry(func, *args, back_off=0.5, limit=5, **kwargs):
    for i in range(limit):
        try:
            return func(*args, **kwargs)
        except Exception:
            print('Retry in {} seconds...'.format((i+1)*back_off))
            time.sleep((i+1) * back_off)

    raise Exception('Unable to successfully receive result from {}'.format(func))

def run_sample_code():
    try:
        auth = PasswordAuthenticator(db_info['username'], db_info['password'])
        cluster = Cluster(db_info['host'], authenticator=auth)
        bm = cluster.buckets()
        bm.create_bucket(CreateBucketSettings(name="test", bucket_type="couchbase", ram_quota_mb=100))
        bucket = retry(bm.get_bucket, 'test')
        print(bucket.name)
        
    except Exception as ex:
        import traceback
        traceback.print_exc()

if __name__ == '__main__':
    run_sample_code()

Many Thanks Jared for the sample code , it really helpfull.

I’m having some issue passing the function to retry function ,
see:

class xdcr:
    
    def __init__(self, **kwargs):
        self.__dict__.update(kwargs)
        self.kwargs = kwargs
        .....
        self.trg_cluster = Cluster("http://"+ self._returnIP(self.TARGET_CB_DB)+ ":8091",ClusterOptions(PasswordAuthenticator(self.TARGET_USER,self.TARGET_PASSWORD)))        
        self.trg_bm = self.trg_cluster.buckets()


    def _CreateBucket(self):
          try:
             
             response = self.trg_bm.create_bucket(CreateBucketSettings(name=self.trg_bucket_name, bucket_type="couchbase", ram_quota_mb=100))
             if response.http_status == 202: #acepected
                print("acepected:",response)             
                bucket = self._retry(self.trg_bm.get_bucket,self.trg_bucket_name)
                print(bucket .name)
              #  self._DeleteTrgBucket()
           
             else:
                print("to check:",response)
                
    def _retry( func, *args, back_off=0.5, limit=10, **kwargs):
         for i in range(limit):
            try:
               return func(*args, **kwargs)
            except Exception:
             #  print(str(func))
               print('Retry in {} seconds...'.format((i+1)*back_off))
               time.sleep((i+1) * back_off)
              
         raise Exception('Unable to successfully receive result from {}'.format(func))

it raise an error :


Traceback (most recent call last):
  File "/FSMIG/mig1/Avi/xdcr_lib.py", line 155, in _CreateBucket
    bucket = self._retry(self.trg_bm.get_bucket,self.trg_bucket_name)
  File "/FSMIG/mig1/Avi/xdcr_lib.py", line 183, in _retry
    raise Exception('Unable to successfully receive result from {}'.format(func))
Exception: Unable to successfully receive result from MIG_ilceosp089


I think the retry function doesn’t run the self.trg_bm.get_bucket properly,
I try debug it by print print(str(func)) and it return me other variable value,
When i call it without the retry function:

          try:
             
             response = self.trg_bm.create_bucket(CreateBucketSettings(name=self.trg_bucket_name, bucket_type="couchbase", ram_quota_mb=100))
             print ("here")
             if response.http_status == 202: #acepected
                print("accept to create")
                time.sleep(3)
                try:
                   print("verify bucket",self.trg_bucket_name," created")
# call the get_buckets without the retry
                   bucket = self.trg_bm.get_bucket(self.trg_bucket_name)
                   print("test",bucket.name)
                except Exception as ex:
                    import traceback
                    traceback.print_exc()
               
             else:
                print("to check:")

And it does return the new bucket name.

Hi @BgOKcomp - Are you wanting the _retry() method to be a part of the class you have created? The call to the method seems to be self._retry(…), but the method definition is def _retry(func, …) instead of def _retry(self, func,…). Also, are you able to run the sample code I provided without any errors (be sure to changing the db info, bucket name(s), etc. to match your app) ?

Thanks Jared ,

u right ,I missed the def_retry(self,func…),
adding it solved the problem ,
Actually also run your code - working perfectly , i think it can be reuse for other “wait to…” respond.

@jcasey ,
I’m trying to expand the functionality retry function ,
so it could also wait for specific return code and just success ,
in example ,I added a function for creating a xdcr replica ,
Once created first second the status is NotRunning,
i’d like to confirm the replica become status=‘running’.
I added the function for getting the replica status ,
I have some doubts for how to implement it in retry …

I’d like to send to _retry an optional parameter of retrun value ,
in this case it would be “running”.

    def _GetReplicaStatus(self):
    

        try:
          response = requests.get(self.src_http_path +'/pools/default/tasks', auth=(self.SOURCE_USER, self.SOURCE_PASSWORD))
          response_json = json.loads(response.text)
          for key in response_json:
              if 'source' not in key:
                   continue
              if key['source'] == self.src_bucket_name and key['target'].rsplit('/', 1)[-1] == self.trg_bucket_name :
                 self.replica_status = key['status']
                 return self.replica_status
          return "Not Exist"
        except requests.exceptions.HTTPError as err:
          print(err)  

    def _CreateXDCRReplica(self):        
        data = {'fromBucket': self.src_bucket_name,               
                'toBucket': self.trg_bucket_name,
                 'toCluster': self.cluster_name,
                'replicationType': 'continuous',
                'enableCompression': '1'
                 }

        try:
          response = requests.post(self.src_http_path + "/controller/createReplication", data=data, auth=(self.SOURCE_USER, self.SOURCE_PASSWORD))
          response_json = json.loads(response.text)
          response.raise_for_status()
          status = self._retry(self._GetReplicaStatus )


    def _CreateXDCRReplica(self):
        print("creating replica")
        data = {'fromBucket': self.src_bucket_name,               
                'toBucket': self.trg_bucket_name,
                 'toCluster': self.cluster_name,
                'replicationType': 'continuous',
                'enableCompression': '1'
                 }
        print(data,"\n",self.src_http_path + "/controller/createReplication")

        try:
          response = requests.post(self.src_http_path + "/controller/createReplication", data=data, auth=(self.SOURCE_USER, self.SOURCE_PASSWORD))
          response_json = json.loads(response.text)
          response.raise_for_status()
          status = self._retry(self._GetReplicaStatus )
##  here will handle the  status != 'running'
          
          return [response_json]
        except requests.exceptions.HTTPError as err:
           print(err)
        except requests.exceptions.HTTPError as err:
           print(err)

def _retry( func, *args, back_off=0.5, limit=10, **kwargs):
         for i in range(limit):
            try:
               return func(*args, **kwargs)
            except Exception:
             #  print(str(func))
               print('Retry in {} seconds...'.format((i+1)*back_off))
               time.sleep((i+1) * back_off)
              
         raise Exception('Unable to successfully receive result from {}'.format(func))