How to set configuration in AWS Glue using the spark connector?

I’m trying to use the Couchbase Spark connector inside a Glue job, but can’t succeed. I get the error:

Exception in User Class: java.lang.NoClassDefFoundError : com/couchbase/client/core/error/InvalidArgumentException

My code is:

import com.amazonaws.services.glue.GlueContext
import com.amazonaws.services.glue.util.GlueArgParser
import com.amazonaws.services.glue.util.Job

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf

import com.couchbase.spark._
import com.couchbase.client.scala.json.JsonObject
import com.couchbase.spark.kv.Get

object GlueApp {
  /** Entry point invoked by the AWS Glue job runner.
    *
    * Reads Couchbase connection parameters from the resolved job arguments,
    * wires them into the Spark configuration (the connector reads its settings
    * from there, so this must happen BEFORE the SparkContext is created), and
    * fetches a single document by key.
    */
  def main(args: Array[String]): Unit = {
    // Required for options.asJava below; without this import the snippet
    // does not compile. (Scala 2.12 — JavaConverters, not jdk.CollectionConverters.)
    import scala.collection.JavaConverters._

    // Parameters supplied to the job as --JOB_NAME, --CONNECTION_STRING, etc.
    val jobParams = Seq("JOB_NAME", "CONNECTION_STRING", "USERNAME", "PASSWORD")
    val options = GlueArgParser.getResolvedOptions(args, jobParams.toArray)

    // NOTE(review): the connection string should be the host only — the
    // Couchbase SDK chooses its own ports; do not append one.
    val connectionString = options("CONNECTION_STRING")
    val username = options("USERNAME")
    val password = options("PASSWORD")

    val conf = new SparkConf()
      .setAll(
        Seq(
          ("spark.couchbase.connectionString", connectionString),
          ("spark.couchbase.username", username),
          ("spark.couchbase.password", password),
          // Global (implicit) bucket: without it the connector throws
          // InvalidArgumentException("No bucketName provided ...") when an
          // operation does not set a per-command bucket.
          ("spark.couchbase.implicitBucket", "bucket")
        ))

    val sparkContext = new SparkContext(conf)
    val glueContext = new GlueContext(sparkContext)

    Job.init(options("JOB_NAME"), glueContext, options.asJava)

    // NOTE(review): the NoClassDefFoundError for
    // com.couchbase.client.core.error.InvalidArgumentException indicates that
    // core-io-*.jar (a transitive dependency of scala-client) is missing from
    // the job's extra JARs — add it alongside spark-connector and scala-client.
    val ids = Seq(Get("645102691_1::coupon_codes"))
    val fromKeyspace = Keyspace(bucket = Some("bucket"))

    sparkContext
      .couchbaseGet(ids, fromKeyspace)
      .collect()
      .foreach(println)

    Job.commit()
  }
}

As far as I understand it, it’s because of wrong arguments. I’m passing only the host address as the connectionString, plus the username and password.

Dependencies:
spark-connector_2.12-3.2.0.jar
scala-client_2.12-1.2.4.jar

2022-04-13 13:16:23,352 ERROR [main] glue.ProcessLauncher (Logging.scala:logError(94)): Exception in User Class
java.lang.NoClassDefFoundError: com/couchbase/client/core/error/InvalidArgumentException
	at com.couchbase.spark.config.CouchbaseConnection$.connection$lzycompute(CouchbaseConnection.scala:164)
	at com.couchbase.spark.config.CouchbaseConnection$.connection(CouchbaseConnection.scala:164)
	at com.couchbase.spark.config.CouchbaseConnection$.apply(CouchbaseConnection.scala:166)
	at com.couchbase.spark.kv.GetRDD.getPartitions(GetRDD.scala:66)
	at org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
	at GlueApp$.main(AWSGlueCouchbaseETLJob.scala:41)
	at GlueApp.main(AWSGlueCouchbaseETLJob.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at com.amazonaws.services.glue.SparkProcessLauncherPlugin.invoke(ProcessLauncher.scala:48)
	at com.amazonaws.services.glue.SparkProcessLauncherPlugin.invoke$(ProcessLauncher.scala:48)
	at com.amazonaws.services.glue.ProcessLauncher$$anon$1.invoke(ProcessLauncher.scala:78)
	at com.amazonaws.services.glue.ProcessLauncher.launch(ProcessLauncher.scala:143)
	at com.amazonaws.services.glue.ProcessLauncher$.main(ProcessLauncher.scala:30)
	at com.amazonaws.services.glue.ProcessLauncher.main(ProcessLauncher.scala)
Caused by: java.lang.ClassNotFoundException: com.couchbase.client.core.error.InvalidArgumentException
	at java.net.URLClassLoader.findClass(URLClassLoader.java:387)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:418)
	at sun.misc.Launcher$AppClassLoader.loadClass(Launcher.java:352)
	at java.lang.ClassLoader.loadClass(ClassLoader.java:351)
	... 25 more
2022-04-13 13:16:23,357 ERROR [main] glue.ProcessLauncher (Logging.scala:logError(73)): Exception in User Class: java.lang.NoClassDefFoundError : com/couchbase/client/core/error/InvalidArgumentException
com.couchbase.spark.config.CouchbaseConnection$.connection$lzycompute(CouchbaseConnection.scala:164)
com.couchbase.spark.config.CouchbaseConnection$.connection(CouchbaseConnection.scala:164)
com.couchbase.spark.config.CouchbaseConnection$.apply(CouchbaseConnection.scala:166)
com.couchbase.spark.kv.GetRDD.getPartitions(GetRDD.scala:66)
org.apache.spark.rdd.RDD.$anonfun$partitions$2(RDD.scala:300)
scala.Option.getOrElse(Option.scala:189)
org.apache.spark.rdd.RDD.partitions(RDD.scala:296)
org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1030)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
org.apache.spark.rdd.RDD.withScope(RDD.scala:414)
org.apache.spark.rdd.RDD.collect(RDD.scala:1029)
GlueApp$.main(AWSGlueCouchbaseETLJob.scala:41)
GlueApp.main(AWSGlueCouchbaseETLJob.scala)
sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
java.lang.reflect.Method.invoke(Method.java:498)
com.amazonaws.services.glue.SparkProcessLauncherPlugin.invoke(ProcessLauncher.scala:48)
com.amazonaws.services.glue.SparkProcessLauncherPlugin.invoke$(ProcessLauncher.scala:48)
com.amazonaws.services.glue.ProcessLauncher$$anon$1.invoke(ProcessLauncher.scala:78)
com.amazonaws.services.glue.ProcessLauncher.launch(ProcessLauncher.scala:143)
com.amazonaws.services.glue.ProcessLauncher$.main(ProcessLauncher.scala:30)
com.amazonaws.services.glue.ProcessLauncher.main(ProcessLauncher.scala)

The Couchbase Spark Connector is not tested with Glue jobs, so I cannot say for sure what is going on. One obvious thing you can try is providing the implicit bucket name in the configuration:
("spark.couchbase.implicitBucket", "<your-bucket-name>")
CouchbaseConnection.scala file has the following code

 // (Verbatim excerpt from the connector's CouchbaseConnection.scala.)
 // Resolves which bucket an operation should use: a per-command name wins,
 // otherwise the globally configured bucket; if neither is set it throws the
 // InvalidArgumentException seen in the stack trace above.
 def bucketName(cfg: CouchbaseConfig, name: Option[String]): String = {
    name.orElse(cfg.bucketName) match {
      case Some(name) => name
      case None => throw InvalidArgumentException
        .fromMessage("No bucketName provided (neither configured globally, "
          + "nor in the per-command options)")
    }
  }

See if it works !

I added the configuration and also changed the Couchbase Spark connector to 3.1 to match the Spark 3.1 support on AWS Glue.

Now I get this error… Connection refused — it must be a policy issue, I think.

2022-04-13 16:09:02,255 ERROR [main] glue.ProcessLauncher (Logging.scala:logError(94)): Exception in User Class
java.lang.reflect.UndeclaredThrowableException
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1748)
	at org.apache.spark.deploy.SparkHadoopUtil.runAsSparkUser(SparkHadoopUtil.scala:61)
	at org.apache.spark.executor.CoarseGrainedExecutorBackend$.run(CoarseGrainedExecutorBackend.scala:455)
	at org.apache.spark.executor.CoarseGrainedExecutorBackend$.main(CoarseGrainedExecutorBackend.scala:444)
	at org.apache.spark.executor.CoarseGrainedExecutorBackendPlugin.launch(CoarseGrainedExecutorBackendWrapper.scala:10)
	at org.apache.spark.executor.CoarseGrainedExecutorBackendPlugin.launch$(CoarseGrainedExecutorBackendWrapper.scala:10)
	at org.apache.spark.executor.CoarseGrainedExecutorBackendWrapper$$anon$1.launch(CoarseGrainedExecutorBackendWrapper.scala:15)
	at org.apache.spark.executor.CoarseGrainedExecutorBackendWrapper.launch(CoarseGrainedExecutorBackendWrapper.scala:19)
	at org.apache.spark.executor.CoarseGrainedExecutorBackendWrapper$.main(CoarseGrainedExecutorBackendWrapper.scala:5)
	at org.apache.spark.executor.CoarseGrainedExecutorBackendWrapper.main(CoarseGrainedExecutorBackendWrapper.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at com.amazonaws.services.glue.SparkProcessLauncherPlugin.invoke(ProcessLauncher.scala:48)
	at com.amazonaws.services.glue.SparkProcessLauncherPlugin.invoke$(ProcessLauncher.scala:48)
	at com.amazonaws.services.glue.ProcessLauncher$$anon$1.invoke(ProcessLauncher.scala:78)
	at com.amazonaws.services.glue.ProcessLauncher.launch(ProcessLauncher.scala:143)
	at com.amazonaws.services.glue.ProcessLauncher$.main(ProcessLauncher.scala:30)
	at com.amazonaws.services.glue.ProcessLauncher.main(ProcessLauncher.scala)
Caused by: org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:301)
	at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
	at org.apache.spark.rpc.RpcEnv.setupEndpointRefByURI(RpcEnv.scala:101)
	at org.apache.spark.executor.CoarseGrainedExecutorBackend$.$anonfun$run$9(CoarseGrainedExecutorBackend.scala:475)
	at scala.runtime.java8.JFunction1$mcVI$sp.apply(JFunction1$mcVI$sp.java:23)
	at scala.collection.TraversableLike$WithFilter.$anonfun$foreach$1(TraversableLike.scala:877)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:876)
	at org.apache.spark.executor.CoarseGrainedExecutorBackend$.$anonfun$run$7(CoarseGrainedExecutorBackend.scala:473)
	at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:62)
	at org.apache.spark.deploy.SparkHadoopUtil$$anon$1.run(SparkHadoopUtil.scala:61)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1730)
	... 19 more
Caused by: java.io.IOException: Failed to connect to /172.31.20.189:46005
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:287)
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:218)
	at org.apache.spark.network.client.TransportClientFactory.createClient(TransportClientFactory.java:230)
	at org.apache.spark.rpc.netty.NettyRpcEnv.createClient(NettyRpcEnv.scala:204)
	at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:202)
	at org.apache.spark.rpc.netty.Outbox$$anon$1.call(Outbox.scala:198)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: io.netty.channel.AbstractChannel$AnnotatedConnectException: Connection refused: /172.31.20.189:46005
Caused by: java.net.ConnectException: Connection refused
	at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
	at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:716)
	at io.netty.channel.socket.nio.NioSocketChannel.doFinishConnect(NioSocketChannel.java:330)
	at io.netty.channel.nio.AbstractNioChannel$AbstractNioUnsafe.finishConnect(AbstractNioChannel.java:334)
	at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:702)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:650)
	at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:576)
	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:493)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:989)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	at java.lang.Thread.run(Thread.java:750)

I just noticed this in your exception: “Failed to connect to /172.31.20.189:46005”. Curious whether this is the connection string you provided, or something else entirely — the connection string for Couchbase need not include a port.

This is the Glue job’s IP address, I think (or something like it). I passed only the IP address of the EC2 instance on which the database is located. I’m refreshing all access policies.

1 Like