layout | title |
---|---|
home | ReactiveMongo |
ReactiveMongo is a Scala driver for MongoDB that provides fully non-blocking and asynchronous I/O operations.
With a classic synchronous database driver, each operation blocks the current thread until a response is received. This model is simple but has a major flaw: it does not scale well.
Imagine that you have a web application with 10 concurrent accesses to the database. You end up with 10 threads frozen at the same time, doing nothing but waiting for a response. A common solution is to raise the number of running threads to handle more requests. Such a waste of resources is not really a problem if your application is not heavily loaded, but what happens if you have 100 or even 1000 times more requests to handle, each performing several database queries? The number of blocked threads grows really fast...
The problem becomes more and more obvious with the new generation of web frameworks. What's the point of using a nifty, powerful, fully asynchronous web framework if all your database accesses are blocking?
ReactiveMongo is designed to avoid any kind of blocking request. Every operation returns immediately, freeing the running thread, and execution resumes when the response arrives. Accessing the database is no longer a bottleneck.
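To make the idea of an operation that "returns immediately" concrete, here is a minimal sketch using only the Scala standard library. The `fakeQuery` and `describe` functions are hypothetical: `fakeQuery` merely stands in for a database round trip and is not part of ReactiveMongo.

```scala
import scala.concurrent.{ Await, Future }
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object NonBlockingSketch {
  // Hypothetical stand-in for a database call: the result is computed
  // asynchronously, and the caller gets a Future back right away.
  def fakeQuery(name: String): Future[String] = Future {
    s"result for $name"
  }

  // Registering a transformation with `map` does not block the caller;
  // the function runs once the result is available.
  def describe(name: String): Future[Int] =
    fakeQuery(name).map(_.length)

  def main(args: Array[String]): Unit = {
    // Await is used here only so that the demo does not exit early;
    // application code would keep composing Futures instead.
    val length = Await.result(describe("jack"), 5.seconds)
    println(s"length: $length")
  }
}
```

The calling thread is free as soon as `describe` returns; only the demo's `main` waits, and only to print the value.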
The future of the web is in streaming data to a very large number of clients simultaneously. Twitter's Streaming API is a good example of this paradigm shift, which is radically altering the way data is consumed all over the web.
ReactiveMongo enables you to build such a web application right now. It allows you to stream data both into and from your MongoDB servers.
One scenario is progressively consuming your collection of documents as needed, without filling memory unnecessarily.
But if what you're interested in is live feeds, then you can stream a MongoDB capped collection through a WebSocket, Comet or any other streaming protocol. A capped collection is a fixed-size (FIFO) collection from which you can fetch documents as they are inserted. Each time a document is stored into this collection, the webapp broadcasts it to all the interested clients, in a completely non-blocking way.
Moreover, you can now use GridFS as a non-blocking, streaming datastore. ReactiveMongo retrieves the file, chunk by chunk, and streams it until the client is done or there's no more data. No huge memory consumption, and no blocked thread during the process!
Let's show a simple use case: print the documents of a collection.
We assume that you have a running MongoDB instance. If not, get the latest MongoDB binaries and unzip the archive. Then you can launch the database:
{% highlight sh %}
$ mkdir data
$ ./bin/mongod --dbpath data
{% endhighlight %}
This will start a standalone MongoDB instance that stores its data in the `data` directory and listens on the TCP port 27017.
ReactiveMongo is available on Maven Central. If you use SBT, you just have to edit `build.sbt` and add the following:
{% highlight scala %}
libraryDependencies ++= Seq(
  "org.reactivemongo" %% "reactivemongo" % "{{site.latest_release}}"
)
{% endhighlight %}
Or if you want to be on the bleeding edge using snapshots:
{% highlight scala %}
resolvers += "Sonatype Snapshots" at "https://oss.sonatype.org/content/repositories/snapshots/"

libraryDependencies ++= Seq(
  "org.reactivemongo" %% "reactivemongo" % "0.12.0-SNAPSHOT"
)
{% endhighlight %}
You can get a connection to a server (or a replica set) like this:
{% highlight scala %}
def connect(): Unit = {
  import reactivemongo.api._
  import scala.concurrent.ExecutionContext.Implicits.global

  // Gets an instance of the driver
  // (creates an actor system)
  val driver = new MongoDriver
  val connection = driver.connection(List("localhost"))

  // Gets a reference to the database "plugin"
  val db = connection("plugin")

  // Gets a reference to the collection "acoll"
  // By default, you get a BSONCollection.
  val collection = db("acoll")
}
{% endhighlight %}
Detailed documentation about ReactiveMongo connections is available.
{% highlight scala %}
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import play.api.libs.iteratee.Iteratee

import reactivemongo.bson.BSONDocument
import reactivemongo.api.collections.bson.BSONCollection

def listDocs(collection: BSONCollection) = {
  // Select only the documents whose field 'firstName' equals 'Jack'
  val query = BSONDocument("firstName" -> "Jack")
  // Select only the fields 'lastName' and '_id'
  val filter = BSONDocument(
    "lastName" -> 1,
    "_id" -> 1)

  /* Let's run this query then enumerate the response and print a readable
   * representation of each document in the response */
  collection.
    find(query, filter).
    cursor[BSONDocument].
    enumerate().apply(Iteratee.foreach { doc =>
      println(s"found document: ${BSONDocument pretty doc}")
    })

  // Or, the same with getting a list
  val futureList: Future[List[BSONDocument]] =
    collection.
      find(query, filter).
      cursor[BSONDocument].
      collect[List]()

  futureList.map { list =>
    list.foreach { doc =>
      println(s"found document: ${BSONDocument pretty doc}")
    }
  }
}
{% endhighlight %}
The above code deserves some explanation. First, let's take a look at the `collection.find` signature:
{% highlight scala %}
// Considering `collection` is a `BSONCollection`,
// with its `pack` being a `BSONSerializationPack`.
def find[S](selector: S)(implicit swriter: pack.Writer[S]): GenericQueryBuilder[pack.type]
{% endhighlight %}
The `find` method allows you to pass any query selector of type `S`, provided that there is an implicit `BSONDocumentWriter[S]` in the scope. `BSONDocumentWriter[S]` is a typeclass whose instances implement a `write(document: S)` function that returns a `BSONDocument`. It can be described as follows:
{% highlight scala %}
trait BSONDocumentWriter[DocumentType] {
  def write(document: DocumentType): BSONDocument
}
{% endhighlight %}
Obviously, there is a default writer for `BSONDocument`s, so you can give a `BSONDocument` as an argument for the `find` method.
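As an illustration of the writer contract described above, the following sketch defines a writer for a custom selector type. The `ByFirstName` class and its `Writer` instance are hypothetical names introduced only for this example; the sketch assumes the `reactivemongo.bson` package of the driver version used on this page.

```scala
import reactivemongo.bson.{ BSONDocument, BSONDocumentWriter }

// Hypothetical selector type, used only for this illustration.
case class ByFirstName(firstName: String)

object ByFirstName {
  // Placed in the companion object so that it is found implicitly
  // whenever a BSONDocumentWriter[ByFirstName] is required.
  implicit object Writer extends BSONDocumentWriter[ByFirstName] {
    def write(selector: ByFirstName): BSONDocument =
      BSONDocument("firstName" -> selector.firstName)
  }
}
```

With such an instance in implicit scope, a call like `collection.find(ByFirstName("Jack"))` should resolve the writer and send the same selector as `BSONDocument("firstName" -> "Jack")`.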
The `find` method returns a `GenericQueryBuilder`, so the query is not performed yet. This gives you the opportunity to add options to the query, like a sort order, projection, flags...
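A minimal sketch of refining a query before it is sent, assuming the `sort` and `projection` methods of the query builder in the driver version used on this page. The object and method names (`QueryBuilderSketch`, `jacksByLastName`) are hypothetical.

```scala
import reactivemongo.bson.BSONDocument
import reactivemongo.api.collections.bson.BSONCollection

object QueryBuilderSketch {
  // Builds a query without sending it: each call returns a new builder.
  def jacksByLastName(collection: BSONCollection) =
    collection.
      find(BSONDocument("firstName" -> "Jack")).
      // Keep only the 'lastName' field (and '_id', returned by default)
      projection(BSONDocument("lastName" -> 1)).
      // Ascending order on 'lastName'
      sort(BSONDocument("lastName" -> 1))
}
```

Nothing reaches MongoDB until a cursor is requested on the returned builder.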
When your query is ready to be sent to MongoDB, you may just call the `cursor` method on it. This method is parametrized with the type to which the response documents will be deserialized. A `BSONDocumentReader[T]` must be implicitly available in the scope for that type. As opposed to `BSONDocumentWriter[T]`, a reader is typically a deserializer that takes a `BSONDocument` and returns an instance of `T`:
{% highlight scala %}
trait BSONDocumentReader[DocumentType] {
  def read(buffer: BSONDocument): DocumentType
}
{% endhighlight %}
As with `BSONDocumentWriter[T]`, there is a default reader for `BSONDocument` in the package `reactivemongo.bson`.
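To illustrate the reader side, here is a sketch of a reader for a custom type. The `Person` class and `PersonReader` instance are hypothetical, and the sketch assumes that `BSONDocument.getAs[T]` returns an `Option[T]`, as in the `reactivemongo.bson` package of the driver version used on this page.

```scala
import reactivemongo.bson.{ BSONDocument, BSONDocumentReader }

// Hypothetical model class, used only for this illustration.
case class Person(firstName: String, lastName: String)

object Person {
  implicit object PersonReader extends BSONDocumentReader[Person] {
    def read(doc: BSONDocument): Person = Person(
      // getAs yields None when the field is missing or has another type;
      // .get keeps the sketch short but throws in that case.
      doc.getAs[String]("firstName").get,
      doc.getAs[String]("lastName").get)
  }
}
```

With this reader in implicit scope, `cursor[Person]` could be requested instead of `cursor[BSONDocument]`. A real reader would probably handle missing fields more gracefully than calling `.get`.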
When a query matches too many documents, MongoDB sends just a part of them and creates a cursor in order to get the next documents. The problem is, how to handle it in a non-blocking, asynchronous, yet elegant way?
Obviously ReactiveMongo's `Cursor` provides helpful methods to build a collection (like a `List`) from it, so we can write:
{% highlight scala %}
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import reactivemongo.bson.BSONDocument
import reactivemongo.api.collections.bson.BSONCollection

def print(collection: BSONCollection) = {
  val query = BSONDocument("foo" -> "bar")
  val cursor = collection.find(query).cursor[BSONDocument]
  val futureList: Future[List[BSONDocument]] = cursor.collect[List]()

  futureList.map { list =>
    println("ok, got the list: " + list)
  }
}
{% endhighlight %}
As always, this is perfectly non-blocking... but what if we want to process the returned documents on the fly, without creating a potentially huge list in memory?
That's where the streaming API comes to the rescue!
Let's consider the following statement:
{% highlight scala %}
cursor.enumerate().apply(Iteratee.foreach { doc =>
  println(s"found document: ${BSONDocument pretty doc}")
})
{% endhighlight %}
The method `cursor.enumerate()` returns an `Enumerator[T]`. Enumerators can be seen as producers of data: their job is to give chunks of data when data is available. In this case, we get a producer of documents, whose source is a future cursor.
Now that we have the producer, we need to define how the documents are processed: that is the `Iteratee`'s job. Iteratees, as the opposite of Enumerators, are consumers: they are fed by enumerators and do some computation with the chunks they get.
Here, we write a very simple Iteratee: each time it gets a document, it makes a readable, JSON-like description of the document and prints it on the console. Note that none of these operations are blocking: when the running thread is not processing the callback of our iteratee, it can be used to compute other things.
When this snippet is run, we get the following:
found document: {
_id: BSONObjectID("4f899e7eaf527324ab25c56b"),
lastName: BSONString(London)
}
found document: {
_id: BSONObjectID("4f899f9baf527324ab25c56c"),
lastName: BSONString(Kerouac)
}
found document: {
_id: BSONObjectID("4f899f9baf527324ab25c56d"),
lastName: BSONString(Nicholson)
}
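The producer/consumer split can also be tried without a database. The sketch below assumes the Play Iteratee library (`play.api.libs.iteratee`) is on the classpath; the enumerator over fixed strings is a hypothetical stand-in for the documents that `cursor.enumerate()` would emit, and the names `IterateeSketch`, `names`, `counter` and `total` are illustrative.

```scala
import scala.concurrent.Future
import scala.concurrent.ExecutionContext.Implicits.global

import play.api.libs.iteratee.{ Enumerator, Iteratee }

object IterateeSketch {
  // Producer: emits three chunks, one per last name.
  val names: Enumerator[String] =
    Enumerator("London", "Kerouac", "Nicholson")

  // Consumer: counts the chunks it is fed, ignoring their content.
  val counter: Iteratee[String, Int] =
    Iteratee.fold[String, Int](0) { (count, _) => count + 1 }

  // |>>> feeds the iteratee from the enumerator and yields its final
  // result as a Future, without blocking the calling thread.
  val total: Future[Int] = names |>>> counter
}
```

Unlike `Iteratee.foreach`, which only performs a side effect per chunk, a fold accumulates a result, so the same pattern could count or aggregate documents on the fly without building a list in memory.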
The developer documentation can be browsed online.
There is a pretty complete Scaladoc available. The code is accessible from the GitHub repository. And obviously, don't hesitate to ask questions in the ReactiveMongo Google Group!
ReactiveMongo makes use of the Iteratee library. Although it is developed by the Play! Framework team, it does not depend on any other part of the framework. You can dive into Play's Iteratee documentation to learn about this cool piece of software, and make your own Iteratees and Enumerators.
Used in conjunction with stream-aware frameworks, like Play!, ReactiveMongo lets you easily stream the data stored in MongoDB. For Play, there is a ReactiveMongo Plugin that brings some cool stuff, like JSON-specialized collections and helpers for GridFS. See the examples and get convinced!
These sample applications are kept up to date with the latest driver version. They are built upon Play Framework.