[jbosscache-commits] JBoss Cache SVN: r6897 - in core/branches/flat/src: main/java/org/jboss/starobrno/commands/read and 9 other directories.
jbosscache-commits at lists.jboss.org
jbosscache-commits at lists.jboss.org
Thu Oct 9 09:22:09 EDT 2008
Author: manik.surtani at jboss.com
Date: 2008-10-09 09:22:09 -0400 (Thu, 09 Oct 2008)
New Revision: 6897
Added:
core/branches/flat/src/main/java/org/jboss/starobrno/container/MVCCEntry.java
core/branches/flat/src/main/java/org/jboss/starobrno/container/MVCCEntryCreator.java
core/branches/flat/src/main/java/org/jboss/starobrno/container/NullMarkerEntry.java
core/branches/flat/src/main/java/org/jboss/starobrno/container/ReadCommittedEntry.java
core/branches/flat/src/main/java/org/jboss/starobrno/container/RepeatableReadEntry.java
core/branches/flat/src/test/java/org/jboss/starobrno/profiling/
core/branches/flat/src/test/java/org/jboss/starobrno/profiling/MemConsumptionTest.java
core/branches/flat/src/test/java/org/jboss/starobrno/util/
core/branches/flat/src/test/java/org/jboss/starobrno/util/TestingUtil.java
Removed:
core/branches/flat/src/main/java/org/jboss/starobrno/DataContainer.java
core/branches/flat/src/main/java/org/jboss/starobrno/RPCManager.java
core/branches/flat/src/main/java/org/jboss/starobrno/ReplicationException.java
core/branches/flat/src/main/java/org/jboss/starobrno/TxInterceptor.java
core/branches/flat/src/main/java/org/jboss/starobrno/UnsortedDataContainer.java
core/branches/flat/src/main/java/org/jboss/starobrno/mvcc/
Modified:
core/branches/flat/src/main/java/org/jboss/starobrno/CacheDelegate.java
core/branches/flat/src/main/java/org/jboss/starobrno/CacheSPI.java
core/branches/flat/src/main/java/org/jboss/starobrno/commands/read/GetKeyValueCommand.java
core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/ClearCommand.java
core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/EvictCommand.java
core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/PutKeyValueCommand.java
core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/PutMapCommand.java
core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/RemoveCommand.java
core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/ReplaceCommand.java
core/branches/flat/src/main/java/org/jboss/starobrno/container/DataContainer.java
core/branches/flat/src/main/java/org/jboss/starobrno/container/UnsortedDataContainer.java
core/branches/flat/src/main/java/org/jboss/starobrno/context/EntryLookup.java
core/branches/flat/src/main/java/org/jboss/starobrno/context/InvocationContextImpl.java
core/branches/flat/src/main/java/org/jboss/starobrno/context/TransactionContextImpl.java
core/branches/flat/src/main/java/org/jboss/starobrno/factories/EmptyConstructorFactory.java
core/branches/flat/src/main/java/org/jboss/starobrno/factories/EntryFactory.java
core/branches/flat/src/main/java/org/jboss/starobrno/factories/EntryFactoryImpl.java
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/InterceptorChain.java
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/LockingInterceptor.java
core/branches/flat/src/main/java/org/jboss/starobrno/notifications/NotifierImpl.java
core/branches/flat/src/test/java/org/jboss/starobrno/BasicTest.java
Log:
Working mem tests
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/CacheDelegate.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/CacheDelegate.java 2008-10-09 12:52:15 UTC (rev 6896)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/CacheDelegate.java 2008-10-09 13:22:09 UTC (rev 6897)
@@ -24,10 +24,10 @@
import org.jboss.cache.CacheStatus;
import org.jboss.cache.buddyreplication.BuddyManager;
import org.jboss.cache.buddyreplication.GravitateResult;
-import org.jboss.cache.interceptors.base.CommandInterceptor;
import org.jboss.cache.loader.CacheLoaderManager;
import org.jboss.cache.marshall.Marshaller;
import org.jboss.cache.statetransfer.StateTransferManager;
+import org.jboss.starobrno.batch.BatchContainer;
import org.jboss.starobrno.commands.CommandsFactory;
import org.jboss.starobrno.commands.read.GetKeyValueCommand;
import org.jboss.starobrno.commands.read.SizeCommand;
@@ -44,11 +44,12 @@
import org.jboss.starobrno.factories.annotations.Inject;
import org.jboss.starobrno.factories.annotations.NonVolatile;
import org.jboss.starobrno.interceptors.InterceptorChain;
+import org.jboss.starobrno.interceptors.base.CommandInterceptor;
import org.jboss.starobrno.invocation.InvocationContextContainer;
import org.jboss.starobrno.notifications.Notifier;
+import org.jboss.starobrno.remoting.RPCManager;
import org.jboss.starobrno.transaction.GlobalTransaction;
import org.jboss.starobrno.transaction.TransactionTable;
-import org.jboss.starobrno.batch.BatchContainer;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
@@ -295,7 +296,7 @@
return null; //TODO: Autogenerated. Implement me properly
}
- public Object getRPCManager()
+ public RPCManager getRPCManager()
{
return null; //TODO: Autogenerated. Implement me properly
}
@@ -305,7 +306,7 @@
return null; //TODO: Autogenerated. Implement me properly
}
- public org.jboss.cache.notifications.Notifier getNotifier()
+ public Notifier getNotifier()
{
return null; //TODO: Autogenerated. Implement me properly
}
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/CacheSPI.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/CacheSPI.java 2008-10-09 12:52:15 UTC (rev 6896)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/CacheSPI.java 2008-10-09 13:22:09 UTC (rev 6897)
@@ -24,14 +24,14 @@
import net.jcip.annotations.ThreadSafe;
import org.jboss.cache.buddyreplication.BuddyManager;
import org.jboss.cache.buddyreplication.GravitateResult;
-import org.jboss.cache.interceptors.base.CommandInterceptor;
-import org.jboss.cache.loader.CacheLoader;
import org.jboss.cache.loader.CacheLoaderManager;
import org.jboss.cache.marshall.Marshaller;
-import org.jboss.cache.notifications.Notifier;
import org.jboss.cache.statetransfer.StateTransferManager;
import org.jboss.starobrno.context.InvocationContext;
import org.jboss.starobrno.factories.ComponentRegistry;
+import org.jboss.starobrno.interceptors.base.CommandInterceptor;
+import org.jboss.starobrno.notifications.Notifier;
+import org.jboss.starobrno.remoting.RPCManager;
import org.jboss.starobrno.transaction.GlobalTransaction;
import org.jboss.starobrno.transaction.TransactionTable;
@@ -168,7 +168,7 @@
*
* @return the {@link org.jboss.cache.RPCManager} configured.
*/
- Object getRPCManager();
+ RPCManager getRPCManager();
/**
* Retrieves the current StateTransferManager instance associated with the current Cache instance.
Deleted: core/branches/flat/src/main/java/org/jboss/starobrno/DataContainer.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/DataContainer.java 2008-10-09 12:52:15 UTC (rev 6896)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/DataContainer.java 2008-10-09 13:22:09 UTC (rev 6897)
@@ -1,48 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.starobrno;
-
-import java.util.Map.Entry;
-import java.util.Set;
-
-/**
- * // TODO: MANIK: Document this
- *
- * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
- * @since 3.0
- */
-public interface DataContainer<K, V>
-{
- Set<Entry<K, V>> getEntries();
-
- Entry<K, V> getEntry(K k);
-
- void putEntry(Entry<K, V> entry);
-
- boolean exists(Entry<K, V> entry);
-
- int size();
-
- void clear();
-
- void removeEntry(K key);
-}
Deleted: core/branches/flat/src/main/java/org/jboss/starobrno/RPCManager.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/RPCManager.java 2008-10-09 12:52:15 UTC (rev 6896)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/RPCManager.java 2008-10-09 13:22:09 UTC (rev 6897)
@@ -1,154 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.starobrno;
-
-import org.jboss.cache.commands.ReplicableCommand;
-import org.jboss.cache.Fqn;
-import org.jgroups.Address;
-import org.jgroups.Channel;
-import org.jgroups.blocks.RspFilter;
-
-import java.util.List;
-import java.util.Vector;
-
-/**
- * Provides a mechanism for communicating with other caches in the cluster. For now this is based on JGroups as an underlying
- * transport, and in future more transport options may become available.
- * <p/>
- * Implementations have a simple lifecycle:
- * <ul>
- * <li>start() - starts the underlying channel based on configuration options injected, and connects the channel</li>
- * <li>disconnect() - disconnects the channel</li>
- * <li>stop() - stops the dispatcher and releases resources</li>
- * </ul>
- *
- * @author Manik Surtani
- * @since 2.1.0
- */
-public interface RPCManager
-{
- /**
- * Disconnects and closes the underlying JGroups channel.
- */
- void disconnect();
-
- /**
- * Stops the RPCDispatcher and frees resources. Closes and disconnects the underlying JGroups channel if this is
- * still open/connected.
- */
- void stop();
-
- /**
- * Starts the RPCManager by connecting the underlying JGroups channel (if configured for replication). Connecting
- * the channel may also involve state transfer (if configured) so the interceptor chain should be started and
- * available before this method is called.
- */
- void start();
-
- /**
- * Invokes an RPC call on other caches in the cluster.
- *
- * @param recipients a list of Addresses to invoke the call on. If this is null, the call is broadcast to the entire cluster.
- * @param cacheCommand the cache command to invoke
- * @param mode the group request mode to use. See {@link org.jgroups.blocks.GroupRequest}.
- * @param timeout a timeout after which to throw a replication exception.
- * @param responseFilter a response filter with which to filter out failed/unwanted/invalid responses.
- * @param useOutOfBandMessage if true, the message is put on JGroups' OOB queue. See JGroups docs for more info.
- * @return a list of responses from each member contacted.
- * @throws Exception in the event of problems.
- */
- List<Object> callRemoteMethods(Vector<Address> recipients, ReplicableCommand cacheCommand, int mode, long timeout, RspFilter responseFilter, boolean useOutOfBandMessage) throws Exception;
-
- /**
- * Invokes an RPC call on other caches in the cluster.
- *
- * @param recipients a list of Addresses to invoke the call on. If this is null, the call is broadcast to the entire cluster.
- * @param cacheCommand the cache command to invoke
- * @param mode the group request mode to use. See {@link org.jgroups.blocks.GroupRequest}.
- * @param timeout a timeout after which to throw a replication exception.
- * @param useOutOfBandMessage if true, the message is put on JGroups' OOB queue. See JGroups docs for more info.
- * @return a list of responses from each member contacted.
- * @throws Exception in the event of problems.
- */
- List<Object> callRemoteMethods(Vector<Address> recipients, ReplicableCommand cacheCommand, int mode, long timeout, boolean useOutOfBandMessage) throws Exception;
-
- /**
- * Invokes an RPC call on other caches in the cluster.
- *
- * @param recipients a list of Addresses to invoke the call on. If this is null, the call is broadcast to the entire cluster.
- * @param cacheCommand the cache command to invoke
- * @param synchronous if true, sets group request mode to {@link org.jgroups.blocks.GroupRequest#GET_ALL}, and if false sets it to {@link org.jgroups.blocks.GroupRequest#GET_NONE}.
- * @param timeout a timeout after which to throw a replication exception.
- * @param useOutOfBandMessage if true, the message is put on JGroups' OOB queue. See JGroups docs for more info.
- * @return a list of responses from each member contacted.
- * @throws Exception in the event of problems.
- */
- List<Object> callRemoteMethods(Vector<Address> recipients, ReplicableCommand cacheCommand, boolean synchronous, long timeout, boolean useOutOfBandMessage) throws Exception;
-
- /**
- * @return true if the current Channel is the coordinator of the cluster.
- */
- boolean isCoordinator();
-
- /**
- * @return the Address of the current coordinator.
- */
- Address getCoordinator();
-
- /**
- * Retrieves the local JGroups channel's address
- *
- * @return an Address
- */
- Address getLocalAddress();
-
- /**
- * Returns a defensively copied list of members in the current cluster view.
- */
- List<Address> getMembers();
-
- /**
- * Retrieves partial state from remote instances.
- *
- * @param sources sources to consider for a state transfer
- * @param sourceTarget Fqn on source to retrieve state for
- * @param integrationTarget integration point on local cache to apply state
- * @throws Exception in the event of problems
- */
- void fetchPartialState(List<Address> sources, Fqn sourceTarget, Fqn integrationTarget) throws Exception;
-
- /**
- * Retrieves partial state from remote instances.
- *
- * @param sources sources to consider for a state transfer
- * @param subtree Fqn subtree to retrieve. Will be integrated at the same point.
- * @throws Exception in the event of problems
- */
- void fetchPartialState(List<Address> sources, Fqn subtree) throws Exception;
-
- /**
- * Retrieves the Channel
- *
- * @return a channel
- */
- Channel getChannel();
-}
\ No newline at end of file
Deleted: core/branches/flat/src/main/java/org/jboss/starobrno/ReplicationException.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/ReplicationException.java 2008-10-09 12:52:15 UTC (rev 6896)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/ReplicationException.java 2008-10-09 13:22:09 UTC (rev 6897)
@@ -1,57 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.starobrno;
-
-/**
- * Thrown when a replication problem occurred
- */
-public class ReplicationException extends CacheException
-{
-
- private static final long serialVersionUID = 33172388691879866L;
-
- public ReplicationException()
- {
- super();
- }
-
- public ReplicationException(Throwable cause)
- {
- super(cause);
- }
-
- public ReplicationException(String msg)
- {
- super(msg);
- }
-
- public ReplicationException(String msg, Throwable cause)
- {
- super(msg, cause);
- }
-
- @Override
- public String toString()
- {
- return super.toString();
- }
-}
\ No newline at end of file
Deleted: core/branches/flat/src/main/java/org/jboss/starobrno/TxInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/TxInterceptor.java 2008-10-09 12:52:15 UTC (rev 6896)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/TxInterceptor.java 2008-10-09 13:22:09 UTC (rev 6897)
@@ -1,1041 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.starobrno;
-
-import org.jboss.cache.util.concurrent.ConcurrentHashSet;
-import org.jboss.starobrno.commands.CommandsFactory;
-import org.jboss.starobrno.commands.ReplicableCommand;
-import org.jboss.starobrno.commands.VisitableCommand;
-import org.jboss.starobrno.commands.tx.CommitCommand;
-import org.jboss.starobrno.commands.tx.PrepareCommand;
-import org.jboss.starobrno.commands.tx.RollbackCommand;
-import org.jboss.starobrno.config.Option;
-import org.jboss.starobrno.context.InvocationContext;
-import org.jboss.starobrno.context.TransactionContext;
-import org.jboss.starobrno.factories.ComponentRegistry;
-import org.jboss.starobrno.factories.annotations.Inject;
-import org.jboss.starobrno.factories.context.ContextFactory;
-import org.jboss.starobrno.interceptors.BaseTransactionalContextInterceptor;
-import org.jboss.starobrno.invocation.InvocationContextContainer;
-import org.jboss.starobrno.jmx.annotations.ManagedAttribute;
-import org.jboss.starobrno.jmx.annotations.ManagedOperation;
-import org.jboss.starobrno.lock.LockManager;
-import org.jboss.starobrno.notifications.Notifier;
-import org.jboss.starobrno.transaction.GlobalTransaction;
-import org.jboss.starobrno.transaction.OrderedSynchronizationHandler;
-import org.jboss.starobrno.transaction.TransactionTable;
-
-import javax.transaction.*;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * This interceptor is the new default at the head of all interceptor chains,
- * and makes transactional attributes available to all interceptors in the chain.
- * This interceptor is also responsible for registering for synchronisation on
- * transaction completion.
- *
- * @author <a href="mailto:manik at jboss.org">Manik Surtani (manik at jboss.org)</a>
- * @author <a href="mailto:stevew at jofti.com">Steve Woodcock (stevew at jofti.com)</a>
- */
-public class TxInterceptor extends BaseTransactionalContextInterceptor
-{
- protected CommandsFactory commandsFactory;
- protected RPCManager rpcManager;
- private Notifier notifier;
- private InvocationContextContainer invocationContextContainer;
- private ComponentRegistry componentRegistry;
- private ContextFactory contextFactory;
-
- /**
- * List <Transaction>that we have registered for
- */
- private final Set<Transaction> transactions = new ConcurrentHashSet<Transaction>();
- private final Map<Transaction, GlobalTransaction> rollbackTransactions = new ConcurrentHashMap<Transaction, GlobalTransaction>(16);
- private long prepares = 0;
- private long commits = 0;
- private long rollbacks = 0;
- protected boolean optimistic = false;
- private LockManager lockManager;
- private boolean statsEnabled;
-
- @Inject
- public void intialize(RPCManager rpcManager, ContextFactory contextFactory,
- Notifier notifier, InvocationContextContainer icc,
- CommandsFactory factory, ComponentRegistry componentRegistry, LockManager lockManager)
- {
- this.contextFactory = contextFactory;
- this.commandsFactory = factory;
- this.rpcManager = rpcManager;
- this.notifier = notifier;
- this.invocationContextContainer = icc;
- this.componentRegistry = componentRegistry;
- this.lockManager = lockManager;
- setStatisticsEnabled(configuration.isExposeManagementStatistics());
- }
-
- @Override
- public Object visitPrepareCommand(InvocationContext ctx, PrepareCommand command) throws Throwable
- {
- Object result = null;
-
- // this is a prepare, commit, or rollback.
- if (log.isDebugEnabled()) log.debug("Got gtx from invocation context " + ctx.getGlobalTransaction());
- try
- {
- if (ctx.getGlobalTransaction().isRemote())
- {
- result = handleRemotePrepare(ctx, command);
- if (getStatisticsEnabled()) prepares++;
- }
- else
- {
- if (trace) log.trace("received my own message (discarding it)");
- result = null;
- }
- }
- catch (Throwable e)
- {
- ctx.throwIfNeeded(e);
- }
-
- return result;
- }
-
- @Override
- public Object visitCommitCommand(InvocationContext ctx, CommitCommand command) throws Throwable
- {
- if (!ctx.getGlobalTransaction().isRemote())
- {
- if (trace) log.trace("received my own message (discarding it)");
- return null;
- }
- try
- {
- if (trace) log.trace("(" + rpcManager.getLocalAddress() + ") call on command [" + command + "]");
- GlobalTransaction gtx = ctx.getGlobalTransaction();
- Transaction ltx = txTable.getLocalTransaction(gtx, true);
- // disconnect if we have a current tx associated
- Transaction currentTx = txManager.getTransaction();
- boolean resumeCurrentTxOnCompletion = false;
- try
- {
- if (!ltx.equals(currentTx))
- {
- currentTx = txManager.suspend();
- resumeCurrentTxOnCompletion = true;
- txManager.resume(ltx);
- // make sure we set this in the ctx
- ctx.setTransaction(ltx);
- }
- if (log.isDebugEnabled()) log.debug(" executing commit() with local TX " + ltx + " under global tx " + gtx);
- txManager.commit();
- if (getStatisticsEnabled()) commits++;
- }
- finally
- {
- //resume the old transaction if we suspended it
- if (resumeCurrentTxOnCompletion)
- {
- resumeTransactionOnCompletion(ctx, currentTx);
- }
- // remove from local lists.
- transactions.remove(ltx);
- // this tx has completed. Clean up in the tx table.
- txTable.remove(gtx, ltx);
- }
- if (log.isDebugEnabled()) log.debug("Finished remote rollback method for " + gtx);
- }
- catch (Throwable throwable)
- {
- ctx.throwIfNeeded(throwable);
- }
- return null;
- }
-
- @Override
- public Object visitRollbackCommand(InvocationContext ctx, RollbackCommand command) throws Throwable
- {
- if (!ctx.getGlobalTransaction().isRemote())
- {
- if (trace) log.trace("received my own message (discarding it)");
- return null;
- }
- try
- {
- if (trace) log.trace("(" + rpcManager.getLocalAddress() + ") call on command [" + command + "]");
- GlobalTransaction gtx = ctx.getGlobalTransaction();
- Transaction ltx = txTable.getLocalTransaction(gtx);
- if (ltx == null)
- {
- log.warn("No local transaction for this remotely originating rollback. Possibly rolling back before a prepare call was broadcast?");
- txTable.remove(gtx);
- return null;
- }
- // disconnect if we have a current tx associated
- Transaction currentTx = txManager.getTransaction();
- boolean resumeCurrentTxOnCompletion = false;
- try
- {
- if (!ltx.equals(currentTx))
- {
- currentTx = txManager.suspend();
- resumeCurrentTxOnCompletion = true;
- txManager.resume(ltx);
- // make sure we set this in the ctx
- ctx.setTransaction(ltx);
- }
- if (log.isDebugEnabled()) log.debug("executing with local TX " + ltx + " under global tx " + gtx);
- txManager.rollback();
- if (getStatisticsEnabled()) rollbacks++;
- }
- finally
- {
- //resume the old transaction if we suspended it
- if (resumeCurrentTxOnCompletion)
- {
- resumeTransactionOnCompletion(ctx, currentTx);
- }
-
- // remove from local lists.
- transactions.remove(ltx);
-
- // this tx has completed. Clean up in the tx table.
- txTable.remove(gtx, ltx);
- }
- if (log.isDebugEnabled()) log.debug("Finished remote commit/rollback method for " + gtx);
- }
- catch (Throwable throwable)
- {
- ctx.throwIfNeeded(throwable);
- }
-
- return null;
- }
-
- @Override
- public Object handleDefault(InvocationContext ctx, VisitableCommand command) throws Throwable
- {
- try
- {
- return attachGtxAndPassUpChain(ctx, command);
- }
- catch (Throwable throwable)
- {
- ctx.throwIfNeeded(throwable);
- return null;
- }
- }
-
- protected Object attachGtxAndPassUpChain(InvocationContext ctx, VisitableCommand command) throws Throwable
- {
- Transaction tx = ctx.getTransaction();
- if (tx != null) attachGlobalTransaction(ctx, tx, command);
- return invokeNextInterceptor(ctx, command);
- }
-
- // ------------------------------------------------------------------------
- // JMX statistics
- // ------------------------------------------------------------------------
-
- // --------------------------------------------------------------
-
- /**
- * Handles a remotely originating prepare call, by creating a local transaction for the remote global transaction
- * and replaying modifications in this new local transaction.
- *
- * @param ctx invocation context
- * @param command prepare command
- * @return result of the prepare, typically a null.
- * @throws Throwable in the event of problems.
- */
- private Object handleRemotePrepare(InvocationContext ctx, PrepareCommand command) throws Throwable
- {
- // the InvocationContextInterceptor would have set this for us
- GlobalTransaction gtx = ctx.getGlobalTransaction();
-
- // Is there a local transaction associated with GTX? (not the current tx associated with the thread, which may be
- // in the invocation context
- Transaction ltx = txTable.getLocalTransaction(gtx);
- Transaction currentTx = txManager.getTransaction();
-
- Object retval = null;
- boolean success = false;
- try
- {
- if (ltx == null)
- {
- if (currentTx != null) txManager.suspend();
- // create a new local transaction
- ltx = createLocalTx();
- // associate this with a global tx
- txTable.put(ltx, gtx);
- if (trace) log.trace("Created new tx for gtx " + gtx);
-
- if (log.isDebugEnabled())
- log.debug("Started new local tx as result of remote prepare: local tx=" + ltx + " (status=" + ltx.getStatus() + "), gtx=" + gtx);
- }
- else
- {
- //this should be valid
- if (!TransactionTable.isValid(ltx))
- throw new CacheException("Transaction " + ltx + " not in correct state to be prepared");
-
- //associate this thread with the local transaction associated with the global transaction, IF the localTx is NOT the current tx.
- if (currentTx == null || !ltx.equals(currentTx))
- {
- if (trace) log.trace("Suspending current tx " + currentTx);
- txManager.suspend();
- txManager.resume(ltx);
- }
- }
- if (trace) log.trace("Resuming existing tx " + ltx + ", global tx=" + gtx);
-
- // at this point we have a non-null ltx
-
- // Asssociate the local TX with the global TX. Create new
- // transactionContext for TX in txTable, the modifications
- // below will need this transactionContext to add their modifications
- // under the GlobalTx key
- TransactionContext transactionContext = txTable.get(gtx);
- if (transactionContext == null)
- {
- // create a new transaction transactionContext
- if (log.isDebugEnabled()) log.debug("creating new tx transactionContext");
- transactionContext = contextFactory.createTransactionContext(ltx);
- txTable.put(gtx, transactionContext);
- }
-
- setTransactionalContext(ltx, gtx, transactionContext, ctx);
-
- // register a sync handler for this tx.
- registerHandler(ltx, new RemoteSynchronizationHandler(gtx, ltx, transactionContext), ctx);
-
- success = false;
-
- // replay modifications
- replayModifications(ctx, ltx, command);
-
- success = true; // no exceptions were thrown above!!
-
- // now pass the prepare command up the chain as well.
- if (command.isOnePhaseCommit())
- {
- if (trace)
- log.trace("Using one-phase prepare. Not propagating the prepare call up the stack until called to do so by the sync handler.");
- }
- else
- {
- // now pass up the prepare method itself.
- invokeNextInterceptor(ctx, command);
- }
- // JBCACHE-361 Confirm that the transaction is ACTIVE
- assertTxIsStillValid(ltx);
- }
- finally
- {
- // if we are running a one-phase commit, perform a commit or rollback now.
- if (trace) log.trace("Are we running a 1-phase commit? " + command.isOnePhaseCommit());
-
- if (command.isOnePhaseCommit())
- {
- try
- {
- if (success)
- {
- ltx.commit();
- }
- else
- {
- ltx.rollback();
- }
- }
- catch (Throwable t)
- {
- log.error("Commit/rollback failed.", t);
- if (success)
- {
- // try another rollback...
- try
- {
- log.info("Attempting anotehr rollback");
- //invokeOnePhaseCommitMethod(globalTransaction, modifications.size() > 0, false);
- ltx.rollback();
- }
- catch (Throwable t2)
- {
- log.error("Unable to rollback", t2);
- }
- }
- }
- finally
- {
- transactions.remove(ltx);// JBAS-298
- }
- }
-
- txManager.suspend();// suspends ltx - could be null
- // resume whatever else we had going.
- if (currentTx != null) txManager.resume(currentTx);
- if (log.isDebugEnabled()) log.debug("Finished remote prepare " + gtx);
- }
-
- return retval;
- }
-
- private ReplicableCommand attachGlobalTransaction(InvocationContext ctx, Transaction tx, VisitableCommand command) throws Throwable
- {
- if (trace)
- {
- log.trace(" local transaction exists - registering global tx if not present for " + Thread.currentThread());
- }
- if (trace)
- {
- GlobalTransaction tempGtx = txTable.get(tx);
- log.trace("Associated gtx in txTable is " + tempGtx);
- }
-
- // register a sync handler for this tx - only if the globalTransaction is not remotely initiated.
- GlobalTransaction gtx = registerTransaction(tx, ctx);
- if (gtx == null)
- {
- // get the current globalTransaction from the txTable.
- gtx = txTable.get(tx);
- }
-
- // make sure we attach this globalTransaction to the invocation context.
- ctx.setGlobalTransaction(gtx);
-
- return command;
- }
-
- /**
- * Replays modifications
- */
- protected void replayModifications(InvocationContext ctx, Transaction ltx, PrepareCommand command) throws Throwable
- {
- try
- {
- // replay modifications
- for (VisitableCommand modification : command.getModifications())
- {
- invokeNextInterceptor(ctx, modification);
- assertTxIsStillValid(ltx);
- }
- }
- catch (Throwable th)
- {
- log.error("prepare failed!", th);
- throw th;
- }
- }
-
- private void resumeTransactionOnCompletion(InvocationContext ctx, Transaction currentTx)
- throws SystemException, InvalidTransactionException
- {
- if (trace) log.trace("Resuming suspended transaction " + currentTx);
- txManager.suspend();
- if (currentTx != null)
- {
- txManager.resume(currentTx);
- ctx.setTransaction(currentTx);
- }
- }
-
- /**
- * Handles a commit or a rollback. Called by the synch handler. Simply tests that we are in the correct tx and
- * passes the meth call up the interceptor chain.
- *
- * @throws Throwable
- */
- @SuppressWarnings("deprecation")
- private Object handleCommitRollback(InvocationContext ctx, VisitableCommand command) throws Throwable
- {
- GlobalTransaction gtx = ctx.getGlobalTransaction();
- Object result;
- VisitableCommand originalCommand = ctx.getCommand();
- ctx.setCommand(command);
- try
- {
- result = invokeNextInterceptor(ctx, command);
- }
- finally
- {
- ctx.setCommand(originalCommand);
- }
- if (log.isDebugEnabled()) log.debug("Finished local commit/rollback method for " + gtx);
- return result;
- }
-
- // --------------------------------------------------------------
- // Transaction phase runners
- // --------------------------------------------------------------
-
- protected PrepareCommand buildPrepareCommand(GlobalTransaction gtx, List modifications, boolean onePhaseCommit)
- {
- return commandsFactory.buildPrepareCommand(gtx, modifications, rpcManager.getLocalAddress(), onePhaseCommit);
- }
-
- /**
- * creates a commit()
- */
- protected void runCommitPhase(InvocationContext ctx, GlobalTransaction gtx, List modifications, boolean onePhaseCommit)
- {
- try
- {
- VisitableCommand commitCommand = onePhaseCommit ? buildPrepareCommand(gtx, modifications, true) : commandsFactory.buildCommitCommand(gtx);
-
- if (trace) log.trace("Running commit for " + gtx);
-
- handleCommitRollback(ctx, commitCommand);
- }
- catch (Throwable e)
- {
- log.warn("Commit failed. Clearing stale locks.");
- try
- {
- cleanupStaleLocks(ctx);
- }
- catch (RuntimeException re)
- {
- log.error("Unable to clear stale locks", re);
- throw re;
- }
- catch (Throwable e2)
- {
- log.error("Unable to clear stale locks", e2);
- throw new RuntimeException(e2);
- }
- if (e instanceof RuntimeException)
- throw (RuntimeException) e;
- else
- throw new RuntimeException("Commit failed.", e);
- }
- }
-
- protected void cleanupStaleLocks(InvocationContext ctx) throws Throwable
- {
- TransactionContext transactionContext = ctx.getTransactionContext();
- if (transactionContext != null) lockManager.unlock(ctx);
- }
-
- /**
- * creates a rollback()
- */
- protected void runRollbackPhase(InvocationContext ctx, GlobalTransaction gtx, Transaction tx)
- {
- try
- {
- // JBCACHE-457
- VisitableCommand rollbackCommand = commandsFactory.buildRollbackCommand(gtx);
- if (trace) log.trace(" running rollback for " + gtx);
-
- //JBCACHE-359 Store a lookup for the globalTransaction so a listener
- // callback can find it
- rollbackTransactions.put(tx, gtx);
-
- handleCommitRollback(ctx, rollbackCommand);
- }
- catch (Throwable e)
- {
- log.warn("Rollback had a problem", e);
- }
- finally
- {
- rollbackTransactions.remove(tx);
- }
- }
-
- private boolean isOnePhaseCommit()
- {
- if (!configuration.getCacheMode().isSynchronous())
- {
- // this is a REPL_ASYNC call - do 1-phase commit. break!
- if (trace) log.trace("This is a REPL_ASYNC call (1 phase commit) - do nothing for beforeCompletion()");
- return true;
- }
- return false;
- }
-
- /**
- * Handles a local prepare - invoked by the sync handler. Tests if the current tx matches the gtx passed in to the
- * method call and passes the prepare() call up the chain.
- */
- @SuppressWarnings("deprecation")
- public Object runPreparePhase(InvocationContext ctx, GlobalTransaction gtx, List<VisitableCommand> modifications) throws Throwable
- {
- // running a 2-phase commit.
- VisitableCommand prepareCommand = buildPrepareCommand(gtx, modifications, false);
-
- Object result;
-
- // Is there a local transaction associated with GTX ?
- Transaction ltx = ctx.getTransaction();
-
- //if ltx is not null and it is already running
- Transaction currentTransaction = txManager.getTransaction();
- if (currentTransaction != null && ltx != null && currentTransaction.equals(ltx))
- {
- VisitableCommand originalCommand = ctx.getCommand();
- ctx.setCommand(prepareCommand);
- try
- {
- result = invokeNextInterceptor(ctx, prepareCommand);
- }
- finally
- {
- ctx.setCommand(originalCommand);
- }
- }
- else
- {
- log.warn("Local transaction does not exist or does not match expected transaction " + gtx);
- throw new CacheException(" local transaction " + ltx + " does not exist or does not match expected transaction " + gtx);
- }
- return result;
- }
-
- // --------------------------------------------------------------
- // Private helper methods
- // --------------------------------------------------------------
-
- protected void assertTxIsStillValid(Transaction tx)
- {
- if (!TransactionTable.isActive(tx))
- {
- try
- {
- throw new ReplicationException("prepare() failed -- local transaction status is not STATUS_ACTIVE; is " + tx.getStatus());
- }
- catch (SystemException e)
- {
- throw new ReplicationException("prepare() failed -- local transaction status is not STATUS_ACTIVE; Unable to retrieve transaction status.");
- }
- }
- }
-
- /**
- * Creates a gtx (if one doesnt exist), a sync handler, and registers the tx.
- */
- private GlobalTransaction registerTransaction(Transaction tx, InvocationContext ctx) throws Exception
- {
- GlobalTransaction gtx;
-
- if (TransactionTable.isValid(tx) && transactions.add(tx))
- {
- gtx = txTable.getCurrentTransaction(tx, true);
- TransactionContext transactionContext;
- if (ctx.getGlobalTransaction() == null)
- {
- ctx.setGlobalTransaction(gtx);
- transactionContext = txTable.get(gtx);
- ctx.setTransactionContext(transactionContext);
- }
- else
- {
- transactionContext = ctx.getTransactionContext();
- }
- if (gtx.isRemote())
- {
- // should be no need to register a handler since this a remotely initiated globalTransaction
- if (trace) log.trace("is a remotely initiated gtx so no need to register a tx for it");
- }
- else
- {
- if (trace) log.trace("Registering sync handler for tx " + tx + ", gtx " + gtx);
-
- // see the comment in the LocalSyncHandler for the last isOriginLocal param.
- LocalSynchronizationHandler myHandler = new LocalSynchronizationHandler(gtx, tx, transactionContext, !ctx.isOriginLocal());
- registerHandler(tx, myHandler, ctx);
- }
- }
- else if ((gtx = rollbackTransactions.get(tx)) != null)
- {
- if (trace) log.trace("Transaction " + tx + " is already registered and is rolling back.");
- }
- else
- {
- if (trace) log.trace("Transaction " + tx + " is already registered.");
- }
- return gtx;
- }
-
- /**
- * Registers a sync hander against a tx.
- */
- private void registerHandler(Transaction tx, Synchronization handler, InvocationContext ctx) throws Exception
- {
- OrderedSynchronizationHandler orderedHandler = ctx.getTransactionContext().getOrderedSynchronizationHandler(); //OrderedSynchronizationHandler.getInstance(tx);
-
- if (trace) log.trace("registering for TX completion: SynchronizationHandler(" + handler + ")");
-
- orderedHandler.registerAtHead(handler);// needs to be invoked first on TX commit
-
- notifier.notifyTransactionRegistered(tx, ctx);
- }
-
- /**
- * Creates and starts a local tx
- *
- * @throws Exception
- */
- protected Transaction createLocalTx() throws Exception
- {
- if (trace)
- {
- log.trace("Creating transaction for thread " + Thread.currentThread());
- }
- Transaction localTx;
- if (txManager == null) throw new Exception("Failed to create local transaction; TransactionManager is null");
- txManager.begin();
- localTx = txManager.getTransaction();
- return localTx;
- }
-
- // ------------------------------------------------------------------------
- // Synchronization classes
- // ------------------------------------------------------------------------
-
- // this controls the whole transaction
-
- private class RemoteSynchronizationHandler implements Synchronization
- {
- Transaction tx = null;
- GlobalTransaction gtx = null;
- List<VisitableCommand> modifications = null;
- TransactionContext transactionContext = null;
- protected InvocationContext ctx; // the context for this call.
-
- RemoteSynchronizationHandler(GlobalTransaction gtx, Transaction tx, TransactionContext entry)
- {
- this.gtx = gtx;
- this.tx = tx;
- this.transactionContext = entry;
- }
-
- public void beforeCompletion()
- {
- if (trace) log.trace("Running beforeCompletion on gtx " + gtx);
-
- if (transactionContext == null)
- {
- log.error("Transaction has a null transaction entry - beforeCompletion() will fail.");
- throw new IllegalStateException("cannot find transaction entry for " + gtx);
- }
-
- modifications = transactionContext.getModifications();
- ctx = invocationContextContainer.get();
- setTransactionalContext(tx, gtx, transactionContext, ctx);
-
- if (ctx.isOptionsUninitialised() && transactionContext.getOption() != null)
- ctx.setOptionOverrides(transactionContext.getOption());
-
- assertCanContinue();
-
- ctx.setOriginLocal(false);
- }
-
- // this should really not be done here -
- // it is supposed to be post commit not actually run the commit
- public void afterCompletion(int status)
- {
- // could happen if a rollback is called and beforeCompletion() doesn't get called.
- if (ctx == null)
- {
- ctx = invocationContextContainer.get();
- setTransactionalContext(tx, gtx, transactionContext, ctx);
-
- if (ctx.isOptionsUninitialised() && transactionContext != null && transactionContext.getOption() != null)
- {
- // use the options from the transaction entry instead
- ctx.setOptionOverrides(transactionContext.getOption());
- }
- }
-
- try
- {
- assertCanContinue();
-
- try
- {
- if (txManager.getTransaction() != null && !txManager.getTransaction().equals(tx)) txManager.resume(tx);
- }
- catch (Exception e)
- {
- log.error("afterCompletion error: " + status, e);
- }
-
- if (trace) log.trace("calling aftercompletion for " + gtx);
-
- // set any transaction wide options as current for this thread.
- if (transactionContext != null)
- {
- // this should ideally be set in beforeCompletion(), after compacting the list.
- if (modifications == null) modifications = transactionContext.getModifications();
- ctx.setOptionOverrides(transactionContext.getOption());
- }
- if (tx != null) transactions.remove(tx);
-
- switch (status)
- {
- case Status.STATUS_COMMITTED:
- boolean onePhaseCommit = isOnePhaseCommit();
- if (log.isDebugEnabled()) log.debug("Running commit phase. One phase? " + onePhaseCommit);
- runCommitPhase(ctx, gtx, modifications, onePhaseCommit);
- log.debug("Finished commit phase");
- break;
- case Status.STATUS_UNKNOWN:
- log.warn("Received JTA STATUS_UNKNOWN in afterCompletion()! XA resources may not be in sync. The app should manually clean up resources at this point.");
- case Status.STATUS_MARKED_ROLLBACK:
- case Status.STATUS_ROLLEDBACK:
- log.debug("Running rollback phase");
- runRollbackPhase(ctx, gtx, tx);
- log.debug("Finished rollback phase");
- break;
-
- default:
- throw new IllegalStateException("illegal status: " + status);
- }
- }
- catch (Exception th)
- {
- log.trace("Caught exception ", th);
-
- }
- finally
- {
- // clean up the tx table
- txTable.remove(gtx);
- txTable.remove(tx);
- setTransactionalContext(null, null, null, ctx);
- cleanupInternalState();
- }
- }
-
- private void assertCanContinue()
- {
- if (!componentRegistry.invocationsAllowed(true) && (ctx.getOptionOverrides() == null || !ctx.getOptionOverrides().isSkipCacheStatusCheck()))
- throw new IllegalStateException("Cache not in STARTED state!");
- }
-
- /**
- * Cleans out (nullifies) member variables held by the sync object for easier gc. Could be (falsely) seen as a mem
- * leak if the TM implementation hangs on to the synchronizations for an unnecessarily long time even after the tx
- * completes. See JBCACHE-1007.
- */
- private void cleanupInternalState()
- {
- tx = null;
- gtx = null;
- modifications = null;
- if (transactionContext != null) transactionContext.reset();
- transactionContext = null;
- }
-
- @Override
- public String toString()
- {
- return "TxInterceptor.RemoteSynchronizationHandler(gtx=" + gtx + ", tx=" + getTxAsString() + ")";
- }
-
- protected String getTxAsString()
- {
- // JBCACHE-1114 -- don't call toString() on tx or it can lead to stack overflow
- if (tx == null)
- return null;
-
- return tx.getClass().getName() + "@" + System.identityHashCode(tx);
- }
- }
-
- private class LocalSynchronizationHandler extends RemoteSynchronizationHandler
- {
- private boolean localRollbackOnly = true;
- // a VERY strange situation where a tx has remote origins, but since certain buddy group org methods perform local
- // cleanups even when remotely triggered, and optimistic locking is used, you end up with an implicit local tx.
- // This is STILL remotely originating though and this needs to be made explicit here.
- // this can be checked by inspecting the InvocationContext.isOriginLocal() at the time of registering the sync.
- private boolean remoteLocal = false;
- private Option originalOptions, transactionalOptions;
-
- /**
- * A Synchronization for locally originating txs.
- * <p/>
- * a VERY strange situation where a tx has remote origins, but since certain buddy group org methods perform local
- * cleanups even when remotely triggered, and optimistic locking is used, you end up with an implicit local tx.
- * This is STILL remotely originating though and this needs to be made explicit here.
- * this can be checked by inspecting the InvocationContext.isOriginLocal() at the time of registering the sync.
- *
- * @param gtx
- * @param tx
- * @param remoteLocal
- */
- LocalSynchronizationHandler(GlobalTransaction gtx, Transaction tx, TransactionContext transactionContext, boolean remoteLocal)
- {
- super(gtx, tx, transactionContext);
- this.remoteLocal = remoteLocal;
- }
-
- @Override
- public void beforeCompletion()
- {
- super.beforeCompletion();
- ctx.setOriginLocal(!remoteLocal); // this is the LOCAL sync handler after all!
- // fetch the modifications before the transaction is committed
- // (and thus removed from the txTable)
- setTransactionalContext(tx, gtx, transactionContext, ctx);
- if (!transactionContext.hasModifications())
- {
- if (trace) log.trace("No modifications in this tx. Skipping beforeCompletion()");
- modifications = Collections.emptyList();
- return;
- }
- else
- {
- modifications = transactionContext.getModifications();
- }
-
- // set any transaction wide options as current for this thread, caching original options that would then be reset
- originalOptions = ctx.getOptionOverrides();
- transactionalOptions = transactionContext.getOption();
- ctx.setOptionOverrides(transactionalOptions);
-
- try
- {
- switch (tx.getStatus())
- {
- // if we are active or preparing then we can go ahead
- case Status.STATUS_ACTIVE:
- case Status.STATUS_PREPARING:
- // run a prepare call.
-
- Object result = isOnePhaseCommit() ? null : runPreparePhase(ctx, gtx, modifications);
-
- if (result instanceof Throwable)
- {
- if (log.isDebugEnabled())
- log.debug("Transaction needs to be rolled back - the cache returned an instance of Throwable for this prepare call (tx=" + tx + " and gtx=" + gtx + ")", (Throwable) result);
- tx.setRollbackOnly();
- throw (Throwable) result;
- }
- break;
- default:
- throw new CacheException("transaction " + tx + " in status " + tx.getStatus() + " unable to start transaction");
- }
- }
- catch (Throwable t)
- {
- if (log.isWarnEnabled()) log.warn("Caught exception, will now set transaction to roll back", t);
- try
- {
- tx.setRollbackOnly();
- }
- catch (SystemException se)
- {
- throw new RuntimeException("setting tx rollback failed ", se);
- }
- if (t instanceof RuntimeException)
- throw (RuntimeException) t;
- else
- throw new RuntimeException("", t);
- }
- finally
- {
- localRollbackOnly = false;
- setTransactionalContext(null, null, null, ctx);
- ctx.setOptionOverrides(originalOptions);
- }
- }
-
- @Override
- public void afterCompletion(int status)
- {
- // could happen if a rollback is called and beforeCompletion() doesn't get called.
- if (ctx == null) ctx = invocationContextContainer.get();
- ctx.setLocalRollbackOnly(localRollbackOnly);
- setTransactionalContext(tx, gtx, transactionContext, ctx);
- if (transactionalOptions != null) ctx.setOptionOverrides(transactionalOptions);
- try
- {
- super.afterCompletion(status);
- }
- finally
- {
- ctx.setOptionOverrides(originalOptions);
- }
- }
-
- @Override
- public String toString()
- {
- return "TxInterceptor.LocalSynchronizationHandler(gtx=" + gtx + ", tx=" + getTxAsString() + ")";
- }
- }
-
- @ManagedOperation
- public void resetStatistics()
- {
- prepares = 0;
- commits = 0;
- rollbacks = 0;
- }
-
- @ManagedOperation
- public Map<String, Object> dumpStatistics()
- {
- Map<String, Object> retval = new HashMap<String, Object>(3);
- retval.put("Prepares", prepares);
- retval.put("Commits", commits);
- retval.put("Rollbacks", rollbacks);
- return retval;
- }
-
- @ManagedAttribute
- public boolean getStatisticsEnabled()
- {
- return this.statsEnabled;
- }
-
- @ManagedAttribute
- public void setStatisticsEnabled(boolean enabled)
- {
- this.statsEnabled = enabled;
- }
-
- @ManagedAttribute(description = "number of transaction prepares")
- public long getPrepares()
- {
- return prepares;
- }
-
- @ManagedAttribute(description = "number of transaction commits")
- public long getCommits()
- {
- return commits;
- }
-
- @ManagedAttribute(description = "number of transaction rollbacks")
- public long getRollbacks()
- {
- return rollbacks;
- }
-}
\ No newline at end of file
Deleted: core/branches/flat/src/main/java/org/jboss/starobrno/UnsortedDataContainer.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/UnsortedDataContainer.java 2008-10-09 12:52:15 UTC (rev 6896)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/UnsortedDataContainer.java 2008-10-09 13:22:09 UTC (rev 6897)
@@ -1,78 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source
- * Copyright 2008, Red Hat Middleware LLC, and individual contributors
- * by the @authors tag. See the copyright.txt in the distribution for a
- * full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.starobrno;
-
-import java.util.Map.Entry;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-/**
- * // TODO: crappy and inefficient - but just a placeholder for now.
- *
- * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
- * @since 3.0
- */
-public class UnsortedDataContainer<K, V> implements DataContainer<K, V>
-{
- private final ConcurrentMap<K, V> data = new ConcurrentHashMap<K, V>();
-
- public Set<Entry<K, V>> getEntries()
- {
- return data.entrySet();
- }
-
- public Entry<K, V> getEntry(K k)
- {
- if (k == null) throw new NullPointerException("I don't like nulls!");
- for (Entry<K, V> e : data.entrySet())
- {
- if (k.equals(e.getKey())) return e;
- }
- return null;
- }
-
- public void putEntry(Entry<K, V> kvEntry)
- {
- data.put(kvEntry.getKey(), kvEntry.getValue());
- }
-
- public boolean exists(Entry<K, V> kvEntry)
- {
- return data.containsKey(kvEntry.getKey());
- }
-
- public int size()
- {
- return data.size();
- }
-
- public void clear()
- {
- data.clear();
- }
-
- public void removeEntry(K key)
- {
- data.remove(key);
- }
-}
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/commands/read/GetKeyValueCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/commands/read/GetKeyValueCommand.java 2008-10-09 12:52:15 UTC (rev 6896)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/commands/read/GetKeyValueCommand.java 2008-10-09 13:22:09 UTC (rev 6897)
@@ -24,8 +24,8 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.starobrno.commands.Visitor;
+import org.jboss.starobrno.container.MVCCEntry;
import org.jboss.starobrno.context.InvocationContext;
-import org.jboss.starobrno.mvcc.MVCCEntry;
import org.jboss.starobrno.notifications.Notifier;
/**
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/ClearCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/ClearCommand.java 2008-10-09 12:52:15 UTC (rev 6896)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/ClearCommand.java 2008-10-09 13:22:09 UTC (rev 6897)
@@ -23,8 +23,8 @@
import org.jboss.starobrno.commands.VisitableCommand;
import org.jboss.starobrno.commands.Visitor;
+import org.jboss.starobrno.container.MVCCEntry;
import org.jboss.starobrno.context.InvocationContext;
-import org.jboss.starobrno.mvcc.MVCCEntry;
/**
* @author Mircea.Markus at jboss.com
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/EvictCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/EvictCommand.java 2008-10-09 12:52:15 UTC (rev 6896)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/EvictCommand.java 2008-10-09 13:22:09 UTC (rev 6897)
@@ -23,8 +23,8 @@
import org.jboss.starobrno.commands.Visitor;
import org.jboss.starobrno.commands.read.AbstractDataCommand;
+import org.jboss.starobrno.container.MVCCEntry;
import org.jboss.starobrno.context.InvocationContext;
-import org.jboss.starobrno.mvcc.MVCCEntry;
/**
* @author Mircea.Markus at jboss.com
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/PutKeyValueCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/PutKeyValueCommand.java 2008-10-09 12:52:15 UTC (rev 6896)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/PutKeyValueCommand.java 2008-10-09 13:22:09 UTC (rev 6897)
@@ -23,8 +23,8 @@
import org.jboss.starobrno.commands.Visitor;
import org.jboss.starobrno.commands.read.AbstractDataCommand;
+import org.jboss.starobrno.container.MVCCEntry;
import org.jboss.starobrno.context.InvocationContext;
-import org.jboss.starobrno.mvcc.MVCCEntry;
/**
* Implements functionality defined by {@link Cache#put(org.jboss.cache.Fqn, Object, Object)}.
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/PutMapCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/PutMapCommand.java 2008-10-09 12:52:15 UTC (rev 6896)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/PutMapCommand.java 2008-10-09 13:22:09 UTC (rev 6897)
@@ -23,8 +23,8 @@
import org.jboss.starobrno.commands.VisitableCommand;
import org.jboss.starobrno.commands.Visitor;
+import org.jboss.starobrno.container.MVCCEntry;
import org.jboss.starobrno.context.InvocationContext;
-import org.jboss.starobrno.mvcc.MVCCEntry;
import java.util.Map;
import java.util.Map.Entry;
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/RemoveCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/RemoveCommand.java 2008-10-09 12:52:15 UTC (rev 6896)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/RemoveCommand.java 2008-10-09 13:22:09 UTC (rev 6897)
@@ -23,8 +23,8 @@
import org.jboss.starobrno.commands.Visitor;
import org.jboss.starobrno.commands.read.AbstractDataCommand;
+import org.jboss.starobrno.container.MVCCEntry;
import org.jboss.starobrno.context.InvocationContext;
-import org.jboss.starobrno.mvcc.MVCCEntry;
/**
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/ReplaceCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/ReplaceCommand.java 2008-10-09 12:52:15 UTC (rev 6896)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/ReplaceCommand.java 2008-10-09 13:22:09 UTC (rev 6897)
@@ -23,8 +23,8 @@
import org.jboss.starobrno.commands.Visitor;
import org.jboss.starobrno.commands.read.AbstractDataCommand;
+import org.jboss.starobrno.container.MVCCEntry;
import org.jboss.starobrno.context.InvocationContext;
-import org.jboss.starobrno.mvcc.MVCCEntry;
/**
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/container/DataContainer.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/container/DataContainer.java 2008-10-09 12:52:15 UTC (rev 6896)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/container/DataContainer.java 2008-10-09 13:22:09 UTC (rev 6897)
@@ -21,7 +21,6 @@
*/
package org.jboss.starobrno.container;
-import java.util.Map.Entry;
import java.util.Set;
/**
@@ -32,17 +31,17 @@
*/
public interface DataContainer<K, V>
{
- Set<Entry<K, V>> getEntries();
+ V get(K k);
- Entry<K, V> getEntry(K k);
+ void put(K k, V v);
- void putEntry(Entry<K, V> entry);
+ boolean containsKey(K k);
- boolean exists(Entry<K, V> entry);
+ V remove(K k);
int size();
void clear();
- void removeEntry(K key);
+ Set<K> keySet();
}
Copied: core/branches/flat/src/main/java/org/jboss/starobrno/container/MVCCEntry.java (from rev 6894, core/branches/flat/src/main/java/org/jboss/starobrno/mvcc/MVCCEntry.java)
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/container/MVCCEntry.java (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/container/MVCCEntry.java 2008-10-09 13:22:09 UTC (rev 6897)
@@ -0,0 +1,57 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.container;
+
+import org.jboss.starobrno.context.InvocationContext;
+
+import java.util.Map.Entry;
+
+/**
+ * // TODO: MANIK: Document this
+ *
+ * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
+ * @since 3.0
+ */
+public interface MVCCEntry extends Entry
+{
+ boolean isNullEntry();
+
+ void copyForUpdate(DataContainer container, boolean writeSkewCheck);
+
+ void commitUpdate(InvocationContext ctx, DataContainer container);
+
+ void rollbackUpdate();
+
+ boolean isChanged();
+
+ boolean isCreated();
+
+ void setCreated(boolean created);
+
+ boolean isDeleted();
+
+ void setDeleted(boolean deleted);
+
+ boolean isValid();
+
+ void setValid(boolean valid);
+}
Copied: core/branches/flat/src/main/java/org/jboss/starobrno/container/MVCCEntryCreator.java (from rev 6895, core/branches/flat/src/main/java/org/jboss/starobrno/mvcc/MVCCEntryWrapper.java)
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/container/MVCCEntryCreator.java (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/container/MVCCEntryCreator.java 2008-10-09 13:22:09 UTC (rev 6897)
@@ -0,0 +1,179 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.container;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.cache.lock.TimeoutException;
+import org.jboss.starobrno.config.Configuration;
+import org.jboss.starobrno.context.InvocationContext;
+import org.jboss.starobrno.factories.EntryFactory;
+import org.jboss.starobrno.factories.annotations.Inject;
+import org.jboss.starobrno.factories.annotations.Start;
+import org.jboss.starobrno.lock.LockManager;
+
+/**
+ * Wraps mvcc entries.
+ *
+ * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
+ * @since 3.0
+ */
+public class MVCCEntryCreator
+{
+ DataContainer container;
+ boolean writeSkewCheck;
+ LockManager lockManager;
+ Configuration configuration;
+ long defaultLockAcquisitionTimeout;
+ EntryFactory entryFactory;
+
+ private static final Log log = LogFactory.getLog(MVCCEntryCreator.class);
+ private static final boolean trace = log.isTraceEnabled();
+
+
+ @Inject
+ public void injectDependencies(DataContainer dataContainer, LockManager lockManager, Configuration configuration, EntryFactory entryFactory)
+ {
+ this.container = dataContainer;
+ this.configuration = configuration;
+ this.lockManager = lockManager;
+ this.entryFactory = entryFactory;
+ }
+
+ @Start
+ public void start()
+ {
+ defaultLockAcquisitionTimeout = configuration.getLockAcquisitionTimeout();
+ writeSkewCheck = configuration.isWriteSkewCheck();
+ }
+
+ public MVCCEntry wrapEntryForReading(InvocationContext ctx, Object key, boolean putInContext) throws InterruptedException
+ {
+ return wrapEntryForReading(ctx, key, putInContext, false);
+ }
+
+ public MVCCEntry wrapEntryForReading(InvocationContext ctx, Object key, boolean putInContext, boolean forceWriteLock) throws InterruptedException
+ {
+ // TODO: Do we need to wrap for reading if we are not in a TX?
+ // TODO: Also, do we need to wrap for reading even IN a TX if we are using read-committed?
+
+ MVCCEntry mvccEntry;
+ if (forceWriteLock)
+ {
+ if (trace) log.trace("Forcing lock on reading key " + key);
+ return wrapEntryForWriting(ctx, key, false, false);
+ }
+ else if ((mvccEntry = ctx.lookupEntry(key)) == null)
+ {
+ if (trace) log.trace("Key " + key + " is not in context, fetching from container.");
+ // simple implementation. Peek the node, wrap it, put wrapped node in the context.
+ Object value = container.get(key);
+ mvccEntry = entryFactory.createWrappedEntry(key, value, false);
+ if (mvccEntry != null && putInContext) ctx.putLookedUpEntry(mvccEntry);
+ return mvccEntry;
+ }
+ else
+ {
+ if (trace) log.trace("Key " + key + " is already in context.");
+ return mvccEntry;
+ }
+ }
+
+ public MVCCEntry wrapEntryForWriting(InvocationContext ctx, Object key, boolean createIfAbsent, boolean forceLockIfAbsent) throws InterruptedException
+ {
+ MVCCEntry mvccEntry = ctx.lookupEntry(key);
+ if (createIfAbsent && mvccEntry != null && mvccEntry.isNullEntry()) mvccEntry = null;
+ if (mvccEntry != null) // exists in context! Just acquire lock if needed, and wrap.
+ {
+ // acquire lock if needed
+ if (acquireLock(ctx, key))
+ {
+ // create a copy of the underlying node
+ mvccEntry.copyForUpdate(container, writeSkewCheck);
+ }
+ if (trace) log.trace("Retrieving wrapped node " + key);
+ if (mvccEntry.isDeleted() && createIfAbsent)
+ {
+ if (trace) log.trace("Node is deleted in current scope. Need to un-delete.");
+ mvccEntry.setDeleted(false);
+ mvccEntry.setValid(true);
+ }
+ }
+ else
+ {
+ // else, fetch from dataContainer.
+ Object value = container.get(key);
+ if (value != null)
+ {
+ // exists in cache! Just acquire lock if needed, and wrap.
+ // do we need a lock?
+ boolean needToCopy = false;
+ if (acquireLock(ctx, key)) needToCopy = true;
+ mvccEntry = entryFactory.createWrappedEntry(key, value, false);
+ ctx.putLookedUpEntry(mvccEntry);
+ if (needToCopy) mvccEntry.copyForUpdate(container, writeSkewCheck);
+ }
+ else if (createIfAbsent) // else, do we need to create one?
+ {
+ // now to lock and create the node. Lock first to prevent concurrent creation!
+ acquireLock(ctx, key);
+ mvccEntry = entryFactory.createWrappedEntry(key, value, true);
+ mvccEntry.setCreated(true);
+ ctx.putLookedUpEntry(mvccEntry);
+ mvccEntry.copyForUpdate(container, writeSkewCheck);
+ }
+ }
+
+ // see if we need to force the lock on nonexistent entries.
+ if (mvccEntry == null && forceLockIfAbsent) acquireLock(ctx, key);
+
+ return mvccEntry;
+ }
+
+ /**
+ * Attempts to lock a node if the lock isn't already held in the current scope, and records the lock in the context.
+ *
+ * @param ctx context
+ * @param fqn Fqn to lock
+ * @return true if a lock was needed and acquired, false if it didn't need to acquire the lock (i.e., lock was already held)
+ * @throws InterruptedException if interrupted
+ * @throws org.jboss.cache.lock.TimeoutException
+ * if we are unable to acquire the lock after a specified timeout.
+ */
+ private boolean acquireLock(InvocationContext ctx, Object key) throws InterruptedException, TimeoutException
+ {
+ // don't EVER use lockManager.isLocked() since with lock striping it may be the case that we hold the relevant
+ // lock which may be shared with another Fqn that we have a lock for already.
+ // nothing wrong, just means that we fail to record the lock. And that is a problem.
+ // Better to check our records and lock again if necessary.
+ if (!ctx.hasLockedKey(key))
+ {
+ if (!lockManager.lockAndRecord(key, ctx))
+ {
+ Object owner = lockManager.getOwner(key);
+ throw new TimeoutException("Unable to acquire lock on key [" + key + "] after [" + ctx.getLockAcquisitionTimeout(defaultLockAcquisitionTimeout) + "] milliseconds for requestor [" + lockManager.getLockOwner(ctx) + "]! Lock held by [" + owner + "]");
+ }
+ return true;
+ }
+ return false;
+ }
+}
Copied: core/branches/flat/src/main/java/org/jboss/starobrno/container/NullMarkerEntry.java (from rev 6895, core/branches/flat/src/main/java/org/jboss/starobrno/mvcc/NullMarkerEntry.java)
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/container/NullMarkerEntry.java (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/container/NullMarkerEntry.java 2008-10-09 13:22:09 UTC (rev 6897)
@@ -0,0 +1,68 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.container;
+
+/**
+ * A marker node to represent a null node for repeatable read, so that a read that returns a null can continue to return
+ * null.
+ *
+ * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
+ * @since 3.0
+ */
+public class NullMarkerEntry extends ReadCommittedEntry
+{
+ /**
+ * @return always returns true
+ */
+ @Override
+ public boolean isNullEntry()
+ {
+ return true;
+ }
+
+ /**
+ * @return always returns true so that any get commands, upon getting this node, will ignore the node as though it were removed.
+ */
+ @Override
+ public boolean isDeleted()
+ {
+ return true;
+ }
+
+ /**
+ * @return always returns true so that any get commands, upon getting this node, will ignore the node as though it were invalid.
+ */
+ @Override
+ public boolean isValid()
+ {
+ return false;
+ }
+
+ /**
+ * A no-op.
+ */
+ @Override
+ public void copyForUpdate(DataContainer d, boolean b)
+ {
+ // no op
+ }
+}
\ No newline at end of file
Copied: core/branches/flat/src/main/java/org/jboss/starobrno/container/ReadCommittedEntry.java (from rev 6895, core/branches/flat/src/main/java/org/jboss/starobrno/mvcc/ReadCommittedEntry.java)
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/container/ReadCommittedEntry.java (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/container/ReadCommittedEntry.java 2008-10-09 13:22:09 UTC (rev 6897)
@@ -0,0 +1,205 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.container;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import static org.jboss.starobrno.container.ReadCommittedEntry.Flags.*;
+import org.jboss.starobrno.context.InvocationContext;
+
+/**
+ * A node delegate that encapsulates read committed semantics when writes are initiated, committed or rolled back.
+ *
+ * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
+ * @since 3.0
+ */
+public class ReadCommittedEntry implements MVCCEntry
+{
+ private static final Log log = LogFactory.getLog(ReadCommittedEntry.class);
+ private static final boolean trace = log.isTraceEnabled();
+
+ protected Object key, value, oldValue;
+ protected byte flags = 0;
+
+
+ protected ReadCommittedEntry()
+ {
+ setValid(true);
+ }
+
+ public ReadCommittedEntry(Object key, Object value)
+ {
+ setValid(true);
+ this.key = key;
+ this.value = value;
+ }
+
+ public Object getKey()
+ {
+ return key;
+ }
+
+ public Object getValue()
+ {
+ return value;
+ }
+
+ public Object setValue(Object value)
+ {
+ return this.value = value;
+ }
+
+ protected static enum Flags
+ {
+ CHANGED(0x1), CREATED(0x2), DELETED(0x4), VALID(0x8);
+ final byte mask;
+
+ Flags(int mask)
+ {
+ this.mask = (byte) mask;
+ }
+ }
+
+ /**
+ * Tests whether a flag is set.
+ *
+ * @param flag flag to test
+ * @return true if set, false otherwise.
+ */
+ protected final boolean isFlagSet(Flags flag)
+ {
+ return (flags & flag.mask) != 0;
+ }
+
+ /**
+ * Unility method that sets the value of the given flag to true.
+ *
+ * @param flag flag to set
+ */
+ protected final void setFlag(Flags flag)
+ {
+ flags |= flag.mask;
+ }
+
+ /**
+ * Utility method that sets the value of the flag to false.
+ *
+ * @param flag flag to unset
+ */
+ protected final void unsetFlag(Flags flag)
+ {
+ flags &= ~flag.mask;
+ }
+
+ public boolean isNullEntry()
+ {
+ return false;
+ }
+
+ public void copyForUpdate(DataContainer container, boolean writeSkewCheck)
+ {
+ if (isFlagSet(CHANGED)) return; // already copied
+
+ setFlag(CHANGED); // mark as changed
+
+ // if newly created, then nothing to copy.
+ if (!isFlagSet(CREATED)) oldValue = value;
+ }
+
+ @SuppressWarnings("unchecked")
+ public void commitUpdate(InvocationContext ctx, DataContainer container)
+ {
+ // only do stuff if there are changes.
+ if (isFlagSet(CHANGED))
+ {
+ if (trace)
+ log.trace("Updating entry [" + getKey() + "]. deleted=" + isDeleted() + " valid=" + isValid() + " changed=" + isChanged() + " created=" + isFlagSet(CREATED));
+ if (isFlagSet(DELETED))
+ {
+ container.remove(key);
+
+ }
+ else
+ {
+ container.put(key, value);
+ }
+ reset();
+ }
+ }
+
+ private void reset()
+ {
+ oldValue = null;
+ flags = 0;
+ setValid(true);
+ }
+
+ public void rollbackUpdate()
+ {
+ value = oldValue;
+ reset();
+ }
+
+ public boolean isChanged()
+ {
+ return isFlagSet(CHANGED);
+ }
+
+ public boolean isValid()
+ {
+ return isFlagSet(VALID);
+ }
+
+ public void setValid(boolean valid)
+ {
+ if (valid)
+ setFlag(VALID);
+ else
+ unsetFlag(VALID);
+ }
+
+ public boolean isCreated()
+ {
+ return isFlagSet(CREATED);
+ }
+
+ public void setCreated(boolean created)
+ {
+ if (created)
+ setFlag(CREATED);
+ else
+ unsetFlag(CREATED);
+ }
+
+ public boolean isDeleted()
+ {
+ return isFlagSet(DELETED);
+ }
+
+ public void setDeleted(boolean deleted)
+ {
+ if (deleted)
+ setFlag(DELETED);
+ else
+ unsetFlag(DELETED);
+ }
+}
\ No newline at end of file
Copied: core/branches/flat/src/main/java/org/jboss/starobrno/container/RepeatableReadEntry.java (from rev 6895, core/branches/flat/src/main/java/org/jboss/starobrno/mvcc/RepeatableReadEntry.java)
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/container/RepeatableReadEntry.java (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/container/RepeatableReadEntry.java 2008-10-09 13:22:09 UTC (rev 6897)
@@ -0,0 +1,69 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.container;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.cache.mvcc.RepeatableReadNode;
+import org.jboss.starobrno.CacheException;
+import static org.jboss.starobrno.container.ReadCommittedEntry.Flags.CHANGED;
+
+/**
+ * A node delegate that encapsulates repeatable read semantics when writes are initiated, committed or rolled back.
+ *
+ * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
+ * @since 3.0
+ */
+public class RepeatableReadEntry extends ReadCommittedEntry
+{
+ private static final Log log = LogFactory.getLog(RepeatableReadNode.class);
+
+ public RepeatableReadEntry(Object key, Object value)
+ {
+ super(key, value);
+ }
+
+ @Override
+ public void copyForUpdate(DataContainer container, boolean writeSkewCheck)
+ {
+ if (isFlagSet(CHANGED)) return; // already copied
+
+ // mark node as changed.
+ setFlag(CHANGED);
+
+ if (writeSkewCheck)
+ {
+ // check for write skew.
+ Object actualValue = container.get(key);
+
+ if (actualValue != null && actualValue != value)
+ {
+ String errormsg = new StringBuilder().append("Detected write skew on key [").append(getKey()).append("]. Another process has changed the entry since we last read it!").toString();
+ if (log.isWarnEnabled()) log.warn(errormsg + ". Unable to copy entry for update.");
+ throw new CacheException(errormsg);
+ }
+ }
+
+ // make a backup copy
+ oldValue = value;
+ }
+}
\ No newline at end of file
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/container/UnsortedDataContainer.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/container/UnsortedDataContainer.java 2008-10-09 12:52:15 UTC (rev 6896)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/container/UnsortedDataContainer.java 2008-10-09 13:22:09 UTC (rev 6897)
@@ -21,7 +21,6 @@
*/
package org.jboss.starobrno.container;
-import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -36,29 +35,24 @@
{
private final ConcurrentMap<K, V> data = new ConcurrentHashMap<K, V>();
- public Set<Entry<K, V>> getEntries()
+ public V get(K k)
{
- return data.entrySet();
+ return data.get(k);
}
- public Entry<K, V> getEntry(K k)
+ public void put(K k, V v)
{
- if (k == null) throw new NullPointerException("I don't like nulls!");
- for (Entry<K, V> e : data.entrySet())
- {
- if (k.equals(e.getKey())) return e;
- }
- return null;
+ data.put(k, v);
}
- public void putEntry(Entry<K, V> kvEntry)
+ public boolean containsKey(K k)
{
- data.put(kvEntry.getKey(), kvEntry.getValue());
+ return data.containsKey(k);
}
- public boolean exists(Entry<K, V> kvEntry)
+ public V remove(K k)
{
- return data.containsKey(kvEntry.getKey());
+ return data.remove(k);
}
public int size()
@@ -71,8 +65,8 @@
data.clear();
}
- public void removeEntry(K key)
+ public Set<K> keySet()
{
- data.remove(key);
+ return data.keySet();
}
}
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/context/EntryLookup.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/context/EntryLookup.java 2008-10-09 12:52:15 UTC (rev 6896)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/context/EntryLookup.java 2008-10-09 13:22:09 UTC (rev 6897)
@@ -21,7 +21,7 @@
*/
package org.jboss.starobrno.context;
-import org.jboss.starobrno.mvcc.MVCCEntry;
+import org.jboss.starobrno.container.MVCCEntry;
import java.util.Map;
import java.util.Set;
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/context/InvocationContextImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/context/InvocationContextImpl.java 2008-10-09 12:52:15 UTC (rev 6896)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/context/InvocationContextImpl.java 2008-10-09 13:22:09 UTC (rev 6897)
@@ -26,7 +26,7 @@
import org.jboss.cache.util.Immutables;
import org.jboss.starobrno.commands.VisitableCommand;
import org.jboss.starobrno.config.Option;
-import org.jboss.starobrno.mvcc.MVCCEntry;
+import org.jboss.starobrno.container.MVCCEntry;
import org.jboss.starobrno.transaction.GlobalTransaction;
import org.jboss.starobrno.transaction.TransactionTable;
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/context/TransactionContextImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/context/TransactionContextImpl.java 2008-10-09 12:52:15 UTC (rev 6896)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/context/TransactionContextImpl.java 2008-10-09 13:22:09 UTC (rev 6897)
@@ -24,7 +24,7 @@
import org.jboss.cache.util.Immutables;
import org.jboss.starobrno.commands.VisitableCommand;
import org.jboss.starobrno.config.Option;
-import org.jboss.starobrno.mvcc.MVCCEntry;
+import org.jboss.starobrno.container.MVCCEntry;
import org.jboss.starobrno.transaction.OrderedSynchronizationHandler;
import javax.transaction.RollbackException;
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/factories/EmptyConstructorFactory.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/factories/EmptyConstructorFactory.java 2008-10-09 12:52:15 UTC (rev 6896)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/factories/EmptyConstructorFactory.java 2008-10-09 13:22:09 UTC (rev 6897)
@@ -23,7 +23,6 @@
import org.jboss.cache.RegionRegistry;
-import org.jboss.starobrno.batch.BatchContainer;
import org.jboss.cache.buddyreplication.BuddyFqnTransformer;
import org.jboss.cache.invocation.CacheInvocationDelegate;
import org.jboss.cache.loader.CacheLoaderManager;
@@ -31,12 +30,13 @@
import org.jboss.cache.marshall.Marshaller;
import org.jboss.cache.marshall.VersionAwareMarshaller;
import org.jboss.cache.remoting.jgroups.ChannelMessageListener;
+import org.jboss.starobrno.batch.BatchContainer;
import org.jboss.starobrno.commands.CommandsFactory;
import org.jboss.starobrno.config.ConfigurationException;
+import org.jboss.starobrno.container.MVCCEntryCreator;
import org.jboss.starobrno.factories.annotations.DefaultFactoryFor;
import org.jboss.starobrno.factories.context.ContextFactory;
import org.jboss.starobrno.invocation.InvocationContextContainer;
-import org.jboss.starobrno.mvcc.MVCCEntryWrapper;
import org.jboss.starobrno.notifications.Notifier;
import org.jboss.starobrno.transaction.TransactionTable;
@@ -48,7 +48,7 @@
*/
@DefaultFactoryFor(classes = {Notifier.class, RegionRegistry.class,
ChannelMessageListener.class, CacheLoaderManager.class, Marshaller.class, InvocationContextContainer.class,
- CacheInvocationDelegate.class, TransactionTable.class, MVCCEntryWrapper.class,
+ CacheInvocationDelegate.class, TransactionTable.class, MVCCEntryCreator.class,
LockStrategyFactory.class, BuddyFqnTransformer.class, BatchContainer.class,
ContextFactory.class, EntryFactory.class, CommandsFactory.class})
public class EmptyConstructorFactory extends ComponentFactory
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/factories/EntryFactory.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/factories/EntryFactory.java 2008-10-09 12:52:15 UTC (rev 6896)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/factories/EntryFactory.java 2008-10-09 13:22:09 UTC (rev 6897)
@@ -21,10 +21,8 @@
*/
package org.jboss.starobrno.factories;
-import org.jboss.starobrno.mvcc.MVCCEntry;
+import org.jboss.starobrno.container.MVCCEntry;
-import java.util.Map.Entry;
-
/**
* // TODO: MANIK: Document this
*
@@ -33,7 +31,5 @@
*/
public interface EntryFactory
{
- Entry createEntry(Object key, Object value, boolean putInContainer);
-
- MVCCEntry createWrappedEntry(Entry entry);
+ MVCCEntry createWrappedEntry(Object key, Object value, boolean isForInsert);
}
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/factories/EntryFactoryImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/factories/EntryFactoryImpl.java 2008-10-09 12:52:15 UTC (rev 6896)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/factories/EntryFactoryImpl.java 2008-10-09 13:22:09 UTC (rev 6897)
@@ -24,16 +24,13 @@
import org.jboss.cache.lock.IsolationLevel;
import org.jboss.starobrno.config.Configuration;
import org.jboss.starobrno.container.DataContainer;
+import org.jboss.starobrno.container.MVCCEntry;
+import org.jboss.starobrno.container.NullMarkerEntry;
+import org.jboss.starobrno.container.ReadCommittedEntry;
+import org.jboss.starobrno.container.RepeatableReadEntry;
import org.jboss.starobrno.factories.annotations.Inject;
import org.jboss.starobrno.factories.annotations.Start;
-import org.jboss.starobrno.mvcc.EntryImpl;
-import org.jboss.starobrno.mvcc.MVCCEntry;
-import org.jboss.starobrno.mvcc.NullMarkerEntry;
-import org.jboss.starobrno.mvcc.ReadCommittedEntry;
-import org.jboss.starobrno.mvcc.RepeatableReadEntry;
-import java.util.Map.Entry;
-
/**
* // TODO: MANIK: Document this
*
@@ -61,18 +58,11 @@
useRepeatableRead = configuration.getIsolationLevel() == IsolationLevel.REPEATABLE_READ;
}
- public Entry createEntry(Object key, Object value, boolean putInContainer)
+ public MVCCEntry createWrappedEntry(Object key, Object value, boolean isForInsert)
{
- Entry e = new EntryImpl(key, value);
- if (putInContainer) dataContainer.putEntry(e);
- return e;
- }
+ if (value == null && !isForInsert) return useRepeatableRead ? NULL_MARKER : null;
- public MVCCEntry createWrappedEntry(Entry entry)
- {
- if (entry == null) return useRepeatableRead ? NULL_MARKER : null;
-
- MVCCEntry mvccEntry = useRepeatableRead ? new RepeatableReadEntry(entry) : new ReadCommittedEntry(entry);
+ MVCCEntry mvccEntry = useRepeatableRead ? new RepeatableReadEntry(key, value) : new ReadCommittedEntry(key, value);
return mvccEntry;
}
}
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/InterceptorChain.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/InterceptorChain.java 2008-10-09 12:52:15 UTC (rev 6896)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/InterceptorChain.java 2008-10-09 13:22:09 UTC (rev 6897)
@@ -24,13 +24,12 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.starobrno.CacheException;
+import org.jboss.starobrno.commands.VisitableCommand;
import org.jboss.starobrno.context.InvocationContext;
import org.jboss.starobrno.factories.annotations.Inject;
import org.jboss.starobrno.factories.annotations.Start;
-import org.jboss.starobrno.commands.VisitableCommand;
import org.jboss.starobrno.interceptors.base.CommandInterceptor;
import org.jboss.starobrno.invocation.InvocationContextContainer;
-import org.jboss.cache.util.CachePrinter;
import java.util.ArrayList;
import java.util.Collections;
@@ -75,6 +74,7 @@
@Start
private void printChainInfo()
{
+ if (log.isDebugEnabled()) log.debug("Interceptor chain size: " + size());
if (log.isDebugEnabled()) log.debug("Interceptor chain is: " + toString());
}
@@ -358,14 +358,11 @@
{
StringBuilder sb = new StringBuilder();
CommandInterceptor i = firstInChain;
- if (i != null)
+ while (i != null)
{
- if (i.getNext() != null)
- {
- sb.append(i.getNext()).append("\n");
- }
- sb.append("\t>> ");
+ sb.append("\n\t>> ");
sb.append(i.getClass().getName());
+ i = i.getNext();
}
return sb.toString();
}
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/LockingInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/LockingInterceptor.java 2008-10-09 12:52:15 UTC (rev 6896)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/LockingInterceptor.java 2008-10-09 13:22:09 UTC (rev 6897)
@@ -21,6 +21,7 @@
*/
package org.jboss.starobrno.interceptors;
+import org.jboss.cache.lock.IsolationLevel;
import org.jboss.starobrno.commands.VisitableCommand;
import org.jboss.starobrno.commands.read.GetKeyValueCommand;
import org.jboss.starobrno.commands.read.GravitateDataCommand;
@@ -35,16 +36,18 @@
import org.jboss.starobrno.commands.write.RemoveCommand;
import org.jboss.starobrno.commands.write.ReplaceCommand;
import org.jboss.starobrno.container.DataContainer;
+import org.jboss.starobrno.container.MVCCEntry;
+import org.jboss.starobrno.container.MVCCEntryCreator;
import org.jboss.starobrno.context.InvocationContext;
import org.jboss.starobrno.factories.annotations.Inject;
+import org.jboss.starobrno.factories.annotations.Start;
import org.jboss.starobrno.interceptors.base.PrePostProcessingCommandInterceptor;
import org.jboss.starobrno.lock.LockManager;
-import org.jboss.starobrno.mvcc.MVCCEntry;
-import org.jboss.starobrno.mvcc.MVCCEntryWrapper;
+import java.util.HashSet;
import java.util.List;
import java.util.ListIterator;
-import java.util.Map.Entry;
+import java.util.Set;
/**
* Interceptor to implement <a href="http://wiki.jboss.org/wiki/JBossCacheMVCC">MVCC</a> functionality.
@@ -57,16 +60,23 @@
{
LockManager lockManager;
DataContainer dataContainer;
- MVCCEntryWrapper entryWrapper;
+ MVCCEntryCreator entryWrapper;
+ boolean useReadCommitted;
@Inject
- public void setDependencies(LockManager lockManager, DataContainer dataContainer, MVCCEntryWrapper entryWrapper)
+ public void setDependencies(LockManager lockManager, DataContainer dataContainer, MVCCEntryCreator entryWrapper)
{
this.lockManager = lockManager;
this.dataContainer = dataContainer;
this.entryWrapper = entryWrapper;
}
+ @Start
+ private void determineIsolationLevel()
+ {
+ useReadCommitted = configuration.getIsolationLevel() == IsolationLevel.READ_COMMITTED;
+ }
+
@Override
protected boolean doBeforeCall(InvocationContext ctx, VisitableCommand command)
{
@@ -138,11 +148,7 @@
public Object handleClearCommand(InvocationContext ctx, ClearCommand command) throws Throwable
{
// get a snapshot of all keys in the data container
- for (Object o : dataContainer.getEntries())
- {
- Object key = ((Entry) o).getKey();
- entryWrapper.wrapEntryForWriting(ctx, key, false, false);
- }
+ for (Object key : dataContainer.keySet()) entryWrapper.wrapEntryForWriting(ctx, key, false, false);
return invokeNextInterceptor(ctx, command);
}
@@ -211,7 +217,22 @@
}
else
{
- if (trace) log.trace("Nothing to do since there is a transaction in scope.");
+ if (useReadCommitted)
+ {
+ // wipe all unchanged entries from context, to force subsequent reads to go to the container,
+ // hence providing R_C semantics.
+ Set<Object> keysToRemove = new HashSet<Object>();
+ for (MVCCEntry e : ctx.getLookedUpEntries().values())
+ {
+ if (e.isChanged()) keysToRemove.add(e.getKey());
+ }
+ for (Object k : keysToRemove) ctx.removeKeyLocked(k);
+ }
+ else
+ {
+ if (trace) log.trace("Nothing to do since there is a transaction in scope.");
+ }
+
}
}
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/notifications/NotifierImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/notifications/NotifierImpl.java 2008-10-09 12:52:15 UTC (rev 6896)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/notifications/NotifierImpl.java 2008-10-09 13:22:09 UTC (rev 6897)
@@ -36,13 +36,13 @@
import org.jboss.starobrno.CacheException;
import org.jboss.starobrno.CacheSPI;
import org.jboss.starobrno.config.Configuration;
+import org.jboss.starobrno.container.MVCCEntry;
import org.jboss.starobrno.context.InvocationContext;
import org.jboss.starobrno.factories.annotations.Destroy;
import org.jboss.starobrno.factories.annotations.Inject;
import org.jboss.starobrno.factories.annotations.NonVolatile;
import org.jboss.starobrno.factories.annotations.Start;
import org.jboss.starobrno.factories.annotations.Stop;
-import org.jboss.starobrno.mvcc.MVCCEntry;
import org.jboss.starobrno.notifications.event.NodeModifiedEvent;
import org.jgroups.View;
Modified: core/branches/flat/src/test/java/org/jboss/starobrno/BasicTest.java
===================================================================
--- core/branches/flat/src/test/java/org/jboss/starobrno/BasicTest.java 2008-10-09 12:52:15 UTC (rev 6896)
+++ core/branches/flat/src/test/java/org/jboss/starobrno/BasicTest.java 2008-10-09 13:22:09 UTC (rev 6897)
@@ -31,7 +31,7 @@
* @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
* @since 3.0
*/
- at Test
+ at Test(groups = "functional")
public class BasicTest
{
public void basicTest()
Added: core/branches/flat/src/test/java/org/jboss/starobrno/profiling/MemConsumptionTest.java
===================================================================
--- core/branches/flat/src/test/java/org/jboss/starobrno/profiling/MemConsumptionTest.java (rev 0)
+++ core/branches/flat/src/test/java/org/jboss/starobrno/profiling/MemConsumptionTest.java 2008-10-09 13:22:09 UTC (rev 6897)
@@ -0,0 +1,114 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.cache.profiling;
+
+import org.jboss.cache.DefaultCacheFactory;
+import org.jboss.starobrno.Cache;
+import org.jboss.starobrno.util.TestingUtil;
+import org.testng.annotations.Test;
+
+import java.io.IOException;
+import java.text.NumberFormat;
+import java.util.Arrays;
+import java.util.Random;
+import java.util.concurrent.TimeUnit;
+
+ at Test(groups = "profiling")
+public class MemConsumptionTest
+{
+ // adjust the next 3 numbers
+ int numEntries = 1000000;
+ int payloadSize = 20; // bytes
+ int keySize = 10; // bytes
+ int bytesPerChar = 2;
+
+ Random r = new Random();
+
+ public void testMemConsumption() throws IOException
+ {
+ int kBytesCached = (bytesPerChar * numEntries * (payloadSize + keySize)) / 1024;
+ System.out.println("Bytes to be cached: " + NumberFormat.getIntegerInstance().format(kBytesCached) + " kb");
+
+// Cache<byte[], byte[]> c = new DefaultCacheFactory<byte[], byte[]>().createCache(); // default LOCAL cache
+ Cache<String, String> c = new DefaultCacheFactory<String, String>().createCache(); // default LOCAL cache
+// Map<String, String> c = new ConcurrentHashMap<String, String>();
+ long start = System.nanoTime();
+ for (int i = 0; i < numEntries; i++)
+ {
+// c.put(generateUniqueKey(i, keySize), generateBytePayload(payloadSize));
+ c.put(generateUniqueString(i, keySize), generateRandomString(payloadSize));
+ if (i % 1000 == 0) System.out.println("Added " + i + " entries");
+ }
+ long duration = System.nanoTime() - start;
+ System.out.println("Done, took " + TimeUnit.NANOSECONDS.toSeconds(duration) + " seconds!");
+
+ System.out.println("Calling System.gc()");
+ System.gc(); // clear any unnecessary objects
+
+ TestingUtil.sleepThread(1000); // wait for gc
+
+ // wait for manual test exit
+ System.out.println("Cache populated; check mem usage using jconsole, etc.!");
+ System.in.read();
+ }
+
+ private String generateUniqueString(int runNumber, int keySize)
+ {
+ // string size should be exactly equal to key size but also be unique.
+ // start by creating a string from the run number
+ StringBuilder sb = new StringBuilder();
+ // append the run number
+ sb.append(runNumber);
+ for (int i = sb.length(); i < keySize; i++) sb.append("_");
+ return sb.toString();
+ }
+
+ private byte[] generateUniqueKey(int runNumber, int keySize)
+ {
+ byte[] b = new byte[keySize];
+ b[0] = (byte) (runNumber >>> 0);
+ b[1] = (byte) (runNumber >>> 8);
+ b[2] = (byte) (runNumber >>> 16);
+ b[3] = (byte) (runNumber >>> 24);
+
+ for (int i = 4; i < keySize; i++) b[i] = 0;
+ return b;
+ }
+
+ private byte[] generateBytePayload(int payloadSize)
+ {
+ byte[] b = new byte[payloadSize];
+ Arrays.fill(b, (byte) 0);
+ return b;
+ }
+
+ private String generateRandomString(int stringSize)
+ {
+ StringBuilder sb = new StringBuilder();
+ for (int i = 0; i < stringSize; i++)
+ {
+ sb.append(r.nextInt(9)); // single digit
+ }
+ assert sb.length() == stringSize;
+ return sb.toString();
+ }
+}
Added: core/branches/flat/src/test/java/org/jboss/starobrno/util/TestingUtil.java
===================================================================
--- core/branches/flat/src/test/java/org/jboss/starobrno/util/TestingUtil.java (rev 0)
+++ core/branches/flat/src/test/java/org/jboss/starobrno/util/TestingUtil.java 2008-10-09 13:22:09 UTC (rev 6897)
@@ -0,0 +1,641 @@
+/*
+ * JBoss, the OpenSource J2EE webOS
+ *
+ * Distributable under LGPL license.
+ * See terms of license at gnu.org.
+ */
+
+package org.jboss.starobrno.util;
+
+import org.jboss.cache.CacheStatus;
+import org.jboss.cache.invocation.CacheInvocationDelegate;
+import org.jboss.starobrno.Cache;
+import org.jboss.starobrno.CacheSPI;
+import org.jboss.starobrno.commands.CommandsFactory;
+import org.jboss.starobrno.commands.VisitableCommand;
+import org.jboss.starobrno.factories.ComponentRegistry;
+import org.jboss.starobrno.interceptors.InterceptorChain;
+import org.jboss.starobrno.interceptors.base.CommandInterceptor;
+import org.jboss.starobrno.lock.LockManager;
+import org.jgroups.JChannel;
+
+import java.io.File;
+import java.lang.reflect.Field;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Random;
+
+/**
+ * Utilities for unit testing JBossCache.
+ *
+ * @author <a href="mailto://brian.stansberry@jboss.com">Brian Stansberry</a>
+ * @version $Revision$
+ */
+public class TestingUtil
+{
+ private static Random random = new Random();
+
+ /**
+ * Extracts the value of a field in a given target instance using reflection, able to extract private fields as well.
+ *
+ * @param target object to extract field from
+ * @param fieldName name of field to extract
+ * @return field value
+ */
+ public static Object extractField(Object target, String fieldName)
+ {
+ return extractField(target.getClass(), target, fieldName);
+ }
+
+ public static void replaceField(Object newValue, String fieldName, Object owner, Class baseType)
+ {
+ Field field;
+ try
+ {
+ field = baseType.getDeclaredField(fieldName);
+ field.setAccessible(true);
+ field.set(owner, newValue);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);//just to simplify exception handeling
+ }
+ }
+
+
+ public static Object extractField(Class type, Object target, String fieldName)
+ {
+ Field field;
+ try
+ {
+ field = type.getDeclaredField(fieldName);
+ field.setAccessible(true);
+ return field.get(target);
+ }
+ catch (Exception e)
+ {
+ if (type.equals(Object.class))
+ {
+ e.printStackTrace();
+ return null;
+ }
+ else
+ {
+ // try with superclass!!
+ return extractField(type.getSuperclass(), target, fieldName);
+ }
+ }
+ }
+
+ public static <T extends CommandInterceptor> T findInterceptor(CacheSPI<?, ?> cache, Class<T> interceptorToFind)
+ {
+ for (CommandInterceptor i : cache.getInterceptorChain())
+ {
+ if (interceptorToFind.isInstance(i)) return interceptorToFind.cast(i);
+ }
+ return null;
+ }
+
+ /**
+ * Injects an interceptor after a specified interceptor in a running cache. Your new interceptor need not be
+ * initialised with pointers to the next interceptor, etc. as this method does all that for you, including calling
+ * setCache().
+ *
+ * @param cache running cache instance
+ * @param interceptorToInject interceptor instance to inject.
+ * @param interceptorAfterWhichToInject class of interceptor to search for in the chain and after which to add your interceptor
+ */
+ public static void injectInterceptor(CacheSPI<?, ?> cache, CommandInterceptor interceptorToInject, Class<? extends CommandInterceptor> interceptorAfterWhichToInject)
+ {
+ cache.addInterceptor(interceptorToInject, interceptorAfterWhichToInject);
+ }
+
+ /**
+ * Loops, continually calling {@link #areCacheViewsComplete(org.jboss.cache.Cache[])}
+ * until it either returns true or <code>timeout</code> ms have elapsed.
+ *
+ * @param caches caches which must all have consistent views
+ * @param timeout max number of ms to loop
+ * @throws RuntimeException if <code>timeout</code> ms have elapse without
+ * all caches having the same number of members.
+ */
+ public static void blockUntilViewsReceived(Cache[] caches, long timeout)
+ {
+ long failTime = System.currentTimeMillis() + timeout;
+
+ while (System.currentTimeMillis() < failTime)
+ {
+ sleepThread(100);
+ if (areCacheViewsComplete(caches))
+ {
+ return;
+ }
+ }
+
+ throw new RuntimeException("timed out before caches had complete views");
+ }
+
+ /**
+ * Version of blockUntilViewsReceived that uses varargs
+ */
+ public static void blockUntilViewsReceived(long timeout, Cache... caches)
+ {
+ blockUntilViewsReceived(caches, timeout);
+ }
+
+ /**
+ * Loops, continually calling {@link #areCacheViewsComplete(org.jboss.cache.CacheSPI[])}
+ * until it either returns true or <code>timeout</code> ms have elapsed.
+ *
+ * @param caches caches which must all have consistent views
+ * @param timeout max number of ms to loop
+ * @throws RuntimeException if <code>timeout</code> ms have elapse without
+ * all caches having the same number of members.
+ */
+ public static void blockUntilViewsReceived(CacheSPI[] caches, long timeout)
+ {
+ long failTime = System.currentTimeMillis() + timeout;
+
+ while (System.currentTimeMillis() < failTime)
+ {
+ sleepThread(100);
+ if (areCacheViewsComplete(caches))
+ {
+ return;
+ }
+ }
+
+ throw new RuntimeException("timed out before caches had complete views");
+ }
+
+
+ /**
+ * An overloaded version of {@link #blockUntilViewsReceived(long,org.jboss.cache.Cache[])} that allows for 'shrinking' clusters.
+ * I.e., the usual method barfs if there are more members than expected. This one takes a param (barfIfTooManyMembers) which,
+ * if false, will NOT barf but will wait until the cluster 'shrinks' to the desired size. Useful if in tests, you kill
+ * a member and want to wait until this fact is known across the cluster.
+ *
+ * @param timeout
+ * @param barfIfTooManyMembers
+ * @param caches
+ */
+ public static void blockUntilViewsReceived(long timeout, boolean barfIfTooManyMembers, Cache... caches)
+ {
+ long failTime = System.currentTimeMillis() + timeout;
+
+ while (System.currentTimeMillis() < failTime)
+ {
+ sleepThread(100);
+ if (areCacheViewsComplete(caches, barfIfTooManyMembers))
+ {
+ return;
+ }
+ }
+
+ throw new RuntimeException("timed out before caches had complete views");
+ }
+
+ /**
+ * Loops, continually calling {@link #areCacheViewsComplete(org.jboss.cache.CacheSPI[])}
+ * until it either returns true or <code>timeout</code> ms have elapsed.
+ *
+ * @param groupSize number of caches expected in the group
+ * @param timeout max number of ms to loop
+ * @throws RuntimeException if <code>timeout</code> ms have elapse without
+ * all caches having the same number of members.
+ */
+ public static void blockUntilViewReceived(CacheSPI cache, int groupSize, long timeout)
+ {
+ blockUntilViewReceived(cache, groupSize, timeout, true);
+ }
+
+ public static void blockUntilViewReceived(CacheSPI cache, int groupSize, long timeout, boolean barfIfTooManyMembersInView)
+ {
+ long failTime = System.currentTimeMillis() + timeout;
+
+ while (System.currentTimeMillis() < failTime)
+ {
+ sleepThread(100);
+ if (isCacheViewComplete(cache, groupSize, barfIfTooManyMembersInView))
+ {
+ return;
+ }
+ }
+
+ throw new RuntimeException("timed out before caches had complete views");
+ }
+
+ /**
+ * Checks each cache to see if the number of elements in the array
+ * returned by {@link CacheSPI#getMembers()} matches the size of
+ * the <code>caches</code> parameter.
+ *
+ * @param caches caches that should form a View
+ * @return <code>true</code> if all caches have
+ * <code>caches.length</code> members; false otherwise
+ * @throws IllegalStateException if any of the caches have MORE view
+ * members than caches.length
+ */
+ public static boolean areCacheViewsComplete(Cache[] caches)
+ {
+ return areCacheViewsComplete(caches, true);
+ }
+
+ public static boolean areCacheViewsComplete(Cache[] caches, boolean barfIfTooManyMembers)
+ {
+ int memberCount = caches.length;
+
+ for (int i = 0; i < memberCount; i++)
+ {
+ if (!isCacheViewComplete(caches[i], memberCount, barfIfTooManyMembers))
+ {
+ return false;
+ }
+ }
+
+ return true;
+ }
+
+ /**
+ * Checks each cache to see if the number of elements in the array
+ * returned by {@link org.jboss.cache.RPCManager#getMembers()} matches the size of
+ * the <code>caches</code> parameter.
+ *
+ * @param caches caches that should form a View
+ * @return <code>true</code> if all caches have
+ * <code>caches.length</code> members; false otherwise
+ * @throws IllegalStateException if any of the caches have MORE view
+ * members than caches.length
+ */
+ public static boolean areCacheViewsComplete(CacheSPI[] caches)
+ {
+ if (caches == null) throw new NullPointerException("Cache impl array is null");
+ Cache[] c = new Cache[caches.length];
+ for (int i = 0; i < caches.length; i++) c[i] = caches[i];
+ return areCacheViewsComplete(c);
+ }
+
+ /**
+ * @param cache
+ * @param memberCount
+ */
+ public static boolean isCacheViewComplete(CacheSPI cache, int memberCount)
+ {
+ List members = cache.getRPCManager().getMembers();
+ if (members == null || memberCount > members.size())
+ {
+ return false;
+ }
+ else if (memberCount < members.size())
+ {
+ // This is an exceptional condition
+ StringBuilder sb = new StringBuilder("Cache at address ");
+ sb.append(cache.getRPCManager().getLocalAddress());
+ sb.append(" had ");
+ sb.append(members.size());
+ sb.append(" members; expecting ");
+ sb.append(memberCount);
+ sb.append(". Members were (");
+ for (int j = 0; j < members.size(); j++)
+ {
+ if (j > 0)
+ {
+ sb.append(", ");
+ }
+ sb.append(members.get(j));
+ }
+ sb.append(')');
+
+ throw new IllegalStateException(sb.toString());
+ }
+
+ return true;
+ }
+
+ /**
+ * @param c
+ * @param memberCount
+ */
+ public static boolean isCacheViewComplete(Cache c, int memberCount)
+ {
+ return isCacheViewComplete(c, memberCount, true);
+ }
+
+ public static boolean isCacheViewComplete(Cache c, int memberCount, boolean barfIfTooManyMembers)
+ {
+ CacheSPI cache = (CacheSPI) c;
+ List members = cache.getRPCManager().getMembers();
+ if (members == null || memberCount > members.size())
+ {
+ return false;
+ }
+ else if (memberCount < members.size())
+ {
+ if (barfIfTooManyMembers)
+ {
+ // This is an exceptional condition
+ StringBuilder sb = new StringBuilder("Cache at address ");
+ sb.append(cache.getRPCManager().getLocalAddress());
+ sb.append(" had ");
+ sb.append(members.size());
+ sb.append(" members; expecting ");
+ sb.append(memberCount);
+ sb.append(". Members were (");
+ for (int j = 0; j < members.size(); j++)
+ {
+ if (j > 0)
+ {
+ sb.append(", ");
+ }
+ sb.append(members.get(j));
+ }
+ sb.append(')');
+
+ throw new IllegalStateException(sb.toString());
+ }
+ else return false;
+ }
+
+ return true;
+ }
+
+
+ /**
+ * Puts the current thread to sleep for the desired number of ms, suppressing
+ * any exceptions.
+ *
+ * @param sleeptime number of ms to sleep
+ */
+ public static void sleepThread(long sleeptime)
+ {
+ try
+ {
+ Thread.sleep(sleeptime);
+ }
+ catch (InterruptedException ie)
+ {
+ }
+ }
+
+ public static void sleepRandom(int maxTime)
+ {
+ sleepThread(random.nextInt(maxTime));
+ }
+
+ public static void recursiveFileRemove(String directoryName)
+ {
+ File file = new File(directoryName);
+ recursiveFileRemove(file);
+ }
+
+ public static void recursiveFileRemove(File file)
+ {
+ if (file.exists())
+ {
+ System.out.println("Deleting file " + file);
+ recursivedelete(file);
+ }
+ }
+
+ private static void recursivedelete(File f)
+ {
+ if (f.isDirectory())
+ {
+ File[] files = f.listFiles();
+ for (File file : files)
+ {
+ recursivedelete(file);
+ }
+ }
+ //System.out.println("File " + f.toURI() + " deleted = " + f.delete());
+ f.delete();
+ }
+
+ /**
+ * Kills a cache - stops it, clears any data in any cache loaders, and rolls back any associated txs
+ */
+ public static void killCaches(Cache... caches)
+ {
+ for (Cache c : caches)
+ {
+ try
+ {
+ if (c != null && c.getCacheStatus() == CacheStatus.STARTED)
+ {
+ CacheSPI spi = (CacheSPI) c;
+ if (spi.getTransactionManager() != null)
+ {
+ try
+ {
+ spi.getTransactionManager().rollback();
+ }
+ catch (Exception e)
+ {
+ // don't care
+ }
+ }
+
+ // TODO fix cache loader stuff
+ /*
+ CacheLoaderManager clm = spi.getCacheLoaderManager();
+ CacheLoader cl = clm == null ? null : clm.getCacheLoader();
+ if (cl != null)
+ {
+ try
+ {
+ cl.remove(Fqn.ROOT);
+ }
+ catch (Exception e)
+ {
+ // don't care
+ }
+ }
+ */
+
+ spi.stop();
+ }
+ }
+ catch (Throwable t)
+ {
+
+ }
+ }
+ }
+
+ /**
+ * Clears any associated transactions with the current thread in the caches' transaction managers.
+ */
+ public static void killTransactions(Cache... caches)
+ {
+ for (Cache c : caches)
+ {
+ if (c != null && c.getCacheStatus() == CacheStatus.STARTED)
+ {
+ CacheSPI ci = (CacheSPI) c;
+ if (ci.getTransactionManager() != null)
+ {
+ try
+ {
+ ci.getTransactionManager().rollback();
+ }
+ catch (Exception e)
+ {
+ // don't care
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * For testing only - introspects a cache and extracts the ComponentRegistry
+ *
+ * @param cache cache to introspect
+ * @return component registry
+ */
+ public static ComponentRegistry extractComponentRegistry(Cache cache)
+ {
+ return (ComponentRegistry) extractField(cache, "componentRegistry");
+ }
+
+ public static LockManager extractLockManager(Cache cache)
+ {
+ return extractComponentRegistry(cache).getComponent(LockManager.class);
+ }
+
+ /**
+ * For testing only - introspects a cache and extracts the ComponentRegistry
+ *
+ * @param ci interceptor chain to introspect
+ * @return component registry
+ */
+ public static ComponentRegistry extractComponentRegistry(InterceptorChain ci)
+ {
+ return (ComponentRegistry) extractField(ci, "componentRegistry");
+ }
+
+
+ /**
+ * Replaces the existing interceptor chain in the cache wih one represented by the interceptor passed in. This
+ * utility updates dependencies on all components that rely on the interceptor chain as well.
+ *
+ * @param cache cache that needs to be altered
+ * @param interceptor the first interceptor in the new chain.
+ */
+ public static void replaceInterceptorChain(CacheSPI<?, ?> cache, CommandInterceptor interceptor)
+ {
+ ComponentRegistry cr = extractComponentRegistry(cache);
+ // make sure all interceptors here are wired.
+ CommandInterceptor i = interceptor;
+ do
+ {
+ cr.wireDependencies(i);
+ }
+ while ((i = i.getNext()) != null);
+
+ InterceptorChain inch = cr.getComponent(InterceptorChain.class);
+ inch.setFirstInChain(interceptor);
+ }
+
+ /**
+ * Retrieves the remote delegate for a given cache. It is on this remote delegate that the JGroups RPCDispatcher
+ * invokes remote methods.
+ *
+ * @param cache cache instance for which a remote delegate is to be retrieved
+ * @return remote delegate, or null if the cacge is not configured for replication.
+ */
+ public static CacheInvocationDelegate getInvocationDelegate(CacheSPI cache)
+ {
+ ComponentRegistry cr = extractComponentRegistry(cache);
+ return cr.getComponent(CacheInvocationDelegate.class);
+ }
+
+ /**
+ * Blocks until the cache has reached a specified state.
+ *
+ * @param cache cache to watch
+ * @param cacheStatus status to wait for
+ * @param timeout timeout to wait for
+ */
+ public static void blockUntilCacheStatusAchieved(Cache cache, CacheStatus cacheStatus, long timeout)
+ {
+ CacheSPI spi = (CacheSPI) cache;
+ long killTime = System.currentTimeMillis() + timeout;
+ while (System.currentTimeMillis() < killTime)
+ {
+ if (spi.getCacheStatus() == cacheStatus) return;
+ sleepThread(50);
+ }
+ throw new RuntimeException("Timed out waiting for condition");
+ }
+
+ public static void replicateCommand(CacheSPI cache, VisitableCommand command) throws Throwable
+ {
+ ComponentRegistry cr = extractComponentRegistry(cache);
+ InterceptorChain ic = cr.getComponent(InterceptorChain.class);
+ ic.invoke(command);
+ }
+
+ public static void blockUntilViewsReceived(int timeout, List caches)
+ {
+ blockUntilViewsReceived((Cache[]) caches.toArray(new Cache[]{}), timeout);
+ }
+
+
+ public static CommandsFactory extractCommandsFactory(CacheSPI<Object, Object> cache)
+ {
+ return (CommandsFactory) extractField(cache, "commandsFactory");
+ }
+
+ public static String getJGroupsAttribute(Cache cache, String protocol, String attribute)
+ {
+ String s = ((JChannel) ((CacheSPI) cache).getRPCManager().getChannel()).getProperties();
+ String[] protocols = s.split(":");
+ String attribs = null;
+ for (String p : protocols)
+ {
+ boolean hasAttribs = p.contains("(");
+ String name = hasAttribs ? p.substring(0, p.indexOf('(')) : p;
+ attribs = hasAttribs ? p.substring(p.indexOf('(') + 1, p.length() - 1) : null;
+
+ if (name.equalsIgnoreCase(protocol)) break;
+ }
+
+ if (attribs != null)
+ {
+ String[] attrArray = attribs.split(";");
+ for (String a : attrArray)
+ {
+ String[] kvPairs = a.split("=");
+ if (kvPairs[0].equalsIgnoreCase(attribute)) return kvPairs[1];
+ }
+ }
+ return null;
+ }
+
+ public static void dumpCacheContents(List caches)
+ {
+ System.out.println("**** START: Cache Contents ****");
+ int count = 1;
+ for (Object o : caches)
+ {
+ CacheSPI c = (CacheSPI) o;
+ if (c == null)
+ {
+ System.out.println(" ** Cache " + count + " is null!");
+ }
+ else
+ {
+ System.out.println(" ** Cache " + count + " is " + c.getRPCManager().getLocalAddress());
+// System.out.println(" " + CachePrinter.printCacheDetails(c));
+ }
+ count++;
+ }
+ System.out.println("**** END: Cache Contents ****");
+ }
+
+ public static void dumpCacheContents(Cache... caches)
+ {
+ dumpCacheContents(Arrays.asList(caches));
+ }
+}
More information about the jbosscache-commits
mailing list