[jbosscache-commits] JBoss Cache SVN: r7276 - in core/branches/flat/src: main/java/org/jboss/starobrno/cluster and 10 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Wed Dec 10 11:16:43 EST 2008


Author: manik.surtani at jboss.com
Date: 2008-12-10 11:16:42 -0500 (Wed, 10 Dec 2008)
New Revision: 7276

Added:
   core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManager.java
   core/branches/flat/src/test/java/org/jboss/starobrno/api/mvcc/PutForExternalReadTest.java
Removed:
   core/branches/flat/src/main/java/org/jboss/starobrno/RPCManager.java
   core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManager.java
Modified:
   core/branches/flat/src/main/java/org/jboss/starobrno/Cache.java
   core/branches/flat/src/main/java/org/jboss/starobrno/CacheDelegate.java
   core/branches/flat/src/main/java/org/jboss/starobrno/CacheSPI.java
   core/branches/flat/src/main/java/org/jboss/starobrno/cluster/ReplicationQueue.java
   core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/PutKeyValueCommand.java
   core/branches/flat/src/main/java/org/jboss/starobrno/config/RuntimeConfig.java
   core/branches/flat/src/main/java/org/jboss/starobrno/factories/EmptyConstructorFactory.java
   core/branches/flat/src/main/java/org/jboss/starobrno/factories/RuntimeConfigAwareFactory.java
   core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/InvocationContextInterceptor.java
   core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/TxInterceptor.java
   core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/base/BaseRpcInterceptor.java
   core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManagerImpl.java
   core/branches/flat/src/main/java/org/jboss/starobrno/transaction/TransactionTable.java
   core/branches/flat/src/test/java/org/jboss/starobrno/util/TestingUtil.java
   core/branches/flat/src/test/java/org/jboss/starobrno/util/internals/ReplicationListener.java
Log:
Fixed putForExternalread() and added unit tests

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/Cache.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/Cache.java	2008-12-10 15:10:12 UTC (rev 7275)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/Cache.java	2008-12-10 16:16:42 UTC (rev 7276)
@@ -40,7 +40,7 @@
    /**
     * Under special operating behavior, associates the value with the specified key.
     * <ul>
-    * <li> Only goes through if the key specified does not exist; no-op otherwise (similar to {@link java.util.concurrent.ConcurrentMap#replace(Object, Object)})</i>
+    * <li> Only goes through if the key specified does not exist; no-op otherwise (similar to {@link java.util.concurrent.ConcurrentMap#putIfAbsent(Object, Object)})</i>
     * <li> Force asynchronous mode for replication to prevent any blocking.</li>
     * <li> invalidation does not take place. </li>
     * <li> 0ms lock timeout to prevent any blocking here either. If the lock is not acquired, this method is a no-op, and swallows the timeout exception.</li>

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/CacheDelegate.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/CacheDelegate.java	2008-12-10 15:10:12 UTC (rev 7275)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/CacheDelegate.java	2008-12-10 16:16:42 UTC (rev 7276)
@@ -49,6 +49,7 @@
 import org.jboss.starobrno.lock.LockManager;
 import org.jboss.starobrno.marshall.ExtendedMarshaller;
 import org.jboss.starobrno.notifications.Notifier;
+import org.jboss.starobrno.remoting.RPCManager;
 import org.jboss.starobrno.statetransfer.StateTransferManager;
 import org.jboss.starobrno.transaction.GlobalTransaction;
 import org.jboss.starobrno.transaction.TransactionTable;
@@ -224,7 +225,7 @@
          ctx.getOptionOverrides().setFailSilently(true);
          ctx.getOptionOverrides().setForceAsynchronous(true);
          ctx.getOptionOverrides().setLockAcquisitionTimeout(0);
-         replace(key, value);
+         putIfAbsent(key, value);
       }
       catch (Exception e)
       {

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/CacheSPI.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/CacheSPI.java	2008-12-10 15:10:12 UTC (rev 7275)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/CacheSPI.java	2008-12-10 16:16:42 UTC (rev 7276)
@@ -36,6 +36,7 @@
 import org.jboss.starobrno.lock.LockManager;
 import org.jboss.starobrno.marshall.ExtendedMarshaller;
 import org.jboss.starobrno.notifications.Notifier;
+import org.jboss.starobrno.remoting.RPCManager;
 import org.jboss.starobrno.statetransfer.StateTransferManager;
 import org.jboss.starobrno.transaction.GlobalTransaction;
 import org.jboss.starobrno.transaction.TransactionTable;

Deleted: core/branches/flat/src/main/java/org/jboss/starobrno/RPCManager.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/RPCManager.java	2008-12-10 15:10:12 UTC (rev 7275)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/RPCManager.java	2008-12-10 16:16:42 UTC (rev 7276)
@@ -1,135 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.starobrno;
-
-import org.jboss.starobrno.commands.ReplicableCommand;
-import org.jgroups.Address;
-import org.jgroups.Channel;
-import org.jgroups.blocks.RspFilter;
-
-import java.util.List;
-import java.util.Vector;
-
-/**
- * Provides a mechanism for communicating with other caches in the cluster.  For now this is based on JGroups as an underlying
- * transport, and in future more transport options may become available.
- * <p/>
- * Implementations have a simple lifecycle:
- * <ul>
- * <li>start() - starts the underlying channel based on configuration options injected, and connects the channel</li>
- * <li>disconnect() - disconnects the channel</li>
- * <li>stop() - stops the dispatcher and releases resources</li>
- * </ul>
- *
- * @author Manik Surtani
- * @since 2.1.0
- */
-public interface RPCManager
-{
-   /**
-    * Disconnects and closes the underlying JGroups channel.
-    */
-   void disconnect();
-
-   /**
-    * Stops the RPCDispatcher and frees resources.  Closes and disconnects the underlying JGroups channel if this is
-    * still open/connected.
-    */
-   void stop();
-
-   /**
-    * Starts the RPCManager by connecting the underlying JGroups channel (if configured for replication).  Connecting
-    * the channel may also involve state transfer (if configured) so the interceptor chain should be started and
-    * available before this method is called.
-    */
-   void start();
-
-   /**
-    * Invokes an RPC call on other caches in the cluster.
-    *
-    * @param recipients          a list of Addresses to invoke the call on.  If this is null, the call is broadcast to the entire cluster.
-    * @param cacheCommand        the cache command to invoke
-    * @param mode                the group request mode to use.  See {@link org.jgroups.blocks.GroupRequest}.
-    * @param timeout             a timeout after which to throw a replication exception.
-    * @param responseFilter      a response filter with which to filter out failed/unwanted/invalid responses.
-    * @param useOutOfBandMessage if true, the message is put on JGroups' OOB queue.  See JGroups docs for more info.
-    * @return a list of responses from each member contacted.
-    * @throws Exception in the event of problems.
-    */
-   List<Object> callRemoteMethods(Vector<Address> recipients, ReplicableCommand cacheCommand, int mode, long timeout, RspFilter responseFilter, boolean useOutOfBandMessage) throws Exception;
-
-   /**
-    * Invokes an RPC call on other caches in the cluster.
-    *
-    * @param recipients          a list of Addresses to invoke the call on.  If this is null, the call is broadcast to the entire cluster.
-    * @param cacheCommand        the cache command to invoke
-    * @param mode                the group request mode to use.  See {@link org.jgroups.blocks.GroupRequest}.
-    * @param timeout             a timeout after which to throw a replication exception.
-    * @param useOutOfBandMessage if true, the message is put on JGroups' OOB queue.  See JGroups docs for more info.
-    * @return a list of responses from each member contacted.
-    * @throws Exception in the event of problems.
-    */
-   List<Object> callRemoteMethods(Vector<Address> recipients, ReplicableCommand cacheCommand, int mode, long timeout, boolean useOutOfBandMessage) throws Exception;
-
-   /**
-    * Invokes an RPC call on other caches in the cluster.
-    *
-    * @param recipients          a list of Addresses to invoke the call on.  If this is null, the call is broadcast to the entire cluster.
-    * @param cacheCommand        the cache command to invoke
-    * @param synchronous         if true, sets group request mode to {@link org.jgroups.blocks.GroupRequest#GET_ALL}, and if false sets it to {@link org.jgroups.blocks.GroupRequest#GET_NONE}.
-    * @param timeout             a timeout after which to throw a replication exception.
-    * @param useOutOfBandMessage if true, the message is put on JGroups' OOB queue.  See JGroups docs for more info.
-    * @return a list of responses from each member contacted.
-    * @throws Exception in the event of problems.
-    */
-   List<Object> callRemoteMethods(Vector<Address> recipients, ReplicableCommand cacheCommand, boolean synchronous, long timeout, boolean useOutOfBandMessage) throws Exception;
-
-   /**
-    * @return true if the current Channel is the coordinator of the cluster.
-    */
-   boolean isCoordinator();
-
-   /**
-    * @return the Address of the current coordinator.
-    */
-   Address getCoordinator();
-
-   /**
-    * Retrieves the local JGroups channel's address
-    *
-    * @return an Address
-    */
-   Address getLocalAddress();
-
-   /**
-    * Returns a defensively copied list of  members in the current cluster view.
-    */
-   List<Address> getMembers();
-
-
-   /**
-    * Retrieves the Channel
-    *
-    * @return a channel
-    */
-   Channel getChannel();
-}
\ No newline at end of file

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/cluster/ReplicationQueue.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/cluster/ReplicationQueue.java	2008-12-10 15:10:12 UTC (rev 7275)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/cluster/ReplicationQueue.java	2008-12-10 16:16:42 UTC (rev 7276)
@@ -23,14 +23,14 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.jboss.starobrno.commands.CommandsFactory;
+import org.jboss.starobrno.commands.ReplicableCommand;
+import org.jboss.starobrno.commands.remote.ReplicateCommand;
 import org.jboss.starobrno.config.Configuration;
 import org.jboss.starobrno.factories.annotations.Inject;
 import org.jboss.starobrno.factories.annotations.Start;
 import org.jboss.starobrno.factories.annotations.Stop;
-import org.jboss.starobrno.commands.ReplicableCommand;
-import org.jboss.starobrno.commands.CommandsFactory;
-import org.jboss.starobrno.commands.remote.ReplicateCommand;
-import org.jboss.starobrno.RPCManager;
+import org.jboss.starobrno.remoting.RPCManager;
 
 import java.util.ArrayList;
 import java.util.LinkedList;

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/PutKeyValueCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/PutKeyValueCommand.java	2008-12-10 15:10:12 UTC (rev 7275)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/PutKeyValueCommand.java	2008-12-10 16:16:42 UTC (rev 7276)
@@ -77,24 +77,28 @@
 
    public Object perform(InvocationContext ctx) throws Throwable
    {
-      notifier.notifyCacheEntryModified(key, true, ctx);
+      Object o = null;
       MVCCEntry e = ctx.lookupEntry(key);
-      Object o;
-      if (value instanceof Delta)
+      if (e.getValue() == null || !putIfAbsent)
       {
-         // magic
-         Delta dv = (Delta) value;
-         Object existing = e.getValue();
-         DeltaAware toMergeWith = null;
-         if (existing instanceof DeltaAware) toMergeWith = (DeltaAware) existing;
-         e.setValue(dv.merge(toMergeWith));
-         o = existing;
+         notifier.notifyCacheEntryModified(key, true, ctx);
+
+         if (value instanceof Delta)
+         {
+            // magic
+            Delta dv = (Delta) value;
+            Object existing = e.getValue();
+            DeltaAware toMergeWith = null;
+            if (existing instanceof DeltaAware) toMergeWith = (DeltaAware) existing;
+            e.setValue(dv.merge(toMergeWith));
+            o = existing;
+         }
+         else
+         {
+            o = e.setValue(value);
+         }
+         notifier.notifyCacheEntryModified(key, false, ctx);
       }
-      else
-      {
-         o = e.setValue(value);
-      }
-      notifier.notifyCacheEntryModified(key, false, ctx);
       return o;
    }
 

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/config/RuntimeConfig.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/config/RuntimeConfig.java	2008-12-10 15:10:12 UTC (rev 7275)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/config/RuntimeConfig.java	2008-12-10 16:16:42 UTC (rev 7276)
@@ -21,8 +21,8 @@
  */
 package org.jboss.starobrno.config;
 
-import org.jboss.cache.RPCManager;
 import org.jboss.cache.buddyreplication.BuddyGroup;
+import org.jboss.starobrno.remoting.RPCManager;
 import org.jboss.starobrno.util.Util;
 import org.jgroups.Channel;
 import org.jgroups.ChannelFactory;

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/factories/EmptyConstructorFactory.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/factories/EmptyConstructorFactory.java	2008-12-10 15:10:12 UTC (rev 7275)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/factories/EmptyConstructorFactory.java	2008-12-10 16:16:42 UTC (rev 7276)
@@ -26,7 +26,6 @@
 import org.jboss.cache.buddyreplication.BuddyFqnTransformer;
 import org.jboss.cache.invocation.CacheInvocationDelegate;
 import org.jboss.cache.lock.LockStrategyFactory;
-import org.jboss.starobrno.RPCManager;
 import org.jboss.starobrno.batch.BatchContainer;
 import org.jboss.starobrno.commands.CommandsFactory;
 import org.jboss.starobrno.config.ConfigurationException;
@@ -39,6 +38,7 @@
 import org.jboss.starobrno.marshall.VersionAwareMarshaller;
 import org.jboss.starobrno.notifications.Notifier;
 import org.jboss.starobrno.remoting.ChannelMessageListener;
+import org.jboss.starobrno.remoting.RPCManager;
 import org.jboss.starobrno.remoting.RPCManagerImpl;
 import org.jboss.starobrno.transaction.TransactionTable;
 
@@ -67,10 +67,9 @@
             {
                componentImpl = VersionAwareMarshaller.class;
             }
-            else
-            if (componentType.equals(RPCManager.class))
+            else if (componentType.equals(RPCManager.class))
             {
-                componentImpl = RPCManagerImpl.class;
+               componentImpl = RPCManagerImpl.class;
             }
             else
             {

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/factories/RuntimeConfigAwareFactory.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/factories/RuntimeConfigAwareFactory.java	2008-12-10 15:10:12 UTC (rev 7275)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/factories/RuntimeConfigAwareFactory.java	2008-12-10 16:16:42 UTC (rev 7276)
@@ -22,10 +22,10 @@
 package org.jboss.starobrno.factories;
 
 import org.jboss.cache.util.BeanUtils;
-import org.jboss.starobrno.RPCManager;
 import org.jboss.starobrno.config.ConfigurationException;
 import org.jboss.starobrno.config.RuntimeConfig;
 import org.jboss.starobrno.factories.annotations.DefaultFactoryFor;
+import org.jboss.starobrno.remoting.RPCManager;
 
 import java.lang.reflect.Method;
 

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/InvocationContextInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/InvocationContextInterceptor.java	2008-12-10 15:10:12 UTC (rev 7275)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/InvocationContextInterceptor.java	2008-12-10 16:16:42 UTC (rev 7276)
@@ -22,7 +22,6 @@
 package org.jboss.starobrno.interceptors;
 
 
-import org.jboss.starobrno.RPCManager;
 import org.jboss.starobrno.commands.VisitableCommand;
 import org.jboss.starobrno.commands.tx.CommitCommand;
 import org.jboss.starobrno.commands.tx.PrepareCommand;
@@ -32,6 +31,7 @@
 import org.jboss.starobrno.context.InvocationContext;
 import org.jboss.starobrno.context.TransactionContext;
 import org.jboss.starobrno.factories.annotations.Inject;
+import org.jboss.starobrno.remoting.RPCManager;
 import org.jboss.starobrno.transaction.GlobalTransaction;
 import org.jboss.starobrno.transaction.TransactionTable;
 

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/TxInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/TxInterceptor.java	2008-12-10 15:10:12 UTC (rev 7275)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/TxInterceptor.java	2008-12-10 16:16:42 UTC (rev 7276)
@@ -23,7 +23,6 @@
 
 import org.jboss.cache.util.concurrent.ConcurrentHashSet;
 import org.jboss.starobrno.CacheException;
-import org.jboss.starobrno.RPCManager;
 import org.jboss.starobrno.commands.CommandsFactory;
 import org.jboss.starobrno.commands.ReplicableCommand;
 import org.jboss.starobrno.commands.VisitableCommand;
@@ -41,6 +40,7 @@
 import org.jboss.starobrno.jmx.annotations.ManagedOperation;
 import org.jboss.starobrno.lock.LockManager;
 import org.jboss.starobrno.notifications.Notifier;
+import org.jboss.starobrno.remoting.RPCManager;
 import org.jboss.starobrno.remoting.ReplicationException;
 import org.jboss.starobrno.transaction.GlobalTransaction;
 import org.jboss.starobrno.transaction.TransactionTable;

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/base/BaseRpcInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/base/BaseRpcInterceptor.java	2008-12-10 15:10:12 UTC (rev 7275)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/base/BaseRpcInterceptor.java	2008-12-10 16:16:42 UTC (rev 7276)
@@ -21,7 +21,6 @@
  */
 package org.jboss.starobrno.interceptors.base;
 
-import org.jboss.starobrno.RPCManager;
 import org.jboss.starobrno.cluster.ReplicationQueue;
 import org.jboss.starobrno.commands.CommandsFactory;
 import org.jboss.starobrno.commands.ReplicableCommand;
@@ -30,6 +29,7 @@
 import org.jboss.starobrno.context.TransactionContext;
 import org.jboss.starobrno.factories.annotations.Inject;
 import org.jboss.starobrno.factories.annotations.Start;
+import org.jboss.starobrno.remoting.RPCManager;
 import org.jboss.starobrno.transaction.GlobalTransaction;
 import org.jboss.starobrno.transaction.TransactionTable;
 import org.jgroups.Address;

Deleted: core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManager.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManager.java	2008-12-10 15:10:12 UTC (rev 7275)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManager.java	2008-12-10 16:16:42 UTC (rev 7276)
@@ -1,154 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.jboss.starobrno.remoting;
-
-import org.jboss.cache.Fqn;
-import org.jboss.starobrno.commands.ReplicableCommand;
-import org.jgroups.Address;
-import org.jgroups.Channel;
-import org.jgroups.blocks.RspFilter;
-
-import java.util.List;
-import java.util.Vector;
-
-/**
- * Provides a mechanism for communicating with other caches in the cluster.  For now this is based on JGroups as an underlying
- * transport, and in future more transport options may become available.
- * <p/>
- * Implementations have a simple lifecycle:
- * <ul>
- * <li>start() - starts the underlying channel based on configuration options injected, and connects the channel</li>
- * <li>disconnect() - disconnects the channel</li>
- * <li>stop() - stops the dispatcher and releases resources</li>
- * </ul>
- *
- * @author Manik Surtani
- * @since 2.1.0
- */
-public interface RPCManager
-{
-   /**
-    * Disconnects and closes the underlying JGroups channel.
-    */
-   void disconnect();
-
-   /**
-    * Stops the RPCDispatcher and frees resources.  Closes and disconnects the underlying JGroups channel if this is
-    * still open/connected.
-    */
-   void stop();
-
-   /**
-    * Starts the RPCManager by connecting the underlying JGroups channel (if configured for replication).  Connecting
-    * the channel may also involve state transfer (if configured) so the interceptor chain should be started and
-    * available before this method is called.
-    */
-   void start();
-
-   /**
-    * Invokes an RPC call on other caches in the cluster.
-    *
-    * @param recipients          a list of Addresses to invoke the call on.  If this is null, the call is broadcast to the entire cluster.
-    * @param cacheCommand        the cache command to invoke
-    * @param mode                the group request mode to use.  See {@link org.jgroups.blocks.GroupRequest}.
-    * @param timeout             a timeout after which to throw a replication exception.
-    * @param responseFilter      a response filter with which to filter out failed/unwanted/invalid responses.
-    * @param useOutOfBandMessage if true, the message is put on JGroups' OOB queue.  See JGroups docs for more info.
-    * @return a list of responses from each member contacted.
-    * @throws Exception in the event of problems.
-    */
-   List<Object> callRemoteMethods(Vector<Address> recipients, ReplicableCommand cacheCommand, int mode, long timeout, RspFilter responseFilter, boolean useOutOfBandMessage) throws Exception;
-
-   /**
-    * Invokes an RPC call on other caches in the cluster.
-    *
-    * @param recipients          a list of Addresses to invoke the call on.  If this is null, the call is broadcast to the entire cluster.
-    * @param cacheCommand        the cache command to invoke
-    * @param mode                the group request mode to use.  See {@link org.jgroups.blocks.GroupRequest}.
-    * @param timeout             a timeout after which to throw a replication exception.
-    * @param useOutOfBandMessage if true, the message is put on JGroups' OOB queue.  See JGroups docs for more info.
-    * @return a list of responses from each member contacted.
-    * @throws Exception in the event of problems.
-    */
-   List<Object> callRemoteMethods(Vector<Address> recipients, ReplicableCommand cacheCommand, int mode, long timeout, boolean useOutOfBandMessage) throws Exception;
-
-   /**
-    * Invokes an RPC call on other caches in the cluster.
-    *
-    * @param recipients          a list of Addresses to invoke the call on.  If this is null, the call is broadcast to the entire cluster.
-    * @param cacheCommand        the cache command to invoke
-    * @param synchronous         if true, sets group request mode to {@link org.jgroups.blocks.GroupRequest#GET_ALL}, and if false sets it to {@link org.jgroups.blocks.GroupRequest#GET_NONE}.
-    * @param timeout             a timeout after which to throw a replication exception.
-    * @param useOutOfBandMessage if true, the message is put on JGroups' OOB queue.  See JGroups docs for more info.
-    * @return a list of responses from each member contacted.
-    * @throws Exception in the event of problems.
-    */
-   List<Object> callRemoteMethods(Vector<Address> recipients, ReplicableCommand cacheCommand, boolean synchronous, long timeout, boolean useOutOfBandMessage) throws Exception;
-
-   /**
-    * @return true if the current Channel is the coordinator of the cluster.
-    */
-   boolean isCoordinator();
-
-   /**
-    * @return the Address of the current coordinator.
-    */
-   Address getCoordinator();
-
-   /**
-    * Retrieves the local JGroups channel's address
-    *
-    * @return an Address
-    */
-   Address getLocalAddress();
-
-   /**
-    * Returns a defensively copied list of  members in the current cluster view.
-    */
-   List<Address> getMembers();
-
-   /**
-    * Retrieves partial state from remote instances.
-    *
-    * @param sources           sources to consider for a state transfer
-    * @param sourceTarget      Fqn on source to retrieve state for
-    * @param integrationTarget integration point on local cache to apply state
-    * @throws Exception in the event of problems
-    */
-   void fetchPartialState(List<Address> sources, Fqn sourceTarget, Fqn integrationTarget) throws Exception;
-
-   /**
-    * Retrieves partial state from remote instances.
-    *
-    * @param sources sources to consider for a state transfer
-    * @param subtree Fqn subtree to retrieve.  Will be integrated at the same point.
-    * @throws Exception in the event of problems
-    */
-   void fetchPartialState(List<Address> sources, Fqn subtree) throws Exception;
-
-   /**
-    * Retrieves the Channel
-    *
-    * @return a channel
-    */
-   Channel getChannel();
-}
\ No newline at end of file

Copied: core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManager.java (from rev 7261, core/branches/flat/src/main/java/org/jboss/starobrno/RPCManager.java)
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManager.java	                        (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManager.java	2008-12-10 16:16:42 UTC (rev 7276)
@@ -0,0 +1,135 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.remoting;
+
+import org.jboss.starobrno.commands.ReplicableCommand;
+import org.jgroups.Address;
+import org.jgroups.Channel;
+import org.jgroups.blocks.RspFilter;
+
+import java.util.List;
+import java.util.Vector;
+
+/**
+ * Provides a mechanism for communicating with other caches in the cluster.  For now this is based on JGroups as an underlying
+ * transport, and in future more transport options may become available.
+ * <p/>
+ * Implementations have a simple lifecycle:
+ * <ul>
+ * <li>start() - starts the underlying channel based on configuration options injected, and connects the channel</li>
+ * <li>disconnect() - disconnects the channel</li>
+ * <li>stop() - stops the dispatcher and releases resources</li>
+ * </ul>
+ *
+ * @author Manik Surtani
+ * @since 2.1.0
+ */
+public interface RPCManager
+{
+   /**
+    * Disconnects and closes the underlying JGroups channel.
+    */
+   void disconnect();
+
+   /**
+    * Stops the RPCDispatcher and frees resources.  Closes and disconnects the underlying JGroups channel if this is
+    * still open/connected.
+    */
+   void stop();
+
+   /**
+    * Starts the RPCManager by connecting the underlying JGroups channel (if configured for replication).  Connecting
+    * the channel may also involve state transfer (if configured) so the interceptor chain should be started and
+    * available before this method is called.
+    */
+   void start();
+
+   /**
+    * Invokes an RPC call on other caches in the cluster.
+    *
+    * @param recipients          a list of Addresses to invoke the call on.  If this is null, the call is broadcast to the entire cluster.
+    * @param cacheCommand        the cache command to invoke
+    * @param mode                the group request mode to use.  See {@link org.jgroups.blocks.GroupRequest}.
+    * @param timeout             a timeout after which to throw a replication exception.
+    * @param responseFilter      a response filter with which to filter out failed/unwanted/invalid responses.
+    * @param useOutOfBandMessage if true, the message is put on JGroups' OOB queue.  See JGroups docs for more info.
+    * @return a list of responses from each member contacted.
+    * @throws Exception in the event of problems.
+    */
+   List<Object> callRemoteMethods(Vector<Address> recipients, ReplicableCommand cacheCommand, int mode, long timeout, RspFilter responseFilter, boolean useOutOfBandMessage) throws Exception;
+
+   /**
+    * Invokes an RPC call on other caches in the cluster.
+    *
+    * @param recipients          a list of Addresses to invoke the call on.  If this is null, the call is broadcast to the entire cluster.
+    * @param cacheCommand        the cache command to invoke
+    * @param mode                the group request mode to use.  See {@link org.jgroups.blocks.GroupRequest}.
+    * @param timeout             a timeout after which to throw a replication exception.
+    * @param useOutOfBandMessage if true, the message is put on JGroups' OOB queue.  See JGroups docs for more info.
+    * @return a list of responses from each member contacted.
+    * @throws Exception in the event of problems.
+    */
+   List<Object> callRemoteMethods(Vector<Address> recipients, ReplicableCommand cacheCommand, int mode, long timeout, boolean useOutOfBandMessage) throws Exception;
+
+   /**
+    * Invokes an RPC call on other caches in the cluster.
+    *
+    * @param recipients          a list of Addresses to invoke the call on.  If this is null, the call is broadcast to the entire cluster.
+    * @param cacheCommand        the cache command to invoke
+    * @param synchronous         if true, sets group request mode to {@link org.jgroups.blocks.GroupRequest#GET_ALL}, and if false sets it to {@link org.jgroups.blocks.GroupRequest#GET_NONE}.
+    * @param timeout             a timeout after which to throw a replication exception.
+    * @param useOutOfBandMessage if true, the message is put on JGroups' OOB queue.  See JGroups docs for more info.
+    * @return a list of responses from each member contacted.
+    * @throws Exception in the event of problems.
+    */
+   List<Object> callRemoteMethods(Vector<Address> recipients, ReplicableCommand cacheCommand, boolean synchronous, long timeout, boolean useOutOfBandMessage) throws Exception;
+
+   /**
+    * @return true if the current Channel is the coordinator of the cluster.
+    */
+   boolean isCoordinator();
+
+   /**
+    * @return the Address of the current coordinator.
+    */
+   Address getCoordinator();
+
+   /**
+    * Retrieves the local JGroups channel's address
+    *
+    * @return an Address
+    */
+   Address getLocalAddress();
+
+   /**
+    * Returns a defensively copied list of  members in the current cluster view.
+    */
+   List<Address> getMembers();
+
+
+   /**
+    * Retrieves the Channel
+    *
+    * @return a channel
+    */
+   Channel getChannel();
+}
\ No newline at end of file


Property changes on: core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManager.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:mergeinfo
   + 
Name: svn:eol-style
   + LF

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManagerImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManagerImpl.java	2008-12-10 15:10:12 UTC (rev 7275)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManagerImpl.java	2008-12-10 16:16:42 UTC (rev 7276)
@@ -23,13 +23,13 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.jboss.cache.util.concurrent.ReclosableLatch;
 import org.jboss.starobrno.CacheException;
 import org.jboss.starobrno.CacheSPI;
-import org.jboss.starobrno.RPCManager;
-import org.jboss.starobrno.context.InvocationContext;
 import org.jboss.starobrno.commands.ReplicableCommand;
 import org.jboss.starobrno.config.Configuration;
 import org.jboss.starobrno.config.RuntimeConfig;
+import org.jboss.starobrno.context.InvocationContext;
 import org.jboss.starobrno.factories.ComponentRegistry;
 import org.jboss.starobrno.factories.annotations.Inject;
 import org.jboss.starobrno.factories.annotations.Start;
@@ -44,10 +44,8 @@
 import org.jboss.starobrno.marshall.CommandAwareRpcDispatcher;
 import org.jboss.starobrno.marshall.ExtendedMarshaller;
 import org.jboss.starobrno.notifications.Notifier;
-import org.jboss.starobrno.remoting.ChannelMessageListener;
 import org.jboss.starobrno.transaction.TransactionTable;
 import org.jboss.starobrno.util.ReflectionUtil;
-import org.jboss.cache.util.concurrent.ReclosableLatch;
 import org.jgroups.*;
 import org.jgroups.blocks.GroupRequest;
 import org.jgroups.blocks.RspFilter;
@@ -148,11 +146,11 @@
 
       //otherwise just connect
       try
-         {
-            channel.connect(configuration.getClusterName());
-         }
-         catch (ChannelException e)
       {
+         channel.connect(configuration.getClusterName());
+      }
+      catch (ChannelException e)
+      {
          throw new CacheException("Unable to connect to JGroups channel", e);
       }
       if (log.isInfoEnabled()) log.info("Cache local address is " + getLocalAddress());
@@ -206,7 +204,7 @@
       channel.setOpt(Channel.AUTO_GETSTATE, false);
       channel.setOpt(Channel.BLOCK, true);
       rpcDispatcher = new CommandAwareRpcDispatcher(channel, messageListener, new MembershipListenerAdaptor(),
-               invocationContextContainer, invocationContextContainer, interceptorChain, componentRegistry);
+            invocationContextContainer, invocationContextContainer, interceptorChain, componentRegistry);
       checkAppropriateConfig();
       rpcDispatcher.setRequestMarshaller(extendedMarshaller);
       rpcDispatcher.setResponseMarshaller(extendedMarshaller);

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/transaction/TransactionTable.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/transaction/TransactionTable.java	2008-12-10 15:10:12 UTC (rev 7275)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/transaction/TransactionTable.java	2008-12-10 16:16:42 UTC (rev 7276)
@@ -24,12 +24,12 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.jboss.starobrno.CacheException;
-import org.jboss.starobrno.RPCManager;
 import org.jboss.starobrno.context.InvocationContext;
 import org.jboss.starobrno.context.TransactionContext;
 import org.jboss.starobrno.factories.annotations.Inject;
 import org.jboss.starobrno.factories.annotations.NonVolatile;
 import org.jboss.starobrno.factories.context.ContextFactory;
+import org.jboss.starobrno.remoting.RPCManager;
 import org.jgroups.Address;
 
 import javax.transaction.Status;

Added: core/branches/flat/src/test/java/org/jboss/starobrno/api/mvcc/PutForExternalReadTest.java
===================================================================
--- core/branches/flat/src/test/java/org/jboss/starobrno/api/mvcc/PutForExternalReadTest.java	                        (rev 0)
+++ core/branches/flat/src/test/java/org/jboss/starobrno/api/mvcc/PutForExternalReadTest.java	2008-12-10 16:16:42 UTC (rev 7276)
@@ -0,0 +1,343 @@
+package org.jboss.starobrno.api.mvcc;
+
+import org.easymock.EasyMock;
+import static org.easymock.EasyMock.*;
+import org.jboss.cache.CacheFactory;
+import org.jboss.starobrno.Cache;
+import org.jboss.starobrno.CacheSPI;
+import org.jboss.starobrno.UnitTestCacheFactory;
+import org.jboss.starobrno.commands.ReplicableCommand;
+import org.jboss.starobrno.commands.write.PutKeyValueCommand;
+import org.jboss.starobrno.commands.write.RemoveCommand;
+import org.jboss.starobrno.config.Configuration;
+import org.jboss.starobrno.factories.ComponentRegistry;
+import org.jboss.starobrno.remoting.RPCManager;
+import org.jboss.starobrno.transaction.TransactionTable;
+import org.jboss.starobrno.util.TestingUtil;
+import org.jboss.starobrno.util.internals.ReplicationListener;
+import org.jgroups.Address;
+import static org.testng.AssertJUnit.*;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+import java.util.List;
+import java.util.Vector;
+
+ at Test(groups = {"functional", "jgroups", "transaction"})
+public class PutForExternalReadTest
+{
+   protected final String key = "k", value = "v", value2 = "v2";
+
+   protected CacheSPI<String, String> cache1, cache2;
+
+   ReplicationListener replListener1;
+   ReplicationListener replListener2;
+
+   protected TransactionManager tm1, tm2;
+
+   protected boolean useTx;
+
+
+   @BeforeMethod(alwaysRun = true)
+   public void setUp()
+   {
+
+      CacheFactory<String, String> cf = new UnitTestCacheFactory<String, String>();
+      Configuration c = new Configuration();
+      c.setCacheMode(Configuration.CacheMode.REPL_SYNC);
+      cache1 = (CacheSPI<String, String>) cf.createCache(c, false);
+      cache1.getConfiguration().setTransactionManagerLookupClass("org.jboss.cache.transaction.DummyTransactionManagerLookup");
+      cache1.getConfiguration().setSerializationExecutorPoolSize(0);//this is very important for async tests!
+
+      cache1.start();
+      tm1 = cache1.getConfiguration().getRuntimeConfig().getTransactionManager();
+
+      cache2 = (CacheSPI<String, String>) cf.createCache(cache1.getConfiguration().clone());
+
+      tm2 = cache2.getConfiguration().getRuntimeConfig().getTransactionManager();
+      replListener1 = new ReplicationListener(cache1);
+      replListener2 = new ReplicationListener(cache2);
+
+      TestingUtil.blockUntilViewsReceived(10000, cache1, cache2);
+   }
+
+   @AfterMethod(alwaysRun = true)
+   public void tearDown()
+   {
+      TestingUtil.killCaches(cache1, cache2);
+   }
+
+   public void testNoOpWhenKeyPresent()
+   {
+      replListener2.expect(PutKeyValueCommand.class);
+      cache1.putForExternalRead(key, value);
+      replListener2.waitForReplicationToOccur();
+
+
+      assertEquals("PFER should have succeeded", value, cache1.get(key));
+      assertEquals("PFER should have replicated", value, cache2.get(key));
+
+      // reset
+      replListener2.expect(RemoveCommand.class);
+      cache1.remove(key);
+      replListener2.waitForReplicationToOccur();
+
+      assert cache1.isEmpty() : "Should have reset";
+      assert cache2.isEmpty() : "Should have reset";
+
+      replListener2.expect(PutKeyValueCommand.class);
+      cache1.put(key, value);
+      replListener2.waitForReplicationToOccur();
+
+      // now this pfer should be a no-op
+      cache1.putForExternalRead(key, value2);
+
+      assertEquals("PFER should have been a no-op", value, cache1.get(key));
+      assertEquals("PFER should have been a no-op", value, cache2.get(key));
+   }
+
+   private Vector<Address> anyAddresses()
+   {
+      anyObject();
+      return null;
+   }
+
+   public void testAsyncForce() throws Exception
+   {
+      RPCManager rpcManager = EasyMock.createNiceMock(RPCManager.class);
+      RPCManager originalRpcManager = cache1.getConfiguration().getRuntimeConfig().getRPCManager();
+      List<Address> memberList = originalRpcManager.getMembers();
+      expect(rpcManager.getMembers()).andReturn(memberList).anyTimes();
+      // inject a mock RPC manager so that we can test whether calls made are sync or async.
+      ComponentRegistry cr = TestingUtil.extractComponentRegistry(cache1);
+      cr.registerComponent(rpcManager, RPCManager.class);
+      cr.rewire();
+
+      // specify what we expectWithTx called on the mock Rpc Manager.  For params we don't care about, just use ANYTHING.
+      // setting the mock object to expectWithTx the "sync" param to be false.
+      expect(rpcManager.callRemoteMethods(anyAddresses(), (ReplicableCommand) anyObject(), eq(false), anyLong(), anyBoolean())).andReturn(null);
+
+      replay(rpcManager);
+
+      // now try a simple replication.  Since the RPCManager is a mock object it will not actually replicate anything.
+      cache1.putForExternalRead(key, value);
+      verify(rpcManager);
+
+      // cleanup
+      TestingUtil.extractComponentRegistry(cache1).registerComponent(originalRpcManager, RPCManager.class);
+      cache1.remove(key);
+   }
+
+   public void testTxSuspension() throws Exception
+   {
+      // create parent node first
+      replListener2.expect(PutKeyValueCommand.class);
+      cache1.put(key + "0", value);
+      replListener2.waitForReplicationToOccur();
+
+      // start a tx and do some stuff.
+      replListener2.expect(PutKeyValueCommand.class);
+      tm1.begin();
+      cache1.get(key + "0");
+      cache1.putForExternalRead(key, value); // should have happened in a separate tx and have committed already.
+      Transaction t = tm1.suspend();
+
+      replListener2.waitForReplicationToOccur();
+      assertEquals("PFER should have completed", value, cache1.get(key));
+      assertEquals("PFER should have completed", value, cache2.get(key));
+
+      tm1.resume(t);
+      tm1.commit();
+
+      assertEquals("parent fqn tx should have completed", value, cache1.get(key + "0"));
+      assertEquals("parent fqn tx should have completed", value, cache2.get(key + "0"));
+   }
+
+   public void testExceptionSuppression() throws Exception
+   {
+      RPCManager barfingRpcManager = EasyMock.createNiceMock(RPCManager.class);
+      RPCManager originalRpcManager = cache1.getConfiguration().getRuntimeConfig().getRPCManager();
+      try
+      {
+         List<Address> memberList = originalRpcManager.getMembers();
+         expect(barfingRpcManager.getMembers()).andReturn(memberList).anyTimes();
+         expect(barfingRpcManager.getLocalAddress()).andReturn(originalRpcManager.getLocalAddress()).anyTimes();
+         expect(barfingRpcManager.callRemoteMethods(anyAddresses(), (ReplicableCommand) anyObject(), anyBoolean(), anyLong(), anyBoolean())).andThrow(new RuntimeException("Barf!")).anyTimes();
+         replay(barfingRpcManager);
+
+         TestingUtil.extractComponentRegistry(cache1).registerComponent(barfingRpcManager, RPCManager.class);
+         cache1.getConfiguration().getRuntimeConfig().setRPCManager(barfingRpcManager);
+         TestingUtil.extractComponentRegistry(cache1).rewire();
+
+         try
+         {
+            cache1.put(key, value);
+            fail("Should have barfed");
+         }
+         catch (RuntimeException re)
+         {
+         }
+
+         // clean up any indeterminate state left over
+         try
+         {
+            cache1.remove(key);
+            fail("Should have barfed");
+         }
+         catch (RuntimeException re)
+         {
+         }
+
+         assertNull("Should have cleaned up", cache1.get(key));
+
+         // should not barf
+         cache1.putForExternalRead(key, value);
+      }
+      finally
+      {
+         TestingUtil.extractComponentRegistry(cache1).registerComponent(originalRpcManager, RPCManager.class);
+      }
+   }
+
+   public void testBasicPropagation() throws Exception
+   {
+      assert !cache1.containsKey(key);
+      assert !cache2.containsKey(key);
+
+      replListener2.expect(PutKeyValueCommand.class);
+      cache1.putForExternalRead(key, value);
+      replListener2.waitForReplicationToOccur();
+
+      assertEquals("PFER updated cache1", value, cache1.get(key));
+      assertEquals("PFER propagated to cache2 as expected", value, cache2.get(key));
+
+      // replication to cache 1 should NOT happen.
+      cache2.putForExternalRead(key, value + "0");
+
+      assertEquals("PFER updated cache2", value, cache2.get(key));
+      assertEquals("Cache1 should be unaffected", value, cache1.get(key));
+   }
+
+   /**
+    * Tests that setting a cacheModeLocal=true Option prevents propagation
+    * of the putForExternalRead().
+    *
+    * @throws Exception
+    */
+   public void testSimpleCacheModeLocal() throws Exception
+   {
+      cacheModeLocalTest(false);
+   }
+
+   /**
+    * Tests that setting a cacheModeLocal=true Option prevents propagation
+    * of the putForExternalRead() when the call occurs inside a transaction.
+    *
+    * @throws Exception
+    */
+   public void testCacheModeLocalInTx() throws Exception
+   {
+      cacheModeLocalTest(true);
+   }
+
+   private TransactionTable getTransactionTable(Cache cache)
+   {
+      return TestingUtil.extractComponent(cache, TransactionTable.class);
+   }
+
+   /**
+    * Tests that suspended transactions do not leak.  See JBCACHE-1246.
+    */
+   public void testMemLeakOnSuspendedTransactions() throws Exception
+   {
+      replListener2.expect(PutKeyValueCommand.class);
+      tm1.begin();
+      cache1.putForExternalRead(key, value);
+      tm1.commit();
+      replListener2.waitForReplicationToOccur();
+
+      TransactionTable tt1 = getTransactionTable(cache1);
+      TransactionTable tt2 = getTransactionTable(cache2);
+
+      assert tt1.getNumGlobalTransactions() == 0 : "Cache 1 should have no stale global TXs";
+      assert tt1.getNumLocalTransactions() == 0 : "Cache 1 should have no stale local TXs";
+      assert tt2.getNumGlobalTransactions() == 0 : "Cache 2 should have no stale global TXs";
+      assert tt2.getNumLocalTransactions() == 0 : "Cache 2 should have no stale local TXs";
+
+      System.out.println("PutForExternalReadTest.testMemLeakOnSuspendedTransactions");
+      //do not expectWithTx a PFER replication, as the node already exists so this is a no-op
+      replListener2.expectWithTx(PutKeyValueCommand.class);
+      tm1.begin();
+      cache1.putForExternalRead(key, value);
+      cache1.put(key, value);
+      tm1.commit();
+      replListener2.waitForReplicationToOccur();
+
+      assert tt1.getNumGlobalTransactions() == 0 : "Cache 1 should have no stale global TXs";
+      assert tt1.getNumLocalTransactions() == 0 : "Cache 1 should have no stale local TXs";
+      assert tt2.getNumGlobalTransactions() == 0 : "Cache 2 should have no stale global TXs";
+      assert tt2.getNumLocalTransactions() == 0 : "Cache 2 should have no stale local TXs";
+
+      replListener2.expectWithTx(PutKeyValueCommand.class);
+      tm1.begin();
+      cache1.put(key, value);
+      cache1.putForExternalRead(key, value);
+      tm1.commit();
+      replListener2.waitForReplicationToOccur();
+
+      assert tt1.getNumGlobalTransactions() == 0 : "Cache 1 should have no stale global TXs";
+      assert tt1.getNumLocalTransactions() == 0 : "Cache 1 should have no stale local TXs";
+      assert tt2.getNumGlobalTransactions() == 0 : "Cache 2 should have no stale global TXs";
+      assert tt2.getNumLocalTransactions() == 0 : "Cache 2 should have no stale local TXs";
+
+      //do not expectWithTx a PFER replication, as the node already exists so this is a no-op
+      replListener2.expectWithTx(PutKeyValueCommand.class, PutKeyValueCommand.class);
+      tm1.begin();
+      cache1.put(key, value);
+      cache1.putForExternalRead(key, value);
+      cache1.put(key, value);
+      tm1.commit();
+      replListener2.waitForReplicationToOccur();
+
+      assert tt1.getNumGlobalTransactions() == 0 : "Cache 1 should have no stale global TXs";
+      assert tt1.getNumLocalTransactions() == 0 : "Cache 1 should have no stale local TXs";
+      assert tt2.getNumGlobalTransactions() == 0 : "Cache 2 should have no stale global TXs";
+      assert tt2.getNumLocalTransactions() == 0 : "Cache 2 should have no stale local TXs";
+   }
+
+   /**
+    * Tests that setting a cacheModeLocal=true Option prevents propagation
+    * of the putForExternalRead().
+    *
+    * @throws Exception
+    */
+   private void cacheModeLocalTest(boolean transactional) throws Exception
+   {
+      RPCManager rpcManager = EasyMock.createMock(RPCManager.class);
+      RPCManager originalRpcManager = cache1.getConfiguration().getRuntimeConfig().getRPCManager();
+
+      // inject a mock RPC manager so that we can test whether calls made are sync or async.
+      cache1.getConfiguration().getRuntimeConfig().setRPCManager(rpcManager);
+
+      // specify that we expectWithTx nothing will be called on the mock Rpc Manager.
+      replay(rpcManager);
+
+      // now try a simple replication.  Since the RPCManager is a mock object it will not actually replicate anything.
+      if (transactional)
+         tm1.begin();
+
+      cache1.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+      cache1.putForExternalRead(key, value);
+
+      if (transactional)
+         tm1.commit();
+
+      verify(rpcManager);
+      // cleanup
+      cache1.getConfiguration().getRuntimeConfig().setRPCManager(originalRpcManager);
+      cache1.remove(key);
+   }
+}

Modified: core/branches/flat/src/test/java/org/jboss/starobrno/util/TestingUtil.java
===================================================================
--- core/branches/flat/src/test/java/org/jboss/starobrno/util/TestingUtil.java	2008-12-10 15:10:12 UTC (rev 7275)
+++ core/branches/flat/src/test/java/org/jboss/starobrno/util/TestingUtil.java	2008-12-10 16:16:42 UTC (rev 7276)
@@ -426,7 +426,7 @@
 
    public static void killTreeCaches(TreeCache... treeCaches)
    {
-      for (TreeCache tc: treeCaches)
+      for (TreeCache tc : treeCaches)
       {
          if (tc != null) killCaches(tc.getCache());
       }
@@ -485,19 +485,25 @@
 
    /**
     * Clears transaction with the current thread in the given transaction manager.
+    *
     * @param txManager a TransactionManager to be cleared
     */
-   public static void killTransaction(TransactionManager txManager) {
-      if (txManager != null) {
-         try {
+   public static void killTransaction(TransactionManager txManager)
+   {
+      if (txManager != null)
+      {
+         try
+         {
             txManager.rollback();
-         } catch (Exception e) {
-               // don't care
          }
+         catch (Exception e)
+         {
+            // don't care
+         }
       }
    }
-   
 
+
    /**
     * Clears any associated transactions with the current thread in the caches' transaction managers.
     */
@@ -674,4 +680,18 @@
    {
       dumpCacheContents(Arrays.asList(caches));
    }
+
+   /**
+    * Extracts a component of a given type from the cache's internal component registry
+    *
+    * @param cache
+    * @param componentType
+    * @param <T>
+    * @return
+    */
+   public static <T> T extractComponent(Cache cache, Class<T> componentType)
+   {
+      ComponentRegistry cr = extractComponentRegistry(cache);
+      return cr.getComponent(componentType);
+   }
 }

Modified: core/branches/flat/src/test/java/org/jboss/starobrno/util/internals/ReplicationListener.java
===================================================================
--- core/branches/flat/src/test/java/org/jboss/starobrno/util/internals/ReplicationListener.java	2008-12-10 15:10:12 UTC (rev 7275)
+++ core/branches/flat/src/test/java/org/jboss/starobrno/util/internals/ReplicationListener.java	2008-12-10 16:16:42 UTC (rev 7276)
@@ -1,16 +1,20 @@
 package org.jboss.starobrno.util.internals;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.jboss.starobrno.Cache;
-import org.jboss.starobrno.RPCManager;
 import org.jboss.starobrno.commands.ReplicableCommand;
 import org.jboss.starobrno.commands.remote.ReplicateCommand;
+import org.jboss.starobrno.commands.tx.CommitCommand;
 import org.jboss.starobrno.commands.tx.PrepareCommand;
+import org.jboss.starobrno.config.Configuration;
 import org.jboss.starobrno.context.InvocationContext;
 import org.jboss.starobrno.factories.ComponentRegistry;
 import org.jboss.starobrno.io.ByteBuffer;
 import org.jboss.starobrno.marshall.CacheMarshallerStarobrno;
 import org.jboss.starobrno.marshall.CommandAwareRpcDispatcher;
 import org.jboss.starobrno.marshall.ExtendedMarshaller;
+import org.jboss.starobrno.remoting.RPCManager;
 import org.jboss.starobrno.util.TestingUtil;
 import org.jgroups.blocks.RpcDispatcher;
 import org.jgroups.util.Buffer;
@@ -50,6 +54,8 @@
 {
    private CountDownLatch latch = new CountDownLatch(1);
    private Set<Class<? extends ReplicableCommand>> expectedCommands;
+   Configuration configuration;
+   private static final Log log = LogFactory.getLog(ReplicationListener.class);
 
    /**
     * Builds a listener that will observe the given cache for recieving replication commands.
@@ -65,6 +71,7 @@
       realDispatcher.setMarshaller(delegate);
       realDispatcher.setRequestMarshaller(delegate);
       realDispatcher.setResponseMarshaller(delegate);
+      configuration = cache.getConfiguration();
    }
 
    private class MarshallerDelegate implements RpcDispatcher.Marshaller2
@@ -130,7 +137,7 @@
          }
          finally
          {
-            System.out.println("Processed command: " + realOne);
+            log.trace("Processed command: " + realOne);
             Iterator<Class<? extends ReplicableCommand>> it = expectedCommands.iterator();
             while (it.hasNext())
             {
@@ -138,7 +145,8 @@
                if (realOne.containsCommandType(replicableCommandClass))
                {
                   it.remove();
-               } else if (realOne.getSingleModification() instanceof PrepareCommand) //explicit transaction
+               }
+               else if (realOne.getSingleModification() instanceof PrepareCommand) //explicit transaction
                {
                   PrepareCommand prepareCommand = (PrepareCommand) realOne.getSingleModification();
                   if (prepareCommand.containsModificationType(replicableCommandClass))
@@ -198,7 +206,7 @@
 
       public Object objectFromByteBuffer(byte[] buffer, int i, int i1) throws Exception
       {
-         Object result =  realOne.objectFromByteBuffer(buffer, i , i1);
+         Object result = realOne.objectFromByteBuffer(buffer, i, i1);
          if (result instanceof ReplicateCommand && expectedCommands != null)
          {
             ReplicateCommand replicateCommand = (ReplicateCommand) result;
@@ -209,14 +217,22 @@
    }
 
    /**
+    * Waits for 1 minute
+    */
+   public void waitForReplicationToOccur()
+   {
+      waitForReplicationToOccur(60000);
+   }
+
+   /**
     * Blocks for the elements specified through {@link #expect(Class[])} invocations to be replicated in this cache.
     * if replication does not occur in the give timeout then an exception is being thrown.
     */
    public void waitForReplicationToOccur(long timeoutMillis)
    {
-      System.out.println("enter... ReplicationListener.waitForReplicationToOccur");
+      log.trace("enter... ReplicationListener.waitForReplicationToOccur");
       waitForReplicationToOccur(timeoutMillis, TimeUnit.MILLISECONDS);
-      System.out.println("exit... ReplicationListener.waitForReplicationToOccur");
+      log.trace("exit... ReplicationListener.waitForReplicationToOccur");
    }
 
    /**
@@ -266,4 +282,12 @@
    {
       expect();
    }
+
+   public void expectWithTx(Class<? extends ReplicableCommand>... writeCommands)
+   {
+      expect(PrepareCommand.class);
+      //this is because for async replication we have an 1pc transaction
+      if (configuration.getCacheMode().isSynchronous()) expect(CommitCommand.class);
+   }
+
 }
\ No newline at end of file




More information about the jbosscache-commits mailing list