Author: jason.greene(a)jboss.com
Date: 2009-02-04 18:54:51 -0500 (Wed, 04 Feb 2009)
New Revision: 7649
Added:
core/trunk/src/main/java/org/jboss/cache/factories/TransactionLogFactory.java
core/trunk/src/main/java/org/jboss/cache/transaction/TransactionLog.java
core/trunk/src/test/java/org/jboss/cache/statetransfer/NonBlockingStateTransferTest.java
Modified:
core/trunk/src/main/java/org/jboss/cache/RPCManager.java
core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
core/trunk/src/main/java/org/jboss/cache/config/Configuration.java
core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java
core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java
core/trunk/src/main/java/org/jboss/cache/marshall/CacheMarshaller200.java
core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java
core/trunk/src/main/java/org/jboss/cache/marshall/InactiveRegionAwareRpcDispatcher.java
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferManager.java
core/trunk/src/test/java/org/jboss/cache/transaction/PrepareCommitContentionTest.java
core/trunk/src/test/resources/log4j.xml
Log:
Implement NBST
Modified: core/trunk/src/main/java/org/jboss/cache/RPCManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RPCManager.java 2009-02-04 23:48:43 UTC (rev
7648)
+++ core/trunk/src/main/java/org/jboss/cache/RPCManager.java 2009-02-04 23:54:51 UTC (rev
7649)
@@ -22,12 +22,14 @@
package org.jboss.cache;
import org.jboss.cache.commands.ReplicableCommand;
+import org.jboss.cache.lock.TimeoutException;
import org.jgroups.Address;
import org.jgroups.Channel;
import org.jgroups.blocks.RspFilter;
import java.util.List;
import java.util.Vector;
+import java.util.concurrent.TimeUnit;
/**
* Provides a mechanism for communicating with other caches in the cluster. For now this
is based on JGroups as an underlying
@@ -150,4 +152,8 @@
* @return a channel
*/
Channel getChannel();
+
+ /**
+ * Waits for a flush to block this node, for at most the given time.
+ * <p/>
+ * NOTE(review): RPCManagerImpl returns immediately when the channel does not support flush, and throws
+ * an org.jboss.cache.lock.TimeoutException when the wait times out; confirm other implementations agree.
+ *
+ * @param timeout maximum time to wait, in milliseconds
+ */
+ public void waitForFlush(long timeout);
+
+ /**
+ * @return the address of the member most recently asked to provide state, or null if no state has been
+ * requested yet
+ */
+ public Address getLastStateTransferSource();
}
Modified: core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java 2009-02-04 23:48:43 UTC
(rev 7648)
+++ core/trunk/src/main/java/org/jboss/cache/RPCManagerImpl.java 2009-02-04 23:54:51 UTC
(rev 7649)
@@ -51,8 +51,10 @@
import org.jboss.cache.util.reflect.ReflectionUtil;
import org.jgroups.Address;
import org.jgroups.Channel;
+import org.jgroups.ChannelClosedException;
import org.jgroups.ChannelException;
import org.jgroups.ChannelFactory;
+import org.jgroups.ChannelNotConnectedException;
import org.jgroups.ExtendedMembershipListener;
import org.jgroups.JChannel;
import org.jgroups.StateTransferException;
@@ -100,7 +102,18 @@
* Thread gate used to block Dispatcher during JGroups FLUSH protocol
*/
private final ReclosableLatch flushBlockGate = new ReclosableLatch();
+
/**
+ * Thread gate used by NBST to wait for a flush
+ */
+ private final ReclosableLatch flushWaitGate = new ReclosableLatch(false);
+
+ /**
+ * The most recent state transfer source
+ */
+ volatile Address lastStateTransferSource;
+
+ /**
* JGroups RpcDispatcher in use.
*/
private CommandAwareRpcDispatcher rpcDispatcher = null;
@@ -109,7 +122,7 @@
* JGroups message listener.
*/
private ChannelMessageListener messageListener;
- private Configuration configuration;
+ Configuration configuration;
private Notifier notifier;
private CacheSPI spi;
private InvocationContextContainer invocationContextContainer;
@@ -148,6 +161,7 @@
@Start(priority = 15)
public void start()
{
+
switch (configuration.getCacheMode())
{
case LOCAL:
@@ -161,56 +175,131 @@
case INVALIDATION_SYNC:
isInLocalMode = false;
isUsingBuddyReplication = configuration.getBuddyReplicationConfig() != null
&& configuration.getBuddyReplicationConfig().isEnabled();
- if (log.isDebugEnabled()) log.debug("Cache mode is " +
configuration.getCacheMode());
+ if (log.isDebugEnabled())
+ log.debug("Cache mode is " + configuration.getCacheMode());
boolean fetchState = shouldFetchStateOnStartup();
- initialiseChannelAndRpcDispatcher(fetchState);
+ boolean nonBlocking = configuration.isNonBlockingStateTransfer();
+ initialiseChannelAndRpcDispatcher(fetchState && !nonBlocking);
- if (fetchState)
+ if (!fetchState || nonBlocking)
{
try
{
- long start = System.currentTimeMillis();
- // connect and state transfer
- channel.connect(configuration.getClusterName(), null, null,
configuration.getStateRetrievalTimeout());
- //if I am not the only and the first member than wait for a state to
arrive
- if (getMembers().size() > 1) messageListener.waitForState();
-
- if (log.isDebugEnabled())
- log.debug("connected, state was retrieved successfully (in
" + (System.currentTimeMillis() - start) + " milliseconds)");
+ // Allow commands to be ACKed during state transfer
+ if (nonBlocking)
+ componentRegistry.setBlockInStarting(false);
+ channel.connect(configuration.getClusterName());
+ if (log.isInfoEnabled())
+ log.info("Cache local address is " + getLocalAddress());
}
- catch (StateTransferException ste)
- {
- // make sure we disconnect from the channel before we throw this
exception!
- // JBCACHE-761
- disconnect();
- throw new CacheException("Unable to fetch state on startup",
ste);
- }
catch (ChannelException e)
{
throw new CacheException("Unable to connect to JGroups
channel", e);
}
- catch (Exception ex)
- {
- throw new CacheException("Unable to fetch state on startup",
ex);
- }
+
+ if (!fetchState)
+ return;
}
+
+
+ List<Address> members = getMembers();
+
+ long start = System.currentTimeMillis();
+ if (nonBlocking)
+ {
+ startNonBlockStateTransfer(members);
+ }
else
{
- //otherwise just connect
try
{
- channel.connect(configuration.getClusterName());
+ channel.connect(configuration.getClusterName(), null, null,
configuration.getStateRetrievalTimeout());
+ if (log.isInfoEnabled())
+ log.info("Cache local address is " + getLocalAddress());
+
+ if (members.size() > 1)
+ messageListener.waitForState();
}
catch (ChannelException e)
{
throw new CacheException("Unable to connect to JGroups
channel", e);
}
+ catch (Exception ex)
+ {
+ // make sure we disconnect from the channel before we throw this
exception!
+ // JBCACHE-761
+ disconnect();
+ throw new CacheException("Unable to fetch state on startup",
ex);
+ }
}
- if (log.isInfoEnabled()) log.info("Cache local address is " +
getLocalAddress());
+
+ if (log.isDebugEnabled())
+ log.debug("state was retrieved successfully (in " +
(System.currentTimeMillis() - start) + " milliseconds)");
}
+
}
+   /**
+    * Fetches the initial state for non-blocking state transfer by asking every other member in turn.
+    * <p/>
+    * Up to five passes are made over <tt>members</tt>.  After a pass in which no member supplied state the
+    * calling thread backs off before trying again; the delay starts at 4 seconds and quadruples each time.
+    * No delay is added after the final pass, and an interrupt aborts the retries instead of being swallowed.
+    * <p/>
+    * On success, blocking of remote calls while the cache is starting is switched back on.  On failure the
+    * channel is disconnected before the exception is thrown.
+    *
+    * @param members the current cluster members; nothing is done if there are fewer than two
+    * @throws CacheException if no member supplied state
+    */
+   private void startNonBlockStateTransfer(List<Address> members)
+   {
+      // Nobody else in the cluster to fetch state from.
+      if (members.size() < 2)
+         return;
+
+      final int maxAttempts = 5;
+      boolean success = false;
+
+      outer:
+      for (int i = 0, wait = 1000; i < maxAttempts; i++)
+      {
+         for (Address member : members)
+         {
+            if (member.equals(getLocalAddress()))
+               continue;
+
+            try
+            {
+               if (log.isTraceEnabled())
+                  log.trace("Trying to fetch state from: " + member);
+               if (getState(null, member))
+               {
+                  messageListener.waitForState();
+                  success = true;
+                  break outer;
+               }
+            }
+            catch (Exception e)
+            {
+               if (log.isTraceEnabled())
+                  log.trace("Error while fetching state", e);
+
+               if (e instanceof InterruptedException)
+               {
+                  // Do not swallow an interrupt raised while fetching or waiting for state.
+                  Thread.currentThread().interrupt();
+                  break outer;
+               }
+            }
+         }
+
+         // Backing off after the last pass would only delay the failure below.
+         if (i == maxAttempts - 1)
+            break;
+
+         if (log.isTraceEnabled())
+            log.trace("Could not find available peer for state, backing off and retrying");
+
+         try
+         {
+            Thread.sleep(wait <<= 2);
+         }
+         catch (InterruptedException e)
+         {
+            // With the interrupt flag set every later sleep would throw at once, turning the
+            // remaining passes into a retry loop with no back-off; give up instead.
+            Thread.currentThread().interrupt();
+            break;
+         }
+      }
+
+      if (!success)
+      {
+         disconnect();
+         throw new CacheException("Unable to fetch state on startup");
+      }
+
+      // State has arrived: remote calls received while STARTING should block again.
+      componentRegistry.setBlockInStarting(true);
+   }
+
public void disconnect()
{
if (channel != null && channel.isOpen())
@@ -316,12 +405,12 @@
if (configuration.isUseRegionBasedMarshalling())
{
rpcDispatcher = new InactiveRegionAwareRpcDispatcher(channel, messageListener,
new MembershipListenerAdaptor(),
- spi, invocationContextContainer, interceptorChain, componentRegistry);
+ spi, invocationContextContainer, interceptorChain, componentRegistry,
flushBlockGate);
}
else
{
rpcDispatcher = new CommandAwareRpcDispatcher(channel, messageListener, new
MembershipListenerAdaptor(),
- invocationContextContainer, invocationContextContainer, interceptorChain,
componentRegistry);
+ invocationContextContainer, invocationContextContainer, interceptorChain,
componentRegistry, flushBlockGate);
}
checkAppropriateConfig();
rpcDispatcher.setRequestMarshaller(marshaller);
@@ -556,7 +645,7 @@
if (log.isDebugEnabled())
log.debug("Node " + getLocalAddress() + " fetching partial
state " + stateId + " from member " + target);
messageListener.setStateSet(false);
- successfulTransfer = channel.getState(target, stateId,
configuration.getStateRetrievalTimeout());
+ successfulTransfer = getState(stateId, target);
if (successfulTransfer)
{
try
@@ -588,6 +677,31 @@
}
+   /**
+    * Requests state from <tt>target</tt> over the JGroups channel and records <tt>target</tt> as the most
+    * recent state transfer source.  The source is recorded before the request is issued, so it is updated
+    * even when the transfer itself fails.
+    *
+    * @param stateId id of the partial state to fetch; callers in this class pass null for the initial fetch
+    * @param target  member to request state from
+    * @return the result reported by the channel's <tt>getState</tt> call
+    */
+   private boolean getState(String stateId, Address target) throws ChannelNotConnectedException, ChannelClosedException
+   {
+      lastStateTransferSource = target;
+
+      final JChannel jchannel = (JChannel) channel;
+      final long retrievalTimeout = configuration.getStateRetrievalTimeout();
+      // NOTE(review): the last argument looks like JGroups' "use flush if present" switch - confirm.
+      final boolean blocking = !configuration.isNonBlockingStateTransfer();
+      return jchannel.getState(target, stateId, retrievalTimeout, blocking);
+   }
+
+   /**
+    * Waits until a flush has blocked this node, or the timeout expires.
+    * <p/>
+    * Returns immediately if the channel does not support flush.  Otherwise the caller waits on the flush wait
+    * gate, which is opened when a block event is received and closed again on unblock.
+    * <p/>
+    * The wait is not abandoned when the thread is interrupted: waiting resumes for whatever is left of the
+    * timeout and the interrupt status is restored before this method exits.  Re-asserting the interrupt inside
+    * the retry loop would make every following wait fail at once and spin forever.
+    *
+    * @param timeout maximum total time to wait, in milliseconds
+    * @throws TimeoutException if the gate was not opened within <tt>timeout</tt> milliseconds
+    */
+   public void waitForFlush(long timeout)
+   {
+      if (!channel.flushSupported())
+         return;
+
+      final long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeout);
+      boolean interrupted = false;
+      try
+      {
+         for (;;)
+         {
+            // Only wait for the time still left, so interrupts cannot extend the overall timeout.
+            long remaining = Math.max(0L, deadline - System.nanoTime());
+            try
+            {
+               if (!flushWaitGate.await(remaining, TimeUnit.NANOSECONDS))
+               {
+                  throw new TimeoutException("State retrieval timed out waiting for flush to block. (timeout = " + timeout+ " millis) ");
+               }
+               return;
+            }
+            catch (InterruptedException e)
+            {
+               // Remember the interrupt and retry; the flag is restored in the finally block.
+               interrupted = true;
+            }
+         }
+      }
+      finally
+      {
+         if (interrupted)
+            Thread.currentThread().interrupt();
+      }
+   }
+
// ------------ END: Partial state transfer methods ------------
// ------------ START: Informational methods ------------
@@ -599,6 +713,11 @@
return address == null ? "null" : address.toString();
}
+ /**
+ * Returns the member most recently asked to provide state.  The value is recorded by getState() before
+ * the request is issued, so it is set even if that transfer did not succeed.
+ *
+ * @return the last state transfer source, or null if no state has been requested yet
+ */
+ public Address getLastStateTransferSource()
+ {
+ return lastStateTransferSource;
+ }
+
public Address getLocalAddress()
{
return channel != null ? channel.getLocalAddress() : null;
@@ -734,6 +853,7 @@
try
{
flushBlockGate.close();
+ flushWaitGate.open();
if (log.isDebugEnabled()) log.debug("Block received at " +
getLocalAddress());
notifier.notifyCacheBlocked(true);
notifier.notifyCacheBlocked(false);
@@ -754,6 +874,7 @@
{
try
{
+ flushWaitGate.close();
if (log.isDebugEnabled()) log.debug("UnBlock received at " +
getLocalAddress());
notifier.notifyCacheUnblocked(true);
@@ -862,4 +983,4 @@
}
}
}
-}
\ No newline at end of file
+}
Modified: core/trunk/src/main/java/org/jboss/cache/config/Configuration.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/config/Configuration.java 2009-02-04 23:48:43
UTC (rev 7648)
+++ core/trunk/src/main/java/org/jboss/cache/config/Configuration.java 2009-02-04 23:54:51
UTC (rev 7649)
@@ -187,6 +187,7 @@
private boolean exposeManagementStatistics = true;
@Dynamic
private boolean fetchInMemoryState = true;
+ private boolean nonBlockingStateTransfer = false;
private short replicationVersion = DEFAULT_REPLICATION_VERSION;
@Dynamic
private long lockAcquisitionTimeout = 10000;
@@ -474,7 +475,7 @@
/**
* Sets the queue size of the bounded queue used to store async serialization events
on. This defaults to 50,000.
- *
+ *
* @param serializationExecutorQueueSize queue size to use
*/
public void setSerializationExecutorQueueSize(int serializationExecutorQueueSize)
@@ -1141,4 +1142,14 @@
return null;
}
+ /**
+ * @return true if non-blocking state transfer is enabled; the default is false
+ */
+ public boolean isNonBlockingStateTransfer()
+ {
+ return nonBlockingStateTransfer;
+ }
+
+ /**
+ * Enables or disables non-blocking state transfer.
+ * <p/>
+ * NOTE(review): the backing field carries no @Dynamic annotation and the setter makes no
+ * "change allowed" check - confirm whether altering this on a running cache is meant to be supported.
+ *
+ * @param nonBlockingStateTransfer true to enable non-blocking state transfer
+ */
+ public void setNonBlockingStateTransfer(boolean nonBlockingStateTransfer)
+ {
+ this.nonBlockingStateTransfer = nonBlockingStateTransfer;
+ }
+
}
Modified: core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java 2009-02-04
23:48:43 UTC (rev 7648)
+++ core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java 2009-02-04
23:54:51 UTC (rev 7649)
@@ -105,6 +105,8 @@
*/
private boolean invokedFromShutdownHook;
+ private volatile boolean blockInStarting = true;
+
/**
* Creates an instance of the component registry. The configuration passed in is
automatically registered.
*
@@ -202,6 +204,7 @@
s.add(RegionManagerFactory.class);
s.add(NodeMetaFactory.class);
s.add(CommandsMetaFactory.class);
+ s.add(TransactionLogFactory.class);
return s;
}
@@ -883,7 +886,7 @@
log.trace("Is remotely originating.");
// else if this is a remote call and the status is STARTING, wait until the cache
starts.
- if (state == CacheStatus.STARTING)
+ if (state == CacheStatus.STARTING && blockInStarting)
{
log.trace("Cache is starting; block.");
try
@@ -1023,4 +1026,9 @@
HashSet<Component> defensiveCopy = new
HashSet<Component>(componentLookup.values());
return Collections.unmodifiableSet(defensiveCopy);
}
+
+ /**
+ * Sets whether remotely originating calls are made to wait while the cache status is STARTING.
+ * The flag defaults to true; when it is false the STARTING check no longer blocks such calls.
+ *
+ * @param blockInStarting true to block remote calls while the cache is starting
+ */
+ public void setBlockInStarting(boolean blockInStarting)
+ {
+ this.blockInStarting = blockInStarting;
+ }
}
Added: core/trunk/src/main/java/org/jboss/cache/factories/TransactionLogFactory.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/factories/TransactionLogFactory.java
(rev 0)
+++
core/trunk/src/main/java/org/jboss/cache/factories/TransactionLogFactory.java 2009-02-04
23:54:51 UTC (rev 7649)
@@ -0,0 +1,40 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.cache.factories;
+
+import org.jboss.cache.factories.annotations.DefaultFactoryFor;
+import org.jboss.cache.transaction.TransactionLog;
+
+/**
+ * Component factory registered as the default factory for
+ * {@link org.jboss.cache.transaction.TransactionLog}.  Every call to {@link #construct(Class)} creates a
+ * new <tt>TransactionLog</tt>.
+ *
+ * @author Jason T. Greene
+ * @since 3.0
+ */
+@DefaultFactoryFor(classes = TransactionLog.class)
+public class TransactionLogFactory extends ComponentFactory
+{
+   /**
+    * Creates a new transaction log and returns it cast to the requested component type.
+    *
+    * @param componentType type the caller expects; a <tt>TransactionLog</tt> must be assignable to it
+    * @return a newly created transaction log
+    */
+   protected <T> T construct(Class<T> componentType)
+   {
+      final TransactionLog transactionLog = new TransactionLog();
+      return componentType.cast(transactionLog);
+   }
+}
Property changes on:
core/trunk/src/main/java/org/jboss/cache/factories/TransactionLogFactory.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java 2009-02-04
23:48:43 UTC (rev 7648)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java 2009-02-04
23:54:51 UTC (rev 7649)
@@ -21,6 +21,20 @@
*/
package org.jboss.cache.interceptors;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+import javax.transaction.InvalidTransactionException;
+import javax.transaction.Status;
+import javax.transaction.Synchronization;
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
+
import org.jboss.cache.CacheException;
import org.jboss.cache.InvocationContext;
import org.jboss.cache.RPCManager;
@@ -51,21 +65,11 @@
import org.jboss.cache.notifications.Notifier;
import org.jboss.cache.transaction.GlobalTransaction;
import org.jboss.cache.transaction.TransactionContext;
+import org.jboss.cache.transaction.TransactionLog;
import org.jboss.cache.transaction.TransactionTable;
+import org.jboss.cache.util.Immutables;
import org.jboss.cache.util.concurrent.ConcurrentHashSet;
-import javax.transaction.InvalidTransactionException;
-import javax.transaction.Status;
-import javax.transaction.Synchronization;
-import javax.transaction.SystemException;
-import javax.transaction.Transaction;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
/**
* This interceptor is the new default at the head of all interceptor chains,
* and makes transactional attributes available to all interceptors in the chain.
@@ -83,12 +87,14 @@
private InvocationContextContainer invocationContextContainer;
private ComponentRegistry componentRegistry;
private ContextFactory contextFactory;
+ private TransactionLog transactionLog;
/**
* List <Transaction>that we have registered for
*/
private final Set<Transaction> transactions = new
ConcurrentHashSet<Transaction>();
private final Map<Transaction, GlobalTransaction> rollbackTransactions = new
ConcurrentHashMap<Transaction, GlobalTransaction>(16);
+
private long prepares = 0;
private long commits = 0;
private long rollbacks = 0;
@@ -99,9 +105,11 @@
@Inject
public void intialize(RPCManager rpcManager, ContextFactory contextFactory,
Notifier notifier, InvocationContextContainer icc,
- CommandsFactory factory, ComponentRegistry componentRegistry,
LockManager lockManager)
+ TransactionLog transactionLog, CommandsFactory factory,
+ ComponentRegistry componentRegistry, LockManager lockManager)
{
this.contextFactory = contextFactory;
+ this.transactionLog = transactionLog;
this.commandsFactory = factory;
this.rpcManager = rpcManager;
this.notifier = notifier;
@@ -117,10 +125,12 @@
Object result = null;
// this is a prepare, commit, or rollback.
- if (trace) log.trace("Got gtx from invocation context " +
ctx.getGlobalTransaction());
+ GlobalTransaction gtx = ctx.getGlobalTransaction();
+ if (trace) log.trace("Got gtx from invocation context " + gtx);
+
try
{
- if (ctx.getGlobalTransaction().isRemote())
+ if (gtx.isRemote())
{
result = handleRemotePrepare(ctx, command);
if (getStatisticsEnabled()) prepares++;
@@ -142,6 +152,8 @@
@Override
public Object visitCommitCommand(InvocationContext ctx, CommitCommand command) throws
Throwable
{
+ GlobalTransaction gtx = ctx.getGlobalTransaction();
+
if (!ctx.getGlobalTransaction().isRemote())
{
if (trace) log.trace("received my own message (discarding it)");
@@ -150,7 +162,7 @@
try
{
if (trace) log.trace("(" + rpcManager.getLocalAddress() + ") call
on command [" + command + "]");
- GlobalTransaction gtx = ctx.getGlobalTransaction();
+
Transaction ltx = txTable.getLocalTransaction(gtx, true);
// disconnect if we have a current tx associated
Transaction currentTx = txManager.getTransaction();
@@ -194,7 +206,9 @@
@Override
public Object visitRollbackCommand(InvocationContext ctx, RollbackCommand command)
throws Throwable
{
- if (!ctx.getGlobalTransaction().isRemote())
+ GlobalTransaction gtx = ctx.getGlobalTransaction();
+
+ if (!gtx.isRemote())
{
if (trace) log.trace("received my own message (discarding it)");
return null;
@@ -202,7 +216,6 @@
try
{
if (trace) log.trace("(" + rpcManager.getLocalAddress() + ") call
on command [" + command + "]");
- GlobalTransaction gtx = ctx.getGlobalTransaction();
Transaction ltx = txTable.getLocalTransaction(gtx);
if (ltx == null)
{
@@ -268,7 +281,12 @@
{
try
{
- return attachGtxAndPassUpChain(ctx, command);
+ Object ret = attachGtxAndPassUpChain(ctx, command);
+
+ if (command instanceof WriteCommand && ctx.getTransaction() == null)
+ transactionLog.logNoTxWrite((WriteCommand)command);
+
+ return ret;
}
catch (Throwable throwable)
{
@@ -382,6 +400,8 @@
}
else
{
+ transactionLog.logPrepare(command);
+
// now pass up the prepare method itself.
invokeNextInterceptor(ctx, command);
}
@@ -544,11 +564,22 @@
{
try
{
- VisitableCommand commitCommand = onePhaseCommit ? buildPrepareCommand(gtx,
modifications, true) : commandsFactory.buildCommitCommand(gtx);
-
if (trace) log.trace("Running commit for " + gtx);
+ VisitableCommand commitCommand = onePhaseCommit ? buildPrepareCommand(gtx,
modifications, true)
+ :
commandsFactory.buildCommitCommand(gtx);
+
+
handleCommitRollback(ctx, commitCommand);
+
+ if (onePhaseCommit)
+ {
+ transactionLog.logOnePhaseCommit(gtx, modifications);
+ }
+ else
+ {
+ transactionLog.logCommit(gtx);
+ }
}
catch (Throwable e)
{
@@ -595,6 +626,8 @@
VisitableCommand rollbackCommand = commandsFactory.buildRollbackCommand(gtx);
if (trace) log.trace(" running rollback for " + gtx);
+ transactionLog.rollback(gtx);
+
//JBCACHE-359 Store a lookup for the globalTransaction so a listener
// callback can find it
rollbackTransactions.put(tx, gtx);
@@ -630,8 +663,10 @@
public Object runPreparePhase(InvocationContext ctx, GlobalTransaction gtx,
List<WriteCommand> modifications) throws Throwable
{
// running a 2-phase commit.
- VisitableCommand prepareCommand = buildPrepareCommand(gtx, modifications, false);
+ PrepareCommand prepareCommand = buildPrepareCommand(gtx, modifications, false);
+ transactionLog.logPrepare(prepareCommand);
+
Object result;
// Is there a local transaction associated with GTX ?
Modified: core/trunk/src/main/java/org/jboss/cache/marshall/CacheMarshaller200.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/CacheMarshaller200.java 2009-02-04
23:48:43 UTC (rev 7648)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/CacheMarshaller200.java 2009-02-04
23:54:51 UTC (rev 7649)
@@ -21,21 +21,6 @@
*/
package org.jboss.cache.marshall;
-import org.jboss.cache.CacheException;
-import org.jboss.cache.Fqn;
-import org.jboss.cache.Region;
-import org.jboss.cache.Region.Status;
-import org.jboss.cache.buddyreplication.GravitateResult;
-import org.jboss.cache.commands.CommandsFactory;
-import org.jboss.cache.commands.ReplicableCommand;
-import org.jboss.cache.factories.annotations.Inject;
-import org.jboss.cache.optimistic.DefaultDataVersion;
-import org.jboss.cache.transaction.GlobalTransaction;
-import org.jboss.cache.util.FastCopyHashMap;
-import org.jboss.cache.util.Immutables;
-import org.jgroups.Address;
-import org.jgroups.stack.IpAddress;
-
import java.io.Externalizable;
import java.io.IOException;
import java.io.ObjectInputStream;
@@ -53,6 +38,23 @@
import java.util.TreeMap;
import java.util.TreeSet;
+import org.jboss.cache.CacheException;
+import org.jboss.cache.Fqn;
+import org.jboss.cache.Region;
+import org.jboss.cache.Region.Status;
+import org.jboss.cache.buddyreplication.GravitateResult;
+import org.jboss.cache.commands.CommandsFactory;
+import org.jboss.cache.commands.ReplicableCommand;
+import org.jboss.cache.commands.WriteCommand;
+import org.jboss.cache.factories.annotations.Inject;
+import org.jboss.cache.optimistic.DefaultDataVersion;
+import org.jboss.cache.transaction.GlobalTransaction;
+import org.jboss.cache.transaction.TransactionLog.LogEntry;
+import org.jboss.cache.util.FastCopyHashMap;
+import org.jboss.cache.util.Immutables;
+import org.jgroups.Address;
+import org.jgroups.stack.IpAddress;
+
/**
* An enhanced marshaller for RPC calls between CacheImpl instances.
*
@@ -90,10 +92,12 @@
protected static final int MAGICNUMBER_FLOAT = 27;
protected static final int MAGICNUMBER_DOUBLE = 28;
protected static final int MAGICNUMBER_OBJECT = 29;
+ protected static final int MAGICNUMBER_TXLOG_ENTRY = 50;
protected static final int MAGICNUMBER_NULL = 99;
protected static final int MAGICNUMBER_SERIALIZABLE = 100;
protected static final int MAGICNUMBER_REF = 101;
+
protected static final InactiveRegionException IRE = new
InactiveRegionException("Cannot unmarshall to an inactive region");
public CacheMarshaller200()
@@ -343,6 +347,11 @@
if (useRefs) writeReference(out, createReference(o, refMap));
marshallGlobalTransaction((GlobalTransaction) o, out, refMap);
}
+ else if (o instanceof LogEntry)
+ {
+ out.writeByte(MAGICNUMBER_TXLOG_ENTRY);
+ marshallLogEntry((LogEntry)o, out, refMap);
+ }
else if (o instanceof IpAddress)
{
out.writeByte(MAGICNUMBER_IPADDRESS);
@@ -455,6 +464,12 @@
}
}
+   /**
+    * Writes a transaction log entry to the stream: first its transaction, then its modifications.
+    * The order must match the order in which unmarshallLogEntry reads them back.
+    */
+   private void marshallLogEntry(LogEntry log, ObjectOutputStream out, Map<Object, Integer> refMap) throws Exception
+   {
+      final Object transaction = log.getTransaction();
+      marshallObject(transaction, out, refMap);
+
+      final Object modifications = log.getModifications();
+      marshallObject(modifications, out, refMap);
+   }
+
private void marshallGravitateResult(GravitateResult gravitateResult,
ObjectOutputStream out, Map<Object, Integer> refMap) throws Exception
{
marshallObject(gravitateResult.isDataFound(), out, refMap);
@@ -605,6 +620,8 @@
retVal = unmarshallGlobalTransaction(in, refMap);
if (useRefs) refMap.putReferencedObject(reference, retVal);
return retVal;
+ case MAGICNUMBER_TXLOG_ENTRY:
+ return unmarshallLogEntry(in, refMap);
case MAGICNUMBER_IPADDRESS:
retVal = unmarshallIpAddress(in);
return retVal;
@@ -666,6 +683,15 @@
throw new Exception("Unknown magic number " + magicNumber);
}
+   /**
+    * Reads a transaction log entry from the stream: first the global transaction, then the list of
+    * modifications, in the same order as marshallLogEntry writes them.
+    *
+    * @return the reconstructed {@link LogEntry}
+    */
+   @SuppressWarnings("unchecked")
+   private Object unmarshallLogEntry(ObjectInputStream in, UnmarshalledReferences refMap) throws Exception
+   {
+      final GlobalTransaction transaction = (GlobalTransaction) unmarshallObject(in, refMap);
+      final List<WriteCommand> modifications = (List<WriteCommand>) unmarshallObject(in, refMap);
+
+      final LogEntry entry = new LogEntry(transaction, modifications);
+      return entry;
+   }
+
private FastCopyHashMap unmarshallFastCopyHashMap(ObjectInputStream in,
UnmarshalledReferences refMap) throws Exception
{
FastCopyHashMap map = new FastCopyHashMap();
Modified:
core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java 2009-02-04
23:48:43 UTC (rev 7648)
+++
core/trunk/src/main/java/org/jboss/cache/marshall/CommandAwareRpcDispatcher.java 2009-02-04
23:54:51 UTC (rev 7649)
@@ -21,6 +21,16 @@
*/
package org.jboss.cache.marshall;
+import java.io.NotSerializableException;
+import java.util.Vector;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
import org.jboss.cache.InvocationContext;
import org.jboss.cache.commands.ReplicableCommand;
import org.jboss.cache.commands.VisitableCommand;
@@ -31,7 +41,9 @@
import org.jboss.cache.factories.ComponentRegistry;
import org.jboss.cache.interceptors.InterceptorChain;
import org.jboss.cache.invocation.InvocationContextContainer;
+import org.jboss.cache.lock.TimeoutException;
import org.jboss.cache.util.concurrent.BoundedExecutors;
+import org.jboss.cache.util.concurrent.ReclosableLatch;
import org.jboss.cache.util.concurrent.WithinThreadExecutor;
import org.jgroups.Address;
import org.jgroups.Channel;
@@ -44,16 +56,6 @@
import org.jgroups.util.Rsp;
import org.jgroups.util.RspList;
-import java.io.NotSerializableException;
-import java.util.Vector;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
/**
* A JGroups RPC dispatcher that knows how to deal with {@link
org.jboss.cache.commands.ReplicableCommand}s.
*
@@ -69,24 +71,27 @@
private ExecutorService replicationProcessor;
private AtomicInteger replicationProcessorCount;
private boolean asyncSerial;
+ private Configuration configuration;
+ private ReclosableLatch flushGate;
private ReplicationObserver replicationObserver;
- public CommandAwareRpcDispatcher()
- {
- }
+ public CommandAwareRpcDispatcher() {}
- public CommandAwareRpcDispatcher(Channel channel, MessageListener l,
MembershipListener l2, Object serverObj,
- InvocationContextContainer container,
InterceptorChain interceptorChain,
- ComponentRegistry componentRegistry)
+ public CommandAwareRpcDispatcher(Channel channel, MessageListener l,
MembershipListener l2,
+ Object serverObj, InvocationContextContainer
container, InterceptorChain interceptorChain,
+ ComponentRegistry componentRegistry, ReclosableLatch
flushGate)
{
super(channel, l, l2, serverObj);
this.invocationContextContainer = container;
this.componentRegistry = componentRegistry;
this.interceptorChain = interceptorChain;
+ this.flushGate = flushGate;
+
trace = log.isTraceEnabled();
// what sort of a repl processor do we need?
Configuration c = componentRegistry.getComponent(Configuration.class);
+ this.configuration = c;
replicationProcessor = c.getRuntimeConfig().getAsyncSerializationExecutor();
if (c.getCacheMode().isSynchronous() ||
(replicationProcessor == null && c.getSerializationExecutorPoolSize()
< 1) || requireSyncMarshalling(c)) // if an executor has not been injected and the pool
size is set
@@ -125,7 +130,7 @@
/**
* Serial(sync) marshalling should be enabled for async optimistic caches. That is
because optimistic async is a 2PC,
* which might cause the Commit command to be send before the Prepare command, so
replication will fail. This is not
- * the same for async <b>pessimistic/mvcc</b> replication, as this uses a
1PC.
+ * the same for async <b>pessimistic/mvcc</b> replication, as this uses a
1PC.
*/
private boolean requireSyncMarshalling(Configuration c)
{
@@ -253,6 +258,11 @@
if (cmd == null) throw new NullPointerException("Unable to execute a null
command! Message was " + req);
if (trace) log.trace("Executing command: " + cmd + "
[sender=" + req.getSrc() + "]");
+ if (channel.flushSupported() &&
!flushGate.await(configuration.getStateRetrievalTimeout(), TimeUnit.MILLISECONDS))
+ {
+ throw new TimeoutException("State retrieval timed out waiting for flush
unblock. (timeout = " + configuration.getStateRetrievalTimeout() + " millis)
");
+ }
+
if (cmd instanceof VisitableCommand)
{
InvocationContext ctx = invocationContextContainer.get();
Modified:
core/trunk/src/main/java/org/jboss/cache/marshall/InactiveRegionAwareRpcDispatcher.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/marshall/InactiveRegionAwareRpcDispatcher.java 2009-02-04
23:48:43 UTC (rev 7648)
+++
core/trunk/src/main/java/org/jboss/cache/marshall/InactiveRegionAwareRpcDispatcher.java 2009-02-04
23:54:51 UTC (rev 7649)
@@ -25,6 +25,7 @@
import org.jboss.cache.factories.ComponentRegistry;
import org.jboss.cache.interceptors.InterceptorChain;
import org.jboss.cache.invocation.InvocationContextContainer;
+import org.jboss.cache.util.concurrent.ReclosableLatch;
import org.jgroups.Channel;
import org.jgroups.MembershipListener;
import org.jgroups.Message;
@@ -46,9 +47,9 @@
*/
public InactiveRegionAwareRpcDispatcher(Channel channel, MessageListener l,
MembershipListener l2, Object serverObj,
InvocationContextContainer container,
InterceptorChain interceptorChain,
- ComponentRegistry componentRegistry)
+ ComponentRegistry componentRegistry,
ReclosableLatch flushBlockGate)
{
- super(channel, l, l2, serverObj, container, interceptorChain, componentRegistry);
+ super(channel, l, l2, serverObj, container, interceptorChain, componentRegistry,
flushBlockGate);
}
@Override
Modified:
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java 2009-02-04
23:48:43 UTC (rev 7648)
+++
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java 2009-02-04
23:54:51 UTC (rev 7649)
@@ -21,25 +21,29 @@
*/
package org.jboss.cache.statetransfer;
+import java.io.ObjectOutputStream;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.jboss.cache.CacheException;
import org.jboss.cache.CacheSPI;
import org.jboss.cache.Fqn;
import org.jboss.cache.InternalNode;
import org.jboss.cache.Node;
+import org.jboss.cache.RPCManager;
import org.jboss.cache.Version;
+import org.jboss.cache.config.Configuration;
import org.jboss.cache.factories.annotations.Inject;
import org.jboss.cache.factories.annotations.Start;
import org.jboss.cache.loader.CacheLoader;
import org.jboss.cache.marshall.NodeData;
import org.jboss.cache.marshall.NodeDataExceptionMarker;
+import org.jboss.cache.transaction.TransactionLog;
-import java.io.ObjectOutputStream;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
public class DefaultStateTransferGenerator implements StateTransferGenerator
{
@@ -48,13 +52,27 @@
private Log log = LogFactory.getLog(getClass().getName());
private CacheSPI cache;
+ private RPCManager rpcManager;
private Set<Fqn> internalFqns;
+ private boolean nonBlocking;
+ private long flushTimeout;
+ private int maxNonProgressingLogWrites = 5;
+ private TransactionLog txLog;
+
+
+
@Inject
- public void inject(CacheSPI cache)
+ public void inject(CacheSPI cache, RPCManager rpcManager, Configuration configuration,
TransactionLog txLog)
{
this.cache = cache;
+ this.nonBlocking = true;
+
+ this.flushTimeout = configuration.getStateRetrievalTimeout();
+ this.nonBlocking = configuration.isNonBlockingStateTransfer();
+ this.txLog = txLog;
+ this.rpcManager = rpcManager;
}
@Start(priority = 14)
@@ -72,6 +90,16 @@
cache.getMarshaller().objectToObjectStream(STATE_TRANSFER_VERSION, out);
if (generateTransient)
{
+ if (nonBlocking)
+ {
+ if (! txLog.activate())
+ throw new CacheException("Busy performing state transfer for
someone else");
+
+ if (log.isTraceEnabled())
+ log.trace("Transaction log activated!");
+
+ }
+
//transient + marker
if (log.isTraceEnabled())
{
@@ -128,12 +156,49 @@
}
}
delimitStream(out);
+
+ if (nonBlocking && generateTransient)
+ {
+ for (int nonProgress = 0, size = txLog.size(); nonProgress <
maxNonProgressingLogWrites && size > 0;)
+ {
+ if (log.isTraceEnabled())
+ log.trace("Tx Log remaining entries = " + size);
+ txLog.writeCommitLog(cache.getMarshaller(), out);
+ int newSize = txLog.size();
+
+ // If size did not decrease then we did not make progress, and could be wasting
+ // our time. Limit this to the specified max.
+ if (newSize >= size)
+ nonProgress++;
+
+ size = newSize;
+ }
+
+ // Signal to sender that we need a flush to get a consistent view
+ // of the remaining transactions.
+ delimitStream(out);
+ out.flush();
+ rpcManager.waitForFlush(flushTimeout);
+
+ // Write remaining transactions
+ txLog.writeCommitLog(cache.getMarshaller(), out);
+ delimitStream(out);
+
+ // Write all non-completed prepares
+ txLog.writePendingPrepares(cache.getMarshaller(), out);
+ delimitStream(out);
+ }
+
}
catch (Exception e)
{
cache.getMarshaller().objectToObjectStream(new NodeDataExceptionMarker(e,
cache.getLocalAddress()), out);
throw e;
}
+ finally
+ {
+ txLog.deactivate();
+ }
}
private Fqn getFqn(Object o)
Modified:
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java 2009-02-04
23:48:43 UTC (rev 7648)
+++
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java 2009-02-04
23:54:51 UTC (rev 7649)
@@ -21,6 +21,16 @@
*/
package org.jboss.cache.statetransfer;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.cache.CacheException;
@@ -30,26 +40,27 @@
import org.jboss.cache.InvocationContext;
import org.jboss.cache.Node;
import org.jboss.cache.NodeSPI;
+import org.jboss.cache.RPCManager;
import org.jboss.cache.buddyreplication.BuddyManager;
+import org.jboss.cache.commands.WriteCommand;
+import org.jboss.cache.commands.tx.PrepareCommand;
import org.jboss.cache.config.Configuration;
+import org.jboss.cache.factories.ComponentRegistry;
import org.jboss.cache.factories.annotations.Inject;
import org.jboss.cache.factories.annotations.Start;
+import org.jboss.cache.interceptors.InterceptorChain;
+import org.jboss.cache.invocation.InvocationContextContainer;
import org.jboss.cache.loader.CacheLoader;
import org.jboss.cache.loader.CacheLoaderManager;
import org.jboss.cache.marshall.NodeData;
import org.jboss.cache.marshall.NodeDataExceptionMarker;
import org.jboss.cache.marshall.NodeDataMarker;
import org.jboss.cache.notifications.event.NodeModifiedEvent;
+import org.jboss.cache.transaction.TransactionLog;
+import org.jboss.cache.transaction.TransactionLog.LogEntry;
+import org.jgroups.Address;
+import org.jgroups.Channel;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
public class DefaultStateTransferIntegrator implements StateTransferIntegrator
{
@@ -60,13 +71,25 @@
private Set<Fqn> internalFqns;
private Configuration cfg;
+ private RPCManager manager;
+ private TransactionLog txLog;
private boolean needToPersistState; // for JBCACHE-131
+ private boolean nonBlocking;
+ private InvocationContextContainer container;
+ private InterceptorChain chain;
+ private ComponentRegistry registry;
@Inject
- public void inject(CacheSPI<?, ?> cache, Configuration cfg)
+ public void inject(CacheSPI<?, ?> cache, Configuration cfg, RPCManager
rpcManager, TransactionLog txLog, InvocationContextContainer container, InterceptorChain
chain, ComponentRegistry registry)
{
this.cache = cache;
this.cfg = cfg;
+ this.manager = rpcManager;
+ this.nonBlocking = cfg.isNonBlockingStateTransfer();
+ this.txLog = txLog;
+ this.container = container;
+ this.chain = chain;
+ this.registry = registry;
}
@Start(priority = 14)
@@ -89,8 +112,103 @@
{
integratePersistentState(ois, targetRoot);
}
+
+ // Delimiter
+ verifyMarker(cache.getMarshaller().objectFromObjectStream(ois));
+
+ if (nonBlocking)
+ integrateTxLog(ois);
}
+ private void integrateTxLog(ObjectInputStream ois) throws Exception
+ {
+ if (trace)
+ log.trace("Integrating transaction log");
+
+ processCommitLog(ois);
+
+ Channel channel = manager.getChannel();
+
+ List<Address> targets = new ArrayList<Address>(2);
+ targets.add(channel.getLocalAddress());
+ targets.add(manager.getLastStateTransferSource());
+
+ if (trace)
+ log.trace("Flushing targets: " + targets);
+
+ if (!channel.startFlush(targets, false))
+ throw new CacheException("Could not flush channel! State-transfer
failed!");
+
+ try
+ {
+ if (trace)
+ log.trace("Retrieving/Applying post-flush commits");
+ processCommitLog(ois);
+
+ if (trace)
+ log.trace("Retrieving/Applying pending prepares");
+ Object object = cache.getMarshaller().objectFromObjectStream(ois);
+ while (object instanceof PrepareCommand)
+ {
+ PrepareCommand command = (PrepareCommand)object;
+ if (! txLog.hasPendingPrepare(command))
+ {
+ InvocationContext ctx = container.get();
+ ctx.setOriginLocal(false);
+ ctx.getOptionOverrides().setCacheModeLocal(true);
+ ctx.getOptionOverrides().setSkipCacheStatusCheck(true);
+ chain.invoke(ctx, command);
+ }
+ object = cache.getMarshaller().objectFromObjectStream(ois);
+ }
+ verifyMarker(object);
+
+ // Block all remote commands once transfer is complete,
+ // and before FLUSH completes
+ registry.setBlockInStarting(true);
+ }
+ finally
+ {
+ if (trace)
+ log.trace("Stopping flush");
+ channel.stopFlush(targets);
+ }
+ }
+
+ private void processCommitLog(ObjectInputStream ois) throws Exception
+ {
+ Object object = cache.getMarshaller().objectFromObjectStream(ois);
+ while (object instanceof LogEntry)
+ {
+ List<WriteCommand> mods = ((LogEntry)object).getModifications();
+ log.trace("Mods = " + mods);
+ for (WriteCommand mod : mods)
+ {
+ InvocationContext ctx = container.get();
+ ctx.setOriginLocal(false);
+ ctx.getOptionOverrides().setCacheModeLocal(true);
+ ctx.getOptionOverrides().setSkipCacheStatusCheck(true);
+ chain.invoke(ctx, mod);
+ }
+
+ object = cache.getMarshaller().objectFromObjectStream(ois);
+ }
+ verifyMarker(object);
+ }
+
+ private void verifyMarker(Object object)
+ {
+ if (object instanceof NodeDataExceptionMarker)
+ {
+ NodeDataExceptionMarker e = (NodeDataExceptionMarker)object;
+ throw new CacheException("Error in state transfer stream",
e.getCause());
+ }
+ else if (! (object instanceof NodeDataMarker))
+ {
+ throw new CacheException("Invalid object unmarshalled");
+ }
+ }
+
protected void integrateTransientState(ObjectInputStream in, InternalNode target)
throws Exception
{
boolean transientSet = false;
Modified:
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferManager.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferManager.java 2009-02-04
23:48:43 UTC (rev 7648)
+++
core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferManager.java 2009-02-04
23:54:51 UTC (rev 7649)
@@ -51,7 +51,6 @@
protected static final boolean trace = log.isTraceEnabled();
public static final NodeData STREAMING_DELIMITER_NODE = new NodeDataMarker();
-
public static final String PARTIAL_STATE_DELIMITER =
"_PARTIAL_STATE_DELIMITER";
protected CacheSPI cache;
Added: core/trunk/src/main/java/org/jboss/cache/transaction/TransactionLog.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/transaction/TransactionLog.java
(rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/transaction/TransactionLog.java 2009-02-04
23:54:51 UTC (rev 7649)
@@ -0,0 +1,179 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2005, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.jboss.cache.transaction;
+
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.cache.commands.WriteCommand;
+import org.jboss.cache.commands.tx.PrepareCommand;
+import org.jboss.cache.marshall.Marshaller;
+
+/**
+ * Logs transactions and writes for Non-Blocking State Transfer
+ *
+ * @author Jason T. Greene
+ */
+public class TransactionLog
+{
+ private final Map<GlobalTransaction, PrepareCommand> pendingPrepares = new
ConcurrentHashMap<GlobalTransaction, PrepareCommand>();
+ private final BlockingQueue<LogEntry> entries = new
LinkedBlockingQueue<LogEntry>();
+ private AtomicBoolean active = new AtomicBoolean();
+
+ public static class LogEntry
+ {
+ private final GlobalTransaction transaction;
+ private final List<WriteCommand> modifications;
+
+ public LogEntry(GlobalTransaction transaction, List<WriteCommand>
modifications)
+ {
+ this.transaction = transaction;
+ this.modifications = modifications;
+ }
+
+ public GlobalTransaction getTransaction()
+ {
+ return transaction;
+ }
+
+ public List<WriteCommand> getModifications()
+ {
+ return modifications;
+ }
+ }
+
+ private Log log = LogFactory.getLog(getClass().getName());
+
+ public void logPrepare(PrepareCommand command)
+ {
+ pendingPrepares.put(command.getGlobalTransaction(), command);
+ }
+
+ public void logCommit(GlobalTransaction gtx)
+ {
+ PrepareCommand command = pendingPrepares.remove(gtx);
+ if (command == null)
+ {
+ log.error("Could not find matching prepare for commit: " + gtx);
+ return;
+ }
+
+ addEntry(new LogEntry(gtx, command.getModifications()));
+ }
+
+ private void addEntry(LogEntry entry)
+ {
+ if (! isActive())
+ return;
+
+ for (;;)
+ {
+ try
+ {
+ if (log.isTraceEnabled())
+ log.trace("Added commit entry to tx log" + entry);
+
+ entries.put(entry);
+ break;
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ public void logOnePhaseCommit(GlobalTransaction gtx, List<WriteCommand>
modifications)
+ {
+ // Just in case...
+ if (gtx != null) pendingPrepares.remove(gtx);
+ addEntry(new LogEntry(gtx, modifications));
+ }
+
+ public void logNoTxWrite(WriteCommand write)
+ {
+ if (! isActive())
+ return;
+
+ ArrayList<WriteCommand> list = new ArrayList<WriteCommand>();
+ list.add(write);
+ addEntry(new LogEntry(null, list));
+ }
+
+ public void rollback(GlobalTransaction gtx)
+ {
+ pendingPrepares.remove(gtx);
+ }
+
+ public boolean isActive()
+ {
+ return active.get();
+ }
+
+ public boolean activate()
+ {
+ return active.compareAndSet(false, true);
+ }
+
+ public void deactivate()
+ {
+ active.set(false);
+ entries.clear();
+ }
+
+ public int size()
+ {
+ return entries.size();
+ }
+
+ public void writeCommitLog(Marshaller marshaller, ObjectOutputStream out) throws
Exception
+ {
+ List<LogEntry> buffer = new ArrayList<LogEntry>(10);
+
+ while (entries.drainTo(buffer, 10) > 0)
+ {
+ for (LogEntry entry : buffer)
+ marshaller.objectToObjectStream(entry, out);
+
+ buffer.clear();
+ }
+ }
+
+ public void writePendingPrepares(Marshaller marshaller, ObjectOutputStream out) throws
Exception
+ {
+ for (PrepareCommand entry : pendingPrepares.values())
+ marshaller.objectToObjectStream(entry, out);
+ }
+
+ public boolean hasPendingPrepare(PrepareCommand command)
+ {
+ return pendingPrepares.containsKey(command.getGlobalTransaction());
+ }
+}
Property changes on:
core/trunk/src/main/java/org/jboss/cache/transaction/TransactionLog.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added:
core/trunk/src/test/java/org/jboss/cache/statetransfer/NonBlockingStateTransferTest.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/statetransfer/NonBlockingStateTransferTest.java
(rev 0)
+++
core/trunk/src/test/java/org/jboss/cache/statetransfer/NonBlockingStateTransferTest.java 2009-02-04
23:54:51 UTC (rev 7649)
@@ -0,0 +1,247 @@
+/*
+ * JBoss, the OpenSource J2EE webOS
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+
+package org.jboss.cache.statetransfer;
+
+import static org.testng.AssertJUnit.assertEquals;
+
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.jboss.cache.Cache;
+import org.jboss.cache.CacheSPI;
+import org.jboss.cache.Fqn;
+import org.jboss.cache.UnitTestCacheFactory;
+import org.jboss.cache.config.Configuration;
+import org.jboss.cache.config.Configuration.CacheMode;
+import org.jboss.cache.factories.UnitTestConfigurationFactory;
+import org.jboss.cache.util.TestingUtil;
+import org.testng.annotations.Test;
+
+@Test(groups="functional")
+public class NonBlockingStateTransferTest
+{
+ public static final Fqn A = Fqn.fromString("/a");
+ public static final Fqn B = Fqn.fromString("/b");
+ public static final Fqn C = Fqn.fromString("/c");
+ protected static final String ADDRESS_CLASSNAME =
"org.jboss.cache.marshall.data.Address";
+ protected static final String PERSON_CLASSNAME =
"org.jboss.cache.marshall.data.Person";
+ public static final Fqn A_B = Fqn.fromString("/a/b");
+ public static final Fqn A_C = Fqn.fromString("/a/c");
+ public static final Fqn A_D = Fqn.fromString("/a/d");
+ public static final String JOE = "JOE";
+ public static final String BOB = "BOB";
+ public static final String JANE = "JANE";
+ public static final Integer TWENTY = 20;
+ public static final Integer FORTY = 40;
+
+ public static class DelayTransfer implements Serializable
+ {
+ private transient int count;
+
+ private void readObject(ObjectInputStream in) throws IOException,
ClassNotFoundException
+ {
+ in.defaultReadObject();
+ }
+
+ private void writeObject(ObjectOutputStream out) throws IOException
+ {
+ out.defaultWriteObject();
+
+ // RPC is first serialization, ST is second
+ if (count++ == 0)
+ return;
+
+ try
+ {
+ Thread.sleep(2000);
+ }
+ catch (InterruptedException e)
+ {
+ }
+ }
+
+ }
+ private static class WritingRunner implements Runnable
+ {
+ private final Cache<Object,Object> cache;
+ private final boolean tx;
+ private volatile boolean stop;
+ private volatile int result;
+
+ WritingRunner(Cache<Object, Object> cache, boolean tx)
+ {
+ this.cache = cache;
+ this.tx = tx;
+ }
+
+ public int result()
+ {
+ return result;
+ }
+
+ public void run()
+ {
+ int c = 0;
+ while (!stop)
+ {
+ try
+ {
+ if (tx)
+
cache.getConfiguration().getRuntimeConfig().getTransactionManager().begin();
+ cache.put("/test" + c, "test", c++);
+ if (tx)
+
cache.getConfiguration().getRuntimeConfig().getTransactionManager().commit();
+ }
+ catch (Exception e)
+ {
+ }
+ }
+ result = c;
+ }
+
+ public void stop()
+ {
+ stop = true;
+ }
+ }
+
+ private CacheSPI<Object, Object> createCache(String name)
+ {
+ Configuration config =
UnitTestConfigurationFactory.createConfiguration(CacheMode.REPL_SYNC);
+ config.setClusterName(name + "-" + Thread.currentThread().getName());
+ config.setNonBlockingStateTransfer(true);
+ CacheSPI<Object, Object> cache = (CacheSPI<Object, Object>) new
UnitTestCacheFactory<Object, Object>().createCache(config, false, getClass());
+
+ // Use marshaller
+
+ cache.create();
+ cache.start();
+ return cache;
+ }
+
+ public void testInitialStateTransfer() throws Exception
+ {
+ CacheSPI<Object, Object> cache1 = createCache("nbst");
+
+ writeInitialData(cache1);
+
+ CacheSPI<Object, Object> cache2 = createCache("nbst");
+
+ // Pause to give caches time to see each other
+ TestingUtil.blockUntilViewsReceived(new CacheSPI[]{cache1, cache2}, 60000);
+
+ verifyInitialData(cache2);
+
+ TestingUtil.killCaches(cache1, cache2);
+ }
+
+
+ public void testSTWithThirdWritingNonTxCache() throws Exception
+ {
+ thirdWritingCacheTest(false, "nbst1");
+ }
+
+ public void testSTWithThirdWritingTxCache() throws Exception
+ {
+ thirdWritingCacheTest(true, "nbst2");
+ }
+
+ public void testSTWithWritingNonTxThread() throws Exception
+ {
+ writingThreadTest(false, "nbst3");
+ }
+
+ public void testSTWithWritingTxThread() throws Exception
+ {
+ writingThreadTest(true, "nbst4");
+ }
+
+
+ private void thirdWritingCacheTest(boolean tx, String name) throws
InterruptedException
+ {
+ final CacheSPI<Object, Object> cache1 = createCache(name);
+ final CacheSPI<Object, Object> cache3 = createCache(name);
+
+ writeInitialData(cache1);
+
+ // Delay the transient copy, so that we get a more thorough log test
+ cache1.put("/delay", "delay", new DelayTransfer());
+
+ WritingRunner writer = new WritingRunner(cache3, tx);
+ Thread writerThread = new Thread(writer);
+ writerThread.start();
+
+ CacheSPI<Object, Object> cache2 = createCache(name);
+
+ // Pause to give caches time to see each other
+ TestingUtil.blockUntilViewsReceived(new CacheSPI[]{cache1, cache2, cache3},
60000);
+
+ writer.stop();
+ writerThread.join();
+
+ verifyInitialData(cache2);
+
+ int count = writer.result();
+
+ for (int c = 0; c < count; c++)
+ assertEquals(c, cache2.get("/test" + c, "test"));
+
+ TestingUtil.killCaches(cache1, cache2, cache3);
+ }
+
+ private void verifyInitialData(CacheSPI<Object, Object> cache2)
+ {
+ assertEquals("Incorrect name for /a/b", JOE, cache2.get(A_B,
"name"));
+ assertEquals("Incorrect age for /a/b", TWENTY, cache2.get(A_B,
"age"));
+ assertEquals("Incorrect name for /a/c", BOB, cache2.get(A_C,
"name"));
+ assertEquals("Incorrect age for /a/c", FORTY, cache2.get(A_C,
"age"));
+ }
+
+ private void writeInitialData(final CacheSPI<Object, Object> cache1)
+ {
+ cache1.put(A_B, "name", JOE);
+ cache1.put(A_B, "age", TWENTY);
+ cache1.put(A_C, "name", BOB);
+ cache1.put(A_C, "age", FORTY);
+ }
+
+ private void writingThreadTest(boolean tx, String name) throws InterruptedException
+ {
+ final CacheSPI<Object, Object> cache1 = createCache(name);
+
+ writeInitialData(cache1);
+
+ // Delay the transient copy, so that we get a more thorough log test
+ cache1.put("/delay", "delay", new DelayTransfer());
+
+ WritingRunner writer = new WritingRunner(cache1, tx);
+ Thread writerThread = new Thread(writer);
+ writerThread.start();
+
+ CacheSPI<Object, Object> cache2 = createCache(name);
+
+ // Pause to give caches time to see each other
+ TestingUtil.blockUntilViewsReceived(new CacheSPI[]{cache1, cache2}, 60000);
+
+ writer.stop();
+ writerThread.join();
+
+ verifyInitialData(cache2);
+
+ int count = writer.result();
+
+ for (int c = 0; c < count; c++)
+ assertEquals(c, cache2.get("/test" + c, "test"));
+
+ TestingUtil.killCaches(cache1, cache2);
+ }
+}
Property changes on:
core/trunk/src/test/java/org/jboss/cache/statetransfer/NonBlockingStateTransferTest.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Modified:
core/trunk/src/test/java/org/jboss/cache/transaction/PrepareCommitContentionTest.java
===================================================================
---
core/trunk/src/test/java/org/jboss/cache/transaction/PrepareCommitContentionTest.java 2009-02-04
23:48:43 UTC (rev 7648)
+++
core/trunk/src/test/java/org/jboss/cache/transaction/PrepareCommitContentionTest.java 2009-02-04
23:54:51 UTC (rev 7649)
@@ -156,5 +156,15 @@
{
return delegate.getChannel();
}
+
+ public Address getLastStateTransferSource()
+ {
+ return delegate.getLastStateTransferSource();
+ }
+
+ public void waitForFlush(long timeout)
+ {
+ delegate.waitForFlush(timeout);
+ }
}
}
Modified: core/trunk/src/test/resources/log4j.xml
===================================================================
--- core/trunk/src/test/resources/log4j.xml 2009-02-04 23:48:43 UTC (rev 7648)
+++ core/trunk/src/test/resources/log4j.xml 2009-02-04 23:54:51 UTC (rev 7649)
@@ -57,7 +57,7 @@
<!-- ================ -->
<category name="org.jboss.cache">
- <priority value="WARN"/>
+ <priority value="TRACE"/>
</category>
<!-- these two are in separate sections since they