[infinispan-commits] Infinispan SVN: r1684 - in trunk/core/src: main/java/org/infinispan/notifications/cachemanagerlistener and 4 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Mon Apr 12 10:34:21 EDT 2010
Author: manik.surtani at jboss.com
Date: 2010-04-12 10:34:20 -0400 (Mon, 12 Apr 2010)
New Revision: 1684
Modified:
trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifier.java
trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifierImpl.java
trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/event/EventImpl.java
trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/event/ViewChangedEvent.java
trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java
trunk/core/src/test/java/org/infinispan/distribution/rehash/RehashAfterPartitionMergeTest.java
trunk/core/src/test/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifierImplTest.java
trunk/core/src/test/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifierTest.java
Log:
[ISPN-398] (Cluster breaks after a MergeView is issued) Tests and fix
Modified: trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java 2010-04-12 13:44:19 UTC (rev 1683)
+++ trunk/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java 2010-04-12 14:34:20 UTC (rev 1684)
@@ -13,7 +13,9 @@
import org.infinispan.context.Flag;
import org.infinispan.context.InvocationContext;
import org.infinispan.context.InvocationContextContainer;
+
import static org.infinispan.distribution.ConsistentHashHelper.createConsistentHash;
+
import org.infinispan.factories.annotations.Inject;
import org.infinispan.factories.annotations.Start;
import org.infinispan.factories.annotations.Stop;
@@ -36,6 +38,7 @@
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.MembershipArithmetic;
import org.infinispan.util.Util;
+import org.infinispan.util.concurrent.ReclosableLatch;
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
import org.rhq.helpers.pluginAnnotations.agent.DataType;
@@ -97,7 +100,7 @@
volatile boolean rehashInProgress = false;
volatile Address joiner;
static final AtomicReferenceFieldUpdater<DistributionManagerImpl, Address> JOINER_CAS =
- AtomicReferenceFieldUpdater.newUpdater(DistributionManagerImpl.class, Address.class, "joiner");
+ AtomicReferenceFieldUpdater.newUpdater(DistributionManagerImpl.class, Address.class, "joiner");
private DataContainer dataContainer;
private InterceptorChain interceptorChain;
private InvocationContextContainer icc;
@@ -106,7 +109,7 @@
volatile boolean joinComplete = false;
final List<Address> leavers = new CopyOnWriteArrayList<Address>();
volatile Future<Void> leaveTaskFuture;
- final CountDownLatch startLatch = new CountDownLatch(1);
+ final ReclosableLatch startLatch = new ReclosableLatch(false);
@Inject
public void init(Configuration configuration, RpcManager rpcManager, CacheManagerNotifier notifier, CommandsFactory cf,
@@ -123,20 +126,26 @@
}
// needs to be AFTER the RpcManager
+
@Start(priority = 20)
public void start() throws Exception {
replCount = configuration.getNumOwners();
+ listener = new ViewChangeListener();
+ notifier.addListener(listener);
+ join();
+ }
+
+ private void join() throws Exception {
+ startLatch.close();
consistentHash = createConsistentHash(configuration, rpcManager.getTransport().getMembers());
self = rpcManager.getTransport().getAddress();
- listener = new ViewChangeListener();
- notifier.addListener(listener);
if (rpcManager.getTransport().getMembers().size() > 1) {
JoinTask joinTask = new JoinTask(rpcManager, cf, configuration, transactionLogger, dataContainer, this);
rehashExecutor.submit(joinTask);
} else {
joinComplete = true;
}
- startLatch.countDown();
+ startLatch.open();
}
@Stop(priority = 20)
@@ -214,7 +223,7 @@
if (consistentHash == null) {
Map<Object, List<Address>> m = new HashMap<Object, List<Address>>(keys.size());
List<Address> selfList = Collections.singletonList(self);
- for (Object k: keys) m.put(k, selfList);
+ for (Object k : keys) m.put(k, selfList);
return m;
}
return consistentHash.locateAll(keys, replCount);
@@ -230,7 +239,7 @@
ResponseFilter filter = new ClusteredGetResponseValidityFilter(locate(key));
List<Response> responses = rpcManager.invokeRemotely(locate(key), get, ResponseMode.SYNCHRONOUS,
- configuration.getSyncReplTimeout(), false, filter);
+ configuration.getSyncReplTimeout(), false, filter);
if (!responses.isEmpty()) {
for (Response r : responses) {
@@ -268,7 +277,8 @@
if (trace) log.trace("Allowing {0} to join", joiner);
return new LinkedList<Address>(consistentHash.getCaches());
} else {
- if (trace) log.trace("Not allowing {0} to join since there is a join already in progress {1}", joiner, this.joiner);
+ if (trace)
+ log.trace("Not allowing {0} to join since there is a join already in progress {1}", joiner, this.joiner);
return null;
}
}
@@ -328,12 +338,21 @@
public void handleViewChange(ViewChangedEvent e) {
boolean started = false;
// how long do we wait for a startup?
- try {
- started = startLatch.await(2, TimeUnit.MINUTES);
- if (started) rehash(e.getNewMembers(), e.getOldMembers());
- else log.warn("DistributionManager not started after waiting up to 2 minutes! Not rehashing!");
- } catch (InterruptedException ie) {
- log.warn("View change interrupted; not rehashing!");
+ if (e.isNeedsToRejoin()) {
+ try {
+ join();
+ } catch (Exception e1) {
+ log.fatal("Unable to recover from a partition merge!", e1);
+ }
+ } else {
+
+ try {
+ started = startLatch.await(2, TimeUnit.MINUTES);
+ if (started) rehash(e.getNewMembers(), e.getOldMembers());
+ else log.warn("DistributionManager not started after waiting up to 2 minutes! Not rehashing!");
+ } catch (InterruptedException ie) {
+ log.warn("View change interrupted; not rehashing!");
+ }
}
}
}
Modified: trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifier.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifier.java 2010-04-12 13:44:19 UTC (rev 1683)
+++ trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifier.java 2010-04-12 14:34:20 UTC (rev 1684)
@@ -19,7 +19,7 @@
* Notifies all registered listeners of a viewChange event. Note that viewChange notifications are ALWAYS sent
* immediately.
*/
- void notifyViewChange(List<Address> members, List<Address> oldMembers, Address myAddress, int viewId);
+ void notifyViewChange(List<Address> members, List<Address> oldMembers, Address myAddress, int viewId, boolean b);
void notifyCacheStarted(String cacheName);
Modified: trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifierImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifierImpl.java 2010-04-12 13:44:19 UTC (rev 1683)
+++ trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifierImpl.java 2010-04-12 14:34:20 UTC (rev 1684)
@@ -55,7 +55,7 @@
this.cacheManager = cacheManager;
}
- public void notifyViewChange(List<Address> members, List<Address> oldMembers, Address myAddress, int viewId) {
+ public void notifyViewChange(List<Address> members, List<Address> oldMembers, Address myAddress, int viewId, boolean needsToRejoin) {
if (!viewChangedListeners.isEmpty()) {
EventImpl e = new EventImpl();
e.setLocalAddress(myAddress);
@@ -63,6 +63,7 @@
e.setNewMembers(members);
e.setOldMembers(oldMembers);
e.setCacheManager(cacheManager);
+ e.setNeedsToRejoin(needsToRejoin);
e.setType(Event.Type.VIEW_CHANGED);
for (ListenerInvocation listener : viewChangedListeners) listener.invoke(e);
}
Modified: trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/event/EventImpl.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/event/EventImpl.java 2010-04-12 13:44:19 UTC (rev 1683)
+++ trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/event/EventImpl.java 2010-04-12 14:34:20 UTC (rev 1684)
@@ -19,6 +19,7 @@
List<Address> newMembers, oldMembers;
Address localAddress;
int viewId;
+ private boolean needsToRejoin;
public EventImpl() {
}
@@ -96,13 +97,12 @@
EventImpl event = (EventImpl) o;
+ if (needsToRejoin != event.needsToRejoin) return false;
if (viewId != event.viewId) return false;
if (cacheName != null ? !cacheName.equals(event.cacheName) : event.cacheName != null) return false;
if (localAddress != null ? !localAddress.equals(event.localAddress) : event.localAddress != null) return false;
- if (newMembers != null ? !newMembers.equals(event.newMembers) : event.newMembers != null)
- return false;
- if (oldMembers != null ? !oldMembers.equals(event.oldMembers) : event.oldMembers != null)
- return false;
+ if (newMembers != null ? !newMembers.equals(event.newMembers) : event.newMembers != null) return false;
+ if (oldMembers != null ? !oldMembers.equals(event.oldMembers) : event.oldMembers != null) return false;
if (type != event.type) return false;
return true;
@@ -116,18 +116,27 @@
result = 31 * result + (oldMembers != null ? oldMembers.hashCode() : 0);
result = 31 * result + (localAddress != null ? localAddress.hashCode() : 0);
result = 31 * result + viewId;
+ result = 31 * result + (needsToRejoin ? 1 : 0);
return result;
}
@Override
public String toString() {
return "EventImpl{" +
- "cacheName='" + cacheName + '\'' +
- ", type=" + type +
- ", newMembers=" + newMembers +
- ", oldMembers=" + oldMembers +
- ", localAddress=" + localAddress +
- ", viewId=" + viewId +
- '}';
+ "type=" + type +
+ ", newMembers=" + newMembers +
+ ", oldMembers=" + oldMembers +
+ ", localAddress=" + localAddress +
+ ", viewId=" + viewId +
+ ", needsToRejoin=" + needsToRejoin +
+ '}';
}
+
+ public void setNeedsToRejoin(boolean needsToRejoin) {
+ this.needsToRejoin = needsToRejoin;
+ }
+
+ public boolean isNeedsToRejoin() {
+ return needsToRejoin;
+ }
}
Modified: trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/event/ViewChangedEvent.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/event/ViewChangedEvent.java 2010-04-12 13:44:19 UTC (rev 1683)
+++ trunk/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/event/ViewChangedEvent.java 2010-04-12 14:34:20 UTC (rev 1684)
@@ -41,5 +41,7 @@
Address getLocalAddress();
+ boolean isNeedsToRejoin();
+
int getViewId();
}
Modified: trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java 2010-04-12 13:44:19 UTC (rev 1683)
+++ trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java 2010-04-12 14:34:20 UTC (rev 1684)
@@ -51,6 +51,7 @@
import org.jgroups.ExtendedMembershipListener;
import org.jgroups.ExtendedMessageListener;
import org.jgroups.JChannel;
+import org.jgroups.MergeView;
import org.jgroups.Message;
import org.jgroups.View;
import org.jgroups.blocks.GroupRequest;
@@ -492,7 +493,7 @@
// now notify listeners - *after* updating the coordinator. - JBCACHE-662
if (needNotification && notifier != null) {
- notifier.notifyViewChange(members, oldMembers, getAddress(), (int) newView.getVid().getId());
+ notifier.notifyViewChange(members, oldMembers, getAddress(), (int) newView.getVid().getId(), needsToRejoin(newView));
}
// Wake up any threads that are waiting to know about who the coordinator is
@@ -500,6 +501,23 @@
}
}
+ private boolean needsToRejoin(View v) {
+ if (v instanceof MergeView) {
+ MergeView mv = (MergeView) v;
+ org.jgroups.Address coord = v.getMembers().get(0);
+ View winningPartition = null;
+ for (View p: mv.getSubgroups()) {
+ if (p.getMembers().get(0).equals(coord)) {
+ winningPartition = p;
+ break;
+ }
+ }
+
+ if (!winningPartition.containsMember(channel.getAddress())) return true;
+ }
+ return false;
+ }
+
public void suspect(org.jgroups.Address suspected_mbr) {
// no-op
}
Modified: trunk/core/src/test/java/org/infinispan/distribution/rehash/RehashAfterPartitionMergeTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/distribution/rehash/RehashAfterPartitionMergeTest.java 2010-04-12 13:44:19 UTC (rev 1683)
+++ trunk/core/src/test/java/org/infinispan/distribution/rehash/RehashAfterPartitionMergeTest.java 2010-04-12 14:34:20 UTC (rev 1684)
@@ -16,6 +16,8 @@
import org.infinispan.test.fwk.TestCacheManagerFactory;
import org.jgroups.Channel;
import org.jgroups.protocols.DISCARD;
+import org.jgroups.protocols.TP;
+import org.jgroups.stack.ProtocolStack;
import org.testng.annotations.Test;
import java.util.Arrays;
@@ -36,32 +38,21 @@
@Override
protected void createCacheManagers() throws Throwable {
- Configuration c = getDefaultClusteredConfig(Configuration.CacheMode.DIST_SYNC);
- c.setLockAcquisitionTimeout(1000);
+ caches = createClusteredCaches(2, "test", getDefaultClusteredConfig(Configuration.CacheMode.DIST_SYNC));
- GlobalConfiguration gc = GlobalConfiguration.getClusteredDefault();
- amendMarshaller(gc);
- minimizeThreads(gc);
- Properties newTransportProps = new Properties();
- String jgc = JGroupsConfigBuilder.getJGroupsConfig();
- String discardString = "):DISCARD(use_gui=false";
- String newString = jgc.substring(0, jgc.indexOf("):")) + discardString + jgc.substring(jgc.indexOf("):"));
- newTransportProps.put(CONFIGURATION_STRING, newString);
- gc.setTransportProperties(newTransportProps);
- CacheManager cm1 = TestCacheManagerFactory.createCacheManager(gc, c, true, false, true);
- CacheManager cm2 = TestCacheManagerFactory.createCacheManager(gc, c, true, false, true);
- registerCacheManager(cm1, cm2);
- c1 = cm1.getCache();
- c2 = cm2.getCache();
- caches = Arrays.asList(c1, c2);
+ c1 = caches.get(0);
+ c2 = caches.get(1);
d1 = getDiscardForCache(c1);
d2 = getDiscardForCache(c2);
}
- private DISCARD getDiscardForCache(Cache<?, ?> c) {
+ private DISCARD getDiscardForCache(Cache<?, ?> c) throws Exception {
JGroupsTransport jgt = (JGroupsTransport) TestingUtil.extractComponent(c, Transport.class);
Channel ch = jgt.getChannel();
- return (DISCARD) ch.getProtocolStack().findProtocol(DISCARD.class);
+ ProtocolStack ps = ch.getProtocolStack();
+ DISCARD discard = new DISCARD();
+ ps.insertProtocol(discard, ProtocolStack.ABOVE, TP.class);
+ return discard;
}
public void testCachePartition() {
Modified: trunk/core/src/test/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifierImplTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifierImplTest.java 2010-04-12 13:44:19 UTC (rev 1683)
+++ trunk/core/src/test/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifierImplTest.java 2010-04-12 14:34:20 UTC (rev 1684)
@@ -29,7 +29,7 @@
public void testNotifyViewChanged() {
Address a = EasyMock.createNiceMock(Address.class);
List<Address> addresses = Collections.emptyList();
- n.notifyViewChange(addresses, addresses, a, 100);
+ n.notifyViewChange(addresses, addresses, a, 100, false);
assert cl.invocationCount == 1;
assert ((ViewChangedEvent) cl.getEvent()).getLocalAddress() == a;
Modified: trunk/core/src/test/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifierTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifierTest.java 2010-04-12 13:44:19 UTC (rev 1683)
+++ trunk/core/src/test/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifierTest.java 2010-04-12 14:34:20 UTC (rev 1684)
@@ -9,7 +9,6 @@
import org.infinispan.test.AbstractInfinispanTest;
import org.infinispan.test.TestingUtil;
import org.infinispan.test.fwk.TestCacheManagerFactory;
-import org.infinispan.transaction.xa.TransactionTable;
import org.infinispan.transaction.xa.TransactionTable.StaleTransactionCleanup;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
@@ -46,7 +45,7 @@
CacheManagerNotifier mockNotifier = createMock(CacheManagerNotifier.class);
CacheManagerNotifier origNotifier = TestingUtil.replaceComponent(cm1, CacheManagerNotifier.class, mockNotifier, true);
try {
- mockNotifier.notifyViewChange(isA(List.class), isA(List.class), eq(myAddress), anyInt());
+ mockNotifier.notifyViewChange(isA(List.class), isA(List.class), eq(myAddress), anyInt(), false);
replay(mockNotifier);
// start a second cache.
Cache c2 = cm2.getCache("cache");
More information about the infinispan-commits
mailing list