[jbosscache-commits] JBoss Cache SVN: r6879 - in core/branches/flat/src/main/java/org/jboss/starobrno: interceptors and 1 other directory.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Wed Oct 8 12:05:25 EDT 2008


Author: mircea.markus
Date: 2008-10-08 12:05:24 -0400 (Wed, 08 Oct 2008)
New Revision: 6879

Added:
   core/branches/flat/src/main/java/org/jboss/starobrno/RPCManager.java
   core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/BaseTransactionalContextInterceptor.java
   core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/InvocationContextInterceptor.java
Log:
added

Copied: core/branches/flat/src/main/java/org/jboss/starobrno/RPCManager.java (from rev 6858, core/branches/flat/src/main/java/org/jboss/cache/RPCManager.java)
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/RPCManager.java	                        (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/RPCManager.java	2008-10-08 16:05:24 UTC (rev 6879)
@@ -0,0 +1,154 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno;
+
+import org.jboss.cache.commands.ReplicableCommand;
+import org.jboss.cache.Fqn;
+import org.jgroups.Address;
+import org.jgroups.Channel;
+import org.jgroups.blocks.RspFilter;
+
+import java.util.List;
+import java.util.Vector;
+
+/**
+ * Provides a mechanism for communicating with other caches in the cluster.  For now this is based on JGroups as an underlying
+ * transport, and in future more transport options may become available.
+ * <p/>
+ * Implementations have a simple lifecycle:
+ * <ul>
+ * <li>start() - starts the underlying channel based on configuration options injected, and connects the channel</li>
+ * <li>disconnect() - disconnects the channel</li>
+ * <li>stop() - stops the dispatcher and releases resources</li>
+ * </ul>
+ *
+ * @author Manik Surtani
+ * @since 2.1.0
+ */
+public interface RPCManager
+{
+   /**
+    * Disconnects and closes the underlying JGroups channel.
+    */
+   void disconnect();
+
+   /**
+    * Stops the RPCDispatcher and frees resources.  Closes and disconnects the underlying JGroups channel if this is
+    * still open/connected.
+    */
+   void stop();
+
+   /**
+    * Starts the RPCManager by connecting the underlying JGroups channel (if configured for replication).  Connecting
+    * the channel may also involve state transfer (if configured) so the interceptor chain should be started and
+    * available before this method is called.
+    */
+   void start();
+
+   /**
+    * Invokes an RPC call on other caches in the cluster, with an explicit group request mode and a response filter.
+    *
+    * @param recipients          a list of Addresses to invoke the call on.  If this is null, the call is broadcast to the entire cluster.
+    * @param cacheCommand        the cache command to invoke
+    * @param mode                the group request mode to use.  See {@link org.jgroups.blocks.GroupRequest}.
+    * @param timeout             a timeout after which to throw a replication exception.
+    * @param responseFilter      a response filter with which to filter out failed/unwanted/invalid responses.
+    * @param useOutOfBandMessage if true, the message is put on JGroups' OOB queue.  See JGroups docs for more info.
+    * @return a list of responses from each member contacted.
+    * @throws Exception in the event of problems.
+    */
+   List<Object> callRemoteMethods(Vector<Address> recipients, ReplicableCommand cacheCommand, int mode, long timeout, RspFilter responseFilter, boolean useOutOfBandMessage) throws Exception;
+
+   /**
+    * Invokes an RPC call on other caches in the cluster, with an explicit group request mode.  This overload takes
+    * no response filter.
+    *
+    * @param recipients          a list of Addresses to invoke the call on.  If this is null, the call is broadcast to the entire cluster.
+    * @param cacheCommand        the cache command to invoke
+    * @param mode                the group request mode to use.  See {@link org.jgroups.blocks.GroupRequest}.
+    * @param timeout             a timeout after which to throw a replication exception.
+    * @param useOutOfBandMessage if true, the message is put on JGroups' OOB queue.  See JGroups docs for more info.
+    * @return a list of responses from each member contacted.
+    * @throws Exception in the event of problems.
+    */
+   List<Object> callRemoteMethods(Vector<Address> recipients, ReplicableCommand cacheCommand, int mode, long timeout, boolean useOutOfBandMessage) throws Exception;
+
+   /**
+    * Invokes an RPC call on other caches in the cluster.  This overload takes a synchronous/asynchronous flag in
+    * place of an explicit group request mode, and takes no response filter.
+    *
+    * @param recipients          a list of Addresses to invoke the call on.  If this is null, the call is broadcast to the entire cluster.
+    * @param cacheCommand        the cache command to invoke
+    * @param synchronous         if true, sets group request mode to {@link org.jgroups.blocks.GroupRequest#GET_ALL}, and if false sets it to {@link org.jgroups.blocks.GroupRequest#GET_NONE}.
+    * @param timeout             a timeout after which to throw a replication exception.
+    * @param useOutOfBandMessage if true, the message is put on JGroups' OOB queue.  See JGroups docs for more info.
+    * @return a list of responses from each member contacted.
+    * @throws Exception in the event of problems.
+    */
+   List<Object> callRemoteMethods(Vector<Address> recipients, ReplicableCommand cacheCommand, boolean synchronous, long timeout, boolean useOutOfBandMessage) throws Exception;
+
+   /**
+    * Tests whether the current Channel is the coordinator of the cluster.
+    *
+    * @return true if the current Channel is the coordinator of the cluster.
+    */
+   boolean isCoordinator();
+
+   /**
+    * Retrieves the address of the current cluster coordinator.
+    *
+    * @return the Address of the current coordinator.
+    */
+   Address getCoordinator();
+
+   /**
+    * Retrieves the local JGroups channel's address.
+    *
+    * @return an Address
+    */
+   Address getLocalAddress();
+
+   /**
+    * Returns a defensively copied list of members in the current cluster view.
+    *
+    * @return a copy of the list of member Addresses in the current view
+    */
+   List<Address> getMembers();
+
+   /**
+    * Retrieves partial state from remote instances, integrating it at a location that may differ from the
+    * location it was read from.
+    *
+    * @param sources           sources to consider for a state transfer
+    * @param sourceTarget      Fqn on source to retrieve state for
+    * @param integrationTarget integration point on local cache to apply state
+    * @throws Exception in the event of problems
+    */
+   void fetchPartialState(List<Address> sources, Fqn sourceTarget, Fqn integrationTarget) throws Exception;
+
+   /**
+    * Retrieves partial state from remote instances, integrating it locally at the same Fqn it was read from.
+    *
+    * @param sources sources to consider for a state transfer
+    * @param subtree Fqn subtree to retrieve.  Will be integrated at the same point.
+    * @throws Exception in the event of problems
+    */
+   void fetchPartialState(List<Address> sources, Fqn subtree) throws Exception;
+
+   /**
+    * Retrieves the {@link Channel}.
+    *
+    * @return a channel
+    */
+   Channel getChannel();
+}
\ No newline at end of file

Copied: core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/BaseTransactionalContextInterceptor.java (from rev 6858, core/branches/flat/src/main/java/org/jboss/cache/interceptors/BaseTransactionalContextInterceptor.java)
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/BaseTransactionalContextInterceptor.java	                        (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/BaseTransactionalContextInterceptor.java	2008-10-08 16:05:24 UTC (rev 6879)
@@ -0,0 +1,112 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.interceptors;
+
+import org.jboss.cache.config.Option;
+import org.jboss.starobrno.context.InvocationContext;
+import org.jboss.starobrno.context.TransactionContext;
+import org.jboss.starobrno.factories.annotations.Inject;
+import org.jboss.starobrno.interceptors.base.CommandInterceptor;
+import org.jboss.starobrno.transaction.GlobalTransaction;
+import org.jboss.starobrno.transaction.TransactionTable;
+
+import javax.transaction.Status;
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+
+/**
+ * Class providing some base functionality around manipulating transactions and global transactions within invocation contexts.
+ *
+ * @author <a href="mailto:manik at jboss.org">Manik Surtani</a>
+ */
+public abstract class BaseTransactionalContextInterceptor extends CommandInterceptor
+{
+   protected TransactionTable txTable;
+   protected TransactionManager txManager;
+
+   @Inject
+   public void injectDependencies(TransactionTable txTable, TransactionManager txManager)
+   {
+      this.txManager = txManager;
+      this.txTable = txTable;
+   }
+
+   protected void copyInvocationScopeOptionsToTxScope(InvocationContext ctx)
+   {
+      // notify the transaction tCtx that this override is in place.
+      TransactionContext tCtx = txTable.get(ctx.getGlobalTransaction());
+      if (tCtx != null)
+      {
+         Option txScopeOption = new Option();
+         txScopeOption.setCacheModeLocal(ctx.getOptionOverrides() != null && ctx.getOptionOverrides().isCacheModeLocal());
+         txScopeOption.setSkipCacheStatusCheck(ctx.getOptionOverrides() != null && ctx.getOptionOverrides().isSkipCacheStatusCheck());
+         tCtx.setOption(txScopeOption);
+      }
+   }
+
+   protected void setTransactionalContext(Transaction tx, GlobalTransaction gtx, TransactionContext tCtx, InvocationContext ctx)
+   {
+      if (trace)
+      {
+         log.trace("Setting up transactional context.");
+         log.trace("Setting tx as " + tx + " and gtx as " + gtx);
+      }
+      ctx.setTransaction(tx);
+      ctx.setGlobalTransaction(gtx);
+      if (tCtx == null)
+      {
+         if (gtx != null)
+         {
+            ctx.setTransactionContext(txTable.get(gtx));
+         }
+         else if (tx == null)
+         {
+            // then nullify the transaction tCtx as well
+            ctx.setTransactionContext(null);
+         }
+      }
+      else
+      {
+         ctx.setTransactionContext(tCtx);
+      }
+   }
+
+   /**
+    * Returns true if transaction is rolling back, false otherwise
+    */
+   protected boolean isRollingBack(Transaction tx)
+   {
+      if (tx == null) return false;
+      int status;
+      try
+      {
+         status = tx.getStatus();
+         return status == Status.STATUS_ROLLING_BACK || status == Status.STATUS_ROLLEDBACK;
+      }
+      catch (SystemException e)
+      {
+         log.error("failed getting transaction status", e);
+         return false;
+      }
+   }
+}
\ No newline at end of file

Copied: core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/InvocationContextInterceptor.java (from rev 6858, core/branches/flat/src/main/java/org/jboss/cache/interceptors/InvocationContextInterceptor.java)
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/InvocationContextInterceptor.java	                        (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/InvocationContextInterceptor.java	2008-10-08 16:05:24 UTC (rev 6879)
@@ -0,0 +1,236 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.interceptors;
+
+
+import org.jboss.cache.config.Option;
+import org.jboss.cache.transaction.TransactionTable;
+import org.jboss.starobrno.commands.VisitableCommand;
+import org.jboss.starobrno.commands.tx.CommitCommand;
+import org.jboss.starobrno.commands.tx.PrepareCommand;
+import org.jboss.starobrno.commands.tx.RollbackCommand;
+import org.jboss.starobrno.commands.write.ClearCommand;
+import org.jboss.starobrno.commands.write.PutKeyValueCommand;
+import org.jboss.starobrno.commands.write.PutMapCommand;
+import org.jboss.starobrno.commands.write.RemoveCommand;
+import org.jboss.starobrno.context.InvocationContext;
+import org.jboss.starobrno.factories.annotations.Inject;
+import org.jboss.starobrno.transaction.GlobalTransaction;
+import org.jboss.starobrno.RPCManager;
+
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
+
+/**
+ * Interceptor that populates the {@link InvocationContext} with the transaction, global transaction and
+ * transaction context relevant to the current call before passing the command down the chain, and cleans up
+ * invocation-scoped state (option overrides, command reference, looked-up entries) once the call returns.
+ * <p/>
+ * It also implements the "fail silently" option: when that option is set, any ongoing transaction is suspended for
+ * the duration of the call and any exception thrown further down the chain is swallowed.
+ */
+public class InvocationContextInterceptor extends BaseTransactionalContextInterceptor
+{
+   private RPCManager rpcManager;
+
+   /**
+    * Stores the RPC manager, which is used to find out the local address when deciding whether a global
+    * transaction is remote.
+    *
+    * @param rpcManager the RPC manager to use
+    */
+   @Inject
+   public void setDependencies(RPCManager rpcManager)
+   {
+      this.rpcManager = rpcManager;
+   }
+
+   /**
+    * Handles the command using the global transaction currently on the invocation context; the transactional
+    * context is not scrubbed on completion.
+    */
+   @Override
+   public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws Throwable
+   {
+      return handleAll(ctx, command, ctx.getGlobalTransaction(), false);
+   }
+
+   /**
+    * Handles the command using the global transaction currently on the invocation context; the transactional
+    * context is not scrubbed on completion.
+    */
+   @Override
+   public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable
+   {
+      return handleAll(ctx, command, ctx.getGlobalTransaction(), false);
+   }
+
+   /**
+    * Handles the command using the global transaction currently on the invocation context; the transactional
+    * context is not scrubbed on completion.
+    */
+   @Override
+   public Object visitClearCommand(InvocationContext ctx, ClearCommand command) throws Throwable
+   {
+      return handleAll(ctx, command, ctx.getGlobalTransaction(), false);
+   }
+
+   /**
+    * Handles the command using the global transaction currently on the invocation context; the transactional
+    * context is not scrubbed on completion.
+    */
+   @Override
+   public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable
+   {
+      return handleAll(ctx, command, ctx.getGlobalTransaction(), false);
+   }
+
+   /**
+    * Handles the command using the global transaction carried by the command itself; the transactional context
+    * is scrubbed from the invocation context on completion.
+    */
+   @Override
+   public Object visitPrepareCommand(InvocationContext ctx, PrepareCommand command) throws Throwable
+   {
+      return handleAll(ctx, command, command.getGlobalTransaction(), true);
+   }
+
+   /**
+    * Handles the command using the global transaction carried by the command itself; the transactional context
+    * is scrubbed from the invocation context on completion.
+    */
+   @Override
+   public Object visitRollbackCommand(InvocationContext ctx, RollbackCommand command) throws Throwable
+   {
+      return handleAll(ctx, command, command.getGlobalTransaction(), true);
+   }
+
+   /**
+    * Handles the command using the global transaction carried by the command itself; the transactional context
+    * is scrubbed from the invocation context on completion.
+    */
+   @Override
+   public Object visitCommitCommand(InvocationContext ctx, CommitCommand command) throws Throwable
+   {
+      return handleAll(ctx, command, command.getGlobalTransaction(), true);
+   }
+
+   /**
+    * Handles every other command without a pre-selected global transaction (one is looked up from the ongoing
+    * transaction, if any); the transactional context is not scrubbed on completion.
+    */
+   @Override
+   public Object handleDefault(InvocationContext ctx, VisitableCommand command) throws Throwable
+   {
+      return handleAll(ctx, command, null, false);
+   }
+
+   /**
+    * Common handling for all commands: sets up the transactional context, applies the "fail silently" option,
+    * invokes the rest of the chain and finally cleans up the invocation context.
+    *
+    * @param ctx                      the invocation context to prepare and later clean up
+    * @param command                  the command being invoked
+    * @param gtx                      the global transaction to use, or null to look one up from the ongoing transaction
+    * @param scrubContextOnCompletion if true, the transaction, global transaction and transaction context are
+    *                                 removed from <tt>ctx</tt> when the call completes
+    * @return the return value of the next interceptor, or null if the call failed and the "fail silently"
+    *         option was set
+    * @throws Throwable whatever the next interceptor throws (unless failing silently); a RuntimeException that
+    *                   has a cause is unwrapped and the cause is thrown instead
+    */
+   @SuppressWarnings("deprecation")
+   private Object handleAll(InvocationContext ctx, VisitableCommand command, GlobalTransaction gtx, boolean scrubContextOnCompletion) throws Throwable
+   {
+      Option optionOverride = ctx.getOptionOverrides();
+      boolean suppressExceptions = false;
+      Transaction suspendedTransaction = null;
+      boolean resumeSuspended = false;
+
+      if (trace) log.trace("Invoked with command " + command + " and InvocationContext [" + ctx + "]");
+
+      try
+      {
+         if (txManager != null)
+         {
+            Transaction tx = getTransaction();
+            GlobalTransaction realGtx = getGlobalTransaction(tx, gtx);
+            // a remote gtx has no transaction associated with this thread; use the one registered locally for it
+            if (tx == null && realGtx != null && realGtx.isRemote()) tx = txTable.getLocalTransaction(gtx);
+            setTransactionalContext(tx, realGtx, null, ctx);
+         }
+         else
+         {
+            setTransactionalContext(null, null, null, ctx);
+         }
+
+         if (optionOverride != null && optionOverride.isFailSilently())
+         {
+            log.debug("FAIL_SILENTLY Option is present - suspending any ongoing transaction.");
+            suppressExceptions = true;
+            if (ctx.getTransaction() != null)
+            {
+               suspendedTransaction = txManager.suspend();
+               setTransactionalContext(null, null, null, ctx);
+               if (trace) log.trace("Suspending transaction " + suspendedTransaction);
+               resumeSuspended = true;
+            }
+            else
+            {
+               if (trace) log.trace("No ongoing transaction to suspend");
+            }
+         }
+
+         try
+         {
+            return invokeNextInterceptor(ctx, command);
+         }
+         catch (Throwable th)
+         {
+            // if fail silently return a null
+            if (suppressExceptions) return null;
+            // surface the underlying cause of wrapped runtime exceptions to the caller
+            if (th instanceof RuntimeException && th.getCause() != null)
+            {
+               throw th.getCause();
+            }
+            throw th;
+         }
+      }
+      finally
+      {
+         // TODO: scope upgrading should happen transparently
+         /*
+          * we should scrub txs after every call to prevent race conditions
+          * basically any other call coming in on the same thread and hijacking any running tx's
+          * was highlighted in JBCACHE-606
+          */
+         if (scrubContextOnCompletion) setTransactionalContext(null, null, null, ctx);
+
+         // clean up any invocation-scope options set up.  The overrides are treated as nullable everywhere else,
+         // so guard here too: an NPE thrown from this finally block would mask the real exception and would skip
+         // resuming a suspended transaction below.
+         if (trace) log.trace("Resetting invocation-scope options");
+         Option overridesToReset = ctx.getOptionOverrides();
+         if (overridesToReset != null) overridesToReset.reset();
+
+         if (resumeSuspended)
+         {
+            txManager.resume(suspendedTransaction);
+         }
+         else
+         {
+            // NOTE(review): the invocation-scope options were reset just above, so what gets copied to the tx
+            // scope here is presumably the post-reset state - confirm this ordering is intended.
+            if (ctx.getTransaction() != null && (TransactionTable.isValid(ctx.getTransaction())))
+            {
+               copyInvocationScopeOptionsToTxScope(ctx);
+            }
+         }
+
+         // reset the context to prevent leakage of internals
+         ctx.setCommand(null);
+
+         // TODO: Calling ctx.reset() here breaks stuff.  Check whether this is just because UTs expect stuff in the ctx or whether this really breaks functionality.
+//         ctx.reset();
+         // instead, for now, just wipe contents of the looked up node map
+         ctx.clearLookedUpEntries();
+      }
+   }
+
+   /**
+    * Resolves the global transaction to use for a call and refreshes its "remote" flag.
+    *
+    * @param tx  the ongoing transaction, may be null
+    * @param gtx a pre-selected global transaction, or null to ask the transaction table for the one
+    *            associated with <tt>tx</tt>
+    * @return the resolved global transaction, or null if there is none
+    */
+   private GlobalTransaction getGlobalTransaction(Transaction tx, GlobalTransaction gtx)
+   {
+      if (gtx == null) gtx = txTable.getCurrentTransaction(tx, false);
+      if (gtx != null) gtx.setRemote(isRemoteGlobalTx(gtx));
+      return gtx;
+   }
+
+   /**
+    * Retrieves the transaction reported by the transaction manager.
+    *
+    * @return the transaction returned by the transaction manager, or null if no transaction manager is configured
+    * @throws SystemException if the transaction manager fails to report the transaction
+    */
+   private Transaction getTransaction() throws SystemException
+   {
+      if (txManager == null)
+      {
+         if (trace) log.trace("no transaction manager configured, setting tx as null.");
+         return null;
+      }
+      else
+      {
+         return txManager.getTransaction();
+      }
+   }
+
+   /**
+    * Tests if a global transaction originated from a different cache in the cluster, by comparing its address
+    * with the local address reported by the RPC manager.
+    *
+    * @param gtx the global transaction to test, may be null
+    * @return true if the gtx is remote, false if it originated locally, is null, or carries no address.
+    */
+   private boolean isRemoteGlobalTx(GlobalTransaction gtx)
+   {
+      return gtx != null && (gtx.getAddress() != null) && (!gtx.getAddress().equals(rpcManager.getLocalAddress()));
+   }
+}
\ No newline at end of file




More information about the jbosscache-commits mailing list