Author: mircea.markus
Date: 2009-02-19 08:03:56 -0500 (Thu, 19 Feb 2009)
New Revision: 7731
Added:
core/branches/flat/src/test/java/org/horizon/replication/ReplicationQueueTest.java
Modified:
core/branches/flat/src/main/java/org/horizon/commands/remote/ReplicateCommand.java
core/branches/flat/src/main/java/org/horizon/interceptors/ReplicationInterceptor.java
core/branches/flat/src/main/java/org/horizon/interceptors/base/BaseRpcInterceptor.java
core/branches/flat/src/main/java/org/horizon/remoting/ReplicationQueue.java
core/branches/flat/src/main/resources/config-samples/all.xml
core/branches/flat/src/test/java/org/horizon/config/parsing/XmlFileParsingTest.java
core/branches/flat/src/test/java/org/horizon/test/AbstractCacheTest.java
core/branches/flat/src/test/java/org/horizon/test/MultipleCacheManagersTest.java
core/branches/flat/src/test/java/org/horizon/test/TestingUtil.java
core/branches/flat/src/test/resources/configs/named-cache-test.xml
Log:
added support for replication queues
Modified:
core/branches/flat/src/main/java/org/horizon/commands/remote/ReplicateCommand.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/commands/remote/ReplicateCommand.java 2009-02-19
12:53:45 UTC (rev 7730)
+++
core/branches/flat/src/main/java/org/horizon/commands/remote/ReplicateCommand.java 2009-02-19
13:03:56 UTC (rev 7731)
@@ -92,8 +92,7 @@
}
}
- private Object processCommand(InvocationContext ctx, ReplicableCommand cacheCommand)
- throws Throwable {
+ private Object processCommand(InvocationContext ctx, ReplicableCommand cacheCommand)
throws Throwable {
Object result;
try {
if (trace) log.trace("Invoking command " + cacheCommand + ", with
originLocal flag set to false.");
@@ -107,7 +106,7 @@
result = null;
}
} else {
- throw new RuntimeException("Do we still need to deal with non-visitable
commands?");
+ throw new RuntimeException("Do we still need to deal with non-visitable
commands? (" + cacheCommand.getClass().getName() + ")");
// result = cacheCommand.perform(null);
}
}
Modified:
core/branches/flat/src/main/java/org/horizon/interceptors/ReplicationInterceptor.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/interceptors/ReplicationInterceptor.java 2009-02-19
12:53:45 UTC (rev 7730)
+++
core/branches/flat/src/main/java/org/horizon/interceptors/ReplicationInterceptor.java 2009-02-19
13:03:56 UTC (rev 7731)
@@ -103,8 +103,7 @@
* If we are within one transaction we won't do any replication as replication
would only be performed at commit
* time. If the operation didn't originate locally we won't do any replication
either.
*/
- private Object handleCrudMethod(InvocationContext ctx, WriteCommand command)
- throws Throwable {
+ private Object handleCrudMethod(InvocationContext ctx, WriteCommand command) throws
Throwable {
boolean local = isLocalModeForced(ctx);
if (local && ctx.getTransaction() == null) return
invokeNextInterceptor(ctx, command);
// FIRST pass this call up the chain. Only if it succeeds (no exceptions) locally
do we attempt to replicate.
Modified:
core/branches/flat/src/main/java/org/horizon/interceptors/base/BaseRpcInterceptor.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/interceptors/base/BaseRpcInterceptor.java 2009-02-19
12:53:45 UTC (rev 7730)
+++
core/branches/flat/src/main/java/org/horizon/interceptors/base/BaseRpcInterceptor.java 2009-02-19
13:03:56 UTC (rev 7731)
@@ -25,6 +25,7 @@
import org.horizon.commands.CommandsFactory;
import org.horizon.commands.RPCCommand;
import org.horizon.commands.ReplicableCommand;
+import org.horizon.commands.remote.ReplicateCommand;
import org.horizon.context.InvocationContext;
import org.horizon.context.TransactionContext;
import org.horizon.factories.annotations.Inject;
@@ -88,7 +89,7 @@
}
protected void replicateCall(InvocationContext ctx, ReplicableCommand call, boolean
sync, boolean useOutOfBandMessage) throws Throwable {
- replicateCall(ctx, null, commandsFactory.buildReplicateCommand(call), sync,
useOutOfBandMessage);
+ replicateCall(ctx, null, call, sync, useOutOfBandMessage);
}
protected void replicateCall(InvocationContext ctx, RPCCommand call, boolean sync)
throws Throwable {
@@ -96,10 +97,10 @@
}
protected void replicateCall(InvocationContext ctx, ReplicableCommand call, boolean
sync) throws Throwable {
- replicateCall(ctx, null, commandsFactory.buildReplicateCommand(call), sync,
false);
+ replicateCall(ctx, null, call, sync, false);
}
- protected void replicateCall(InvocationContext ctx, List<Address> recipients,
RPCCommand c, boolean sync, boolean useOutOfBandMessage) throws Throwable {
+ protected void replicateCall(InvocationContext ctx, List<Address> recipients,
ReplicableCommand c, boolean sync, boolean useOutOfBandMessage) throws Throwable {
long syncReplTimeout = configuration.getSyncReplTimeout();
if (ctx.hasOption(Options.FORCE_ASYNCHRONOUS)) sync = false;
@@ -118,7 +119,7 @@
replicateCall(recipients, c, sync, useOutOfBandMessage, syncReplTimeout);
}
- protected void replicateCall(List<Address> recipients, RPCCommand call, boolean
sync, boolean useOutOfBandMessage, long timeout) throws Throwable {
+ protected void replicateCall(List<Address> recipients, ReplicableCommand call,
boolean sync, boolean useOutOfBandMessage, long timeout) throws Throwable {
if (trace) log.trace("Broadcasting call " + call + " to recipient
list " + recipients);
if (!sync && replicationQueue != null) {
@@ -131,9 +132,10 @@
if (trace)
log.trace("Setting call recipients to " + callRecipients +
" since the original list of recipients passed in is null.");
}
+ ReplicateCommand command = commandsFactory.buildReplicateCommand(call);
List rsps = rpcManager.invokeRemotely(callRecipients,
- call,
+ command,
sync ? ResponseMode.SYNCHRONOUS :
ResponseMode.ASYNCHRONOUS, // is synchronised?
timeout,
useOutOfBandMessage
Modified: core/branches/flat/src/main/java/org/horizon/remoting/ReplicationQueue.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/ReplicationQueue.java 2009-02-19
12:53:45 UTC (rev 7730)
+++ core/branches/flat/src/main/java/org/horizon/remoting/ReplicationQueue.java 2009-02-19
13:03:56 UTC (rev 7731)
@@ -22,7 +22,6 @@
package org.horizon.remoting;
import org.horizon.commands.CommandsFactory;
-import org.horizon.commands.RPCCommand;
import org.horizon.commands.ReplicableCommand;
import org.horizon.commands.remote.ReplicateCommand;
import org.horizon.config.Configuration;
@@ -59,7 +58,7 @@
/**
* Holds the replication jobs.
*/
- final List<RPCCommand> elements = new LinkedList<RPCCommand>();
+ private final List<ReplicableCommand> elements = new
LinkedList<ReplicableCommand>();
/**
* For periodical replication
@@ -90,6 +89,9 @@
@Start
public synchronized void start() {
long interval = configuration.getReplQueueInterval();
+ if (log.isTraceEnabled()) {
+ log.trace("Starting replication queue, with interval=" + interval
+", and maxElements=" + maxElements);
+ }
this.maxElements = configuration.getReplQueueMaxElements();
// check again
enabled = configuration.isUseReplQueue();
@@ -98,7 +100,7 @@
public void run() {
flush();
}
- }, 500l, interval, TimeUnit.MILLISECONDS);
+ }, interval, interval, TimeUnit.MILLISECONDS);
}
}
@@ -117,7 +119,7 @@
/**
* Adds a new method call.
*/
- public void add(RPCCommand job) {
+ public void add(ReplicableCommand job) {
if (job == null)
throw new NullPointerException("job is null");
synchronized (elements) {
@@ -141,6 +143,7 @@
if (toReplicate.size() > 0) {
try {
+ if (log.isTraceEnabled()) log.trace("Flushing " +
toReplicate.size() + " elements " );
ReplicateCommand replicateCommand =
commandsFactory.buildReplicateCommand(toReplicate);
// send to all live nodes in the cluster
rpcManager.invokeRemotely(null, replicateCommand, ResponseMode.ASYNCHRONOUS,
configuration.getSyncReplTimeout());
@@ -150,4 +153,12 @@
}
}
}
+
+ public int getElementsCount() {
+ return elements.size();
+ }
+
+ public void reset() {
+ elements.clear();
+ }
}
\ No newline at end of file
Modified: core/branches/flat/src/main/resources/config-samples/all.xml
===================================================================
--- core/branches/flat/src/main/resources/config-samples/all.xml 2009-02-19 12:53:45 UTC
(rev 7730)
+++ core/branches/flat/src/main/resources/config-samples/all.xml 2009-02-19 13:03:56 UTC
(rev 7731)
@@ -163,6 +163,12 @@
</namedCache>
+ <namedCache name="withReplicatinQueue">
+ <clustering>
+ <async useReplQueue="true" replQueueInterval="100"
replQueueMaxElements="200"/>
+ </clustering>
+ </namedCache>
+
<namedCache name="cacheWithCustomInterceptors">
<!--
Define custom interceptors. All custom interceptors need to extend
org.jboss.cache.interceptors.base.CommandInterceptor
Modified:
core/branches/flat/src/test/java/org/horizon/config/parsing/XmlFileParsingTest.java
===================================================================
---
core/branches/flat/src/test/java/org/horizon/config/parsing/XmlFileParsingTest.java 2009-02-19
12:53:45 UTC (rev 7730)
+++
core/branches/flat/src/test/java/org/horizon/config/parsing/XmlFileParsingTest.java 2009-02-19
13:03:56 UTC (rev 7731)
@@ -138,6 +138,8 @@
assert c.getTransactionManagerLookupClass() == null;
assert c.getCacheMode() == Configuration.CacheMode.REPL_ASYNC;
assert c.isUseReplQueue();
+ assert c.getReplQueueInterval() == 1234;
+ assert c.getReplQueueMaxElements() == 100;
assert c.isUseAsyncSerialization();
assert c.isFetchInMemoryState();
assert c.getStateRetrievalTimeout() == 15000;
Added: core/branches/flat/src/test/java/org/horizon/replication/ReplicationQueueTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/replication/ReplicationQueueTest.java
(rev 0)
+++
core/branches/flat/src/test/java/org/horizon/replication/ReplicationQueueTest.java 2009-02-19
13:03:56 UTC (rev 7731)
@@ -0,0 +1,248 @@
+package org.horizon.replication;
+
+import org.horizon.Cache;
+import org.horizon.config.Configuration;
+import org.horizon.config.GlobalConfiguration;
+import org.horizon.executors.ScheduledExecutorFactory;
+import org.horizon.manager.CacheManager;
+import org.horizon.remoting.ReplicationQueue;
+import org.horizon.test.MultipleCacheManagersTest;
+import org.horizon.test.TestingUtil;
+import org.horizon.transaction.DummyTransactionManagerLookup;
+import org.testng.annotations.Test;
+
+import javax.transaction.TransactionManager;
+import java.util.Properties;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Tests ReplicationQueue functionality.
+ *
+ * @author Mircea.Markus(a)jboss.com
+ */
+@Test(groups = "functional", testName =
"replication.ReplicationQueueTest")
+public class ReplicationQueueTest extends MultipleCacheManagersTest {
+
+ Cache cache1;
+ Cache cache2;
+ private static final int REPL_QUEUE_INTERVAL = 5000;
+ private static final int REPL_QUEUE_MAX_ELEMENTS = 10;
+ long creationTime;
+
+ protected void createCacheManagers() throws Throwable {
+ GlobalConfiguration globalConfiguration =
GlobalConfiguration.getClusteredDefault();
+
globalConfiguration.setReplicationQueueScheduledExecutorFactoryClass(ReplQueueTestScheduledExecutorFactory.class.getName());
+
globalConfiguration.setReplicationQueueScheduledExecutorProperties(ReplQueueTestScheduledExecutorFactory.myProps);
+ CacheManager first = TestingUtil.createClusteredCacheManager(globalConfiguration);
+ CacheManager second =
TestingUtil.createClusteredCacheManager(globalConfiguration);
+ registerCacheManager(first, second);
+
+ Configuration config = getDefaultConfig();
+
config.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
+ config.setCacheMode(Configuration.CacheMode.REPL_ASYNC);
+ config.setUseReplQueue(true);
+ config.setReplQueueInterval(REPL_QUEUE_INTERVAL);
+ config.setReplQueueMaxElements(REPL_QUEUE_MAX_ELEMENTS);
+ creationTime = System.currentTimeMillis();
+ manager(0).defineCache("replQueue", config);
+
+ Configuration conf2 = config.clone();
+ conf2.setUseReplQueue(false);
+ manager(1).defineCache("replQueue", conf2);
+
+ cache1 = cache(0, "replQueue");
+ cache2 = cache(1, "replQueue");
+ }
+
+ /**
+ * tests that the replication queue will use an appropriate executor defined through
+ * <tt>replicationQueueScheduledExecutor</tt> config param.
+ */
+ public void testApropriateExecutorIsUsed() {
+ assert ReplQueueTestScheduledExecutorFactory.methodCalled;
+ assert ReplQueueTestScheduledExecutorFactory.command != null;
+ assert ReplQueueTestScheduledExecutorFactory.delay == REPL_QUEUE_INTERVAL;
+ assert ReplQueueTestScheduledExecutorFactory.initialDelay == REPL_QUEUE_INTERVAL;
+ assert ReplQueueTestScheduledExecutorFactory.unit == TimeUnit.MILLISECONDS;
+ }
+
+ /**
+ * Make sure that replication will occur even if
<tt>replQueueMaxElements</tt> is not reached, as long as the
+ * <tt>replQueueInterval</tt> is reached.
+ */
+ public void testReplicationBasedOnTime() throws InterruptedException {
+ //only place one element, queue size is 10.
+ cache1.put("key", "value");
+ ReplicationQueue replicationQueue = TestingUtil.extractComponent(cache1,
ReplicationQueue.class);
+ assert replicationQueue != null;
+ assert replicationQueue.getElementsCount() == 1;
+ assert cache2.get("key") == null;
+ assert cache1.get("key").equals("value");
+
+ ReplQueueTestScheduledExecutorFactory.command.run();
+
+ //in next 5 secs, expect the replication to occur
+ long start = System.currentTimeMillis();
+ while (System.currentTimeMillis() - start < 5000) {
+ if (cache2.get("key") != null) break;
+ Thread.sleep(50);
+ }
+ assert cache2.get("key").equals("value");
+ assert replicationQueue.getElementsCount() == 0;
+ }
+
+ /**
+ * Make sure that replication will occur even if
<tt>replQueueMaxElements</tt> is not reached, as long as the
+ * <tt>replQueueInterval</tt> is reached (transactional variant).
+ */
+ public void testReplicationBasedOnTimeWithTx() throws Exception {
+ //only place one element, queue size is 10.
+ TransactionManager transactionManager = TestingUtil.getTransactionManager(cache1);
+ transactionManager.begin();
+ cache1.put("key", "value");
+ transactionManager.commit();
+
+ ReplicationQueue replicationQueue = TestingUtil.extractComponent(cache1,
ReplicationQueue.class);
+ assert replicationQueue != null;
+ assert replicationQueue.getElementsCount() == 1;
+ assert cache2.get("key") == null;
+ assert cache1.get("key").equals("value");
+
+ ReplQueueTestScheduledExecutorFactory.command.run();
+
+ //in next 5 secs, expect the replication to occur
+ long start = System.currentTimeMillis();
+ while (System.currentTimeMillis() - start < 5000) {
+ if (cache2.get("key") != null) break;
+ Thread.sleep(50);
+ }
+ assert cache2.get("key").equals("value");
+ assert replicationQueue.getElementsCount() == 0;
+ }
+
+
+ /**
+ * Make sure that replication will occur as soon as
<tt>replQueueMaxElements</tt> is reached, even if the
+ * <tt>replQueueInterval</tt> is not reached.
+ */
+ public void testReplicationBasedOnSize() throws Exception {
+ //place REPL_QUEUE_MAX_ELEMENTS (10) elements, i.e. fill the queue up to its maximum size.
+ for (int i = 0; i < REPL_QUEUE_MAX_ELEMENTS; i++) {
+ cache1.put("key" + i, "value" + i);
+ }
+ //expect that in next 3 secs all commands are replicated
+ long start = System.currentTimeMillis();
+ while (System.currentTimeMillis() - start < 3000) {
+ if (cache2.size() == REPL_QUEUE_MAX_ELEMENTS) break;
+ Thread.sleep(50);
+ }
+ for (int i = 0; i < REPL_QUEUE_MAX_ELEMENTS; i++) {
+ assert cache2.get("key" + i).equals("value" + i);
+ }
+ }
+
+ /**
+ * Make sure that replication will occur as soon as
<tt>replQueueMaxElements</tt> is reached, even if the
+ * <tt>replQueueInterval</tt> is not reached (transactional variant).
+ */
+ public void testReplicationBasedOnSizeWithTx() throws Exception {
+ //place REPL_QUEUE_MAX_ELEMENTS (10) elements, each one in its own transaction.
+ TransactionManager transactionManager = TestingUtil.getTransactionManager(cache1);
+ for (int i = 0; i < REPL_QUEUE_MAX_ELEMENTS; i++) {
+ transactionManager.begin();
+ cache1.put("key" + i, "value" + i);
+ transactionManager.commit();
+ }
+ //expect that in next 3 secs all commands are replicated
+ long start = System.currentTimeMillis();
+ while (System.currentTimeMillis() - start < 3000) {
+ if (cache2.size() == REPL_QUEUE_MAX_ELEMENTS) break;
+ Thread.sleep(50);
+ }
+ for (int i = 0; i < REPL_QUEUE_MAX_ELEMENTS; i++) {
+ assert cache2.get("key" + i).equals("value" + i);
+ }
+ }
+
+ /**
+ * Test that replication queue works fine when multiple threads are putting into the
queue.
+ */
+ public void testReplicationQueueMultipleThreads() throws Exception {
+ int numThreads = 4;
+ final int numLoopsPerThread = 3;
+ Thread[] threads = new Thread[numThreads];
+ final CountDownLatch latch = new CountDownLatch(1);
+
+ for (int i = 0; i < numThreads; i++) {
+ final int i1 = i;
+ threads[i] = new Thread() {
+ int index;
+
+ {
+ index = i1;
+ }
+
+ public void run() {
+ try {
+ latch.await();
+ }
+ catch (InterruptedException e) {
+ // do nothing
+ }
+ for (int j = 0; j < numLoopsPerThread; j++) {
+ cache1.put("key" + index + "_" + j,
"value");
+ }
+ }
+ };
+ threads[i].start();
+ }
+ latch.countDown();
+ // wait for threads to join
+ for (Thread t : threads) t.join();
+
+ long start = System.currentTimeMillis();
+ while (System.currentTimeMillis() - start < 3000) {
+ if (cache2.size() == REPL_QUEUE_MAX_ELEMENTS) break;
+ Thread.sleep(50);
+ }
+ assert cache2.size() == REPL_QUEUE_MAX_ELEMENTS;
+ ReplicationQueue replicationQueue = TestingUtil.extractComponent(cache1,
ReplicationQueue.class);
+ assert replicationQueue.getElementsCount() == numThreads * numLoopsPerThread -
REPL_QUEUE_MAX_ELEMENTS;
+ }
+
+
+ public static class ReplQueueTestScheduledExecutorFactory implements
ScheduledExecutorFactory {
+ static Properties myProps = new Properties();
+ static boolean methodCalled = false;
+ static Runnable command;
+ static long initialDelay;
+ static long delay;
+ static TimeUnit unit;
+
+ static {
+ myProps.put("aaa", "bbb");
+ myProps.put("ddd", "ccc");
+ }
+
+ public ScheduledExecutorService getScheduledExecutor(Properties p) {
+ assert p.equals(myProps);
+ methodCalled = true;
+ return new ScheduledThreadPoolExecutor(1) {
+ @Override
+ public ScheduledFuture<?> scheduleWithFixedDelay(Runnable commandP,
long initialDelayP, long delayP, TimeUnit unitP) {
+ command = commandP;
+ initialDelay = initialDelayP;
+ delay = delayP;
+ unit = unitP;
+ return null;
+ }
+ };
+ }
+ }
+
+
+}
Property changes on:
core/branches/flat/src/test/java/org/horizon/replication/ReplicationQueueTest.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Modified: core/branches/flat/src/test/java/org/horizon/test/AbstractCacheTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/test/AbstractCacheTest.java 2009-02-19
12:53:45 UTC (rev 7730)
+++ core/branches/flat/src/test/java/org/horizon/test/AbstractCacheTest.java 2009-02-19
13:03:56 UTC (rev 7731)
@@ -10,6 +10,7 @@
import org.horizon.logging.LogFactory;
import org.horizon.manager.CacheManager;
import org.horizon.manager.DefaultCacheManager;
+import org.horizon.remoting.ReplicationQueue;
import org.horizon.remoting.transport.Address;
import javax.transaction.TransactionManager;
@@ -35,11 +36,17 @@
for (Cache cache : runningCaches) {
removeInMemoryData(cache);
clearCacheLoader(cache);
+ clearReplicationQueues(cache);
InvocationContext invocationContext = ((AdvancedCache)
cache).getInvocationContextContainer().get();
if (invocationContext != null) invocationContext.reset();
}
}
+ private void clearReplicationQueues(Cache cache) {
+ ReplicationQueue queue = TestingUtil.extractComponent(cache,
ReplicationQueue.class);
+ if (queue != null) queue.reset();
+ }
+
@SuppressWarnings(value = "unchecked")
protected Set<Cache> getRunningCaches(CacheManager cacheManager) {
ConcurrentMap<String, Cache> caches = (ConcurrentMap<String, Cache>)
TestingUtil.extractField(DefaultCacheManager.class, cacheManager, "caches");
Modified:
core/branches/flat/src/test/java/org/horizon/test/MultipleCacheManagersTest.java
===================================================================
---
core/branches/flat/src/test/java/org/horizon/test/MultipleCacheManagersTest.java 2009-02-19
12:53:45 UTC (rev 7730)
+++
core/branches/flat/src/test/java/org/horizon/test/MultipleCacheManagersTest.java 2009-02-19
13:03:56 UTC (rev 7731)
@@ -52,12 +52,21 @@
@BeforeClass
public void createBeforeClass() throws Throwable {
- if (cleanup == CleanupPhase.AFTER_TEST) createCacheManagers();
+ if (cleanup == CleanupPhase.AFTER_TEST) callCreateCacheManagers();
}
+ private void callCreateCacheManagers() {
+ try {
+ createCacheManagers();
+ } catch (Throwable th) {
+ th.printStackTrace();
+ log.error("Error in test setup: " + th);
+ }
+ }
+
@BeforeMethod
public void createBeforeMethod() throws Throwable {
- if (cleanup == CleanupPhase.AFTER_METHOD) createCacheManagers();
+ if (cleanup == CleanupPhase.AFTER_METHOD) callCreateCacheManagers();
}
@AfterClass
@@ -98,7 +107,7 @@
}
}
- final protected void registerCaches(CacheManager... cacheManagers) {
+ final protected void registerCacheManager(CacheManager... cacheManagers) {
this.cacheManagers.addAll(Arrays.asList(cacheManagers));
}
Modified: core/branches/flat/src/test/java/org/horizon/test/TestingUtil.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/test/TestingUtil.java 2009-02-19 12:53:45
UTC (rev 7730)
+++ core/branches/flat/src/test/java/org/horizon/test/TestingUtil.java 2009-02-19 13:03:56
UTC (rev 7731)
@@ -636,4 +636,11 @@
GlobalConfiguration globalConfiguration =
GlobalConfiguration.getNonClusteredDefault();
return new DefaultCacheManager(globalConfiguration);
}
+
+ public static CacheManager createClusteredCacheManager(GlobalConfiguration
globalConfiguration) {
+ Properties newTransportProps = new Properties();
+ newTransportProps.put(JGroupsTransport.CONFIGURATION_STRING,
JGroupsConfigBuilder.getJGroupsConfig());
+ globalConfiguration.setTransportProperties(newTransportProps);
+ return new DefaultCacheManager(globalConfiguration);
+ }
}
Modified: core/branches/flat/src/test/resources/configs/named-cache-test.xml
===================================================================
--- core/branches/flat/src/test/resources/configs/named-cache-test.xml 2009-02-19 12:53:45
UTC (rev 7730)
+++ core/branches/flat/src/test/resources/configs/named-cache-test.xml 2009-02-19 13:03:56
UTC (rev 7731)
@@ -53,7 +53,7 @@
<namedCache name="asyncReplQueue">
<clustering>
<stateRetrieval fetchInMemoryState="true"
timeout="15000"/>
- <async useReplQueue="true"/>
+ <async useReplQueue="true" replQueueInterval="1234"
replQueueMaxElements="100"/>
</clustering>
</namedCache>