Hadoop Connector Does Not Work After Upgrading to 5.0


#1

Hi,

We recently upgraded our cluster from v4.0 to v5.0 and found that the Sqoop connector no longer works, even though it worked right up until the upgrade. It still runs without any error messages, but it always outputs INFO mapreduce.ImportJobBase: Retrieved 0 records.

I checked the documentation, and it seems the Hadoop connector did not change between v4.0 and v5.0. Can someone help me with this?


#2

I suspect a problem with authentication, as that was one of the big changes from 4.x to 5.x. Was this a rebalance upgrade? If so, can you confirm that the 5.x cluster has a user with the same name as the bucket?


#3

Yes, it was created automatically when the cluster was upgraded. The user has no password, because our real-time system still connects with an old version of the Java SDK and cannot use a password.


#4

Is your Hadoop connector set up to use a password? It shouldn’t be either, for this to work.


#5

No, it was not using any password. The connector was working fine just before we did the upgrade.


#6

@ingenthr any other reason you can think of?


#7

Nothing I can think of. I’d probably recommend turning up log levels to see what’s happening. If you have that and don’t spot anything, I can have a quick look if you can post it somewhere I can get to it. Since it’s sort of “fast failing”, if you get a wireshark/tshark output of all of the client-to-server ports (see the docs) from where the connector is running, I could have a look at that as well, as long as you’re comfortable with it.

Related: the Hadoop Connector isn’t receiving many updates, as Sqoop itself is no longer the ideal interface to build this on. You might take this opportunity to move to the Kafka Connector or the Spark Connector. Both of those use a more modern interface (DCP) for fetching items. Depending on how you run Hadoop, you may already have Spark available in the cluster.


#8

Thanks! I will try running it with more logging.

I would like to use the Spark Connector; however, if you check my old posts, you can see I had some issues with it: the number of records returned was not consistent, and the performance was slow. Anyway, I would like to try the newer version of it.


#9

I had a quick look. It looks like the key difference there is that you were using the Spark SQL interface and hitting timeouts owing to the long-running job, not the Spark Streaming interface. Spark Streaming is closer to how the Hadoop Connector works: the Hadoop Connector uses TAP, and the Spark Streaming interface uses DCP, a modernized version of the TAP interface.

That’s not to say what you were doing there was incorrect, but if your selectivity is low (e.g., most of the dataset will be transferred), it will probably run better with Spark Streaming and filtering at that level. Setting up a stream is pretty easy, and then adding filtering and output requires a bit more work but is much more flexible than the Hadoop connector…
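To give a rough idea, a minimal streaming job might look something like the sketch below. This is a sketch from memory, not something tested against your setup: the property names (com.couchbase.nodes, com.couchbase.bucket.*), the FromBeginning/ToNow markers, and the Mutation fields are as I recall them from the 2.x connector, so double-check them against the version you actually run.

```scala
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import com.couchbase.spark.streaming._

object CouchbaseDcpSketch {
  def main(args: Array[String]): Unit = {
    // Bucket/host names here are placeholders; substitute your own.
    val conf = new SparkConf()
      .setAppName("Couchbase DCP Stream Sketch")
      .set("com.couchbase.nodes", "my.server.name.com")
      .set("com.couchbase.bucket.metrics-metadata", "")

    val ssc = new StreamingContext(conf, Seconds(5))

    // FromBeginning..ToNow streams each existing document once,
    // which approximates the full-dump behavior of the Hadoop Connector.
    ssc.couchbaseStream(from = FromBeginning, to = ToNow)
      .filter(_.isInstanceOf[Mutation])          // drop deletions/expirations
      .map { msg =>
        val m = msg.asInstanceOf[Mutation]
        (new String(m.key, "UTF-8"), new String(m.content, "UTF-8"))
      }
      .print()                                   // replace with your HDFS sink

    ssc.start()
    ssc.awaitTermination()
  }
}
```

Filtering (the .filter/.map step) is where you'd apply whatever selectivity you need before writing out.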


#10

Thanks for your tip @ingenthr! Actually, when I use Sqoop (or Spark SQL, at the very beginning), my use case is to create a dump of all key-value pairs from my bucket to HDFS without putting much pressure on the data service (so lookup performance is not impacted). The Sqoop approach had been working fine until the upgrade.

Do you have any suggestions for this use case? Maybe some way other than Sqoop.


#11

If it’s all of the data, and you have Spark already, I’d probably use that. Should be pretty straightforward. Let @daschl or @graham.pople or I know if you hit any issues!


#12

Thanks, will try that out.


#13

Hi, @daschl, @graham.pople,

Could you please help me with this? I’m trying to use the Spark Connector (spark-connector_2.11, version 2.1.0) to get a full dump of all my documents from a bucket and save them to an HDFS location. I was using Sqoop to perform this, but it stopped working after we upgraded Couchbase Server from 4.0.1 to 5.0.

My Spark code is very simple:

import org.apache.spark.sql.SparkSession
import com.couchbase.client.java.query.N1qlQuery
import com.couchbase.spark._

object SparkCouchbaseConnectorTest {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession
      .builder()
      .appName("Spark Couchbase Connector Test")
      .config("spark.couchbase.nodes", "my.server.name.com")
      .config("spark.couchbase.bucket.metrics-metadata", "")
      .getOrCreate()

    val sc = spark.sparkContext

    val query = "SELECT * FROM `metrics-metadata`"
    sc
      .couchbaseQuery(N1qlQuery.simple(query))
      .map(_.value.toString)
      .saveAsTextFile("viewfs://cluster5/nameservices/beaconstore/allkeys")
  }
}

This Spark application runs without any error messages and completes successfully; however, nothing was read from Couchbase or written to HDFS, except a 0-length part file. Here’s the relevant part of the log:

18/05/27 11:25:11 INFO spark.SparkContext: Starting job: saveAsTextFile at SparkCouchbaseConnectorTest.scala:30
18/05/27 11:25:11 INFO scheduler.DAGScheduler: Got job 0 (saveAsTextFile at SparkCouchbaseConnectorTest.scala:30) with 1 output partitions
18/05/27 11:25:11 INFO scheduler.DAGScheduler: Final stage: ResultStage 0 (saveAsTextFile at SparkCouchbaseConnectorTest.scala:30)
18/05/27 11:25:11 INFO scheduler.DAGScheduler: Parents of final stage: List()
18/05/27 11:25:11 INFO scheduler.DAGScheduler: Missing parents: List()
18/05/27 11:25:11 INFO storage.BlockManagerMasterEndpoint: Registering block manager lashadoop-17c17.server.hulu.com:60366 with 4.5 GB RAM, BlockManagerId(19, lashadoop-17c17.server.hulu.com, 60366, None)
18/05/27 11:25:11 INFO scheduler.DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[2] at saveAsTextFile at SparkCouchbaseConnectorTest.scala:30), which has no missing parents
18/05/27 11:25:11 INFO storage.BlockManagerMasterEndpoint: Registering block manager lashadoop-17j15.server.hulu.com:35903 with 4.5 GB RAM, BlockManagerId(41, lashadoop-17j15.server.hulu.com, 35903, None)
18/05/27 11:25:11 INFO memory.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 93.1 KB, free 3.3 GB)
18/05/27 11:25:11 INFO storage.BlockManagerMasterEndpoint: Registering block manager lashadoop-16h36.server.hulu.com:37738 with 4.5 GB RAM, BlockManagerId(47, lashadoop-16h36.server.hulu.com, 37738, None)
18/05/27 11:25:12 INFO memory.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 32.6 KB, free 3.3 GB)
18/05/27 11:25:12 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on 10.88.165.23:54014 (size: 32.6 KB, free: 3.3 GB)
18/05/27 11:25:12 INFO spark.SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1037
18/05/27 11:25:12 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 0 (MapPartitionsRDD[2] at saveAsTextFile at SparkCouchbaseConnectorTest.scala:30)
18/05/27 11:25:12 INFO cluster.YarnClusterScheduler: Adding task set 0.0 with 1 tasks
18/05/27 11:25:12 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, lashadoop-17d21.server.hulu.com, executor 9, partition 0, PROCESS_LOCAL, 5765 bytes)
18/05/27 11:25:13 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(null) (10.88.179.11:47201) with ID 6
18/05/27 11:25:13 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Tag executor 6 as REGISTERED
18/05/27 11:25:13 INFO storage.BlockManagerMasterEndpoint: Registering block manager lashadoop-17g21.server.hulu.com:37516 with 4.5 GB RAM, BlockManagerId(6, lashadoop-17g21.server.hulu.com, 37516, None)
18/05/27 11:25:13 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(null) (10.88.165.20:41899) with ID 17
18/05/27 11:25:13 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Tag executor 17 as REGISTERED
18/05/27 11:25:13 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(null) (10.88.161.20:35778) with ID 11
18/05/27 11:25:13 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Tag executor 11 as REGISTERED
18/05/27 11:25:14 INFO storage.BlockManagerMasterEndpoint: Registering block manager lashadoop-17a12.server.hulu.com:55673 with 4.5 GB RAM, BlockManagerId(17, lashadoop-17a12.server.hulu.com, 55673, None)
18/05/27 11:25:14 INFO storage.BlockManagerMasterEndpoint: Registering block manager lashadoop-17c12.server.hulu.com:53518 with 4.5 GB RAM, BlockManagerId(11, lashadoop-17c12.server.hulu.com, 53518, None)
18/05/27 11:25:14 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on lashadoop-17d21.server.hulu.com:37260 (size: 32.6 KB, free: 4.5 GB)
18/05/27 11:25:21 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Registered executor NettyRpcEndpointRef(null) (10.88.203.21:43467) with ID 14
18/05/27 11:25:21 INFO cluster.YarnSchedulerBackend$YarnDriverEndpoint: Tag executor 14 as REGISTERED
18/05/27 11:25:21 INFO storage.BlockManagerMasterEndpoint: Registering block manager lashadoop-17j12.server.hulu.com:46691 with 4.5 GB RAM, BlockManagerId(14, lashadoop-17j12.server.hulu.com, 46691, None)
18/05/27 11:25:23 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 11871 ms on lashadoop-17d21.server.hulu.com (executor 9) (1/1)
18/05/27 11:25:23 INFO cluster.YarnClusterScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool 
18/05/27 11:25:23 INFO scheduler.DAGScheduler: ResultStage 0 (saveAsTextFile at SparkCouchbaseConnectorTest.scala:30) finished in 11.890 s
18/05/27 11:25:23 INFO scheduler.DAGScheduler: Job 0 finished: saveAsTextFile at SparkCouchbaseConnectorTest.scala:30, took 12.098759 s
18/05/27 11:25:23 INFO yarn.ApplicationMaster: Final app status: SUCCEEDED, exitCode: 0
18/05/27 11:25:23 INFO spark.SparkContext: Invoking stop() from shutdown hook

Thanks!


#14

Hi @hclc. Just to check the basics are working:

  1. What happens if you run the query in the UI Query tab?
  2. If you remove the saveAsTextFile and replace it with a .foreach(v => println(v)) or similar, do you see anything?
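For point 2, a quick sketch reusing the setup from your job (with a LIMIT added so you don't pull 500 million rows back to the driver):

```scala
// Same SparkContext and bucket config as in your job; just swap the action:
// collect a handful of rows to the driver and print them.
sc
  .couchbaseQuery(N1qlQuery.simple("SELECT * FROM `metrics-metadata` LIMIT 10"))
  .collect()
  .foreach(row => println(row.value))
```

If even this prints nothing, the problem is on the query/connection side rather than in the HDFS write.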

#15

Thanks for replying. I didn’t try the two things you suggested because the data set I’m retrieving has over 500 million key-value pairs; that’s why I have to save them to an HDFS file.


#16

BTW, it seems the Spark Connector and Sqoop are behaving consistently here: both run successfully without any error messages, but both retrieve zero records.


#17

Hi, @graham.pople, @daschl,

Any update on my question? Basically, I’m just looking for a way to dump all the data (> 1 TB, > 600 million key/value pairs) to HDFS.

Thanks for your help!