Author: manik.surtani(a)jboss.com
Date: 2008-12-10 11:16:42 -0500 (Wed, 10 Dec 2008)
New Revision: 7276
Added:
core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManager.java
core/branches/flat/src/test/java/org/jboss/starobrno/api/mvcc/PutForExternalReadTest.java
Removed:
core/branches/flat/src/main/java/org/jboss/starobrno/RPCManager.java
core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManager.java
Modified:
core/branches/flat/src/main/java/org/jboss/starobrno/Cache.java
core/branches/flat/src/main/java/org/jboss/starobrno/CacheDelegate.java
core/branches/flat/src/main/java/org/jboss/starobrno/CacheSPI.java
core/branches/flat/src/main/java/org/jboss/starobrno/cluster/ReplicationQueue.java
core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/PutKeyValueCommand.java
core/branches/flat/src/main/java/org/jboss/starobrno/config/RuntimeConfig.java
core/branches/flat/src/main/java/org/jboss/starobrno/factories/EmptyConstructorFactory.java
core/branches/flat/src/main/java/org/jboss/starobrno/factories/RuntimeConfigAwareFactory.java
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/InvocationContextInterceptor.java
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/TxInterceptor.java
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/base/BaseRpcInterceptor.java
core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManagerImpl.java
core/branches/flat/src/main/java/org/jboss/starobrno/transaction/TransactionTable.java
core/branches/flat/src/test/java/org/jboss/starobrno/util/TestingUtil.java
core/branches/flat/src/test/java/org/jboss/starobrno/util/internals/ReplicationListener.java
Log:
Fixed putForExternalread() and added unit tests
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/Cache.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/Cache.java 2008-12-10 15:10:12
UTC (rev 7275)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/Cache.java 2008-12-10 16:16:42
UTC (rev 7276)
@@ -40,7 +40,7 @@
/**
* Under special operating behavior, associates the value with the specified key.
* <ul>
- * <li> Only goes through if the key specified does not exist; no-op otherwise
(similar to {@link java.util.concurrent.ConcurrentMap#replace(Object, Object)})</i>
+ * <li> Only goes through if the key specified does not exist; no-op otherwise
(similar to {@link java.util.concurrent.ConcurrentMap#putIfAbsent(Object,
Object)})</i>
* <li> Force asynchronous mode for replication to prevent any
blocking.</li>
* <li> invalidation does not take place. </li>
* <li> 0ms lock timeout to prevent any blocking here either. If the lock is not
acquired, this method is a no-op, and swallows the timeout exception.</li>
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/CacheDelegate.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/CacheDelegate.java 2008-12-10
15:10:12 UTC (rev 7275)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/CacheDelegate.java 2008-12-10
16:16:42 UTC (rev 7276)
@@ -49,6 +49,7 @@
import org.jboss.starobrno.lock.LockManager;
import org.jboss.starobrno.marshall.ExtendedMarshaller;
import org.jboss.starobrno.notifications.Notifier;
+import org.jboss.starobrno.remoting.RPCManager;
import org.jboss.starobrno.statetransfer.StateTransferManager;
import org.jboss.starobrno.transaction.GlobalTransaction;
import org.jboss.starobrno.transaction.TransactionTable;
@@ -224,7 +225,7 @@
ctx.getOptionOverrides().setFailSilently(true);
ctx.getOptionOverrides().setForceAsynchronous(true);
ctx.getOptionOverrides().setLockAcquisitionTimeout(0);
- replace(key, value);
+ putIfAbsent(key, value);
}
catch (Exception e)
{
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/CacheSPI.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/CacheSPI.java 2008-12-10 15:10:12
UTC (rev 7275)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/CacheSPI.java 2008-12-10 16:16:42
UTC (rev 7276)
@@ -36,6 +36,7 @@
import org.jboss.starobrno.lock.LockManager;
import org.jboss.starobrno.marshall.ExtendedMarshaller;
import org.jboss.starobrno.notifications.Notifier;
+import org.jboss.starobrno.remoting.RPCManager;
import org.jboss.starobrno.statetransfer.StateTransferManager;
import org.jboss.starobrno.transaction.GlobalTransaction;
import org.jboss.starobrno.transaction.TransactionTable;
Deleted: core/branches/flat/src/main/java/org/jboss/starobrno/RPCManager.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/RPCManager.java 2008-12-10
15:10:12 UTC (rev 7275)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/RPCManager.java 2008-12-10
16:16:42 UTC (rev 7276)
@@ -1,135 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-package org.jboss.starobrno;
-
-import org.jboss.starobrno.commands.ReplicableCommand;
-import org.jgroups.Address;
-import org.jgroups.Channel;
-import org.jgroups.blocks.RspFilter;
-
-import java.util.List;
-import java.util.Vector;
-
-/**
- * Provides a mechanism for communicating with other caches in the cluster. For now this
is based on JGroups as an underlying
- * transport, and in future more transport options may become available.
- * <p/>
- * Implementations have a simple lifecycle:
- * <ul>
- * <li>start() - starts the underlying channel based on configuration options
injected, and connects the channel</li>
- * <li>disconnect() - disconnects the channel</li>
- * <li>stop() - stops the dispatcher and releases resources</li>
- * </ul>
- *
- * @author Manik Surtani
- * @since 2.1.0
- */
-public interface RPCManager
-{
- /**
- * Disconnects and closes the underlying JGroups channel.
- */
- void disconnect();
-
- /**
- * Stops the RPCDispatcher and frees resources. Closes and disconnects the underlying
JGroups channel if this is
- * still open/connected.
- */
- void stop();
-
- /**
- * Starts the RPCManager by connecting the underlying JGroups channel (if configured
for replication). Connecting
- * the channel may also involve state transfer (if configured) so the interceptor
chain should be started and
- * available before this method is called.
- */
- void start();
-
- /**
- * Invokes an RPC call on other caches in the cluster.
- *
- * @param recipients a list of Addresses to invoke the call on. If this is
null, the call is broadcast to the entire cluster.
- * @param cacheCommand the cache command to invoke
- * @param mode the group request mode to use. See {@link
org.jgroups.blocks.GroupRequest}.
- * @param timeout a timeout after which to throw a replication exception.
- * @param responseFilter a response filter with which to filter out
failed/unwanted/invalid responses.
- * @param useOutOfBandMessage if true, the message is put on JGroups' OOB queue.
See JGroups docs for more info.
- * @return a list of responses from each member contacted.
- * @throws Exception in the event of problems.
- */
- List<Object> callRemoteMethods(Vector<Address> recipients,
ReplicableCommand cacheCommand, int mode, long timeout, RspFilter responseFilter, boolean
useOutOfBandMessage) throws Exception;
-
- /**
- * Invokes an RPC call on other caches in the cluster.
- *
- * @param recipients a list of Addresses to invoke the call on. If this is
null, the call is broadcast to the entire cluster.
- * @param cacheCommand the cache command to invoke
- * @param mode the group request mode to use. See {@link
org.jgroups.blocks.GroupRequest}.
- * @param timeout a timeout after which to throw a replication exception.
- * @param useOutOfBandMessage if true, the message is put on JGroups' OOB queue.
See JGroups docs for more info.
- * @return a list of responses from each member contacted.
- * @throws Exception in the event of problems.
- */
- List<Object> callRemoteMethods(Vector<Address> recipients,
ReplicableCommand cacheCommand, int mode, long timeout, boolean useOutOfBandMessage)
throws Exception;
-
- /**
- * Invokes an RPC call on other caches in the cluster.
- *
- * @param recipients a list of Addresses to invoke the call on. If this is
null, the call is broadcast to the entire cluster.
- * @param cacheCommand the cache command to invoke
- * @param synchronous if true, sets group request mode to {@link
org.jgroups.blocks.GroupRequest#GET_ALL}, and if false sets it to {@link
org.jgroups.blocks.GroupRequest#GET_NONE}.
- * @param timeout a timeout after which to throw a replication exception.
- * @param useOutOfBandMessage if true, the message is put on JGroups' OOB queue.
See JGroups docs for more info.
- * @return a list of responses from each member contacted.
- * @throws Exception in the event of problems.
- */
- List<Object> callRemoteMethods(Vector<Address> recipients,
ReplicableCommand cacheCommand, boolean synchronous, long timeout, boolean
useOutOfBandMessage) throws Exception;
-
- /**
- * @return true if the current Channel is the coordinator of the cluster.
- */
- boolean isCoordinator();
-
- /**
- * @return the Address of the current coordinator.
- */
- Address getCoordinator();
-
- /**
- * Retrieves the local JGroups channel's address
- *
- * @return an Address
- */
- Address getLocalAddress();
-
- /**
- * Returns a defensively copied list of members in the current cluster view.
- */
- List<Address> getMembers();
-
-
- /**
- * Retrieves the Channel
- *
- * @return a channel
- */
- Channel getChannel();
-}
\ No newline at end of file
Modified:
core/branches/flat/src/main/java/org/jboss/starobrno/cluster/ReplicationQueue.java
===================================================================
---
core/branches/flat/src/main/java/org/jboss/starobrno/cluster/ReplicationQueue.java 2008-12-10
15:10:12 UTC (rev 7275)
+++
core/branches/flat/src/main/java/org/jboss/starobrno/cluster/ReplicationQueue.java 2008-12-10
16:16:42 UTC (rev 7276)
@@ -23,14 +23,14 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.jboss.starobrno.commands.CommandsFactory;
+import org.jboss.starobrno.commands.ReplicableCommand;
+import org.jboss.starobrno.commands.remote.ReplicateCommand;
import org.jboss.starobrno.config.Configuration;
import org.jboss.starobrno.factories.annotations.Inject;
import org.jboss.starobrno.factories.annotations.Start;
import org.jboss.starobrno.factories.annotations.Stop;
-import org.jboss.starobrno.commands.ReplicableCommand;
-import org.jboss.starobrno.commands.CommandsFactory;
-import org.jboss.starobrno.commands.remote.ReplicateCommand;
-import org.jboss.starobrno.RPCManager;
+import org.jboss.starobrno.remoting.RPCManager;
import java.util.ArrayList;
import java.util.LinkedList;
Modified:
core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/PutKeyValueCommand.java
===================================================================
---
core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/PutKeyValueCommand.java 2008-12-10
15:10:12 UTC (rev 7275)
+++
core/branches/flat/src/main/java/org/jboss/starobrno/commands/write/PutKeyValueCommand.java 2008-12-10
16:16:42 UTC (rev 7276)
@@ -77,24 +77,28 @@
public Object perform(InvocationContext ctx) throws Throwable
{
- notifier.notifyCacheEntryModified(key, true, ctx);
+ Object o = null;
MVCCEntry e = ctx.lookupEntry(key);
- Object o;
- if (value instanceof Delta)
+ if (e.getValue() == null || !putIfAbsent)
{
- // magic
- Delta dv = (Delta) value;
- Object existing = e.getValue();
- DeltaAware toMergeWith = null;
- if (existing instanceof DeltaAware) toMergeWith = (DeltaAware) existing;
- e.setValue(dv.merge(toMergeWith));
- o = existing;
+ notifier.notifyCacheEntryModified(key, true, ctx);
+
+ if (value instanceof Delta)
+ {
+ // magic
+ Delta dv = (Delta) value;
+ Object existing = e.getValue();
+ DeltaAware toMergeWith = null;
+ if (existing instanceof DeltaAware) toMergeWith = (DeltaAware) existing;
+ e.setValue(dv.merge(toMergeWith));
+ o = existing;
+ }
+ else
+ {
+ o = e.setValue(value);
+ }
+ notifier.notifyCacheEntryModified(key, false, ctx);
}
- else
- {
- o = e.setValue(value);
- }
- notifier.notifyCacheEntryModified(key, false, ctx);
return o;
}
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/config/RuntimeConfig.java
===================================================================
---
core/branches/flat/src/main/java/org/jboss/starobrno/config/RuntimeConfig.java 2008-12-10
15:10:12 UTC (rev 7275)
+++
core/branches/flat/src/main/java/org/jboss/starobrno/config/RuntimeConfig.java 2008-12-10
16:16:42 UTC (rev 7276)
@@ -21,8 +21,8 @@
*/
package org.jboss.starobrno.config;
-import org.jboss.cache.RPCManager;
import org.jboss.cache.buddyreplication.BuddyGroup;
+import org.jboss.starobrno.remoting.RPCManager;
import org.jboss.starobrno.util.Util;
import org.jgroups.Channel;
import org.jgroups.ChannelFactory;
Modified:
core/branches/flat/src/main/java/org/jboss/starobrno/factories/EmptyConstructorFactory.java
===================================================================
---
core/branches/flat/src/main/java/org/jboss/starobrno/factories/EmptyConstructorFactory.java 2008-12-10
15:10:12 UTC (rev 7275)
+++
core/branches/flat/src/main/java/org/jboss/starobrno/factories/EmptyConstructorFactory.java 2008-12-10
16:16:42 UTC (rev 7276)
@@ -26,7 +26,6 @@
import org.jboss.cache.buddyreplication.BuddyFqnTransformer;
import org.jboss.cache.invocation.CacheInvocationDelegate;
import org.jboss.cache.lock.LockStrategyFactory;
-import org.jboss.starobrno.RPCManager;
import org.jboss.starobrno.batch.BatchContainer;
import org.jboss.starobrno.commands.CommandsFactory;
import org.jboss.starobrno.config.ConfigurationException;
@@ -39,6 +38,7 @@
import org.jboss.starobrno.marshall.VersionAwareMarshaller;
import org.jboss.starobrno.notifications.Notifier;
import org.jboss.starobrno.remoting.ChannelMessageListener;
+import org.jboss.starobrno.remoting.RPCManager;
import org.jboss.starobrno.remoting.RPCManagerImpl;
import org.jboss.starobrno.transaction.TransactionTable;
@@ -67,10 +67,9 @@
{
componentImpl = VersionAwareMarshaller.class;
}
- else
- if (componentType.equals(RPCManager.class))
+ else if (componentType.equals(RPCManager.class))
{
- componentImpl = RPCManagerImpl.class;
+ componentImpl = RPCManagerImpl.class;
}
else
{
Modified:
core/branches/flat/src/main/java/org/jboss/starobrno/factories/RuntimeConfigAwareFactory.java
===================================================================
---
core/branches/flat/src/main/java/org/jboss/starobrno/factories/RuntimeConfigAwareFactory.java 2008-12-10
15:10:12 UTC (rev 7275)
+++
core/branches/flat/src/main/java/org/jboss/starobrno/factories/RuntimeConfigAwareFactory.java 2008-12-10
16:16:42 UTC (rev 7276)
@@ -22,10 +22,10 @@
package org.jboss.starobrno.factories;
import org.jboss.cache.util.BeanUtils;
-import org.jboss.starobrno.RPCManager;
import org.jboss.starobrno.config.ConfigurationException;
import org.jboss.starobrno.config.RuntimeConfig;
import org.jboss.starobrno.factories.annotations.DefaultFactoryFor;
+import org.jboss.starobrno.remoting.RPCManager;
import java.lang.reflect.Method;
Modified:
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/InvocationContextInterceptor.java
===================================================================
---
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/InvocationContextInterceptor.java 2008-12-10
15:10:12 UTC (rev 7275)
+++
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/InvocationContextInterceptor.java 2008-12-10
16:16:42 UTC (rev 7276)
@@ -22,7 +22,6 @@
package org.jboss.starobrno.interceptors;
-import org.jboss.starobrno.RPCManager;
import org.jboss.starobrno.commands.VisitableCommand;
import org.jboss.starobrno.commands.tx.CommitCommand;
import org.jboss.starobrno.commands.tx.PrepareCommand;
@@ -32,6 +31,7 @@
import org.jboss.starobrno.context.InvocationContext;
import org.jboss.starobrno.context.TransactionContext;
import org.jboss.starobrno.factories.annotations.Inject;
+import org.jboss.starobrno.remoting.RPCManager;
import org.jboss.starobrno.transaction.GlobalTransaction;
import org.jboss.starobrno.transaction.TransactionTable;
Modified:
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/TxInterceptor.java
===================================================================
---
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/TxInterceptor.java 2008-12-10
15:10:12 UTC (rev 7275)
+++
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/TxInterceptor.java 2008-12-10
16:16:42 UTC (rev 7276)
@@ -23,7 +23,6 @@
import org.jboss.cache.util.concurrent.ConcurrentHashSet;
import org.jboss.starobrno.CacheException;
-import org.jboss.starobrno.RPCManager;
import org.jboss.starobrno.commands.CommandsFactory;
import org.jboss.starobrno.commands.ReplicableCommand;
import org.jboss.starobrno.commands.VisitableCommand;
@@ -41,6 +40,7 @@
import org.jboss.starobrno.jmx.annotations.ManagedOperation;
import org.jboss.starobrno.lock.LockManager;
import org.jboss.starobrno.notifications.Notifier;
+import org.jboss.starobrno.remoting.RPCManager;
import org.jboss.starobrno.remoting.ReplicationException;
import org.jboss.starobrno.transaction.GlobalTransaction;
import org.jboss.starobrno.transaction.TransactionTable;
Modified:
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/base/BaseRpcInterceptor.java
===================================================================
---
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/base/BaseRpcInterceptor.java 2008-12-10
15:10:12 UTC (rev 7275)
+++
core/branches/flat/src/main/java/org/jboss/starobrno/interceptors/base/BaseRpcInterceptor.java 2008-12-10
16:16:42 UTC (rev 7276)
@@ -21,7 +21,6 @@
*/
package org.jboss.starobrno.interceptors.base;
-import org.jboss.starobrno.RPCManager;
import org.jboss.starobrno.cluster.ReplicationQueue;
import org.jboss.starobrno.commands.CommandsFactory;
import org.jboss.starobrno.commands.ReplicableCommand;
@@ -30,6 +29,7 @@
import org.jboss.starobrno.context.TransactionContext;
import org.jboss.starobrno.factories.annotations.Inject;
import org.jboss.starobrno.factories.annotations.Start;
+import org.jboss.starobrno.remoting.RPCManager;
import org.jboss.starobrno.transaction.GlobalTransaction;
import org.jboss.starobrno.transaction.TransactionTable;
import org.jgroups.Address;
Deleted: core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManager.java
===================================================================
---
core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManager.java 2008-12-10
15:10:12 UTC (rev 7275)
+++
core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManager.java 2008-12-10
16:16:42 UTC (rev 7276)
@@ -1,154 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-package org.jboss.starobrno.remoting;
-
-import org.jboss.cache.Fqn;
-import org.jboss.starobrno.commands.ReplicableCommand;
-import org.jgroups.Address;
-import org.jgroups.Channel;
-import org.jgroups.blocks.RspFilter;
-
-import java.util.List;
-import java.util.Vector;
-
-/**
- * Provides a mechanism for communicating with other caches in the cluster. For now this
is based on JGroups as an underlying
- * transport, and in future more transport options may become available.
- * <p/>
- * Implementations have a simple lifecycle:
- * <ul>
- * <li>start() - starts the underlying channel based on configuration options
injected, and connects the channel</li>
- * <li>disconnect() - disconnects the channel</li>
- * <li>stop() - stops the dispatcher and releases resources</li>
- * </ul>
- *
- * @author Manik Surtani
- * @since 2.1.0
- */
-public interface RPCManager
-{
- /**
- * Disconnects and closes the underlying JGroups channel.
- */
- void disconnect();
-
- /**
- * Stops the RPCDispatcher and frees resources. Closes and disconnects the underlying
JGroups channel if this is
- * still open/connected.
- */
- void stop();
-
- /**
- * Starts the RPCManager by connecting the underlying JGroups channel (if configured
for replication). Connecting
- * the channel may also involve state transfer (if configured) so the interceptor
chain should be started and
- * available before this method is called.
- */
- void start();
-
- /**
- * Invokes an RPC call on other caches in the cluster.
- *
- * @param recipients a list of Addresses to invoke the call on. If this is
null, the call is broadcast to the entire cluster.
- * @param cacheCommand the cache command to invoke
- * @param mode the group request mode to use. See {@link
org.jgroups.blocks.GroupRequest}.
- * @param timeout a timeout after which to throw a replication exception.
- * @param responseFilter a response filter with which to filter out
failed/unwanted/invalid responses.
- * @param useOutOfBandMessage if true, the message is put on JGroups' OOB queue.
See JGroups docs for more info.
- * @return a list of responses from each member contacted.
- * @throws Exception in the event of problems.
- */
- List<Object> callRemoteMethods(Vector<Address> recipients,
ReplicableCommand cacheCommand, int mode, long timeout, RspFilter responseFilter, boolean
useOutOfBandMessage) throws Exception;
-
- /**
- * Invokes an RPC call on other caches in the cluster.
- *
- * @param recipients a list of Addresses to invoke the call on. If this is
null, the call is broadcast to the entire cluster.
- * @param cacheCommand the cache command to invoke
- * @param mode the group request mode to use. See {@link
org.jgroups.blocks.GroupRequest}.
- * @param timeout a timeout after which to throw a replication exception.
- * @param useOutOfBandMessage if true, the message is put on JGroups' OOB queue.
See JGroups docs for more info.
- * @return a list of responses from each member contacted.
- * @throws Exception in the event of problems.
- */
- List<Object> callRemoteMethods(Vector<Address> recipients,
ReplicableCommand cacheCommand, int mode, long timeout, boolean useOutOfBandMessage)
throws Exception;
-
- /**
- * Invokes an RPC call on other caches in the cluster.
- *
- * @param recipients a list of Addresses to invoke the call on. If this is
null, the call is broadcast to the entire cluster.
- * @param cacheCommand the cache command to invoke
- * @param synchronous if true, sets group request mode to {@link
org.jgroups.blocks.GroupRequest#GET_ALL}, and if false sets it to {@link
org.jgroups.blocks.GroupRequest#GET_NONE}.
- * @param timeout a timeout after which to throw a replication exception.
- * @param useOutOfBandMessage if true, the message is put on JGroups' OOB queue.
See JGroups docs for more info.
- * @return a list of responses from each member contacted.
- * @throws Exception in the event of problems.
- */
- List<Object> callRemoteMethods(Vector<Address> recipients,
ReplicableCommand cacheCommand, boolean synchronous, long timeout, boolean
useOutOfBandMessage) throws Exception;
-
- /**
- * @return true if the current Channel is the coordinator of the cluster.
- */
- boolean isCoordinator();
-
- /**
- * @return the Address of the current coordinator.
- */
- Address getCoordinator();
-
- /**
- * Retrieves the local JGroups channel's address
- *
- * @return an Address
- */
- Address getLocalAddress();
-
- /**
- * Returns a defensively copied list of members in the current cluster view.
- */
- List<Address> getMembers();
-
- /**
- * Retrieves partial state from remote instances.
- *
- * @param sources sources to consider for a state transfer
- * @param sourceTarget Fqn on source to retrieve state for
- * @param integrationTarget integration point on local cache to apply state
- * @throws Exception in the event of problems
- */
- void fetchPartialState(List<Address> sources, Fqn sourceTarget, Fqn
integrationTarget) throws Exception;
-
- /**
- * Retrieves partial state from remote instances.
- *
- * @param sources sources to consider for a state transfer
- * @param subtree Fqn subtree to retrieve. Will be integrated at the same point.
- * @throws Exception in the event of problems
- */
- void fetchPartialState(List<Address> sources, Fqn subtree) throws Exception;
-
- /**
- * Retrieves the Channel
- *
- * @return a channel
- */
- Channel getChannel();
-}
\ No newline at end of file
Copied: core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManager.java
(from rev 7261, core/branches/flat/src/main/java/org/jboss/starobrno/RPCManager.java)
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManager.java
(rev 0)
+++
core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManager.java 2008-12-10
16:16:42 UTC (rev 7276)
@@ -0,0 +1,135 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.jboss.starobrno.remoting;
+
+import org.jboss.starobrno.commands.ReplicableCommand;
+import org.jgroups.Address;
+import org.jgroups.Channel;
+import org.jgroups.blocks.RspFilter;
+
+import java.util.List;
+import java.util.Vector;
+
+/**
+ * Provides a mechanism for communicating with other caches in the cluster. For now this
is based on JGroups as an underlying
+ * transport, and in future more transport options may become available.
+ * <p/>
+ * Implementations have a simple lifecycle:
+ * <ul>
+ * <li>start() - starts the underlying channel based on configuration options
injected, and connects the channel</li>
+ * <li>disconnect() - disconnects the channel</li>
+ * <li>stop() - stops the dispatcher and releases resources</li>
+ * </ul>
+ *
+ * @author Manik Surtani
+ * @since 2.1.0
+ */
+public interface RPCManager
+{
+ /**
+ * Disconnects and closes the underlying JGroups channel.
+ */
+ void disconnect();
+
+ /**
+ * Stops the RPCDispatcher and frees resources. Closes and disconnects the underlying
JGroups channel if this is
+ * still open/connected.
+ */
+ void stop();
+
+ /**
+ * Starts the RPCManager by connecting the underlying JGroups channel (if configured
for replication). Connecting
+ * the channel may also involve state transfer (if configured) so the interceptor
chain should be started and
+ * available before this method is called.
+ */
+ void start();
+
+ /**
+ * Invokes an RPC call on other caches in the cluster.
+ *
+ * @param recipients a list of Addresses to invoke the call on. If this is
null, the call is broadcast to the entire cluster.
+ * @param cacheCommand the cache command to invoke
+ * @param mode the group request mode to use. See {@link
org.jgroups.blocks.GroupRequest}.
+ * @param timeout a timeout after which to throw a replication exception.
+ * @param responseFilter a response filter with which to filter out
failed/unwanted/invalid responses.
+ * @param useOutOfBandMessage if true, the message is put on JGroups' OOB queue.
See JGroups docs for more info.
+ * @return a list of responses from each member contacted.
+ * @throws Exception in the event of problems.
+ */
+ List<Object> callRemoteMethods(Vector<Address> recipients,
ReplicableCommand cacheCommand, int mode, long timeout, RspFilter responseFilter, boolean
useOutOfBandMessage) throws Exception;
+
+ /**
+ * Invokes an RPC call on other caches in the cluster.
+ *
+ * @param recipients a list of Addresses to invoke the call on. If this is
null, the call is broadcast to the entire cluster.
+ * @param cacheCommand the cache command to invoke
+ * @param mode the group request mode to use. See {@link
org.jgroups.blocks.GroupRequest}.
+ * @param timeout a timeout after which to throw a replication exception.
+ * @param useOutOfBandMessage if true, the message is put on JGroups' OOB queue.
See JGroups docs for more info.
+ * @return a list of responses from each member contacted.
+ * @throws Exception in the event of problems.
+ */
+ List<Object> callRemoteMethods(Vector<Address> recipients,
ReplicableCommand cacheCommand, int mode, long timeout, boolean useOutOfBandMessage)
throws Exception;
+
+ /**
+ * Invokes an RPC call on other caches in the cluster.
+ *
+ * @param recipients a list of Addresses to invoke the call on. If this is
null, the call is broadcast to the entire cluster.
+ * @param cacheCommand the cache command to invoke
+ * @param synchronous if true, sets group request mode to {@link
org.jgroups.blocks.GroupRequest#GET_ALL}, and if false sets it to {@link
org.jgroups.blocks.GroupRequest#GET_NONE}.
+ * @param timeout a timeout after which to throw a replication exception.
+ * @param useOutOfBandMessage if true, the message is put on JGroups' OOB queue.
See JGroups docs for more info.
+ * @return a list of responses from each member contacted.
+ * @throws Exception in the event of problems.
+ */
+ List<Object> callRemoteMethods(Vector<Address> recipients,
ReplicableCommand cacheCommand, boolean synchronous, long timeout, boolean
useOutOfBandMessage) throws Exception;
+
+ /**
+ * @return true if the current Channel is the coordinator of the cluster.
+ */
+ boolean isCoordinator();
+
+ /**
+ * @return the Address of the current coordinator.
+ */
+ Address getCoordinator();
+
+ /**
+ * Retrieves the local JGroups channel's address
+ *
+ * @return an Address
+ */
+ Address getLocalAddress();
+
+ /**
+ * Returns a defensively copied list of members in the current cluster view.
+ */
+ List<Address> getMembers();
+
+
+ /**
+ * Retrieves the Channel
+ *
+ * @return a channel
+ */
+ Channel getChannel();
+}
\ No newline at end of file
Property changes on:
core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManager.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:mergeinfo
+
Name: svn:eol-style
+ LF
Modified:
core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManagerImpl.java
===================================================================
---
core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManagerImpl.java 2008-12-10
15:10:12 UTC (rev 7275)
+++
core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManagerImpl.java 2008-12-10
16:16:42 UTC (rev 7276)
@@ -23,13 +23,13 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.jboss.cache.util.concurrent.ReclosableLatch;
import org.jboss.starobrno.CacheException;
import org.jboss.starobrno.CacheSPI;
-import org.jboss.starobrno.RPCManager;
-import org.jboss.starobrno.context.InvocationContext;
import org.jboss.starobrno.commands.ReplicableCommand;
import org.jboss.starobrno.config.Configuration;
import org.jboss.starobrno.config.RuntimeConfig;
+import org.jboss.starobrno.context.InvocationContext;
import org.jboss.starobrno.factories.ComponentRegistry;
import org.jboss.starobrno.factories.annotations.Inject;
import org.jboss.starobrno.factories.annotations.Start;
@@ -44,10 +44,8 @@
import org.jboss.starobrno.marshall.CommandAwareRpcDispatcher;
import org.jboss.starobrno.marshall.ExtendedMarshaller;
import org.jboss.starobrno.notifications.Notifier;
-import org.jboss.starobrno.remoting.ChannelMessageListener;
import org.jboss.starobrno.transaction.TransactionTable;
import org.jboss.starobrno.util.ReflectionUtil;
-import org.jboss.cache.util.concurrent.ReclosableLatch;
import org.jgroups.*;
import org.jgroups.blocks.GroupRequest;
import org.jgroups.blocks.RspFilter;
@@ -148,11 +146,11 @@
//otherwise just connect
try
- {
- channel.connect(configuration.getClusterName());
- }
- catch (ChannelException e)
{
+ channel.connect(configuration.getClusterName());
+ }
+ catch (ChannelException e)
+ {
throw new CacheException("Unable to connect to JGroups channel", e);
}
if (log.isInfoEnabled()) log.info("Cache local address is " +
getLocalAddress());
@@ -206,7 +204,7 @@
channel.setOpt(Channel.AUTO_GETSTATE, false);
channel.setOpt(Channel.BLOCK, true);
rpcDispatcher = new CommandAwareRpcDispatcher(channel, messageListener, new
MembershipListenerAdaptor(),
- invocationContextContainer, invocationContextContainer, interceptorChain,
componentRegistry);
+ invocationContextContainer, invocationContextContainer, interceptorChain,
componentRegistry);
checkAppropriateConfig();
rpcDispatcher.setRequestMarshaller(extendedMarshaller);
rpcDispatcher.setResponseMarshaller(extendedMarshaller);
Modified:
core/branches/flat/src/main/java/org/jboss/starobrno/transaction/TransactionTable.java
===================================================================
---
core/branches/flat/src/main/java/org/jboss/starobrno/transaction/TransactionTable.java 2008-12-10
15:10:12 UTC (rev 7275)
+++
core/branches/flat/src/main/java/org/jboss/starobrno/transaction/TransactionTable.java 2008-12-10
16:16:42 UTC (rev 7276)
@@ -24,12 +24,12 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.jboss.starobrno.CacheException;
-import org.jboss.starobrno.RPCManager;
import org.jboss.starobrno.context.InvocationContext;
import org.jboss.starobrno.context.TransactionContext;
import org.jboss.starobrno.factories.annotations.Inject;
import org.jboss.starobrno.factories.annotations.NonVolatile;
import org.jboss.starobrno.factories.context.ContextFactory;
+import org.jboss.starobrno.remoting.RPCManager;
import org.jgroups.Address;
import javax.transaction.Status;
Added:
core/branches/flat/src/test/java/org/jboss/starobrno/api/mvcc/PutForExternalReadTest.java
===================================================================
---
core/branches/flat/src/test/java/org/jboss/starobrno/api/mvcc/PutForExternalReadTest.java
(rev 0)
+++
core/branches/flat/src/test/java/org/jboss/starobrno/api/mvcc/PutForExternalReadTest.java 2008-12-10
16:16:42 UTC (rev 7276)
@@ -0,0 +1,343 @@
+package org.jboss.starobrno.api.mvcc;
+
+import org.easymock.EasyMock;
+import static org.easymock.EasyMock.*;
+import org.jboss.cache.CacheFactory;
+import org.jboss.starobrno.Cache;
+import org.jboss.starobrno.CacheSPI;
+import org.jboss.starobrno.UnitTestCacheFactory;
+import org.jboss.starobrno.commands.ReplicableCommand;
+import org.jboss.starobrno.commands.write.PutKeyValueCommand;
+import org.jboss.starobrno.commands.write.RemoveCommand;
+import org.jboss.starobrno.config.Configuration;
+import org.jboss.starobrno.factories.ComponentRegistry;
+import org.jboss.starobrno.remoting.RPCManager;
+import org.jboss.starobrno.transaction.TransactionTable;
+import org.jboss.starobrno.util.TestingUtil;
+import org.jboss.starobrno.util.internals.ReplicationListener;
+import org.jgroups.Address;
+import static org.testng.AssertJUnit.*;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+import java.util.List;
+import java.util.Vector;
+
+@Test(groups = {"functional", "jgroups", "transaction"})
+public class PutForExternalReadTest
+{
+ protected final String key = "k", value = "v", value2 =
"v2";
+
+ protected CacheSPI<String, String> cache1, cache2;
+
+ ReplicationListener replListener1;
+ ReplicationListener replListener2;
+
+ protected TransactionManager tm1, tm2;
+
+ protected boolean useTx;
+
+
+ @BeforeMethod(alwaysRun = true)
+ public void setUp()
+ {
+
+ CacheFactory<String, String> cf = new UnitTestCacheFactory<String,
String>();
+ Configuration c = new Configuration();
+ c.setCacheMode(Configuration.CacheMode.REPL_SYNC);
+ cache1 = (CacheSPI<String, String>) cf.createCache(c, false);
+
cache1.getConfiguration().setTransactionManagerLookupClass("org.jboss.cache.transaction.DummyTransactionManagerLookup");
+ cache1.getConfiguration().setSerializationExecutorPoolSize(0);//this is very
important for async tests!
+
+ cache1.start();
+ tm1 = cache1.getConfiguration().getRuntimeConfig().getTransactionManager();
+
+ cache2 = (CacheSPI<String, String>)
cf.createCache(cache1.getConfiguration().clone());
+
+ tm2 = cache2.getConfiguration().getRuntimeConfig().getTransactionManager();
+ replListener1 = new ReplicationListener(cache1);
+ replListener2 = new ReplicationListener(cache2);
+
+ TestingUtil.blockUntilViewsReceived(10000, cache1, cache2);
+ }
+
+ @AfterMethod(alwaysRun = true)
+ public void tearDown()
+ {
+ TestingUtil.killCaches(cache1, cache2);
+ }
+
+ public void testNoOpWhenKeyPresent()
+ {
+ replListener2.expect(PutKeyValueCommand.class);
+ cache1.putForExternalRead(key, value);
+ replListener2.waitForReplicationToOccur();
+
+
+ assertEquals("PFER should have succeeded", value, cache1.get(key));
+ assertEquals("PFER should have replicated", value, cache2.get(key));
+
+ // reset
+ replListener2.expect(RemoveCommand.class);
+ cache1.remove(key);
+ replListener2.waitForReplicationToOccur();
+
+ assert cache1.isEmpty() : "Should have reset";
+ assert cache2.isEmpty() : "Should have reset";
+
+ replListener2.expect(PutKeyValueCommand.class);
+ cache1.put(key, value);
+ replListener2.waitForReplicationToOccur();
+
+ // now this pfer should be a no-op
+ cache1.putForExternalRead(key, value2);
+
+ assertEquals("PFER should have been a no-op", value, cache1.get(key));
+ assertEquals("PFER should have been a no-op", value, cache2.get(key));
+ }
+
+ private Vector<Address> anyAddresses()
+ {
+ anyObject();
+ return null;
+ }
+
+ public void testAsyncForce() throws Exception
+ {
+ RPCManager rpcManager = EasyMock.createNiceMock(RPCManager.class);
+ RPCManager originalRpcManager =
cache1.getConfiguration().getRuntimeConfig().getRPCManager();
+ List<Address> memberList = originalRpcManager.getMembers();
+ expect(rpcManager.getMembers()).andReturn(memberList).anyTimes();
+ // inject a mock RPC manager so that we can test whether calls made are sync or
async.
+ ComponentRegistry cr = TestingUtil.extractComponentRegistry(cache1);
+ cr.registerComponent(rpcManager, RPCManager.class);
+ cr.rewire();
+
+ // specify what we expectWithTx called on the mock Rpc Manager. For params we
don't care about, just use ANYTHING.
+ // setting the mock object to expectWithTx the "sync" param to be false.
+ expect(rpcManager.callRemoteMethods(anyAddresses(), (ReplicableCommand)
anyObject(), eq(false), anyLong(), anyBoolean())).andReturn(null);
+
+ replay(rpcManager);
+
+ // now try a simple replication. Since the RPCManager is a mock object it will not
actually replicate anything.
+ cache1.putForExternalRead(key, value);
+ verify(rpcManager);
+
+ // cleanup
+ TestingUtil.extractComponentRegistry(cache1).registerComponent(originalRpcManager,
RPCManager.class);
+ cache1.remove(key);
+ }
+
+ public void testTxSuspension() throws Exception
+ {
+ // create parent node first
+ replListener2.expect(PutKeyValueCommand.class);
+ cache1.put(key + "0", value);
+ replListener2.waitForReplicationToOccur();
+
+ // start a tx and do some stuff.
+ replListener2.expect(PutKeyValueCommand.class);
+ tm1.begin();
+ cache1.get(key + "0");
+ cache1.putForExternalRead(key, value); // should have happened in a separate tx and
have committed already.
+ Transaction t = tm1.suspend();
+
+ replListener2.waitForReplicationToOccur();
+ assertEquals("PFER should have completed", value, cache1.get(key));
+ assertEquals("PFER should have completed", value, cache2.get(key));
+
+ tm1.resume(t);
+ tm1.commit();
+
+ assertEquals("parent fqn tx should have completed", value, cache1.get(key
+ "0"));
+ assertEquals("parent fqn tx should have completed", value, cache2.get(key
+ "0"));
+ }
+
+ public void testExceptionSuppression() throws Exception
+ {
+ RPCManager barfingRpcManager = EasyMock.createNiceMock(RPCManager.class);
+ RPCManager originalRpcManager =
cache1.getConfiguration().getRuntimeConfig().getRPCManager();
+ try
+ {
+ List<Address> memberList = originalRpcManager.getMembers();
+ expect(barfingRpcManager.getMembers()).andReturn(memberList).anyTimes();
+
expect(barfingRpcManager.getLocalAddress()).andReturn(originalRpcManager.getLocalAddress()).anyTimes();
+ expect(barfingRpcManager.callRemoteMethods(anyAddresses(), (ReplicableCommand)
anyObject(), anyBoolean(), anyLong(), anyBoolean())).andThrow(new
RuntimeException("Barf!")).anyTimes();
+ replay(barfingRpcManager);
+
+
TestingUtil.extractComponentRegistry(cache1).registerComponent(barfingRpcManager,
RPCManager.class);
+ cache1.getConfiguration().getRuntimeConfig().setRPCManager(barfingRpcManager);
+ TestingUtil.extractComponentRegistry(cache1).rewire();
+
+ try
+ {
+ cache1.put(key, value);
+ fail("Should have barfed");
+ }
+ catch (RuntimeException re)
+ {
+ }
+
+ // clean up any indeterminate state left over
+ try
+ {
+ cache1.remove(key);
+ fail("Should have barfed");
+ }
+ catch (RuntimeException re)
+ {
+ }
+
+ assertNull("Should have cleaned up", cache1.get(key));
+
+ // should not barf
+ cache1.putForExternalRead(key, value);
+ }
+ finally
+ {
+
TestingUtil.extractComponentRegistry(cache1).registerComponent(originalRpcManager,
RPCManager.class);
+ }
+ }
+
+ public void testBasicPropagation() throws Exception
+ {
+ assert !cache1.containsKey(key);
+ assert !cache2.containsKey(key);
+
+ replListener2.expect(PutKeyValueCommand.class);
+ cache1.putForExternalRead(key, value);
+ replListener2.waitForReplicationToOccur();
+
+ assertEquals("PFER updated cache1", value, cache1.get(key));
+ assertEquals("PFER propagated to cache2 as expected", value,
cache2.get(key));
+
+ // replication to cache 1 should NOT happen.
+ cache2.putForExternalRead(key, value + "0");
+
+ assertEquals("PFER updated cache2", value, cache2.get(key));
+ assertEquals("Cache1 should be unaffected", value, cache1.get(key));
+ }
+
+ /**
+ * Tests that setting a cacheModeLocal=true Option prevents propagation
+ * of the putForExternalRead().
+ *
+ * @throws Exception
+ */
+ public void testSimpleCacheModeLocal() throws Exception
+ {
+ cacheModeLocalTest(false);
+ }
+
+ /**
+ * Tests that setting a cacheModeLocal=true Option prevents propagation
+ * of the putForExternalRead() when the call occurs inside a transaction.
+ *
+ * @throws Exception
+ */
+ public void testCacheModeLocalInTx() throws Exception
+ {
+ cacheModeLocalTest(true);
+ }
+
+ private TransactionTable getTransactionTable(Cache cache)
+ {
+ return TestingUtil.extractComponent(cache, TransactionTable.class);
+ }
+
+ /**
+ * Tests that suspended transactions do not leak. See JBCACHE-1246.
+ */
+ public void testMemLeakOnSuspendedTransactions() throws Exception
+ {
+ replListener2.expect(PutKeyValueCommand.class);
+ tm1.begin();
+ cache1.putForExternalRead(key, value);
+ tm1.commit();
+ replListener2.waitForReplicationToOccur();
+
+ TransactionTable tt1 = getTransactionTable(cache1);
+ TransactionTable tt2 = getTransactionTable(cache2);
+
+ assert tt1.getNumGlobalTransactions() == 0 : "Cache 1 should have no stale
global TXs";
+ assert tt1.getNumLocalTransactions() == 0 : "Cache 1 should have no stale
local TXs";
+ assert tt2.getNumGlobalTransactions() == 0 : "Cache 2 should have no stale
global TXs";
+ assert tt2.getNumLocalTransactions() == 0 : "Cache 2 should have no stale
local TXs";
+
+
System.out.println("PutForExternalReadTest.testMemLeakOnSuspendedTransactions");
+ //do not expectWithTx a PFER replication, as the node already exists so this is a
no-op
+ replListener2.expectWithTx(PutKeyValueCommand.class);
+ tm1.begin();
+ cache1.putForExternalRead(key, value);
+ cache1.put(key, value);
+ tm1.commit();
+ replListener2.waitForReplicationToOccur();
+
+ assert tt1.getNumGlobalTransactions() == 0 : "Cache 1 should have no stale
global TXs";
+ assert tt1.getNumLocalTransactions() == 0 : "Cache 1 should have no stale
local TXs";
+ assert tt2.getNumGlobalTransactions() == 0 : "Cache 2 should have no stale
global TXs";
+ assert tt2.getNumLocalTransactions() == 0 : "Cache 2 should have no stale
local TXs";
+
+ replListener2.expectWithTx(PutKeyValueCommand.class);
+ tm1.begin();
+ cache1.put(key, value);
+ cache1.putForExternalRead(key, value);
+ tm1.commit();
+ replListener2.waitForReplicationToOccur();
+
+ assert tt1.getNumGlobalTransactions() == 0 : "Cache 1 should have no stale
global TXs";
+ assert tt1.getNumLocalTransactions() == 0 : "Cache 1 should have no stale
local TXs";
+ assert tt2.getNumGlobalTransactions() == 0 : "Cache 2 should have no stale
global TXs";
+ assert tt2.getNumLocalTransactions() == 0 : "Cache 2 should have no stale
local TXs";
+
+ //do not expectWithTx a PFER replication, as the node already exists so this is a
no-op
+ replListener2.expectWithTx(PutKeyValueCommand.class, PutKeyValueCommand.class);
+ tm1.begin();
+ cache1.put(key, value);
+ cache1.putForExternalRead(key, value);
+ cache1.put(key, value);
+ tm1.commit();
+ replListener2.waitForReplicationToOccur();
+
+ assert tt1.getNumGlobalTransactions() == 0 : "Cache 1 should have no stale
global TXs";
+ assert tt1.getNumLocalTransactions() == 0 : "Cache 1 should have no stale
local TXs";
+ assert tt2.getNumGlobalTransactions() == 0 : "Cache 2 should have no stale
global TXs";
+ assert tt2.getNumLocalTransactions() == 0 : "Cache 2 should have no stale
local TXs";
+ }
+
+ /**
+ * Tests that setting a cacheModeLocal=true Option prevents propagation
+ * of the putForExternalRead().
+ *
+ * @throws Exception
+ */
+ private void cacheModeLocalTest(boolean transactional) throws Exception
+ {
+ RPCManager rpcManager = EasyMock.createMock(RPCManager.class);
+ RPCManager originalRpcManager =
cache1.getConfiguration().getRuntimeConfig().getRPCManager();
+
+ // inject a mock RPC manager so that we can test whether calls made are sync or
async.
+ cache1.getConfiguration().getRuntimeConfig().setRPCManager(rpcManager);
+
+ // specify that we expectWithTx nothing will be called on the mock Rpc Manager.
+ replay(rpcManager);
+
+ // now try a simple replication. Since the RPCManager is a mock object it will not
actually replicate anything.
+ if (transactional)
+ tm1.begin();
+
+ cache1.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+ cache1.putForExternalRead(key, value);
+
+ if (transactional)
+ tm1.commit();
+
+ verify(rpcManager);
+ // cleanup
+ cache1.getConfiguration().getRuntimeConfig().setRPCManager(originalRpcManager);
+ cache1.remove(key);
+ }
+}
Modified: core/branches/flat/src/test/java/org/jboss/starobrno/util/TestingUtil.java
===================================================================
--- core/branches/flat/src/test/java/org/jboss/starobrno/util/TestingUtil.java 2008-12-10
15:10:12 UTC (rev 7275)
+++ core/branches/flat/src/test/java/org/jboss/starobrno/util/TestingUtil.java 2008-12-10
16:16:42 UTC (rev 7276)
@@ -426,7 +426,7 @@
public static void killTreeCaches(TreeCache... treeCaches)
{
- for (TreeCache tc: treeCaches)
+ for (TreeCache tc : treeCaches)
{
if (tc != null) killCaches(tc.getCache());
}
@@ -485,19 +485,25 @@
/**
* Clears transaction with the current thread in the given transaction manager.
+ *
* @param txManager a TransactionManager to be cleared
*/
- public static void killTransaction(TransactionManager txManager) {
- if (txManager != null) {
- try {
+ public static void killTransaction(TransactionManager txManager)
+ {
+ if (txManager != null)
+ {
+ try
+ {
txManager.rollback();
- } catch (Exception e) {
- // don't care
}
+ catch (Exception e)
+ {
+ // don't care
+ }
}
}
-
+
/**
* Clears any associated transactions with the current thread in the caches'
transaction managers.
*/
@@ -674,4 +680,18 @@
{
dumpCacheContents(Arrays.asList(caches));
}
+
+ /**
+ * Extracts a component of a given type from the cache's internal component
registry
+ *
+ * @param cache
+ * @param componentType
+ * @param <T>
+ * @return
+ */
+ public static <T> T extractComponent(Cache cache, Class<T> componentType)
+ {
+ ComponentRegistry cr = extractComponentRegistry(cache);
+ return cr.getComponent(componentType);
+ }
}
Modified:
core/branches/flat/src/test/java/org/jboss/starobrno/util/internals/ReplicationListener.java
===================================================================
---
core/branches/flat/src/test/java/org/jboss/starobrno/util/internals/ReplicationListener.java 2008-12-10
15:10:12 UTC (rev 7275)
+++
core/branches/flat/src/test/java/org/jboss/starobrno/util/internals/ReplicationListener.java 2008-12-10
16:16:42 UTC (rev 7276)
@@ -1,16 +1,20 @@
package org.jboss.starobrno.util.internals;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.jboss.starobrno.Cache;
-import org.jboss.starobrno.RPCManager;
import org.jboss.starobrno.commands.ReplicableCommand;
import org.jboss.starobrno.commands.remote.ReplicateCommand;
+import org.jboss.starobrno.commands.tx.CommitCommand;
import org.jboss.starobrno.commands.tx.PrepareCommand;
+import org.jboss.starobrno.config.Configuration;
import org.jboss.starobrno.context.InvocationContext;
import org.jboss.starobrno.factories.ComponentRegistry;
import org.jboss.starobrno.io.ByteBuffer;
import org.jboss.starobrno.marshall.CacheMarshallerStarobrno;
import org.jboss.starobrno.marshall.CommandAwareRpcDispatcher;
import org.jboss.starobrno.marshall.ExtendedMarshaller;
+import org.jboss.starobrno.remoting.RPCManager;
import org.jboss.starobrno.util.TestingUtil;
import org.jgroups.blocks.RpcDispatcher;
import org.jgroups.util.Buffer;
@@ -50,6 +54,8 @@
{
private CountDownLatch latch = new CountDownLatch(1);
private Set<Class<? extends ReplicableCommand>> expectedCommands;
+ Configuration configuration;
+ private static final Log log = LogFactory.getLog(ReplicationListener.class);
/**
* Builds a listener that will observe the given cache for recieving replication
commands.
@@ -65,6 +71,7 @@
realDispatcher.setMarshaller(delegate);
realDispatcher.setRequestMarshaller(delegate);
realDispatcher.setResponseMarshaller(delegate);
+ configuration = cache.getConfiguration();
}
private class MarshallerDelegate implements RpcDispatcher.Marshaller2
@@ -130,7 +137,7 @@
}
finally
{
- System.out.println("Processed command: " + realOne);
+ log.trace("Processed command: " + realOne);
Iterator<Class<? extends ReplicableCommand>> it =
expectedCommands.iterator();
while (it.hasNext())
{
@@ -138,7 +145,8 @@
if (realOne.containsCommandType(replicableCommandClass))
{
it.remove();
- } else if (realOne.getSingleModification() instanceof PrepareCommand)
//explicit transaction
+ }
+ else if (realOne.getSingleModification() instanceof PrepareCommand)
//explicit transaction
{
PrepareCommand prepareCommand = (PrepareCommand)
realOne.getSingleModification();
if (prepareCommand.containsModificationType(replicableCommandClass))
@@ -198,7 +206,7 @@
public Object objectFromByteBuffer(byte[] buffer, int i, int i1) throws Exception
{
- Object result = realOne.objectFromByteBuffer(buffer, i , i1);
+ Object result = realOne.objectFromByteBuffer(buffer, i, i1);
if (result instanceof ReplicateCommand && expectedCommands != null)
{
ReplicateCommand replicateCommand = (ReplicateCommand) result;
@@ -209,14 +217,22 @@
}
/**
+ * Waits for 1 minute
+ */
+ public void waitForReplicationToOccur()
+ {
+ waitForReplicationToOccur(60000);
+ }
+
+ /**
* Blocks for the elements specified through {@link #expect(Class[])} invocations to
be replicated in this cache.
* if replication does not occur in the give timeout then an exception is being
thrown.
*/
public void waitForReplicationToOccur(long timeoutMillis)
{
- System.out.println("enter...
ReplicationListener.waitForReplicationToOccur");
+ log.trace("enter... ReplicationListener.waitForReplicationToOccur");
waitForReplicationToOccur(timeoutMillis, TimeUnit.MILLISECONDS);
- System.out.println("exit...
ReplicationListener.waitForReplicationToOccur");
+ log.trace("exit... ReplicationListener.waitForReplicationToOccur");
}
/**
@@ -266,4 +282,12 @@
{
expect();
}
+
+ public void expectWithTx(Class<? extends ReplicableCommand>... writeCommands)
+ {
+ expect(PrepareCommand.class);
+ //this is because for async replication we have an 1pc transaction
+ if (configuration.getCacheMode().isSynchronous()) expect(CommitCommand.class);
+ }
+
}
\ No newline at end of file