[jbosscache-commits] JBoss Cache SVN: r7731 - in core/branches/flat/src: main/java/org/horizon/interceptors and 7 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Thu Feb 19 08:03:56 EST 2009


Author: mircea.markus
Date: 2009-02-19 08:03:56 -0500 (Thu, 19 Feb 2009)
New Revision: 7731

Added:
   core/branches/flat/src/test/java/org/horizon/replication/ReplicationQueueTest.java
Modified:
   core/branches/flat/src/main/java/org/horizon/commands/remote/ReplicateCommand.java
   core/branches/flat/src/main/java/org/horizon/interceptors/ReplicationInterceptor.java
   core/branches/flat/src/main/java/org/horizon/interceptors/base/BaseRpcInterceptor.java
   core/branches/flat/src/main/java/org/horizon/remoting/ReplicationQueue.java
   core/branches/flat/src/main/resources/config-samples/all.xml
   core/branches/flat/src/test/java/org/horizon/config/parsing/XmlFileParsingTest.java
   core/branches/flat/src/test/java/org/horizon/test/AbstractCacheTest.java
   core/branches/flat/src/test/java/org/horizon/test/MultipleCacheManagersTest.java
   core/branches/flat/src/test/java/org/horizon/test/TestingUtil.java
   core/branches/flat/src/test/resources/configs/named-cache-test.xml
Log:
added support for replication queues

Modified: core/branches/flat/src/main/java/org/horizon/commands/remote/ReplicateCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/commands/remote/ReplicateCommand.java	2009-02-19 12:53:45 UTC (rev 7730)
+++ core/branches/flat/src/main/java/org/horizon/commands/remote/ReplicateCommand.java	2009-02-19 13:03:56 UTC (rev 7731)
@@ -92,8 +92,7 @@
       }
    }
 
-   private Object processCommand(InvocationContext ctx, ReplicableCommand cacheCommand)
-         throws Throwable {
+   private Object processCommand(InvocationContext ctx, ReplicableCommand cacheCommand) throws Throwable {
       Object result;
       try {
          if (trace) log.trace("Invoking command " + cacheCommand + ", with originLocal flag set to false.");
@@ -107,7 +106,7 @@
                result = null;
             }
          } else {
-            throw new RuntimeException("Do we still need to deal with non-visitable commands?");
+            throw new RuntimeException("Do we still need to deal with non-visitable commands? (" + cacheCommand.getClass().getName() + ")");
 //            result = cacheCommand.perform(null);
          }
       }

Modified: core/branches/flat/src/main/java/org/horizon/interceptors/ReplicationInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/interceptors/ReplicationInterceptor.java	2009-02-19 12:53:45 UTC (rev 7730)
+++ core/branches/flat/src/main/java/org/horizon/interceptors/ReplicationInterceptor.java	2009-02-19 13:03:56 UTC (rev 7731)
@@ -103,8 +103,7 @@
     * If we are within one transaction we won't do any replication as replication would only be performed at commit
     * time. If the operation didn't originate locally we won't do any replication either.
     */
-   private Object handleCrudMethod(InvocationContext ctx, WriteCommand command)
-         throws Throwable {
+   private Object handleCrudMethod(InvocationContext ctx, WriteCommand command) throws Throwable {
       boolean local = isLocalModeForced(ctx);
       if (local && ctx.getTransaction() == null) return invokeNextInterceptor(ctx, command);
       // FIRST pass this call up the chain.  Only if it succeeds (no exceptions) locally do we attempt to replicate.

Modified: core/branches/flat/src/main/java/org/horizon/interceptors/base/BaseRpcInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/interceptors/base/BaseRpcInterceptor.java	2009-02-19 12:53:45 UTC (rev 7730)
+++ core/branches/flat/src/main/java/org/horizon/interceptors/base/BaseRpcInterceptor.java	2009-02-19 13:03:56 UTC (rev 7731)
@@ -25,6 +25,7 @@
 import org.horizon.commands.CommandsFactory;
 import org.horizon.commands.RPCCommand;
 import org.horizon.commands.ReplicableCommand;
+import org.horizon.commands.remote.ReplicateCommand;
 import org.horizon.context.InvocationContext;
 import org.horizon.context.TransactionContext;
 import org.horizon.factories.annotations.Inject;
@@ -88,7 +89,7 @@
    }
 
    protected void replicateCall(InvocationContext ctx, ReplicableCommand call, boolean sync, boolean useOutOfBandMessage) throws Throwable {
-      replicateCall(ctx, null, commandsFactory.buildReplicateCommand(call), sync, useOutOfBandMessage);
+      replicateCall(ctx, null, call, sync, useOutOfBandMessage);
    }
 
    protected void replicateCall(InvocationContext ctx, RPCCommand call, boolean sync) throws Throwable {
@@ -96,10 +97,10 @@
    }
 
    protected void replicateCall(InvocationContext ctx, ReplicableCommand call, boolean sync) throws Throwable {
-      replicateCall(ctx, null, commandsFactory.buildReplicateCommand(call), sync, false);
+      replicateCall(ctx, null, call, sync, false);
    }
 
-   protected void replicateCall(InvocationContext ctx, List<Address> recipients, RPCCommand c, boolean sync, boolean useOutOfBandMessage) throws Throwable {
+   protected void replicateCall(InvocationContext ctx, List<Address> recipients, ReplicableCommand c, boolean sync, boolean useOutOfBandMessage) throws Throwable {
       long syncReplTimeout = configuration.getSyncReplTimeout();
 
       if (ctx.hasOption(Options.FORCE_ASYNCHRONOUS)) sync = false;
@@ -118,7 +119,7 @@
       replicateCall(recipients, c, sync, useOutOfBandMessage, syncReplTimeout);
    }
 
-   protected void replicateCall(List<Address> recipients, RPCCommand call, boolean sync, boolean useOutOfBandMessage, long timeout) throws Throwable {
+   protected void replicateCall(List<Address> recipients, ReplicableCommand call, boolean sync, boolean useOutOfBandMessage, long timeout) throws Throwable {
       if (trace) log.trace("Broadcasting call " + call + " to recipient list " + recipients);
 
       if (!sync && replicationQueue != null) {
@@ -131,9 +132,10 @@
             if (trace)
                log.trace("Setting call recipients to " + callRecipients + " since the original list of recipients passed in is null.");
          }
+         ReplicateCommand command = commandsFactory.buildReplicateCommand(call);
 
          List rsps = rpcManager.invokeRemotely(callRecipients,
-                                               call,
+                                               command,
                                                sync ? ResponseMode.SYNCHRONOUS : ResponseMode.ASYNCHRONOUS, // is synchronised?
                                                timeout,
                                                useOutOfBandMessage

Modified: core/branches/flat/src/main/java/org/horizon/remoting/ReplicationQueue.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/ReplicationQueue.java	2009-02-19 12:53:45 UTC (rev 7730)
+++ core/branches/flat/src/main/java/org/horizon/remoting/ReplicationQueue.java	2009-02-19 13:03:56 UTC (rev 7731)
@@ -22,7 +22,6 @@
 package org.horizon.remoting;
 
 import org.horizon.commands.CommandsFactory;
-import org.horizon.commands.RPCCommand;
 import org.horizon.commands.ReplicableCommand;
 import org.horizon.commands.remote.ReplicateCommand;
 import org.horizon.config.Configuration;
@@ -59,7 +58,7 @@
    /**
     * Holds the replication jobs.
     */
-   final List<RPCCommand> elements = new LinkedList<RPCCommand>();
+   private final List<ReplicableCommand> elements = new LinkedList<ReplicableCommand>();
 
    /**
     * For periodical replication
@@ -90,6 +89,9 @@
    @Start
    public synchronized void start() {
       long interval = configuration.getReplQueueInterval();
+      if (log.isTraceEnabled()) {
+         log.trace("Starting replication queue, with interval=" + interval +", and maxElements=" + maxElements);
+      }
       this.maxElements = configuration.getReplQueueMaxElements();
       // check again
       enabled = configuration.isUseReplQueue();
@@ -98,7 +100,7 @@
             public void run() {
                flush();
             }
-         }, 500l, interval, TimeUnit.MILLISECONDS);
+         }, interval, interval, TimeUnit.MILLISECONDS);
       }
    }
 
@@ -117,7 +119,7 @@
    /**
     * Adds a new method call.
     */
-   public void add(RPCCommand job) {
+   public void add(ReplicableCommand job) {
       if (job == null)
          throw new NullPointerException("job is null");
       synchronized (elements) {
@@ -141,6 +143,7 @@
 
       if (toReplicate.size() > 0) {
          try {
+            if (log.isTraceEnabled()) log.trace("Flushing " + toReplicate.size() + " elements " );
             ReplicateCommand replicateCommand = commandsFactory.buildReplicateCommand(toReplicate);
             // send to all live nodes in the cluster
             rpcManager.invokeRemotely(null, replicateCommand, ResponseMode.ASYNCHRONOUS, configuration.getSyncReplTimeout());
@@ -150,4 +153,12 @@
          }
       }
    }
+
+   public int getElementsCount() {
+      return elements.size();
+   }
+
+   public void reset() {
+      elements.clear();
+   }
 }
\ No newline at end of file

Modified: core/branches/flat/src/main/resources/config-samples/all.xml
===================================================================
--- core/branches/flat/src/main/resources/config-samples/all.xml	2009-02-19 12:53:45 UTC (rev 7730)
+++ core/branches/flat/src/main/resources/config-samples/all.xml	2009-02-19 13:03:56 UTC (rev 7731)
@@ -163,6 +163,12 @@
 
    </namedCache>
 
+   <namedCache name="withReplicatinQueue">
+      <clustering>
+         <async useReplQueue="true" replQueueInterval="100" replQueueMaxElements="200"/>
+      </clustering>
+   </namedCache>
+
    <namedCache name="cacheWithCustomInterceptors">
       <!--
       Define custom interceptors.  All custom interceptors need to extend org.jboss.cache.interceptors.base.CommandInterceptor

Modified: core/branches/flat/src/test/java/org/horizon/config/parsing/XmlFileParsingTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/config/parsing/XmlFileParsingTest.java	2009-02-19 12:53:45 UTC (rev 7730)
+++ core/branches/flat/src/test/java/org/horizon/config/parsing/XmlFileParsingTest.java	2009-02-19 13:03:56 UTC (rev 7731)
@@ -138,6 +138,8 @@
       assert c.getTransactionManagerLookupClass() == null;
       assert c.getCacheMode() == Configuration.CacheMode.REPL_ASYNC;
       assert c.isUseReplQueue();
+      assert c.getReplQueueInterval() == 1234;
+      assert c.getReplQueueMaxElements() == 100;
       assert c.isUseAsyncSerialization();
       assert c.isFetchInMemoryState();
       assert c.getStateRetrievalTimeout() == 15000;

Added: core/branches/flat/src/test/java/org/horizon/replication/ReplicationQueueTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/replication/ReplicationQueueTest.java	                        (rev 0)
+++ core/branches/flat/src/test/java/org/horizon/replication/ReplicationQueueTest.java	2009-02-19 13:03:56 UTC (rev 7731)
@@ -0,0 +1,248 @@
+package org.horizon.replication;
+
+import org.horizon.Cache;
+import org.horizon.config.Configuration;
+import org.horizon.config.GlobalConfiguration;
+import org.horizon.executors.ScheduledExecutorFactory;
+import org.horizon.manager.CacheManager;
+import org.horizon.remoting.ReplicationQueue;
+import org.horizon.test.MultipleCacheManagersTest;
+import org.horizon.test.TestingUtil;
+import org.horizon.transaction.DummyTransactionManagerLookup;
+import org.testng.annotations.Test;
+
+import javax.transaction.TransactionManager;
+import java.util.Properties;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.CountDownLatch;
+
+/**
+ * Tests ReplicationQueue functionality.
+ *
+ * @author Mircea.Markus at jboss.com
+ */
+@Test(groups = "functional", testName = "replication.ReplicationQueueTest")
+public class ReplicationQueueTest extends MultipleCacheManagersTest {
+
+   Cache cache1;
+   Cache cache2;
+   private static final int REPL_QUEUE_INTERVAL = 5000;
+   private static final int REPL_QUEUE_MAX_ELEMENTS = 10;
+   long creationTime;
+
+   protected void createCacheManagers() throws Throwable {
+      GlobalConfiguration globalConfiguration = GlobalConfiguration.getClusteredDefault();
+      globalConfiguration.setReplicationQueueScheduledExecutorFactoryClass(ReplQueueTestScheduledExecutorFactory.class.getName());
+      globalConfiguration.setReplicationQueueScheduledExecutorProperties(ReplQueueTestScheduledExecutorFactory.myProps);
+      CacheManager first = TestingUtil.createClusteredCacheManager(globalConfiguration);
+      CacheManager second = TestingUtil.createClusteredCacheManager(globalConfiguration);
+      registerCacheManager(first, second);
+
+      Configuration config = getDefaultConfig();
+      config.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
+      config.setCacheMode(Configuration.CacheMode.REPL_ASYNC);
+      config.setUseReplQueue(true);
+      config.setReplQueueInterval(REPL_QUEUE_INTERVAL);
+      config.setReplQueueMaxElements(REPL_QUEUE_MAX_ELEMENTS);
+      creationTime = System.currentTimeMillis();
+      manager(0).defineCache("replQueue", config);
+
+      Configuration conf2 = config.clone();
+      conf2.setUseReplQueue(false);
+      manager(1).defineCache("replQueue", conf2);
+
+      cache1 = cache(0, "replQueue");
+      cache2 = cache(1, "replQueue");
+   }
+
+   /**
+    * tests that the replication queue will use an appropriate executor defined through
+    * <tt>replicationQueueScheduledExecutor</tt> config param.
+    */
+   public void testApropriateExecutorIsUsed() {
+      assert ReplQueueTestScheduledExecutorFactory.methodCalled;
+      assert ReplQueueTestScheduledExecutorFactory.command != null;
+      assert ReplQueueTestScheduledExecutorFactory.delay == REPL_QUEUE_INTERVAL;
+      assert ReplQueueTestScheduledExecutorFactory.initialDelay == REPL_QUEUE_INTERVAL;
+      assert ReplQueueTestScheduledExecutorFactory.unit == TimeUnit.MILLISECONDS;
+   }
+
+   /**
+    * Make sure that replication will occur even if <tt>replQueueMaxElements</tt> are not reached, but the
+    * <tt>replQueueInterval</tt> is reached.
+    */
+   public void testReplicationBasedOnTime() throws InterruptedException {
+      //only place one element, queue size is 10. 
+      cache1.put("key", "value");
+      ReplicationQueue replicationQueue = TestingUtil.extractComponent(cache1, ReplicationQueue.class);
+      assert replicationQueue != null;
+      assert replicationQueue.getElementsCount() == 1;
+      assert cache2.get("key") == null;
+      assert cache1.get("key").equals("value");
+
+      ReplQueueTestScheduledExecutorFactory.command.run();
+
+      //in next 5 secs, expect the replication to occur
+      long start = System.currentTimeMillis();
+      while (System.currentTimeMillis() - start < 5000) {
+         if (cache2.get("key") != null) break;
+         Thread.sleep(50);
+      }
+      assert cache2.get("key").equals("value");
+      assert replicationQueue.getElementsCount() == 0;
+   }
+
+   /**
+    * Make sure that replication will occur even if <tt>replQueueMaxElements</tt> are not reached, but the
+    * <tt>replQueueInterval</tt> is reached.
+    */
+   public void testReplicationBasedOnTimeWithTx() throws Exception {
+      //only place one element, queue size is 10.
+      TransactionManager transactionManager = TestingUtil.getTransactionManager(cache1);
+      transactionManager.begin();
+      cache1.put("key", "value");
+      transactionManager.commit();
+
+      ReplicationQueue replicationQueue = TestingUtil.extractComponent(cache1, ReplicationQueue.class);
+      assert replicationQueue != null;
+      assert replicationQueue.getElementsCount() == 1;
+      assert cache2.get("key") == null;
+      assert cache1.get("key").equals("value");
+
+      ReplQueueTestScheduledExecutorFactory.command.run();
+
+      //in next 5 secs, expect the replication to occur
+      long start = System.currentTimeMillis();
+      while (System.currentTimeMillis() - start < 5000) {
+         if (cache2.get("key") != null) break;
+         Thread.sleep(50);
+      }
+      assert cache2.get("key").equals("value");
+      assert replicationQueue.getElementsCount() == 0;
+   }
+
+
+   /**
+    * Make sure that replication will occur once <tt>replQueueMaxElements</tt> is reached, even if the
+    * <tt>replQueueInterval</tt> is not reached.
+    */
+   public void testReplicationBasedOnSize() throws Exception {
+      //place REPL_QUEUE_MAX_ELEMENTS elements, which fills the queue up to its configured maximum (10).
+      for (int i = 0; i < REPL_QUEUE_MAX_ELEMENTS; i++) {
+         cache1.put("key" + i, "value" + i);
+      }
+      //expect that in next 3 secs all commands are replicated
+      long start = System.currentTimeMillis();
+      while (System.currentTimeMillis() - start < 3000) {
+         if (cache2.size() == REPL_QUEUE_MAX_ELEMENTS) break;
+         Thread.sleep(50);
+      }
+      for (int i = 0; i < REPL_QUEUE_MAX_ELEMENTS; i++) {
+         assert cache2.get("key" + i).equals("value" + i);
+      }
+   }
+
+   /**
+    * Make sure that replication will occur once <tt>replQueueMaxElements</tt> is reached, even if the
+    * <tt>replQueueInterval</tt> is not reached.
+    */
+   public void testReplicationBasedOnSizeWithTx() throws Exception {
+      //place REPL_QUEUE_MAX_ELEMENTS elements, each in its own transaction; the queue's maximum size is 10.
+      TransactionManager transactionManager = TestingUtil.getTransactionManager(cache1);
+      for (int i = 0; i < REPL_QUEUE_MAX_ELEMENTS; i++) {
+         transactionManager.begin();
+         cache1.put("key" + i, "value" + i);
+         transactionManager.commit();
+      }
+      //expect that in next 3 secs all commands are replicated
+      long start = System.currentTimeMillis();
+      while (System.currentTimeMillis() - start < 3000) {
+         if (cache2.size() == REPL_QUEUE_MAX_ELEMENTS) break;
+         Thread.sleep(50);
+      }
+      for (int i = 0; i < REPL_QUEUE_MAX_ELEMENTS; i++) {
+         assert cache2.get("key" + i).equals("value" + i);
+      }
+   }
+
+   /**
+    * Test that replication queue works fine when multiple threads are putting into the queue.
+    */
+   public void testReplicationQueueMultipleThreads() throws Exception {
+      int numThreads = 4;
+      final int numLoopsPerThread = 3;
+      Thread[] threads = new Thread[numThreads];
+      final CountDownLatch latch = new CountDownLatch(1);
+
+      for (int i = 0; i < numThreads; i++) {
+         final int i1 = i;
+         threads[i] = new Thread() {
+            int index;
+
+            {
+               index = i1;
+            }
+
+            public void run() {
+               try {
+                  latch.await();
+               }
+               catch (InterruptedException e) {
+                  // do nothing
+               }
+               for (int j = 0; j < numLoopsPerThread; j++) {
+                  cache1.put("key" + index + "_" + j, "value");
+               }
+            }
+         };
+         threads[i].start();
+      }
+      latch.countDown();
+      // wait for threads to join
+      for (Thread t : threads) t.join();
+
+      long start = System.currentTimeMillis();
+      while (System.currentTimeMillis() - start < 3000) {
+         if (cache2.size() == REPL_QUEUE_MAX_ELEMENTS) break;
+         Thread.sleep(50);
+      }
+      assert cache2.size() == REPL_QUEUE_MAX_ELEMENTS;
+      ReplicationQueue replicationQueue = TestingUtil.extractComponent(cache1, ReplicationQueue.class);
+      assert replicationQueue.getElementsCount() == numThreads * numLoopsPerThread - REPL_QUEUE_MAX_ELEMENTS;
+   }
+
+
+   public static class ReplQueueTestScheduledExecutorFactory implements ScheduledExecutorFactory {
+      static Properties myProps = new Properties();
+      static boolean methodCalled = false;
+      static Runnable command;
+      static long initialDelay;
+      static long delay;
+      static TimeUnit unit;
+
+      static {
+         myProps.put("aaa", "bbb");
+         myProps.put("ddd", "ccc");
+      }
+
+      public ScheduledExecutorService getScheduledExecutor(Properties p) {
+         assert p.equals(myProps);
+         methodCalled = true;
+         return new ScheduledThreadPoolExecutor(1) {
+            @Override
+            public ScheduledFuture<?> scheduleWithFixedDelay(Runnable commandP, long initialDelayP, long delayP, TimeUnit unitP) {
+               command = commandP;
+               initialDelay = initialDelayP;
+               delay = delayP;
+               unit = unitP;
+               return null;
+            }
+         };
+      }
+   }
+
+
+}


Property changes on: core/branches/flat/src/test/java/org/horizon/replication/ReplicationQueueTest.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Modified: core/branches/flat/src/test/java/org/horizon/test/AbstractCacheTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/test/AbstractCacheTest.java	2009-02-19 12:53:45 UTC (rev 7730)
+++ core/branches/flat/src/test/java/org/horizon/test/AbstractCacheTest.java	2009-02-19 13:03:56 UTC (rev 7731)
@@ -10,6 +10,7 @@
 import org.horizon.logging.LogFactory;
 import org.horizon.manager.CacheManager;
 import org.horizon.manager.DefaultCacheManager;
+import org.horizon.remoting.ReplicationQueue;
 import org.horizon.remoting.transport.Address;
 
 import javax.transaction.TransactionManager;
@@ -35,11 +36,17 @@
       for (Cache cache : runningCaches) {
          removeInMemoryData(cache);
          clearCacheLoader(cache);
+         clearReplicationQueues(cache);
          InvocationContext invocationContext = ((AdvancedCache) cache).getInvocationContextContainer().get();
          if (invocationContext != null) invocationContext.reset();
       }
    }
 
+   private void clearReplicationQueues(Cache cache) {
+      ReplicationQueue queue = TestingUtil.extractComponent(cache, ReplicationQueue.class);
+      if (queue != null) queue.reset();
+   }
+
    @SuppressWarnings(value = "unchecked")
    protected Set<Cache> getRunningCaches(CacheManager cacheManager) {
       ConcurrentMap<String, Cache> caches = (ConcurrentMap<String, Cache>) TestingUtil.extractField(DefaultCacheManager.class, cacheManager, "caches");

Modified: core/branches/flat/src/test/java/org/horizon/test/MultipleCacheManagersTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/test/MultipleCacheManagersTest.java	2009-02-19 12:53:45 UTC (rev 7730)
+++ core/branches/flat/src/test/java/org/horizon/test/MultipleCacheManagersTest.java	2009-02-19 13:03:56 UTC (rev 7731)
@@ -52,12 +52,21 @@
 
    @BeforeClass
    public void createBeforeClass() throws Throwable {
-      if (cleanup == CleanupPhase.AFTER_TEST) createCacheManagers();
+      if (cleanup == CleanupPhase.AFTER_TEST) callCreateCacheManagers();
    }
 
+   private void callCreateCacheManagers() {
+      try {
+         createCacheManagers();
+      } catch (Throwable th) {
+         th.printStackTrace();
+         log.error("Error in test setup: " + th);
+      }
+   }
+
    @BeforeMethod
    public void createBeforeMethod() throws Throwable {
-      if (cleanup == CleanupPhase.AFTER_METHOD) createCacheManagers();
+      if (cleanup == CleanupPhase.AFTER_METHOD) callCreateCacheManagers();
    }
 
    @AfterClass
@@ -98,7 +107,7 @@
       }
    }
 
-   final protected void registerCaches(CacheManager... cacheManagers) {
+   final protected void registerCacheManager(CacheManager... cacheManagers) {
       this.cacheManagers.addAll(Arrays.asList(cacheManagers));
    }
 

Modified: core/branches/flat/src/test/java/org/horizon/test/TestingUtil.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/test/TestingUtil.java	2009-02-19 12:53:45 UTC (rev 7730)
+++ core/branches/flat/src/test/java/org/horizon/test/TestingUtil.java	2009-02-19 13:03:56 UTC (rev 7731)
@@ -636,4 +636,11 @@
       GlobalConfiguration globalConfiguration = GlobalConfiguration.getNonClusteredDefault();
       return new DefaultCacheManager(globalConfiguration);
    }
+
+   public static CacheManager createClusteredCacheManager(GlobalConfiguration globalConfiguration) {
+      Properties newTransportProps = new Properties();
+      newTransportProps.put(JGroupsTransport.CONFIGURATION_STRING, JGroupsConfigBuilder.getJGroupsConfig());
+      globalConfiguration.setTransportProperties(newTransportProps);
+      return new DefaultCacheManager(globalConfiguration);
+   }
 }

Modified: core/branches/flat/src/test/resources/configs/named-cache-test.xml
===================================================================
--- core/branches/flat/src/test/resources/configs/named-cache-test.xml	2009-02-19 12:53:45 UTC (rev 7730)
+++ core/branches/flat/src/test/resources/configs/named-cache-test.xml	2009-02-19 13:03:56 UTC (rev 7731)
@@ -53,7 +53,7 @@
    <namedCache name="asyncReplQueue">
       <clustering>
          <stateRetrieval fetchInMemoryState="true" timeout="15000"/>
-         <async useReplQueue="true"/>
+         <async useReplQueue="true" replQueueInterval="1234" replQueueMaxElements="100"/>
       </clustering>
    </namedCache>
 




More information about the jbosscache-commits mailing list