[jbosscache-commits] JBoss Cache SVN: r6892 - in core/branches/flat/src/main/java/org/jboss: cache/util/concurrent/locks and 14 other directories.
jbosscache-commits at lists.jboss.org
jbosscache-commits at lists.jboss.org
Thu Oct 9 04:56:48 EDT 2008
Author: manik.surtani at jboss.com
Date: 2008-10-09 04:56:47 -0400 (Thu, 09 Oct 2008)
New Revision: 6892
Added:
core/branches/flat/src/main/java/org/jboss/starobrno/container/
core/branches/flat/src/main/java/org/jboss/starobrno/container/DataContainer.java
core/branches/flat/src/main/java/org/jboss/starobrno/container/UnsortedDataContainer.java
core/branches/flat/src/main/java/org/jboss/starobrno/factories/DataContainerFactory.java
core/branches/flat/src/main/java/org/jboss/starobrno/factories/LockManagerFactory.java
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/TxInterceptor.java
core/branches/flat/src/main/java/org/jboss/starobrno/notifications/
core/branches/flat/src/main/java/org/jboss/starobrno/remoting/
core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManager.java
core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManagerImpl.java
core/branches/flat/src/main/java/org/jboss/starobrno/remoting/ReplicationException.java
core/branches/flat/src/main/java/org/jboss/starobrno/transaction/OrderedSynchronizationHandler.java
Removed:
core/branches/flat/src/main/java/org/jboss/cache/Cache.java
core/branches/flat/src/main/java/org/jboss/cache/CacheSPI.java
core/branches/flat/src/main/java/org/jboss/starobrno/notifier/
Modified:
core/branches/flat/src/main/java/org/jboss/cache/RPCManagerImpl.java
core/branches/flat/src/main/java/org/jboss/cache/util/concurrent/locks/OwnableReentrantLock.java
core/branches/flat/src/main/java/org/jboss/cache/util/concurrent/locks/OwnableReentrantLockContainer.java
core/branches/flat/src/main/java/org/jboss/starobrno/CacheDelegate.java
core/branches/flat/src/main/java/org/jboss/starobrno/commands/CommandsFactoryImpl.java
core/branches/flat/src/main/java/org/jboss/starobrno/commands/read/GetKeyValueCommand.java
core/branches/flat/src/main/java/org/jboss/starobrno/commands/read/GravitateDataCommand.java
core/branches/flat/src/main/java/org/jboss/starobrno/commands/read/SizeCommand.java
core/branches/flat/src/main/java/org/jboss/starobrno/commands/remote/ClusteredGetCommand.java
core/branches/flat/src/main/java/org/jboss/starobrno/commands/remote/DataGravitationCleanupCommand.java
core/branches/flat/src/main/java/org/jboss/starobrno/context/TransactionContext.java
core/branches/flat/src/main/java/org/jboss/starobrno/context/TransactionContextImpl.java
core/branches/flat/src/main/java/org/jboss/starobrno/factories/ComponentRegistry.java
core/branches/flat/src/main/java/org/jboss/starobrno/factories/EmptyConstructorFactory.java
core/branches/flat/src/main/java/org/jboss/starobrno/factories/EntryFactoryImpl.java
core/branches/flat/src/main/java/org/jboss/starobrno/factories/InterceptorChainFactory.java
core/branches/flat/src/main/java/org/jboss/starobrno/factories/RuntimeConfigAwareFactory.java
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/InvocationContextInterceptor.java
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/LockingInterceptor.java
core/branches/flat/src/main/java/org/jboss/starobrno/lock/StripedLockManager.java
core/branches/flat/src/main/java/org/jboss/starobrno/mvcc/MVCCEntry.java
core/branches/flat/src/main/java/org/jboss/starobrno/mvcc/MVCCEntryWrapper.java
core/branches/flat/src/main/java/org/jboss/starobrno/mvcc/NullMarkerEntry.java
core/branches/flat/src/main/java/org/jboss/starobrno/mvcc/ReadCommittedEntry.java
core/branches/flat/src/main/java/org/jboss/starobrno/mvcc/RepeatableReadEntry.java
core/branches/flat/src/main/java/org/jboss/starobrno/notifications/Notifier.java
core/branches/flat/src/main/java/org/jboss/starobrno/notifications/NotifierImpl.java
core/branches/flat/src/main/java/org/jboss/starobrno/notifications/event/NodeModifiedEvent.java
core/branches/flat/src/main/java/org/jboss/starobrno/transaction/TransactionTable.java
Log:
Several pints of Starobrno
Deleted: core/branches/flat/src/main/java/org/jboss/cache/Cache.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/cache/Cache.java 2008-10-08 18:56:43 UTC (rev 6891)
+++ core/branches/flat/src/main/java/org/jboss/cache/Cache.java 2008-10-09 08:56:47 UTC (rev 6892)
@@ -1,539 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.cache;
-
-import net.jcip.annotations.ThreadSafe;
-import org.jboss.starobrno.CacheException;
-import org.jboss.starobrno.config.Configuration;
-import org.jgroups.Address;
-
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * Interface for a Cache where data mappings are grouped and stored in a tree data
- * structure consisting of {@link Node}s.
- * <p/>
- * This is the central construct and basic client API of JBoss Cache and is used for
- * cache-wide operations.
- * <p/>
- * The cache is constructed using a {@link CacheFactory} and is started
- * using {@link #start}, if not already started by the CacheFactory.
- * <p/>
- * Once constructed, the Cache interface can be used to create or access {@link Node}s, which contain data. Once references
- * to {@link Node}s are obtained, data can be stored in them.
- * <p/>
- * As a convenience (and mainly to provide a familiar API to the older JBoss Cache 1.x.x releases) methods are provided that
- * operate directly on nodes.
- * <ul>
- * <li>{@link #put(Fqn,Object,Object)} </li>
- * <li>{@link #put(Fqn,java.util.Map)} </li>
- * <li>{@link #get(Fqn,Object)} </li>
- * <li>{@link #remove(Fqn,Object)} </li>
- * <li>{@link #removeNode(Fqn)} </li>
- * </ul>
- * <p/>
- * A simple example of usage:
- * <pre>
- * // creates with default settings and starts the cache
- * Cache cache = DefaultCacheFactory.getInstance().createCache();
- * Fqn personRecords = Fqn.fromString("/org/mycompany/personRecords");
- * <p/>
- * Node rootNode = cache.getRoot();
- * Node personRecordsNode = rootNode.addChild(personRecords);
- * <p/>
- * // now add some person records.
- * Fqn peterGriffin = Fqn.fromString("/peterGriffin");
- * Fqn stewieGriffin = Fqn.fromString("/stewieGriffin");
- * <p/>
- * // the addChild() API uses relative Fqns
- * Node peter = personRecordsNode.addChild(peterGriffin);
- * Node stewie = personRecordsNode.addChild(stewieGriffin);
- * <p/>
- * peter.put("name", "Peter Griffin");
- * peter.put("ageGroup", "MidLifeCrisis");
- * peter.put("homicidal", Boolean.FALSE);
- * <p/>
- * stewie.put("name", "Stewie Griffin");
- * stewie.put("ageGroup", "Infant");
- * stewie.put("homicidal", Boolean.TRUE);
- * <p/>
- * peter.getFqn().toString(); // will print out /org/mycompany/personRecords/peterGriffin
- * stewie.getFqn().toString(); // will print out /org/mycompany/personRecords/stewieGriffin
- * <p/>
- * peter.getFqn().getParent().equals(stewie.getFqn().getParent()); // will return true
- * <p/>
- * </pre>
- * <p/>
- * For more information, please read the JBoss Cache user guide and tutorial, available on <a href="http://labs.jboss.com/portal/jbosscache/docs/index.html" target="_BLANK">the JBoss Cache documentation site</a>,
- * and look through the examples <a href="http://labs.jboss.com/portal/jbosscache/download/index.html" target="_BLANK">shipped with the JBoss Cache distribution</a>.
- *
- * @author <a href="mailto:manik at jboss.org">Manik Surtani (manik at jboss.org)</a>
- * @see Node
- * @see CacheFactory
- * @since 2.0.0
- */
-@ThreadSafe
-public interface Cache<K, V>
-{
- /**
- * Retrieves the configuration of this cache.
- *
- * @return the configuration.
- */
- Configuration getConfiguration();
-
- /**
- * Returns the root node of this cache.
- *
- * @return the root node
- */
- Node<K, V> getRoot();
-
- /**
- * Adds a {@link org.jboss.cache.notifications.annotation.CacheListener}-annotated object to the entire cache. The object passed in needs to be properly annotated with the
- * {@link org.jboss.cache.notifications.annotation.CacheListener} annotation otherwise an {@link org.jboss.cache.notifications.IncorrectCacheListenerException} will be thrown.
- *
- * @param listener listener to add
- */
- void addCacheListener(Object listener);
-
- /**
- * Removes a {@link org.jboss.cache.notifications.annotation.CacheListener}-annotated object from the cache. The object passed in needs to be properly annotated with the
- * {@link org.jboss.cache.notifications.annotation.CacheListener} annotation otherwise an {@link org.jboss.cache.notifications.IncorrectCacheListenerException} will be thrown.
- *
- * @param listener listener to remove
- */
- void removeCacheListener(Object listener);
-
- /**
- * Retrieves an immutable {@link Set} of objects annotated as {@link org.jboss.cache.notifications.annotation.CacheListener}s attached to the cache.
- *
- * @return an immutable {@link Set} of objects annotated as {@link org.jboss.cache.notifications.annotation.CacheListener}s attached to the cache.
- */
- Set<Object> getCacheListeners();
-
- /**
- * Associates the specified value with the specified key for a {@link Node} in this cache.
- * If the {@link Node} previously contained a mapping for this key, the old value is replaced by the specified value.
- *
- * @param fqn <b><i>absolute</i></b> {@link Fqn} to the {@link Node} to be accessed.
- * @param key key with which the specified value is to be associated.
- * @param value value to be associated with the specified key.
- * @return previous value associated with specified key, or <code>null</code> if there was no mapping for key.
- * A <code>null</code> return can also indicate that the Node previously associated <code>null</code> with the specified key, if the implementation supports null values.
- * @throws IllegalStateException if the cache is not in a started state.
- */
- V put(Fqn fqn, K key, V value);
-
- /**
- * Convenience method that takes a string representation of an Fqn. Otherwise identical to {@link #put(Fqn, Object, Object)}
- *
- * @param fqn String representation of the Fqn
- * @param key key with which the specified value is to be associated.
- * @param value value to be associated with the specified key.
- * @return previous value associated with specified key, or <code>null</code> if there was no mapping for key.
- * A <code>null</code> return can also indicate that the Node previously associated <code>null</code> with the specified key, if the implementation supports null values.
- * @throws IllegalStateException if the cache is not in a started state
- */
-
- V put(String fqn, K key, V value);
-
- /**
- * Under special operating behavior, associates the value with the specified key for a node identified by the Fqn passed in.
- * <ul>
- * <li> Only goes through if the node specified does not exist; no-op otherwise.</li>
- * <li> Force asynchronous mode for replication to prevent any blocking.</li>
- * <li> invalidation does not take place. </li>
- * <li> 0ms lock timeout to prevent any blocking here either. If the lock is not acquired, this method is a no-op, and swallows the timeout exception.</li>
- * <li> Ongoing transactions are suspended before this call, so failures here will not affect any ongoing transactions.</li>
- * <li> Errors and exceptions are 'silent' - logged at a much lower level than normal, and this method does not throw exceptions</li>
- * </ul>
- * This method is for caching data that has an external representation in storage, where, concurrent modification and
- * transactions are not a consideration, and failure to put the data in the cache should be treated as a 'suboptimal outcome'
- * rather than a 'failing outcome'.
- * <p/>
- * An example of when this method is useful is when data is read from, for example, a legacy datastore, and is cached before
- * returning the data to the caller. Subsequent calls would prefer to get the data from the cache and if the data doesn't exist
- * in the cache, fetch again from the legacy datastore.
- * <p/>
- * See <a href="http://jira.jboss.com/jira/browse/JBCACHE-848">JBCACHE-848</a> for details around this feature.
- * <p/>
- *
- * @param fqn <b><i>absolute</i></b> {@link Fqn} to the {@link Node} to be accessed.
- * @param key key with which the specified value is to be associated.
- * @param value value to be associated with the specified key.
- * @throws IllegalStateException if {@link #getCacheStatus()} would not return {@link org.jboss.cache.CacheStatus#STARTED}.
- */
- void putForExternalRead(Fqn fqn, K key, V value);
-
- /**
- * Copies all of the mappings from the specified map to a {@link Node}.
- *
- * @param fqn <b><i>absolute</i></b> {@link Fqn} to the {@link Node} to copy the data to
- * @param data mappings to copy
- * @throws IllegalStateException if the cache is not in a started state
- */
- void put(Fqn fqn, Map<? extends K, ? extends V> data);
-
- /**
- * Convenience method that takes a string representation of an Fqn. Otherwise identical to {@link #put(Fqn, java.util.Map)}
- *
- * @param fqn String representation of the Fqn
- * @param data data map to insert
- * @throws IllegalStateException if the cache is not in a started state
- */
- void put(String fqn, Map<? extends K, ? extends V> data);
-
- /**
- * Removes the mapping for this key from a Node.
- * Returns the value to which the Node previously associated the key, or
- * <code>null</code> if the Node contained no mapping for this key.
- *
- * @param fqn <b><i>absolute</i></b> {@link Fqn} to the {@link Node} to be accessed.
- * @param key key whose mapping is to be removed from the Node
- * @return previous value associated with specified Node's key
- * @throws IllegalStateException if the cache is not in a started state
- */
- V remove(Fqn fqn, K key);
-
- /**
- * Convenience method that takes a string representation of an Fqn. Otherwise identical to {@link #remove(Fqn, Object)}
- *
- * @param fqn string representation of the Fqn to retrieve
- * @param key key to remove
- * @return old value removed, or null if the fqn does not exist
- * @throws IllegalStateException if the cache is not in a started state
- */
- V remove(String fqn, K key);
-
- /**
- * Removes a {@link Node} indicated by absolute {@link Fqn}.
- *
- * @param fqn {@link Node} to remove
- * @return true if the node was removed, false if the node was not found
- * @throws IllegalStateException if the cache is not in a started state
- */
- boolean removeNode(Fqn fqn);
-
- /**
- * Convenience method that takes a string representation of an Fqn. Otherwise identical to {@link #removeNode(Fqn)}
- *
- * @param fqn string representation of the Fqn to retrieve
- * @return true if the node was found and removed, false otherwise
- * @throws IllegalStateException if the cache is not in a started state
- */
- boolean removeNode(String fqn);
-
- /**
- * A convenience method to retrieve a node directly from the cache. Equivalent to calling cache.getRoot().getChild(fqn).
- *
- * @param fqn fqn of the node to retrieve
- * @return a Node object, or a null if the node does not exist.
- * @throws IllegalStateException if the cache is not in a started state
- */
- Node<K, V> getNode(Fqn fqn);
-
- /**
- * Convenience method that takes a string representation of an Fqn. Otherwise identical to {@link #getNode(Fqn)}
- *
- * @param fqn string representation of the Fqn to retrieve
- * @return node, or null if the node does not exist
- * @throws IllegalStateException if the cache is not in a started state
- */
- Node<K, V> getNode(String fqn);
-
-
- /**
- * Convenience method that allows for direct access to the data in a {@link Node}.
- *
- * @param fqn <b><i>absolute</i></b> {@link Fqn} to the {@link Node} to be accessed.
- * @param key key under which value is to be retrieved.
- * @return returns data held under specified key in {@link Node} denoted by specified Fqn.
- * @throws IllegalStateException if the cache is not in a started state
- */
- V get(Fqn fqn, K key);
-
- /**
- * Convenience method that takes a string representation of an Fqn. Otherwise identical to {@link #get(Fqn, Object)}
- *
- * @param fqn string representation of the Fqn to retrieve
- * @param key key to fetch
- * @return value, or null if the fqn does not exist.
- * @throws IllegalStateException if the cache is not in a started state
- */
- V get(String fqn, K key);
-
- /**
- * Eviction call that evicts the specified {@link Node} from memory.
- *
- * @param fqn <b><i>absolute</i></b> {@link Fqn} to the {@link Node} to be evicted.
- * @param recursive evicts children as well
- * @throws IllegalStateException if the cache is not in a started state
- */
- void evict(Fqn fqn, boolean recursive);
-
- /**
- * Eviction call that evicts the specified {@link Node} from memory. Not recursive.
- *
- * @param fqn <b><i>absolute</i></b> {@link Fqn} to the {@link Node} to be evicted.
- * @throws IllegalStateException if the cache is not in a started state
- */
- void evict(Fqn fqn);
-
- /**
- * Retrieves a {@link Region} for a given {@link Fqn}. If the region does not exist,
- * and <tt>createIfAbsent</tt> is true, then one is created.
- * <p/>
- * If not, parent Fqns will be consulted in turn for registered regions, gradually working up to
- * Fqn.ROOT. If no regions are defined in any of the parents either, a null is returned.
- *
- * @param fqn Fqn that is contained in a region.
- * @param createIfAbsent If true, will create a new associated region if not found.
- * @return a MarshRegion. Null if none is found.
- * @see Region
- */
- Region getRegion(Fqn fqn, boolean createIfAbsent);
-
- /**
- * Removes a region denoted by the Fqn passed in.
- *
- * @param fqn of the region to remove
- * @return true if a region did exist and was removed; false otherwise.
- */
- boolean removeRegion(Fqn fqn);
-
- /**
- * Lifecycle method that initializes configuration state, the root node, etc.
- *
- * @throws CacheException if there are creation problems
- */
- void create() throws CacheException;
-
- /**
- * Lifecycle method that starts the cache loader,
- * starts cache replication, starts the region manager, etc., and (if configured) warms the cache using a
- * state transfer or cache loader preload.
- *
- * @throws CacheException if there are startup problems
- */
- void start() throws CacheException;
-
- /**
- * Lifecycle method that stops the cache, including replication,
- * clustering, cache loading, notifications, etc., and clears all cache in-memory state.
- * <p/>
- * State can be reconstituted by using either a cache loader or state transfer when the cache starts again.
- */
- void stop();
-
- /**
- * Lifecycle method that destroys the cache and removes any interceptors/configuration elements.
- * Cache can then be restarted (potentially after reconfiguring) using {@link #create()} and {@link #start()}.
- */
- void destroy();
-
- /**
- * Gets where the cache currently is in its lifecycle transitions.
- *
- * @return the CacheStatus. Will not return <code>null</code>.
- */
- CacheStatus getCacheStatus();
-
- /**
- * @return the current invocation context for the current invocation and cache instance.
- * @throws IllegalStateException if the cache has been destroyed.
- */
- InvocationContext getInvocationContext();
-
- /**
- * Sets the passed in {@link org.jboss.cache.InvocationContext} as current.
- *
- * @param ctx invocation context to use
- * @throws IllegalStateException if the cache has been destroyed.
- */
- void setInvocationContext(InvocationContext ctx);
-
- /**
- * Returns the local address of this cache in a cluster, or <code>null</code>
- * if running in local mode.
- *
- * @return the local address of this cache in a cluster, or <code>null</code>
- * if running in local mode.
- */
- Address getLocalAddress();
-
- /**
- * Returns a list of members in the cluster, or <code>null</code>
- * if running in local mode.
- *
- * @return a {@link List} of members in the cluster, or <code>null</code>
- * if running in local mode.
- */
- List<Address> getMembers();
-
- /**
- * Moves a part of the cache to a different subtree.
- * <p/>
- * E.g.:
- * <p/>
- * assume a cache structure such as:
- * <p/>
- * <pre>
- * /a/b/c
- * /a/b/d
- * /a/b/e
- * <p/>
- * <p/>
- * Fqn f1 = Fqn.fromString("/a/b/c");
- * Fqn f2 = Fqn.fromString("/a/b/d");
- * <p/>
- * cache.move(f1, f2);
- * </pre>
- * <p/>
- * Will result in:
- * <pre>
- * <p/>
- * /a/b/d/c
- * /a/b/e
- * <p/>
- * </pre>
- * <p/>
- * and now
- * <p/>
- * <pre>
- * Fqn f3 = Fqn.fromString("/a/b/e");
- * Fqn f4 = Fqn.fromString("/a");
- * cache.move(f3, f4);
- * </pre>
- * <p/>
- * will result in:
- * <pre>
- * /a/b/d/c
- * /a/e
- * </pre>
- * No-op if the node to be moved is the root node.
- * <p/>
- * <b>Note</b>: As of 3.0.0 and when using MVCC locking, more specific behaviour is defined as follows:
- * <ul>
- * <li>A no-op if the node is moved unto itself. E.g., <tt>move(fqn, fqn.getParent())</tt> will not do anything.</li>
- * <li>If a target node does not exist it will be created silently, to be more consistent with other APIs such as <tt>put()</tt> on a nonexistent node.</li>
- * <li>If the source node does not exist this is a no-op, to be more consistent with other APIs such as <tt>get()</tt> on a nonexistent node.</li>
- * </ul>
- *
- * @param nodeToMove the Fqn of the node to move.
- * @param newParent new location under which to attach the node being moved.
- * @throws NodeNotExistsException may throw one of these if the target node does not exist or if a different thread has moved this node elsewhere already.
- * @throws IllegalStateException if {@link #getCacheStatus()} would not return {@link CacheStatus#STARTED}.
- */
- void move(Fqn nodeToMove, Fqn newParent) throws NodeNotExistsException;
-
- /**
- * Convenience method that takes in string representations of Fqns. Otherwise identical to {@link #move(Fqn, Fqn)}
- *
- * @throws IllegalStateException if {@link #getCacheStatus()} would not return {@link CacheStatus#STARTED}.
- */
- void move(String nodeToMove, String newParent) throws NodeNotExistsException;
-
- /**
- * Returns the version of the cache as a string.
- *
- * @return the version string of the cache.
- * @see Version#printVersion
- */
- String getVersion();
-
- /**
- * Retrieves a defensively copied data map of the underlying node. A convenience method for retrieving a node and
- * getting data from the node directly.
- *
- * @param fqn
- * @return map of data, or an empty map
- * @throws CacheException
- * @throws IllegalStateException if {@link #getCacheStatus()} would not return {@link CacheStatus#STARTED}.
- */
- Map<K, V> getData(Fqn fqn);
-
- /**
- * Convenience method that takes in a String representation of the Fqn. Otherwise identical to {@link #getKeys(Fqn)}.
- */
- Set<K> getKeys(String fqn);
-
- /**
- * Returns a set of attribute keys for the Fqn.
- * Returns null if the node is not found, otherwise a Set.
- * The set is a copy of the actual keys for this node.
- * <p/>
- * A convenience method for retrieving a node and
- * getting keys from the node directly.
- *
- * @param fqn name of the node
- * @throws IllegalStateException if {@link #getCacheStatus()} would not return {@link CacheStatus#STARTED}.
- */
- Set<K> getKeys(Fqn fqn);
-
- /**
- * Convenience method that takes in a String representation of the Fqn. Otherwise identical to {@link #clearData(Fqn)}.
- *
- * @throws IllegalStateException if {@link #getCacheStatus()} would not return {@link CacheStatus#STARTED}.
- */
- void clearData(String fqn);
-
- /**
- * Removes the keys and properties from a named node.
- * <p/>
- * A convenience method for retrieving a node and
- * clearing the data from the node directly.
- *
- * @param fqn name of the node
- * @throws IllegalStateException if {@link #getCacheStatus()} would not return {@link CacheStatus#STARTED}.
- */
- void clearData(Fqn fqn);
-
- /**
- * Starts a batch. This is a lightweight batching mechanism that groups cache writes together and finally performs the
- * write, persistence and/or replication when {@link #endBatch(boolean)} is called rather than for each invocation on the
- * cache.
- * <p/>
- * Note that if there is an existing transaction in scope and the cache has been configured to use a JTA compliant
- * transaction manager, calls to {@link #startBatch()} and {@link #endBatch(boolean)} are ignored and treated as no-ops.
- * <p/>
- *
- * @see #endBatch(boolean)
- * @since 3.0
- */
- void startBatch();
-
- /**
- * Ends an existing ongoing batch. A no-op if a batch has not been started yet.
- * <p/>
- * Note that if there is an existing transaction in scope and the cache has been configured to use a JTA compliant
- * transaction manager, calls to {@link #startBatch()} and {@link #endBatch(boolean)} are ignored and treated as no-ops.
- * <p/>
- *
- * @param successful if <tt>true</tt>, changes made in the batch are committed. If <tt>false</tt>, they are discarded.
- * @see #startBatch()
- * @since 3.0
- */
- void endBatch(boolean successful);
-}
Deleted: core/branches/flat/src/main/java/org/jboss/cache/CacheSPI.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/cache/CacheSPI.java 2008-10-08 18:56:43 UTC (rev 6891)
+++ core/branches/flat/src/main/java/org/jboss/cache/CacheSPI.java 2008-10-09 08:56:47 UTC (rev 6892)
@@ -1,354 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.cache;
-
-import net.jcip.annotations.ThreadSafe;
-import org.jboss.cache.buddyreplication.BuddyManager;
-import org.jboss.cache.buddyreplication.GravitateResult;
-import org.jboss.cache.factories.ComponentRegistry;
-import org.jboss.cache.interceptors.base.CommandInterceptor;
-import org.jboss.cache.loader.CacheLoader;
-import org.jboss.cache.loader.CacheLoaderManager;
-import org.jboss.cache.marshall.Marshaller;
-import org.jboss.cache.notifications.Notifier;
-import org.jboss.cache.statetransfer.StateTransferManager;
-import org.jboss.cache.transaction.GlobalTransaction;
-import org.jboss.cache.transaction.TransactionTable;
-
-import javax.transaction.Transaction;
-import javax.transaction.TransactionManager;
-import java.util.List;
-import java.util.Set;
-
-/**
- * A more detailed interface to {@link Cache}, which is used when writing plugins for or extending JBoss Cache. A reference
- * to this interface should only be obtained when it is passed in to your code, for example when you write an
- * {@link org.jboss.cache.interceptors.base.CommandInterceptor} or {@link CacheLoader}.
- * <p/>
- * <B><I>You should NEVER attempt to directly cast a {@link Cache} instance to this interface. In future, the implementation may not allow it.</I></B>
- * <p/>
- * This interface contains overridden method signatures of some methods from {@link Cache}, overridden to ensure return
- * types of {@link Node} are replaced with {@link NodeSPI}.
- * <p/>
- *
- * @author <a href="mailto:manik at jboss.org">Manik Surtani (manik at jboss.org)</a>
- * @see NodeSPI
- * @see Cache
- * @see org.jboss.cache.loader.CacheLoader
- * @since 2.0.0
- */
-@ThreadSafe
-public interface CacheSPI<K, V> extends Cache<K, V>
-{
- /**
- * Overrides {@link org.jboss.cache.Cache#getRoot()} to return a {@link org.jboss.cache.NodeSPI} instead of a {@link org.jboss.cache.Node}.
- */
- NodeSPI<K, V> getRoot();
-
- /**
- * Overrides {@link Cache#getNode(String)} to return a {@link org.jboss.cache.NodeSPI} instead of a {@link org.jboss.cache.Node}.
- *
- * @param s string representation of an Fqn
- * @return a NodeSPI
- */
- NodeSPI<K, V> getNode(String s);
-
- /**
- * Overrides {@link Cache#getNode(Fqn)} to return a {@link org.jboss.cache.NodeSPI} instead of a {@link org.jboss.cache.Node}.
- *
- * @param f an Fqn
- * @return a NodeSPI
- */
- NodeSPI<K, V> getNode(Fqn f);
-
-
- /**
- * Retrieves a reference to a running {@link javax.transaction.TransactionManager}, if one is configured.
- * <p/>
- * From 2.1.0, Interceptor authors should obtain this by injection rather than this method. See the
- * {@link org.jboss.cache.factories.annotations.Inject} annotation.
- *
- * @return a TransactionManager
- */
- TransactionManager getTransactionManager();
-
- /**
- * Retrieves the current Interceptor chain.
- * <p/>
- * From 2.1.0, Interceptor authors should obtain this by injection rather than this method. See the
- * {@link org.jboss.cache.factories.annotations.Inject} annotation.
- *
- * @return an immutable {@link List} of {@link org.jboss.cache.interceptors.base.CommandInterceptor}s configured for this cache, or
- * <code>null</code> if {@link Cache#create() create()} has not been invoked
- * and the interceptors thus do not exist.
- */
- List<CommandInterceptor> getInterceptorChain();
-
- /**
- * Retrieves an instance of a {@link Marshaller}, which is capable of
- * converting Java objects to bytestreams and back in an efficient manner, which is
- * also interoperable with bytestreams produced/consumed by other versions of JBoss
- * Cache.
- * <p/>
- * The use of this marshaller is the <b>recommended</b> way of creating efficient,
- * compatible, byte streams from objects.
- * <p/>
- * From 2.1.0, Interceptor authors should obtain this by injection rather than this method. See the
- * {@link org.jboss.cache.factories.annotations.Inject} annotation.
- *
- * @return an instance of {@link Marshaller}
- */
- Marshaller getMarshaller();
-
- /**
- * Adds a custom interceptor to the interceptor chain, at specified position, where the first interceptor in the chain
- * is at position 0 and the last one at getInterceptorChain().size() - 1.
- *
- * @param i the interceptor to add
- * @param position the position to add the interceptor
- */
- void addInterceptor(CommandInterceptor i, int position);
-
- /**
- * Adds a custom interceptor to the interceptor chain, after an instance of the specified interceptor type. Throws a
- * cache exception if it cannot find an interceptor of the specified type.
- *
- * @param i interceptor to add
- * @param afterInterceptor interceptor type after which to place custom interceptor
- */
- void addInterceptor(CommandInterceptor i, Class<? extends CommandInterceptor> afterInterceptor);
-
- /**
- * Removes the interceptor at a specified position, where the first interceptor in the chain
- * is at position 0 and the last one at getInterceptorChain().size() - 1.
- *
- * @param position the position at which to remove an interceptor
- */
- void removeInterceptor(int position);
-
- /**
- * Removes the interceptor of specified type.
- *
- * @param interceptorType type of interceptor to remove
- */
- void removeInterceptor(Class<? extends CommandInterceptor> interceptorType);
-
- /**
- * Retrieves the current CacheCacheLoaderManager instance associated with the current Cache instance.
- * <p/>
- * From 2.1.0, Interceptor authors should obtain this by injection rather than this method. See the
- * {@link org.jboss.cache.factories.annotations.Inject} annotation.
- *
- * @return Retrieves a reference to the currently configured {@link org.jboss.cache.loader.CacheLoaderManager} if one or more cache loaders are configured, null otherwise.
- */
- CacheLoaderManager getCacheLoaderManager();
-
- /**
- * Retrieves the current BuddyManager instance associated with the current Cache instance.
- * <p/>
- * From 2.1.0, Interceptor authors should obtain this by injection rather than this method. See the
- * {@link org.jboss.cache.factories.annotations.Inject} annotation.
- *
- * @return an instance of {@link BuddyManager} if buddy replication is enabled, null otherwise.
- */
- BuddyManager getBuddyManager();
-
- /**
- * Retrieves the current TransactionTable instance associated with the current Cache instance.
- * <p/>
- * From 2.1.0, Interceptor authors should obtain this by injection rather than this method. See the
- * {@link org.jboss.cache.factories.annotations.Inject} annotation.
- *
- * @return the current {@link TransactionTable}
- */
- TransactionTable getTransactionTable();
-
- /**
- * Gets a handle of the RPC manager.
- * <p/>
- * From 2.1.0, Interceptor authors should obtain this by injection rather than this method. See the
- * {@link org.jboss.cache.factories.annotations.Inject} annotation.
- *
- * @return the {@link org.jboss.cache.RPCManager} configured.
- */
- RPCManager getRPCManager();
-
- /**
- * Retrieves the current StateTransferManager instance associated with the current Cache instance.
- * <p/>
- * From 2.1.0, Interceptor authors should obtain this by injection rather than this method. See the
- * {@link org.jboss.cache.factories.annotations.Inject} annotation.
- *
- * @return the current {@link org.jboss.cache.statetransfer.StateTransferManager}
- */
- StateTransferManager getStateTransferManager();
-
- /**
- * Retrieves the current RegionManager instance associated with the current Cache instance.
- * <p/>
- * From 2.1.0, Interceptor authors should obtain this by injection rather than this method. See the
- * {@link org.jboss.cache.factories.annotations.Inject} annotation.
- *
- * @return the {@link RegionManager}
- */
- RegionManager getRegionManager();
-
-
- /**
- * Retrieves the current Notifier instance associated with the current Cache instance.
- * <p/>
- * From 2.1.0, Interceptor authors should obtain this by injection rather than this method. See the
- * {@link org.jboss.cache.factories.annotations.Inject} annotation.
- *
- * @return the notifier attached with this instance of the cache. See {@link org.jboss.cache.notifications.Notifier}, a class
- * that is responsible for emitting notifications to registered CacheListeners.
- */
- Notifier getNotifier();
-
- /**
- * @return the name of the cluster. Null if running in local mode.
- */
- String getClusterName();
-
- /**
- * @return the number of attributes in the cache.
- */
- int getNumberOfAttributes();
-
- /**
- * @return the number of nodes in the cache.
- */
- int getNumberOfNodes();
-
- /**
- * Returns the global transaction for this local transaction.
- * Optionally creates a new global transaction if it does not exist.
- *
- * @param tx the current transaction
- * @param createIfNotExists if true creates a new transaction if none exists
- * @return a GlobalTransaction
- */
- GlobalTransaction getCurrentTransaction(Transaction tx, boolean createIfNotExists);
-
- /**
- * Returns the transaction associated with the current thread.
- * If a local transaction exists, but doesn't yet have a mapping to a
- * GlobalTransaction, a new GlobalTransaction will be created and mapped to
- * the local transaction. Note that if a local transaction exists, but is
- * not ACTIVE or PREPARING, null is returned.
- *
- * @return A GlobalTransaction, or null if no (local) transaction was associated with the current thread
- */
- GlobalTransaction getCurrentTransaction();
-
- /**
- * Returns a node without accessing the interceptor chain. Does not return any nodes marked as invalid. Note that this call works
- * directly on the cache data structure and will not pass through the interceptor chain. Hence node locking, cache
- * loading or activation does not take place, and so the results of this call should not be treated as definitive. Concurrent node
- * removal, passivation, etc. may affect the results of this call.
- *
- * @param fqn the Fqn to look up.
- * @param includeDeletedNodes if you intend to see nodes marked as deleted within the current tx, set this to true
- * @return a node if one exists or null
- */
- NodeSPI<K, V> peek(Fqn fqn, boolean includeDeletedNodes);
-
- /**
- * Returns a node without accessing the interceptor chain, optionally returning nodes that are marked as invalid ({@link org.jboss.cache.Node#isValid()} == false).
- * Note that this call works
- * directly on the cache data structure and will not pass through the interceptor chain. Hence node locking, cache
- * loading or activation does not take place, and so the results of this call should not be treated as definitive. Concurrent node
- * removal, passivation, etc. may affect the results of this call.
- *
- * @param fqn the Fqn to look up.
- * @param includeDeletedNodes if you intend to see nodes marked as deleted within the current tx, set this to true
- * @param includeInvalidNodes if true, nodes marked as being invalid are also returned.
- * @return a node if one exists or null
- */
- NodeSPI<K, V> peek(Fqn fqn, boolean includeDeletedNodes, boolean includeInvalidNodes);
-
- /**
- * Used with buddy replication's data gravitation interceptor. If marshalling is necessary, ensure that the cache is
- * configured to use {@link org.jboss.cache.config.Configuration#useRegionBasedMarshalling} and the {@link org.jboss.cache.Region}
- * pertaining to the Fqn passed in is activated, and has an appropriate ClassLoader.
- *
- * @param fqn the fqn to gravitate
- * @param searchBuddyBackupSubtrees if true, buddy backup subtrees are searched and if false, they are not.
- * @param ctx
- * @return a GravitateResult which contains the data for the gravitation
- */
- GravitateResult gravitateData(Fqn fqn, boolean searchBuddyBackupSubtrees, InvocationContext ctx);
-
- /**
- * Returns a Set<Fqn> of Fqns of the topmost node of internal regions that
- * should not included in standard state transfers. Will include
- * {@link BuddyManager#BUDDY_BACKUP_SUBTREE} if buddy replication is
- * enabled.
- *
- * @return an unmodifiable Set<Fqn>. Will not return <code>null</code>.
- */
- Set<Fqn> getInternalFqns();
-
- int getNumberOfLocksHeld();
-
- /**
- * Helper method that does a peek and ensures that the result of the peek is not null. Note that this call works
- * directly on the cache data structure and will not pass through the interceptor chain. Hence node locking, cache
- * loading or activation does not take place, and so the results of this call should not be treated as definitive.
- *
- * @param fqn Fqn to peek
- * @return true if the peek returns a non-null value.
- */
- boolean exists(Fqn fqn);
-
- /**
- * A convenience method that takes a String representation of an Fqn. Otherwise identical to {@link #exists(Fqn)}.
- * Note that this call works
- * directly on the cache data structure and will not pass through the interceptor chain. Hence node locking, cache
- * loading or activation does not take place, and so the results of this call should not be treated as definitive.
- */
- boolean exists(String fqn);
-
- /**
- * Returns all children of a given node. Returns an empty set if there are no children.
- * The set is unmodifiable.
- *
- * @param fqn The fully qualified name of the node
- * @return Set an unmodifiable set of children names, Object.
- */
- Set<Object> getChildrenNames(Fqn fqn);
-
- /**
- * Convenience method that takes a String representation of an Fqn. Otherwise identical to {@link #getChildrenNames(Fqn)}
- *
- * @param fqn as a string
- * @return Set an unmodifiable set of children names, Object.
- */
- Set<String> getChildrenNames(String fqn);
-
- /**
- * Returns the component registry associated with this cache instance.
- *
- * @see org.jboss.cache.factories.ComponentRegistry
- */
- ComponentRegistry getComponentRegistry();
-}
Modified: core/branches/flat/src/main/java/org/jboss/cache/RPCManagerImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/cache/RPCManagerImpl.java 2008-10-08 18:56:43 UTC (rev 6891)
+++ core/branches/flat/src/main/java/org/jboss/cache/RPCManagerImpl.java 2008-10-09 08:56:47 UTC (rev 6892)
@@ -46,7 +46,7 @@
import org.jboss.starobrno.factories.annotations.Inject;
import org.jboss.starobrno.factories.annotations.Start;
import org.jboss.starobrno.factories.annotations.Stop;
-import org.jboss.starobrno.notifier.Notifier;
+import org.jboss.starobrno.notifications.Notifier;
import org.jboss.starobrno.transaction.TransactionTable;
import org.jgroups.Address;
import org.jgroups.Channel;
Modified: core/branches/flat/src/main/java/org/jboss/cache/util/concurrent/locks/OwnableReentrantLock.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/cache/util/concurrent/locks/OwnableReentrantLock.java 2008-10-08 18:56:43 UTC (rev 6891)
+++ core/branches/flat/src/main/java/org/jboss/cache/util/concurrent/locks/OwnableReentrantLock.java 2008-10-09 08:56:47 UTC (rev 6892)
@@ -22,8 +22,8 @@
package org.jboss.cache.util.concurrent.locks;
import net.jcip.annotations.ThreadSafe;
-import org.jboss.cache.invocation.InvocationContextContainer;
-import org.jboss.cache.transaction.GlobalTransaction;
+import org.jboss.starobrno.invocation.InvocationContextContainer;
+import org.jboss.starobrno.transaction.GlobalTransaction;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
Modified: core/branches/flat/src/main/java/org/jboss/cache/util/concurrent/locks/OwnableReentrantLockContainer.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/cache/util/concurrent/locks/OwnableReentrantLockContainer.java 2008-10-08 18:56:43 UTC (rev 6891)
+++ core/branches/flat/src/main/java/org/jboss/cache/util/concurrent/locks/OwnableReentrantLockContainer.java 2008-10-09 08:56:47 UTC (rev 6892)
@@ -22,7 +22,7 @@
package org.jboss.cache.util.concurrent.locks;
import net.jcip.annotations.ThreadSafe;
-import org.jboss.cache.invocation.InvocationContextContainer;
+import org.jboss.starobrno.invocation.InvocationContextContainer;
import java.util.Arrays;
@@ -40,6 +40,11 @@
OwnableReentrantLock[] sharedLocks;
InvocationContextContainer icc;
+ public OwnableReentrantLockContainer(int concurrencyLevel, Object dummy)
+ {
+ throw new UnsupportedOperationException("Barf"); // todo remove this
+ }
+
/**
* Creates a new LockContainer which uses a certain number of shared locks across all elements that need to be locked.
*
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/CacheDelegate.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/CacheDelegate.java 2008-10-08 18:56:43 UTC (rev 6891)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/CacheDelegate.java 2008-10-09 08:56:47 UTC (rev 6892)
@@ -44,7 +44,7 @@
import org.jboss.starobrno.factories.annotations.NonVolatile;
import org.jboss.starobrno.interceptors.InterceptorChain;
import org.jboss.starobrno.invocation.InvocationContextContainer;
-import org.jboss.starobrno.notifier.Notifier;
+import org.jboss.starobrno.notifications.Notifier;
import org.jboss.starobrno.transaction.GlobalTransaction;
import org.jboss.starobrno.transaction.TransactionTable;
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/commands/CommandsFactoryImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/commands/CommandsFactoryImpl.java 2008-10-08 18:56:43 UTC (rev 6891)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/commands/CommandsFactoryImpl.java 2008-10-09 08:56:47 UTC (rev 6892)
@@ -21,19 +21,25 @@
*/
package org.jboss.starobrno.commands;
-import org.jboss.starobrno.commands.write.*;
+import org.jboss.starobrno.commands.read.GetKeyValueCommand;
import org.jboss.starobrno.commands.read.SizeCommand;
-import org.jboss.starobrno.commands.read.GetKeyValueCommand;
+import org.jboss.starobrno.commands.tx.CommitCommand;
import org.jboss.starobrno.commands.tx.PrepareCommand;
import org.jboss.starobrno.commands.tx.RollbackCommand;
-import org.jboss.starobrno.commands.tx.CommitCommand;
-import org.jboss.starobrno.DataContainer;
+import org.jboss.starobrno.commands.write.ClearCommand;
+import org.jboss.starobrno.commands.write.EvictCommand;
+import org.jboss.starobrno.commands.write.PutKeyValueCommand;
+import org.jboss.starobrno.commands.write.PutMapCommand;
+import org.jboss.starobrno.commands.write.RemoveCommand;
+import org.jboss.starobrno.commands.write.ReplaceCommand;
+import org.jboss.starobrno.container.DataContainer;
+import org.jboss.starobrno.factories.annotations.Inject;
+import org.jboss.starobrno.notifications.Notifier;
import org.jboss.starobrno.transaction.GlobalTransaction;
-import org.jboss.starobrno.notifier.Notifier;
import org.jgroups.Address;
+import java.util.List;
import java.util.Map;
-import java.util.List;
/**
* @author Mircea.Markus at jboss.com
@@ -43,6 +49,13 @@
private DataContainer dataContainer;
private Notifier notifier;
+ @Inject
+ private void setupDependencies(DataContainer container, Notifier notifier)
+ {
+ this.dataContainer = container;
+ this.notifier = notifier;
+ }
+
public PutKeyValueCommand buildPutKeyValueCommand(Object key, Object value)
{
return new PutKeyValueCommand(key, value, false);
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/commands/read/GetKeyValueCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/commands/read/GetKeyValueCommand.java 2008-10-08 18:56:43 UTC (rev 6891)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/commands/read/GetKeyValueCommand.java 2008-10-09 08:56:47 UTC (rev 6892)
@@ -26,7 +26,7 @@
import org.jboss.starobrno.commands.Visitor;
import org.jboss.starobrno.context.InvocationContext;
import org.jboss.starobrno.mvcc.MVCCEntry;
-import org.jboss.starobrno.notifier.Notifier;
+import org.jboss.starobrno.notifications.Notifier;
/**
* // TODO: MANIK: Document this
@@ -70,7 +70,7 @@
Object result = entry.getValue();
if (trace) log.trace("Found value " + result);
// if (sendNodeEvent) notifier.notifyNodeVisited(fqn, false, ctx);
- return result;
+ return result;
}
public int getCommandId()
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/commands/read/GravitateDataCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/commands/read/GravitateDataCommand.java 2008-10-08 18:56:43 UTC (rev 6891)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/commands/read/GravitateDataCommand.java 2008-10-09 08:56:47 UTC (rev 6892)
@@ -25,8 +25,8 @@
import org.apache.commons.logging.LogFactory;
import org.jboss.cache.buddyreplication.BuddyFqnTransformer;
import org.jboss.starobrno.CacheSPI;
-import org.jboss.starobrno.DataContainer;
import org.jboss.starobrno.commands.Visitor;
+import org.jboss.starobrno.container.DataContainer;
import org.jboss.starobrno.context.InvocationContext;
import org.jgroups.Address;
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/commands/read/SizeCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/commands/read/SizeCommand.java 2008-10-08 18:56:43 UTC (rev 6891)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/commands/read/SizeCommand.java 2008-10-09 08:56:47 UTC (rev 6892)
@@ -21,9 +21,9 @@
*/
package org.jboss.starobrno.commands.read;
-import org.jboss.starobrno.DataContainer;
import org.jboss.starobrno.commands.VisitableCommand;
import org.jboss.starobrno.commands.Visitor;
+import org.jboss.starobrno.container.DataContainer;
import org.jboss.starobrno.context.InvocationContext;
/**
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/commands/remote/ClusteredGetCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/commands/remote/ClusteredGetCommand.java 2008-10-08 18:56:43 UTC (rev 6891)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/commands/remote/ClusteredGetCommand.java 2008-10-09 08:56:47 UTC (rev 6892)
@@ -23,9 +23,9 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.jboss.starobrno.DataContainer;
import org.jboss.starobrno.commands.DataCommand;
import org.jboss.starobrno.commands.ReplicableCommand;
+import org.jboss.starobrno.container.DataContainer;
import org.jboss.starobrno.context.InvocationContext;
import org.jboss.starobrno.interceptors.InterceptorChain;
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/commands/remote/DataGravitationCleanupCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/commands/remote/DataGravitationCleanupCommand.java 2008-10-08 18:56:43 UTC (rev 6891)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/commands/remote/DataGravitationCleanupCommand.java 2008-10-09 08:56:47 UTC (rev 6892)
@@ -27,8 +27,8 @@
import org.jboss.cache.buddyreplication.BuddyFqnTransformer;
import org.jboss.cache.buddyreplication.BuddyManager;
import org.jboss.cache.commands.CommandsFactory;
-import org.jboss.starobrno.DataContainer;
import org.jboss.starobrno.commands.ReplicableCommand;
+import org.jboss.starobrno.container.DataContainer;
import org.jboss.starobrno.context.InvocationContext;
import org.jboss.starobrno.interceptors.InterceptorChain;
import org.jboss.starobrno.transaction.GlobalTransaction;
Copied: core/branches/flat/src/main/java/org/jboss/starobrno/container/DataContainer.java (from rev 6891, core/branches/flat/src/main/java/org/jboss/starobrno/DataContainer.java)
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/container/DataContainer.java (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/container/DataContainer.java 2008-10-09 08:56:47 UTC (rev 6892)
@@ -0,0 +1,48 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.container;
+
+import java.util.Map.Entry;
+import java.util.Set;
+
+/**
+ * // TODO: MANIK: Document this
+ *
+ * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
+ * @since 3.0
+ */
+public interface DataContainer<K, V>
+{
+ Set<Entry<K, V>> getEntries();
+
+ Entry<K, V> getEntry(K k);
+
+ void putEntry(Entry<K, V> entry);
+
+ boolean exists(Entry<K, V> entry);
+
+ int size();
+
+ void clear();
+
+ void removeEntry(K key);
+}
Copied: core/branches/flat/src/main/java/org/jboss/starobrno/container/UnsortedDataContainer.java (from rev 6891, core/branches/flat/src/main/java/org/jboss/starobrno/UnsortedDataContainer.java)
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/container/UnsortedDataContainer.java (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/container/UnsortedDataContainer.java 2008-10-09 08:56:47 UTC (rev 6892)
@@ -0,0 +1,78 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.container;
+
+import java.util.Map.Entry;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
+/**
+ * // TODO: crappy and inefficient - but just a placeholder for now.
+ *
+ * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
+ * @since 3.0
+ */
+public class UnsortedDataContainer<K, V> implements DataContainer<K, V>
+{
+ private final ConcurrentMap<K, V> data = new ConcurrentHashMap<K, V>();
+
+ public Set<Entry<K, V>> getEntries()
+ {
+ return data.entrySet();
+ }
+
+ public Entry<K, V> getEntry(K k)
+ {
+ if (k == null) throw new NullPointerException("I don't like nulls!");
+ for (Entry<K, V> e : data.entrySet())
+ {
+ if (k.equals(e.getKey())) return e;
+ }
+ return null;
+ }
+
+ public void putEntry(Entry<K, V> kvEntry)
+ {
+ data.put(kvEntry.getKey(), kvEntry.getValue());
+ }
+
+ public boolean exists(Entry<K, V> kvEntry)
+ {
+ return data.containsKey(kvEntry.getKey());
+ }
+
+ public int size()
+ {
+ return data.size();
+ }
+
+ public void clear()
+ {
+ data.clear();
+ }
+
+ public void removeEntry(K key)
+ {
+ data.remove(key);
+ }
+}
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/context/TransactionContext.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/context/TransactionContext.java 2008-10-08 18:56:43 UTC (rev 6891)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/context/TransactionContext.java 2008-10-09 08:56:47 UTC (rev 6892)
@@ -21,9 +21,9 @@
*/
package org.jboss.starobrno.context;
-import org.jboss.starobrno.OrderedSynchronizationHandler;
import org.jboss.starobrno.commands.VisitableCommand;
import org.jboss.starobrno.config.Option;
+import org.jboss.starobrno.transaction.OrderedSynchronizationHandler;
import javax.transaction.Transaction;
import java.util.List;
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/context/TransactionContextImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/context/TransactionContextImpl.java 2008-10-08 18:56:43 UTC (rev 6891)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/context/TransactionContextImpl.java 2008-10-09 08:56:47 UTC (rev 6892)
@@ -25,7 +25,7 @@
import org.jboss.starobrno.commands.VisitableCommand;
import org.jboss.starobrno.config.Option;
import org.jboss.starobrno.mvcc.MVCCEntry;
-import org.jboss.starobrno.OrderedSynchronizationHandler;
+import org.jboss.starobrno.transaction.OrderedSynchronizationHandler;
import javax.transaction.RollbackException;
import javax.transaction.SystemException;
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/factories/ComponentRegistry.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/factories/ComponentRegistry.java 2008-10-08 18:56:43 UTC (rev 6891)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/factories/ComponentRegistry.java 2008-10-09 08:56:47 UTC (rev 6892)
@@ -196,6 +196,8 @@
s.add(StateTransferManagerFactory.class);
s.add(StateTransferFactory.class);
s.add(NullComponentFactory.class);
+ s.add(LockManagerFactory.class);
+ s.add(DataContainerFactory.class);
return s;
}
Added: core/branches/flat/src/main/java/org/jboss/starobrno/factories/DataContainerFactory.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/factories/DataContainerFactory.java (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/factories/DataContainerFactory.java 2008-10-09 08:56:47 UTC (rev 6892)
@@ -0,0 +1,41 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.factories;
+
+import org.jboss.starobrno.container.DataContainer;
+import org.jboss.starobrno.container.UnsortedDataContainer;
+import org.jboss.starobrno.factories.annotations.DefaultFactoryFor;
+
+/**
+ * Default factory for {@link DataContainer} components. Every call to
+ * {@link #construct(Class)} produces a new {@link UnsortedDataContainer}.
+ *
+ * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
+ * @since 3.0
+ */
+@DefaultFactoryFor(classes = DataContainer.class)
+public class DataContainerFactory extends ComponentFactory
+{
+   /**
+    * Creates a new {@link UnsortedDataContainer} and returns it as the requested type.
+    *
+    * @param componentType type the caller expects the component to have
+    * @return a newly created {@link UnsortedDataContainer}
+    * @throws ClassCastException if an UnsortedDataContainer is not an instance of componentType
+    */
+   protected <T> T construct(Class<T> componentType)
+   {
+      // Class.cast is verified at runtime, unlike the erased (T) cast it replaces
+      return componentType.cast(new UnsortedDataContainer());
+   }
+}
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/factories/EmptyConstructorFactory.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/factories/EmptyConstructorFactory.java 2008-10-08 18:56:43 UTC (rev 6891)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/factories/EmptyConstructorFactory.java 2008-10-09 08:56:47 UTC (rev 6892)
@@ -30,15 +30,14 @@
import org.jboss.cache.lock.LockStrategyFactory;
import org.jboss.cache.marshall.Marshaller;
import org.jboss.cache.marshall.VersionAwareMarshaller;
-import org.jboss.cache.mvcc.MVCCNodeHelper;
import org.jboss.cache.remoting.jgroups.ChannelMessageListener;
-import org.jboss.starobrno.DataContainer;
import org.jboss.starobrno.commands.CommandsFactory;
import org.jboss.starobrno.config.ConfigurationException;
import org.jboss.starobrno.factories.annotations.DefaultFactoryFor;
import org.jboss.starobrno.factories.context.ContextFactory;
import org.jboss.starobrno.invocation.InvocationContextContainer;
-import org.jboss.starobrno.notifier.Notifier;
+import org.jboss.starobrno.mvcc.MVCCEntryWrapper;
+import org.jboss.starobrno.notifications.Notifier;
import org.jboss.starobrno.transaction.TransactionTable;
/**
@@ -47,9 +46,9 @@
* @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
* @since 2.1.0
*/
-@DefaultFactoryFor(classes = {Notifier.class, MVCCNodeHelper.class, RegionRegistry.class,
+@DefaultFactoryFor(classes = {Notifier.class, RegionRegistry.class,
ChannelMessageListener.class, CacheLoaderManager.class, Marshaller.class, InvocationContextContainer.class,
- CacheInvocationDelegate.class, TransactionTable.class, DataContainer.class,
+ CacheInvocationDelegate.class, TransactionTable.class, MVCCEntryWrapper.class,
LockStrategyFactory.class, BuddyFqnTransformer.class, BatchContainer.class,
ContextFactory.class, EntryFactory.class, CommandsFactory.class})
public class EmptyConstructorFactory extends ComponentFactory
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/factories/EntryFactoryImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/factories/EntryFactoryImpl.java 2008-10-08 18:56:43 UTC (rev 6891)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/factories/EntryFactoryImpl.java 2008-10-09 08:56:47 UTC (rev 6892)
@@ -22,8 +22,8 @@
package org.jboss.starobrno.factories;
import org.jboss.cache.lock.IsolationLevel;
-import org.jboss.starobrno.DataContainer;
import org.jboss.starobrno.config.Configuration;
+import org.jboss.starobrno.container.DataContainer;
import org.jboss.starobrno.factories.annotations.Inject;
import org.jboss.starobrno.factories.annotations.Start;
import org.jboss.starobrno.mvcc.EntryImpl;
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/factories/InterceptorChainFactory.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/factories/InterceptorChainFactory.java 2008-10-08 18:56:43 UTC (rev 6891)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/factories/InterceptorChainFactory.java 2008-10-09 08:56:47 UTC (rev 6892)
@@ -22,7 +22,6 @@
package org.jboss.starobrno.factories;
-import org.jboss.starobrno.TxInterceptor;
import org.jboss.starobrno.config.Configuration;
import org.jboss.starobrno.config.ConfigurationException;
import org.jboss.starobrno.config.CustomInterceptorConfig;
@@ -31,6 +30,7 @@
import org.jboss.starobrno.interceptors.InterceptorChain;
import org.jboss.starobrno.interceptors.InvocationContextInterceptor;
import org.jboss.starobrno.interceptors.LockingInterceptor;
+import org.jboss.starobrno.interceptors.TxInterceptor;
import org.jboss.starobrno.interceptors.base.CommandInterceptor;
import java.util.List;
Added: core/branches/flat/src/main/java/org/jboss/starobrno/factories/LockManagerFactory.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/factories/LockManagerFactory.java (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/factories/LockManagerFactory.java 2008-10-09 08:56:47 UTC (rev 6892)
@@ -0,0 +1,41 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.factories;
+
+import org.jboss.starobrno.factories.annotations.DefaultFactoryFor;
+import org.jboss.starobrno.lock.LockManager;
+import org.jboss.starobrno.lock.StripedLockManager;
+
+/**
+ * Default factory for {@link LockManager} components. Every call to
+ * {@link #construct(Class)} produces a new {@link StripedLockManager}.
+ *
+ * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
+ * @since 3.0
+ */
+@DefaultFactoryFor(classes = LockManager.class)
+public class LockManagerFactory extends ComponentFactory
+{
+   /**
+    * Creates a new {@link StripedLockManager} and returns it as the requested type.
+    *
+    * @param componentType type the caller expects the component to have
+    * @return a newly created {@link StripedLockManager}
+    * @throws ClassCastException if a StripedLockManager is not an instance of componentType
+    */
+   protected <T> T construct(Class<T> componentType)
+   {
+      // Class.cast is verified at runtime, unlike the erased (T) cast it replaces
+      return componentType.cast(new StripedLockManager());
+   }
+}
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/factories/RuntimeConfigAwareFactory.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/factories/RuntimeConfigAwareFactory.java 2008-10-08 18:56:43 UTC (rev 6891)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/factories/RuntimeConfigAwareFactory.java 2008-10-09 08:56:47 UTC (rev 6892)
@@ -21,11 +21,11 @@
*/
package org.jboss.starobrno.factories;
-import org.jboss.cache.RPCManager;
import org.jboss.cache.util.BeanUtils;
import org.jboss.starobrno.config.ConfigurationException;
import org.jboss.starobrno.config.RuntimeConfig;
import org.jboss.starobrno.factories.annotations.DefaultFactoryFor;
+import org.jboss.starobrno.remoting.RPCManager;
import java.lang.reflect.Method;
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/InvocationContextInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/InvocationContextInterceptor.java 2008-10-08 18:56:43 UTC (rev 6891)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/InvocationContextInterceptor.java 2008-10-09 08:56:47 UTC (rev 6892)
@@ -22,8 +22,6 @@
package org.jboss.starobrno.interceptors;
-import org.jboss.cache.transaction.TransactionTable;
-import org.jboss.starobrno.RPCManager;
import org.jboss.starobrno.commands.VisitableCommand;
import org.jboss.starobrno.commands.tx.CommitCommand;
import org.jboss.starobrno.commands.tx.PrepareCommand;
@@ -35,7 +33,9 @@
import org.jboss.starobrno.config.Option;
import org.jboss.starobrno.context.InvocationContext;
import org.jboss.starobrno.factories.annotations.Inject;
+import org.jboss.starobrno.remoting.RPCManager;
import org.jboss.starobrno.transaction.GlobalTransaction;
+import org.jboss.starobrno.transaction.TransactionTable;
import javax.transaction.SystemException;
import javax.transaction.Transaction;
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/LockingInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/LockingInterceptor.java 2008-10-08 18:56:43 UTC (rev 6891)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/LockingInterceptor.java 2008-10-09 08:56:47 UTC (rev 6892)
@@ -21,7 +21,6 @@
*/
package org.jboss.starobrno.interceptors;
-import org.jboss.starobrno.DataContainer;
import org.jboss.starobrno.commands.VisitableCommand;
import org.jboss.starobrno.commands.read.GetKeyValueCommand;
import org.jboss.starobrno.commands.read.GravitateDataCommand;
@@ -35,6 +34,7 @@
import org.jboss.starobrno.commands.write.PutMapCommand;
import org.jboss.starobrno.commands.write.RemoveCommand;
import org.jboss.starobrno.commands.write.ReplaceCommand;
+import org.jboss.starobrno.container.DataContainer;
import org.jboss.starobrno.context.InvocationContext;
import org.jboss.starobrno.factories.annotations.Inject;
import org.jboss.starobrno.interceptors.base.PrePostProcessingCommandInterceptor;
Copied: core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/TxInterceptor.java (from rev 6891, core/branches/flat/src/main/java/org/jboss/starobrno/TxInterceptor.java)
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/TxInterceptor.java (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/TxInterceptor.java 2008-10-09 08:56:47 UTC (rev 6892)
@@ -0,0 +1,1051 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.interceptors;
+
+import org.jboss.cache.util.concurrent.ConcurrentHashSet;
+import org.jboss.starobrno.CacheException;
+import org.jboss.starobrno.commands.CommandsFactory;
+import org.jboss.starobrno.commands.ReplicableCommand;
+import org.jboss.starobrno.commands.VisitableCommand;
+import org.jboss.starobrno.commands.tx.CommitCommand;
+import org.jboss.starobrno.commands.tx.PrepareCommand;
+import org.jboss.starobrno.commands.tx.RollbackCommand;
+import org.jboss.starobrno.config.Option;
+import org.jboss.starobrno.context.InvocationContext;
+import org.jboss.starobrno.context.TransactionContext;
+import org.jboss.starobrno.factories.ComponentRegistry;
+import org.jboss.starobrno.factories.annotations.Inject;
+import org.jboss.starobrno.factories.context.ContextFactory;
+import org.jboss.starobrno.invocation.InvocationContextContainer;
+import org.jboss.starobrno.jmx.annotations.ManagedAttribute;
+import org.jboss.starobrno.jmx.annotations.ManagedOperation;
+import org.jboss.starobrno.lock.LockManager;
+import org.jboss.starobrno.notifications.Notifier;
+import org.jboss.starobrno.remoting.RPCManager;
+import org.jboss.starobrno.remoting.ReplicationException;
+import org.jboss.starobrno.transaction.GlobalTransaction;
+import org.jboss.starobrno.transaction.OrderedSynchronizationHandler;
+import org.jboss.starobrno.transaction.TransactionTable;
+
+import javax.transaction.InvalidTransactionException;
+import javax.transaction.Status;
+import javax.transaction.Synchronization;
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * This interceptor is the new default at the head of all interceptor chains,
+ * and makes transactional attributes available to all interceptors in the chain.
+ * This interceptor is also responsible for registering for synchronisation on
+ * transaction completion.
+ *
+ * @author <a href="mailto:manik at jboss.org">Manik Surtani (manik at jboss.org)</a>
+ * @author <a href="mailto:stevew at jofti.com">Steve Woodcock (stevew at jofti.com)</a>
+ */
+public class TxInterceptor extends BaseTransactionalContextInterceptor
+{
+ protected CommandsFactory commandsFactory;
+ protected RPCManager rpcManager;
+ private Notifier notifier;
+ private InvocationContextContainer invocationContextContainer;
+ private ComponentRegistry componentRegistry;
+ private ContextFactory contextFactory;
+
+ /**
+ * Set of {@link Transaction}s that this interceptor has already registered
+ */
+ private final Set<Transaction> transactions = new ConcurrentHashSet<Transaction>();
+ private final Map<Transaction, GlobalTransaction> rollbackTransactions = new ConcurrentHashMap<Transaction, GlobalTransaction>(16);
+ private long prepares = 0;
+ private long commits = 0;
+ private long rollbacks = 0;
+ protected boolean optimistic = false;
+ private LockManager lockManager;
+ private boolean statsEnabled;
+
+ /**
+  * Injection point: stores the collaborators this interceptor works with and then sets the
+  * statistics flag from the configuration's "expose management statistics" setting.
+  */
+ @Inject
+ public void intialize(RPCManager rpcManager, ContextFactory contextFactory,
+       Notifier notifier, InvocationContextContainer icc,
+       CommandsFactory factory, ComponentRegistry componentRegistry, LockManager lockManager)
+ {
+    // remoting and command construction
+    this.rpcManager = rpcManager;
+    this.commandsFactory = factory;
+
+    // contexts
+    this.contextFactory = contextFactory;
+    this.invocationContextContainer = icc;
+
+    // remaining collaborators
+    this.notifier = notifier;
+    this.lockManager = lockManager;
+    this.componentRegistry = componentRegistry;
+
+    setStatisticsEnabled(configuration.isExposeManagementStatistics());
+ }
+
+ /**
+  * Handles a prepare command. A prepare whose global transaction is remote is delegated to
+  * handleRemotePrepare(); one that originated locally is discarded. Any Throwable raised while
+  * doing so is handed to ctx.throwIfNeeded().
+  *
+  * @return the result of handleRemotePrepare(), or null
+  */
+ @Override
+ public Object visitPrepareCommand(InvocationContext ctx, PrepareCommand command) throws Throwable
+ {
+    if (log.isDebugEnabled()) log.debug("Got gtx from invocation context " + ctx.getGlobalTransaction());
+
+    Object prepareResult = null;
+    try
+    {
+       if (!ctx.getGlobalTransaction().isRemote())
+       {
+          if (trace) log.trace("received my own message (discarding it)");
+       }
+       else
+       {
+          prepareResult = handleRemotePrepare(ctx, command);
+          if (getStatisticsEnabled()) prepares++;
+       }
+    }
+    catch (Throwable failure)
+    {
+       ctx.throwIfNeeded(failure);
+    }
+
+    return prepareResult;
+ }
+
+ /**
+  * Handles a commit command. Commits that originated locally are discarded. For a remote global
+  * transaction, the associated local transaction is looked up, made the thread's current
+  * transaction if it is not already, and committed through the transaction manager. Afterwards
+  * any transaction that was suspended is resumed, and the local transaction is removed from this
+  * interceptor's bookkeeping and from the transaction table.
+  *
+  * @param ctx     invocation context carrying the global transaction
+  * @param command the commit command
+  * @return always null
+  * @throws Throwable as decided by ctx.throwIfNeeded()
+  */
+ @Override
+ public Object visitCommitCommand(InvocationContext ctx, CommitCommand command) throws Throwable
+ {
+    if (!ctx.getGlobalTransaction().isRemote())
+    {
+       if (trace) log.trace("received my own message (discarding it)");
+       return null;
+    }
+    try
+    {
+       if (trace) log.trace("(" + rpcManager.getLocalAddress() + ") call on command [" + command + "]");
+       GlobalTransaction gtx = ctx.getGlobalTransaction();
+       Transaction ltx = txTable.getLocalTransaction(gtx, true);
+       // disconnect if we have a current tx associated
+       Transaction currentTx = txManager.getTransaction();
+       boolean resumeCurrentTxOnCompletion = false;
+       try
+       {
+          if (!ltx.equals(currentTx))
+          {
+             currentTx = txManager.suspend();
+             resumeCurrentTxOnCompletion = true;
+             txManager.resume(ltx);
+             // make sure we set this in the ctx
+             ctx.setTransaction(ltx);
+          }
+          if (log.isDebugEnabled()) log.debug(" executing commit() with local TX " + ltx + " under global tx " + gtx);
+          txManager.commit();
+          if (getStatisticsEnabled()) commits++;
+       }
+       finally
+       {
+          // resume the old transaction if we suspended it
+          if (resumeCurrentTxOnCompletion)
+          {
+             resumeTransactionOnCompletion(ctx, currentTx);
+          }
+          // remove from local lists.
+          transactions.remove(ltx);
+          // this tx has completed. Clean up in the tx table.
+          txTable.remove(gtx, ltx);
+       }
+       // this is the commit path; the message previously claimed a rollback had finished
+       if (log.isDebugEnabled()) log.debug("Finished remote commit method for " + gtx);
+    }
+    catch (Throwable throwable)
+    {
+       ctx.throwIfNeeded(throwable);
+    }
+    return null;
+ }
+
+ /**
+  * Handles a rollback command. Rollbacks that originated locally are discarded. For a remote
+  * global transaction the associated local transaction is rolled back through the transaction
+  * manager; if no local transaction is known, the global transaction is simply removed from the
+  * transaction table.
+  *
+  * @return always null
+  */
+ @Override
+ public Object visitRollbackCommand(InvocationContext ctx, RollbackCommand command) throws Throwable
+ {
+    if (!ctx.getGlobalTransaction().isRemote())
+    {
+       if (trace) log.trace("received my own message (discarding it)");
+       return null;
+    }
+
+    try
+    {
+       if (trace) log.trace("(" + rpcManager.getLocalAddress() + ") call on command [" + command + "]");
+
+       GlobalTransaction gtx = ctx.getGlobalTransaction();
+       Transaction ltx = txTable.getLocalTransaction(gtx);
+       if (ltx == null)
+       {
+          log.warn("No local transaction for this remotely originating rollback. Possibly rolling back before a prepare call was broadcast?");
+          txTable.remove(gtx);
+          return null;
+       }
+
+       // whatever the calling thread is currently associated with
+       Transaction callerTx = txManager.getTransaction();
+       boolean switchedTx = false;
+       try
+       {
+          if (!ltx.equals(callerTx))
+          {
+             // swap the thread over to ltx and record that in the context
+             callerTx = txManager.suspend();
+             switchedTx = true;
+             txManager.resume(ltx);
+             ctx.setTransaction(ltx);
+          }
+          if (log.isDebugEnabled()) log.debug("executing with local TX " + ltx + " under global tx " + gtx);
+          txManager.rollback();
+          if (getStatisticsEnabled()) rollbacks++;
+       }
+       finally
+       {
+          // put the caller's transaction back if we swapped it out
+          if (switchedTx)
+          {
+             resumeTransactionOnCompletion(ctx, callerTx);
+          }
+
+          // the tx is finished: drop it from our set and from the tx table
+          transactions.remove(ltx);
+          txTable.remove(gtx, ltx);
+       }
+       if (log.isDebugEnabled()) log.debug("Finished remote commit/rollback method for " + gtx);
+    }
+    catch (Throwable failure)
+    {
+       ctx.throwIfNeeded(failure);
+    }
+
+    return null;
+ }
+
+ /**
+  * Default handling for commands: delegates to attachGtxAndPassUpChain(). Any Throwable is
+  * handed to ctx.throwIfNeeded(); if that call returns normally, null is returned.
+  */
+ @Override
+ public Object handleDefault(InvocationContext ctx, VisitableCommand command) throws Throwable
+ {
+    Object outcome = null;
+    try
+    {
+       outcome = attachGtxAndPassUpChain(ctx, command);
+    }
+    catch (Throwable failure)
+    {
+       ctx.throwIfNeeded(failure);
+    }
+    return outcome;
+ }
+
+ /**
+  * Attaches a global transaction to the context when the context carries a transaction, then
+  * passes the command to the next interceptor.
+  *
+  * @return whatever the next interceptor returns
+  */
+ protected Object attachGtxAndPassUpChain(InvocationContext ctx, VisitableCommand command) throws Throwable
+ {
+    Transaction contextTx = ctx.getTransaction();
+    if (contextTx != null)
+    {
+       attachGlobalTransaction(ctx, contextTx, command);
+    }
+    return invokeNextInterceptor(ctx, command);
+ }
+
+ // ------------------------------------------------------------------------
+ // JMX statistics
+ // ------------------------------------------------------------------------
+
+ // --------------------------------------------------------------
+
+ /**
+  * Handles a remotely originating prepare call, by creating a local transaction for the remote global transaction
+  * and replaying modifications in this new local transaction.
+  * <p/>
+  * For a one-phase prepare, the local transaction is committed (or rolled back if replaying failed) before this
+  * method returns. On exit the thread's transaction is suspended, and the transaction that was current on entry,
+  * if any, is resumed.
+  *
+  * @param ctx     invocation context
+  * @param command prepare command
+  * @return always null
+  * @throws Throwable in the event of problems.
+  */
+ private Object handleRemotePrepare(InvocationContext ctx, PrepareCommand command) throws Throwable
+ {
+    // the InvocationContextInterceptor would have set this for us
+    GlobalTransaction gtx = ctx.getGlobalTransaction();
+
+    // Is there a local transaction associated with GTX? (not the current tx associated with the thread, which may be
+    // in the invocation context)
+    Transaction ltx = txTable.getLocalTransaction(gtx);
+    Transaction currentTx = txManager.getTransaction();
+
+    // becomes true only once every modification has been replayed without an exception
+    boolean success = false;
+    try
+    {
+       if (ltx == null)
+       {
+          if (currentTx != null) txManager.suspend();
+          // create a new local transaction
+          ltx = createLocalTx();
+          // associate this with a global tx
+          txTable.put(ltx, gtx);
+          if (trace) log.trace("Created new tx for gtx " + gtx);
+
+          if (log.isDebugEnabled())
+             log.debug("Started new local tx as result of remote prepare: local tx=" + ltx + " (status=" + ltx.getStatus() + "), gtx=" + gtx);
+       }
+       else
+       {
+          // this should be valid
+          if (!TransactionTable.isValid(ltx))
+             throw new CacheException("Transaction " + ltx + " not in correct state to be prepared");
+
+          // associate this thread with the local transaction associated with the global transaction, IF the localTx is NOT the current tx.
+          if (currentTx == null || !ltx.equals(currentTx))
+          {
+             if (trace) log.trace("Suspending current tx " + currentTx);
+             txManager.suspend();
+             txManager.resume(ltx);
+          }
+       }
+       if (trace) log.trace("Resuming existing tx " + ltx + ", global tx=" + gtx);
+
+       // at this point we have a non-null ltx
+
+       // Associate the local TX with the global TX. Create new
+       // transactionContext for TX in txTable, the modifications
+       // below will need this transactionContext to add their modifications
+       // under the GlobalTx key
+       TransactionContext transactionContext = txTable.get(gtx);
+       if (transactionContext == null)
+       {
+          // create a new transaction transactionContext
+          if (log.isDebugEnabled()) log.debug("creating new tx transactionContext");
+          transactionContext = contextFactory.createTransactionContext(ltx);
+          txTable.put(gtx, transactionContext);
+       }
+
+       setTransactionalContext(ltx, gtx, transactionContext, ctx);
+
+       // register a sync handler for this tx.
+       registerHandler(ltx, new RemoteSynchronizationHandler(gtx, ltx, transactionContext), ctx);
+
+       // replay modifications
+       replayModifications(ctx, ltx, command);
+
+       success = true; // no exceptions were thrown above!!
+
+       // now pass the prepare command up the chain as well.
+       if (command.isOnePhaseCommit())
+       {
+          if (trace)
+             log.trace("Using one-phase prepare. Not propagating the prepare call up the stack until called to do so by the sync handler.");
+       }
+       else
+       {
+          // now pass up the prepare method itself.
+          invokeNextInterceptor(ctx, command);
+       }
+       // JBCACHE-361 Confirm that the transaction is ACTIVE
+       assertTxIsStillValid(ltx);
+    }
+    finally
+    {
+       // if we are running a one-phase commit, perform a commit or rollback now.
+       if (trace) log.trace("Are we running a 1-phase commit? " + command.isOnePhaseCommit());
+
+       // ltx is still null when creating the local tx failed. There is then nothing to complete,
+       // and touching ltx (or removing null from the transactions set) could throw from this
+       // finally block, hiding the original failure and skipping the resume of currentTx below.
+       if (command.isOnePhaseCommit() && ltx != null)
+       {
+          try
+          {
+             if (success)
+             {
+                ltx.commit();
+             }
+             else
+             {
+                ltx.rollback();
+             }
+          }
+          catch (Throwable t)
+          {
+             log.error("Commit/rollback failed.", t);
+             if (success)
+             {
+                // the commit failed, so try a rollback instead
+                try
+                {
+                   log.info("Attempting anotehr rollback");
+                   ltx.rollback();
+                }
+                catch (Throwable t2)
+                {
+                   log.error("Unable to rollback", t2);
+                }
+             }
+          }
+          finally
+          {
+             transactions.remove(ltx);// JBAS-298
+          }
+       }
+
+       txManager.suspend();// suspends ltx - could be null
+       // resume whatever else we had going.
+       if (currentTx != null) txManager.resume(currentTx);
+       if (log.isDebugEnabled()) log.debug("Finished remote prepare " + gtx);
+    }
+
+    // nothing in this method produces a value for the caller
+    return null;
+ }
+
+ /**
+  * Makes sure a global transaction is associated with the given local transaction and sets it on
+  * the invocation context.
+  *
+  * @return the command that was passed in, unchanged
+  */
+ private ReplicableCommand attachGlobalTransaction(InvocationContext ctx, Transaction tx, VisitableCommand command) throws Throwable
+ {
+    if (trace)
+    {
+       log.trace(" local transaction exists - registering global tx if not present for " + Thread.currentThread());
+       log.trace("Associated gtx in txTable is " + txTable.get(tx));
+    }
+
+    // registers a sync handler for this tx - only if the globalTransaction is not remotely initiated.
+    GlobalTransaction registered = registerTransaction(tx, ctx);
+
+    // when registration yields nothing, use the global tx the txTable holds for this tx
+    GlobalTransaction attached = registered != null ? registered : txTable.get(tx);
+    ctx.setGlobalTransaction(attached);
+
+    return command;
+ }
+
+ /**
+  * Passes each modification carried by the prepare command to the next interceptor, checking
+  * after every one that the local transaction is still valid. A failure is logged and rethrown.
+  *
+  * @param ctx     invocation context
+  * @param ltx     local transaction that must stay valid while replaying
+  * @param command prepare command holding the modifications
+  * @throws Throwable whatever the replay or the validity check throws
+  */
+ protected void replayModifications(InvocationContext ctx, Transaction ltx, PrepareCommand command) throws Throwable
+ {
+    try
+    {
+       for (VisitableCommand replayed : command.getModifications())
+       {
+          invokeNextInterceptor(ctx, replayed);
+          assertTxIsStillValid(ltx);
+       }
+    }
+    catch (Throwable failure)
+    {
+       log.error("prepare failed!", failure);
+       throw failure;
+    }
+ }
+
+ /**
+  * Suspends the thread's current transaction and, if currentTx is not null, resumes currentTx
+  * and sets it on the invocation context.
+  */
+ private void resumeTransactionOnCompletion(InvocationContext ctx, Transaction currentTx)
+       throws SystemException, InvalidTransactionException
+ {
+    if (trace) log.trace("Resuming suspended transaction " + currentTx);
+
+    txManager.suspend();
+    if (currentTx == null)
+    {
+       // nothing to go back to
+       return;
+    }
+    txManager.resume(currentTx);
+    ctx.setTransaction(currentTx);
+ }
+
+ /**
+  * Passes a commit or rollback command to the next interceptor. The command is set on the
+  * invocation context for the duration of the call, and the context's previous command is put
+  * back afterwards, even on failure.
+  *
+  * @return whatever the next interceptor returns
+  * @throws Throwable whatever the next interceptor throws
+  */
+ @SuppressWarnings("deprecation")
+ private Object handleCommitRollback(InvocationContext ctx, VisitableCommand command) throws Throwable
+ {
+    GlobalTransaction gtx = ctx.getGlobalTransaction();
+    VisitableCommand previousCommand = ctx.getCommand();
+
+    Object outcome;
+    ctx.setCommand(command);
+    try
+    {
+       outcome = invokeNextInterceptor(ctx, command);
+    }
+    finally
+    {
+       ctx.setCommand(previousCommand);
+    }
+
+    if (log.isDebugEnabled()) log.debug("Finished local commit/rollback method for " + gtx);
+    return outcome;
+ }
+
+ // --------------------------------------------------------------
+ // Transaction phase runners
+ // --------------------------------------------------------------
+
+ /**
+  * Builds a prepare command for the given global transaction and modifications through the
+  * commands factory, using the RPC manager's local address.
+  */
+ protected PrepareCommand buildPrepareCommand(GlobalTransaction gtx, List modifications, boolean onePhaseCommit)
+ {
+    PrepareCommand prepare = commandsFactory.buildPrepareCommand(
+          gtx, modifications, rpcManager.getLocalAddress(), onePhaseCommit);
+    return prepare;
+ }
+
+ /**
+  * Builds the command for the commit phase and passes it up the interceptor chain. With
+  * onePhaseCommit this is a one-phase prepare command built from the modifications; otherwise it
+  * is a commit command.
+  * <p/>
+  * If that fails, locks are cleaned up via cleanupStaleLocks() and the failure is rethrown,
+  * wrapped in a RuntimeException unless it already is one. If the cleanup itself fails, the
+  * cleanup failure is thrown instead.
+  *
+  * @param ctx            invocation context
+  * @param gtx            global transaction being committed
+  * @param modifications  modifications to carry in a one-phase prepare
+  * @param onePhaseCommit whether to issue a one-phase prepare instead of a commit
+  */
+ protected void runCommitPhase(InvocationContext ctx, GlobalTransaction gtx, List modifications, boolean onePhaseCommit)
+ {
+    try
+    {
+       VisitableCommand commitCommand = onePhaseCommit ? buildPrepareCommand(gtx, modifications, true) : commandsFactory.buildCommitCommand(gtx);
+
+       if (trace) log.trace("Running commit for " + gtx);
+
+       handleCommitRollback(ctx, commitCommand);
+    }
+    catch (Throwable e)
+    {
+       // log the cause here: if the cleanup below throws, this is the only record of it
+       log.warn("Commit failed. Clearing stale locks.", e);
+       try
+       {
+          cleanupStaleLocks(ctx);
+       }
+       catch (RuntimeException re)
+       {
+          log.error("Unable to clear stale locks", re);
+          throw re;
+       }
+       catch (Throwable e2)
+       {
+          log.error("Unable to clear stale locks", e2);
+          throw new RuntimeException(e2);
+       }
+       if (e instanceof RuntimeException)
+          throw (RuntimeException) e;
+       else
+          throw new RuntimeException("Commit failed.", e);
+    }
+ }
+
+ /**
+  * Asks the lock manager to unlock for this invocation context, but only when the context has a
+  * transaction context.
+  */
+ protected void cleanupStaleLocks(InvocationContext ctx) throws Throwable
+ {
+    if (ctx.getTransactionContext() == null)
+    {
+       return;
+    }
+    lockManager.unlock(ctx);
+ }
+
+ /**
+  * Builds a rollback command for the global transaction and passes it up the interceptor chain.
+  * While that runs, the tx-to-gtx pair is kept in rollbackTransactions (JBCACHE-359) so a
+  * listener callback can find it. Failures are logged as a warning and not rethrown.
+  *
+  * @param ctx invocation context
+  * @param gtx global transaction being rolled back
+  * @param tx  local transaction the lookup entry is keyed on
+  */
+ protected void runRollbackPhase(InvocationContext ctx, GlobalTransaction gtx, Transaction tx)
+ {
+    try
+    {
+       // JBCACHE-457
+       VisitableCommand rollback = commandsFactory.buildRollbackCommand(gtx);
+       if (trace) log.trace(" running rollback for " + gtx);
+
+       rollbackTransactions.put(tx, gtx);
+       handleCommitRollback(ctx, rollback);
+    }
+    catch (Throwable problem)
+    {
+       log.warn("Rollback had a problem", problem);
+    }
+    finally
+    {
+       // the lookup entry is only needed while the rollback is in flight
+       rollbackTransactions.remove(tx);
+    }
+ }
+
+ /**
+  * @return true when the configured cache mode is not synchronous, false otherwise
+  */
+ private boolean isOnePhaseCommit()
+ {
+    boolean asynchronous = !configuration.getCacheMode().isSynchronous();
+    if (asynchronous && trace)
+    {
+       // this is a REPL_ASYNC call - do 1-phase commit.
+       log.trace("This is a REPL_ASYNC call (1 phase commit) - do nothing for beforeCompletion()");
+    }
+    return asynchronous;
+ }
+
+ /**
+  * Runs the prepare phase of a 2-phase commit: builds a prepare command and passes it to the next
+  * interceptor, provided the transaction in the invocation context is the one the transaction
+  * manager currently reports. The context's command is swapped for the prepare command during
+  * the call and put back afterwards.
+  *
+  * @param ctx           invocation context
+  * @param gtx           global transaction being prepared
+  * @param modifications modifications to carry in the prepare command
+  * @return whatever the next interceptor returns
+  * @throws CacheException if either transaction is null or the two do not match
+  * @throws Throwable      whatever the next interceptor throws
+  */
+ @SuppressWarnings("deprecation")
+ public Object runPreparePhase(InvocationContext ctx, GlobalTransaction gtx, List<VisitableCommand> modifications) throws Throwable
+ {
+    // running a 2-phase commit.
+    VisitableCommand prepareCommand = buildPrepareCommand(gtx, modifications, false);
+
+    Transaction ltx = ctx.getTransaction();
+    Transaction running = txManager.getTransaction();
+
+    boolean mismatch = running == null || ltx == null || !running.equals(ltx);
+    if (mismatch)
+    {
+       log.warn("Local transaction does not exist or does not match expected transaction " + gtx);
+       throw new CacheException(" local transaction " + ltx + " does not exist or does not match expected transaction " + gtx);
+    }
+
+    VisitableCommand previousCommand = ctx.getCommand();
+    ctx.setCommand(prepareCommand);
+    try
+    {
+       return invokeNextInterceptor(ctx, prepareCommand);
+    }
+    finally
+    {
+       ctx.setCommand(previousCommand);
+    }
+ }
+
+ // --------------------------------------------------------------
+ // Private helper methods
+ // --------------------------------------------------------------
+
+ /**
+  * Throws a ReplicationException unless TransactionTable.isActive(tx) holds. The message includes
+  * the transaction's status, or says the status could not be retrieved.
+  */
+ protected void assertTxIsStillValid(Transaction tx)
+ {
+    if (TransactionTable.isActive(tx))
+    {
+       return;
+    }
+
+    int status;
+    try
+    {
+       status = tx.getStatus();
+    }
+    catch (SystemException e)
+    {
+       throw new ReplicationException("prepare() failed -- local transaction status is not STATUS_ACTIVE; Unable to retrieve transaction status.");
+    }
+    throw new ReplicationException("prepare() failed -- local transaction status is not STATUS_ACTIVE; is " + status);
+ }
+
+ /**
+ * Creates a gtx (if one doesnt exist), a sync handler, and registers the tx.
+ */
+ private GlobalTransaction registerTransaction(Transaction tx, InvocationContext ctx) throws Exception
+ {
+ GlobalTransaction gtx;
+
+ if (TransactionTable.isValid(tx) && transactions.add(tx))
+ {
+ gtx = txTable.getCurrentTransaction(tx, true);
+ TransactionContext transactionContext;
+ if (ctx.getGlobalTransaction() == null)
+ {
+ ctx.setGlobalTransaction(gtx);
+ transactionContext = txTable.get(gtx);
+ ctx.setTransactionContext(transactionContext);
+ }
+ else
+ {
+ transactionContext = ctx.getTransactionContext();
+ }
+ if (gtx.isRemote())
+ {
+ // should be no need to register a handler since this a remotely initiated globalTransaction
+ if (trace) log.trace("is a remotely initiated gtx so no need to register a tx for it");
+ }
+ else
+ {
+ if (trace) log.trace("Registering sync handler for tx " + tx + ", gtx " + gtx);
+
+ // see the comment in LocalSynchronizationHandler for the last (remoteLocal) param, passed here as !ctx.isOriginLocal().
+ LocalSynchronizationHandler myHandler = new LocalSynchronizationHandler(gtx, tx, transactionContext, !ctx.isOriginLocal());
+ registerHandler(tx, myHandler, ctx);
+ }
+ }
+ else if ((gtx = rollbackTransactions.get(tx)) != null)
+ {
+ if (trace) log.trace("Transaction " + tx + " is already registered and is rolling back.");
+ }
+ else
+ {
+ if (trace) log.trace("Transaction " + tx + " is already registered.");
+ }
+ return gtx;
+ }
+
+ /**
+ * Registers a sync handler against a tx.
+ */
+ private void registerHandler(Transaction tx, Synchronization handler, InvocationContext ctx) throws Exception
+ {
+ OrderedSynchronizationHandler orderedHandler = ctx.getTransactionContext().getOrderedSynchronizationHandler(); //OrderedSynchronizationHandler.getInstance(tx);
+
+ if (trace) log.trace("registering for TX completion: SynchronizationHandler(" + handler + ")");
+
+ orderedHandler.registerAtHead(handler);// needs to be invoked first on TX commit
+
+ notifier.notifyTransactionRegistered(tx, ctx);
+ }
+
+ /**
+ * Creates and starts a local tx
+ *
+ * @throws Exception
+ */
+ protected Transaction createLocalTx() throws Exception
+ {
+ if (trace)
+ {
+ log.trace("Creating transaction for thread " + Thread.currentThread());
+ }
+ Transaction localTx;
+ if (txManager == null) throw new Exception("Failed to create local transaction; TransactionManager is null");
+ txManager.begin();
+ localTx = txManager.getTransaction();
+ return localTx;
+ }
+
+ // ------------------------------------------------------------------------
+ // Synchronization classes
+ // ------------------------------------------------------------------------
+
+ // this controls the whole transaction
+
+ private class RemoteSynchronizationHandler implements Synchronization
+ {
+ Transaction tx = null;
+ GlobalTransaction gtx = null;
+ List<VisitableCommand> modifications = null;
+ TransactionContext transactionContext = null;
+ protected InvocationContext ctx; // the context for this call.
+
+ RemoteSynchronizationHandler(GlobalTransaction gtx, Transaction tx, TransactionContext entry)
+ {
+ this.gtx = gtx;
+ this.tx = tx;
+ this.transactionContext = entry;
+ }
+
+ public void beforeCompletion()
+ {
+ if (trace) log.trace("Running beforeCompletion on gtx " + gtx);
+
+ if (transactionContext == null)
+ {
+ log.error("Transaction has a null transaction entry - beforeCompletion() will fail.");
+ throw new IllegalStateException("cannot find transaction entry for " + gtx);
+ }
+
+ modifications = transactionContext.getModifications();
+ ctx = invocationContextContainer.get();
+ setTransactionalContext(tx, gtx, transactionContext, ctx);
+
+ if (ctx.isOptionsUninitialised() && transactionContext.getOption() != null)
+ ctx.setOptionOverrides(transactionContext.getOption());
+
+ assertCanContinue();
+
+ ctx.setOriginLocal(false);
+ }
+
+ // this should really not be done here -
+ // it is supposed to be post commit not actually run the commit
+ public void afterCompletion(int status)
+ {
+ // could happen if a rollback is called and beforeCompletion() doesn't get called.
+ if (ctx == null)
+ {
+ ctx = invocationContextContainer.get();
+ setTransactionalContext(tx, gtx, transactionContext, ctx);
+
+ if (ctx.isOptionsUninitialised() && transactionContext != null && transactionContext.getOption() != null)
+ {
+ // use the options from the transaction entry instead
+ ctx.setOptionOverrides(transactionContext.getOption());
+ }
+ }
+
+ try
+ {
+ assertCanContinue();
+
+ try
+ {
+ if (txManager.getTransaction() != null && !txManager.getTransaction().equals(tx)) txManager.resume(tx);
+ }
+ catch (Exception e)
+ {
+ log.error("afterCompletion error: " + status, e);
+ }
+
+ if (trace) log.trace("calling aftercompletion for " + gtx);
+
+ // set any transaction wide options as current for this thread.
+ if (transactionContext != null)
+ {
+ // this should ideally be set in beforeCompletion(), after compacting the list.
+ if (modifications == null) modifications = transactionContext.getModifications();
+ ctx.setOptionOverrides(transactionContext.getOption());
+ }
+ if (tx != null) transactions.remove(tx);
+
+ switch (status)
+ {
+ case Status.STATUS_COMMITTED:
+ boolean onePhaseCommit = isOnePhaseCommit();
+ if (log.isDebugEnabled()) log.debug("Running commit phase. One phase? " + onePhaseCommit);
+ runCommitPhase(ctx, gtx, modifications, onePhaseCommit);
+ log.debug("Finished commit phase");
+ break;
+ case Status.STATUS_UNKNOWN:
+ log.warn("Received JTA STATUS_UNKNOWN in afterCompletion()! XA resources may not be in sync. The app should manually clean up resources at this point.");
+ case Status.STATUS_MARKED_ROLLBACK:
+ case Status.STATUS_ROLLEDBACK:
+ log.debug("Running rollback phase");
+ runRollbackPhase(ctx, gtx, tx);
+ log.debug("Finished rollback phase");
+ break;
+
+ default:
+ throw new IllegalStateException("illegal status: " + status);
+ }
+ }
+ catch (Exception th)
+ {
+ log.trace("Caught exception ", th);
+
+ }
+ finally
+ {
+ // clean up the tx table
+ txTable.remove(gtx);
+ txTable.remove(tx);
+ setTransactionalContext(null, null, null, ctx);
+ cleanupInternalState();
+ }
+ }
+
+ private void assertCanContinue()
+ {
+ if (!componentRegistry.invocationsAllowed(true) && (ctx.getOptionOverrides() == null || !ctx.getOptionOverrides().isSkipCacheStatusCheck()))
+ throw new IllegalStateException("Cache not in STARTED state!");
+ }
+
+ /**
+ * Cleans out (nullifies) member variables held by the sync object for easier gc. Could be (falsely) seen as a mem
+ * leak if the TM implementation hangs on to the synchronizations for an unnecessarily long time even after the tx
+ * completes. See JBCACHE-1007.
+ */
+ private void cleanupInternalState()
+ {
+ tx = null;
+ gtx = null;
+ modifications = null;
+ if (transactionContext != null) transactionContext.reset();
+ transactionContext = null;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "TxInterceptor.RemoteSynchronizationHandler(gtx=" + gtx + ", tx=" + getTxAsString() + ")";
+ }
+
+ protected String getTxAsString()
+ {
+ // JBCACHE-1114 -- don't call toString() on tx or it can lead to stack overflow
+ if (tx == null)
+ return null;
+
+ return tx.getClass().getName() + "@" + System.identityHashCode(tx);
+ }
+ }
+
+ private class LocalSynchronizationHandler extends RemoteSynchronizationHandler
+ {
+ private boolean localRollbackOnly = true;
+ // a VERY strange situation where a tx has remote origins, but since certain buddy group org methods perform local
+ // cleanups even when remotely triggered, and optimistic locking is used, you end up with an implicit local tx.
+ // This is STILL remotely originating though and this needs to be made explicit here.
+ // this can be checked by inspecting the InvocationContext.isOriginLocal() at the time of registering the sync.
+ private boolean remoteLocal = false;
+ private Option originalOptions, transactionalOptions;
+
+ /**
+ * A Synchronization for locally originating txs.
+ * <p/>
+ * a VERY strange situation where a tx has remote origins, but since certain buddy group org methods perform local
+ * cleanups even when remotely triggered, and optimistic locking is used, you end up with an implicit local tx.
+ * This is STILL remotely originating though and this needs to be made explicit here.
+ * this can be checked by inspecting the InvocationContext.isOriginLocal() at the time of registering the sync.
+ *
+ * @param gtx
+ * @param tx
+ * @param remoteLocal
+ */
+ LocalSynchronizationHandler(GlobalTransaction gtx, Transaction tx, TransactionContext transactionContext, boolean remoteLocal)
+ {
+ super(gtx, tx, transactionContext);
+ this.remoteLocal = remoteLocal;
+ }
+
+ @Override
+ public void beforeCompletion()
+ {
+ super.beforeCompletion();
+ ctx.setOriginLocal(!remoteLocal); // this is the LOCAL sync handler after all!
+ // fetch the modifications before the transaction is committed
+ // (and thus removed from the txTable)
+ setTransactionalContext(tx, gtx, transactionContext, ctx);
+ if (!transactionContext.hasModifications())
+ {
+ if (trace) log.trace("No modifications in this tx. Skipping beforeCompletion()");
+ modifications = Collections.emptyList();
+ return;
+ }
+ else
+ {
+ modifications = transactionContext.getModifications();
+ }
+
+ // set any transaction wide options as current for this thread, caching original options that would then be reset
+ originalOptions = ctx.getOptionOverrides();
+ transactionalOptions = transactionContext.getOption();
+ ctx.setOptionOverrides(transactionalOptions);
+
+ try
+ {
+ switch (tx.getStatus())
+ {
+ // if we are active or preparing then we can go ahead
+ case Status.STATUS_ACTIVE:
+ case Status.STATUS_PREPARING:
+ // run a prepare call.
+
+ Object result = isOnePhaseCommit() ? null : runPreparePhase(ctx, gtx, modifications);
+
+ if (result instanceof Throwable)
+ {
+ if (log.isDebugEnabled())
+ log.debug("Transaction needs to be rolled back - the cache returned an instance of Throwable for this prepare call (tx=" + tx + " and gtx=" + gtx + ")", (Throwable) result);
+ tx.setRollbackOnly();
+ throw (Throwable) result;
+ }
+ break;
+ default:
+ throw new CacheException("transaction " + tx + " in status " + tx.getStatus() + " unable to start transaction");
+ }
+ }
+ catch (Throwable t)
+ {
+ if (log.isWarnEnabled()) log.warn("Caught exception, will now set transaction to roll back", t);
+ try
+ {
+ tx.setRollbackOnly();
+ }
+ catch (SystemException se)
+ {
+ throw new RuntimeException("setting tx rollback failed ", se);
+ }
+ if (t instanceof RuntimeException)
+ throw (RuntimeException) t;
+ else
+ throw new RuntimeException("", t);
+ }
+ finally
+ {
+ localRollbackOnly = false;
+ setTransactionalContext(null, null, null, ctx);
+ ctx.setOptionOverrides(originalOptions);
+ }
+ }
+
+ @Override
+ public void afterCompletion(int status)
+ {
+ // could happen if a rollback is called and beforeCompletion() doesn't get called.
+ if (ctx == null) ctx = invocationContextContainer.get();
+ ctx.setLocalRollbackOnly(localRollbackOnly);
+ setTransactionalContext(tx, gtx, transactionContext, ctx);
+ if (transactionalOptions != null) ctx.setOptionOverrides(transactionalOptions);
+ try
+ {
+ super.afterCompletion(status);
+ }
+ finally
+ {
+ ctx.setOptionOverrides(originalOptions);
+ }
+ }
+
+ @Override
+ public String toString()
+ {
+ return "TxInterceptor.LocalSynchronizationHandler(gtx=" + gtx + ", tx=" + getTxAsString() + ")";
+ }
+ }
+
+ @ManagedOperation
+ public void resetStatistics()
+ {
+ prepares = 0;
+ commits = 0;
+ rollbacks = 0;
+ }
+
+ @ManagedOperation
+ public Map<String, Object> dumpStatistics()
+ {
+ Map<String, Object> retval = new HashMap<String, Object>(3);
+ retval.put("Prepares", prepares);
+ retval.put("Commits", commits);
+ retval.put("Rollbacks", rollbacks);
+ return retval;
+ }
+
+ @ManagedAttribute
+ public boolean getStatisticsEnabled()
+ {
+ return this.statsEnabled;
+ }
+
+ @ManagedAttribute
+ public void setStatisticsEnabled(boolean enabled)
+ {
+ this.statsEnabled = enabled;
+ }
+
+ @ManagedAttribute(description = "number of transaction prepares")
+ public long getPrepares()
+ {
+ return prepares;
+ }
+
+ @ManagedAttribute(description = "number of transaction commits")
+ public long getCommits()
+ {
+ return commits;
+ }
+
+ @ManagedAttribute(description = "number of transaction rollbacks")
+ public long getRollbacks()
+ {
+ return rollbacks;
+ }
+}
\ No newline at end of file
Property changes on: core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/TxInterceptor.java
___________________________________________________________________
Name: svn:keywords
+ Author Date Id Revision
Name: svn:eol-style
+ native
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/lock/StripedLockManager.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/lock/StripedLockManager.java 2008-10-08 18:56:43 UTC (rev 6891)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/lock/StripedLockManager.java 2008-10-09 08:56:47 UTC (rev 6892)
@@ -23,8 +23,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.jboss.cache.invocation.InvocationContextContainer;
-import org.jboss.cache.lock.MVCCLockManager;
import org.jboss.cache.util.concurrent.locks.LockContainer;
import org.jboss.cache.util.concurrent.locks.OwnableReentrantLock;
import org.jboss.cache.util.concurrent.locks.OwnableReentrantLockContainer;
@@ -33,6 +31,7 @@
import org.jboss.starobrno.context.InvocationContext;
import org.jboss.starobrno.factories.annotations.Inject;
import org.jboss.starobrno.factories.annotations.Start;
+import org.jboss.starobrno.invocation.InvocationContextContainer;
import javax.transaction.TransactionManager;
import java.util.List;
@@ -53,7 +52,7 @@
LockContainer<Object> lockContainer;
private TransactionManager transactionManager;
private InvocationContextContainer invocationContextContainer;
- private static final Log log = LogFactory.getLog(MVCCLockManager.class);
+ private static final Log log = LogFactory.getLog(StripedLockManager.class);
private static final boolean trace = log.isTraceEnabled();
@Inject
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/mvcc/MVCCEntry.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/mvcc/MVCCEntry.java 2008-10-08 18:56:43 UTC (rev 6891)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/mvcc/MVCCEntry.java 2008-10-09 08:56:47 UTC (rev 6892)
@@ -21,7 +21,7 @@
*/
package org.jboss.starobrno.mvcc;
-import org.jboss.starobrno.DataContainer;
+import org.jboss.starobrno.container.DataContainer;
import org.jboss.starobrno.context.InvocationContext;
import java.util.Map.Entry;
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/mvcc/MVCCEntryWrapper.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/mvcc/MVCCEntryWrapper.java 2008-10-08 18:56:43 UTC (rev 6891)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/mvcc/MVCCEntryWrapper.java 2008-10-09 08:56:47 UTC (rev 6892)
@@ -24,8 +24,8 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.cache.lock.TimeoutException;
-import org.jboss.starobrno.DataContainer;
import org.jboss.starobrno.config.Configuration;
+import org.jboss.starobrno.container.DataContainer;
import org.jboss.starobrno.context.InvocationContext;
import org.jboss.starobrno.factories.EntryFactory;
import org.jboss.starobrno.factories.annotations.Inject;
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/mvcc/NullMarkerEntry.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/mvcc/NullMarkerEntry.java 2008-10-08 18:56:43 UTC (rev 6891)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/mvcc/NullMarkerEntry.java 2008-10-09 08:56:47 UTC (rev 6892)
@@ -21,7 +21,7 @@
*/
package org.jboss.starobrno.mvcc;
-import org.jboss.starobrno.DataContainer;
+import org.jboss.starobrno.container.DataContainer;
/**
* A marker node to represent a null node for repeatable read, so that a read that returns a null can continue to return
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/mvcc/ReadCommittedEntry.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/mvcc/ReadCommittedEntry.java 2008-10-08 18:56:43 UTC (rev 6891)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/mvcc/ReadCommittedEntry.java 2008-10-09 08:56:47 UTC (rev 6892)
@@ -23,7 +23,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.jboss.starobrno.DataContainer;
+import org.jboss.starobrno.container.DataContainer;
import org.jboss.starobrno.context.InvocationContext;
import static org.jboss.starobrno.mvcc.ReadCommittedEntry.Flags.*;
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/mvcc/RepeatableReadEntry.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/mvcc/RepeatableReadEntry.java 2008-10-08 18:56:43 UTC (rev 6891)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/mvcc/RepeatableReadEntry.java 2008-10-09 08:56:47 UTC (rev 6892)
@@ -25,7 +25,7 @@
import org.apache.commons.logging.LogFactory;
import org.jboss.cache.mvcc.RepeatableReadNode;
import org.jboss.starobrno.CacheException;
-import org.jboss.starobrno.DataContainer;
+import org.jboss.starobrno.container.DataContainer;
import static org.jboss.starobrno.mvcc.ReadCommittedEntry.Flags.CHANGED;
import java.util.Map.Entry;
Copied: core/branches/flat/src/main/java/org/jboss/starobrno/notifications (from rev 6891, core/branches/flat/src/main/java/org/jboss/starobrno/notifier)
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/notifications/Notifier.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/notifier/Notifier.java 2008-10-08 18:56:43 UTC (rev 6891)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/notifications/Notifier.java 2008-10-09 08:56:47 UTC (rev 6892)
@@ -19,11 +19,11 @@
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
-package org.jboss.starobrno.notifier;
+package org.jboss.starobrno.notifications;
import org.jboss.cache.buddyreplication.BuddyGroup;
import org.jboss.starobrno.context.InvocationContext;
-import org.jboss.starobrno.notifier.event.NodeModifiedEvent;
+import org.jboss.starobrno.notifications.event.NodeModifiedEvent;
import org.jgroups.View;
import javax.transaction.Transaction;
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/notifications/NotifierImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/notifier/NotifierImpl.java 2008-10-08 18:56:43 UTC (rev 6891)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/notifications/NotifierImpl.java 2008-10-09 08:56:47 UTC (rev 6892)
@@ -19,7 +19,7 @@
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
-package org.jboss.starobrno.notifier;
+package org.jboss.starobrno.notifications;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -43,7 +43,7 @@
import org.jboss.starobrno.factories.annotations.Start;
import org.jboss.starobrno.factories.annotations.Stop;
import org.jboss.starobrno.mvcc.MVCCEntry;
-import org.jboss.starobrno.notifier.event.NodeModifiedEvent;
+import org.jboss.starobrno.notifications.event.NodeModifiedEvent;
import org.jgroups.View;
import javax.transaction.Transaction;
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/notifications/event/NodeModifiedEvent.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/notifier/event/NodeModifiedEvent.java 2008-10-08 18:56:43 UTC (rev 6891)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/notifications/event/NodeModifiedEvent.java 2008-10-09 08:56:47 UTC (rev 6892)
@@ -19,7 +19,7 @@
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
-package org.jboss.starobrno.notifier.event;
+package org.jboss.starobrno.notifications.event;
import org.jboss.cache.notifications.event.NodeEvent;
Copied: core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManager.java (from rev 6891, core/branches/flat/src/main/java/org/jboss/starobrno/RPCManager.java)
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManager.java (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManager.java 2008-10-09 08:56:47 UTC (rev 6892)
@@ -0,0 +1,154 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.remoting;
+
+import org.jboss.cache.Fqn;
+import org.jboss.starobrno.commands.ReplicableCommand;
+import org.jgroups.Address;
+import org.jgroups.Channel;
+import org.jgroups.blocks.RspFilter;
+
+import java.util.List;
+import java.util.Vector;
+
+/**
+ * Provides a mechanism for communicating with other caches in the cluster. For now this is based on JGroups as an underlying
+ * transport, and in future more transport options may become available.
+ * <p/>
+ * Implementations have a simple lifecycle:
+ * <ul>
+ * <li>start() - starts the underlying channel based on configuration options injected, and connects the channel</li>
+ * <li>disconnect() - disconnects the channel</li>
+ * <li>stop() - stops the dispatcher and releases resources</li>
+ * </ul>
+ *
+ * @author Manik Surtani
+ * @since 2.1.0
+ */
+public interface RPCManager
+{
+ /**
+ * Disconnects and closes the underlying JGroups channel.
+ */
+ void disconnect();
+
+ /**
+ * Stops the RPCDispatcher and frees resources. Closes and disconnects the underlying JGroups channel if this is
+ * still open/connected.
+ */
+ void stop();
+
+ /**
+ * Starts the RPCManager by connecting the underlying JGroups channel (if configured for replication). Connecting
+ * the channel may also involve state transfer (if configured) so the interceptor chain should be started and
+ * available before this method is called.
+ */
+ void start();
+
+ /**
+ * Invokes an RPC call on other caches in the cluster.
+ *
+ * @param recipients a list of Addresses to invoke the call on. If this is null, the call is broadcast to the entire cluster.
+ * @param cacheCommand the cache command to invoke
+ * @param mode the group request mode to use. See {@link org.jgroups.blocks.GroupRequest}.
+ * @param timeout a timeout after which to throw a replication exception.
+ * @param responseFilter a response filter with which to filter out failed/unwanted/invalid responses.
+ * @param useOutOfBandMessage if true, the message is put on JGroups' OOB queue. See JGroups docs for more info.
+ * @return a list of responses from each member contacted.
+ * @throws Exception in the event of problems.
+ */
+ List<Object> callRemoteMethods(Vector<Address> recipients, ReplicableCommand cacheCommand, int mode, long timeout, RspFilter responseFilter, boolean useOutOfBandMessage) throws Exception;
+
+ /**
+ * Invokes an RPC call on other caches in the cluster.
+ *
+ * @param recipients a list of Addresses to invoke the call on. If this is null, the call is broadcast to the entire cluster.
+ * @param cacheCommand the cache command to invoke
+ * @param mode the group request mode to use. See {@link org.jgroups.blocks.GroupRequest}.
+ * @param timeout a timeout after which to throw a replication exception.
+ * @param useOutOfBandMessage if true, the message is put on JGroups' OOB queue. See JGroups docs for more info.
+ * @return a list of responses from each member contacted.
+ * @throws Exception in the event of problems.
+ */
+ List<Object> callRemoteMethods(Vector<Address> recipients, ReplicableCommand cacheCommand, int mode, long timeout, boolean useOutOfBandMessage) throws Exception;
+
+ /**
+ * Invokes an RPC call on other caches in the cluster.
+ *
+ * @param recipients a list of Addresses to invoke the call on. If this is null, the call is broadcast to the entire cluster.
+ * @param cacheCommand the cache command to invoke
+ * @param synchronous if true, sets group request mode to {@link org.jgroups.blocks.GroupRequest#GET_ALL}, and if false sets it to {@link org.jgroups.blocks.GroupRequest#GET_NONE}.
+ * @param timeout a timeout after which to throw a replication exception.
+ * @param useOutOfBandMessage if true, the message is put on JGroups' OOB queue. See JGroups docs for more info.
+ * @return a list of responses from each member contacted.
+ * @throws Exception in the event of problems.
+ */
+ List<Object> callRemoteMethods(Vector<Address> recipients, ReplicableCommand cacheCommand, boolean synchronous, long timeout, boolean useOutOfBandMessage) throws Exception;
+
+ /**
+ * @return true if the current Channel is the coordinator of the cluster.
+ */
+ boolean isCoordinator();
+
+ /**
+ * @return the Address of the current coordinator.
+ */
+ Address getCoordinator();
+
+ /**
+ * Retrieves the local JGroups channel's address
+ *
+ * @return an Address
+ */
+ Address getLocalAddress();
+
+ /**
+ * Returns a defensively copied list of members in the current cluster view.
+ */
+ List<Address> getMembers();
+
+ /**
+ * Retrieves partial state from remote instances.
+ *
+ * @param sources sources to consider for a state transfer
+ * @param sourceTarget Fqn on source to retrieve state for
+ * @param integrationTarget integration point on local cache to apply state
+ * @throws Exception in the event of problems
+ */
+ void fetchPartialState(List<Address> sources, Fqn sourceTarget, Fqn integrationTarget) throws Exception;
+
+ /**
+ * Retrieves partial state from remote instances.
+ *
+ * @param sources sources to consider for a state transfer
+ * @param subtree Fqn subtree to retrieve. Will be integrated at the same point.
+ * @throws Exception in the event of problems
+ */
+ void fetchPartialState(List<Address> sources, Fqn subtree) throws Exception;
+
+ /**
+ * Retrieves the Channel
+ *
+ * @return a channel
+ */
+ Channel getChannel();
+}
\ No newline at end of file
Property changes on: core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManager.java
___________________________________________________________________
Name: svn:keywords
+ Author Date Id Revision
Name: svn:eol-style
+ native
Added: core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManagerImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManagerImpl.java (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManagerImpl.java 2008-10-09 08:56:47 UTC (rev 6892)
@@ -0,0 +1,831 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.remoting;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.cache.Fqn;
+import org.jboss.cache.NodeSPI;
+import org.jboss.cache.SuspectException;
+import org.jboss.cache.jmx.annotations.MBean;
+import org.jboss.cache.jmx.annotations.ManagedAttribute;
+import org.jboss.cache.jmx.annotations.ManagedOperation;
+import org.jboss.cache.lock.TimeoutException;
+import org.jboss.cache.marshall.CommandAwareRpcDispatcher;
+import org.jboss.cache.marshall.Marshaller;
+import org.jboss.cache.remoting.jgroups.ChannelMessageListener;
+import org.jboss.cache.statetransfer.DefaultStateTransferManager;
+import org.jboss.cache.util.concurrent.ReclosableLatch;
+import org.jboss.cache.util.reflect.ReflectionUtil;
+import org.jboss.starobrno.CacheException;
+import org.jboss.starobrno.CacheSPI;
+import org.jboss.starobrno.commands.ReplicableCommand;
+import org.jboss.starobrno.config.Configuration;
+import org.jboss.starobrno.config.RuntimeConfig;
+import org.jboss.starobrno.factories.ComponentRegistry;
+import org.jboss.starobrno.factories.annotations.Inject;
+import org.jboss.starobrno.factories.annotations.Start;
+import org.jboss.starobrno.factories.annotations.Stop;
+import org.jboss.starobrno.interceptors.InterceptorChain;
+import org.jboss.starobrno.invocation.InvocationContextContainer;
+import org.jboss.starobrno.lock.LockManager;
+import org.jboss.starobrno.notifications.Notifier;
+import org.jboss.starobrno.transaction.GlobalTransaction;
+import org.jboss.starobrno.transaction.TransactionTable;
+import org.jgroups.Address;
+import org.jgroups.Channel;
+import org.jgroups.ChannelException;
+import org.jgroups.ChannelFactory;
+import org.jgroups.ExtendedMembershipListener;
+import org.jgroups.JChannel;
+import org.jgroups.StateTransferException;
+import org.jgroups.View;
+import org.jgroups.blocks.GroupRequest;
+import org.jgroups.blocks.RspFilter;
+import org.jgroups.protocols.TP;
+import org.jgroups.stack.ProtocolStack;
+import org.jgroups.util.Rsp;
+import org.jgroups.util.RspList;
+
+import javax.transaction.TransactionManager;
+import java.text.NumberFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+import java.util.Vector;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Manager that handles all RPC calls between JBoss Cache instances
+ *
+ * @author <a href="mailto:manik at jboss.org">Manik Surtani (manik at jboss.org)</a>
+ */
+ at MBean(objectName = "RPCManager")
+public class RPCManagerImpl implements RPCManager
+{
+ private Channel channel;
+ private final Log log = LogFactory.getLog(RPCManagerImpl.class);
+ private List<Address> members;
+ private long replicationCount;
+ private long replicationFailures;
+ private boolean statisticsEnabled = false;
+
+ private final Object coordinatorLock = new Object();
+ /**
+ * True if this Cache is the coordinator.
+ */
+ private volatile boolean coordinator = false;
+ /**
+ * Thread gate used to block Dispatcher during JGroups FLUSH protocol
+ */
+ private final ReclosableLatch flushBlockGate = new ReclosableLatch();
+ /**
+ * JGroups RpcDispatcher in use.
+ */
+ private CommandAwareRpcDispatcher rpcDispatcher = null;
+
+ /**
+ * JGroups message listener.
+ */
+ private ChannelMessageListener messageListener;
+ private Configuration configuration;
+ private Notifier notifier;
+ private CacheSPI spi;
+ private InvocationContextContainer invocationContextContainer;
+ private final boolean trace = log.isTraceEnabled();
+ private Marshaller marshaller;
+ private TransactionManager txManager;
+ private TransactionTable txTable;
+ private InterceptorChain interceptorChain;
+
+ private boolean isUsingBuddyReplication;
+ private boolean isInLocalMode;
+ private ComponentRegistry componentRegistry;
+ private LockManager lockManager;
+
+ /**
+  * Injection point for the components this RPC manager collaborates with. Every argument is stored in the
+  * corresponding field; nothing is started or connected here.
+  */
+ @Inject
+ public void setupDependencies(ChannelMessageListener messageListener, Configuration configuration, Notifier notifier,
+                               Marshaller marshaller, TransactionTable txTable,
+                               TransactionManager txManager, InvocationContextContainer container, InterceptorChain interceptorChain,
+                               ComponentRegistry componentRegistry, LockManager lockManager)
+ {
+    // configuration and component lookup
+    this.configuration = configuration;
+    this.componentRegistry = componentRegistry;
+
+    // messaging
+    this.messageListener = messageListener;
+    this.marshaller = marshaller;
+    this.notifier = notifier;
+
+    // invocation pipeline
+    this.invocationContextContainer = container;
+    this.interceptorChain = interceptorChain;
+
+    // transactions and locking
+    this.txTable = txTable;
+    this.txManager = txManager;
+    this.lockManager = lockManager;
+
+    // TODO: Inject cacheSPI when we are ready
+//    this.spi = spi;
+ }
+
+ // ------------ START: Lifecycle methods ------------
+
+ @Start(priority = 15)
+ public void start()
+ {
+ switch (configuration.getCacheMode())
+ {
+ case LOCAL:
+ log.debug("cache mode is local, will not create the channel");
+ isInLocalMode = true;
+ isUsingBuddyReplication = false;
+ break;
+ case REPL_SYNC:
+ case REPL_ASYNC:
+ case INVALIDATION_ASYNC:
+ case INVALIDATION_SYNC:
+ isInLocalMode = false;
+ isUsingBuddyReplication = configuration.getBuddyReplicationConfig() != null && configuration.getBuddyReplicationConfig().isEnabled();
+ if (log.isDebugEnabled()) log.debug("Cache mode is " + configuration.getCacheMode());
+
+ boolean fetchState = shouldFetchStateOnStartup();
+ initialiseChannelAndRpcDispatcher(fetchState);
+
+ if (fetchState)
+ {
+ try
+ {
+ long start = System.currentTimeMillis();
+ // connect and state transfer
+ channel.connect(configuration.getClusterName(), null, null, configuration.getStateRetrievalTimeout());
+ //if I am not the only and the first member than wait for a state to arrive
+ if (getMembers().size() > 1) messageListener.waitForState();
+
+ if (log.isDebugEnabled())
+ log.debug("connected, state was retrieved successfully (in " + (System.currentTimeMillis() - start) + " milliseconds)");
+ }
+ catch (StateTransferException ste)
+ {
+ // make sure we disconnect from the channel before we throw this exception!
+ // JBCACHE-761
+ disconnect();
+ throw new CacheException("Unable to fetch state on startup", ste);
+ }
+ catch (ChannelException e)
+ {
+ throw new CacheException("Unable to connect to JGroups channel", e);
+ }
+ catch (Exception ex)
+ {
+ throw new CacheException("Unable to fetch state on startup", ex);
+ }
+ }
+ else
+ {
+ //otherwise just connect
+ try
+ {
+ channel.connect(configuration.getClusterName());
+ }
+ catch (ChannelException e)
+ {
+ throw new CacheException("Unable to connect to JGroups channel", e);
+ }
+ }
+ if (log.isInfoEnabled()) log.info("Cache local address is " + getLocalAddress());
+ }
+ }
+
+ /**
+  * Disconnects from the cluster and closes the channel, if there is a channel and it is still open.
+  */
+ public void disconnect()
+ {
+    if (channel == null || !channel.isOpen()) return;
+
+    log.info("Disconnecting and closing the Channel");
+    channel.disconnect();
+    channel.close();
+ }
+
+ /**
+  * Shuts the RPC layer down: closes the channel (logging, not propagating, any failure), clears the channel from
+  * the runtime configuration, stops the dispatcher and resets membership state.
+  */
+ @Stop(priority = 8)
+ public void stop()
+ {
+    try
+    {
+       disconnect();
+    }
+    catch (Exception toLog)
+    {
+       log.error("Problem closing channel; setting it to null", toLog);
+    }
+
+    // drop every reference to the channel
+    channel = null;
+    configuration.getRuntimeConfig().setChannel(null);
+
+    CommandAwareRpcDispatcher dispatcher = rpcDispatcher;
+    if (dispatcher != null)
+    {
+       log.info("Stopping the RpcDispatcher");
+       dispatcher.stop();
+    }
+
+    // reset cluster membership state
+    members = null;
+    coordinator = false;
+    rpcDispatcher = null;
+ }
+
+ /**
+  * Decides whether a state transfer must be initiated on start-up.
+  *
+  * @return true if the cache is active on start-up, buddy replication is not in use, and either in-memory state
+  *         or persistent (cache loader) state is configured to be fetched.
+  */
+ private boolean shouldFetchStateOnStartup()
+ {
+    if (configuration.isInactiveOnStartup() || isUsingBuddyReplication) return false;
+    if (configuration.isFetchInMemoryState()) return true;
+    return configuration.getCacheLoaderConfig() != null && configuration.getCacheLoaderConfig().isFetchPersistentState();
+ }
+
+ /**
+  * Obtains the JGroups channel (from the runtime configuration, a multiplexer factory, or by creating a new
+  * {@link JChannel}), applies the channel options this cache relies on and wires the marshaller into the
+  * RpcDispatcher.
+  * <p/>
+  * Creation of the RpcDispatcher is currently disabled (see the commented block below), so the marshaller wiring
+  * is skipped when no dispatcher exists rather than failing with a NullPointerException.
+  *
+  * @param fetchState whether state should automatically be fetched when the channel connects
+  * @throws CacheException if a channel cannot be created
+  */
+ @SuppressWarnings("deprecation")
+ private void initialiseChannelAndRpcDispatcher(boolean fetchState) throws CacheException
+ {
+    channel = configuration.getRuntimeConfig().getChannel();
+    if (channel == null)
+    {
+       // Try to create a multiplexer channel
+       channel = getMultiplexerChannel();
+
+       if (channel != null)
+       {
+          ReflectionUtil.setValue(configuration, "accessible", true);
+          configuration.setUsingMultiplexer(true);
+          if (log.isDebugEnabled())
+             log.debug("Created Multiplexer Channel for cache cluster " + configuration.getClusterName() + " using stack " + configuration.getMultiplexerStack());
+       }
+       else
+       {
+          try
+          {
+             if (configuration.getClusterConfig() == null)
+             {
+                log.debug("setting cluster properties to default value");
+                channel = new JChannel(configuration.getDefaultClusterConfig());
+             }
+             else
+             {
+                if (trace)
+                {
+                   log.trace("Cache cluster properties: " + configuration.getClusterConfig());
+                }
+                channel = new JChannel(configuration.getClusterConfig());
+             }
+          }
+          catch (ChannelException e)
+          {
+             throw new CacheException(e);
+          }
+       }
+
+       configuration.getRuntimeConfig().setChannel(channel);
+    }
+
+    // Channel.LOCAL *must* be set to false so we don't see our own messages - otherwise invalidations targeted at
+    // remote instances will be received by self.
+    channel.setOpt(Channel.LOCAL, false);
+    channel.setOpt(Channel.AUTO_RECONNECT, true);
+    channel.setOpt(Channel.AUTO_GETSTATE, fetchState);
+    channel.setOpt(Channel.BLOCK, true);
+    // todo fix me
+    /*
+    if (configuration.isUseRegionBasedMarshalling())
+    {
+    rpcDispatcher = new InactiveRegionAwareRpcDispatcher(channel, messageListener, new MembershipListenerAdaptor(),
+    spi, invocationContextContainer, interceptorChain, componentRegistry);
+    }
+    else
+    {
+    rpcDispatcher = new CommandAwareRpcDispatcher(channel, messageListener, new MembershipListenerAdaptor(),
+    invocationContextContainer, invocationContextContainer, interceptorChain, componentRegistry);
+    }
+    */
+    checkAppropriateConfig();
+
+    // The dispatcher is not created while the block above is disabled; callRemoteMethods() already
+    // short-circuits on a null dispatcher, so do the same here instead of dereferencing null.
+    if (rpcDispatcher == null)
+    {
+       log.warn("No RpcDispatcher available; marshallers not configured and remote calls will be skipped");
+       return;
+    }
+    rpcDispatcher.setRequestMarshaller(marshaller);
+    rpcDispatcher.setResponseMarshaller(marshaller);
+ }
+
+ /**
+  * @return the channel currently held by this manager; null before start and after stop.
+  */
+ public Channel getChannel()
+ {
+    return this.channel;
+ }
+
+
+ /**
+  * Creates a multiplexed channel from the channel factory registered in the runtime configuration.
+  *
+  * @return the multiplexer channel, or null if no channel factory has been configured
+  * @throws CacheException if the factory fails to create the channel
+  */
+ private JChannel getMultiplexerChannel() throws CacheException
+ {
+    String stackName = configuration.getMultiplexerStack();
+    ChannelFactory muxFactory = configuration.getRuntimeConfig().getMuxChannelFactory();
+
+    if (muxFactory == null) return null;
+
+    try
+    {
+       return (JChannel) muxFactory.createMultiplexerChannel(stackName, configuration.getClusterName());
+    }
+    catch (Exception e)
+    {
+       throw new CacheException("Failed to create multiplexed channel using stack " + stackName, e);
+    }
+ }
+
+
+ /**
+  * Walks the subtree rooted at the given node, intended to break locks held by transactions that originated on
+  * members which have left the cluster. Detection of dead owners is currently disabled (see the commented
+  * block), so the set of dead owners is always empty and only the recursion over children has any effect.
+  *
+  * @param node        node to start from
+  * @param deadMembers addresses of members that are no longer in the view
+  */
+ @Deprecated
+ private void removeLocksForDeadMembers(NodeSPI node, List deadMembers)
+ {
+    Object owner = lockManager.getOwner(node);
+    Set<GlobalTransaction> orphanedOwners = new HashSet<GlobalTransaction>();
+
+    // todo fix me
+    /*
+    if (isLockOwnerDead(owner, deadMembers)) deadOwners.add((GlobalTransaction) owner);
+
+
+    for (Object readOwner : lockManager.getReadOwners(node))
+    {
+    if (isLockOwnerDead(readOwner, deadMembers)) deadOwners.add((GlobalTransaction) readOwner);
+    }
+    */
+
+    for (GlobalTransaction orphan : orphanedOwners)
+    {
+       boolean localTx = orphan.getAddress().equals(getLocalAddress());
+       // TODO: Fix me!!!
+//       boolean broken = LockUtil.breakTransactionLock(node.getFqn(), lockManager, deadOwner, localTx, txTable, txManager);
+       boolean broken = true;
+
+       if (broken && trace)
+       {
+          log.trace("Broke lock for node " + node.getFqn() + " held by " + orphan);
+       }
+    }
+
+    // Recursively unlock children
+    for (Object childNode : node.getChildrenDirect())
+    {
+       removeLocksForDeadMembers((NodeSPI) childNode, deadMembers);
+    }
+ }
+
+
+ /**
+ * Only used with MVCC.
+ */
+ // TODO: Fix me
+ /*
+ private void removeLocksForDeadMembers(InternalNode<?, ?> node, List deadMembers)
+ {
+ Set<GlobalTransaction> deadOwners = new HashSet<GlobalTransaction>();
+ Object owner = lockManager.getWriteOwner(node.getFqn());
+
+ if (isLockOwnerDead(owner, deadMembers)) deadOwners.add((GlobalTransaction) owner);
+
+ // MVCC won't have any read locks.
+
+ for (GlobalTransaction deadOwner : deadOwners)
+ {
+ boolean localTx = deadOwner.getAddress().equals(getLocalAddress());
+// boolean broken = LockUtil.breakTransactionLock(node.getFqn(), lockManager, deadOwner, localTx, txTable, txManager);
+ boolean broken = true; // TODO fix me!!
+
+ if (broken && trace) log.trace("Broke lock for node " + node.getFqn() + " held by " + deadOwner);
+ }
+
+ // Recursively unlock children
+ for (InternalNode child : node.getChildren()) removeLocksForDeadMembers(child, deadMembers);
+ }
+
+ private boolean isLockOwnerDead(Object owner, List deadMembers)
+ {
+ boolean result = false;
+ if (owner != null && owner instanceof GlobalTransaction)
+ {
+ Object addr = ((GlobalTransaction) owner).getAddress();
+ result = deadMembers.contains(addr);
+ }
+ return result;
+ }
+ */
+
+ // ------------ END: Lifecycle methods ------------
+
+ // ------------ START: RPC call methods ------------
+ /**
+  * Invokes a command on remote members without a response filter; delegates to the variant that accepts one.
+  */
+ public List<Object> callRemoteMethods(Vector<Address> recipients, ReplicableCommand command, int mode, long timeout, boolean useOutOfBandMessage) throws Exception
+ {
+    final RspFilter noFilter = null;
+    return callRemoteMethods(recipients, command, mode, timeout, noFilter, useOutOfBandMessage);
+ }
+
+ /**
+  * Invokes a command on remote members, translating the synchronous flag into a JGroups request mode:
+  * GET_ALL when synchronous, GET_NONE otherwise.
+  */
+ public List<Object> callRemoteMethods(Vector<Address> recipients, ReplicableCommand command, boolean synchronous, long timeout, boolean useOutOfBandMessage) throws Exception
+ {
+    int requestMode;
+    if (synchronous)
+    {
+       requestMode = GroupRequest.GET_ALL;
+    }
+    else
+    {
+       requestMode = GroupRequest.GET_NONE;
+    }
+    return callRemoteMethods(recipients, command, requestMode, timeout, useOutOfBandMessage);
+ }
+
+ /**
+  * Invokes a command on remote members and collects their responses.
+  * <p/>
+  * Returns null when no RpcDispatcher exists, and an empty list for asynchronous (GET_NONE) calls or when no
+  * responses were produced. For members that were suspected or did not respond, a {@link ReplicationException}
+  * is placed in the returned list. An exception returned by a remote member (other than a ReplicationException)
+  * is rethrown to the caller.
+  * <p/>
+  * Replication statistics are updated on every call: the call counts as a success only if no response failed
+  * and no exception was thrown.
+  *
+  * @param recipients          members to invoke the command on
+  * @param command             command to replicate
+  * @param mode                JGroups group request mode; may be overridden per invocation via option overrides
+  * @param timeout             timeout for the remote call
+  * @param responseFilter      optional response filter, may be null
+  * @param useOutOfBandMessage currently ignored; out-of-band messaging is forced off
+  * @return list of response values, or null if there is no dispatcher
+  * @throws Exception any exception raised remotely or while waiting for the FLUSH gate
+  */
+ public List<Object> callRemoteMethods(Vector<Address> recipients, ReplicableCommand command, int mode, long timeout, RspFilter responseFilter, boolean useOutOfBandMessage) throws Exception
+ {
+    boolean success = true;
+    try
+    {
+       // short circuit if we don't have an RpcDispatcher!
+       if (rpcDispatcher == null) return null;
+       int modeToUse = mode;
+       // the CacheSPI is not injected yet (see setupDependencies), so only consult option overrides if present
+       if (spi != null)
+       {
+          int preferredMode = spi.getInvocationContext().getOptionOverrides().getGroupRequestMode();
+          if (preferredMode > -1) modeToUse = preferredMode;
+       }
+       if (trace)
+          log.trace("callRemoteMethods(): valid members are " + recipients + " methods: " + command + " Using OOB? " + useOutOfBandMessage);
+       if (channel.flushSupported())
+       {
+          if (!flushBlockGate.await(configuration.getStateRetrievalTimeout(), TimeUnit.MILLISECONDS))
+             throw new TimeoutException("State retrieval timed out waiting for flush unblock.");
+       }
+       useOutOfBandMessage = false;
+       // todo fix me!!
+       RspList rsps = null;//rpcDispatcher.invokeRemoteCommands(recipients, command, modeToUse, timeout, isUsingBuddyReplication, useOutOfBandMessage, responseFilter);
+       if (mode == GroupRequest.GET_NONE) return Collections.emptyList();// async case
+       if (trace)
+          log.trace("(" + getLocalAddress() + "): responses for method " + command.getClass().getSimpleName() + ":\n" + rsps);
+       // short-circuit no-return-value calls.
+       if (rsps == null) return Collections.emptyList();
+       List<Object> retval = new ArrayList<Object>(rsps.size());
+       for (Rsp rsp : rsps.values())
+       {
+          if (rsp.wasSuspected() || !rsp.wasReceived())
+          {
+             CacheException ex;
+             if (rsp.wasSuspected())
+             {
+                ex = new SuspectException("Suspected member: " + rsp.getSender());
+             }
+             else
+             {
+                ex = new TimeoutException("Replication timeout for " + rsp.getSender());
+             }
+             retval.add(new ReplicationException("rsp=" + rsp, ex));
+             // one failed response marks the whole call as failed; later good responses must not reset this
+             success = false;
+          }
+          else
+          {
+             Object value = rsp.getValue();
+             if (value instanceof Exception && !(value instanceof ReplicationException))
+             {
+                // if we have any application-level exceptions make sure we throw them!!
+                if (trace) log.trace("Recieved exception'" + value + "' from " + rsp.getSender());
+                throw (Exception) value;
+             }
+             retval.add(value);
+          }
+       }
+       return retval;
+    }
+    catch (Exception e)
+    {
+       success = false;
+       throw e;
+    }
+    finally
+    {
+       computeStats(success);
+    }
+ }
+
+ // ------------ START: Partial state transfer methods ------------
+
+ /**
+  * Fetches partial state for {@code sourceTarget} and integrates it at {@code integrationTarget}. Both Fqns are
+  * encoded into a single state id, separated by the partial state delimiter.
+  */
+ public void fetchPartialState(List<Address> sources, Fqn sourceTarget, Fqn integrationTarget) throws Exception
+ {
+    fetchPartialState(sources, sourceTarget + DefaultStateTransferManager.PARTIAL_STATE_DELIMITER + integrationTarget);
+ }
+
+ /**
+  * Fetches partial state for the given subtree, using the subtree's string form as the state id.
+  *
+  * @throws IllegalArgumentException if subtree is null
+  */
+ public void fetchPartialState(List<Address> sources, Fqn subtree) throws Exception
+ {
+    if (subtree != null)
+    {
+       fetchPartialState(sources, subtree.toString());
+       return;
+    }
+    throw new IllegalArgumentException("Cannot fetch partial state. Null subtree.");
+ }
+
+ /**
+  * Tries each source in turn (skipping the local address) until one of them delivers the requested state.
+  * Missing sources, a missing state id, or an empty target list are logged and otherwise ignored.
+  *
+  * @param sources candidate state providers
+  * @param stateId id of the state to request from the channel
+  */
+ private void fetchPartialState(List<Address> sources, String stateId) throws Exception
+ {
+    if (sources == null || sources.isEmpty() || stateId == null)
+    {
+       // should this really be throwing an exception? Are there valid use cases where partial state may not be available? - Manik
+       // Yes -- cache is configured LOCAL but app doesn't know it -- Brian
+       //throw new IllegalArgumentException("Cannot fetch partial state, targets are " + sources + " and stateId is " + stateId);
+       if (log.isWarnEnabled())
+          log.warn("Cannot fetch partial state, targets are " + sources + " and stateId is " + stateId);
+       return;
+    }
+
+    // work on a copy, and never ask ourselves for state
+    List<Address> targets = new LinkedList<Address>(sources);
+    targets.remove(getLocalAddress());
+
+    if (targets.isEmpty())
+    {
+       // Definitely no exception here -- this happens every time the 1st node in the
+       // cluster activates a region!! -- Brian
+       if (log.isDebugEnabled()) log.debug("Cannot fetch partial state. There are no target members specified");
+       return;
+    }
+
+    if (log.isDebugEnabled())
+       log.debug("Node " + getLocalAddress() + " fetching partial state " + stateId + " from members " + targets);
+
+    boolean stateReceived = false;
+    for (Address provider : targets)
+    {
+       try
+       {
+          stateReceived = fetchPartialStateFrom(provider, stateId);
+          if (stateReceived) break;
+       }
+       catch (IllegalStateException ise)
+       {
+          // thrown by the JGroups channel if state retrieval fails.
+          if (log.isInfoEnabled())
+             log.info("Channel problems fetching state. Continuing on to next provider. ", ise);
+       }
+    }
+
+    if (!stateReceived && log.isDebugEnabled())
+    {
+       log.debug("Node " + getLocalAddress() + " could not fetch partial state " + stateId + " from any member " + targets);
+    }
+ }
+
+ /**
+  * Requests the given state from a single member and waits for it to be applied.
+  *
+  * @return true if the channel reported a successful transfer and waiting for the state did not fail
+  */
+ private boolean fetchPartialStateFrom(Address target, String stateId) throws Exception
+ {
+    if (log.isDebugEnabled())
+       log.debug("Node " + getLocalAddress() + " fetching partial state " + stateId + " from member " + target);
+    messageListener.setStateSet(false);
+    boolean transferred = channel.getState(target, stateId, configuration.getStateRetrievalTimeout());
+    if (transferred)
+    {
+       try
+       {
+          messageListener.waitForState();
+       }
+       catch (Exception transferFailed)
+       {
+          if (log.isTraceEnabled()) log.trace("Error while fetching state", transferFailed);
+          transferred = false;
+       }
+    }
+    if (log.isDebugEnabled())
+       log.debug("Node " + getLocalAddress() + " fetching partial state " + stateId + " from member " + target + (transferred ? " successful" : " failed"));
+    return transferred;
+ }
+
+ // ------------ END: Partial state transfer methods ------------
+
+ // ------------ START: Informational methods ------------
+
+ /**
+  * @return the local address reported by the channel, or null if there is no channel.
+  */
+ public Address getLocalAddress()
+ {
+    if (channel == null)
+    {
+       return null;
+    }
+    return channel.getLocalAddress();
+ }
+
+ public List<Address> getMembers()
+ {
+ if (isInLocalMode) return null;
+ if (members == null)
+ return Collections.emptyList();
+ else
+ return members;
+ }
+
+ /**
+  * @return the coordinator flag as computed when the last view was accepted; false after stop.
+  */
+ public boolean isCoordinator()
+ {
+    return this.coordinator;
+ }
+
+ /**
+  * Returns the address of the current coordinator, i.e. the first member of the current view. Blocks until a
+  * view has been accepted. If the waiting thread is interrupted, waiting stops, the thread's interrupt status is
+  * restored and whatever is known at that point is returned.
+  *
+  * @return the coordinator's address, or null if there is no channel or no members are known
+  */
+ public Address getCoordinator()
+ {
+    if (channel == null)
+    {
+       return null;
+    }
+
+    synchronized (coordinatorLock)
+    {
+       while (members == null || members.isEmpty())
+       {
+          log.debug("getCoordinator(): waiting on viewAccepted()");
+          try
+          {
+             coordinatorLock.wait();
+          }
+          catch (InterruptedException e)
+          {
+             log.error("getCoordinator(): Interrupted while waiting for members to be set", e);
+             // restore the interrupt status so callers further up the stack can react to it
+             Thread.currentThread().interrupt();
+             break;
+          }
+       }
+       return members != null && members.size() > 0 ? members.get(0) : null;
+    }
+ }
+
+ // ------------ END: Informational methods ------------
+
+ /*----------------------- MembershipListener ------------------------*/
+
+ /**
+  * Receives membership and FLUSH callbacks from JGroups and keeps the enclosing manager's member list,
+  * coordinator flag and flush gate up to date.
+  */
+ protected class MembershipListenerAdaptor implements ExtendedMembershipListener
+ {
+
+    /**
+     * Records the new view, recomputes the coordinator flag and wakes up threads blocked in
+     * {@code getCoordinator()}. Waiters are woken even if processing the view throws, since the member list
+     * may already have been updated at that point.
+     */
+    public void viewAccepted(View newView)
+    {
+       Vector<Address> newMembers = newView.getMembers();
+       if (log.isInfoEnabled()) log.info("Received new cluster view: " + newView);
+       synchronized (coordinatorLock)
+       {
+          try
+          {
+             boolean needNotification = false;
+             if (newMembers != null)
+             {
+                if (members != null)
+                {
+                   // we had a membership list before this event. Check to make sure we haven't lost any members,
+                   // and if so, determine what members have been removed
+                   // and roll back any tx and break any locks
+                   List<Address> removed = new ArrayList<Address>(members);
+                   removed.removeAll(newMembers);
+                   // the CacheSPI is not injected yet (see setupDependencies); skip the override until it is
+                   if (spi != null)
+                   {
+                      spi.getInvocationContext().getOptionOverrides().setSkipCacheStatusCheck(true);
+                   }
+                   // todo fix me
+                   NodeSPI root = null; // spi.getRoot();
+                   if (root != null)
+                   {
+                      // todo fix me
+                      //removeLocksForDeadMembers(root.getDelegationTarget(), removed);
+                   }
+                }
+
+                members = new ArrayList<Address>(newMembers); // defensive copy.
+
+                needNotification = true;
+             }
+
+             // Now that we have a view, figure out if we are the coordinator
+             coordinator = (members != null && members.size() != 0 && members.get(0).equals(getLocalAddress()));
+
+             // now notify listeners - *after* updating the coordinator. - JBCACHE-662
+             if (needNotification && notifier != null)
+             {
+                // TODO: Fix me when we have repl working
+                throw new UnsupportedOperationException("Fix me!");
+//                InvocationContext ctx = spi.getInvocationContext();
+//                notifier.notifyViewChange(newView, ctx);
+             }
+          }
+          finally
+          {
+             // Wake up any threads that are waiting to know about who the coordinator is. Done in a finally
+             // block so that a failure above cannot leave them waiting forever.
+             coordinatorLock.notifyAll();
+          }
+       }
+    }
+
+    /**
+     * Called when a member is suspected. Intentionally a no-op.
+     */
+    public void suspect(Address suspected_mbr)
+    {
+    }
+
+    /**
+     * Indicates that a channel has received a BLOCK event from FLUSH protocol. Closes the flush gate before
+     * notifying listeners.
+     */
+    public void block()
+    {
+       flushBlockGate.close();
+       if (log.isDebugEnabled()) log.debug("Block received at " + getLocalAddress());
+       notifier.notifyCacheBlocked(true);
+       notifier.notifyCacheBlocked(false);
+
+       if (log.isDebugEnabled()) log.debug("Block processed at " + getLocalAddress());
+    }
+
+    /**
+     * Indicates that a channel has received a UNBLOCK event from FLUSH protocol. Re-opens the flush gate after
+     * listeners have been notified.
+     */
+    public void unblock()
+    {
+       if (log.isDebugEnabled()) log.debug("UnBlock received at " + getLocalAddress());
+
+       notifier.notifyCacheUnblocked(true);
+       notifier.notifyCacheUnblocked(false);
+
+       if (log.isDebugEnabled()) log.debug("UnBlock processed at " + getLocalAddress());
+       flushBlockGate.open();
+    }
+
+ }
+
+ //jmx operations
+ /**
+  * Bumps the success or failure counter, provided statistics are enabled and an RpcDispatcher exists.
+  *
+  * @param success whether the replication call being recorded succeeded
+  */
+ private void computeStats(boolean success)
+ {
+    if (!statisticsEnabled || rpcDispatcher == null) return;
+
+    if (success)
+       replicationCount++;
+    else
+       replicationFailures++;
+ }
+
+ /**
+  * Sets both replication counters back to zero.
+  */
+ @ManagedOperation
+ public void resetStatistics()
+ {
+    replicationFailures = 0;
+    replicationCount = 0;
+ }
+
+ /**
+  * @return the value of the successful-replication counter.
+  */
+ @ManagedAttribute(description = "number of successful replications")
+ public long getReplicationCount()
+ {
+    return this.replicationCount;
+ }
+
+ /**
+  * @return the value of the failed-replication counter.
+  */
+ @ManagedAttribute(description = "number of failed replications")
+ public long getReplicationFailures()
+ {
+    return this.replicationFailures;
+ }
+
+ /**
+  * @return true if replication statistics are being collected.
+  */
+ @ManagedAttribute(description = "whether or not jmx statistics are enabled")
+ public boolean isStatisticsEnabled()
+ {
+    return this.statisticsEnabled;
+ }
+
+ /**
+  * Switches collection of replication statistics on or off. Existing counter values are left untouched.
+  *
+  * @param statisticsEnabled true to collect statistics
+  */
+ @ManagedAttribute
+ public void setStatisticsEnabled(boolean statisticsEnabled)
+ {
+    this.statisticsEnabled = statisticsEnabled;
+ }
+
+ /**
+  * Reports the share of successful replications as a percentage of all recorded replications.
+  *
+  * @return "N/A" if statistics are disabled or nothing has been recorded yet; otherwise the formatted
+  *         percentage followed by "%". A run consisting only of failures is reported as 0%, not "N/A".
+  */
+ @ManagedAttribute
+ public String getSuccessRatio()
+ {
+    long totalCount = replicationCount + replicationFailures;
+    if (totalCount == 0 || !statisticsEnabled)
+    {
+       return "N/A";
+    }
+    double ratio = (double) replicationCount / totalCount * 100d;
+    return NumberFormat.getInstance().format(ratio) + "%";
+ }
+
+ /**
+  * Checks to see whether the cache is using an appropriate JGroups config, and logs a warning if message
+  * bundling does not suit the cache mode: bundling enabled for a synchronous (non-local) cache, or bundling
+  * disabled for an asynchronous one. Nothing is checked when a multiplexer stack is configured.
+  */
+ private void checkAppropriateConfig()
+ {
+    //if we use a shared transport do not log any warn message
+    if (configuration.getMultiplexerStack() != null)
+       return;
+
+    Configuration.CacheMode cacheMode = configuration.getCacheMode();
+    if (cacheMode.isSynchronous())
+    {
+       //bundling is not good for sync caches
+       if (!cacheMode.equals(Configuration.CacheMode.LOCAL) && isMessageBundlingEnabled())
+       {
+          log.warn("You have enabled jgroups's message bundling, which is not recommended for sync replication. If there is no particular " +
+                "reason for this we strongly recommend to disable message bundling in JGroups config (enable_bundling=\"false\").");
+       }
+    }
+    else
+    {
+       //bundling is good for async caches
+       if (!isMessageBundlingEnabled())
+       {
+          log.warn("You have disabled jgroups's message bundling, which is not recommended for async replication. If there is no particular " +
+                "reason for this we strongly recommend to enable message bundling in JGroups config (enable_bundling=\"true\").");
+       }
+    }
+ }
+
+ /**
+  * @return whether the transport of the channel's protocol stack has message bundling enabled.
+  */
+ private boolean isMessageBundlingEnabled()
+ {
+    ProtocolStack stack = ((JChannel) channel).getProtocolStack();
+    TP transport = stack.getTransport();
+    return transport.isEnableBundling();
+ }
+}
\ No newline at end of file
Copied: core/branches/flat/src/main/java/org/jboss/starobrno/remoting/ReplicationException.java (from rev 6891, core/branches/flat/src/main/java/org/jboss/starobrno/ReplicationException.java)
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/remoting/ReplicationException.java (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/remoting/ReplicationException.java 2008-10-09 08:56:47 UTC (rev 6892)
@@ -0,0 +1,59 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.remoting;
+
+import org.jboss.starobrno.CacheException;
+
+/**
+ * Thrown when a replication problem occurred
+ */
+public class ReplicationException extends CacheException
+{
+
+ private static final long serialVersionUID = 33172388691879866L;
+
+ public ReplicationException()
+ {
+ super();
+ }
+
+ public ReplicationException(Throwable cause)
+ {
+ super(cause);
+ }
+
+ public ReplicationException(String msg)
+ {
+ super(msg);
+ }
+
+ public ReplicationException(String msg, Throwable cause)
+ {
+ super(msg, cause);
+ }
+
+ @Override
+ public String toString()
+ {
+ return super.toString();
+ }
+}
\ No newline at end of file
Property changes on: core/branches/flat/src/main/java/org/jboss/starobrno/remoting/ReplicationException.java
___________________________________________________________________
Name: svn:keywords
+ Author Date Id Revision
Name: svn:eol-style
+ native
Copied: core/branches/flat/src/main/java/org/jboss/starobrno/transaction/OrderedSynchronizationHandler.java (from rev 6891, core/branches/flat/src/main/java/org/jboss/starobrno/OrderedSynchronizationHandler.java)
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/transaction/OrderedSynchronizationHandler.java (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/transaction/OrderedSynchronizationHandler.java 2008-10-09 08:56:47 UTC (rev 6892)
@@ -0,0 +1,119 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.transaction;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+import javax.transaction.RollbackException;
+import javax.transaction.Synchronization;
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
+import java.util.LinkedList;
+
+/**
+ * Maintains a list of Synchronization handlers. Reason is that we have to
+ * invoke certain handlers <em>before</em> others. See the description in
+ * SyncTxUnitTestCase.testConcurrentPuts(). For example, for synchronous
+ * replication, we have to execute the ReplicationInterceptor's
+ * afterCompletion() <em>before</em> the TransactionInterceptor's.
+ *
+ * @author Bela Ban
+ * @version $Id$
+ */
+public class OrderedSynchronizationHandler implements Synchronization
+{
+ static final Log log = LogFactory.getLog(org.jboss.cache.interceptors.OrderedSynchronizationHandler.class);
+
+ private Transaction tx = null;
+ private final LinkedList<Synchronization> handlers = new LinkedList<Synchronization>();
+
+ public OrderedSynchronizationHandler(Transaction tx) throws SystemException, RollbackException
+ {
+ this.tx = tx;
+ tx.registerSynchronization(this);
+ }
+
+ public void registerAtHead(Synchronization handler)
+ {
+ register(handler, true);
+ }
+
+ public void registerAtTail(Synchronization handler)
+ {
+ register(handler, false);
+ }
+
+ void register(Synchronization handler, boolean head)
+ {
+ if (handler != null && !handlers.contains(handler))
+ {
+ if (head)
+ handlers.addFirst(handler);
+ else
+ handlers.addLast(handler);
+ }
+ }
+
+ public void beforeCompletion()
+ {
+ for (Synchronization sync : handlers)
+ {
+ sync.beforeCompletion();
+ }
+ }
+
+ public void afterCompletion(int status)
+ {
+ RuntimeException exceptionInAfterCompletion = null;
+ for (Synchronization sync : handlers)
+ {
+ try
+ {
+ sync.afterCompletion(status);
+ }
+ catch (Throwable t)
+ {
+ log.error("failed calling afterCompletion() on " + sync, t);
+ exceptionInAfterCompletion = (RuntimeException) t;
+ }
+ }
+
+ // throw the exception so the TM can deal with it.
+ if (exceptionInAfterCompletion != null) throw exceptionInAfterCompletion;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "tx=" + getTxAsString() + ", handlers=" + handlers;
+ }
+
+ private String getTxAsString()
+ {
+ // JBCACHE-1114 -- don't call toString() on tx or it can lead to stack overflow
+ if (tx == null)
+ return null;
+
+ return tx.getClass().getName() + "@" + System.identityHashCode(tx);
+ }
+}
\ No newline at end of file
Property changes on: core/branches/flat/src/main/java/org/jboss/starobrno/transaction/OrderedSynchronizationHandler.java
___________________________________________________________________
Name: svn:keywords
+ Author Date Id Revision
Name: svn:eol-style
+ native
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/transaction/TransactionTable.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/transaction/TransactionTable.java 2008-10-08 18:56:43 UTC (rev 6891)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/transaction/TransactionTable.java 2008-10-09 08:56:47 UTC (rev 6892)
@@ -23,13 +23,13 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.jboss.cache.InvocationContext;
-import org.jboss.cache.RPCManager;
import org.jboss.starobrno.CacheException;
+import org.jboss.starobrno.context.InvocationContext;
import org.jboss.starobrno.context.TransactionContext;
import org.jboss.starobrno.factories.annotations.Inject;
import org.jboss.starobrno.factories.annotations.NonVolatile;
import org.jboss.starobrno.factories.context.ContextFactory;
+import org.jboss.starobrno.remoting.RPCManager;
import org.jgroups.Address;
import javax.transaction.Status;
More information about the jbosscache-commits
mailing list