[jbosscache-commits] JBoss Cache SVN: r6941 - in core/branches/flat/src/main/java/org/jboss/starobrno: eviction and 3 other directories.
jbosscache-commits at lists.jboss.org
jbosscache-commits at lists.jboss.org
Tue Oct 14 13:43:45 EDT 2008
Author: mircea.markus
Date: 2008-10-14 13:43:45 -0400 (Tue, 14 Oct 2008)
New Revision: 6941
Added:
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/ReplicationInterceptor.java
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/base/BaseRpcInterceptor.java
Modified:
core/branches/flat/src/main/java/org/jboss/starobrno/container/MVCCEntryCreator.java
core/branches/flat/src/main/java/org/jboss/starobrno/eviction/BaseEvictionAlgorithm.java
core/branches/flat/src/main/java/org/jboss/starobrno/factories/ComponentRegistry.java
core/branches/flat/src/main/java/org/jboss/starobrno/factories/EmptyConstructorFactory.java
core/branches/flat/src/main/java/org/jboss/starobrno/factories/ReplicationQueueFactory.java
core/branches/flat/src/main/java/org/jboss/starobrno/factories/RuntimeConfigAwareFactory.java
core/branches/flat/src/main/java/org/jboss/starobrno/factories/StateTransferManagerFactory.java
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/InvocationContextInterceptor.java
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/MarshalledValueInterceptor.java
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/TxInterceptor.java
Log:
enabling replication
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/container/MVCCEntryCreator.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/container/MVCCEntryCreator.java 2008-10-14 17:43:22 UTC (rev 6940)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/container/MVCCEntryCreator.java 2008-10-14 17:43:45 UTC (rev 6941)
@@ -23,7 +23,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.jboss.cache.lock.TimeoutException;
+import org.jboss.starobrno.lock.TimeoutException;
import org.jboss.starobrno.config.Configuration;
import org.jboss.starobrno.context.InvocationContext;
import org.jboss.starobrno.factories.EntryFactory;
@@ -164,7 +164,7 @@
* @param fqn Fqn to lock
* @return true if a lock was needed and acquired, false if it didn't need to acquire the lock (i.e., lock was already held)
* @throws InterruptedException if interrupted
- * @throws org.jboss.cache.lock.TimeoutException
+ * @throws org.jboss.starobrno.lock.TimeoutException
* if we are unable to acquire the lock after a specified timeout.
*/
private boolean acquireLock(InvocationContext ctx, Object key) throws InterruptedException, TimeoutException
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/eviction/BaseEvictionAlgorithm.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/eviction/BaseEvictionAlgorithm.java 2008-10-14 17:43:22 UTC (rev 6940)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/eviction/BaseEvictionAlgorithm.java 2008-10-14 17:43:45 UTC (rev 6941)
@@ -25,7 +25,7 @@
import org.apache.commons.logging.LogFactory;
import org.jboss.cache.CacheSPI_Legacy;
import org.jboss.cache.Fqn;
-import org.jboss.cache.lock.TimeoutException;
+import org.jboss.starobrno.lock.TimeoutException;
import org.jboss.starobrno.config.Configuration;
import org.jboss.starobrno.config.EvictionAlgorithmConfig;
import org.jboss.starobrno.eviction.EvictionEvent.Type;
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/factories/ComponentRegistry.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/factories/ComponentRegistry.java 2008-10-14 17:43:22 UTC (rev 6940)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/factories/ComponentRegistry.java 2008-10-14 17:43:45 UTC (rev 6941)
@@ -26,7 +26,7 @@
import org.jboss.cache.CacheStatus;
import org.jboss.cache.Version;
import org.jboss.cache.util.BeanUtils;
-import org.jboss.cache.util.reflect.ReflectionUtil;
+import org.jboss.starobrno.util.ReflectionUtil;
import org.jboss.starobrno.CacheException;
import org.jboss.starobrno.CacheSPI;
import org.jboss.starobrno.config.Configuration;
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/factories/EmptyConstructorFactory.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/factories/EmptyConstructorFactory.java 2008-10-14 17:43:22 UTC (rev 6940)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/factories/EmptyConstructorFactory.java 2008-10-14 17:43:45 UTC (rev 6941)
@@ -22,14 +22,6 @@
package org.jboss.starobrno.factories;
-import org.jboss.cache.RegionRegistry;
-import org.jboss.cache.buddyreplication.BuddyFqnTransformer;
-import org.jboss.cache.invocation.CacheInvocationDelegate;
-import org.jboss.cache.loader.CacheLoaderManager;
-import org.jboss.cache.lock.LockStrategyFactory;
-import org.jboss.cache.marshall.Marshaller;
-import org.jboss.cache.marshall.VersionAwareMarshaller;
-import org.jboss.cache.remoting.jgroups.ChannelMessageListener;
import org.jboss.starobrno.batch.BatchContainer;
import org.jboss.starobrno.commands.CommandsFactory;
import org.jboss.starobrno.config.ConfigurationException;
@@ -39,6 +31,16 @@
import org.jboss.starobrno.invocation.InvocationContextContainer;
import org.jboss.starobrno.notifications.Notifier;
import org.jboss.starobrno.transaction.TransactionTable;
+import org.jboss.starobrno.remoting.RPCManagerImpl;
+import org.jboss.starobrno.remoting.ChannelMessageListener;
+import org.jboss.starobrno.marshall.ExtendedMarshaller;
+import org.jboss.starobrno.marshall.CacheMarshallerStarobrno;
+import org.jboss.starobrno.RPCManager;
+import org.jboss.cache.loader.CacheLoaderManager;
+import org.jboss.cache.invocation.CacheInvocationDelegate;
+import org.jboss.cache.lock.LockStrategyFactory;
+import org.jboss.cache.buddyreplication.BuddyFqnTransformer;
+import org.jboss.cache.RegionRegistry;
/**
* Simple factory that just uses reflection and an empty constructor of the component type.
@@ -47,7 +49,7 @@
* @since 2.1.0
*/
@DefaultFactoryFor(classes = {Notifier.class, RegionRegistry.class,
- ChannelMessageListener.class, CacheLoaderManager.class, Marshaller.class, InvocationContextContainer.class,
+ ChannelMessageListener.class, CacheLoaderManager.class, ExtendedMarshaller.class, InvocationContextContainer.class,
CacheInvocationDelegate.class, TransactionTable.class, MVCCEntryCreator.class,
LockStrategyFactory.class, BuddyFqnTransformer.class, BatchContainer.class,
ContextFactory.class, EntryFactory.class, CommandsFactory.class})
@@ -61,16 +63,20 @@
if (componentType.isInterface())
{
Class componentImpl;
- if (componentType.equals(Marshaller.class))
+ if (componentType.equals(ExtendedMarshaller.class))
{
- componentImpl = VersionAwareMarshaller.class;
+ componentImpl = CacheMarshallerStarobrno.class;
}
+ else
+ if (componentType.equals(RPCManager.class))
+ {
+ componentImpl = RPCManagerImpl.class;
+ }
else
{
// add an "Impl" to the end of the class name and try again
componentImpl = getClass().getClassLoader().loadClass(componentType.getName() + "Impl");
}
-
return componentType.cast(componentImpl.newInstance());
}
else
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/factories/ReplicationQueueFactory.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/factories/ReplicationQueueFactory.java 2008-10-14 17:43:22 UTC (rev 6940)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/factories/ReplicationQueueFactory.java 2008-10-14 17:43:45 UTC (rev 6941)
@@ -21,7 +21,7 @@
*/
package org.jboss.starobrno.factories;
-import org.jboss.cache.cluster.ReplicationQueue;
+import org.jboss.starobrno.cluster.ReplicationQueue;
import org.jboss.starobrno.config.Configuration;
import org.jboss.starobrno.factories.annotations.DefaultFactoryFor;
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/factories/RuntimeConfigAwareFactory.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/factories/RuntimeConfigAwareFactory.java 2008-10-14 17:43:22 UTC (rev 6940)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/factories/RuntimeConfigAwareFactory.java 2008-10-14 17:43:45 UTC (rev 6941)
@@ -22,10 +22,10 @@
package org.jboss.starobrno.factories;
import org.jboss.cache.util.BeanUtils;
+import org.jboss.starobrno.RPCManager;
import org.jboss.starobrno.config.ConfigurationException;
import org.jboss.starobrno.config.RuntimeConfig;
import org.jboss.starobrno.factories.annotations.DefaultFactoryFor;
-import org.jboss.starobrno.remoting.RPCManager;
import java.lang.reflect.Method;
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/factories/StateTransferManagerFactory.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/factories/StateTransferManagerFactory.java 2008-10-14 17:43:22 UTC (rev 6940)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/factories/StateTransferManagerFactory.java 2008-10-14 17:43:45 UTC (rev 6941)
@@ -21,12 +21,12 @@
*/
package org.jboss.starobrno.factories;
-import org.jboss.cache.statetransfer.DefaultStateTransferManager;
-import org.jboss.cache.statetransfer.StateTransferManager;
+import org.jboss.starobrno.statetransfer.DefaultStateTransferManager;
+import org.jboss.starobrno.statetransfer.StateTransferManager;
import org.jboss.starobrno.factories.annotations.DefaultFactoryFor;
/**
- * Constructs {@link org.jboss.cache.statetransfer.StateTransferManager} instances.
+ * Constructs {@link org.jboss.starobrno.statetransfer.StateTransferManager} instances.
*
* @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
* @since 3.0
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/InvocationContextInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/InvocationContextInterceptor.java 2008-10-14 17:43:22 UTC (rev 6940)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/InvocationContextInterceptor.java 2008-10-14 17:43:45 UTC (rev 6941)
@@ -34,9 +34,9 @@
import org.jboss.starobrno.config.Option;
import org.jboss.starobrno.context.InvocationContext;
import org.jboss.starobrno.factories.annotations.Inject;
-import org.jboss.starobrno.remoting.RPCManager;
import org.jboss.starobrno.transaction.GlobalTransaction;
import org.jboss.starobrno.transaction.TransactionTable;
+import org.jboss.starobrno.RPCManager;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/MarshalledValueInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/MarshalledValueInterceptor.java 2008-10-14 17:43:22 UTC (rev 6940)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/MarshalledValueInterceptor.java 2008-10-14 17:43:45 UTC (rev 6941)
@@ -29,6 +29,7 @@
import org.jboss.starobrno.interceptors.base.CommandInterceptor;
import org.jboss.starobrno.marshall.MarshalledValue;
import org.jboss.starobrno.marshall.MarshalledValueHelper;
+//import org.jboss.starobrno.marshall.MarshalledValueHelper;
import java.io.IOException;
import java.io.NotSerializableException;
Copied: core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/ReplicationInterceptor.java (from rev 6895, core/branches/flat/src/main/java/org/jboss/cache/interceptors/ReplicationInterceptor.java)
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/ReplicationInterceptor.java (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/ReplicationInterceptor.java 2008-10-14 17:43:45 UTC (rev 6941)
@@ -0,0 +1,164 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.interceptors;
+
+import org.jboss.starobrno.commands.VisitableCommand;
+import org.jboss.starobrno.commands.tx.CommitCommand;
+import org.jboss.starobrno.commands.tx.PrepareCommand;
+import org.jboss.starobrno.commands.tx.RollbackCommand;
+import org.jboss.starobrno.commands.write.*;
+import org.jboss.starobrno.config.Configuration;
+import org.jboss.starobrno.context.InvocationContext;
+import org.jboss.starobrno.context.TransactionContext;
+import org.jboss.starobrno.interceptors.base.BaseRpcInterceptor;
+import org.jboss.starobrno.transaction.GlobalTransaction;
+
+/**
+ * Takes care of replicating modifications to other nodes in a cluster. Also
+ * listens for prepare(), commit() and rollback() messages which are received
+ * 'side-ways' (see docs/design/Refactoring.txt).
+ *
+ * @author Bela Ban
+ * @version $Id$
+ */
+public class ReplicationInterceptor extends BaseRpcInterceptor
+{
+
+ @Override
+ public Object visitCommitCommand(InvocationContext ctx, CommitCommand command) throws Throwable
+ {
+ if (!skipReplicationOfTransactionMethod(ctx))
+ replicateCall(ctx, command, configuration.isSyncCommitPhase(), ctx.getOptionOverrides(), true);
+ return invokeNextInterceptor(ctx, command);
+ }
+
+ @Override
+ public Object visitPrepareCommand(InvocationContext ctx, PrepareCommand command) throws Throwable
+ {
+ Object retVal = invokeNextInterceptor(ctx, command);
+ TransactionContext transactionContext = ctx.getTransactionContext();
+ if (transactionContext.hasLocalModifications())
+ {
+ PrepareCommand replicablePrepareCommand = command.copy(); // makre sure we remove any "local" transactions
+ replicablePrepareCommand.removeModifications(transactionContext.getLocalModifications());
+ command = replicablePrepareCommand;
+ }
+
+ if (!skipReplicationOfTransactionMethod(ctx)) runPreparePhase(command, command.getGlobalTransaction(), ctx);
+ return retVal;
+ }
+
+ @Override
+ public Object visitRollbackCommand(InvocationContext ctx, RollbackCommand command) throws Throwable
+ {
+ if (!skipReplicationOfTransactionMethod(ctx) && !ctx.isLocalRollbackOnly())
+ {
+ replicateCall(ctx, command, configuration.isSyncRollbackPhase(), ctx.getOptionOverrides());
+ }
+ return invokeNextInterceptor(ctx, command);
+ }
+
+ @Override
+ public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable
+ {
+ return handleCrudMethod(ctx, command);
+ }
+
+ @Override
+ public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws Throwable
+ {
+ return handleCrudMethod(ctx, command);
+ }
+
+ @Override
+ public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable
+ {
+ return handleCrudMethod(ctx, command);
+ }
+
+ @Override
+ public Object visitClearCommand(InvocationContext ctx, ClearCommand command) throws Throwable
+ {
+ return handleCrudMethod(ctx, command);
+ }
+
+ @Override
+ public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command) throws Throwable
+ {
+ return handleCrudMethod(ctx, command);
+ }
+
+ /**
+ * If we are within one transaction we won't do any replication as replication would only be performed at commit time.
+ * If the operation didn't originate locally we won't do any replication either.
+ */
+ private Object handleCrudMethod(InvocationContext ctx, VisitableCommand command)
+ throws Throwable
+ {
+ boolean local = isLocalModeForced(ctx);
+ if (local && ctx.getTransaction() == null) return invokeNextInterceptor(ctx, command);
+ // FIRST pass this call up the chain. Only if it succeeds (no exceptions) locally do we attempt to replicate.
+ Object returnValue = invokeNextInterceptor(ctx, command);
+ if (ctx.getTransaction() == null && ctx.isOriginLocal())
+ {
+ if (trace)
+ {
+ log.trace("invoking method " + command.getClass().getSimpleName() + ", members=" + rpcManager.getMembers() + ", mode=" +
+ configuration.getCacheMode() + ", exclude_self=" + true + ", timeout=" +
+ configuration.getSyncReplTimeout());
+ }
+
+ replicateCall(ctx, command, isSynchronous(ctx.getOptionOverrides()), ctx.getOptionOverrides());
+ }
+ else
+ {
+ if (local) ctx.getTransactionContext().addLocalModification(command);
+ }
+ return returnValue;
+ }
+
+ /**
+ * Calls prepare(GlobalTransaction,List,org.jgroups.Address,boolean)) in all members except self.
+ * Waits for all responses. If one of the members failed to prepare, its return value
+ * will be an exception. If there is one exception we rethrow it. This will mark the
+ * current transaction as rolled back, which will cause the
+ * afterCompletion(int) callback to have a status
+ * of <tt>MARKED_ROLLBACK</tt>. When we get that call, we simply roll back the
+ * transaction.<br/>
+ * If everything runs okay, the afterCompletion(int)
+ * callback will trigger the @link #runCommitPhase(GlobalTransaction)).
+ * <br/>
+ *
+ * @throws Exception
+ */
+ protected void runPreparePhase(PrepareCommand prepareMethod, GlobalTransaction gtx, InvocationContext ctx) throws Throwable
+ {
+ boolean async = configuration.getCacheMode() == Configuration.CacheMode.REPL_ASYNC;
+ if (trace)
+ {
+ log.trace("(" + rpcManager.getLocalAddress() + "): running remote prepare for global tx " + gtx + " with async mode=" + async);
+ }
+
+ // this method will return immediately if we're the only member (because exclude_self=true)
+ replicateCall(ctx, prepareMethod, !async, ctx.getOptionOverrides());
+ }
+}
\ No newline at end of file
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/TxInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/TxInterceptor.java 2008-10-14 17:43:22 UTC (rev 6940)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/TxInterceptor.java 2008-10-14 17:43:45 UTC (rev 6941)
@@ -23,6 +23,7 @@
import org.jboss.cache.util.concurrent.ConcurrentHashSet;
import org.jboss.starobrno.CacheException;
+import org.jboss.starobrno.RPCManager;
import org.jboss.starobrno.commands.CommandsFactory;
import org.jboss.starobrno.commands.ReplicableCommand;
import org.jboss.starobrno.commands.VisitableCommand;
@@ -40,7 +41,7 @@
import org.jboss.starobrno.jmx.annotations.ManagedOperation;
import org.jboss.starobrno.lock.LockManager;
import org.jboss.starobrno.notifications.Notifier;
-import org.jboss.starobrno.remoting.RPCManager;
+
import org.jboss.starobrno.remoting.ReplicationException;
import org.jboss.starobrno.transaction.GlobalTransaction;
import org.jboss.starobrno.transaction.OrderedSynchronizationHandler;
Copied: core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/base/BaseRpcInterceptor.java (from rev 6895, core/branches/flat/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java)
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/base/BaseRpcInterceptor.java (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/base/BaseRpcInterceptor.java 2008-10-14 17:43:45 UTC (rev 6941)
@@ -0,0 +1,208 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.interceptors.base;
+
+import org.jboss.starobrno.RPCManager;
+import org.jboss.starobrno.cluster.ReplicationQueue;
+import org.jboss.starobrno.commands.CommandsFactory;
+import org.jboss.starobrno.commands.ReplicableCommand;
+import org.jboss.starobrno.config.Option;
+import org.jboss.starobrno.context.InvocationContext;
+import org.jboss.starobrno.context.TransactionContext;
+import org.jboss.starobrno.factories.annotations.Inject;
+import org.jboss.starobrno.factories.annotations.Start;
+import org.jboss.starobrno.transaction.GlobalTransaction;
+import org.jboss.starobrno.transaction.TransactionTable;
+import org.jgroups.Address;
+
+import javax.transaction.Transaction;
+import java.util.List;
+import java.util.Vector;
+
+/**
+ * Acts as a base for all RPC calls - subclassed by
+ * {@link org.jboss.cache.interceptors.ReplicationInterceptor} and {@link OptimisticReplicationInterceptor}.
+ *
+ * @author <a href="mailto:manik at jboss.org">Manik Surtani (manik at jboss.org)</a>
+ */
+public abstract class BaseRpcInterceptor extends CommandInterceptor
+{
+ private ReplicationQueue replicationQueue;
+ protected TransactionTable txTable;
+ private CommandsFactory commandsFactory;
+
+ protected RPCManager rpcManager;
+ protected boolean defaultSynchronous;
+
+ @Inject
+ public void injectComponents(RPCManager rpcManager, ReplicationQueue replicationQueue,
+ TransactionTable txTable, CommandsFactory commandsFactory)
+ {
+ this.rpcManager = rpcManager;
+ this.replicationQueue = replicationQueue;
+ this.txTable = txTable;
+ this.commandsFactory = commandsFactory;
+ }
+
+ @Start
+ public void init()
+ {
+ defaultSynchronous = configuration.getCacheMode().isSynchronous();
+ }
+
+ /**
+ * Checks whether any of the responses are exceptions. If yes, re-throws
+ * them (as exceptions or runtime exceptions).
+ */
+ protected void checkResponses(List rsps) throws Throwable
+ {
+ if (rsps != null)
+ {
+ for (Object rsp : rsps)
+ {
+ if (rsp != null && rsp instanceof Throwable)
+ {
+ // lets print a stack trace first.
+ if (log.isDebugEnabled())
+ log.debug("Received Throwable from remote node", (Throwable) rsp);
+ throw (Throwable) rsp;
+ }
+ }
+ }
+ }
+
+ protected void replicateCall(InvocationContext ctx, ReplicableCommand call, boolean sync, Option o, boolean useOutOfBandMessage) throws Throwable
+ {
+ replicateCall(ctx, null, call, sync, o, useOutOfBandMessage);
+ }
+
+ protected void replicateCall(InvocationContext ctx, ReplicableCommand call, boolean sync, Option o) throws Throwable
+ {
+ replicateCall(ctx, null, call, sync, o, false);
+ }
+
+ protected void replicateCall(InvocationContext ctx, Vector<Address> recipients, ReplicableCommand c, boolean sync, Option o, boolean useOutOfBandMessage) throws Throwable
+ {
+ long syncReplTimeout = configuration.getSyncReplTimeout();
+
+ // test for option overrides
+ if (o != null)
+ {
+ if (o.isForceAsynchronous()) sync = false;
+ else if (o.isForceSynchronous()) sync = true;
+
+ if (o.getSyncReplTimeout() > 0) syncReplTimeout = o.getSyncReplTimeout();
+ }
+
+ // tx-level overrides are more important
+ Transaction tx = ctx.getTransaction();
+ if (tx != null)
+ {
+ TransactionContext transactionContext = ctx.getTransactionContext();
+ if (transactionContext != null)
+ {
+ if (transactionContext.isForceAsyncReplication()) sync = false;
+ else if (transactionContext.isForceSyncReplication()) sync = true;
+ }
+ }
+
+ replicateCall(recipients, c, sync, true, useOutOfBandMessage, false, syncReplTimeout);
+ }
+
+ protected void replicateCall(Vector<Address> recipients, ReplicableCommand call, boolean sync, boolean wrapCacheCommandInReplicateMethod, boolean useOutOfBandMessage, boolean isBroadcast, long timeout) throws Throwable
+ {
+ if (trace) log.trace("Broadcasting call " + call + " to recipient list " + recipients);
+
+ if (!sync && replicationQueue != null)
+ {
+ if (log.isDebugEnabled()) log.debug("Putting call " + call + " on the replication queue.");
+ replicationQueue.add(commandsFactory.buildReplicateCommand(call));
+ }
+ else
+ {
+ Vector<Address> callRecipients = recipients;
+ if (callRecipients == null)
+ {
+ callRecipients = null;
+ if (trace)
+ log.trace("Setting call recipients to " + callRecipients + " since the original list of recipients passed in is null.");
+ }
+
+ ReplicableCommand toCall = wrapCacheCommandInReplicateMethod ? commandsFactory.buildReplicateCommand(call) : call;
+
+ List rsps = rpcManager.callRemoteMethods(callRecipients,
+ toCall,
+ sync, // is synchronised?
+ timeout,
+ useOutOfBandMessage
+ );
+ if (trace) log.trace("responses=" + rsps);
+ if (sync) checkResponses(rsps);
+ }
+ }
+
+ /**
+ * It does not make sense replicating a transaction method(commit, rollback, prepare) if one of the following is true:
+ * <pre>
+ * - call was not initiated here, but on other member of the cluster
+ * - there is no transaction. Why broadcast a commit or rollback if there is no transaction going on?
+ * - the current transaction did not modify any data
+ * </pre>
+ */
+ protected boolean skipReplicationOfTransactionMethod(InvocationContext ctx)
+ {
+ GlobalTransaction gtx = ctx.getGlobalTransaction();
+ return ctx.getTransaction() == null || gtx == null || gtx.isRemote() || ctx.getOptionOverrides().isCacheModeLocal() || !ctx.getTransactionContext().hasModifications();
+ }
+
+ /**
+ * The call runs in a transaction and it was initiated on this node of the cluster.
+ */
+ protected boolean isTransactionalAndLocal(InvocationContext ctx)
+ {
+ GlobalTransaction gtx = ctx.getGlobalTransaction();
+ boolean isInitiatedHere = gtx != null && !gtx.isRemote();
+ return isInitiatedHere && (ctx.getTransaction() != null);
+ }
+
+ protected boolean isSynchronous(Option option)
+ {
+ if (option != null)
+ {
+ if (option.isForceSynchronous())
+ return true;
+ else if (option.isForceAsynchronous())
+ return false;
+ }
+ return defaultSynchronous;
+ }
+
+ protected boolean isLocalModeForced(InvocationContext ctx)
+ {
+ if (ctx.getOptionOverrides() != null && ctx.getOptionOverrides().isCacheModeLocal())
+ {
+ if (log.isDebugEnabled()) log.debug("LOCAL mode forced on invocation. Suppressing clustered events.");
+ return true;
+ }
+ return false;
+ }
+}
\ No newline at end of file
Property changes on: core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/base/BaseRpcInterceptor.java
___________________________________________________________________
Name: svn:keywords
+ Author Date Id Revision
Name: svn:eol-style
+ native
More information about the jbosscache-commits
mailing list