[jbosscache-commits] JBoss Cache SVN: r6941 - in core/branches/flat/src/main/java/org/jboss/starobrno: eviction and 3 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Tue Oct 14 13:43:45 EDT 2008


Author: mircea.markus
Date: 2008-10-14 13:43:45 -0400 (Tue, 14 Oct 2008)
New Revision: 6941

Added:
   core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/ReplicationInterceptor.java
   core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/base/BaseRpcInterceptor.java
Modified:
   core/branches/flat/src/main/java/org/jboss/starobrno/container/MVCCEntryCreator.java
   core/branches/flat/src/main/java/org/jboss/starobrno/eviction/BaseEvictionAlgorithm.java
   core/branches/flat/src/main/java/org/jboss/starobrno/factories/ComponentRegistry.java
   core/branches/flat/src/main/java/org/jboss/starobrno/factories/EmptyConstructorFactory.java
   core/branches/flat/src/main/java/org/jboss/starobrno/factories/ReplicationQueueFactory.java
   core/branches/flat/src/main/java/org/jboss/starobrno/factories/RuntimeConfigAwareFactory.java
   core/branches/flat/src/main/java/org/jboss/starobrno/factories/StateTransferManagerFactory.java
   core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/InvocationContextInterceptor.java
   core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/MarshalledValueInterceptor.java
   core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/TxInterceptor.java
Log:
enabling replication 

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/container/MVCCEntryCreator.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/container/MVCCEntryCreator.java	2008-10-14 17:43:22 UTC (rev 6940)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/container/MVCCEntryCreator.java	2008-10-14 17:43:45 UTC (rev 6941)
@@ -23,7 +23,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.jboss.cache.lock.TimeoutException;
+import org.jboss.starobrno.lock.TimeoutException;
 import org.jboss.starobrno.config.Configuration;
 import org.jboss.starobrno.context.InvocationContext;
 import org.jboss.starobrno.factories.EntryFactory;
@@ -164,7 +164,7 @@
     * @param fqn Fqn to lock
     * @return true if a lock was needed and acquired, false if it didn't need to acquire the lock (i.e., lock was already held)
     * @throws InterruptedException if interrupted
-    * @throws org.jboss.cache.lock.TimeoutException
+    * @throws org.jboss.starobrno.lock.TimeoutException
     *                              if we are unable to acquire the lock after a specified timeout.
     */
    private boolean acquireLock(InvocationContext ctx, Object key) throws InterruptedException, TimeoutException

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/eviction/BaseEvictionAlgorithm.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/eviction/BaseEvictionAlgorithm.java	2008-10-14 17:43:22 UTC (rev 6940)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/eviction/BaseEvictionAlgorithm.java	2008-10-14 17:43:45 UTC (rev 6941)
@@ -25,7 +25,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.jboss.cache.CacheSPI_Legacy;
 import org.jboss.cache.Fqn;
-import org.jboss.cache.lock.TimeoutException;
+import org.jboss.starobrno.lock.TimeoutException;
 import org.jboss.starobrno.config.Configuration;
 import org.jboss.starobrno.config.EvictionAlgorithmConfig;
 import org.jboss.starobrno.eviction.EvictionEvent.Type;

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/factories/ComponentRegistry.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/factories/ComponentRegistry.java	2008-10-14 17:43:22 UTC (rev 6940)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/factories/ComponentRegistry.java	2008-10-14 17:43:45 UTC (rev 6941)
@@ -26,7 +26,7 @@
 import org.jboss.cache.CacheStatus;
 import org.jboss.cache.Version;
 import org.jboss.cache.util.BeanUtils;
-import org.jboss.cache.util.reflect.ReflectionUtil;
+import org.jboss.starobrno.util.ReflectionUtil;
 import org.jboss.starobrno.CacheException;
 import org.jboss.starobrno.CacheSPI;
 import org.jboss.starobrno.config.Configuration;

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/factories/EmptyConstructorFactory.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/factories/EmptyConstructorFactory.java	2008-10-14 17:43:22 UTC (rev 6940)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/factories/EmptyConstructorFactory.java	2008-10-14 17:43:45 UTC (rev 6941)
@@ -22,14 +22,6 @@
 package org.jboss.starobrno.factories;
 
 
-import org.jboss.cache.RegionRegistry;
-import org.jboss.cache.buddyreplication.BuddyFqnTransformer;
-import org.jboss.cache.invocation.CacheInvocationDelegate;
-import org.jboss.cache.loader.CacheLoaderManager;
-import org.jboss.cache.lock.LockStrategyFactory;
-import org.jboss.cache.marshall.Marshaller;
-import org.jboss.cache.marshall.VersionAwareMarshaller;
-import org.jboss.cache.remoting.jgroups.ChannelMessageListener;
 import org.jboss.starobrno.batch.BatchContainer;
 import org.jboss.starobrno.commands.CommandsFactory;
 import org.jboss.starobrno.config.ConfigurationException;
@@ -39,6 +31,16 @@
 import org.jboss.starobrno.invocation.InvocationContextContainer;
 import org.jboss.starobrno.notifications.Notifier;
 import org.jboss.starobrno.transaction.TransactionTable;
+import org.jboss.starobrno.remoting.RPCManagerImpl;
+import org.jboss.starobrno.remoting.ChannelMessageListener;
+import org.jboss.starobrno.marshall.ExtendedMarshaller;
+import org.jboss.starobrno.marshall.CacheMarshallerStarobrno;
+import org.jboss.starobrno.RPCManager;
+import org.jboss.cache.loader.CacheLoaderManager;
+import org.jboss.cache.invocation.CacheInvocationDelegate;
+import org.jboss.cache.lock.LockStrategyFactory;
+import org.jboss.cache.buddyreplication.BuddyFqnTransformer;
+import org.jboss.cache.RegionRegistry;
 
 /**
  * Simple factory that just uses reflection and an empty constructor of the component type.
@@ -47,7 +49,7 @@
  * @since 2.1.0
  */
 @DefaultFactoryFor(classes = {Notifier.class, RegionRegistry.class,
-      ChannelMessageListener.class, CacheLoaderManager.class, Marshaller.class, InvocationContextContainer.class,
+      ChannelMessageListener.class, CacheLoaderManager.class, ExtendedMarshaller.class, InvocationContextContainer.class,
       CacheInvocationDelegate.class, TransactionTable.class, MVCCEntryCreator.class,
       LockStrategyFactory.class, BuddyFqnTransformer.class, BatchContainer.class,
       ContextFactory.class, EntryFactory.class, CommandsFactory.class})
@@ -61,16 +63,20 @@
          if (componentType.isInterface())
          {
             Class componentImpl;
-            if (componentType.equals(Marshaller.class))
+            if (componentType.equals(ExtendedMarshaller.class))
             {
-               componentImpl = VersionAwareMarshaller.class;
+               componentImpl = CacheMarshallerStarobrno.class;
             }
+            else 
+            if (componentType.equals(RPCManager.class))
+            {
+                componentImpl = RPCManagerImpl.class;
+            }
             else
             {
                // add an "Impl" to the end of the class name and try again
                componentImpl = getClass().getClassLoader().loadClass(componentType.getName() + "Impl");
             }
-
             return componentType.cast(componentImpl.newInstance());
          }
          else

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/factories/ReplicationQueueFactory.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/factories/ReplicationQueueFactory.java	2008-10-14 17:43:22 UTC (rev 6940)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/factories/ReplicationQueueFactory.java	2008-10-14 17:43:45 UTC (rev 6941)
@@ -21,7 +21,7 @@
  */
 package org.jboss.starobrno.factories;
 
-import org.jboss.cache.cluster.ReplicationQueue;
+import org.jboss.starobrno.cluster.ReplicationQueue;
 import org.jboss.starobrno.config.Configuration;
 import org.jboss.starobrno.factories.annotations.DefaultFactoryFor;
 

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/factories/RuntimeConfigAwareFactory.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/factories/RuntimeConfigAwareFactory.java	2008-10-14 17:43:22 UTC (rev 6940)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/factories/RuntimeConfigAwareFactory.java	2008-10-14 17:43:45 UTC (rev 6941)
@@ -22,10 +22,10 @@
 package org.jboss.starobrno.factories;
 
 import org.jboss.cache.util.BeanUtils;
+import org.jboss.starobrno.RPCManager;
 import org.jboss.starobrno.config.ConfigurationException;
 import org.jboss.starobrno.config.RuntimeConfig;
 import org.jboss.starobrno.factories.annotations.DefaultFactoryFor;
-import org.jboss.starobrno.remoting.RPCManager;
 
 import java.lang.reflect.Method;
 

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/factories/StateTransferManagerFactory.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/factories/StateTransferManagerFactory.java	2008-10-14 17:43:22 UTC (rev 6940)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/factories/StateTransferManagerFactory.java	2008-10-14 17:43:45 UTC (rev 6941)
@@ -21,12 +21,12 @@
  */
 package org.jboss.starobrno.factories;
 
-import org.jboss.cache.statetransfer.DefaultStateTransferManager;
-import org.jboss.cache.statetransfer.StateTransferManager;
+import org.jboss.starobrno.statetransfer.DefaultStateTransferManager;
+import org.jboss.starobrno.statetransfer.StateTransferManager;
 import org.jboss.starobrno.factories.annotations.DefaultFactoryFor;
 
 /**
- * Constructs {@link org.jboss.cache.statetransfer.StateTransferManager} instances.
+ * Constructs {@link org.jboss.starobrno.statetransfer.StateTransferManager} instances.
  *
  * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
  * @since 3.0

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/InvocationContextInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/InvocationContextInterceptor.java	2008-10-14 17:43:22 UTC (rev 6940)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/InvocationContextInterceptor.java	2008-10-14 17:43:45 UTC (rev 6941)
@@ -34,9 +34,9 @@
 import org.jboss.starobrno.config.Option;
 import org.jboss.starobrno.context.InvocationContext;
 import org.jboss.starobrno.factories.annotations.Inject;
-import org.jboss.starobrno.remoting.RPCManager;
 import org.jboss.starobrno.transaction.GlobalTransaction;
 import org.jboss.starobrno.transaction.TransactionTable;
+import org.jboss.starobrno.RPCManager;
 
 import javax.transaction.SystemException;
 import javax.transaction.Transaction;

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/MarshalledValueInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/MarshalledValueInterceptor.java	2008-10-14 17:43:22 UTC (rev 6940)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/MarshalledValueInterceptor.java	2008-10-14 17:43:45 UTC (rev 6941)
@@ -29,6 +29,7 @@
 import org.jboss.starobrno.interceptors.base.CommandInterceptor;
 import org.jboss.starobrno.marshall.MarshalledValue;
 import org.jboss.starobrno.marshall.MarshalledValueHelper;
+//import org.jboss.starobrno.marshall.MarshalledValueHelper;
 
 import java.io.IOException;
 import java.io.NotSerializableException;

Copied: core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/ReplicationInterceptor.java (from rev 6895, core/branches/flat/src/main/java/org/jboss/cache/interceptors/ReplicationInterceptor.java)
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/ReplicationInterceptor.java	                        (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/ReplicationInterceptor.java	2008-10-14 17:43:45 UTC (rev 6941)
@@ -0,0 +1,164 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.interceptors;
+
+import org.jboss.starobrno.commands.VisitableCommand;
+import org.jboss.starobrno.commands.tx.CommitCommand;
+import org.jboss.starobrno.commands.tx.PrepareCommand;
+import org.jboss.starobrno.commands.tx.RollbackCommand;
+import org.jboss.starobrno.commands.write.*;
+import org.jboss.starobrno.config.Configuration;
+import org.jboss.starobrno.context.InvocationContext;
+import org.jboss.starobrno.context.TransactionContext;
+import org.jboss.starobrno.interceptors.base.BaseRpcInterceptor;
+import org.jboss.starobrno.transaction.GlobalTransaction;
+
+/**
+ * Takes care of replicating modifications to other nodes in a cluster. Also
+ * listens for prepare(), commit() and rollback() messages which are received
+ * 'side-ways' (see docs/design/Refactoring.txt).
+ *
+ * @author Bela Ban
+ * @version $Id$
+ */
+public class ReplicationInterceptor extends BaseRpcInterceptor
+{
+
+   @Override
+   public Object visitCommitCommand(InvocationContext ctx, CommitCommand command) throws Throwable
+   {
+      if (!skipReplicationOfTransactionMethod(ctx))
+         replicateCall(ctx, command, configuration.isSyncCommitPhase(), ctx.getOptionOverrides(), true);
+      return invokeNextInterceptor(ctx, command);
+   }
+
+   @Override
+   public Object visitPrepareCommand(InvocationContext ctx, PrepareCommand command) throws Throwable
+   {
+      Object retVal = invokeNextInterceptor(ctx, command);
+      TransactionContext transactionContext = ctx.getTransactionContext();
+      if (transactionContext.hasLocalModifications())
+      {
+         PrepareCommand replicablePrepareCommand = command.copy(); // make sure we remove any "local" modifications
+         replicablePrepareCommand.removeModifications(transactionContext.getLocalModifications());
+         command = replicablePrepareCommand;
+      }
+
+      if (!skipReplicationOfTransactionMethod(ctx)) runPreparePhase(command, command.getGlobalTransaction(), ctx);
+      return retVal;
+   }
+
+   @Override
+   public Object visitRollbackCommand(InvocationContext ctx, RollbackCommand command) throws Throwable
+   {
+      if (!skipReplicationOfTransactionMethod(ctx) && !ctx.isLocalRollbackOnly())
+      {
+         replicateCall(ctx, command, configuration.isSyncRollbackPhase(), ctx.getOptionOverrides());
+      }
+      return invokeNextInterceptor(ctx, command);
+   }
+
+   @Override
+   public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable
+   {
+      return handleCrudMethod(ctx, command);
+   }
+
+   @Override
+   public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws Throwable
+   {
+      return handleCrudMethod(ctx, command);
+   }
+
+   @Override
+   public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable
+   {
+      return handleCrudMethod(ctx, command);
+   }
+
+   @Override
+   public Object visitClearCommand(InvocationContext ctx, ClearCommand command) throws Throwable
+   {
+      return handleCrudMethod(ctx, command);
+   }
+
+   @Override
+   public Object visitReplaceCommand(InvocationContext ctx, ReplaceCommand command) throws Throwable
+   {
+      return handleCrudMethod(ctx, command);
+   }
+
+   /**
+    * If we are within one transaction we won't do any replication as replication would only be performed at commit time.
+    * If the operation didn't originate locally we won't do any replication either.
+    */
+   private Object handleCrudMethod(InvocationContext ctx, VisitableCommand command)
+         throws Throwable
+   {
+      boolean local = isLocalModeForced(ctx);
+      if (local && ctx.getTransaction() == null) return invokeNextInterceptor(ctx, command);
+      // FIRST pass this call up the chain.  Only if it succeeds (no exceptions) locally do we attempt to replicate.
+      Object returnValue = invokeNextInterceptor(ctx, command);
+      if (ctx.getTransaction() == null && ctx.isOriginLocal())
+      {
+         if (trace)
+         {
+            log.trace("invoking method " + command.getClass().getSimpleName() + ", members=" + rpcManager.getMembers() + ", mode=" +
+                  configuration.getCacheMode() + ", exclude_self=" + true + ", timeout=" +
+                  configuration.getSyncReplTimeout());
+         }
+
+         replicateCall(ctx, command, isSynchronous(ctx.getOptionOverrides()), ctx.getOptionOverrides());
+      }
+      else
+      {
+         if (local) ctx.getTransactionContext().addLocalModification(command);
+      }
+      return returnValue;
+   }
+
+   /**
+    * Calls prepare(GlobalTransaction,List,org.jgroups.Address,boolean) on all members except self.
+    * Waits for all responses (when replicating synchronously). If one of the members failed to prepare, its return value
+    * will be an exception. If there is an exception we rethrow it. This will mark the
+    * current transaction as rolled back, which will cause the
+    * afterCompletion(int) callback to have a status
+    * of <tt>MARKED_ROLLBACK</tt>. When we get that call, we simply roll back the
+    * transaction.<br/>
+    * If everything runs okay, the afterCompletion(int)
+    * callback will trigger {@code runCommitPhase(GlobalTransaction)}.
+    * <br/>
+    *
+    * @throws Throwable if replicating the prepare fails, e.g. a remote member responded with an exception
+    */
+   protected void runPreparePhase(PrepareCommand prepareMethod, GlobalTransaction gtx, InvocationContext ctx) throws Throwable
+   {
+      boolean async = configuration.getCacheMode() == Configuration.CacheMode.REPL_ASYNC;
+      if (trace)
+      {
+         log.trace("(" + rpcManager.getLocalAddress() + "): running remote prepare for global tx " + gtx + " with async mode=" + async);
+      }
+
+      // this method will return immediately if we're the only member (because exclude_self=true)
+      replicateCall(ctx, prepareMethod, !async, ctx.getOptionOverrides());
+   }
+}
\ No newline at end of file

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/TxInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/TxInterceptor.java	2008-10-14 17:43:22 UTC (rev 6940)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/TxInterceptor.java	2008-10-14 17:43:45 UTC (rev 6941)
@@ -23,6 +23,7 @@
 
 import org.jboss.cache.util.concurrent.ConcurrentHashSet;
 import org.jboss.starobrno.CacheException;
+import org.jboss.starobrno.RPCManager;
 import org.jboss.starobrno.commands.CommandsFactory;
 import org.jboss.starobrno.commands.ReplicableCommand;
 import org.jboss.starobrno.commands.VisitableCommand;
@@ -40,7 +41,7 @@
 import org.jboss.starobrno.jmx.annotations.ManagedOperation;
 import org.jboss.starobrno.lock.LockManager;
 import org.jboss.starobrno.notifications.Notifier;
-import org.jboss.starobrno.remoting.RPCManager;
+
 import org.jboss.starobrno.remoting.ReplicationException;
 import org.jboss.starobrno.transaction.GlobalTransaction;
 import org.jboss.starobrno.transaction.OrderedSynchronizationHandler;

Copied: core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/base/BaseRpcInterceptor.java (from rev 6895, core/branches/flat/src/main/java/org/jboss/cache/interceptors/BaseRpcInterceptor.java)
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/base/BaseRpcInterceptor.java	                        (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/base/BaseRpcInterceptor.java	2008-10-14 17:43:45 UTC (rev 6941)
@@ -0,0 +1,208 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.interceptors.base;
+
+import org.jboss.starobrno.RPCManager;
+import org.jboss.starobrno.cluster.ReplicationQueue;
+import org.jboss.starobrno.commands.CommandsFactory;
+import org.jboss.starobrno.commands.ReplicableCommand;
+import org.jboss.starobrno.config.Option;
+import org.jboss.starobrno.context.InvocationContext;
+import org.jboss.starobrno.context.TransactionContext;
+import org.jboss.starobrno.factories.annotations.Inject;
+import org.jboss.starobrno.factories.annotations.Start;
+import org.jboss.starobrno.transaction.GlobalTransaction;
+import org.jboss.starobrno.transaction.TransactionTable;
+import org.jgroups.Address;
+
+import javax.transaction.Transaction;
+import java.util.List;
+import java.util.Vector;
+
+/**
+ * Acts as a base for all RPC calls - subclassed by
+ * {@link org.jboss.starobrno.interceptors.ReplicationInterceptor} (the legacy org.jboss.cache version was also subclassed by OptimisticReplicationInterceptor).
+ *
+ * @author <a href="mailto:manik at jboss.org">Manik Surtani (manik at jboss.org)</a>
+ */
+public abstract class BaseRpcInterceptor extends CommandInterceptor
+{
+   private ReplicationQueue replicationQueue;
+   protected TransactionTable txTable;
+   private CommandsFactory commandsFactory;
+
+   protected RPCManager rpcManager;
+   protected boolean defaultSynchronous;
+
+   @Inject
+   public void injectComponents(RPCManager rpcManager, ReplicationQueue replicationQueue,
+                                TransactionTable txTable, CommandsFactory commandsFactory)
+   {
+      this.rpcManager = rpcManager;
+      this.replicationQueue = replicationQueue;
+      this.txTable = txTable;
+      this.commandsFactory = commandsFactory;
+   }
+
+   @Start
+   public void init()
+   {
+      defaultSynchronous = configuration.getCacheMode().isSynchronous();
+   }
+
+   /**
+    * Checks whether any of the responses are exceptions. If so, re-throws
+    * the first one encountered (as-is, whether a checked or a runtime exception).
+    */
+   protected void checkResponses(List rsps) throws Throwable
+   {
+      if (rsps != null)
+      {
+         for (Object rsp : rsps)
+         {
+            if (rsp != null && rsp instanceof Throwable)
+            {
+               // lets print a stack trace first.
+               if (log.isDebugEnabled())
+                  log.debug("Received Throwable from remote node", (Throwable) rsp);
+               throw (Throwable) rsp;
+            }
+         }
+      }
+   }
+
+   protected void replicateCall(InvocationContext ctx, ReplicableCommand call, boolean sync, Option o, boolean useOutOfBandMessage) throws Throwable
+   {
+      replicateCall(ctx, null, call, sync, o, useOutOfBandMessage);
+   }
+
+   protected void replicateCall(InvocationContext ctx, ReplicableCommand call, boolean sync, Option o) throws Throwable
+   {
+      replicateCall(ctx, null, call, sync, o, false);
+   }
+
+   protected void replicateCall(InvocationContext ctx, Vector<Address> recipients, ReplicableCommand c, boolean sync, Option o, boolean useOutOfBandMessage) throws Throwable
+   {
+      long syncReplTimeout = configuration.getSyncReplTimeout();
+
+      // test for option overrides
+      if (o != null)
+      {
+         if (o.isForceAsynchronous()) sync = false;
+         else if (o.isForceSynchronous()) sync = true;
+
+         if (o.getSyncReplTimeout() > 0) syncReplTimeout = o.getSyncReplTimeout();
+      }
+
+      // tx-level overrides are more important
+      Transaction tx = ctx.getTransaction();
+      if (tx != null)
+      {
+         TransactionContext transactionContext = ctx.getTransactionContext();
+         if (transactionContext != null)
+         {
+            if (transactionContext.isForceAsyncReplication()) sync = false;
+            else if (transactionContext.isForceSyncReplication()) sync = true;
+         }
+      }
+
+      replicateCall(recipients, c, sync, true, useOutOfBandMessage, false, syncReplTimeout);
+   }
+
+   protected void replicateCall(Vector<Address> recipients, ReplicableCommand call, boolean sync, boolean wrapCacheCommandInReplicateMethod, boolean useOutOfBandMessage, boolean isBroadcast, long timeout) throws Throwable
+   {
+      if (trace) log.trace("Broadcasting call " + call + " to recipient list " + recipients);
+
+      if (!sync && replicationQueue != null)
+      {
+         if (log.isDebugEnabled()) log.debug("Putting call " + call + " on the replication queue.");
+         replicationQueue.add(commandsFactory.buildReplicateCommand(call));
+      }
+      else
+      {
+         Vector<Address> callRecipients = recipients;
+         if (callRecipients == null)
+         {
+            callRecipients = null;
+            if (trace)
+               log.trace("Setting call recipients to " + callRecipients + " since the original list of recipients passed in is null.");
+         }
+
+         ReplicableCommand toCall = wrapCacheCommandInReplicateMethod ? commandsFactory.buildReplicateCommand(call) : call;
+
+         List rsps = rpcManager.callRemoteMethods(callRecipients,
+               toCall,
+               sync, // is synchronised?
+               timeout,
+               useOutOfBandMessage
+         );
+         if (trace) log.trace("responses=" + rsps);
+         if (sync) checkResponses(rsps);
+      }
+   }
+
+   /**
+    * It does not make sense to replicate a transaction method (commit, rollback, prepare) if any of the following is true:
+    * <pre>
+    *    - the call was not initiated here, but on another member of the cluster
+    *    - there is no transaction. Why broadcast a commit or rollback if there is no transaction going on?
+    *    - the current transaction did not modify any data
+    * </pre>
+    */
+   protected boolean skipReplicationOfTransactionMethod(InvocationContext ctx)
+   {
+      GlobalTransaction gtx = ctx.getGlobalTransaction();
+      return ctx.getTransaction() == null || gtx == null || gtx.isRemote() || ctx.getOptionOverrides().isCacheModeLocal() || !ctx.getTransactionContext().hasModifications();
+   }
+
+   /**
+    * The call runs in a transaction and it was initiated on this node of the cluster.
+    */
+   protected boolean isTransactionalAndLocal(InvocationContext ctx)
+   {
+      GlobalTransaction gtx = ctx.getGlobalTransaction();
+      boolean isInitiatedHere = gtx != null && !gtx.isRemote();
+      return isInitiatedHere && (ctx.getTransaction() != null);
+   }
+
+   protected boolean isSynchronous(Option option)
+   {
+      if (option != null)
+      {
+         if (option.isForceSynchronous())
+            return true;
+         else if (option.isForceAsynchronous())
+            return false;
+      }
+      return defaultSynchronous;
+   }
+
+   protected boolean isLocalModeForced(InvocationContext ctx)
+   {
+      if (ctx.getOptionOverrides() != null && ctx.getOptionOverrides().isCacheModeLocal())
+      {
+         if (log.isDebugEnabled()) log.debug("LOCAL mode forced on invocation.  Suppressing clustered events.");
+         return true;
+      }
+      return false;
+   }
+}
\ No newline at end of file


Property changes on: core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/base/BaseRpcInterceptor.java
___________________________________________________________________
Name: svn:keywords
   + Author Date Id Revision
Name: svn:eol-style
   + native




More information about the jbosscache-commits mailing list