Author: manik.surtani(a)jboss.com
Date: 2009-03-03 09:56:48 -0500 (Tue, 03 Mar 2009)
New Revision: 7829
Added:
core/branches/flat/src/main/java/org/horizon/remoting/transport/DistributedSync.java
core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/ExtendedResponse.java
core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/FlushBasedDistributedSync.java
core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/RequestIgnoredResponse.java
core/branches/flat/src/test/java/org/horizon/statetransfer/StateTransferCacheLoaderFunctionalTest.java
Modified:
core/branches/flat/src/main/java/org/horizon/commands/tx/PrepareCommand.java
core/branches/flat/src/main/java/org/horizon/config/Configuration.java
core/branches/flat/src/main/java/org/horizon/config/GlobalConfiguration.java
core/branches/flat/src/main/java/org/horizon/config/parsing/XmlConfigurationParserImpl.java
core/branches/flat/src/main/java/org/horizon/context/TransactionContext.java
core/branches/flat/src/main/java/org/horizon/context/TransactionContextImpl.java
core/branches/flat/src/main/java/org/horizon/interceptors/CacheStoreInterceptor.java
core/branches/flat/src/main/java/org/horizon/interceptors/CallInterceptor.java
core/branches/flat/src/main/java/org/horizon/interceptors/InvalidationInterceptor.java
core/branches/flat/src/main/java/org/horizon/interceptors/InvocationContextInterceptor.java
core/branches/flat/src/main/java/org/horizon/interceptors/ReplicationInterceptor.java
core/branches/flat/src/main/java/org/horizon/interceptors/TxInterceptor.java
core/branches/flat/src/main/java/org/horizon/manager/DefaultCacheManager.java
core/branches/flat/src/main/java/org/horizon/marshall/HorizonMarshaller.java
core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandlerImpl.java
core/branches/flat/src/main/java/org/horizon/remoting/RPCManager.java
core/branches/flat/src/main/java/org/horizon/remoting/RPCManagerImpl.java
core/branches/flat/src/main/java/org/horizon/remoting/ReplicationQueue.java
core/branches/flat/src/main/java/org/horizon/remoting/transport/Transport.java
core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/CommandAwareRpcDispatcher.java
core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/JGroupsTransport.java
core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManagerImpl.java
core/branches/flat/src/main/java/org/horizon/transaction/TransactionLog.java
core/branches/flat/src/main/java/org/horizon/transaction/TransactionTable.java
core/branches/flat/src/main/java/org/horizon/util/Util.java
core/branches/flat/src/main/resources/config-samples/all.xml
core/branches/flat/src/main/resources/schema/horizon-config-1.0.xsd
core/branches/flat/src/test/java/org/horizon/api/mvcc/PutForExternalReadTest.java
core/branches/flat/src/test/java/org/horizon/invalidation/BaseInvalidationTest.java
core/branches/flat/src/test/java/org/horizon/replication/SyncReplTest.java
core/branches/flat/src/test/java/org/horizon/statetransfer/StateTransferFunctionalTest.java
Log:
More NBST fixes
Modified: core/branches/flat/src/main/java/org/horizon/commands/tx/PrepareCommand.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/commands/tx/PrepareCommand.java 2009-03-03
13:01:12 UTC (rev 7828)
+++
core/branches/flat/src/main/java/org/horizon/commands/tx/PrepareCommand.java 2009-03-03
14:56:48 UTC (rev 7829)
@@ -22,7 +22,6 @@
package org.horizon.commands.tx;
import org.horizon.commands.ReplicableCommand;
-import org.horizon.commands.VisitableCommand;
import org.horizon.commands.Visitor;
import org.horizon.commands.write.WriteCommand;
import org.horizon.context.InvocationContext;
@@ -53,7 +52,7 @@
this.onePhaseCommit = onePhaseCommit;
}
- public void removeModifications(Collection<VisitableCommand>
modificationsToRemove) {
+ public void removeModifications(Collection<WriteCommand> modificationsToRemove)
{
if (modifications != null) modifications.removeAll(modificationsToRemove);
}
Modified: core/branches/flat/src/main/java/org/horizon/config/Configuration.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/config/Configuration.java 2009-03-03
13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/config/Configuration.java 2009-03-03
14:56:48 UTC (rev 7829)
@@ -58,6 +58,10 @@
return useAsyncSerialization;
}
+ public boolean isStateTransferEnabled() {
+ return fetchInMemoryState || (cacheLoaderManagerConfig != null &&
cacheLoaderManagerConfig.isFetchPersistentState());
+ }
+
/**
* Cache replication mode.
*/
Modified: core/branches/flat/src/main/java/org/horizon/config/GlobalConfiguration.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/config/GlobalConfiguration.java 2009-03-03
13:01:12 UTC (rev 7828)
+++
core/branches/flat/src/main/java/org/horizon/config/GlobalConfiguration.java 2009-03-03
14:56:48 UTC (rev 7829)
@@ -50,6 +50,7 @@
private short marshallVersion = DEFAULT_MARSHALL_VERSION;
private GlobalComponentRegistry gcr;
+ private long distributedSyncTimeout = 60000; // default
/**
* Behavior of the JVM shutdown hook registered by the cache
@@ -276,6 +277,15 @@
this.marshallVersion = Version.getVersionShort(marshallVersion);
}
+ public long getDistributedSyncTimeout() {
+ return distributedSyncTimeout;
+ }
+
+ public void setDistributedSyncTimeout(long distributedSyncTimeout) {
+ testImmutability("distributedSyncTimeout");
+ this.distributedSyncTimeout = distributedSyncTimeout;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
@@ -312,6 +322,7 @@
return false;
if (transportProperties != null ?
!transportProperties.equals(that.transportProperties) : that.transportProperties != null)
return false;
+ if (distributedSyncTimeout != that.distributedSyncTimeout) return false;
return true;
}
@@ -335,6 +346,7 @@
result = 31 * result + (clusterName != null ? clusterName.hashCode() : 0);
result = 31 * result + (shutdownHookBehavior != null ?
shutdownHookBehavior.hashCode() : 0);
result = 31 * result + (int) marshallVersion;
+ result = (int) (31 * result + distributedSyncTimeout);
return result;
}
Modified:
core/branches/flat/src/main/java/org/horizon/config/parsing/XmlConfigurationParserImpl.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/config/parsing/XmlConfigurationParserImpl.java 2009-03-03
13:01:12 UTC (rev 7828)
+++
core/branches/flat/src/main/java/org/horizon/config/parsing/XmlConfigurationParserImpl.java 2009-03-03
14:56:48 UTC (rev 7829)
@@ -276,7 +276,7 @@
throw new ConfigurationException("Unable to configure eviction", e);
}
- if (p != null && !p.isEmpty()) XmlConfigHelper. setValues(cfg, p, false,
true);
+ if (p != null && !p.isEmpty()) XmlConfigHelper.setValues(cfg, p, false,
true);
evictionConfig.setAlgorithmConfig(cfg);
config.setEvictionConfig(evictionConfig);
@@ -369,6 +369,9 @@
tmp = getAttributeValue(e, "clusterName");
if (existsAttribute(tmp)) gc.setClusterName(tmp);
+ tmp = getAttributeValue(e, "distributedSyncTimeout");
+ if (existsAttribute(tmp)) gc.setDistributedSyncTimeout(getLong(tmp));
+
Properties p = XmlConfigHelper.extractProperties(e);
if (p != null) gc.setTransportProperties(p);
}
Modified: core/branches/flat/src/main/java/org/horizon/context/TransactionContext.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/context/TransactionContext.java 2009-03-03
13:01:12 UTC (rev 7828)
+++
core/branches/flat/src/main/java/org/horizon/context/TransactionContext.java 2009-03-03
14:56:48 UTC (rev 7829)
@@ -21,7 +21,7 @@
*/
package org.horizon.context;
-import org.horizon.commands.VisitableCommand;
+import org.horizon.commands.write.WriteCommand;
import org.horizon.transaction.GlobalTransaction;
import javax.transaction.Transaction;
@@ -41,7 +41,7 @@
*
* @param command modification
*/
- void addModification(VisitableCommand command);
+ void addModification(WriteCommand command);
/**
* Returns all modifications. If there are no modifications in this transaction this
method will return an empty
@@ -49,7 +49,7 @@
*
* @return list of modifications.
*/
- List<VisitableCommand> getModifications();
+ List<WriteCommand> getModifications();
/**
* Adds a modification to the local modification list.
@@ -57,7 +57,7 @@
* @param command command to add to list. Should not be null.
* @throws NullPointerException if the command to be added is null.
*/
- void addLocalModification(VisitableCommand command);
+ void addLocalModification(WriteCommand command);
/**
* Returns all modifications that have been invoked with the LOCAL cache mode option.
These will also be in the
@@ -65,7 +65,7 @@
*
* @return list of LOCAL modifications, or an empty list.
*/
- List<VisitableCommand> getLocalModifications();
+ List<WriteCommand> getLocalModifications();
/**
* Adds the key that has been removed in the scope of the current transaction.
Modified:
core/branches/flat/src/main/java/org/horizon/context/TransactionContextImpl.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/context/TransactionContextImpl.java 2009-03-03
13:01:12 UTC (rev 7828)
+++
core/branches/flat/src/main/java/org/horizon/context/TransactionContextImpl.java 2009-03-03
14:56:48 UTC (rev 7829)
@@ -21,7 +21,7 @@
*/
package org.horizon.context;
-import org.horizon.commands.VisitableCommand;
+import org.horizon.commands.write.WriteCommand;
import org.horizon.container.MVCCEntry;
import org.horizon.transaction.GlobalTransaction;
import org.horizon.util.FastCopyHashMap;
@@ -50,12 +50,12 @@
/**
* List<VisitableCommand> of modifications. They will be replicated on
TX commit
*/
- private List<VisitableCommand> modificationList;
+ private List<WriteCommand> modificationList;
/**
* A list of modifications that have been encountered with a LOCAL mode option. These
will be removed from the
* modification list during replication.
*/
- private List<VisitableCommand> localModifications;
+ private List<WriteCommand> localModifications;
/**
* A list of dummy uninitialised entries created by the cache loader interceptor to
load data for a given entry in
@@ -97,24 +97,24 @@
lookedUpEntries.putAll(entries);
}
- public void addModification(VisitableCommand command) {
+ public void addModification(WriteCommand command) {
if (command == null) return;
- if (modificationList == null) modificationList = new
LinkedList<VisitableCommand>();
+ if (modificationList == null) modificationList = new
LinkedList<WriteCommand>();
modificationList.add(command);
}
- public List<VisitableCommand> getModifications() {
+ public List<WriteCommand> getModifications() {
if (modificationList == null) return Collections.emptyList();
return modificationList;
}
- public void addLocalModification(VisitableCommand command) {
+ public void addLocalModification(WriteCommand command) {
if (command == null) throw new NullPointerException("Command is null!");
- if (localModifications == null) localModifications = new
LinkedList<VisitableCommand>();
+ if (localModifications == null) localModifications = new
LinkedList<WriteCommand>();
localModifications.add(command);
}
- public List<VisitableCommand> getLocalModifications() {
+ public List<WriteCommand> getLocalModifications() {
if (localModifications == null) return Collections.emptyList();
return localModifications;
}
Modified:
core/branches/flat/src/main/java/org/horizon/interceptors/CacheStoreInterceptor.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/interceptors/CacheStoreInterceptor.java 2009-03-03
13:01:12 UTC (rev 7828)
+++
core/branches/flat/src/main/java/org/horizon/interceptors/CacheStoreInterceptor.java 2009-03-03
14:56:48 UTC (rev 7829)
@@ -31,6 +31,7 @@
import org.horizon.commands.write.PutMapCommand;
import org.horizon.commands.write.RemoveCommand;
import org.horizon.commands.write.ReplaceCommand;
+import org.horizon.commands.write.WriteCommand;
import org.horizon.config.CacheLoaderManagerConfig;
import org.horizon.container.MVCCEntry;
import org.horizon.context.InvocationContext;
@@ -239,14 +240,14 @@
if (transactionContext == null) {
throw new Exception("transactionContext for transaction " + gtx +
" not found in transaction table");
}
- List<VisitableCommand> modifications =
transactionContext.getModifications();
+ List<WriteCommand> modifications = transactionContext.getModifications();
if (modifications.size() == 0) {
log.trace("Transaction has not logged any modifications!");
return;
}
log.trace("Cache loader modification list: {0}", modifications);
StoreModificationsBuilder modsBuilder = new
StoreModificationsBuilder(getStatisticsEnabled());
- for (VisitableCommand cacheCommand : modifications) cacheCommand.acceptVisitor(ctx,
modsBuilder);
+ for (WriteCommand cacheCommand : modifications) cacheCommand.acceptVisitor(ctx,
modsBuilder);
int numMods = modsBuilder.modifications.size();
log.trace("Converted method calls to cache loader modifications. List size:
{0}", numMods);
Modified: core/branches/flat/src/main/java/org/horizon/interceptors/CallInterceptor.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/interceptors/CallInterceptor.java 2009-03-03
13:01:12 UTC (rev 7828)
+++
core/branches/flat/src/main/java/org/horizon/interceptors/CallInterceptor.java 2009-03-03
14:56:48 UTC (rev 7829)
@@ -30,6 +30,7 @@
import org.horizon.commands.write.ClearCommand;
import org.horizon.commands.write.PutKeyValueCommand;
import org.horizon.commands.write.RemoveCommand;
+import org.horizon.commands.write.WriteCommand;
import org.horizon.context.InvocationContext;
import org.horizon.interceptors.base.CommandInterceptor;
import org.horizon.transaction.GlobalTransaction;
@@ -99,7 +100,7 @@
return handleAlterCacheMethod(ctx, command);
}
- private Object handleAlterCacheMethod(InvocationContext ctx, VisitableCommand
command)
+ private Object handleAlterCacheMethod(InvocationContext ctx, WriteCommand command)
throws Throwable {
Object result = invokeCommand(ctx, command);
if (ctx.isValidTransaction()) {
Modified:
core/branches/flat/src/main/java/org/horizon/interceptors/InvalidationInterceptor.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/interceptors/InvalidationInterceptor.java 2009-03-03
13:01:12 UTC (rev 7828)
+++
core/branches/flat/src/main/java/org/horizon/interceptors/InvalidationInterceptor.java 2009-03-03
14:56:48 UTC (rev 7829)
@@ -217,7 +217,8 @@
// increment invalidations counter if statistics maintained
incrementInvalidations();
InvalidateCommand command = commandsFactory.buildInvalidateCommand(keys);
- if (log.isDebugEnabled()) log.debug("Cache [" +
rpcManager.getAddress() + "] replicating " + command);
+ if (log.isDebugEnabled())
+ log.debug("Cache [" + rpcManager.getTransport().getAddress() +
"] replicating " + command);
// voila, invalidated!
replicateCall(ctx, command, synchronous);
}
Modified:
core/branches/flat/src/main/java/org/horizon/interceptors/InvocationContextInterceptor.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/interceptors/InvocationContextInterceptor.java 2009-03-03
13:01:12 UTC (rev 7828)
+++
core/branches/flat/src/main/java/org/horizon/interceptors/InvocationContextInterceptor.java 2009-03-03
14:56:48 UTC (rev 7829)
@@ -195,7 +195,7 @@
* @return true if the gtx is remote, false if it originated locally.
*/
private boolean isRemoteGlobalTx(GlobalTransaction gtx) {
- return gtx != null && (gtx.getAddress() != null) &&
(!gtx.getAddress().equals(rpcManager.getAddress()));
+ return gtx != null && (gtx.getAddress() != null) &&
(!gtx.getAddress().equals(rpcManager.getTransport().getAddress()));
}
private void copyInvocationScopeOptionsToTxScope(InvocationContext ctx) {
Modified:
core/branches/flat/src/main/java/org/horizon/interceptors/ReplicationInterceptor.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/interceptors/ReplicationInterceptor.java 2009-03-03
13:01:12 UTC (rev 7828)
+++
core/branches/flat/src/main/java/org/horizon/interceptors/ReplicationInterceptor.java 2009-03-03
14:56:48 UTC (rev 7829)
@@ -112,7 +112,7 @@
if (command.isSuccessful()) {
if (ctx.getTransaction() == null && ctx.isOriginLocal()) {
if (trace) {
- log.trace("invoking method " +
command.getClass().getSimpleName() + ", members=" + rpcManager.getMembers() +
", mode=" +
+ log.trace("invoking method " +
command.getClass().getSimpleName() + ", members=" +
rpcManager.getTransport().getMembers() + ", mode=" +
configuration.getCacheMode() + ", exclude_self=" + true +
", timeout=" +
configuration.getSyncReplTimeout());
}
@@ -138,7 +138,7 @@
protected void runPreparePhase(PrepareCommand prepareMethod, GlobalTransaction gtx,
InvocationContext ctx) throws Throwable {
boolean async = configuration.getCacheMode() ==
Configuration.CacheMode.REPL_ASYNC;
if (trace) {
- log.trace("(" + rpcManager.getAddress() + "): running remote
prepare for global tx " + gtx + " with async mode=" + async);
+ log.trace("(" + rpcManager.getTransport().getAddress() + "):
running remote prepare for global tx " + gtx + " with async mode=" +
async);
}
// this method will return immediately if we're the only member (because
exclude_self=true)
Modified: core/branches/flat/src/main/java/org/horizon/interceptors/TxInterceptor.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/interceptors/TxInterceptor.java 2009-03-03
13:01:12 UTC (rev 7828)
+++
core/branches/flat/src/main/java/org/horizon/interceptors/TxInterceptor.java 2009-03-03
14:56:48 UTC (rev 7829)
@@ -28,6 +28,7 @@
import org.horizon.commands.tx.CommitCommand;
import org.horizon.commands.tx.PrepareCommand;
import org.horizon.commands.tx.RollbackCommand;
+import org.horizon.commands.write.WriteCommand;
import org.horizon.context.InvocationContext;
import org.horizon.context.TransactionContext;
import org.horizon.factories.ComponentRegistry;
@@ -42,6 +43,7 @@
import org.horizon.notifications.cachelistener.CacheNotifier;
import org.horizon.remoting.ReplicationException;
import org.horizon.transaction.GlobalTransaction;
+import org.horizon.transaction.TransactionLog;
import org.horizon.transaction.TransactionTable;
import org.horizon.util.concurrent.ConcurrentHashSet;
@@ -74,6 +76,7 @@
private ComponentRegistry componentRegistry;
private ContextFactory contextFactory;
private CacheManager cacheManager;
+ private TransactionLog transactionLog;
/**
* List <Transaction>that we have registered for
@@ -89,7 +92,8 @@
@Inject
public void intialize(CacheManager cacheManager, ContextFactory contextFactory,
CacheNotifier notifier, InvocationContextContainer icc,
- CommandsFactory factory, ComponentRegistry componentRegistry,
LockManager lockManager) {
+ CommandsFactory factory, ComponentRegistry componentRegistry,
LockManager lockManager,
+ TransactionLog transactionLog) {
this.contextFactory = contextFactory;
this.commandsFactory = factory;
this.cacheManager = cacheManager;
@@ -97,6 +101,7 @@
this.invocationContextContainer = icc;
this.componentRegistry = componentRegistry;
this.lockManager = lockManager;
+ this.transactionLog = transactionLog;
setStatisticsEnabled(configuration.isExposeManagementStatistics());
}
@@ -226,7 +231,11 @@
@Override
public Object handleDefault(InvocationContext ctx, VisitableCommand command) throws
Throwable {
try {
- return attachGtxAndPassUpChain(ctx, command);
+ Object retval = attachGtxAndPassUpChain(ctx, command);
+ // log non-transactional modification
+ if (command instanceof WriteCommand && ctx.getTransaction() == null)
+ transactionLog.logNoTxWrite((WriteCommand) command);
+ return retval;
}
catch (Throwable throwable) {
throwIfNeeded(ctx, throwable);
@@ -326,7 +335,9 @@
if (trace)
log.trace("Using one-phase prepare. Not propagating the prepare call
up the stack until called to do so by the sync handler.");
} else {
- // now pass up the prepare method itself.
+ // first log the transaction...
+ transactionLog.logPrepare(command);
+ // then pass up the prepare method itself.
invokeNextInterceptor(ctx, command);
}
// JBCACHE-361 Confirm that the transaction is ACTIVE
@@ -439,20 +450,24 @@
// Transaction phase runners
// --------------------------------------------------------------
- protected PrepareCommand buildPrepareCommand(GlobalTransaction gtx, List
modifications, boolean onePhaseCommit) {
+ protected PrepareCommand buildPrepareCommand(GlobalTransaction gtx,
List<WriteCommand> modifications, boolean onePhaseCommit) {
return commandsFactory.buildPrepareCommand(gtx, modifications,
cacheManager.getAddress(), onePhaseCommit);
}
/**
* creates a commit()
*/
- protected void runCommitPhase(InvocationContext ctx, GlobalTransaction gtx, List
modifications, boolean onePhaseCommit) {
+ protected void runCommitPhase(InvocationContext ctx, GlobalTransaction gtx,
List<WriteCommand> modifications, boolean onePhaseCommit) {
try {
VisitableCommand commitCommand = onePhaseCommit ? buildPrepareCommand(gtx,
modifications, true) : commandsFactory.buildCommitCommand(gtx);
if (trace) log.trace("Running commit for " + gtx);
handleCommitRollback(ctx, commitCommand);
+ if (onePhaseCommit)
+ transactionLog.logOnePhaseCommit(gtx, modifications);
+ else
+ transactionLog.logCommit(gtx);
}
catch (Throwable e) {
log.warn("Commit failed. Clearing stale locks.");
@@ -487,6 +502,7 @@
// JBCACHE-457
VisitableCommand rollbackCommand = commandsFactory.buildRollbackCommand(gtx);
if (trace) log.trace(" running rollback for {0}", gtx);
+ transactionLog.rollback(gtx);
//JBCACHE-359 Store a lookup for the globalTransaction so a listener
// callback can find it
@@ -515,10 +531,12 @@
* Handles a local prepare - invoked by the sync handler. Tests if the current tx
matches the gtx passed in to the
* method call and passes the prepare() call up the chain.
*/
- public Object runPreparePhase(InvocationContext ctx, GlobalTransaction gtx,
List<VisitableCommand> modifications) throws Throwable {
+ public Object runPreparePhase(InvocationContext ctx, GlobalTransaction gtx,
List<WriteCommand> modifications) throws Throwable {
// running a 2-phase commit.
- VisitableCommand prepareCommand = buildPrepareCommand(gtx, modifications, false);
+ PrepareCommand prepareCommand = buildPrepareCommand(gtx, modifications, false);
+ transactionLog.logPrepare(prepareCommand);
+
Object result;
// Is there a local transaction associated with GTX ?
@@ -624,7 +642,7 @@
private class RemoteSynchronizationHandler implements Synchronization {
Transaction tx = null;
GlobalTransaction gtx = null;
- List<VisitableCommand> modifications = null;
+ List<WriteCommand> modifications = null;
TransactionContext transactionContext = null;
protected InvocationContext ctx; // the context for this call.
Modified: core/branches/flat/src/main/java/org/horizon/manager/DefaultCacheManager.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/manager/DefaultCacheManager.java 2009-03-03
13:01:12 UTC (rev 7828)
+++
core/branches/flat/src/main/java/org/horizon/manager/DefaultCacheManager.java 2009-03-03
14:56:48 UTC (rev 7829)
@@ -307,17 +307,17 @@
public List<Address> getMembers() {
RPCManager rpcManager = globalComponentRegistry.getComponent(RPCManager.class);
- return rpcManager == null ? null : rpcManager.getMembers();
+ return rpcManager == null ? null : rpcManager.getTransport().getMembers();
}
public Address getAddress() {
RPCManager rpcManager = globalComponentRegistry.getComponent(RPCManager.class);
- return rpcManager == null ? null : rpcManager.getAddress();
+ return rpcManager == null ? null : rpcManager.getTransport().getAddress();
}
public boolean isCoordinator() {
RPCManager rpcManager = globalComponentRegistry.getComponent(RPCManager.class);
- return rpcManager != null && rpcManager.isCoordinator();
+ return rpcManager != null && rpcManager.getTransport().isCoordinator();
}
private Cache createCache(String cacheName) {
Modified: core/branches/flat/src/main/java/org/horizon/marshall/HorizonMarshaller.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/marshall/HorizonMarshaller.java 2009-03-03
13:01:12 UTC (rev 7828)
+++
core/branches/flat/src/main/java/org/horizon/marshall/HorizonMarshaller.java 2009-03-03
14:56:48 UTC (rev 7829)
@@ -25,6 +25,7 @@
import org.horizon.atomic.DeltaAware;
import org.horizon.commands.RemoteCommandFactory;
import org.horizon.commands.ReplicableCommand;
+import org.horizon.commands.write.WriteCommand;
import org.horizon.io.ByteBuffer;
import org.horizon.io.ExposedByteArrayOutputStream;
import org.horizon.logging.Log;
@@ -32,6 +33,7 @@
import org.horizon.remoting.transport.Address;
import org.horizon.remoting.transport.jgroups.JGroupsAddress;
import org.horizon.transaction.GlobalTransaction;
+import org.horizon.transaction.TransactionLog;
import org.horizon.util.FastCopyHashMap;
import org.horizon.util.Immutables;
import org.jboss.util.NotImplementedException;
@@ -79,6 +81,7 @@
protected static final int MAGICNUMBER_OBJECT = 22;
protected static final int MAGICNUMBER_SINGLETON_LIST = 23;
protected static final int MAGICNUMBER_COMMAND = 24;
+ protected static final int MAGICNUMBER_TRANSACTION_LOG = 25;
protected static final int MAGICNUMBER_NULL = 99;
protected static final int MAGICNUMBER_SERIALIZABLE = 100;
protected static final int MAGICNUMBER_REF = 101;
@@ -189,6 +192,11 @@
out.writeByte(MAGICNUMBER_STRING);
if (useRefs) writeReference(out, createReference(o, refMap));
marshallString((String) o, out);
+ } else if (o instanceof TransactionLog.LogEntry) {
+ out.writeByte(MAGICNUMBER_TRANSACTION_LOG);
+ TransactionLog.LogEntry le = (TransactionLog.LogEntry) o;
+ marshallObject(le.getTransaction(), out, refMap);
+ marshallObject(le.getModifications(), out, refMap);
} else if (o instanceof Serializable) {
if (trace) {
log.trace("Warning: using object serialization for " +
o.getClass());
@@ -305,6 +313,9 @@
case MAGICNUMBER_JG_ADDRESS:
retVal = unmarshallJGroupsAddress(in);
return retVal;
+ case MAGICNUMBER_TRANSACTION_LOG:
+ retVal = new TransactionLog.LogEntry((GlobalTransaction) unmarshallObject(in,
refMap), (List<WriteCommand>) unmarshallObject(in, refMap));
+ return retVal;
case MAGICNUMBER_ARRAY:
return unmarshallArray(in, refMap);
case MAGICNUMBER_ARRAY_LIST:
Modified:
core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandlerImpl.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandlerImpl.java 2009-03-03
13:01:12 UTC (rev 7828)
+++
core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandlerImpl.java 2009-03-03
14:56:48 UTC (rev 7829)
@@ -44,8 +44,7 @@
}
if (!cr.getStatus().allowInvocations()) {
- if (log.isInfoEnabled()) log.info("Cache named {0} exists but isn't in
a state to handle invocations. Its state is {1}", cacheName, cr.getStatus());
- return null;
+ throw new IllegalStateException("Cache named " + cacheName + "
exists but isn't in a state to handle invocations. Its state is " +
cr.getStatus());
}
InterceptorChain ic = cr.getComponent(InterceptorChain.class);
@@ -69,7 +68,7 @@
private StateTransferManager getStateTransferManager(String cacheName) throws
StateTransferException {
ComponentRegistry cr = gcr.getNamedComponentRegistry(cacheName);
if (cr == null) {
- String msg = "Cache named "+cacheName+" does not exist on this
cache manager!";
+ String msg = "Cache named " + cacheName + " does not exist on
this cache manager!";
log.info(msg);
throw new StateTransferException(msg);
}
Modified: core/branches/flat/src/main/java/org/horizon/remoting/RPCManager.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/RPCManager.java 2009-03-03
13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/remoting/RPCManager.java 2009-03-03
14:56:48 UTC (rev 7829)
@@ -27,6 +27,7 @@
import org.horizon.factories.scopes.Scopes;
import org.horizon.lifecycle.Lifecycle;
import org.horizon.remoting.transport.Address;
+import org.horizon.remoting.transport.Transport;
import org.horizon.statetransfer.StateTransferException;
import java.util.List;
@@ -48,78 +49,76 @@
/**
* Invokes an RPC call on other caches in the cluster.
*
- * @param recipients a list of Addresses to invoke the call on. If this is
null, the call is broadcast to the
- * entire cluster.
- * @param rpcCommand the cache command to invoke
- * @param mode the response mode to use
- * @param timeout a timeout after which to throw a replication exception.
- * @param usePriorityQueue if true, a priority queue is used to deliver messages. May
not be supported by all
- * implementations.
- * @param responseFilter a response filter with which to filter out
failed/unwanted/invalid responses.
+ * @param recipients a list of Addresses to invoke the call on. If this is
null, the call is broadcast to
+ * the entire cluster.
+ * @param rpcCommand the cache command to invoke
+ * @param mode the response mode to use
+ * @param timeout a timeout after which to throw a replication
exception.
+ * @param usePriorityQueue if true, a priority queue is used to deliver messages.
May not be supported by all
+ * implementations.
+ * @param responseFilter a response filter with which to filter out
failed/unwanted/invalid responses.
+ * @param stateTransferEnabled if true, additional replaying is considered if messages
need to be re-sent during the
+ * course of a state transfer
* @return a list of responses from each member contacted.
* @throws Exception in the event of problems.
*/
- List<Object> invokeRemotely(List<Address> recipients, RPCCommand
rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, ResponseFilter
responseFilter) throws Exception;
+ List<Object> invokeRemotely(List<Address> recipients, RPCCommand
rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, ResponseFilter
responseFilter, boolean stateTransferEnabled) throws Exception;
/**
* Invokes an RPC call on other caches in the cluster.
*
- * @param recipients a list of Addresses to invoke the call on. If this is
null, the call is broadcast to the
- * entire cluster.
- * @param rpcCommand the cache command to invoke
- * @param mode the response mode to use
- * @param timeout a timeout after which to throw a replication exception.
- * @param usePriorityQueue if true, a priority queue is used to deliver messages. May
not be supported by all
- * implementations.
+ * @param recipients a list of Addresses to invoke the call on. If this is
null, the call is broadcast to
+ * the entire cluster.
+ * @param rpcCommand the cache command to invoke
+ * @param mode the response mode to use
+ * @param timeout a timeout after which to throw a replication
exception.
+ * @param usePriorityQueue if true, a priority queue is used to deliver messages.
May not be supported by all
+ * implementations.
+ * @param stateTransferEnabled if true, additional replaying is considered if messages
need to be re-sent during the
+ * course of a state transfer
* @return a list of responses from each member contacted.
* @throws Exception in the event of problems.
*/
- List<Object> invokeRemotely(List<Address> recipients, RPCCommand
rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue) throws Exception;
+ List<Object> invokeRemotely(List<Address> recipients, RPCCommand
rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, boolean
stateTransferEnabled) throws Exception;
/**
* Invokes an RPC call on other caches in the cluster.
*
- * @param recipients a list of Addresses to invoke the call on. If this is null, the
call is broadcast to the entire
- * cluster.
- * @param rpcCommand the cache command to invoke
- * @param mode the response mode to use
- * @param timeout a timeout after which to throw a replication exception.
+ * @param recipients a list of Addresses to invoke the call on. If this is
null, the call is broadcast to
+ * the entire cluster.
+ * @param rpcCommand the cache command to invoke
+ * @param mode the response mode to use
+ * @param timeout a timeout after which to throw a replication
exception.
+ * @param stateTransferEnabled if true, additional replaying is considered if messages
need to be re-sent during the
+ * course of a state transfer
* @return a list of responses from each member contacted.
* @throws Exception in the event of problems.
*/
- List<Object> invokeRemotely(List<Address> recipients, RPCCommand
rpcCommand, ResponseMode mode, long timeout) throws Exception;
+ List<Object> invokeRemotely(List<Address> recipients, RPCCommand
rpcCommand, ResponseMode mode, long timeout, boolean stateTransferEnabled) throws
Exception;
/**
- * @return true if the current Channel is the coordinator of the cluster.
- */
- boolean isCoordinator();
-
- /**
- * @return the Address of the current coordinator.
- */
- Address getCoordinator();
-
- /**
- * Retrieves the current cache instance's network address
+ * Initiates a state retrieval process from neighbouring caches. This method will
block until it either times out,
+ * or state is retrieved and applied.
*
- * @return an Address
+ * @param cacheName name of cache requesting state
+ * @param timeout length of time to try to retrieve state on each peer
+ * @throws org.horizon.statetransfer.StateTransferException
+ * in the event of problems
*/
- Address getAddress();
+ void retrieveState(String cacheName, long timeout) throws StateTransferException;
/**
- * Returns a list of members in the current cluster view.
- *
- * @return a list of members. Typically, this would be defensively copied.
+ * @return a reference to the underlying transport.
*/
- List<Address> getMembers();
+ Transport getTransport();
/**
- * Initiates a state retrieval process from neighbouring caches. This method will
block until it either times out,
- * or state is retrieved and applied.
+ * If {@link #retrieveState(String, long)} has been invoked and hasn't yet
returned (i.e., a state transfer is in
+ * progress), this method will return the current Address from which a state transfer
is being attempted. Otherwise,
+ * this method returns a null.
*
- * @param cacheName name of cache requesting state
- * @param timeout length of time to try to retrieve state on each peer
- * @throws org.horizon.statetransfer.StateTransferException in the event of problems
+ * @return the current Address from which a state transfer is being attempted, if a
state transfer is in progress, or
+ * a null otherwise.
*/
- void retrieveState(String cacheName, long timeout) throws StateTransferException;
+ Address getCurrentStateTransferSource();
}
\ No newline at end of file
Modified: core/branches/flat/src/main/java/org/horizon/remoting/RPCManagerImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/RPCManagerImpl.java 2009-03-03
13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/remoting/RPCManagerImpl.java 2009-03-03
14:56:48 UTC (rev 7829)
@@ -38,6 +38,7 @@
private final AtomicLong replicationFailures = new AtomicLong(0);
boolean statisticsEnabled = false; // by default, don't gather statistics.
private static final Log log = LogFactory.getLog(RPCManagerImpl.class);
+ private volatile Address currentStateTransferSource;
@Inject
public void injectDependencies(GlobalConfiguration globalConfiguration, Transport t,
InboundInvocationHandler handler,
@@ -45,7 +46,8 @@
@ComponentName(KnownComponentNames.ASYNC_SERIALIZATION_EXECUTOR) ExecutorService e,
CacheManagerNotifier notifier) {
this.t = t;
- this.t.initialize(globalConfiguration,
globalConfiguration.getTransportProperties(), marshaller, e, handler, notifier);
+ this.t.initialize(globalConfiguration,
globalConfiguration.getTransportProperties(), marshaller, e, handler,
+ notifier, globalConfiguration.getDistributedSyncTimeout());
}
@Start(priority = 10)
@@ -58,36 +60,20 @@
t.stop();
}
- public List<Object> invokeRemotely(List<Address> recipients, RPCCommand
rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, ResponseFilter
responseFilter) throws Exception {
- return t.invokeRemotely(recipients, rpcCommand, mode, timeout, usePriorityQueue,
responseFilter);
+ public List<Object> invokeRemotely(List<Address> recipients, RPCCommand
rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, ResponseFilter
responseFilter, boolean stateTransferEnabled) throws Exception {
+ return t.invokeRemotely(recipients, rpcCommand, mode, timeout, usePriorityQueue,
responseFilter, stateTransferEnabled);
}
- public List<Object> invokeRemotely(List<Address> recipients, RPCCommand
rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue) throws Exception {
- return t.invokeRemotely(recipients, rpcCommand, mode, timeout, usePriorityQueue,
null);
+ public List<Object> invokeRemotely(List<Address> recipients, RPCCommand
rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, boolean
stateTransferEnabled) throws Exception {
+ return t.invokeRemotely(recipients, rpcCommand, mode, timeout, usePriorityQueue,
null, stateTransferEnabled);
}
- public List<Object> invokeRemotely(List<Address> recipients, RPCCommand
rpcCommand, ResponseMode mode, long timeout) throws Exception {
- return t.invokeRemotely(recipients, rpcCommand, mode, timeout, false, null);
+ public List<Object> invokeRemotely(List<Address> recipients, RPCCommand
rpcCommand, ResponseMode mode, long timeout, boolean stateTransferEnabled) throws
Exception {
+ return t.invokeRemotely(recipients, rpcCommand, mode, timeout, false, null,
stateTransferEnabled);
}
- public boolean isCoordinator() {
- return t.isCoordinator();
- }
-
- public Address getCoordinator() {
- return t.getCoordinator();
- }
-
- public Address getAddress() {
- return t.getAddress();
- }
-
- public List<Address> getMembers() {
- return t.getMembers();
- }
-
public void retrieveState(String cacheName, long timeout) throws
StateTransferException {
- List<Address> members = getMembers();
+ List<Address> members = t.getMembers();
if (members.size() < 2) {
if (log.isDebugEnabled())
log.debug("We're the only member in the cluster; no one to retrieve
state from. Not doing anything!");
@@ -95,38 +81,56 @@
}
boolean success = false;
- outer:
- for (int i = 0, wait = 1000; i < 5; i++) {
- for (Address member : members) {
- if (!member.equals(getAddress())) {
- try {
- if (log.isInfoEnabled()) log.info("Trying to fetch state from
{0}", member);
- if (t.retrieveState(cacheName, member, timeout)) {
- if (log.isInfoEnabled()) log.info("Successfully retrieved and
applied state from {0}", member);
- success = true;
- break outer;
+
+ try {
+
+ outer:
+ for (int i = 0, wait = 1000; i < 5; i++) {
+ for (Address member : members) {
+ if (!member.equals(t.getAddress())) {
+ try {
+ if (log.isInfoEnabled()) log.info("Trying to fetch state from
{0}", member);
+ currentStateTransferSource = member;
+ if (t.retrieveState(cacheName, member, timeout)) {
+ if (log.isInfoEnabled()) log.info("Successfully retrieved
and applied state from {0}", member);
+ success = true;
+ break outer;
+ }
+ } catch (StateTransferException e) {
+ if (log.isDebugEnabled()) log.debug("Error while fetching state
from member " + member, e);
+ } finally {
+ currentStateTransferSource = null;
}
- } catch (StateTransferException e) {
- if (log.isDebugEnabled()) log.debug("Error while fetching state
from member " + member, e);
}
- }
- if (!success) {
- if (log.isWarnEnabled()) log.warn("Could not find available peer for
state, backing off and retrying");
+ if (!success) {
+ if (log.isWarnEnabled())
+ log.warn("Could not find available peer for state, backing off
and retrying");
- try {
- Thread.sleep(wait <<= 2);
+ try {
+ Thread.sleep(wait <<= 2);
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
}
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
}
}
+ } finally {
+ currentStateTransferSource = null;
}
if (!success) throw new StateTransferException("Unable to fetch state on
startup");
}
+ public Transport getTransport() {
+ return t;
+ }
+
+ public Address getCurrentStateTransferSource() {
+ return currentStateTransferSource;
+ }
+
// -------------------------------------------- JMX information
-----------------------------------------------
@ManagedOperation
Modified: core/branches/flat/src/main/java/org/horizon/remoting/ReplicationQueue.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/ReplicationQueue.java 2009-03-03
13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/remoting/ReplicationQueue.java 2009-03-03
14:56:48 UTC (rev 7829)
@@ -67,6 +67,7 @@
private Configuration configuration;
private boolean enabled;
private CommandsFactory commandsFactory;
+ private boolean stateTransferEnabled;
public boolean isEnabled() {
return enabled;
@@ -86,6 +87,7 @@
*/
@Start
public synchronized void start() {
+ stateTransferEnabled = configuration.isStateTransferEnabled();
long interval = configuration.getReplQueueInterval();
log.trace("Starting replication queue, with interval {0} and maxElements
{1}", interval, maxElements);
this.maxElements = configuration.getReplQueueMaxElements();
@@ -142,7 +144,7 @@
log.trace("Flushing {0} elements", toReplicateSize);
ReplicateCommand replicateCommand =
commandsFactory.buildReplicateCommand(toReplicate);
// send to all live caches in the cluster
- rpcManager.invokeRemotely(null, replicateCommand, ResponseMode.ASYNCHRONOUS,
configuration.getSyncReplTimeout());
+ rpcManager.invokeRemotely(null, replicateCommand, ResponseMode.ASYNCHRONOUS,
configuration.getSyncReplTimeout(), stateTransferEnabled);
}
catch (Throwable t) {
log.error("failed replicating " + toReplicate.size() + "
elements in replication queue", t);
Added:
core/branches/flat/src/main/java/org/horizon/remoting/transport/DistributedSync.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/transport/DistributedSync.java
(rev 0)
+++
core/branches/flat/src/main/java/org/horizon/remoting/transport/DistributedSync.java 2009-03-03
14:56:48 UTC (rev 7829)
@@ -0,0 +1,68 @@
+package org.horizon.remoting.transport;
+
+import net.jcip.annotations.ThreadSafe;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * This is an abstraction of a cluster-wide synchronization. Its purpose is to maintain
a set of locks that are aware
+ * of block and unblock commands issued across a cluster. In addition to these block and
unblock phases, sub-phases
+ * such as a start processing
+ *
+ * @author Manik Surtani
+ * @since 1.0
+ */
+@ThreadSafe
+public interface DistributedSync {
+
+ /**
+ * @return the number of syncs that have occured
+ */
+ int getSyncCount();
+
+ /**
+ * Blocks until the start of a sync - either by the current RPCManager instance or a
remote one - is received. This
+ * should return immediately if sync is already in progress.
+ *
+ * @param timeout timeout after which to give up
+ * @param timeUnit time unit
+ * @throws TimeoutException if waiting for the sync times out.
+ */
+ void blockUntilAcquired(long timeout, TimeUnit timeUnit) throws TimeoutException;
+
+ /**
+ * Blocks until an ongoing sync ends. This is returns immediately if there is no
ongoing sync.
+ *
+ * @param timeout timeout after which to give up
+ * @param timeUnit time unit
+ * @throws TimeoutException if waiting for an ongoing sync to end times out.
+ */
+ void blockUntilReleased(long timeout, TimeUnit timeUnit) throws TimeoutException;
+
+ /**
+ * Acquires the sync. This could be from a local or remote source.
+ */
+ void acquireSync();
+
+ /**
+ * Releases the sync. This could be from a local or remote source.
+ */
+ void releaseSync();
+
+ /**
+ * Acquires a processing lock. This is typically acquired after the sync is acquired,
and is meant for local (not
+ * remote) use.
+ *
+ * @param exclusive whether the lock is exclusive
+ * @param timeout timeout after which to give up
+ * @param timeUnit time unit
+ * @throws TimeoutException if waiting for the lock times out
+ */
+ void acquireProcessingLock(boolean exclusive, long timeout, TimeUnit timeUnit) throws
TimeoutException;
+
+ /**
+ * Releases any processing locks that may be held by the current thread.
+ */
+ void releaseProcessingLock();
+}
Modified: core/branches/flat/src/main/java/org/horizon/remoting/transport/Transport.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/remoting/transport/Transport.java 2009-03-03
13:01:12 UTC (rev 7828)
+++
core/branches/flat/src/main/java/org/horizon/remoting/transport/Transport.java 2009-03-03
14:56:48 UTC (rev 7829)
@@ -31,14 +31,16 @@
/**
* Initializes the transport with global cache configuration and transport-specific
properties.
*
- * @param c global cache-wide configuration
- * @param p properties to set
- * @param marshaller marshaller to use for marshalling and unmarshalling
- * @param asyncExecutor executor to use for asynchronous calls
- * @param handler handler for invoking remotely originating calls on the local
cache
+ * @param c global cache-wide configuration
+ * @param p properties to set
+ * @param marshaller marshaller to use for marshalling and unmarshalling
+ * @param asyncExecutor executor to use for asynchronous calls
+ * @param handler handler for invoking remotely originating calls on
the local cache
+ * @param notifier notifier to use
+ * @param distributedSyncTimeout timeout to wait for distributed syncs
*/
void initialize(GlobalConfiguration c, Properties p, Marshaller marshaller,
ExecutorService asyncExecutor,
- InboundInvocationHandler handler, CacheManagerNotifier notifier);
+ InboundInvocationHandler handler, CacheManagerNotifier notifier, long
distributedSyncTimeout);
/**
* Invokes an RPC call on other caches in the cluster.
@@ -51,10 +53,11 @@
* @param usePriorityQueue if true, a priority queue is used to deliver messages. May
not be supported by all
* implementations.
* @param responseFilter a response filter with which to filter out
failed/unwanted/invalid responses.
+ * @param supportReplay whether replays of missed messages is supported
* @return a list of responses from each member contacted.
* @throws Exception in the event of problems.
*/
- List<Object> invokeRemotely(List<Address> recipients, RPCCommand
rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, ResponseFilter
responseFilter) throws Exception;
+ List<Object> invokeRemotely(List<Address> recipients, RPCCommand
rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, ResponseFilter
responseFilter, boolean supportReplay) throws Exception;
/**
* @return true if the current Channel is the coordinator of the cluster.
@@ -81,14 +84,34 @@
List<Address> getMembers();
/**
- * Initiates a state retrieval from a specific cache (by typically invoking {@link
org.horizon.remoting.InboundInvocationHandler#generateState(String,
java.io.OutputStream)}),
- * and applies this state to the current cache via the {@link
InboundInvocationHandler#applyState(String, java.io.InputStream)} callback.
+ * Initiates a state retrieval from a specific cache (by typically invoking {@link
+ * org.horizon.remoting.InboundInvocationHandler#generateState(String,
java.io.OutputStream)}), and applies this
+ * state to the current cache via the {@link
InboundInvocationHandler#applyState(String, java.io.InputStream)}
+ * callback.
*
* @param cacheName name of cache for which to retrieve state
- * @param address address of remote cache from which to retrieve state
- * @param timeout state retrieval timeout in milliseconds
- * @throws org.horizon.statetransfer.StateTransferException if state cannot be
retrieved from the specific cache
+ * @param address address of remote cache from which to retrieve state
+ * @param timeout state retrieval timeout in milliseconds
* @return true if state was transferred and applied successfully, false if it timed
out.
+ * @throws org.horizon.statetransfer.StateTransferException
+ * if state cannot be retrieved from the specific cache
*/
boolean retrieveState(String cacheName, Address address, long timeout) throws
StateTransferException;
+
+ /**
+ * @return an instance of a DistributedSync that can be used to wait for
synchronization events across a cluster.
+ */
+ DistributedSync getDistributedSync();
+
+ /**
+ * Blocks all RPC calls to and between a set of Addresses. If a null is passed in,
the entire cluster is blocked.
+ *
+ * @param addresses addresses to block
+ */
+ void blockRPC(Address... addresses);
+
+ /**
+ * Releases a block performed by calling {@link #blockRPC(Address[])}
+ */
+ void unblockRPC();
}
Modified:
core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/CommandAwareRpcDispatcher.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/CommandAwareRpcDispatcher.java 2009-03-03
13:01:12 UTC (rev 7828)
+++
core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/CommandAwareRpcDispatcher.java 2009-03-03
14:56:48 UTC (rev 7829)
@@ -27,6 +27,7 @@
import org.horizon.logging.Log;
import org.horizon.logging.LogFactory;
import org.horizon.remoting.InboundInvocationHandler;
+import org.horizon.remoting.transport.DistributedSync;
import org.jgroups.Address;
import org.jgroups.Channel;
import org.jgroups.Message;
@@ -38,10 +39,12 @@
import org.jgroups.util.RspList;
import java.io.NotSerializableException;
+import java.util.Map;
import java.util.Vector;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
/**
* A JGroups RPC dispatcher that knows how to deal with {@link ReplicableCommand}s.
@@ -53,7 +56,10 @@
protected boolean trace;
ExecutorService asyncExecutor;
InboundInvocationHandler inboundInvocationHandler;
+ DistributedSync distributedSync;
+ long distributedSyncTimeout;
private Log log = LogFactory.getLog(CommandAwareRpcDispatcher.class);
+ private static final RequestIgnoredResponse REQUEST_IGNORED_RESPONSE = new
RequestIgnoredResponse();
public CommandAwareRpcDispatcher() {
}
@@ -61,12 +67,14 @@
public CommandAwareRpcDispatcher(Channel channel,
JGroupsTransport transport,
ExecutorService asyncExecutor,
- InboundInvocationHandler inboundInvocationHandler) {
+ InboundInvocationHandler inboundInvocationHandler,
+ DistributedSync distributedSync, long
distributedSyncTimeout) {
super(channel, transport, transport, transport);
this.asyncExecutor = asyncExecutor;
this.inboundInvocationHandler = inboundInvocationHandler;
-
+ this.distributedSync = distributedSync;
trace = log.isTraceEnabled();
+ this.distributedSyncTimeout = distributedSyncTimeout;
}
protected boolean isValid(Message req) {
@@ -83,9 +91,9 @@
* org.jgroups.blocks.RspFilter)} except that this version is aware of {@link
ReplicableCommand} objects.
*/
public RspList invokeRemoteCommands(Vector<Address> dests, ReplicableCommand
command, int mode, long timeout,
- boolean anycasting, boolean oob, RspFilter
filter)
+ boolean anycasting, boolean oob, RspFilter filter,
boolean supportReplay)
throws NotSerializableException, ExecutionException, InterruptedException {
- ReplicationTask task = new ReplicationTask(command, oob, dests, mode, timeout,
anycasting, filter);
+ ReplicationTask task = new ReplicationTask(command, oob, dests, mode, timeout,
anycasting, filter, supportReplay);
if (mode == GroupRequest.GET_NONE) {
asyncExecutor.submit(task);
@@ -131,8 +139,40 @@
protected Object executeCommand(RPCCommand cmd, Message req) throws Throwable {
if (cmd == null) throw new NullPointerException("Unable to execute a null
command! Message was " + req);
- if (trace) log.trace("Executing command: {0} [sender={1}]", cmd,
req.getSrc());
- return inboundInvocationHandler.handle(cmd);
+ if (trace) log.trace("Attempting to execute command: {0} [sender={1}]",
cmd, req.getSrc());
+
+ boolean unlock = false;
+ try {
+
+ int flushCount = distributedSync.getSyncCount();
+ distributedSync.acquireProcessingLock(false, distributedSyncTimeout,
MILLISECONDS);
+ unlock = true;
+
+ distributedSync.blockUntilReleased(distributedSyncTimeout, MILLISECONDS);
+
+ // If this thread blocked during a NBST flush, then inform the sender
+ // it needs to replay ignored messages
+ boolean replayIgnored = distributedSync.getSyncCount() != flushCount;
+
+ Object retval;
+ try {
+ retval = inboundInvocationHandler.handle(cmd);
+ } catch (IllegalStateException ise) {
+ if (trace) log.trace("Unable to execute command, cache not in a
receptive state");
+ // cache not in a started state, request replay
+ return REQUEST_IGNORED_RESPONSE;
+ }
+
+ if (replayIgnored) {
+ ExtendedResponse extended = new ExtendedResponse(retval);
+ extended.setReplayIgnoredRequests(true);
+ retval = extended;
+ }
+ return retval;
+
+ } finally {
+ if (unlock) distributedSync.releaseProcessingLock();
+ }
}
@Override
@@ -149,10 +189,11 @@
private long timeout;
private boolean anycasting;
private RspFilter filter;
+ boolean supportReplay = false;
private ReplicationTask(ReplicableCommand command, boolean oob,
Vector<Address> dests,
int mode, long timeout,
- boolean anycasting, RspFilter filter) {
+ boolean anycasting, RspFilter filter, boolean
supportReplay) {
this.command = command;
this.oob = oob;
this.dests = dests;
@@ -160,6 +201,7 @@
this.timeout = timeout;
this.anycasting = anycasting;
this.filter = filter;
+ this.supportReplay = supportReplay;
}
public RspList call() throws Exception {
@@ -174,6 +216,8 @@
Message msg = new Message();
msg.setBuffer(buf);
if (oob) msg.setFlag(Message.OOB);
+ // Replay capability requires responses from all members!
+ int mode = supportReplay ? GroupRequest.GET_ALL : this.mode;
RspList retval = castMessage(dests, msg, mode, timeout, anycasting, filter);
if (trace) log.trace("responses: {0}", retval);
@@ -183,6 +227,30 @@
if (retval == null)
throw new NotSerializableException("RpcDispatcher returned a null. This
is most often caused by args for "
+ command.getClass().getSimpleName() + " not being
serializable.");
+
+ if (supportReplay) {
+ boolean replay = false;
+ Vector<Address> ignorers = new Vector<Address>();
+ for (Map.Entry<Address, Rsp> entry : retval.entrySet()) {
+ Object value = entry.getValue().getValue();
+ if (value instanceof RequestIgnoredResponse) {
+ ignorers.add(entry.getKey());
+ } else if (value instanceof ExtendedResponse) {
+ ExtendedResponse extended = (ExtendedResponse) value;
+ replay |= extended.isReplayIgnoredRequests();
+ entry.getValue().setValue(extended.getResponse());
+ }
+ }
+
+ if (replay && ignorers.size() > 0) {
+ if (trace)
+ log.trace("Replaying message to ignoring senders: " +
ignorers);
+ RspList responses = castMessage(ignorers, msg, GroupRequest.GET_ALL,
timeout, anycasting, filter);
+ if (responses != null)
+ retval.putAll(responses);
+ }
+ }
+
return retval;
}
}
Added:
core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/ExtendedResponse.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/ExtendedResponse.java
(rev 0)
+++
core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/ExtendedResponse.java 2009-03-03
14:56:48 UTC (rev 7829)
@@ -0,0 +1,50 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2005, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+*/
+package org.horizon.remoting.transport.jgroups;
+
+import java.io.Serializable;
+
+/**
+ * A response with extended information
+ *
+ * @author Jason T. Greene
+ */
+public class ExtendedResponse implements Serializable {
+ private boolean replayIgnoredRequests;
+ private final Object response;
+
+ public ExtendedResponse(Object response) {
+ this.response = response;
+ }
+
+ public boolean isReplayIgnoredRequests() {
+ return replayIgnoredRequests;
+ }
+
+ public void setReplayIgnoredRequests(boolean replayIgnoredRequests) {
+ this.replayIgnoredRequests = replayIgnoredRequests;
+ }
+
+ public Object getResponse() {
+ return response;
+ }
+}
Added:
core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/FlushBasedDistributedSync.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/FlushBasedDistributedSync.java
(rev 0)
+++
core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/FlushBasedDistributedSync.java 2009-03-03
14:56:48 UTC (rev 7829)
@@ -0,0 +1,98 @@
+package org.horizon.remoting.transport.jgroups;
+
+import net.jcip.annotations.ThreadSafe;
+import org.horizon.logging.Log;
+import org.horizon.logging.LogFactory;
+import org.horizon.remoting.transport.DistributedSync;
+import org.horizon.util.Util;
+import org.horizon.util.concurrent.ReclosableLatch;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * A DistributedSync based on JGroups' FLUSH protocol
+ *
+ * @author Manik Surtani
+ * @since 1.0
+ */
+@ThreadSafe
+public class FlushBasedDistributedSync implements DistributedSync {
+
+ private final ReentrantReadWriteLock processingLock = new ReentrantReadWriteLock();
+ private final ReclosableLatch flushBlockGate = new ReclosableLatch();
+ private final AtomicInteger flushCompletionCount = new AtomicInteger();
+ private final ReclosableLatch flushWaitGate = new ReclosableLatch(false);
+ private static final Log log = LogFactory.getLog(FlushBasedDistributedSync.class);
+
+ public int getSyncCount() {
+ return flushCompletionCount.get();
+ }
+
+ public void blockUntilAcquired(long timeout, TimeUnit timeUnit) throws
TimeoutException {
+ while (true) {
+ try {
+ if (!flushWaitGate.await(timeout, timeUnit))
+ throw new TimeoutException("Timed out waiting for a cluster-wide sync
to be acquired. (timeout = " + Util.prettyPrintTime(timeout) + ")");
+ return;
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ public void blockUntilReleased(long timeout, TimeUnit timeUnit) throws
TimeoutException {
+ while (true) {
+ try {
+ if (!flushBlockGate.await(timeout, timeUnit))
+ throw new TimeoutException("Timed out waiting for a cluster-wide sync
to be released. (timeout = " + Util.prettyPrintTime(timeout) + ")");
+ return;
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ public void acquireSync() {
+ flushBlockGate.close();
+ flushWaitGate.open();
+ }
+
+ public void releaseSync() {
+ flushWaitGate.close();
+ flushCompletionCount.incrementAndGet();
+ flushBlockGate.open();
+ }
+
+ public void acquireProcessingLock(boolean exclusive, long timeout, TimeUnit timeUnit)
throws TimeoutException {
+ Lock lock = exclusive ? processingLock.writeLock() : processingLock.readLock();
+ while (true) {
+ try {
+ if (!lock.tryLock(timeout, timeUnit))
+ throw new TimeoutException("Could not obtain " + (exclusive ?
"exclusive" : "shared") + " processing lock");
+
+ return;
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ public void releaseProcessingLock() {
+ try {
+ if (processingLock.isWriteLockedByCurrentThread()) {
+ processingLock.writeLock().unlock();
+ } else {
+ processingLock.readLock().unlock();
+ }
+ } catch (IllegalMonitorStateException imse) {
+ if (log.isTraceEnabled()) log.trace("Did not own lock!");
+ }
+ }
+}
Modified:
core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/JGroupsTransport.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/JGroupsTransport.java 2009-03-03
13:01:12 UTC (rev 7828)
+++
core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/JGroupsTransport.java 2009-03-03
14:56:48 UTC (rev 7829)
@@ -14,15 +14,16 @@
import org.horizon.remoting.ResponseFilter;
import org.horizon.remoting.ResponseMode;
import org.horizon.remoting.transport.Address;
+import org.horizon.remoting.transport.DistributedSync;
import org.horizon.remoting.transport.Transport;
import org.horizon.statetransfer.StateTransferException;
import org.horizon.util.FileLookup;
import org.horizon.util.Util;
import org.jgroups.Channel;
import org.jgroups.ChannelException;
+import org.jgroups.ExtendedMembershipListener;
import org.jgroups.ExtendedMessageListener;
import org.jgroups.JChannel;
-import org.jgroups.MembershipListener;
import org.jgroups.Message;
import org.jgroups.View;
import org.jgroups.blocks.GroupRequest;
@@ -40,6 +41,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* An encapsulation of a JGroups transport
@@ -47,7 +50,7 @@
* @author Manik Surtani
* @since 1.0
*/
-public class JGroupsTransport implements Transport, MembershipListener,
ExtendedMessageListener {
+public class JGroupsTransport implements Transport, ExtendedMembershipListener,
ExtendedMessageListener {
public static final String CONFIGURATION_STRING = "configurationString";
public static final String CONFIGURATION_XML = "configurationXml";
public static final String CONFIGURATION_FILE = "configurationFile";
@@ -68,26 +71,24 @@
ExecutorService asyncExecutor;
CacheManagerNotifier notifier;
final ConcurrentMap<String, StateTransferMonitor> stateTransfersInProgress = new
ConcurrentHashMap<String, StateTransferMonitor>();
+ private final FlushBasedDistributedSync flushTracker = new
FlushBasedDistributedSync();
+ volatile List<org.jgroups.Address> membersBlocked;
+ AtomicBoolean flushInProgress = new AtomicBoolean(false);
+ long distributedSyncTimeout;
- /**
- * Reference to an exception that was raised during state installation on this node.
- */
- protected volatile Exception setStateException;
- private final Object stateLock = new Object();
-
-
//
------------------------------------------------------------------------------------------------------------------
// Lifecycle and setup stuff
//
------------------------------------------------------------------------------------------------------------------
public void initialize(GlobalConfiguration c, Properties p, Marshaller marshaller,
ExecutorService asyncExecutor,
- InboundInvocationHandler inboundInvocationHandler,
CacheManagerNotifier notifier) {
+ InboundInvocationHandler inboundInvocationHandler,
CacheManagerNotifier notifier, long distributedSyncTimeout) {
this.c = c;
this.p = p;
this.marshaller = marshaller;
this.asyncExecutor = asyncExecutor;
this.inboundInvocationHandler = inboundInvocationHandler;
this.notifier = notifier;
+ this.distributedSyncTimeout = distributedSyncTimeout;
}
public void start() {
@@ -137,7 +138,7 @@
channel.setOpt(Channel.AUTO_GETSTATE, false);
channel.setOpt(Channel.BLOCK, true);
dispatcher = new CommandAwareRpcDispatcher(channel, this,
- asyncExecutor,
inboundInvocationHandler);
+ asyncExecutor, inboundInvocationHandler,
flushTracker, distributedSyncTimeout);
MarshallerAdapter adapter = new MarshallerAdapter(marshaller);
dispatcher.setRequestMarshaller(adapter);
dispatcher.setResponseMarshaller(adapter);
@@ -228,7 +229,7 @@
cleanup = true;
((JChannel) channel).getState(toJGroupsAddress(address), cacheName, timeout,
false);
mon.waitForState();
- return true;
+ return mon.getSetStateException() == null;
} catch (StateTransferException ste) {
throw ste;
} catch (Exception e) {
@@ -239,6 +240,72 @@
}
}
+ public DistributedSync getDistributedSync() {
+ return flushTracker;
+ }
+
+ public void blockRPC(Address... addresses) {
+ if (flushInProgress.compareAndSet(false, true)) {
+ // TODO make these configurable!!
+ int retries = 5;
+ int sleepBetweenRetries = 250;
+ int sleepIncreaseFactor = 2;
+ if (trace) log.trace("Attempting a partial flush on members {0} with up to
{1} retries.", members, retries);
+
+ boolean success = false;
+ int i;
+ for (i = 1; i <= retries; i++) {
+ if (trace) log.trace("Attempt number " + i);
+ try {
+
+ if (addresses == null) {
+ success = channel.startFlush(false);
+ } else {
+ membersBlocked = toJGroupsAddressList(addresses);
+ success = channel.startFlush(membersBlocked, false);
+ }
+
+ if (success) break;
+ if (trace) log.trace("Channel.startFlush() returned false!");
+ } catch (Exception e) {
+ if (trace) log.trace("Caught exception attempting a partial
flush", e);
+ }
+ try {
+ if (trace)
+ log.trace("Partial state transfer failed. Backing off for "
+ sleepBetweenRetries + " millis and retrying");
+ Thread.sleep(sleepBetweenRetries);
+ sleepBetweenRetries *= sleepIncreaseFactor;
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ if (success) {
+ if (log.isDebugEnabled()) log.debug("Partial flush between {0}
succeeded!", membersBlocked);
+ } else {
+ flushInProgress.set(false);
+ throw new CacheException("Could initiate partial flush between " +
membersBlocked + "!");
+ }
+ } else {
+ throw new CacheException("Cannot block RPC; a block is already in
progress!");
+ }
+ }
+
+ public void unblockRPC() {
+ if (flushInProgress.get()) {
+ try {
+ if (membersBlocked == null) {
+ channel.stopFlush();
+ } else {
+ channel.stopFlush(membersBlocked);
+ membersBlocked = null;
+ }
+ } finally {
+ flushInProgress.set(false);
+ }
+ }
+ }
+
public Address getAddress() {
if (address == null) {
address = new JGroupsAddress(channel.getLocalAddress());
@@ -251,7 +318,8 @@
// outbound RPC
//
------------------------------------------------------------------------------------------------------------------
- public List<Object> invokeRemotely(List<Address> recipients, RPCCommand
rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, ResponseFilter
responseFilter)
+ public List<Object> invokeRemotely(List<Address> recipients, RPCCommand
rpcCommand, ResponseMode mode, long timeout,
+ boolean usePriorityQueue, ResponseFilter
responseFilter, boolean supportReplay)
throws Exception {
if (recipients != null && recipients.isEmpty()) {
@@ -262,39 +330,51 @@
log.trace("dests={0}, command={1}, mode={2}, timeout={3}", recipients,
rpcCommand, mode, timeout);
- RspList rsps = dispatcher.invokeRemoteCommands(toJGroupsAddressVector(recipients),
rpcCommand, toJGroupsMode(mode),
- timeout, false, usePriorityQueue,
- toJGroupsFilter(responseFilter));
+ // Acquire a "processing" lock so that any other code is made aware of a
network call in progress
+ // make sure this is non-exclusive since concurrent network calls are valid for
most situations.
+ flushTracker.acquireProcessingLock(false, distributedSyncTimeout, MILLISECONDS);
+ boolean unlock = true;
+ // if there is a FLUSH in progress, block till it completes
+ flushTracker.blockUntilReleased(distributedSyncTimeout, MILLISECONDS);
- if (mode == ResponseMode.ASYNCHRONOUS) return Collections.emptyList();// async
case
+ try {
+ RspList rsps =
dispatcher.invokeRemoteCommands(toJGroupsAddressVector(recipients), rpcCommand,
toJGroupsMode(mode),
+ timeout, false,
usePriorityQueue,
+ toJGroupsFilter(responseFilter),
supportReplay);
- if (trace)
- log.trace("Cache [{0}]: responses for command {1}:\n{2}",
getAddress(), rpcCommand.getClass().getSimpleName(), rsps);
+ if (mode == ResponseMode.ASYNCHRONOUS) return Collections.emptyList();// async
case
- // short-circuit no-return-value calls.
- if (rsps == null) return Collections.emptyList();
- List<Object> retval = new ArrayList<Object>(rsps.size());
+ if (trace)
+ log.trace("Cache [{0}]: responses for command {1}:\n{2}",
getAddress(), rpcCommand.getClass().getSimpleName(), rsps);
- for (Rsp rsp : rsps.values()) {
- if (rsp.wasSuspected() || !rsp.wasReceived()) {
- CacheException ex;
- if (rsp.wasSuspected()) {
- ex = new SuspectException("Suspected member: " +
rsp.getSender());
+ // short-circuit no-return-value calls.
+ if (rsps == null) return Collections.emptyList();
+ List<Object> retval = new ArrayList<Object>(rsps.size());
+
+ for (Rsp rsp : rsps.values()) {
+ if (rsp.wasSuspected() || !rsp.wasReceived()) {
+ CacheException ex;
+ if (rsp.wasSuspected()) {
+ ex = new SuspectException("Suspected member: " +
rsp.getSender());
+ } else {
+ ex = new TimeoutException("Replication timeout for " +
rsp.getSender());
+ }
+ retval.add(new ReplicationException("rsp=" + rsp, ex));
} else {
- ex = new TimeoutException("Replication timeout for " +
rsp.getSender());
+ Object value = rsp.getValue();
+ if (value instanceof Exception && !(value instanceof
ReplicationException)) {
+ // if we have any application-level exceptions make sure we throw
them!!
+ if (trace) log.trace("Recieved exception'" + value +
"' from " + rsp.getSender());
+ throw (Exception) value;
+ }
+ retval.add(value);
}
- retval.add(new ReplicationException("rsp=" + rsp, ex));
- } else {
- Object value = rsp.getValue();
- if (value instanceof Exception && !(value instanceof
ReplicationException)) {
- // if we have any application-level exceptions make sure we throw them!!
- if (trace) log.trace("Recieved exception'" + value +
"' from " + rsp.getSender());
- throw (Exception) value;
- }
- retval.add(value);
}
+ return retval;
+ } finally {
+ // release the "processing" lock so that other threads are aware of
the network call having completed
+ if (unlock) flushTracker.releaseProcessingLock();
}
- return retval;
}
private int toJGroupsMode(ResponseMode mode) {
@@ -360,9 +440,13 @@
}
public void block() {
- // a no-op
+ flushTracker.acquireSync();
}
+ public void unblock() {
+ flushTracker.releaseSync();
+ }
+
public void receive(Message msg) {
// no-op
}
@@ -388,13 +472,14 @@
}
public void getState(String cacheName, OutputStream ostream) {
- if (trace) log.trace("Received request to generate state for cache {0}.
Attempting to generate state.", cacheName);
+ if (trace)
+ log.trace("Received request to generate state for cache named
'{0}'. Attempting to generate state.", cacheName);
try {
inboundInvocationHandler.generateState(cacheName, ostream);
} catch (StateTransferException e) {
log.error("Caught while responding to state transfer request", e);
} finally {
- Util.closeStream(ostream);
+ Util.flushAndCloseStream(ostream);
}
}
@@ -403,14 +488,15 @@
}
public void setState(String cacheName, InputStream istream) {
- if (trace) log.trace("Received state for cache {0}. Attempting to apply
state.", cacheName);
- StateTransferMonitor mon = stateTransfersInProgress.get(cacheName);
+ StateTransferMonitor mon = null;
try {
+ if (trace) log.trace("Received state for cache named '{0}'.
Attempting to apply state.", cacheName);
+ mon = stateTransfersInProgress.get(cacheName);
inboundInvocationHandler.applyState(cacheName, istream);
mon.notifyStateReceiptSucceeded();
- } catch (StateTransferException e) {
+ } catch (Exception e) {
log.error("Failed setting state", e);
- mon.notifyStateReceiptFailed(e);
+ mon.notifyStateReceiptFailed(e instanceof StateTransferException ?
(StateTransferException) e : new StateTransferException(e));
} finally {
Util.closeStream(istream);
}
@@ -433,6 +519,18 @@
return retval;
}
+ private List<org.jgroups.Address> toJGroupsAddressList(Address... addresses) {
+ if (addresses == null) return null;
+ if (addresses.length == 0) return Collections.emptyList();
+
+ List<org.jgroups.Address> retval = new
ArrayList<org.jgroups.Address>(addresses.length);
+ for (Address a : addresses) {
+ JGroupsAddress ja = (JGroupsAddress) a;
+ retval.add(ja.address);
+ }
+ return retval;
+ }
+
private org.jgroups.Address toJGroupsAddress(Address a) {
return ((JGroupsAddress) a).address;
}
Added:
core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/RequestIgnoredResponse.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/RequestIgnoredResponse.java
(rev 0)
+++
core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/RequestIgnoredResponse.java 2009-03-03
14:56:48 UTC (rev 7829)
@@ -0,0 +1,36 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2005, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+*/
+package org.horizon.remoting.transport.jgroups;
+
+import java.io.Serializable;
+
+/**
+ * Indicates that the request was ignored,
+ *
+ * @author Jason T. Greene
+ */
+public class RequestIgnoredResponse implements Serializable {
+ @Override
+ public String toString() {
+ return "RequestIgnoredResponse";
+ }
+}
Modified:
core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManagerImpl.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManagerImpl.java 2009-03-03
13:01:12 UTC (rev 7828)
+++
core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManagerImpl.java 2009-03-03
14:56:48 UTC (rev 7829)
@@ -22,12 +22,18 @@
package org.horizon.statetransfer;
import org.horizon.AdvancedCache;
-import org.horizon.transaction.TransactionLog;
+import org.horizon.commands.tx.PrepareCommand;
+import org.horizon.commands.write.WriteCommand;
import org.horizon.config.Configuration;
import org.horizon.container.DataContainer;
+import org.horizon.context.InvocationContext;
import org.horizon.factories.annotations.Inject;
import org.horizon.factories.annotations.Start;
+import org.horizon.interceptors.InterceptorChain;
+import org.horizon.invocation.InvocationContextContainer;
import org.horizon.invocation.Options;
+import org.horizon.io.UnclosableObjectInputStream;
+import org.horizon.io.UnclosableObjectOutputStream;
import org.horizon.loader.CacheLoaderException;
import org.horizon.loader.CacheLoaderManager;
import org.horizon.loader.CacheStore;
@@ -36,6 +42,9 @@
import org.horizon.logging.LogFactory;
import org.horizon.marshall.Marshaller;
import org.horizon.remoting.RPCManager;
+import org.horizon.remoting.transport.DistributedSync;
+import org.horizon.remoting.transport.Transport;
+import org.horizon.transaction.TransactionLog;
import org.horizon.util.Util;
import java.io.IOException;
@@ -44,8 +53,9 @@
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
+import java.util.List;
import java.util.Set;
-import java.util.concurrent.TimeUnit;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
public class StateTransferManagerImpl implements StateTransferManager {
@@ -57,13 +67,18 @@
CacheStore cs;
Marshaller marshaller;
TransactionLog transactionLog;
+ InvocationContextContainer invocationContextContainer;
+ InterceptorChain interceptorChain;
private static final Log log = LogFactory.getLog(StateTransferManagerImpl.class);
+ private static final boolean trace = log.isTraceEnabled();
private static final Delimiter DELIMITER = new Delimiter();
+ boolean transientState, persistentState;
+
@Inject
public void injectDependencies(RPCManager rpcManager, AdvancedCache cache,
Configuration configuration,
DataContainer dataContainer, CacheLoaderManager clm,
Marshaller marshaller,
- TransactionLog transactionLog) {
+ TransactionLog transactionLog, InterceptorChain
interceptorChain, InvocationContextContainer invocationContextContainer) {
this.rpcManager = rpcManager;
this.cache = cache;
this.configuration = configuration;
@@ -71,12 +86,17 @@
this.clm = clm;
this.marshaller = marshaller;
this.transactionLog = transactionLog;
+ this.invocationContextContainer = invocationContextContainer;
+ this.interceptorChain = interceptorChain;
}
@Start(priority = 14)
// it is imperative that this starts *after* the RPCManager does.
public void start() throws StateTransferException {
- cs = clm == null || !clm.isEnabled() || !clm.isFetchPersistentState() ? null :
clm.getCacheStore();
+ log.trace("Data container is {0}",
System.identityHashCode(dataContainer));
+ cs = clm == null ? null : clm.getCacheStore();
+ transientState = configuration.isFetchInMemoryState();
+ persistentState = cs != null && clm.isEnabled() &&
clm.isFetchPersistentState() && !clm.isShared();
long startTime = 0;
if (log.isDebugEnabled()) {
@@ -93,43 +113,162 @@
}
public void generateState(OutputStream out) throws StateTransferException {
- if (log.isDebugEnabled()) log.debug("Generating state");
+ ObjectOutputStream oos = null;
+ boolean txLogActivated = false;
+ try {
+ boolean canProvideState = (transientState || persistentState)
+ && (txLogActivated = transactionLog.activate());
+ if (log.isDebugEnabled()) log.debug("Generating state. Can provide?
{0}", canProvideState);
+ oos = new ObjectOutputStream(out);
+ marshaller.objectToObjectStream(canProvideState, oos);
+ if (canProvideState) {
+ delimit(oos);
+ if (transientState) generateInMemoryState(oos);
+ delimit(oos);
+ if (persistentState) generatePersistentState(oos);
+ delimit(oos);
+ generateTransactionLog(oos);
+
+ if (log.isDebugEnabled()) log.debug("State generated, closing object
stream");
+ } else {
+ if (log.isDebugEnabled()) log.debug("Not providing state!");
+ }
+
+ } catch (StateTransferException ste) {
+ throw ste;
+ } catch (Exception e) {
+ throw new StateTransferException(e);
+ } finally {
+ Util.flushAndCloseStream(oos);
+ if (txLogActivated) transactionLog.deactivate();
+ }
+ }
+
+ private void generateTransactionLog(ObjectOutputStream oos) throws Exception {
+ // todo this should be configurable
+ int maxNonProgressingLogWrites = 100;
+ int flushTimeout = 60000;
+
+ DistributedSync distributedSync = rpcManager.getTransport().getDistributedSync();
+
try {
- ObjectOutputStream oos = new ObjectOutputStream(out);
+ if (trace) log.trace("Transaction log size is {0}",
transactionLog.size());
+ for (int nonProgress = 0, size = transactionLog.size(); size > 0;) {
+ if (trace) log.trace("Tx Log remaining entries = " + size);
+ transactionLog.writeCommitLog(marshaller, oos);
+ int newSize = transactionLog.size();
+
+ // If size did not decrease then we did not make progress, and could be
wasting
+ // our time. Limit this to the specified max.
+ if (newSize >= size && ++nonProgress >=
maxNonProgressingLogWrites)
+ break;
+
+ size = newSize;
+ }
+
+ // Wait on incoming and outgoing threads to line-up in front of
+ // the distributed sync.
+ distributedSync.acquireProcessingLock(true,
configuration.getStateRetrievalTimeout(), MILLISECONDS);
+
+ // Signal to sender that we need a flush to get a consistent view
+ // of the remaining transactions.
delimit(oos);
- generateInMemoryState(oos);
+ oos.flush();
+ if (trace) log.trace("Waiting for a distributed sync block");
+ distributedSync.blockUntilAcquired(flushTimeout, MILLISECONDS);
+ if (trace) log.trace("Distributed sync block received, proceeding with
writing commit log");
+ // Write remaining transactions
+ transactionLog.writeCommitLog(marshaller, oos);
delimit(oos);
- generatePersistentState(oos);
+
+ // Write all non-completed prepares
+ transactionLog.writePendingPrepares(marshaller, oos);
delimit(oos);
oos.flush();
- oos.close();
- if (log.isDebugEnabled()) log.debug("State generated, closing object
stream");
- // just close the object stream but do NOT close the underlying stream
- } catch (StateTransferException ste) {
- throw ste;
- } catch (Exception e) {
- throw new StateTransferException(e);
}
+ finally {
+ distributedSync.releaseProcessingLock();
+ }
}
+ private void processCommitLog(ObjectInputStream ois) throws Exception {
+ Object object = marshaller.objectFromObjectStream(ois);
+ while (object instanceof TransactionLog.LogEntry) {
+ List<WriteCommand> mods = ((TransactionLog.LogEntry)
object).getModifications();
+ if (trace) log.trace("Mods = {0}", mods);
+ for (WriteCommand mod : mods) {
+ InvocationContext ctx = invocationContextContainer.get();
+ ctx.setOriginLocal(false);
+ ctx.setOptions(Options.CACHE_MODE_LOCAL, Options.SKIP_CACHE_STATUS_CHECK);
+ interceptorChain.invoke(ctx, mod);
+ }
+
+ object = marshaller.objectFromObjectStream(ois);
+ }
+
+ assertDelimited(object);
+ }
+
+ private void applyTransactionLog(ObjectInputStream ois) throws Exception {
+ if (trace) log.trace("Integrating transaction log");
+
+ processCommitLog(ois);
+ Transport t = rpcManager.getTransport();
+ t.blockRPC(rpcManager.getTransport().getAddress(),
rpcManager.getCurrentStateTransferSource());
+
+ try {
+ if (trace)
+ log.trace("Retrieving/Applying post-flush commits");
+ processCommitLog(ois);
+
+ if (trace)
+ log.trace("Retrieving/Applying pending prepares");
+ Object object = marshaller.objectFromObjectStream(ois);
+ while (object instanceof PrepareCommand) {
+ PrepareCommand command = (PrepareCommand) object;
+ if (!transactionLog.hasPendingPrepare(command)) {
+ InvocationContext ctx = invocationContextContainer.get();
+ ctx.setOriginLocal(false);
+ ctx.setOptions(Options.CACHE_MODE_LOCAL,
Options.SKIP_CACHE_STATUS_CHECK);
+ interceptorChain.invoke(ctx, command);
+ }
+ object = marshaller.objectFromObjectStream(ois);
+ }
+ assertDelimited(object);
+ }
+ finally {
+ if (trace) log.trace("Stopping partial flush");
+ t.unblockRPC();
+ }
+ }
+
public void applyState(InputStream in) throws StateTransferException {
if (log.isDebugEnabled()) log.debug("Applying state");
-
+ ObjectInputStream ois = null;
try {
- ObjectInputStream ois = new ObjectInputStream(in);
- assertDelimited(ois);
- applyInMemoryState(ois);
- assertDelimited(ois);
- applyPersistentState(ois);
- assertDelimited(ois);
- if (log.isDebugEnabled()) log.debug("State applied, closing object
stream");
- ois.close();
- // just close the object stream but do NOT close the underlying stream
+ ois = new ObjectInputStream(in);
+ boolean canProvideState = (Boolean) marshaller.objectFromObjectStream(ois);
+ if (canProvideState) {
+ assertDelimited(ois);
+ if (transientState) applyInMemoryState(ois);
+ assertDelimited(ois);
+ if (persistentState) applyPersistentState(ois);
+ assertDelimited(ois);
+ applyTransactionLog(ois);
+ if (log.isDebugEnabled()) log.debug("State applied, closing object
stream");
+ } else {
+ String msg = "Provider cannot provide state!";
+ if (log.isDebugEnabled()) log.debug(msg);
+ throw new StateTransferException(msg);
+ }
} catch (StateTransferException ste) {
throw ste;
} catch (Exception e) {
throw new StateTransferException(e);
+ } finally {
+ // just close the object stream but do NOT close the underlying stream
+ Util.closeStream(ois);
}
}
@@ -137,7 +276,8 @@
dataContainer.clear();
try {
Set<StoredEntry> set = (Set<StoredEntry>)
marshaller.objectFromObjectStream(i);
- for (StoredEntry se: set) cache.put(se.getKey(), se.getValue(),
se.getLifespan(), TimeUnit.MILLISECONDS, Options.CACHE_MODE_LOCAL);
+ for (StoredEntry se : set)
+ cache.put(se.getKey(), se.getValue(), se.getLifespan(), MILLISECONDS,
Options.CACHE_MODE_LOCAL);
} catch (Exception e) {
dataContainer.clear();
throw new StateTransferException(e);
@@ -149,6 +289,7 @@
// TODO is it safe enough to get these from the data container directly?
try {
Set<StoredEntry> s = dataContainer.getAllEntriesForStorage();
+ if (log.isDebugEnabled()) log.debug("Writing {0} StoredEntries to
stream", s.size());
marshaller.objectToObjectStream(s, o);
} catch (Exception e) {
throw new StateTransferException(e);
@@ -156,40 +297,39 @@
}
private void applyPersistentState(ObjectInputStream i) throws StateTransferException
{
- if (cs == null) {
- if (log.isDebugEnabled()) log.debug("Not configured to fetch persistent
state, or no cache store configured. Skipping applying persistent state.");
- } else {
- try {
- cs.fromStream(i);
- } catch (CacheLoaderException cle) {
- throw new StateTransferException(cle);
- }
+ try {
+ // always use the unclosable stream delegate to ensure the impl doesn't
close the stream
+ cs.fromStream(new UnclosableObjectInputStream(i));
+ } catch (CacheLoaderException cle) {
+ throw new StateTransferException(cle);
}
}
private void generatePersistentState(ObjectOutputStream o) throws
StateTransferException {
- if (cs == null) {
- if (log.isDebugEnabled()) log.debug("Not configured to fetch persistent
state, or no cache store configured. Skipping generating persistent state.");
- } else {
- try {
- cs.toStream(o);
- } catch (CacheLoaderException cle) {
- throw new StateTransferException(cle);
- }
+ try {
+ // always use the unclosable stream delegate to ensure the impl doesn't
close the stream
+ cs.toStream(new UnclosableObjectOutputStream(o));
+ } catch (CacheLoaderException cle) {
+ throw new StateTransferException(cle);
}
}
private void delimit(ObjectOutputStream o) throws IOException {
- o.writeObject(DELIMITER);
+ marshaller.objectToObjectStream(DELIMITER, o);
}
private void assertDelimited(ObjectInputStream i) throws StateTransferException {
Object o;
try {
- o = i.readObject();
+ o = marshaller.objectFromObjectStream(i);
} catch (Exception e) {
throw new StateTransferException(e);
}
+ assertDelimited(o);
+ }
+
+ private void assertDelimited(Object o) throws StateTransferException {
+ if (o instanceof Exception) throw new StateTransferException((Exception) o);
if ((o == null) || !(o instanceof Delimiter))
throw new StateTransferException("Expected a delimiter, recieved " +
o);
}
Modified: core/branches/flat/src/main/java/org/horizon/transaction/TransactionLog.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/transaction/TransactionLog.java 2009-03-03
13:01:12 UTC (rev 7828)
+++
core/branches/flat/src/main/java/org/horizon/transaction/TransactionLog.java 2009-03-03
14:56:48 UTC (rev 7829)
@@ -41,43 +41,36 @@
*
* @author Jason T. Greene
*/
-public class TransactionLog
-{
+public class TransactionLog {
private final Map<GlobalTransaction, PrepareCommand> pendingPrepares = new
ConcurrentHashMap<GlobalTransaction, PrepareCommand>();
private final BlockingQueue<LogEntry> entries = new
LinkedBlockingQueue<LogEntry>();
private AtomicBoolean active = new AtomicBoolean();
- public static class LogEntry
- {
+ public static class LogEntry {
private final GlobalTransaction transaction;
private final List<WriteCommand> modifications;
- public LogEntry(GlobalTransaction transaction, List<WriteCommand>
modifications)
- {
+ public LogEntry(GlobalTransaction transaction, List<WriteCommand>
modifications) {
this.transaction = transaction;
this.modifications = modifications;
}
- public GlobalTransaction getTransaction()
- {
+ public GlobalTransaction getTransaction() {
return transaction;
}
- public List<WriteCommand> getModifications()
- {
+ public List<WriteCommand> getModifications() {
return modifications;
}
}
private static Log log = LogFactory.getLog(TransactionLog.class);
- public void logPrepare(PrepareCommand command)
- {
+ public void logPrepare(PrepareCommand command) {
pendingPrepares.put(command.getGlobalTransaction(), command);
}
- public void logCommit(GlobalTransaction gtx)
- {
+ public void logCommit(GlobalTransaction gtx) {
PrepareCommand command = pendingPrepares.remove(gtx);
// it is perfectly normal for a prepare not to be logged for this gtx, for example
if a transaction did not
// modify anything, then beforeCompletion() is not invoked and logPrepare() will
not be called to register the
@@ -85,38 +78,32 @@
if (command != null) addEntry(new LogEntry(gtx, command.getModifications()));
}
- private void addEntry(LogEntry entry)
- {
- if (! isActive())
+ private void addEntry(LogEntry entry) {
+ if (!isActive())
return;
- for (;;)
- {
- try
- {
+ for (; ;) {
+ try {
if (log.isTraceEnabled())
log.trace("Added commit entry to tx log" + entry);
entries.put(entry);
break;
}
- catch (InterruptedException e)
- {
+ catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
- public void logOnePhaseCommit(GlobalTransaction gtx, List<WriteCommand>
modifications)
- {
+ public final void logOnePhaseCommit(GlobalTransaction gtx, List<WriteCommand>
modifications) {
// Just in case...
if (gtx != null) pendingPrepares.remove(gtx);
if (!modifications.isEmpty()) addEntry(new LogEntry(gtx, modifications));
}
- public void logNoTxWrite(WriteCommand write)
- {
- if (! isActive())
+ public final void logNoTxWrite(WriteCommand write) {
+ if (!isActive())
return;
ArrayList<WriteCommand> list = new ArrayList<WriteCommand>();
@@ -124,55 +111,46 @@
addEntry(new LogEntry(null, list));
}
- public void rollback(GlobalTransaction gtx)
- {
+ public void rollback(GlobalTransaction gtx) {
pendingPrepares.remove(gtx);
}
- public boolean isActive()
- {
+ public final boolean isActive() {
return active.get();
}
- public boolean activate()
- {
+ public final boolean activate() {
return active.compareAndSet(false, true);
}
- public void deactivate()
- {
+ public final void deactivate() {
active.set(false);
if (entries.size() > 0)
- log.error("Unprocessed Transaction Log Entries! = " +
entries.size());
+ log.error("Unprocessed Transaction Log Entries! = {0}",
entries.size());
entries.clear();
}
- public int size()
- {
+ public int size() {
return entries.size();
}
- public void writeCommitLog(Marshaller marshaller, ObjectOutputStream out) throws
Exception
- {
- List<LogEntry> buffer = new ArrayList<LogEntry>(10);
+ public void writeCommitLog(Marshaller marshaller, ObjectOutputStream out) throws
Exception {
+ List<LogEntry> buffer = new ArrayList<LogEntry>(10);
- while (entries.drainTo(buffer, 10) > 0)
- {
- for (LogEntry entry : buffer)
- marshaller.objectToObjectStream(entry, out);
+ while (entries.drainTo(buffer, 10) > 0) {
+ for (LogEntry entry : buffer)
+ marshaller.objectToObjectStream(entry, out);
- buffer.clear();
- }
+ buffer.clear();
+ }
}
- public void writePendingPrepares(Marshaller marshaller, ObjectOutputStream out) throws
Exception
- {
+ public void writePendingPrepares(Marshaller marshaller, ObjectOutputStream out) throws
Exception {
for (PrepareCommand entry : pendingPrepares.values())
marshaller.objectToObjectStream(entry, out);
}
- public boolean hasPendingPrepare(PrepareCommand command)
- {
+ public boolean hasPendingPrepare(PrepareCommand command) {
return pendingPrepares.containsKey(command.getGlobalTransaction());
}
}
Modified: core/branches/flat/src/main/java/org/horizon/transaction/TransactionTable.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/transaction/TransactionTable.java 2009-03-03
13:01:12 UTC (rev 7828)
+++
core/branches/flat/src/main/java/org/horizon/transaction/TransactionTable.java 2009-03-03
14:56:48 UTC (rev 7829)
@@ -26,11 +26,13 @@
import org.horizon.context.TransactionContext;
import org.horizon.factories.annotations.Inject;
import org.horizon.factories.annotations.NonVolatile;
+import org.horizon.factories.annotations.Start;
import org.horizon.factories.context.ContextFactory;
import org.horizon.logging.Log;
import org.horizon.logging.LogFactory;
import org.horizon.remoting.RPCManager;
import org.horizon.remoting.transport.Address;
+import org.horizon.remoting.transport.Transport;
import javax.transaction.Status;
import javax.transaction.SystemException;
@@ -64,6 +66,7 @@
private TransactionManager transactionManager = null;
private RPCManager rpcManager;
+ private Transport transport;
private ContextFactory contextFactory;
@@ -74,6 +77,13 @@
this.contextFactory = contextFactory;
}
+ @Start(priority = 12)
+ // needs to happen after RPCManager
+ public void start() {
+ transport = rpcManager == null ? null : rpcManager.getTransport();
+ }
+
+
/**
* Returns the number of local transactions.
*/
@@ -312,7 +322,7 @@
}
private Address getAddress() {
- return rpcManager == null ? null : rpcManager.getAddress();
+ return transport == null ? null : transport.getAddress();
}
public TransactionContext getTransactionContext(GlobalTransaction gtx) {
Modified: core/branches/flat/src/main/java/org/horizon/util/Util.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/util/Util.java 2009-03-03 13:01:12 UTC
(rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/util/Util.java 2009-03-03 14:56:48 UTC
(rev 7829)
@@ -157,18 +157,26 @@
}
public static void closeStream(InputStream i) {
+ if (i == null) return;
try {
- if (i != null) i.close();
+ i.close();
} catch (Exception e) {
}
}
- public static void closeStream(OutputStream o) {
- try {
- if (o != null) o.close();
+ public static void flushAndCloseStream(OutputStream o) {
+ if (o == null) return;
+ try {
+ o.flush();
} catch (Exception e) {
}
+
+ try {
+ o.close();
+ } catch (Exception e) {
+
+ }
}
}
Modified: core/branches/flat/src/main/resources/config-samples/all.xml
===================================================================
--- core/branches/flat/src/main/resources/config-samples/all.xml 2009-03-03 13:01:12 UTC
(rev 7828)
+++ core/branches/flat/src/main/resources/config-samples/all.xml 2009-03-03 14:56:48 UTC
(rev 7829)
@@ -33,7 +33,8 @@
There is no added cost to defining a transport but not creating a cache that
uses one, since the transport
is created and initialized lazily.
-->
- <transport
transportClass="org.horizon.remoting.transport.jgroups.JGroupsTransport"
clusterName="horizon-cluster">
+ <transport
transportClass="org.horizon.remoting.transport.jgroups.JGroupsTransport"
clusterName="horizon-cluster"
+ distributedSyncTimeout="50000">
<!-- Note that the JGroups transport uses sensible defaults if no
configuration property is defined. -->
<property name="configurationFile" value="udp.xml"/>
<!-- See the JGroupsTransport javadocs for more options -->
Modified: core/branches/flat/src/main/resources/schema/horizon-config-1.0.xsd
===================================================================
--- core/branches/flat/src/main/resources/schema/horizon-config-1.0.xsd 2009-03-03
13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/resources/schema/horizon-config-1.0.xsd 2009-03-03
14:56:48 UTC (rev 7829)
@@ -139,6 +139,7 @@
</xs:sequence>
<xs:attribute name="transportClass" type="xs:string"/>
<xs:attribute name="clusterName" type="xs:string"/>
+ <xs:attribute name="distributedSyncTimeout"
type="xs:long"/>
</xs:complexType>
<xs:complexType name="syncType">
Modified:
core/branches/flat/src/test/java/org/horizon/api/mvcc/PutForExternalReadTest.java
===================================================================
---
core/branches/flat/src/test/java/org/horizon/api/mvcc/PutForExternalReadTest.java 2009-03-03
13:01:12 UTC (rev 7828)
+++
core/branches/flat/src/test/java/org/horizon/api/mvcc/PutForExternalReadTest.java 2009-03-03
14:56:48 UTC (rev 7829)
@@ -106,7 +106,7 @@
// specify what we expectWithTx called on the mock Rpc Manager. For params we
don't care about, just use ANYTHING.
// setting the mock object to expectWithTx the "sync" param to be
false.
expect(mockTransport.invokeRemotely((List<Address>) anyObject(),
(RPCCommand) anyObject(),
- eq(ResponseMode.ASYNCHRONOUS), anyLong(),
anyBoolean(), (ResponseFilter) isNull())).andReturn(null);
+ eq(ResponseMode.ASYNCHRONOUS), anyLong(),
anyBoolean(), (ResponseFilter) isNull(), anyBoolean())).andReturn(null);
replay(mockAddress1, mockAddress2, mockTransport);
@@ -160,7 +160,7 @@
expect(mockTransport.invokeRemotely(anyAddresses(), (RPCCommand) anyObject(),
anyResponseMode(),
- anyLong(), anyBoolean(), (ResponseFilter)
anyObject()))
+ anyLong(), anyBoolean(), (ResponseFilter)
anyObject(), anyBoolean()))
.andThrow(new RuntimeException("Barf!")).anyTimes();
replay(mockTransport);
Modified:
core/branches/flat/src/test/java/org/horizon/invalidation/BaseInvalidationTest.java
===================================================================
---
core/branches/flat/src/test/java/org/horizon/invalidation/BaseInvalidationTest.java 2009-03-03
13:01:12 UTC (rev 7828)
+++
core/branches/flat/src/test/java/org/horizon/invalidation/BaseInvalidationTest.java 2009-03-03
14:56:48 UTC (rev 7829)
@@ -182,7 +182,7 @@
expect(mockTransport.getAddress()).andReturn(addressOne).anyTimes();
expect(mockTransport.invokeRemotely((List<Address>) anyObject(),
(RPCCommand) anyObject(),
eq(isSync ? ResponseMode.SYNCHRONOUS :
ResponseMode.ASYNCHRONOUS),
- anyLong(), anyBoolean(), (ResponseFilter)
anyObject())).andReturn(null).anyTimes();
+ anyLong(), anyBoolean(), (ResponseFilter)
anyObject(), anyBoolean())).andReturn(null).anyTimes();
replay(mockTransport);
cache1.put("k", "v");
Modified: core/branches/flat/src/test/java/org/horizon/replication/SyncReplTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/replication/SyncReplTest.java 2009-03-03
13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/test/java/org/horizon/replication/SyncReplTest.java 2009-03-03
14:56:48 UTC (rev 7829)
@@ -145,7 +145,7 @@
rpcManager.setTransport(mockTransport);
expect(mockTransport.invokeRemotely((List<Address>) anyObject(),
(RPCCommand) anyObject(), eq(ResponseMode.SYNCHRONOUS),
- anyLong(), anyBoolean(), (ResponseFilter)
anyObject()))
+ anyLong(), anyBoolean(), (ResponseFilter)
anyObject(), anyBoolean()))
.andReturn(Collections.emptyList()).once();
replay(mockTransport);
@@ -157,7 +157,7 @@
expect(mockTransport.getAddress()).andReturn(mockAddressOne).anyTimes();
expect(mockTransport.getMembers()).andReturn(addresses).anyTimes();
expect(mockTransport.invokeRemotely((List<Address>) anyObject(),
(RPCCommand) anyObject(), eq(ResponseMode.ASYNCHRONOUS),
- anyLong(), anyBoolean(), (ResponseFilter)
anyObject()))
+ anyLong(), anyBoolean(), (ResponseFilter)
anyObject(), anyBoolean()))
.andReturn(Collections.emptyList()).once();
replay(mockTransport);
Added:
core/branches/flat/src/test/java/org/horizon/statetransfer/StateTransferCacheLoaderFunctionalTest.java
===================================================================
---
core/branches/flat/src/test/java/org/horizon/statetransfer/StateTransferCacheLoaderFunctionalTest.java
(rev 0)
+++
core/branches/flat/src/test/java/org/horizon/statetransfer/StateTransferCacheLoaderFunctionalTest.java 2009-03-03
14:56:48 UTC (rev 7829)
@@ -0,0 +1,88 @@
+package org.horizon.statetransfer;
+
+import org.horizon.Cache;
+import org.horizon.config.CacheLoaderManagerConfig;
+import org.horizon.loader.CacheLoader;
+import org.horizon.loader.CacheLoaderConfig;
+import org.horizon.loader.CacheLoaderManager;
+import org.horizon.loader.dummy.DummyInMemoryCacheStore;
+import org.horizon.manager.CacheManager;
+import org.horizon.test.TestingUtil;
+import org.testng.annotations.Test;
+
+@Test(groups = "functional", testName =
"statetransfer.StateTransferCacheLoaderFunctionalTest", enabled = false)
+public class StateTransferCacheLoaderFunctionalTest extends StateTransferFunctionalTest
{
+ int id;
+ ThreadLocal<Boolean> sharedCacheLoader = new ThreadLocal<Boolean>() {
+ protected Boolean initialValue() {
+ return false;
+ }
+ };
+
+ @Override
+ protected CacheManager createCacheManager() {
+ // increment the DIMCS store id
+ CacheLoaderManagerConfig clmc = new CacheLoaderManagerConfig();
+ CacheLoaderConfig clc = new DummyInMemoryCacheStore.Cfg("store number " +
id++);
+ clmc.addCacheLoaderConfig(clc);
+ clc.setFetchPersistentState(true);
+ clmc.setShared(sharedCacheLoader.get());
+ config.setCacheLoaderManagerConfig(clmc);
+ return super.createCacheManager();
+ }
+
+ @Override
+ protected void writeInitialData(final Cache<Object, Object> c) {
+ super.writeInitialData(c);
+ c.evict(A_B_NAME);
+ c.evict(A_B_AGE);
+ c.evict(A_C_NAME);
+ c.evict(A_C_AGE);
+ c.evict(A_D_NAME);
+ c.evict(A_D_AGE);
+ }
+
+ protected void verifyInitialDataOnLoader(Cache<Object, Object> c) throws
Exception {
+ CacheLoader l = TestingUtil.extractComponent(c,
CacheLoaderManager.class).getCacheLoader();
+ assert l.containsKey(A_B_AGE);
+ assert l.containsKey(A_B_NAME);
+ assert l.containsKey(A_C_AGE);
+ assert l.containsKey(A_C_NAME);
+ assert l.load(A_B_AGE).getValue().equals(TWENTY);
+ assert l.load(A_B_NAME).getValue().equals(JOE);
+ assert l.load(A_C_AGE).getValue().equals(FORTY);
+ assert l.load(A_C_NAME).getValue().equals(BOB);
+ }
+
+ protected void verifyNoData(Cache<Object, Object> c) {
+ assert c.isEmpty() : "Cache should be empty!";
+ }
+
+ protected void verifyNoDataOnLoader(Cache<Object, Object> c) throws Exception {
+ CacheLoader l = TestingUtil.extractComponent(c,
CacheLoaderManager.class).getCacheLoader();
+ assert !l.containsKey(A_B_AGE);
+ assert !l.containsKey(A_B_NAME);
+ assert !l.containsKey(A_C_AGE);
+ assert !l.containsKey(A_C_NAME);
+ assert !l.containsKey(A_D_AGE);
+ assert !l.containsKey(A_D_NAME);
+ }
+
+
+ public void testSharedLoader() throws Exception {
+ sharedCacheLoader.set(true);
+ Cache<Object, Object> c1 = createCacheManager().getCache(cacheName);
+ writeInitialData(c1);
+
+ // starting the second cache would initialize an in-memory state transfer but not a
persistent one since the loader is shared
+ Cache<Object, Object> c2 = createCacheManager().getCache(cacheName);
+
+ TestingUtil.blockUntilViewsReceived(60000, c1, c2);
+
+ verifyInitialDataOnLoader(c1);
+ verifyInitialData(c1);
+
+ verifyNoDataOnLoader(c2);
+ verifyNoData(c2);
+ }
+}
Modified:
core/branches/flat/src/test/java/org/horizon/statetransfer/StateTransferFunctionalTest.java
===================================================================
---
core/branches/flat/src/test/java/org/horizon/statetransfer/StateTransferFunctionalTest.java 2009-03-03
13:01:12 UTC (rev 7828)
+++
core/branches/flat/src/test/java/org/horizon/statetransfer/StateTransferFunctionalTest.java 2009-03-03
14:56:48 UTC (rev 7829)
@@ -1,13 +1,13 @@
package org.horizon.statetransfer;
import org.horizon.Cache;
-import org.horizon.transaction.DummyTransactionManagerLookup;
import org.horizon.config.Configuration;
import org.horizon.logging.Log;
import org.horizon.logging.LogFactory;
import org.horizon.manager.CacheManager;
import org.horizon.test.MultipleCacheManagersTest;
import org.horizon.test.TestingUtil;
+import org.horizon.transaction.DummyTransactionManagerLookup;
import org.testng.annotations.Test;
import javax.transaction.TransactionManager;
@@ -34,7 +34,7 @@
public static final Integer FORTY = 40;
Configuration config;
- private static final String cacheName = "nbst";
+ protected static final String cacheName = "nbst";
private volatile int testCount = 0;
@@ -54,9 +54,9 @@
config.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
}
- private CacheManager createCacheManager() {
+ protected CacheManager createCacheManager() {
CacheManager cm = addClusterEnabledCacheManager();
- cm.defineCache(cacheName, config);
+ cm.defineCache(cacheName, config.clone());
return cm;
}
@@ -86,17 +86,18 @@
}
- private static class WritingRunner implements Runnable {
+ private static class WritingThread extends Thread {
private final Cache<Object, Object> cache;
private final boolean tx;
private volatile boolean stop;
private volatile int result;
private TransactionManager tm;
- WritingRunner(Cache<Object, Object> cache, boolean tx) {
+ WritingThread(Cache<Object, Object> cache, boolean tx) {
this.cache = cache;
this.tx = tx;
if (tx) tm = TestingUtil.getTransactionManager(cache);
+ setDaemon(true);
}
public int result() {
@@ -107,19 +108,27 @@
int c = 0;
while (!stop) {
try {
- if (tx) tm.begin();
- cache.put("test" + c, c++);
- if (tx) tm.commit();
+ if (c % 1000 == 0) {
+ if (tx) tm.begin();
+ for (int i = 0; i < 1000; i++) cache.remove("test" + c);
+ if (tx) tm.commit();
+ c = 0;
+ } else {
+ if (tx) tm.begin();
+ cache.put("test" + c, c++);
+ if (tx) tm.commit();
+ }
}
catch (Exception e) {
- e.printStackTrace();
- log.error(e);
+// e.printStackTrace();
+// log.error(e);
+ stopThread();
}
}
result = c;
}
- public void stop() {
+ public void stopThread() {
stop = true;
}
}
@@ -163,6 +172,7 @@
cm3.getCache(cacheName);
}
});
+ t1.setName("CacheStarter-Cache3");
t1.start();
Thread t2 = new Thread(new Runnable() {
@@ -170,6 +180,7 @@
cm4.getCache(cacheName);
}
});
+ t2.setName("CacheStarter-Cache4");
t2.start();
t1.join();
@@ -178,7 +189,7 @@
cache3 = cm3.getCache(cacheName);
cache4 = cm4.getCache(cacheName);
- TestingUtil.blockUntilViewsReceived(60000, cache1, cache2, cache3, cache4);
+ TestingUtil.blockUntilViewsReceived(120000, cache1, cache2, cache3, cache4);
verifyInitialData(cache3);
verifyInitialData(cache4);
log.info("testConcurrentStateTransfer end - " + testCount);
@@ -205,6 +216,7 @@
log.info("testSTWithWritingNonTxThread end - " + testCount);
}
+ @Test(invocationCount = 10)
public void testSTWithWritingTxThread() throws Exception {
testCount++;
log.info("testSTWithWritingTxThread start - " + testCount);
@@ -222,8 +234,7 @@
// Delay the transient copy, so that we get a more thorough log test
cache1.put("delay", new DelayTransfer());
- WritingRunner writer = new WritingRunner(cache3, tx);
- Thread writerThread = new Thread(writer);
+ WritingThread writerThread = new WritingThread(cache3, tx);
writerThread.start();
cache2 = createCacheManager().getCache(cacheName);
@@ -231,25 +242,25 @@
// Pause to give caches time to see each other
TestingUtil.blockUntilViewsReceived(60000, cache1, cache2, cache3);
- writer.stop();
+ writerThread.stopThread();
writerThread.join();
verifyInitialData(cache2);
- int count = writer.result();
+ int count = writerThread.result();
for (int c = 0; c < count; c++)
assert cache2.get("test" + c).equals(c);
}
- private void verifyInitialData(Cache<Object, Object> c) {
+ protected void verifyInitialData(Cache<Object, Object> c) {
assert JOE.equals(c.get(A_B_NAME)) : "Incorrect value for key " +
A_B_NAME;
assert TWENTY.equals(c.get(A_B_AGE)) : "Incorrect value for key " +
A_B_AGE;
assert BOB.equals(c.get(A_C_NAME)) : "Incorrect value for key " +
A_C_NAME;
assert FORTY.equals(c.get(A_C_AGE)) : "Incorrect value for key " +
A_C_AGE;
}
- private void writeInitialData(final Cache<Object, Object> c) {
+ protected void writeInitialData(final Cache<Object, Object> c) {
c.put(A_B_NAME, JOE);
c.put(A_B_AGE, TWENTY);
c.put(A_C_NAME, BOB);
@@ -261,25 +272,24 @@
cache1 = createCacheManager().getCache(cacheName);
writeInitialData(cache1);
-
// Delay the transient copy, so that we get a more thorough log test
cache1.put("delay", new DelayTransfer());
- WritingRunner writer = new WritingRunner(cache1, tx);
- Thread writerThread = new Thread(writer);
+ WritingThread writerThread = new WritingThread(cache1, tx);
writerThread.start();
-
+ verifyInitialData(cache1);
cache2 = createCacheManager().getCache(cacheName);
// Pause to give caches time to see each other
TestingUtil.blockUntilViewsReceived(60000, cache1, cache2);
- writer.stop();
+ writerThread.stopThread();
writerThread.join();
+ verifyInitialData(cache1);
verifyInitialData(cache2);
- int count = writer.result();
+ int count = writerThread.result();
for (int c = 0; c < count; c++)
assert cache2.get("test" + c).equals(c);