[infinispan-issues] [JBoss JIRA] (ISPN-4575) Map/Reduce incorrect results with a non-shared non-tx intermediate cache

Dan Berindei (JIRA) issues at jboss.org
Fri Sep 5 08:41:00 EDT 2014


    [ https://issues.jboss.org/browse/ISPN-4575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=12999339#comment-12999339 ] 

Dan Berindei commented on ISPN-4575:
------------------------------------

Whichever you prefer; I would go for {{CreateCacheCommand.perform}}.

Since you send the create cache command to all the nodes in the cluster, the expected size should be the number of nodes in the cluster (i.e. {{transport.getMembers().size()}}). Of course, reality creeps in: nodes can join or leave the cluster while you're waiting, or even while the command is in flight. So you actually need to take the initial list of members on the originator, send it to every node as a command parameter, and remove leavers on every iteration. Something like this should work, though the {{sleep}} still isn't very nice:

{code}
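// cacheMembers: the member list taken on the originator and sent as a command parameter
int expectedSize = cacheMembers.size();
// Keep waiting while a member is still missing or a rebalance is still pending
while (stm.getCacheTopology().getMembers().size() != expectedSize || stm.getCacheTopology().getPendingCH() != null) {
   Thread.sleep(50);
   // Remove the nodes that have left the cluster in the meantime
   cacheMembers.retainAll(rpcManager.getTransport().getMembers());
   expectedSize = cacheMembers.size();
}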
{code}

To be fair, I don't think the current M/R code works very well with nodes joining or leaving the cluster anyway, so perhaps it would be simpler to get the cluster size once, before the loop starts, and add a timeout.
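
A minimal sketch of that simpler variant, for illustration only. {{TopologyWait}} and {{awaitStableTopology}} are hypothetical names, not Infinispan API, and the two suppliers are stand-ins so the example runs without a cluster. In {{CreateCacheCommand}} they would presumably be {{() -> stm.getCacheTopology().getMembers().size()}} and {{() -> stm.getCacheTopology().getPendingCH() != null}}, with {{expectedSize}} read once from {{transport.getMembers().size()}}.

```java
import java.util.concurrent.TimeUnit;
import java.util.function.BooleanSupplier;
import java.util.function.IntSupplier;

// Hypothetical helper, not part of Infinispan.
final class TopologyWait {
   private TopologyWait() {
   }

   /**
    * Polls until the cache has expectedSize members and no rebalance is pending.
    * Returns true if that happened before the timeout, false if the timeout
    * expired or the waiting thread was interrupted.
    */
   static boolean awaitStableTopology(int expectedSize, IntSupplier cacheMemberCount,
         BooleanSupplier rebalanceInProgress, long timeout, TimeUnit unit) {
      long deadline = System.nanoTime() + unit.toNanos(timeout);
      long pollNanos = TimeUnit.MILLISECONDS.toNanos(50);
      while (cacheMemberCount.getAsInt() != expectedSize || rebalanceInProgress.getAsBoolean()) {
         long remaining = deadline - System.nanoTime();
         if (remaining <= 0) {
            return false; // gave up: the cache never reached the expected state
         }
         try {
            // Sleep for the poll interval, but never past the deadline
            TimeUnit.NANOSECONDS.sleep(Math.min(remaining, pollNanos));
         } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // preserve the interrupt for the caller
            return false;
         }
      }
      return true;
   }
}
```

With a timeout, a node leaving while the others wait makes the command fail after a bounded delay instead of blocking forever. What the caller should do on {{false}} (fail the task, or proceed and risk the duplicate values) is left open here.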

> Map/Reduce incorrect results with a non-shared non-tx intermediate cache
> ------------------------------------------------------------------------
>
>                 Key: ISPN-4575
>                 URL: https://issues.jboss.org/browse/ISPN-4575
>             Project: Infinispan
>          Issue Type: Bug
>          Components: Core, Distributed Execution and Map/Reduce
>    Affects Versions: 7.0.0.Alpha5
>            Reporter: Dan Berindei
>            Assignee: Vladimir Blagojevic
>            Priority: Blocker
>              Labels: testsuite_stability
>             Fix For: 7.0.0.Beta2
>
>
> In a non-tx cache, if a command is started with topology id {{T}}, and when it is replicated on another node the distribution interceptor sees topology {{T+1}}, it throws an {{OutdatedTopologyException}}. The originator of the command will then retry the command, setting topology {{T+1}}.
> When this happens with a {{PutKeyValueCommand(k, MapReduceManagerImpl.DeltaAwareList)}}, it can lead to duplicate intermediate values.
> Say _A_ is the primary owner of {{k}} in {{T}}, _B_ is a backup owner both in {{T}} and {{T+1}}, and _C_ is the backup owner in {{T}} and the primary owner in {{T+1}} (i.e. _C_ just joined and a rebalance is in progress during {{T}} - see {{NonTxBackupOwnerBecomingPrimaryOwnerTest}}).
> _A_ starts the {{PutKeyValueCommand}} and replicates it to _B_ and _C_. _C_ applies the command, but _B_ already has topology {{T+1}} and throws an {{OutdatedTopologyException}}. _A_ installs topology {{T+1}}, sends the command to _C_ (as the new primary owner), which replicates it to _B_ and then applies it locally a second time.
> This scenario can happen during an M/R task even without nodes joining or leaving. That's because {{CreateCacheCommand}} only calls {{getCache()}} on each member; it doesn't wait for the cache to have a certain number of members or for state transfer to be complete for all the members. The last member to join the intermediate cache is guaranteed to have topology {{T+1}}, but the others may have topology {{T}} by the time the combine phase starts inserting values in the intermediate cache.
> I have seen the {{OutdatedTopologyException}} happen pretty often during the test suite, especially after I removed the duplicate {{invokeRemotely}} call in {{MapReduceTask.executeTaskInit()}}. Most of them were harmless, but there was one failure in CI: http://ci.infinispan.org/viewLog.html?buildId=9811&tab=buildResultsDiv&buildTypeId=bt8
> A short-term fix would be to wait for all the members to finish joining in {{CreateCacheCommand}}. Long-term, M/R tasks should be resilient to topology changes, so we should investigate making {{PutKeyValue(k, DeltaAwareList)}} handle {{OutdatedTopologyException}}s.




