[infinispan-commits] Infinispan SVN: r348 - trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Wed May 27 06:25:27 EDT 2009


Author: manik.surtani at jboss.com
Date: 2009-05-27 06:25:26 -0400 (Wed, 27 May 2009)
New Revision: 348

Modified:
   trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/CommandAwareRpcDispatcher.java
   trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsDistSync.java
   trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java
Log:
Added a block to hold back RPC during a JOIN

Modified: trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/CommandAwareRpcDispatcher.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/CommandAwareRpcDispatcher.java	2009-05-27 09:04:50 UTC (rev 347)
+++ trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/CommandAwareRpcDispatcher.java	2009-05-27 10:25:26 UTC (rev 348)
@@ -60,7 +60,7 @@
    protected boolean trace;
    ExecutorService asyncExecutor;
    InboundInvocationHandler inboundInvocationHandler;
-   DistributedSync distributedSync;
+   JGroupsDistSync distributedSync;
    long distributedSyncTimeout;
    private Log log = LogFactory.getLog(CommandAwareRpcDispatcher.class);
    AtomicBoolean newCacheStarting = new AtomicBoolean(false);
@@ -73,7 +73,7 @@
                                     JGroupsTransport transport,
                                     ExecutorService asyncExecutor,
                                     InboundInvocationHandler inboundInvocationHandler,
-                                    DistributedSync distributedSync, long distributedSyncTimeout) {
+                                    JGroupsDistSync distributedSync, long distributedSyncTimeout) {
       super(channel, transport, transport, transport);
       this.asyncExecutor = asyncExecutor;
       this.inboundInvocationHandler = inboundInvocationHandler;
@@ -222,6 +222,11 @@
          if (oob) msg.setFlag(Message.OOB);
          // Replay capability requires responses from all members!
          int mode = supportReplay ? GroupRequest.GET_ALL : this.mode;
+
+         // if there is a JOIN in progress, wait for this to complete.
+         // See ISPN-83 for more details.  Once ISPN-83 is addressed, this may no longer be needed.
+         distributedSync.blockUntilNoJoinsInProgress();
+
          RspList retval = castMessage(dests, msg, mode, timeout, anycasting, filter);
          if (trace) log.trace("responses: {0}", retval);
 

Modified: trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsDistSync.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsDistSync.java	2009-05-27 09:04:50 UTC (rev 347)
+++ trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsDistSync.java	2009-05-27 10:25:26 UTC (rev 348)
@@ -27,8 +27,20 @@
    private final AtomicInteger flushBlockGateCount = new AtomicInteger(0);
    private final AtomicInteger flushWaitGateCount = new AtomicInteger(0);
    private final ReclosableLatch flushWaitGate = new ReclosableLatch(false);
+   private final ReclosableLatch joinInProgress = new ReclosableLatch(false);
    private static final Log log = LogFactory.getLog(JGroupsDistSync.class);
 
+   public void blockUntilNoJoinsInProgress() {
+      // Waits on the joinInProgress latch, deferring interrupts: re-asserting the flag inside the loop would
+      // make an interruptible await() fail again at once and spin, so it is restored only after the wait.
+      boolean interrupted = false;
+      while (true) {
+         try { joinInProgress.await(); break; }
+         catch (InterruptedException ie) { interrupted = true; } // flag is clear here, so the retry can block
+      }
+      if (interrupted) Thread.currentThread().interrupt();
+   }
+
    public SyncResponse blockUntilAcquired(long timeout, TimeUnit timeUnit) throws TimeoutException {
       int initState = flushWaitGateCount.get();
       while (true) {
@@ -93,4 +105,12 @@
          if (log.isTraceEnabled()) log.trace("Did not own lock!");
       }
    }
+
+   public void signalJoinInProgress() { // invoked from JGroupsTransport.block()
+      joinInProgress.close(); // this is the latch that blockUntilNoJoinsInProgress() awaits
+   }
+
+   public void signalJoinCompleted() { // invoked from JGroupsTransport.unblock()
+      joinInProgress.open(); // this is the latch that blockUntilNoJoinsInProgress() awaits
+   }
 }

Modified: trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java	2009-05-27 09:04:50 UTC (rev 347)
+++ trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java	2009-05-27 10:25:26 UTC (rev 348)
@@ -403,11 +403,15 @@
    }
 
    public void block() {
-      // no-op
+      // Set the DistSync to indicate that a JOIN is in progress
+      // See ISPN-83 for more details.  Once ISPN-83 is addressed, this may no longer be needed.
+      flushTracker.signalJoinInProgress();
    }
 
    public void unblock() {
-      // no-op
+      // Set the DistSync to indicate that the JOIN has completed
+      // See ISPN-83 for more details.  Once ISPN-83 is addressed, this may no longer be needed.
+      flushTracker.signalJoinCompleted();
    }
 
    public void receive(Message msg) {




More information about the infinispan-commits mailing list