[infinispan-commits] Infinispan SVN: r172 - trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Mon Apr 27 11:15:45 EDT 2009
Author: manik.surtani at jboss.com
Date: 2009-04-27 11:15:44 -0400 (Mon, 27 Apr 2009)
New Revision: 172
Added:
trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsDistSync.java
Removed:
trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/FlushBasedDistributedSync.java
Modified:
trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java
Log:
Removed unnecessary flush based sync
Deleted: trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/FlushBasedDistributedSync.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/FlushBasedDistributedSync.java 2009-04-27 14:17:28 UTC (rev 171)
+++ trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/FlushBasedDistributedSync.java 2009-04-27 15:15:44 UTC (rev 172)
@@ -1,96 +0,0 @@
-package org.infinispan.remoting.transport.jgroups;
-
-import net.jcip.annotations.ThreadSafe;
-import org.infinispan.logging.Log;
-import org.infinispan.logging.LogFactory;
-import org.infinispan.remoting.transport.DistributedSync;
-import org.infinispan.util.Util;
-import org.infinispan.util.concurrent.ReclosableLatch;
-
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-/**
- * A DistributedSync based on JGroups' FLUSH protocol
- *
- * @author Manik Surtani
- * @since 4.0
- */
- at ThreadSafe
-public class FlushBasedDistributedSync implements DistributedSync {
-
- private final ReentrantReadWriteLock processingLock = new ReentrantReadWriteLock();
- private final ReclosableLatch flushBlockGate = new ReclosableLatch();
- private final AtomicInteger flushBlockGateCount = new AtomicInteger(0);
- private final AtomicInteger flushWaitGateCount = new AtomicInteger(0);
- private final ReclosableLatch flushWaitGate = new ReclosableLatch(false);
- private static final Log log = LogFactory.getLog(FlushBasedDistributedSync.class);
-
- public SyncResponse blockUntilAcquired(long timeout, TimeUnit timeUnit) throws TimeoutException {
- int initState = flushWaitGateCount.get();
- while (true) {
- try {
- if (!flushWaitGate.await(timeout, timeUnit))
- throw new TimeoutException("Timed out waiting for a cluster-wide sync to be acquired. (timeout = " + Util.prettyPrintTime(timeout) + ")");
-
- return initState == flushWaitGateCount.get() ? SyncResponse.STATE_PREEXISTED : SyncResponse.STATE_ACHIEVED;
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- }
- }
- }
-
- public SyncResponse blockUntilReleased(long timeout, TimeUnit timeUnit) throws TimeoutException {
- int initState = flushBlockGateCount.get();
- while (true) {
- try {
- if (!flushBlockGate.await(timeout, timeUnit))
- throw new TimeoutException("Timed out waiting for a cluster-wide sync to be released. (timeout = " + Util.prettyPrintTime(timeout) + ")");
-
- return initState == flushWaitGateCount.get() ? SyncResponse.STATE_PREEXISTED : SyncResponse.STATE_ACHIEVED;
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- }
- }
- }
-
- public void acquireSync() {
- flushBlockGate.close();
- flushWaitGateCount.incrementAndGet();
- flushWaitGate.open();
- }
-
- public void releaseSync() {
- flushWaitGate.close();
- flushBlockGateCount.incrementAndGet();
- flushBlockGate.open();
- }
-
- public void acquireProcessingLock(boolean exclusive, long timeout, TimeUnit timeUnit) throws TimeoutException {
- while (true) {
- try {
- Lock lock = exclusive ? processingLock.writeLock() : processingLock.readLock();
- if (!lock.tryLock(timeout, timeUnit))
- throw new TimeoutException("Could not obtain " + (exclusive ? "exclusive" : "shared") + " processing lock");
- break;
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt();
- }
- }
- }
-
- public void releaseProcessingLock() {
- try {
- if (processingLock.isWriteLockedByCurrentThread()) {
- processingLock.writeLock().unlock();
- } else {
- processingLock.readLock().unlock();
- }
- } catch (IllegalMonitorStateException imse) {
- if (log.isTraceEnabled()) log.trace("Did not own lock!");
- }
- }
-}
Copied: trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsDistSync.java (from rev 169, trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/FlushBasedDistributedSync.java)
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsDistSync.java (rev 0)
+++ trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsDistSync.java 2009-04-27 15:15:44 UTC (rev 172)
@@ -0,0 +1,96 @@
+package org.infinispan.remoting.transport.jgroups;
+
+import net.jcip.annotations.ThreadSafe;
+import org.infinispan.logging.Log;
+import org.infinispan.logging.LogFactory;
+import org.infinispan.remoting.transport.DistributedSync;
+import org.infinispan.util.Util;
+import org.infinispan.util.concurrent.ReclosableLatch;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * A DistributedSync based on JGroups' FLUSH protocol
+ *
+ * @author Manik Surtani
+ * @since 4.0
+ */
+ at ThreadSafe
+public class JGroupsDistSync implements DistributedSync {
+
+ private final ReentrantReadWriteLock processingLock = new ReentrantReadWriteLock();
+ private final ReclosableLatch flushBlockGate = new ReclosableLatch(true);
+ private final AtomicInteger flushBlockGateCount = new AtomicInteger(0);
+ private final AtomicInteger flushWaitGateCount = new AtomicInteger(0);
+ private final ReclosableLatch flushWaitGate = new ReclosableLatch(false);
+ private static final Log log = LogFactory.getLog(JGroupsDistSync.class);
+
+ public SyncResponse blockUntilAcquired(long timeout, TimeUnit timeUnit) throws TimeoutException {
+ int initState = flushWaitGateCount.get();
+ while (true) {
+ try {
+ if (!flushWaitGate.await(timeout, timeUnit))
+ throw new TimeoutException("Timed out waiting for a cluster-wide sync to be acquired. (timeout = " + Util.prettyPrintTime(timeout) + ")");
+
+ return initState == flushWaitGateCount.get() ? SyncResponse.STATE_PREEXISTED : SyncResponse.STATE_ACHIEVED;
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ public SyncResponse blockUntilReleased(long timeout, TimeUnit timeUnit) throws TimeoutException {
+ int initState = flushBlockGateCount.get();
+ while (true) {
+ try {
+ if (!flushBlockGate.await(timeout, timeUnit))
+ throw new TimeoutException("Timed out waiting for a cluster-wide sync to be released. (timeout = " + Util.prettyPrintTime(timeout) + ")");
+
+ return initState == flushWaitGateCount.get() ? SyncResponse.STATE_PREEXISTED : SyncResponse.STATE_ACHIEVED;
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ public void acquireSync() {
+ flushBlockGate.close();
+ flushWaitGateCount.incrementAndGet();
+ flushWaitGate.open();
+ }
+
+ public void releaseSync() {
+ flushWaitGate.close();
+ flushBlockGateCount.incrementAndGet();
+ flushBlockGate.open();
+ }
+
+ public void acquireProcessingLock(boolean exclusive, long timeout, TimeUnit timeUnit) throws TimeoutException {
+ while (true) {
+ try {
+ Lock lock = exclusive ? processingLock.writeLock() : processingLock.readLock();
+ if (!lock.tryLock(timeout, timeUnit))
+ throw new TimeoutException("Could not obtain " + (exclusive ? "exclusive" : "shared") + " processing lock");
+ break;
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ public void releaseProcessingLock() {
+ try {
+ if (processingLock.isWriteLockedByCurrentThread()) {
+ processingLock.writeLock().unlock();
+ } else {
+ processingLock.readLock().unlock();
+ }
+ } catch (IllegalMonitorStateException imse) {
+ if (log.isTraceEnabled()) log.trace("Did not own lock!");
+ }
+ }
+}
Property changes on: trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsDistSync.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Modified: trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java 2009-04-27 14:17:28 UTC (rev 171)
+++ trunk/core/src/main/java/org/infinispan/remoting/transport/jgroups/JGroupsTransport.java 2009-04-27 15:15:44 UTC (rev 172)
@@ -74,7 +74,7 @@
ExecutorService asyncExecutor;
CacheManagerNotifier notifier;
final ConcurrentMap<String, StateTransferMonitor> stateTransfersInProgress = new ConcurrentHashMap<String, StateTransferMonitor>();
- private final FlushBasedDistributedSync flushTracker = new FlushBasedDistributedSync();
+ private final JGroupsDistSync flushTracker = new JGroupsDistSync();
long distributedSyncTimeout;
// ------------------------------------------------------------------------------------------------------------------
@@ -401,11 +401,11 @@
}
public void block() {
- flushTracker.acquireSync();
+ // no-op
}
public void unblock() {
- flushTracker.releaseSync();
+ // no-op
}
public void receive(Message msg) {
More information about the infinispan-commits
mailing list