Connect Apache Spark to your HBase database (Spark-HBase Connector)
There will be times when you’ll need the data in your HBase database to be brought into Apache Spark for processing. Usually, you’ll query the database, get the data in whatever format you fancy, and then load it into Spark, maybe using the `parallelize()` function. This works just fine. But depending on the size of the data, it can cause delays. At least it did for our application.
So after some research, we stumbled upon a Spark-HBase connector in the Hortonworks repository. Now, what is this connector, and why should you consider it?
The Spark-HBase Connector (shc-core)
The SHC is a tool provided by Hortonworks to connect your HBase database to Apache Spark, so that you can tell your Spark context to pick up the data directly from HBase instead of writing code to load it into memory or files and then reading it from there inside Spark. This makes life easier by letting you write queries that are passed on to HBase, so only the subset of data you need is fetched. Some example code will help you understand this better, so let’s get our hands dirty.
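Before any of the code below will compile, the connector has to be on your classpath. It is published as `shc-core` in the Hortonworks public Maven repository, so your `pom.xml` needs both the repository and the dependency. The snippet below is only a sketch; the version string is an example, so pick the build that matches your Spark and Scala versions:
<repositories>
    <repository>
        <id>hortonworks-repo</id>
        <url>http://repo.hortonworks.com/content/groups/public/</url>
    </repository>
</repositories>
<dependencies>
    <dependency>
        <groupId>com.hortonworks</groupId>
        <artifactId>shc-core</artifactId>
        <!-- example version only; use the build matching your Spark/Scala versions -->
        <version>1.1.1-2.1-s_2.11</version>
    </dependency>
</dependencies>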
The Example
We’re going to write a Spring Boot application for this example. So first, let’s get some values from the `application.properties` file:
@Value("${spark.master}")
private String sparkMaster;
@Value("${spark.hbase.host}")
private String sparkHbaseHost;
@Value("${spark.hbase.port}")
private String sparkHbasePort;
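For reference, the corresponding entries in `application.properties` could look something like this; the values below are placeholders for a local setup, not our actual configuration:
# placeholder values; point these at your own Spark master and HBase instance
spark.master=local[*]
spark.hbase.host=localhost
# 2181 is the default ZooKeeper client port used by HBase
spark.hbase.port=2181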
It’s pretty obvious why we need these values, so I’m not going to explain why we need them, or how this works in Spring Boot. Just make sure you have the required data. Now, using this information, we’re going to create a Spark context. Then, we’ll provide the context with the HBase configuration. After that, we’ll create a SQLContext object.
SparkConf conf = new SparkConf().setAppName("SparkHbaseConnectorPOC")
.setMaster(sparkMaster);
JavaSparkContext javaSparkContext = new JavaSparkContext(conf);
javaSparkContext.hadoopConfiguration().set("spark.hbase.host", sparkHbaseHost);
javaSparkContext.hadoopConfiguration().set("spark.hbase.port", sparkHbasePort);
SQLContext sqlContext = new SQLContext(javaSparkContext);
For each table in HBase, a ‘catalog’ has to be provided, which has the table name, the row key, and the columns. The columns need to include the data type and the column family information as well. In the catalog, we also map the HBase column qualifiers to the column names in the SparkSQL table schema. The whole catalog is a JSON string. Here is the catalog for our example:
String catalog = "{\n" +
"\t\"table\":{\"namespace\":\"default\", \"name\":\"test\", \"tableCoder\":\"PrimitiveType\"},\n" +
" \"rowkey\":\"key\",\n" +
" \"columns\":{\n" +
"\t \"rowkey\":{\"cf\":\"rowkey\", \"col\":\"key\", \"type\":\"string\"},\n" +
"\t \"value\":{\"cf\":\"value\", \"col\":\"value\", \"type\":\"string\"}\n" +
" }\n" +
"}";
As you can see, we have the namespace as `default` and the table name as `test`. The row key of the HBase table is mapped by the `rowkey` entry, so it shows up in the SparkSQL schema as a column called `rowkey`. You can also see from the catalog that the column family and the column qualifier are both named `value`, and that this data ends up in a SparkSQL column called `value` as well.
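To make the mapping concrete, a table matching this catalog could be created and populated from the HBase shell roughly like this (the row keys and cell values here are just made-up illustrations):
# a table called 'test' with a single column family called 'value'
create 'test', 'value'
# each put writes to column family 'value', qualifier 'value'
put 'test', '1', 'value:value', 'first value'
put 'test', '2', 'value:value', 'second value'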
The code snippet below, which is pretty much self-explanatory, shows how we specify the catalog and load the dataset into Spark:
Map<String, String> optionsMap = new HashMap<>();
String htc = HBaseTableCatalog.tableCatalog();
optionsMap.put(htc, catalog);
// optionsMap.put(HBaseRelation.MIN_STAMP(), "123");
// optionsMap.put(HBaseRelation.MAX_STAMP(), "456");
Dataset<Row> dataset = sqlContext.read()
        .options(optionsMap)
        .format("org.apache.spark.sql.execution.datasources.hbase")
        .load();
As you can see from the commented code in the snippet above, you can filter data based on the HBase timestamps as well.
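For instance, filling in those options with real values would look something like the sketch below; the timestamps are made-up epoch-millisecond values (which is how HBase stores cell timestamps), and they have to be set before calling load():
// hypothetical time window, in epoch milliseconds
optionsMap.put(HBaseRelation.MIN_STAMP(), String.valueOf(1546300800000L)); // 2019-01-01 00:00 UTC
optionsMap.put(HBaseRelation.MAX_STAMP(), String.valueOf(1577836800000L)); // 2020-01-01 00:00 UTC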
Next, we’ll look at some example filters which we can use on the SparkSQL dataset we just loaded:
Dataset<Row> filteredDataset = dataset.filter(
        dataset.col("rowkey").$greater$eq("1")
                .and(dataset.col("rowkey").$less$eq("10")))
        .filter(
                dataset.col("rowkey").$greater$eq("3")
                        .and(dataset.col("rowkey").$less$eq("30"))
        )
        .select("rowkey");
Here, we’re filtering the data based on the values of the row keys. Once we have this filtered data in the Dataset object, we can use any of the transformations and actions that Spark provides. For example, let’s see how many records we have, and what the records actually are:
System.out.println("Count: " + filteredDataset.count());
System.out.println("List: " + filteredDataset.collect());
And that’s pretty much it. We have now connected HBase and Spark, and we can query data directly from HBase into Spark, instead of having to load the data into memory manually and then load it into the Spark cluster.
If you want to have a look at the complete Spring Boot project, here it is.
Thanks