[infinispan-commits] Infinispan SVN: r2496 - in branches/4.2.x/core/src: main/java/org/infinispan/distribution and 6 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Fri Oct 8 17:09:38 EDT 2010
Author: vblagojevic at jboss.com
Date: 2010-10-08 17:09:37 -0400 (Fri, 08 Oct 2010)
New Revision: 2496
Modified:
branches/4.2.x/core/src/main/java/org/infinispan/affinity/ListenerRegistration.java
branches/4.2.x/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
branches/4.2.x/core/src/main/java/org/infinispan/loaders/decorators/SingletonStore.java
branches/4.2.x/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifierImpl.java
branches/4.2.x/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/event/MergeEvent.java
branches/4.2.x/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java
branches/4.2.x/core/src/main/java/org/infinispan/transaction/xa/TransactionTable.java
branches/4.2.x/core/src/test/java/org/infinispan/distribution/rehash/RehashAfterPartitionMergeTest.java
Log:
[ISPN-609] - Report Merge events via the listener API (Part 2)
Part 1: see change set 2337 on branch 4.2 2338 on trunk
Modified: branches/4.2.x/core/src/main/java/org/infinispan/affinity/ListenerRegistration.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/affinity/ListenerRegistration.java 2010-10-04 23:48:00 UTC (rev 2495)
+++ branches/4.2.x/core/src/main/java/org/infinispan/affinity/ListenerRegistration.java 2010-10-08 21:09:37 UTC (rev 2496)
@@ -2,6 +2,7 @@
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachemanagerlistener.annotation.CacheStopped;
+import org.infinispan.notifications.cachemanagerlistener.annotation.Merged;
import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
import org.infinispan.notifications.cachemanagerlistener.event.CacheStoppedEvent;
import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
@@ -21,6 +22,7 @@
}
@ViewChanged
+ @Merged
public void handleViewChange(ViewChangedEvent vce) {
keyAffinityService.handleViewChange(vce);
}
Modified: branches/4.2.x/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java 2010-10-04 23:48:00 UTC (rev 2495)
+++ branches/4.2.x/core/src/main/java/org/infinispan/distribution/DistributionManagerImpl.java 2010-10-08 21:09:37 UTC (rev 2496)
@@ -27,6 +27,7 @@
import org.infinispan.loaders.CacheStore;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachemanagerlistener.CacheManagerNotifier;
+import org.infinispan.notifications.cachemanagerlistener.annotation.Merged;
import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
import org.infinispan.remoting.responses.ClusteredGetResponseValidityFilter;
@@ -391,7 +392,9 @@
@Listener
public class ViewChangeListener {
+
@ViewChanged
+ @Merged
public void handleViewChange(ViewChangedEvent e) {
log.trace("view change received. Needs to re-join? " + e.isNeedsToRejoin());
boolean started;
Modified: branches/4.2.x/core/src/main/java/org/infinispan/loaders/decorators/SingletonStore.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/loaders/decorators/SingletonStore.java 2010-10-04 23:48:00 UTC (rev 2495)
+++ branches/4.2.x/core/src/main/java/org/infinispan/loaders/decorators/SingletonStore.java 2010-10-08 21:09:37 UTC (rev 2496)
@@ -9,6 +9,7 @@
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachemanagerlistener.annotation.CacheStarted;
+import org.infinispan.notifications.cachemanagerlistener.annotation.Merged;
import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
import org.infinispan.notifications.cachemanagerlistener.event.Event;
import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
@@ -326,6 +327,7 @@
* became the coordinator. This method will report any issues that could potentially arise from this push.
*/
@ViewChanged
+ @Merged
public void viewChange(ViewChangedEvent event) {
boolean tmp = isCoordinator(event.getNewMembers(), event.getLocalAddress());
Modified: branches/4.2.x/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifierImpl.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifierImpl.java 2010-10-04 23:48:00 UTC (rev 2495)
+++ branches/4.2.x/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/CacheManagerNotifierImpl.java 2010-10-08 21:09:37 UTC (rev 2496)
@@ -7,10 +7,12 @@
import org.infinispan.notifications.cachemanagerlistener.annotation.CacheStarted;
import org.infinispan.notifications.cachemanagerlistener.annotation.CacheStopped;
import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
+import org.infinispan.notifications.cachemanagerlistener.annotation.Merged;
import org.infinispan.notifications.cachemanagerlistener.event.CacheStartedEvent;
import org.infinispan.notifications.cachemanagerlistener.event.CacheStoppedEvent;
import org.infinispan.notifications.cachemanagerlistener.event.Event;
import org.infinispan.notifications.cachemanagerlistener.event.EventImpl;
+import org.infinispan.notifications.cachemanagerlistener.event.MergeEvent;
import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
import org.infinispan.remoting.transport.Address;
import org.infinispan.util.logging.Log;
@@ -38,6 +40,7 @@
allowedListeners.put(CacheStarted.class, CacheStartedEvent.class);
allowedListeners.put(CacheStopped.class, CacheStoppedEvent.class);
allowedListeners.put(ViewChanged.class, ViewChangedEvent.class);
+ allowedListeners.put(Merged.class, MergeEvent.class);
}
final List<ListenerInvocation> cacheStartedListeners = new CopyOnWriteArrayList<ListenerInvocation>();
@@ -51,6 +54,7 @@
listenersMap.put(CacheStarted.class, cacheStartedListeners);
listenersMap.put(CacheStopped.class, cacheStoppedListeners);
listenersMap.put(ViewChanged.class, viewChangedListeners);
+ listenersMap.put(Merged.class, mergeListeners);
}
@Inject
Modified: branches/4.2.x/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/event/MergeEvent.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/event/MergeEvent.java 2010-10-04 23:48:00 UTC (rev 2495)
+++ branches/4.2.x/core/src/main/java/org/infinispan/notifications/cachemanagerlistener/event/MergeEvent.java 2010-10-08 21:09:37 UTC (rev 2496)
@@ -10,6 +10,6 @@
* @author Manik Surtani
* @version 4.2
*/
-public interface MergeEvent {
+public interface MergeEvent extends ViewChangedEvent {
List<List<Address>> getSubgroupsMerged();
}
Modified: branches/4.2.x/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java 2010-10-04 23:48:00 UTC (rev 2495)
+++ branches/4.2.x/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java 2010-10-08 21:09:37 UTC (rev 2496)
@@ -475,14 +475,20 @@
public void viewAccepted(View newView) {
Vector<org.jgroups.Address> newMembers = newView.getMembers();
- List<Address> oldMembers = null;
+ List<Address> oldMembers = null;
+ boolean hasNotifier = notifier != null;
Notify n = null;
- if (newView instanceof MergeView) {
- if (log.isInfoEnabled()) log.info("Received new, MERGED cluster view: {0}", newView);
- if (notifier != null) n = new NotifyMerge();
- } else {
- if (log.isInfoEnabled()) log.info("Received new cluster view: {0}", newView);
- if (notifier != null) n = new NotifyViewChange();
+
+ if (hasNotifier) {
+ if (newView instanceof MergeView) {
+ if (log.isInfoEnabled())
+ log.info("Received new, MERGED cluster view: {0}", newView);
+ n = new NotifyMerge();
+ } else {
+ if (log.isInfoEnabled())
+ log.info("Received new cluster view: {0}", newView);
+ n = new NotifyViewChange();
+ }
}
synchronized (membersListLock) {
@@ -497,7 +503,7 @@
coordinator = (members != null && !members.isEmpty() && members.get(0).equals(getAddress()));
// now notify listeners - *after* updating the coordinator. - JBCACHE-662
- if (needNotification && n != null) n.emitNotification(oldMembers, newView);
+ if (needNotification && hasNotifier) n.emitNotification(oldMembers, newView);
// Wake up any threads that are waiting to know about who the coordinator is
membersListLock.notifyAll();
Modified: branches/4.2.x/core/src/main/java/org/infinispan/transaction/xa/TransactionTable.java
===================================================================
--- branches/4.2.x/core/src/main/java/org/infinispan/transaction/xa/TransactionTable.java 2010-10-04 23:48:00 UTC (rev 2495)
+++ branches/4.2.x/core/src/main/java/org/infinispan/transaction/xa/TransactionTable.java 2010-10-08 21:09:37 UTC (rev 2496)
@@ -14,6 +14,7 @@
import org.infinispan.manager.EmbeddedCacheManager;
import org.infinispan.notifications.Listener;
import org.infinispan.notifications.cachelistener.CacheNotifier;
+import org.infinispan.notifications.cachemanagerlistener.annotation.Merged;
import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
import org.infinispan.remoting.MembershipArithmetic;
@@ -91,7 +92,9 @@
@Listener
public class StaleTransactionCleanup {
+
@ViewChanged
+ @Merged
public void onViewChange(ViewChangedEvent vce) {
final List<Address> leavers = MembershipArithmetic.getMembersLeft(vce.getOldMembers(), vce.getNewMembers());
if (!leavers.isEmpty()) {
Modified: branches/4.2.x/core/src/test/java/org/infinispan/distribution/rehash/RehashAfterPartitionMergeTest.java
===================================================================
--- branches/4.2.x/core/src/test/java/org/infinispan/distribution/rehash/RehashAfterPartitionMergeTest.java 2010-10-04 23:48:00 UTC (rev 2495)
+++ branches/4.2.x/core/src/test/java/org/infinispan/distribution/rehash/RehashAfterPartitionMergeTest.java 2010-10-08 21:09:37 UTC (rev 2496)
@@ -1,32 +1,34 @@
package org.infinispan.distribution.rehash;
+import java.util.concurrent.atomic.AtomicInteger;
+
import org.infinispan.Cache;
-import org.infinispan.config.Configuration;
import org.infinispan.distribution.BaseDistFunctionalTest;
import org.infinispan.notifications.Listener;
+import org.infinispan.notifications.cachemanagerlistener.annotation.Merged;
import org.infinispan.notifications.cachemanagerlistener.annotation.ViewChanged;
+import org.infinispan.notifications.cachemanagerlistener.event.MergeEvent;
import org.infinispan.notifications.cachemanagerlistener.event.ViewChangedEvent;
-import org.infinispan.test.MultipleCacheManagersTest;
import org.infinispan.test.TestingUtil;
import org.jgroups.protocols.DISCARD;
import org.testng.annotations.Test;
-import java.util.List;
-import java.util.concurrent.atomic.AtomicInteger;
+ at Test(groups = "functional", testName = "distribution.rehash.RehashAfterPartitionMergeTest", enabled = true, description = "Need to revisit after https://jira.jboss.org/browse/ISPN-493")
+public class RehashAfterPartitionMergeTest extends BaseDistFunctionalTest {
+
+ DISCARD d1, d2;
- at Test(groups = "functional", testName = "distribution.rehash.RehashAfterPartitionMergeTest", enabled = false, description = "Need to revisit after https://jira.jboss.org/browse/ISPN-493")
-public class RehashAfterPartitionMergeTest extends MultipleCacheManagersTest {
+
+ public RehashAfterPartitionMergeTest() {
+ super();
+ this.INIT_CLUSTER_SIZE = 2;
+ this.sync = true;
+ cleanup = CleanupPhase.AFTER_METHOD;
+ }
- Cache<Object, Object> c1, c2;
- List<Cache<Object, Object>> caches;
- DISCARD d1, d2;
-
@Override
protected void createCacheManagers() throws Throwable {
- caches = createClusteredCaches(2, "test", getDefaultClusteredConfig(Configuration.CacheMode.DIST_SYNC));
-
- c1 = caches.get(0);
- c2 = caches.get(1);
+ super.createCacheManagers();
d1 = TestingUtil.getDiscardForCache(c1);
d2 = TestingUtil.getDiscardForCache(c2);
}
@@ -35,11 +37,12 @@
c1.put("1", "value");
c2.put("2", "value");
- for (Cache<Object, Object> c: caches) {
+ for (Cache<Object, String> c: caches) {
assert "value".equals(c.get("1"));
assert "value".equals(c.get("2"));
assert manager(c).getMembers().size() == 2;
}
+
AtomicInteger ai = new AtomicInteger(0);
manager(c1).addListener(new ViewChangeListener(ai));
manager(c2).addListener(new ViewChangeListener(ai));
@@ -49,9 +52,11 @@
// Wait till *both* instances have seen the view change.
while (ai.get() < 2) TestingUtil.sleepThread(500);
-
+
+
+
// we should see a network partition
- for (Cache<Object, Object> c: caches) assert manager(c).getMembers().size() == 1;
+ for (Cache<Object, String> c: caches) assert manager(c).getMembers().size() == 1;
c1.put("3", "value");
c2.put("4", "value");
@@ -71,11 +76,9 @@
// wait till we see the view change
while (ai.get() < 2) TestingUtil.sleepThread(500);
- BaseDistFunctionalTest.RehashWaiter.waitForInitRehashToComplete(c1, c2);
-
c1.put("5", "value");
c2.put("6", "value");
- for (Cache<Object, Object> c: caches) {
+ for (Cache<Object, String> c: caches) {
assert "value".equals(c.get("5"));
assert "value".equals(c.get("6"));
assert manager(c).getMembers().size() == 2;
@@ -94,5 +97,10 @@
public void handle(ViewChangedEvent e) {
ai.getAndIncrement();
}
+
+ @Merged
+ public void merged(MergeEvent e){
+ ai.getAndIncrement();
+ }
}
}
More information about the infinispan-commits
mailing list