[infinispan-commits] Infinispan SVN: r172 - trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Mon Apr 27 11:15:45 EDT 2009


Author: manik.surtani at jboss.com
Date: 2009-04-27 11:15:44 -0400 (Mon, 27 Apr 2009)
New Revision: 172

Added:
   trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsDistSync.java
Removed:
   trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/FlushBasedDistributedSync.java
Modified:
   trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java
Log:
Removed unnecessary flush based sync

Deleted: trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/FlushBasedDistributedSync.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/FlushBasedDistributedSync.java	2009-04-27 14:17:28 UTC (rev 171)
+++ trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/FlushBasedDistributedSync.java	2009-04-27 15:15:44 UTC (rev 172)
@@ -1,96 +0,0 @@
-package org.infinispan.remoting.transport.jgroups;
-
-import net.jcip.annotations.ThreadSafe;
-import org.infinispan.logging.Log;
-import org.infinispan.logging.LogFactory;
-import org.infinispan.remoting.transport.DistributedSync;
-import org.infinispan.util.Util;
-import org.infinispan.util.concurrent.ReclosableLatch;
-
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-/**
- * A DistributedSync based on JGroups' FLUSH protocol
- *
- * @author Manik Surtani
- * @since 4.0
- */
- at ThreadSafe
-public class FlushBasedDistributedSync implements DistributedSync {
-
-   private final ReentrantReadWriteLock processingLock = new ReentrantReadWriteLock();
-   private final ReclosableLatch flushBlockGate = new ReclosableLatch();
-   private final AtomicInteger flushBlockGateCount = new AtomicInteger(0);
-   private final AtomicInteger flushWaitGateCount = new AtomicInteger(0);
-   private final ReclosableLatch flushWaitGate = new ReclosableLatch(false);
-   private static final Log log = LogFactory.getLog(FlushBasedDistributedSync.class);
-
-   public SyncResponse blockUntilAcquired(long timeout, TimeUnit timeUnit) throws TimeoutException {
-      int initState = flushWaitGateCount.get();
-      while (true) {
-         try {
-            if (!flushWaitGate.await(timeout, timeUnit))
-               throw new TimeoutException("Timed out waiting for a cluster-wide sync to be acquired. (timeout = " + Util.prettyPrintTime(timeout) + ")");
-
-            return initState == flushWaitGateCount.get() ? SyncResponse.STATE_PREEXISTED : SyncResponse.STATE_ACHIEVED;
-         } catch (InterruptedException ie) {
-            Thread.currentThread().interrupt();
-         }
-      }
-   }
-
-   public SyncResponse blockUntilReleased(long timeout, TimeUnit timeUnit) throws TimeoutException {
-      int initState = flushBlockGateCount.get();
-      while (true) {
-         try {
-            if (!flushBlockGate.await(timeout, timeUnit))
-               throw new TimeoutException("Timed out waiting for a cluster-wide sync to be released. (timeout = " + Util.prettyPrintTime(timeout) + ")");
-
-            return initState == flushWaitGateCount.get() ? SyncResponse.STATE_PREEXISTED : SyncResponse.STATE_ACHIEVED;
-         } catch (InterruptedException ie) {
-            Thread.currentThread().interrupt();
-         }
-      }
-   }
-
-   public void acquireSync() {
-      flushBlockGate.close();
-      flushWaitGateCount.incrementAndGet();
-      flushWaitGate.open();
-   }
-
-   public void releaseSync() {
-      flushWaitGate.close();
-      flushBlockGateCount.incrementAndGet();
-      flushBlockGate.open();
-   }
-
-   public void acquireProcessingLock(boolean exclusive, long timeout, TimeUnit timeUnit) throws TimeoutException {
-      while (true) {
-         try {
-            Lock lock = exclusive ? processingLock.writeLock() : processingLock.readLock();
-            if (!lock.tryLock(timeout, timeUnit))
-               throw new TimeoutException("Could not obtain " + (exclusive ? "exclusive" : "shared") + " processing lock");
-            break;
-         } catch (InterruptedException ie) {
-            Thread.currentThread().interrupt();
-         }
-      }
-   }
-
-   public void releaseProcessingLock() {
-      try {
-         if (processingLock.isWriteLockedByCurrentThread()) {
-            processingLock.writeLock().unlock();
-         } else {
-            processingLock.readLock().unlock();
-         }
-      } catch (IllegalMonitorStateException imse) {
-         if (log.isTraceEnabled()) log.trace("Did not own lock!");
-      }
-   }
-}

Copied: trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsDistSync.java (from rev 169, trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/FlushBasedDistributedSync.java)
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsDistSync.java	                        (rev 0)
+++ trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsDistSync.java	2009-04-27 15:15:44 UTC (rev 172)
@@ -0,0 +1,96 @@
+package org.infinispan.remoting.transport.jgroups;
+
+import net.jcip.annotations.ThreadSafe;
+import org.infinispan.logging.Log;
+import org.infinispan.logging.LogFactory;
+import org.infinispan.remoting.transport.DistributedSync;
+import org.infinispan.util.Util;
+import org.infinispan.util.concurrent.ReclosableLatch;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * A DistributedSync based on JGroups' FLUSH protocol
+ *
+ * @author Manik Surtani
+ * @since 4.0
+ */
+ at ThreadSafe
+public class JGroupsDistSync implements DistributedSync {
+
+   private final ReentrantReadWriteLock processingLock = new ReentrantReadWriteLock();
+   private final ReclosableLatch flushBlockGate = new ReclosableLatch(true);
+   private final AtomicInteger flushBlockGateCount = new AtomicInteger(0);
+   private final AtomicInteger flushWaitGateCount = new AtomicInteger(0);
+   private final ReclosableLatch flushWaitGate = new ReclosableLatch(false);
+   private static final Log log = LogFactory.getLog(JGroupsDistSync.class);
+
+   public SyncResponse blockUntilAcquired(long timeout, TimeUnit timeUnit) throws TimeoutException {
+      int initState = flushWaitGateCount.get();
+      while (true) {
+         try {
+            if (!flushWaitGate.await(timeout, timeUnit))
+               throw new TimeoutException("Timed out waiting for a cluster-wide sync to be acquired. (timeout = " + Util.prettyPrintTime(timeout) + ")");
+
+            return initState == flushWaitGateCount.get() ? SyncResponse.STATE_PREEXISTED : SyncResponse.STATE_ACHIEVED;
+         } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+         }
+      }
+   }
+
+   public SyncResponse blockUntilReleased(long timeout, TimeUnit timeUnit) throws TimeoutException {
+      int initState = flushBlockGateCount.get();
+      while (true) {
+         try {
+            if (!flushBlockGate.await(timeout, timeUnit))
+               throw new TimeoutException("Timed out waiting for a cluster-wide sync to be released. (timeout = " + Util.prettyPrintTime(timeout) + ")");
+
+            return initState == flushWaitGateCount.get() ? SyncResponse.STATE_PREEXISTED : SyncResponse.STATE_ACHIEVED;
+         } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+         }
+      }
+   }
+
+   public void acquireSync() {
+      flushBlockGate.close();
+      flushWaitGateCount.incrementAndGet();
+      flushWaitGate.open();
+   }
+
+   public void releaseSync() {
+      flushWaitGate.close();
+      flushBlockGateCount.incrementAndGet();
+      flushBlockGate.open();
+   }
+
+   public void acquireProcessingLock(boolean exclusive, long timeout, TimeUnit timeUnit) throws TimeoutException {
+      while (true) {
+         try {
+            Lock lock = exclusive ? processingLock.writeLock() : processingLock.readLock();
+            if (!lock.tryLock(timeout, timeUnit))
+               throw new TimeoutException("Could not obtain " + (exclusive ? "exclusive" : "shared") + " processing lock");
+            break;
+         } catch (InterruptedException ie) {
+            Thread.currentThread().interrupt();
+         }
+      }
+   }
+
+   public void releaseProcessingLock() {
+      try {
+         if (processingLock.isWriteLockedByCurrentThread()) {
+            processingLock.writeLock().unlock();
+         } else {
+            processingLock.readLock().unlock();
+         }
+      } catch (IllegalMonitorStateException imse) {
+         if (log.isTraceEnabled()) log.trace("Did not own lock!");
+      }
+   }
+}


Property changes on: trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsDistSync.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Modified: trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java	2009-04-27 14:17:28 UTC (rev 171)
+++ trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java	2009-04-27 15:15:44 UTC (rev 172)
@@ -74,7 +74,7 @@
    ExecutorService asyncExecutor;
    CacheManagerNotifier notifier;
    final ConcurrentMap<String, StateTransferMonitor> stateTransfersInProgress = new ConcurrentHashMap<String, StateTransferMonitor>();
-   private final FlushBasedDistributedSync flushTracker = new FlushBasedDistributedSync();
+   private final JGroupsDistSync flushTracker = new JGroupsDistSync();
    long distributedSyncTimeout;
 
    // ------------------------------------------------------------------------------------------------------------------
@@ -401,11 +401,11 @@
    }
 
    public void block() {
-      flushTracker.acquireSync();
+      // no-op
    }
 
    public void unblock() {
-      flushTracker.releaseSync();
+      // no-op
    }
 
    public void receive(Message msg) {




More information about the infinispan-commits mailing list