Map Reduce 2.0
by Vladimir Blagojevic
Hey guys,
Before moving forward with the next iteration of map reduce I wanted to hear
your thoughts about the following proposal. After we agree on the
general direction I will transcribe the agreed design to a wiki page and
start the implementation.
Shortcomings of the current map reduce implementation
While our current map reduce implementation is more than a proof of
concept, there are several drawbacks preventing it from being an
industrial-grade map reduce solution. The main drawback is the inability
of the current solution to deal with large-data (GB/TB-sized) map reduce
problems. This shortcoming is mainly in our reduce phase execution.
The reduce phase, as you might know, is currently executed on a single
Infinispan master task node; the size of the map reduce problems we can
support is therefore limited to the working memory of a single node.
Proposed solution
The proposed solution distributes the execution of reduce phase
tasks across the cluster, thus achieving higher reduce task
parallelization and at the same time removing the above-mentioned reduce
phase restriction. By leveraging our consistent hashing
solution even further we can parallelize the reduce phase and elevate our
map reduce solution to an industrial level. Here is how we can achieve that.
Map phase
MapReduceTask, as it currently does, will hash task input keys and group
them by the execution node N they hash to. For each node N and its
group of input KIn keys, MapReduceTask creates a MapCombineCommand which
is migrated to the target execution node N. MapCombineCommand is similar
to the current MapReduceCommand. MapCombineCommand takes an instance of a
Mapper and an instance of a Reducer, which acts as a combiner [1].
Once loaded onto the target execution node, MapCombineCommand takes each
local KIn key and executes the Mapper method void map(KIn key, VIn value,
Collector<KOut, VOut> collector). Results are collected into a common
Collector<KOut, VOut> and the combine phase is initiated. The
combiner, if specified, takes the KOut keys and immediately invokes the reduce
phase on those keys. The result of the map phase executed on each node is a
<KOut, VOut> map M. There will be one resulting map M per execution node N.
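To make the map/combine step more concrete, here is a rough Java sketch of what the
execution on a target node could look like. The MapCombineCommand shape and the local
collector are illustrative assumptions; only the Mapper, Reducer and Collector interfaces
(with the signatures quoted above) come from the existing API:

import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.infinispan.Cache;
import org.infinispan.distexec.mapreduce.Collector;
import org.infinispan.distexec.mapreduce.Mapper;
import org.infinispan.distexec.mapreduce.Reducer;

// Hypothetical command sketch; not an existing Infinispan class.
public class MapCombineCommand<KIn, VIn, KOut, VOut> {

   private final Mapper<KIn, VIn, KOut, VOut> mapper;
   private final Reducer<KOut, VOut> combiner; // optional combiner, may be null

   public MapCombineCommand(Mapper<KIn, VIn, KOut, VOut> mapper, Reducer<KOut, VOut> combiner) {
      this.mapper = mapper;
      this.combiner = combiner;
   }

   /** Executed on the target node N against its local KIn keys; returns the per-node map M. */
   public Map<KOut, List<VOut>> mapAndCombine(Cache<KIn, VIn> cache, Set<KIn> localKeys) {
      final Map<KOut, List<VOut>> emitted = new HashMap<KOut, List<VOut>>();
      Collector<KOut, VOut> collector = new Collector<KOut, VOut>() {
         @Override
         public void emit(KOut key, VOut value) {
            List<VOut> values = emitted.get(key);
            if (values == null) {
               values = new ArrayList<VOut>();
               emitted.put(key, values);
            }
            values.add(value);
         }
      };

      // Map phase: run the Mapper over every local key, collecting into the common collector
      for (KIn key : localKeys) {
         mapper.map(key, cache.get(key), collector);
      }

      // Combine phase: if a combiner was given, reduce each KOut's values locally
      // before they leave this node
      if (combiner != null) {
         for (Map.Entry<KOut, List<VOut>> e : emitted.entrySet()) {
            VOut combined = combiner.reduce(e.getKey(), e.getValue().iterator());
            e.setValue(new ArrayList<VOut>(Collections.singletonList(combined)));
         }
      }
      return emitted;
   }
}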
At the end of the combine phase, instead of returning map M to the master
task node (as we currently do), we now hash each KOut in map M and group
the KOut keys by the execution node N they hash to. Each group of KOut
keys and its VOut values, hashed to the same node, is wrapped in a new
command, Migrate. The Migrate command, which is very similar to
PutKeyValueCommand, is executed on the Infinispan target node N and essentially
maintains a KOut -> List<VOut> mapping, i.e. all KOut/VOut pairs from all
executed MapCombineCommands will be collocated on the node N where KOut is
hashed to, and the value for KOut will be a list of all its VOut values. We
essentially collect all VOut values under each KOut for all executed
MapCombineCommands.
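A sketch of how the end of the combine phase could group the intermediate keys by the
node they hash to and wrap each group in a Migrate command. Migrate and the owner-lookup
abstraction are assumed names; in practice the owner would come from the consistent hash /
DistributionManager:

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.infinispan.remoting.transport.Address;

// Sketch only; Migrate and KeyOwnership are assumed names, not existing Infinispan classes.
public class IntermediateKeyRouter<KOut, VOut> {

   /** Placeholder for the consistent hash lookup: which node owns a given KOut key. */
   public interface KeyOwnership {
      Address ownerOf(Object key);
   }

   /** Placeholder command that merges KOut -> List<VOut> entries into the target node. */
   public static class Migrate<K, V> {
      final Map<K, List<V>> payload;
      Migrate(Map<K, List<V>> payload) { this.payload = payload; }
   }

   /** Groups the per-node map M by the node each KOut hashes to, one Migrate per target node. */
   public Map<Address, Migrate<KOut, VOut>> route(Map<KOut, List<VOut>> mapM, KeyOwnership hash) {
      Map<Address, Map<KOut, List<VOut>>> grouped = new HashMap<Address, Map<KOut, List<VOut>>>();
      for (Map.Entry<KOut, List<VOut>> e : mapM.entrySet()) {
         Address target = hash.ownerOf(e.getKey());
         Map<KOut, List<VOut>> forTarget = grouped.get(target);
         if (forTarget == null) {
            forTarget = new HashMap<KOut, List<VOut>>();
            grouped.put(target, forTarget);
         }
         forTarget.put(e.getKey(), e.getValue());
      }
      // Wrap each group in a Migrate command; on the target node the entries are appended
      // to that node's KOut -> List<VOut> mapping.
      Map<Address, Migrate<KOut, VOut>> commands = new HashMap<Address, Migrate<KOut, VOut>>();
      for (Map.Entry<Address, Map<KOut, List<VOut>>> e : grouped.entrySet()) {
         commands.put(e.getKey(), new Migrate<KOut, VOut>(e.getValue()));
      }
      return commands;
   }
}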
At this point MapCombineCommand has finished its execution; the list of KOut
keys is returned to the master node and its MapReduceTask. We do not
return VOut values as we do not need them at the master task node.
MapReduceTask is ready to start the reduce phase.
Reduce phase
MapReduceTask initializes a ReduceCommand with the user-specified Reducer.
The KOut keys collected from the map phase are grouped by the execution
node N they hash to. For each node N and its group of input KOut
keys, MapReduceTask creates a ReduceCommand and sends it to the node N
where those KOut keys are hashed. Once loaded onto the target execution node,
ReduceCommand grabs the list of VOut values for each KOut key and invokes:
VOut reduce(KOut reducedKey, Iterator<VOut> iter).
The result of a ReduceCommand is a map M where each key is a KOut and each
value is its reduced VOut. Each Infinispan execution node N returns one map M
containing only the KOut keys hashed to N, each mapped to its reduced value.
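A rough sketch of the execution on the target node; the ReduceCommand shape and the way
the intermediate values are handed in are assumptions, only the Reducer interface is the
existing one:

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.infinispan.distexec.mapreduce.Reducer;

// Hypothetical command sketch; not an existing Infinispan class.
public class ReduceCommand<KOut, VOut> {

   private final Reducer<KOut, VOut> reducer;

   public ReduceCommand(Reducer<KOut, VOut> reducer) {
      this.reducer = reducer;
   }

   /**
    * Executed on the node where the given KOut keys are hashed. The intermediate
    * KOut -> List<VOut> mapping was built there by the Migrate commands.
    */
   public Map<KOut, VOut> perform(Map<KOut, List<VOut>> intermediate, Iterable<KOut> keys) {
      Map<KOut, VOut> reduced = new HashMap<KOut, VOut>();
      for (KOut key : keys) {
         List<VOut> values = intermediate.get(key);
         if (values == null || values.isEmpty()) continue; // nothing was collected for this key
         // VOut reduce(KOut reducedKey, Iterator<VOut> iter)
         reduced.put(key, reducer.reduce(key, values.iterator()));
      }
      return reduced; // the per-node result map M returned to the master task node
   }
}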
When all ReduceCommands return to the calling node, MapReduceTask simply
combines all these M maps and returns the final Map<KOut, VOut> as the result
of the MapReduceTask. All intermediate KOut -> List<VOut> maps left on the
Infinispan cluster are then cleaned up.
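And a master-side sketch tying the reduce phase together: group the collected KOut keys
by owner, dispatch one ReduceCommand per node and merge the returned maps. The owner
lookup and the dispatch call are placeholders, not real Infinispan APIs:

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.infinispan.remoting.transport.Address;

// Master-side sketch only; ownerOf() and sendReduceCommand() stand in for the
// consistent hash lookup and the actual RPC dispatch.
public abstract class ReducePhaseDriver<KOut, VOut> {

   protected abstract Address ownerOf(KOut key);

   protected abstract Map<KOut, VOut> sendReduceCommand(Address target, Collection<KOut> keys);

   /** Runs the whole reduce phase for the KOut keys collected in the map phase. */
   public Map<KOut, VOut> reduceAll(Collection<KOut> collectedKeys) {
      // Group the KOut keys by the execution node they hash to
      Map<Address, List<KOut>> keysPerNode = new HashMap<Address, List<KOut>>();
      for (KOut key : collectedKeys) {
         Address owner = ownerOf(key);
         List<KOut> keys = keysPerNode.get(owner);
         if (keys == null) {
            keys = new ArrayList<KOut>();
            keysPerNode.put(owner, keys);
         }
         keys.add(key);
      }
      // Send one ReduceCommand per node and merge the returned maps into the final result
      Map<KOut, VOut> result = new HashMap<KOut, VOut>();
      for (Map.Entry<Address, List<KOut>> e : keysPerNode.entrySet()) {
         result.putAll(sendReduceCommand(e.getKey(), e.getValue()));
      }
      return result;
   }
}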
[1] See section 4.3 of http://research.google.com/archive/mapreduce.html
Time for a new NBST thread
by Dan Berindei
Hi guys
I published a new version of the non-blocking state transfer design
document here:
https://community.jboss.org/wiki/Non-blockingStateTransfer
This time I focused a bit on the "other" cache configurations (other
than dist-sync, that is). This revealed some new problems; these are
the most interesting ones:
* With async-commit phase or async-1pc, we can get stale locks on
joiners. I have an Infinispan-only approach that should work, but it
may be easier to just require FLUSH.
* Async-commit + recovery enabled: a node sends a commit command to 2
owners, one node loses the message, and then the originator dies
before retransmitting. How do we let the TM know that the transaction
is in-doubt, since to the TM it looks like it has been committed?
Please try to read the document and let me know if anything sounds too
opaque or doesn't make sense. I think it's good enough for starting
the implementation, although I've removed the actual implementation
details from the document for now.
Cheers
Dan
Let's stop with pull requests
by Sanne Grinovero
Guys, please don't take this as me complaining yet again about
failing tests; I'm having doubts about the development process and the
amount of time it is wasting for all of us.
We're all human and make mistakes, but it happens so often
that it's getting systemic, and discipline could definitely be
improved: people regularly send pull requests with failing tests or
broken code, and very regularly these are just merged into master.
I did it myself a couple of days ago: didn't notice a failure, all
looked good, sent a pull request, and it was merged with no complaints. Three days
later, I resumed my work and was appalled to see that it was broken. I'm now
fixing it, but I'll have to send another pull request and wait for it - which
feels very pointless, as I'm pretty sure nobody is checking anyway.
It looks like the pull request procedure is having this effect:
# the patch writer is not as careful as he used to be: "someone else
will check if it's fine or not. I have no time to run the tests
again.."
# the reviewer has a quick look: "Looks good - in fact I don't care
much, it's not my code and I need to return to my own issues.. worst
case someone else will fix it, blaming the original author"
And then again some incomplete test makes it to master, or a patch
which doesn't even compile is integrated.
This pull request process is turning out to be a big failure. Shall we stop
wasting time on it and just push to master?
That doesn't mean I'm suggesting "let's make it worse" or "unleash
hell": we should all take responsibility for every change very seriously.
Again, I'm not enjoying the role of "the one who complains about the
testsuite again". I'm just stating a fact, and trying to propose something
to make it work better. We have great individuals on this team, but we
need to admit that the teamwork isn't working and we should deal with it
as best we can; denying it won't help.
Cheers,
Sanne
FLUSH-medium and NBST
by Dan Berindei
Hi Bela
Let me try to expand a bit on how I see a potential FLUSH-medium protocol.
The NBST design starts off from the premise that we can reject
commands tagged with an older cache view id and the originator will
retry the command with the proper view id (and on the proper owners).
This is quite straightforward in sync caches, but it can get messy in
async caches (or even with only an async commit phase). Note that I'm
only talking now about a new node joining, not about a node leaving
(which is another scenario where FLUSH helps, but unfortunately only
in replicated mode).
# Async commit
Let's say we have a cluster with three nodes, A (the coordinator), B
and C, and a distributed cache with an async commit phase and numOwners = 1
running on it. The current cache view is {1|A, B, C}.
1. B starts a transaction Tx1 { put(k1, v1) }. It sends the
prepare(Tx1) command to C, which is the only owner of key k1, and gets
a success response.
2. B sends the commit(Tx1) command to C, but the message is delayed.
It then proceeds to delete Tx1 from its transaction table.
3. D joins the cluster, sends a requestJoin(D) to A, and A sends a
prepareView({2|A, B, C, D}) to B, C, and D.
4. C receives the prepareView({2|A, B, C, D}) command, sees that it's
no longer the owner of k1, and sends an applyState(prepare(Tx1))
command to D.
5. C receives the commit(Tx1) command, discards it because it has an
old view id.
6. D receives the prepareView({2|A, B, C, D}) command.
7. D receives the applyState(prepare(Tx1)) command and blocks any
other transaction from acquiring the lock on key k1. B will never send
a commit(Tx1) command to D, so the lock is held until B or D dies.
I can see two ways of solving this:
a. In step 5, instead of rejecting the commit(Tx1) command, C could
forward it to the new owner (D).
b. In step 4, before pushing the transaction information to D, C could
wait to receive all commit commands sent by any node in the old cache
view.
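To make option a) a bit more concrete, here is a small sketch of the forwarding idea;
every type and method below (except Address) is a placeholder, not an existing Infinispan
class:

import java.util.Collection;

import org.infinispan.remoting.transport.Address;

// Sketch of option a): instead of rejecting a commit tagged with an older cache view id,
// forward it to the owners in the current view so the new owner (D) can release the lock.
// C stands for the commit command type; all methods are placeholders.
public abstract class StaleViewCommitHandler<C> {

   protected abstract int currentViewId();
   protected abstract Address localAddress();
   protected abstract Collection<Address> currentOwners(C commit);
   protected abstract void forward(Address target, C commit);
   protected abstract void applyIfStillOwner(C commit);

   public void handleCommit(C commit, int commandViewId) {
      if (commandViewId < currentViewId()) {
         // Step 5 in the scenario above: instead of discarding the commit, pass it on
         for (Address newOwner : currentOwners(commit)) {
            if (!newOwner.equals(localAddress())) {
               forward(newOwner, commit);
            }
         }
      }
      applyIfStillOwner(commit);
   }
}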
In the previous version of the NBST document I had chosen to forward
the prepare(Tx1) command from C to D, but that seemed overly
complicated. I'm not so sure any more... (It could be because I was
trying to use forwarding to "solve" the scenario where the originator
dies after sending a message, but I'll send a separate email for
that.)
In the latest version of the NBST design document I chose option b). I
had an idea for an API (whether in JGroups or in Infinispan), more or
less like a distributed CountDownLatch, that would ensure that when a
thread returns from await(), any messages sent by the other members (to
this node) before their countDown() call have already been delivered.
I'd also need to identify latches by increasing id numbers (like the
cache view id), and whenever someone calls countDown(id), everyone
blocked in await(id - 1) should unblock.
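Just to make the intended semantics concrete, a minimal sketch of what such an API could
look like; none of these names exist in JGroups or Infinispan:

// Hypothetical "distributed CountDownLatch" API; all names are made up.
public interface ViewDeliveryLatch {

   /**
    * Signals that this node has sent every message belonging to view id {@code viewId}
    * (e.g. all commit commands for transactions prepared in that view).
    */
   void countDown(int viewId);

   /**
    * Blocks until every other member has called countDown(viewId). Guarantee: any message
    * a member sent to this node before its countDown(viewId) call has already been
    * delivered when await(viewId) returns. A countDown(id) call also unblocks members
    * waiting in await(id - 1).
    */
   void await(int viewId) throws InterruptedException;
}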
The TO guys have an alternative that's quite a bit simpler (although
it does mean a longer "down" time): block until we receive commit
commands for all the transactions we have in our transaction table.
This means that if a 3rd party XA resource on B takes a very long time
to prepare the transaction, C would have to wait for that XA
resource to finish preparing, and everyone else would have to wait for
C to send its transaction data.
# Async prepare
If the prepare had been asynchronous, we wouldn't have a stale
lock (because 1PC is automatically activated when prepares are async),
but we would lose the prepare on the joiner:
1. B starts a transaction Tx1 { put(k1, v1) }. It sends the
prepare(Tx1) command to C, which is the only owner of key k1, but the
message is delayed.
2. D joins the cluster, sends a requestJoin(D) to A, and A sends a
prepareView({2|A, B, C, D}) to B, C, and D.
3. C and D receive the prepareView({2|A, B, C, D}), C doesn't have any
transaction information to send to D.
4. C receives the prepare(Tx1) command, discards it because it has an
old view id.
5. D never receives the prepare(Tx1) command, the entry k1->v1 is lost.
It's definitely not as bad as having a stale lock, but still, losing
data without any node leaving the cache doesn't look good.
We have the same two options here as well:
a. In step 4, forward the prepare(Tx1) command to the new owner (D).
b. Never reject prepare commands with viewId-1: block state transfer
on C as long as there are any in-flight commands.
We can't use the TO guys' alternative for b) here, because C has no
idea that it will receive the prepare(Tx1) command.
# TOB/TOM
In TOB/TOM the prepare command is asynchronous, but it is also looped
back to the originator in total order with the prepareView command. So
it's not possible for C to reject the prepare(Tx1) command without B
also rejecting it and resending it to D.
That's about it. Thanks for looking into it, Bela!
--
Dan