
Using MapReduce

MapReduce (M/R) is a technique for dividing data processing work across a distributed system. It takes advantage of the parallel processing power of distributed systems and also reduces network bandwidth, as the algorithm is passed around to where the data lives rather than transferring a potentially huge dataset to a client algorithm.

Developers can use MapReduce for things like filtering documents by tags, counting words in documents, and extracting links to related data. In Riak, MapReduce is one method for querying that is not strictly based on key querying, alongside secondary indexes and Search. MapReduce jobs can be submitted through the HTTP API or the Protocol Buffers API, although we strongly recommend using the Protocol Buffers API for performance reasons.

MapReduce not for production use

Riak MapReduce operations are computationally very expensive. Running an expensive MapReduce job in production could have a significant performance impact on your cluster. Because of this, we recommend MapReduce operations only for batch processing purposes, not for real-time querying.

Features

When to Use MapReduce

When Not to Use MapReduce

How it Works

The MapReduce framework helps developers divide a query into steps, divide the dataset into chunks, and then run those step/chunk pairs on separate physical hosts.

There are two steps in a MapReduce query:

* Map: the data collection phase, in which a function is applied to each input object and emits a list of results
* Reduce: the data collation phase, in which the combined results of the map step are processed into a final output (this phase is optional)

Riak MapReduce queries have two components:

* A list of inputs
* A list of phases

The elements of the input list are object locations as specified by bucket type, bucket, and key. The elements of the phases list are chunks of information related to a map, a reduce, or a link function.
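
To make those two components concrete, here is a sketch of a query as it would be submitted through the Erlang client (riakc). The Riak connection pid is the one opened later in this tutorial, and the keys and phase functions shown here are placeholders, not part of the pizza-counting example:

%% Inputs: object locations. The training bucket bears no bucket type,
%% so {Bucket, Key} pairs suffice; with a type it would be
%% {{Type, Bucket}, Key}.
Inputs = [{<<"training">>, <<"foo">>},
          {<<"training">>, <<"bar">>}],
%% Phases: a map that extracts each object's value, then a reduce that
%% sorts the collected values. The final `true` marks the phase whose
%% results are returned to the client.
Phases = [{map, {modfun, riak_kv_mapreduce, map_object_value}, none, false},
          {reduce, {modfun, riak_kv_mapreduce, reduce_sort}, none, true}],
riakc_pb_socket:mapred(Riak, Inputs, Phases).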

A MapReduce query begins when a client makes a request to Riak. The node that the client contacts becomes the coordinating node responsible for the MapReduce job. As described above, each job consists of a list of phases, where each phase is either a map or a reduce phase. The coordinating node uses the list of phases to route each object key, together with the function that will operate on the object stored under that key, to the proper vnode, and instructs that vnode to run the function over the right objects.

After the vnodes run the map function, the results are sent back to the coordinating node. That node concatenates them into a single list and, assuming the next phase in the list is a reduce phase, passes the list to the reduce function, which runs on the coordinating node itself.
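
To make the shape of that hand-off concrete, here is a minimal sketch of a reduce function as the Erlang client expects one: it receives the concatenated list of map results (plus any output from earlier runs of the reduce) and returns a new list. The name SumFun is just for illustration:

%% A reduce function takes the accumulated list of map results and a
%% static argument, and returns a list. This one sums numeric results.
SumFun = fun(Values, _Arg) -> [lists:sum(Values)] end.
%% It would appear in a phase list as: {reduce, {qfun, SumFun}, none, true}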

Example

(Figure: MapReduce diagram)


In this example, we'll create four objects in which the word “pizza” appears a varying number of times (including not at all) and store those objects in the bucket training (which does not bear a bucket type). An Erlang MapReduce function will be used to count the occurrences of the word “pizza” in each object.

Data object input commands

For the sake of simplicity, we'll use curl in conjunction with Riak's HTTP API to store the objects:

curl -XPUT http://localhost:8098/buckets/training/keys/foo -H 'Content-Type: text/plain' -d 'pizza data goes here'

curl -XPUT http://localhost:8098/buckets/training/keys/bar -H 'Content-Type: text/plain' -d 'pizza pizza pizza pizza'

curl -XPUT http://localhost:8098/buckets/training/keys/baz -H 'Content-Type: text/plain' -d 'nothing to see here'

curl -XPUT http://localhost:8098/buckets/training/keys/bam -H 'Content-Type: text/plain' -d 'pizza pizza pizza'
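
To verify that the objects were stored, you can fetch one back with a GET against the same endpoint; this should return the body we just wrote (pizza pizza pizza pizza):

curl http://localhost:8098/buckets/training/keys/bar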

MapReduce script and deployment

The code sample below runs a MapReduce job that returns a list of tuples. The first member of each tuple is the key of the object, while the second member states the number of times that the word “pizza” has occurred in the text body of that object.

Here is the Erlang function. It runs the compiled regular expression Re (built below) against each object's value and emits a {Key, Count} tuple; the nomatch clause ensures that objects without the word still appear in the results with a count of 0:

ReFun = fun(O, _, Re) ->
    case re:run(riak_object:get_value(O), Re, [global]) of
        {match, Matches} -> [{riak_object:key(O), length(Matches)}];
        nomatch          -> [{riak_object:key(O), 0}]
    end
end.

Here is the HTTP call that runs an equivalent job through the /mapred endpoint. The job is described as JSON, with the map function written inline in JavaScript (a sketch of the JSON job format; the Erlang version is the one this tutorial follows):

curl -XPOST http://localhost:8098/mapred \
  -H 'Content-Type: application/json' \
  -d '{
    "inputs": "training",
    "query": [
      {
        "map": {
          "language": "javascript",
          "source": "function(riakObject) { var m = riakObject.values[0].data.match(/pizza/g); return [[riakObject.key, (m ? m.length : 0)]]; }"
        }
      }
    ]
  }'
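
If the job succeeds, the response body should be a JSON array of [key, count] pairs along these lines (the ordering may vary, since results arrive from multiple vnodes):

[["foo",1],["bar",4],["baz",0],["bam",3]]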

Next, to call ReFun on all keys in the training bucket, we can do the following in the Erlang shell. Do not use this in a production environment; listing all keys to identify those in the training bucket is a very expensive process.

First, we compile the regular expression that ReFun will apply to each object:

{ok, Re} = re:compile("pizza").

That will return output along the following lines, verifying that compilation has completed:

{ok,{re_pattern,0,0,
                <<69,82,67,80,69,0,0,0,0,0,0,0,6,0,0,0,0,0,0,0,99,0,100,
                  ...>>}}

Then, we can create a socket link to our cluster:

{ok, Riak} = riakc_pb_socket:start_link("127.0.0.1", 8087).

%% This should return a process ID:
%% {ok,<0.34.0>}

Then we can run the MapReduce job, passing the compiled regex as the argument, on the training bucket:

riakc_pb_socket:mapred_bucket(Riak, <<"training">>,
    [{map, {qfun, ReFun}, Re, true}]).
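
In that phase tuple, {qfun, ReFun} identifies the function to run, Re is the static argument handed to ReFun, and true means this phase's results are accumulated and returned to the client. Annotated, the tuple reads:

%% {map,           the phase type
%%  {qfun, ReFun}, the function to run (an anonymous fun)
%%  Re,            the argument passed as ReFun's third parameter
%%  true}          accumulate this phase's results for the client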

That will return a list of tuples. The first element in each tuple is the key of an object in the bucket, while the second element is the number of instances of the word “pizza” in that object:

{ok,[{0,
      [{<<"foo">>,1},{<<"bam">>,3},{<<"baz">>,0},{<<"bar">>,4}]}]}

Recap

In this tutorial, we ran an Erlang MapReduce function against a total of four objects in the training bucket. The job took each key/value object in the bucket and searched its text for the word “pizza,” counting the number of instances of the word.

Advanced MapReduce Queries

Further Reading