Author: manik.surtani(a)jboss.com
Date: 2009-03-16 16:37:26 -0400 (Mon, 16 Mar 2009)
New Revision: 7904
Added:
core/trunk/src/main/java/org/jboss/cache/commands/remote/StateTransferControlCommand.java
Modified:
core/trunk/src/main/java/org/jboss/cache/commands/CommandsFactory.java
core/trunk/src/main/java/org/jboss/cache/commands/CommandsFactoryImpl.java
core/trunk/src/main/java/org/jboss/cache/interceptors/InterceptorChain.java
core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java
core/trunk/src/main/java/org/jboss/cache/util/concurrent/ReclosableLatch.java
Log:
Basic refactorings
Modified: core/trunk/src/main/java/org/jboss/cache/commands/CommandsFactory.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/commands/CommandsFactory.java 2009-03-12 09:38:02 UTC (rev 7903)
+++ core/trunk/src/main/java/org/jboss/cache/commands/CommandsFactory.java 2009-03-16 20:37:26 UTC (rev 7904)
@@ -37,6 +37,7 @@
import org.jboss.cache.commands.remote.DataGravitationCleanupCommand;
import org.jboss.cache.commands.remote.RemoveFromBuddyGroupCommand;
import org.jboss.cache.commands.remote.ReplicateCommand;
+import org.jboss.cache.commands.remote.StateTransferControlCommand;
import org.jboss.cache.commands.tx.CommitCommand;
import org.jboss.cache.commands.tx.OptimisticPrepareCommand;
import org.jboss.cache.commands.tx.PrepareCommand;
@@ -139,4 +140,6 @@
* @return a newly constructed cache command
*/
ReplicableCommand fromStream(int id, Object[] parameters);
+
+ StateTransferControlCommand buildStateTransferControlCommand(boolean b);
}
Modified: core/trunk/src/main/java/org/jboss/cache/commands/CommandsFactoryImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/commands/CommandsFactoryImpl.java 2009-03-12 09:38:02 UTC (rev 7903)
+++ core/trunk/src/main/java/org/jboss/cache/commands/CommandsFactoryImpl.java 2009-03-16 20:37:26 UTC (rev 7904)
@@ -43,6 +43,7 @@
import org.jboss.cache.commands.remote.DataGravitationCleanupCommand;
import org.jboss.cache.commands.remote.RemoveFromBuddyGroupCommand;
import org.jboss.cache.commands.remote.ReplicateCommand;
+import org.jboss.cache.commands.remote.StateTransferControlCommand;
import org.jboss.cache.commands.tx.CommitCommand;
import org.jboss.cache.commands.tx.OptimisticPrepareCommand;
import org.jboss.cache.commands.tx.PrepareCommand;
@@ -102,6 +103,11 @@
this.buddyFqnTransformer = buddyFqnTransformer;
}
+ public StateTransferControlCommand buildStateTransferControlCommand(boolean enabled)
+ {
+ return new StateTransferControlCommand(enabled);
+ }
+
public PutDataMapCommand buildPutDataMapCommand(GlobalTransaction gtx, Fqn fqn, Map data)
{
PutDataMapCommand cmd = new PutDataMapCommand(gtx, fqn, data);
@@ -489,6 +495,13 @@
command = returnValue;
break;
}
+ case StateTransferControlCommand.METHOD_ID:
+ {
+ StateTransferControlCommand cmd = new StateTransferControlCommand();
+ cmd.init(rpcManager);
+ command = cmd;
+ break;
+ }
default:
throw new CacheException("Unknown command id " + id + "!");
}
Added: core/trunk/src/main/java/org/jboss/cache/commands/remote/StateTransferControlCommand.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/commands/remote/StateTransferControlCommand.java (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/commands/remote/StateTransferControlCommand.java 2009-03-16 20:37:26 UTC (rev 7904)
@@ -0,0 +1,63 @@
+package org.jboss.cache.commands.remote;
+
+import org.jboss.cache.InvocationContext;
+import org.jboss.cache.RPCManager;
+import org.jboss.cache.commands.ReplicableCommand;
+
+/**
+ * A control command for communication between peers for non-blocking state transfer
+ *
+ * @author Manik Surtani
+ */
+public class StateTransferControlCommand implements ReplicableCommand
+{
+ public static final int METHOD_ID = 49;
+ RPCManager rpcManager;
+ boolean enabled;
+
+ public StateTransferControlCommand()
+ {
+ }
+
+ public StateTransferControlCommand(boolean enabled)
+ {
+ this.enabled = enabled;
+ }
+
+ public void init(RPCManager rpcManager)
+ {
+ this.rpcManager = rpcManager;
+ }
+
+ public Object perform(InvocationContext ctx) throws Throwable
+ {
+ if (enabled)
+ rpcManager.getFlushTracker().block();
+ else
+ rpcManager.getFlushTracker().unblock();
+ return null;
+ }
+
+ public int getCommandId()
+ {
+ return METHOD_ID;
+ }
+
+ public Object[] getParameters()
+ {
+ return new Object[]{enabled};
+ }
+
+ public void setParameters(int commandId, Object[] parameters)
+ {
+ enabled = (Boolean) parameters[0];
+ }
+
+ @Override
+ public String toString()
+ {
+ return "StateTransferControlCommand{" +
+ "enabled=" + enabled +
+ '}';
+ }
+}
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/InterceptorChain.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/InterceptorChain.java 2009-03-12 09:38:02 UTC (rev 7903)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/InterceptorChain.java 2009-03-16 20:37:26 UTC (rev 7904)
@@ -286,6 +286,11 @@
{
return command.acceptVisitor(ctx, firstInChain);
}
+ catch (InterruptedException ie)
+ {
+ Thread.currentThread().interrupt();
+ return null;
+ }
catch (CacheException e)
{
throw e;
Modified: core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java 2009-03-12 09:38:02 UTC (rev 7903)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java 2009-03-16 20:37:26 UTC (rev 7904)
@@ -40,6 +40,7 @@
import org.jboss.cache.commands.remote.DataGravitationCleanupCommand;
import org.jboss.cache.commands.remote.RemoveFromBuddyGroupCommand;
import org.jboss.cache.commands.remote.ReplicateCommand;
+import org.jboss.cache.commands.remote.StateTransferControlCommand;
import org.jboss.cache.commands.tx.AbstractTransactionCommand;
import org.jboss.cache.commands.tx.CommitCommand;
import org.jboss.cache.commands.tx.OptimisticPrepareCommand;
@@ -189,6 +190,7 @@
case AnnounceBuddyPoolNameCommand.METHOD_ID:
case AssignToBuddyGroupCommand.METHOD_ID:
case RemoveFromBuddyGroupCommand.METHOD_ID:
+ case StateTransferControlCommand.METHOD_ID:
break;
// possible when we have a replication queue.
Modified: core/trunk/src/main/java/org/jboss/cache/util/concurrent/ReclosableLatch.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/util/concurrent/ReclosableLatch.java 2009-03-12 09:38:02 UTC (rev 7903)
+++ core/trunk/src/main/java/org/jboss/cache/util/concurrent/ReclosableLatch.java 2009-03-16 20:37:26 UTC (rev 7904)
@@ -86,4 +86,9 @@
{
return tryAcquireSharedNanos(1, unit.toNanos(time)); // the 1 is a dummy value that is not used.
}
+
+ public boolean isOpen()
+ {
+ return getState() == OPEN_STATE;
+ }
}