Connect Apache Spark to your MongoDB database using the mongo-spark-connector

April 3, 2019

A couple of days back, we saw how we can connect Apache Spark to an Apache HBase database and query the data from a table using a catalog. Today, we’ll see how we can connect Apache Spark to a MongoDB database and get data directly into Spark from there.

MongoDB provides us a plugin called the mongo-spark-connector, which will help us connect MongoDB and Spark without any drama at all. We just need to provide the MongoDB connection URI in the SparkConf object, and create a ReadConfig object specifying the collection name. It might sound complicated right now, but once you look at the code, you’ll understand how extremely easy this is. So, let’s look at an example.


The Dataset

Before we look at the code, we need to make sure we have some data in our MongoDB to play around with. It’ll be difficult and impractical to generate sample data ourselves. There are various sources of sample datasets for MongoDB which you can use to learn the tool. I’m using the “tweets” data from the dataset available on GitHub here. You can use any dataset you want, just make sure you change the database name, collection name, and the query in the code accordingly.

The Example

As usual, we’ll be writing a Spring Boot application as a POC. Instead of hard-coding the MongoDB connection URI, we’ll get the value from the properties file using the @Value annotation:

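@Value("${mongodb.connection.uri}") // hypothetical property key; use the key from your own properties file
private String mongoDbConnectionUri;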

Next, we’ll create the SparkConf object with the MongoDB connection URI.

SparkConf sparkConf = new SparkConf()
        .set("spark.mongodb.input.uri", mongoDbConnectionUri + ".tweets");

Here, tweets is the name of the collection. You can create multiple Spark configurations like this for multiple collections, so that you can query data from different collections and merge them using Spark. Note that in this setup each collection gets its own Spark configuration, because the input URI names exactly one collection.
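Appending ".tweets" works as long as the connection URI ends with the database name. If the URI in your properties file also carries options after a "?", the collection has to go before them. Here is a minimal sketch of a helper that handles both cases. MongoInputUri and forCollection are hypothetical names (they are not part of the connector), and the "twitter" database name is only an assumption for illustration.

```java
// Hypothetical helper, not part of the mongo-spark-connector.
// Assumes the connection URI already ends with the database name,
// optionally followed by "?option=value" pairs.
final class MongoInputUri {

    private MongoInputUri() {
    }

    static String forCollection(String connectionUri, String collection) {
        // Options must stay at the end of the URI, so ".<collection>"
        // is inserted right before the "?" when one is present.
        int optionsStart = connectionUri.indexOf('?');
        if (optionsStart < 0) {
            return connectionUri + "." + collection;
        }
        return connectionUri.substring(0, optionsStart)
                + "." + collection
                + connectionUri.substring(optionsStart);
    }

    public static void main(String[] args) {
        // "twitter" is an assumed database name, used only for illustration.
        System.out.println(forCollection("mongodb://localhost:27017/twitter", "tweets"));
        System.out.println(forCollection(
                "mongodb://localhost:27017/twitter?readPreference=secondaryPreferred", "tweets"));
    }
}
```

The result of forCollection(mongoDbConnectionUri, "tweets") could then be passed as the value of spark.mongodb.input.uri.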

Once we have the configuration set up, we’ll set a read preference using a ReadConfig object, like this:

Map<String, String> tweetsReadOverrides = new HashMap<>();
tweetsReadOverrides.put("collection", "tweets");
tweetsReadOverrides.put("readPreference.name", "secondaryPreferred");
ReadConfig tweetsReadConfig = ReadConfig.create(sparkConf).withOptions(tweetsReadOverrides);

As you can see, we provide the collection name here again. After this, we create the JavaSparkContext and use it to load the collection into an instance of JavaMongoRDD<Document>. When I say we’ll load the collection, I don’t mean the whole collection will actually be read into memory. Spark evaluates RDDs lazily, so at this point we’re only declaring where the data will come from; the actual read happens later, when an action is called on the RDD.

JavaSparkContext javaSparkContext = new JavaSparkContext(sparkConf);

JavaMongoRDD<Document> tweetsRdd = MongoSpark.load(javaSparkContext, tweetsReadConfig);
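The lazy loading described above can be tried out without a cluster. The sketch below is only an analogy: a plain java.util.stream.Stream stands in for the RDD, and a counter shows that nothing is read until a terminal operation runs. LazyLoadAnalogy, load and READS are made-up names, and none of this is connector code.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Analogy only: a java.util.stream.Stream stands in for the RDD.
final class LazyLoadAnalogy {

    // Counts how many items have actually been read from the source.
    static final AtomicInteger READS = new AtomicInteger();

    private LazyLoadAnalogy() {
    }

    // Declares the "load" without performing it.
    static Stream<String> load(List<String> source) {
        return source.stream().map(item -> {
            READS.incrementAndGet();
            return item;
        });
    }

    public static void main(String[] args) {
        Stream<String> pending = load(Arrays.asList("tweet-1", "tweet-2", "tweet-3"));
        // Nothing has been read yet, so this prints 0.
        System.out.println("Reads after load: " + READS.get());

        // The terminal operation plays the role of a Spark action.
        List<String> loaded = pending.collect(Collectors.toList());
        // All three items have now been read, so this prints 3.
        System.out.println("Reads after collect: " + READS.get() + " of " + loaded.size());
    }
}
```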

Next, we provide the actual query which will be used to filter the data that will be loaded into memory. If you provide an empty query, the whole collection will be loaded. If you provide a non-empty query, only the documents that match the query will be loaded. The query should be provided in the form of a String, the string which goes inside the {} in a MongoDB find({}) query. I looked at the sample data I downloaded and saw that each tweet has an embedded user document, which in turn has an id_str field holding (hopefully) the ID of the user who sent out the tweet. So I’ll use this field to create a simple query, like this:

String matchQuery = "\"user.id_str\": \"150820027\"";

I found that ID in the sample dataset itself. Now that we have the query ready, let’s tell Spark to use it when loading the data from MongoDB. For this, we’ll take the tweetsRdd variable we created earlier and give it an aggregation pipeline, which accepts a list of stages. Because we have only one query to worry about, our list holds a single $match stage.

JavaRDD<Document> tweetsJavaRdd = tweetsRdd.withPipeline(Collections.singletonList(
        Document.parse("{ $match: { " + matchQuery + " } }")));

The .withPipeline() function returns a JavaMongoRDD<Document>, which we hold here as a JavaRDD<Document>. As you probably guessed by now, Document is the MongoDB document that will be fetched from the database.
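The $match stage is assembled by concatenating strings, which gets fiddly once the field or value comes from a variable. Here is a minimal sketch of a helper that builds the same stage text for a simple equality match. MatchStage and equalsStage are hypothetical names, and the escaping only covers backslashes and double quotes, so treat it as an illustration rather than a complete JSON encoder.

```java
// Hypothetical helper: builds the text of a $match stage for one
// "field equals string value" condition.
final class MatchStage {

    private MatchStage() {
    }

    static String equalsStage(String field, String value) {
        return "{ $match: { " + quote(field) + ": " + quote(value) + " } }";
    }

    // Wraps the text in double quotes, escaping backslashes first and
    // then any embedded double quotes.
    private static String quote(String raw) {
        return "\"" + raw.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    public static void main(String[] args) {
        // Prints: { $match: { "user.id_str": "150820027" } }
        System.out.println(equalsStage("user.id_str", "150820027"));
    }
}
```

The returned string is the same text that the example hands to Document.parse(), so it could be dropped in there.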

Now, on this RDD variable, you can apply any transformation you fancy, or call any action you need. I’ll give examples of both. Let’s start with a map() so that we get all the tweets this person has sent out. We’ll take the actual tweet text, which is in the text field in the sample dataset.

JavaRDD<String> tweetTexts = tweetsJavaRdd.map(singleRdd -> {
    return singleRdd.getString("text");
});

We now have a new JavaRDD of type String, which has only the text of the tweets. Another use case could be to count the number of tweets which have a particular word or phrase. Anyway, now let’s just print out the list of tweets we just mapped to the RDD variable. Fortunately, the list is very small. But to get the list into one variable, we’ll have to call the collect() action on the RDD.

List<String> tweets = tweetTexts.collect();
System.out.println("Collected tweets: " + tweets);

As another example, let’s also see how many tweets we have, which will illustrate the use of the count() action on the RDD.

long tweetsCount = tweetTexts.count();
System.out.println("Collected tweets count: " + tweetsCount);
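The other use case mentioned earlier, counting the tweets that contain a particular word or phrase, comes down to a filter followed by a count. The sketch below runs on a plain List so it works without Spark. TweetPhraseCount and its methods are hypothetical names, and the sample strings are made up rather than taken from the dataset. The null check is there because getString("text") returns null when a document has no text field.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Locale;

// Hypothetical helper for the "tweets containing a phrase" use case.
final class TweetPhraseCount {

    private TweetPhraseCount() {
    }

    // Case-insensitive check that tolerates a missing (null) tweet text.
    static boolean containsPhrase(String text, String phrase) {
        if (text == null) {
            return false;
        }
        return text.toLowerCase(Locale.ROOT).contains(phrase.toLowerCase(Locale.ROOT));
    }

    // Plain-Java stand-in for a filter followed by a count.
    static long countContaining(List<String> texts, String phrase) {
        return texts.stream().filter(text -> containsPhrase(text, phrase)).count();
    }

    public static void main(String[] args) {
        // Made-up sample texts, not taken from the dataset.
        List<String> texts = Arrays.asList(
                "Learning Spark today", "MongoDB and SPARK together", null, "Nothing relevant");
        // Prints: Tweets mentioning spark: 2
        System.out.println("Tweets mentioning spark: " + countContaining(texts, "spark"));
    }
}
```

On the RDD itself, the same predicate could be used as tweetTexts.filter(text -> TweetPhraseCount.containsPhrase(text, "spark")).count().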

Right, that’s it, folks. You have now successfully connected Apache Spark to your MongoDB database and loaded data directly into Spark.

As always, you can take a look at the complete project over at GitHub.
