[jbosscache-commits] JBoss Cache SVN: r6879 - in core/branches/flat/src/main/java/org/jboss/starobrno: interceptors and 1 other directory.
jbosscache-commits at lists.jboss.org
jbosscache-commits at lists.jboss.org
Wed Oct 8 12:05:25 EDT 2008
Author: mircea.markus
Date: 2008-10-08 12:05:24 -0400 (Wed, 08 Oct 2008)
New Revision: 6879
Added:
core/branches/flat/src/main/java/org/jboss/starobrno/RPCManager.java
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/BaseTransactionalContextInterceptor.java
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/InvocationContextInterceptor.java
Log:
added
Copied: core/branches/flat/src/main/java/org/jboss/starobrno/RPCManager.java (from rev 6858, core/branches/flat/src/main/java/org/jboss/cache/RPCManager.java)
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/RPCManager.java (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/RPCManager.java 2008-10-08 16:05:24 UTC (rev 6879)
@@ -0,0 +1,154 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno;
+
+import org.jboss.cache.commands.ReplicableCommand;
+import org.jboss.cache.Fqn;
+import org.jgroups.Address;
+import org.jgroups.Channel;
+import org.jgroups.blocks.RspFilter;
+
+import java.util.List;
+import java.util.Vector;
+
+/**
+ * Provides a mechanism for communicating with other caches in the cluster. For now this is based on JGroups as an underlying
+ * transport, and in future more transport options may become available.
+ * <p/>
+ * Implementations have a simple lifecycle:
+ * <ul>
+ * <li>start() - starts the underlying channel based on configuration options injected, and connects the channel</li>
+ * <li>disconnect() - disconnects the channel</li>
+ * <li>stop() - stops the dispatcher and releases resources</li>
+ * </ul>
+ *
+ * @author Manik Surtani
+ * @since 2.1.0
+ */
+public interface RPCManager
+{
+ /**
+ * Disconnects and closes the underlying JGroups channel.
+ */
+ void disconnect();
+
+ /**
+ * Stops the RPCDispatcher and frees resources. Closes and disconnects the underlying JGroups channel if this is
+ * still open/connected.
+ */
+ void stop();
+
+ /**
+ * Starts the RPCManager by connecting the underlying JGroups channel (if configured for replication). Connecting
+ * the channel may also involve state transfer (if configured) so the interceptor chain should be started and
+ * available before this method is called.
+ */
+ void start();
+
+ /**
+ * Invokes an RPC call on other caches in the cluster.
+ *
+ * @param recipients a list of Addresses to invoke the call on. If this is null, the call is broadcast to the entire cluster.
+ * @param cacheCommand the cache command to invoke
+ * @param mode the group request mode to use. See {@link org.jgroups.blocks.GroupRequest}.
+ * @param timeout a timeout after which to throw a replication exception.
+ * @param responseFilter a response filter with which to filter out failed/unwanted/invalid responses.
+ * @param useOutOfBandMessage if true, the message is put on JGroups' OOB queue. See JGroups docs for more info.
+ * @return a list of responses from each member contacted.
+ * @throws Exception in the event of problems.
+ */
+ List<Object> callRemoteMethods(Vector<Address> recipients, ReplicableCommand cacheCommand, int mode, long timeout, RspFilter responseFilter, boolean useOutOfBandMessage) throws Exception;
+
+ /**
+ * Invokes an RPC call on other caches in the cluster.
+ *
+ * @param recipients a list of Addresses to invoke the call on. If this is null, the call is broadcast to the entire cluster.
+ * @param cacheCommand the cache command to invoke
+ * @param mode the group request mode to use. See {@link org.jgroups.blocks.GroupRequest}.
+ * @param timeout a timeout after which to throw a replication exception.
+ * @param useOutOfBandMessage if true, the message is put on JGroups' OOB queue. See JGroups docs for more info.
+ * @return a list of responses from each member contacted.
+ * @throws Exception in the event of problems.
+ */
+ List<Object> callRemoteMethods(Vector<Address> recipients, ReplicableCommand cacheCommand, int mode, long timeout, boolean useOutOfBandMessage) throws Exception;
+
+ /**
+ * Invokes an RPC call on other caches in the cluster.
+ *
+ * @param recipients a list of Addresses to invoke the call on. If this is null, the call is broadcast to the entire cluster.
+ * @param cacheCommand the cache command to invoke
+ * @param synchronous if true, sets group request mode to {@link org.jgroups.blocks.GroupRequest#GET_ALL}, and if false sets it to {@link org.jgroups.blocks.GroupRequest#GET_NONE}.
+ * @param timeout a timeout after which to throw a replication exception.
+ * @param useOutOfBandMessage if true, the message is put on JGroups' OOB queue. See JGroups docs for more info.
+ * @return a list of responses from each member contacted.
+ * @throws Exception in the event of problems.
+ */
+ List<Object> callRemoteMethods(Vector<Address> recipients, ReplicableCommand cacheCommand, boolean synchronous, long timeout, boolean useOutOfBandMessage) throws Exception;
+
+ /**
+ * @return true if the current Channel is the coordinator of the cluster.
+ */
+ boolean isCoordinator();
+
+ /**
+ * @return the Address of the current coordinator.
+ */
+ Address getCoordinator();
+
+ /**
+ * Retrieves the local JGroups channel's address
+ *
+ * @return an Address
+ */
+ Address getLocalAddress();
+
+ /**
+ * Returns a defensively copied list of members in the current cluster view.
+ */
+ List<Address> getMembers();
+
+ /**
+ * Retrieves partial state from remote instances.
+ *
+ * @param sources sources to consider for a state transfer
+ * @param sourceTarget Fqn on source to retrieve state for
+ * @param integrationTarget integration point on local cache to apply state
+ * @throws Exception in the event of problems
+ */
+ void fetchPartialState(List<Address> sources, Fqn sourceTarget, Fqn integrationTarget) throws Exception;
+
+ /**
+ * Retrieves partial state from remote instances.
+ *
+ * @param sources sources to consider for a state transfer
+ * @param subtree Fqn subtree to retrieve. Will be integrated at the same point.
+ * @throws Exception in the event of problems
+ */
+ void fetchPartialState(List<Address> sources, Fqn subtree) throws Exception;
+
+ /**
+ * Retrieves the Channel
+ *
+ * @return a channel
+ */
+ Channel getChannel();
+}
\ No newline at end of file
Copied: core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/BaseTransactionalContextInterceptor.java (from rev 6858, core/branches/flat/src/main/java/org/jboss/cache/interceptors/BaseTransactionalContextInterceptor.java)
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/BaseTransactionalContextInterceptor.java (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/BaseTransactionalContextInterceptor.java 2008-10-08 16:05:24 UTC (rev 6879)
@@ -0,0 +1,112 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.interceptors;
+
+import org.jboss.cache.config.Option;
+import org.jboss.starobrno.context.InvocationContext;
+import org.jboss.starobrno.context.TransactionContext;
+import org.jboss.starobrno.factories.annotations.Inject;
+import org.jboss.starobrno.interceptors.base.CommandInterceptor;
+import org.jboss.starobrno.transaction.GlobalTransaction;
+import org.jboss.starobrno.transaction.TransactionTable;
+
+import javax.transaction.Status;
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+
+/**
+ * Class providing some base functionality around manipulating transactions and global transactions withing invocation contexts.
+ *
+ * @author <a href="mailto:manik at jboss.org">Manik Surtani</a>
+ */
+public abstract class BaseTransactionalContextInterceptor extends CommandInterceptor
+{
+ protected TransactionTable txTable;
+ protected TransactionManager txManager;
+
+ @Inject
+ public void injectDependencies(TransactionTable txTable, TransactionManager txManager)
+ {
+ this.txManager = txManager;
+ this.txTable = txTable;
+ }
+
+ protected void copyInvocationScopeOptionsToTxScope(InvocationContext ctx)
+ {
+ // notify the transaction tCtx that this override is in place.
+ TransactionContext tCtx = txTable.get(ctx.getGlobalTransaction());
+ if (tCtx != null)
+ {
+ Option txScopeOption = new Option();
+ txScopeOption.setCacheModeLocal(ctx.getOptionOverrides() != null && ctx.getOptionOverrides().isCacheModeLocal());
+ txScopeOption.setSkipCacheStatusCheck(ctx.getOptionOverrides() != null && ctx.getOptionOverrides().isSkipCacheStatusCheck());
+ tCtx.setOption(txScopeOption);
+ }
+ }
+
+ protected void setTransactionalContext(Transaction tx, GlobalTransaction gtx, TransactionContext tCtx, InvocationContext ctx)
+ {
+ if (trace)
+ {
+ log.trace("Setting up transactional context.");
+ log.trace("Setting tx as " + tx + " and gtx as " + gtx);
+ }
+ ctx.setTransaction(tx);
+ ctx.setGlobalTransaction(gtx);
+ if (tCtx == null)
+ {
+ if (gtx != null)
+ {
+ ctx.setTransactionContext(txTable.get(gtx));
+ }
+ else if (tx == null)
+ {
+ // then nullify the transaction tCtx as well
+ ctx.setTransactionContext(null);
+ }
+ }
+ else
+ {
+ ctx.setTransactionContext(tCtx);
+ }
+ }
+
+ /**
+ * Returns true if transaction is rolling back, false otherwise
+ */
+ protected boolean isRollingBack(Transaction tx)
+ {
+ if (tx == null) return false;
+ int status;
+ try
+ {
+ status = tx.getStatus();
+ return status == Status.STATUS_ROLLING_BACK || status == Status.STATUS_ROLLEDBACK;
+ }
+ catch (SystemException e)
+ {
+ log.error("failed getting transaction status", e);
+ return false;
+ }
+ }
+}
\ No newline at end of file
Copied: core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/InvocationContextInterceptor.java (from rev 6858, core/branches/flat/src/main/java/org/jboss/cache/interceptors/InvocationContextInterceptor.java)
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/InvocationContextInterceptor.java (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/InvocationContextInterceptor.java 2008-10-08 16:05:24 UTC (rev 6879)
@@ -0,0 +1,236 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.interceptors;
+
+
+import org.jboss.cache.config.Option;
+import org.jboss.cache.transaction.TransactionTable;
+import org.jboss.starobrno.commands.VisitableCommand;
+import org.jboss.starobrno.commands.tx.CommitCommand;
+import org.jboss.starobrno.commands.tx.PrepareCommand;
+import org.jboss.starobrno.commands.tx.RollbackCommand;
+import org.jboss.starobrno.commands.write.ClearCommand;
+import org.jboss.starobrno.commands.write.PutKeyValueCommand;
+import org.jboss.starobrno.commands.write.PutMapCommand;
+import org.jboss.starobrno.commands.write.RemoveCommand;
+import org.jboss.starobrno.context.InvocationContext;
+import org.jboss.starobrno.factories.annotations.Inject;
+import org.jboss.starobrno.transaction.GlobalTransaction;
+import org.jboss.starobrno.RPCManager;
+
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
+
+public class InvocationContextInterceptor extends BaseTransactionalContextInterceptor
+{
+ private RPCManager rpcManager;
+
+ @Inject
+ public void setDependencies(RPCManager rpcManager)
+ {
+ this.rpcManager = rpcManager;
+ }
+
+ @Override
+ public Object visitPutMapCommand(InvocationContext ctx, PutMapCommand command) throws Throwable
+ {
+ return handleAll(ctx, command, ctx.getGlobalTransaction(), false);
+ }
+
+ @Override
+ public Object visitPutKeyValueCommand(InvocationContext ctx, PutKeyValueCommand command) throws Throwable
+ {
+ return handleAll(ctx, command, ctx.getGlobalTransaction(), false);
+ }
+
+ @Override
+ public Object visitClearCommand(InvocationContext ctx, ClearCommand command) throws Throwable
+ {
+ return handleAll(ctx, command, ctx.getGlobalTransaction(), false);
+ }
+
+ @Override
+ public Object visitRemoveCommand(InvocationContext ctx, RemoveCommand command) throws Throwable
+ {
+ return handleAll(ctx, command, ctx.getGlobalTransaction(), false);
+ }
+
+ @Override
+ public Object visitPrepareCommand(InvocationContext ctx, PrepareCommand command) throws Throwable
+ {
+ return handleAll(ctx, command, command.getGlobalTransaction(), true);
+ }
+
+ @Override
+ public Object visitRollbackCommand(InvocationContext ctx, RollbackCommand command) throws Throwable
+ {
+ return handleAll(ctx, command, command.getGlobalTransaction(), true);
+ }
+
+ @Override
+ public Object visitCommitCommand(InvocationContext ctx, CommitCommand command) throws Throwable
+ {
+ return handleAll(ctx, command, command.getGlobalTransaction(), true);
+ }
+
+ @Override
+ public Object handleDefault(InvocationContext ctx, VisitableCommand command) throws Throwable
+ {
+ return handleAll(ctx, command, null, false);
+ }
+
+ @SuppressWarnings("deprecation")
+ private Object handleAll(InvocationContext ctx, VisitableCommand command, GlobalTransaction gtx, boolean scrubContextOnCompletion) throws Throwable
+ {
+ Option optionOverride = ctx.getOptionOverrides();
+ boolean suppressExceptions = false;
+ Transaction suspendedTransaction = null;
+ boolean resumeSuspended = false;
+
+ if (trace) log.trace("Invoked with command " + command + " and InvocationContext [" + ctx + "]");
+
+ try
+ {
+ if (txManager != null)
+ {
+ Transaction tx = getTransaction();
+ GlobalTransaction realGtx = getGlobalTransaction(tx, gtx);
+ if (tx == null && realGtx != null && realGtx.isRemote()) tx = txTable.getLocalTransaction(gtx);
+ setTransactionalContext(tx, realGtx, null, ctx);
+ }
+ else
+ {
+ setTransactionalContext(null, null, null, ctx);
+ }
+
+ if (optionOverride != null)
+ {
+ if (optionOverride.isFailSilently())
+ {
+ log.debug("FAIL_SILENTLY Option is present - suspending any ongoing transaction.");
+ suppressExceptions = true;
+ if (ctx.getTransaction() != null)
+ {
+ suspendedTransaction = txManager.suspend();
+ setTransactionalContext(null, null, null, ctx);
+ if (trace) log.trace("Suspending transaction " + suspendedTransaction);
+ resumeSuspended = true;
+ }
+ else
+ {
+ if (trace) log.trace("No ongoing transaction to suspend");
+ }
+ }
+ }
+
+ Object retval;
+ try
+ {
+ return invokeNextInterceptor(ctx, command);
+ }
+ catch (Throwable th)
+ {
+ retval = th;
+ // if fail silently return a null
+ if (suppressExceptions) return null;
+ Throwable t = (Throwable) retval;
+ if (t instanceof RuntimeException && t.getCause() != null)
+ {
+ throw t.getCause();
+ }
+ else
+ {
+ throw t;
+ }
+ }
+ // assume we're the first interceptor in the chain. Handle the exception-throwing.
+ }
+ finally
+ {
+ // TODO: scope upgrading should happen transparently
+ /*
+ * we should scrub txs after every call to prevent race conditions
+ * basically any other call coming in on the same thread and hijacking any running tx's
+ * was highlighted in JBCACHE-606
+ */
+ if (scrubContextOnCompletion) setTransactionalContext(null, null, null, ctx);
+
+ // clean up any invocation-scope options set up
+ if (trace) log.trace("Resetting invocation-scope options");
+ ctx.getOptionOverrides().reset();
+
+ // if this is a prepare, opt prepare or
+
+ if (resumeSuspended)
+ {
+ txManager.resume(suspendedTransaction);
+ }
+ else
+ {
+ if (ctx.getTransaction() != null && (TransactionTable.isValid(ctx.getTransaction())))
+ {
+ copyInvocationScopeOptionsToTxScope(ctx);
+ }
+ }
+
+ // reset the context to prevent leakage of internals
+ ctx.setCommand(null);
+
+ // TODO: Calling ctx.reset() here breaks stuff. Check whether this is just becuse UTs expect stuff in the ctx or whether this really breaks functionality.
+// ctx.reset();
+ // instead, for now, just wipe contents of the looked up node map
+ ctx.clearLookedUpEntries();
+ }
+ }
+
+ private GlobalTransaction getGlobalTransaction(Transaction tx, GlobalTransaction gtx)
+ {
+ if (gtx == null) gtx = txTable.getCurrentTransaction(tx, false);
+ if (gtx != null) gtx.setRemote(isRemoteGlobalTx(gtx));
+ return gtx;
+ }
+
+ private Transaction getTransaction() throws SystemException
+ {
+ // this creates a context if one did not exist.
+ if (txManager == null)
+ {
+ if (trace) log.trace("no transaction manager configured, setting tx as null.");
+ return null;
+ }
+ else
+ {
+ return txManager.getTransaction();
+ }
+ }
+
+ /**
+ * Tests if a global transaction originated from a different cache in the cluster
+ *
+ * @param gtx
+ * @return true if the gtx is remote, false if it originated locally.
+ */
+ private boolean isRemoteGlobalTx(GlobalTransaction gtx)
+ {
+ return gtx != null && (gtx.getAddress() != null) && (!gtx.getAddress().equals(rpcManager.getLocalAddress()));
+ }
+}
\ No newline at end of file
More information about the jbosscache-commits
mailing list