[infinispan-commits] Infinispan SVN: r348 - trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Wed May 27 06:25:27 EDT 2009
Author: manik.surtani at jboss.com
Date: 2009-05-27 06:25:26 -0400 (Wed, 27 May 2009)
New Revision: 348
Modified:
trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/CommandAwareRpcDispatcher.java
trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsDistSync.java
trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java
Log:
Added a block to hold back RPC during a JOIN
Modified: trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/CommandAwareRpcDispatcher.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/CommandAwareRpcDispatcher.java 2009-05-27 09:04:50 UTC (rev 347)
+++ trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/CommandAwareRpcDispatcher.java 2009-05-27 10:25:26 UTC (rev 348)
@@ -60,7 +60,7 @@
protected boolean trace;
ExecutorService asyncExecutor;
InboundInvocationHandler inboundInvocationHandler;
- DistributedSync distributedSync;
+ JGroupsDistSync distributedSync;
long distributedSyncTimeout;
private Log log = LogFactory.getLog(CommandAwareRpcDispatcher.class);
AtomicBoolean newCacheStarting = new AtomicBoolean(false);
@@ -73,7 +73,7 @@
JGroupsTransport transport,
ExecutorService asyncExecutor,
InboundInvocationHandler inboundInvocationHandler,
- DistributedSync distributedSync, long distributedSyncTimeout) {
+ JGroupsDistSync distributedSync, long distributedSyncTimeout) {
super(channel, transport, transport, transport);
this.asyncExecutor = asyncExecutor;
this.inboundInvocationHandler = inboundInvocationHandler;
@@ -222,6 +222,11 @@
if (oob) msg.setFlag(Message.OOB);
// Replay capability requires responses from all members!
int mode = supportReplay ? GroupRequest.GET_ALL : this.mode;
+
+ // if there is a JOIN in progress, wait for this to complete.
+ // See ISPN-83 for more details. Once ISPN-83 is addressed, this may no longer be needed.
+ distributedSync.blockUntilNoJoinsInProgress();
+
RspList retval = castMessage(dests, msg, mode, timeout, anycasting, filter);
if (trace) log.trace("responses: {0}", retval);
Modified: trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsDistSync.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsDistSync.java 2009-05-27 09:04:50 UTC (rev 347)
+++ trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsDistSync.java 2009-05-27 10:25:26 UTC (rev 348)
@@ -27,8 +27,20 @@
private final AtomicInteger flushBlockGateCount = new AtomicInteger(0);
private final AtomicInteger flushWaitGateCount = new AtomicInteger(0);
private final ReclosableLatch flushWaitGate = new ReclosableLatch(false);
+ private final ReclosableLatch joinInProgress = new ReclosableLatch(false);
private static final Log log = LogFactory.getLog(JGroupsDistSync.class);
+ /**
+ * Blocks the caller until the joinInProgress latch lets it through, i.e. until no JOIN is flagged as in progress.
+ * The wait is not abandoned on interruption; the interrupt is re-asserted once the wait is over.
+ */
+ public void blockUntilNoJoinsInProgress() {
+ boolean interrupted = false;
+ while (true) {
+ try {
+ joinInProgress.await();
+ break;
+ } catch (InterruptedException ie) {
+ // Do not re-interrupt here: with the flag set, an interruptible await() would presumably
+ // throw again at once and this loop would spin forever. Remember it and restore it after the wait.
+ interrupted = true;
+ }
+ }
+ if (interrupted) Thread.currentThread().interrupt();
+ }
+
public SyncResponse blockUntilAcquired(long timeout, TimeUnit timeUnit) throws TimeoutException {
int initState = flushWaitGateCount.get();
while (true) {
@@ -93,4 +105,12 @@
if (log.isTraceEnabled()) log.trace("Did not own lock!");
}
}
+
+ public void signalJoinInProgress() {
+ joinInProgress.close();
+ }
+
+ public void signalJoinCompleted() {
+ joinInProgress.open();
+ }
}
Modified: trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java 2009-05-27 09:04:50 UTC (rev 347)
+++ trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java 2009-05-27 10:25:26 UTC (rev 348)
@@ -403,11 +403,15 @@
}
public void block() {
- // no-op
+ // Set the DistSync to indicate that a JOIN is in progress
+ // See ISPN-83 for more details. Once ISPN-83 is addressed, this may no longer be needed.
+ flushTracker.signalJoinInProgress();
}
public void unblock() {
- // no-op
+ // Set the DistSync to indicate that the JOIN has completed
+ // See ISPN-83 for more details. Once ISPN-83 is addressed, this may no longer be needed.
+ flushTracker.signalJoinCompleted();
}
public void receive(Message msg) {
More information about the infinispan-commits
mailing list