[jbosscache-commits] JBoss Cache SVN: r7378 - in core/trunk/src: main/java/org/jboss/cache/loader/tcp and 2 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Mon Jan 5 12:35:41 EST 2009


Author: manik.surtani at jboss.com
Date: 2009-01-05 12:35:41 -0500 (Mon, 05 Jan 2009)
New Revision: 7378

Modified:
   core/trunk/src/main/java/org/jboss/cache/loader/TcpDelegatingCacheLoader.java
   core/trunk/src/main/java/org/jboss/cache/loader/tcp/TcpCacheServer.java
   core/trunk/src/test/java/org/jboss/cache/AbstractSingleCacheTest.java
   core/trunk/src/test/java/org/jboss/cache/loader/CacheLoaderTestsBase.java
   core/trunk/src/test/java/org/jboss/cache/loader/TcpCacheLoaderTest.java
Log:
Fixed TCP cache server, and re-enabled tests

Modified: core/trunk/src/main/java/org/jboss/cache/loader/TcpDelegatingCacheLoader.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/loader/TcpDelegatingCacheLoader.java	2009-01-05 17:29:33 UTC (rev 7377)
+++ core/trunk/src/main/java/org/jboss/cache/loader/TcpDelegatingCacheLoader.java	2009-01-05 17:35:41 UTC (rev 7378)
@@ -36,6 +36,7 @@
 import java.io.ObjectOutputStream;
 import java.lang.reflect.InvocationTargetException;
 import java.lang.reflect.Method;
+import java.net.ConnectException;
 import java.net.Socket;
 import java.util.List;
 import java.util.Map;
@@ -391,11 +392,19 @@
    @Override
    public void start() throws IOException
    {
-      sock = new Socket(config.getHost(), config.getPort());
-      sock.setSoTimeout(config.getReadTimeout());
-      out = new ObjectOutputStream(new BufferedOutputStream(sock.getOutputStream()));
-      out.flush();
-      in = new ObjectInputStream(new BufferedInputStream(sock.getInputStream()));
+      try
+      {
+         sock = new Socket(config.getHost(), config.getPort());
+         sock.setSoTimeout(config.getReadTimeout());
+         out = new ObjectOutputStream(new BufferedOutputStream(sock.getOutputStream()));
+         out.flush();
+         in = new ObjectInputStream(new BufferedInputStream(sock.getInputStream()));
+      }
+      catch (ConnectException ce)
+      {
+         log.info("Unable to connect to TCP socket on interface " + config.getHost() + " and port " + config.getPort());
+         throw ce;
+      }
    }
 
    @Override

Modified: core/trunk/src/main/java/org/jboss/cache/loader/tcp/TcpCacheServer.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/loader/tcp/TcpCacheServer.java	2009-01-05 17:29:33 UTC (rev 7377)
+++ core/trunk/src/main/java/org/jboss/cache/loader/tcp/TcpCacheServer.java	2009-01-05 17:35:41 UTC (rev 7378)
@@ -45,7 +45,6 @@
 import java.net.UnknownHostException;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
@@ -215,33 +214,30 @@
 
    public void stop()
    {
-      if (running)
+      running = false;
+      synchronized (conns)
       {
-         running = false;
-         synchronized (conns)
+         // Connection.close() removes conn from the list,
+         // so copy off the list first to avoid ConcurrentModificationException
+         List<Connection> copy = new ArrayList<Connection>(conns);
+         for (Connection conn : copy)
          {
-            // Connection.close() removes conn from the list,
-            // so copy off the list first to avoid ConcurrentModificationException
-            List<Connection> copy = new ArrayList<Connection>(conns);
-            for (Connection conn : copy)
-            {
-               conn.close();
-            }
-            conns.clear();
+            conn.close();
          }
+         conns.clear();
+      }
 
-         if (srv_sock != null)
+      if (srv_sock != null)
+      {
+         try
          {
-            try
-            {
-               srv_sock.close();
-            }
-            catch (IOException e)
-            {
-               // nada
-            }
-            srv_sock = null;
+            srv_sock.close();
          }
+         catch (IOException e)
+         {
+            // nada
+         }
+         srv_sock = null;
       }
    }
 
@@ -384,10 +380,6 @@
                      {
                         map = Collections.emptyMap();
                      }
-                     else
-                     {
-                        map = new HashMap(map); // TODO: copy this since FastCopyHashMap has issues with serialization at the moment
-                     }
                      output.writeObject(map);
                      break;
                   case TcpCacheOperations.EXISTS:

Modified: core/trunk/src/test/java/org/jboss/cache/AbstractSingleCacheTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/AbstractSingleCacheTest.java	2009-01-05 17:29:33 UTC (rev 7377)
+++ core/trunk/src/test/java/org/jboss/cache/AbstractSingleCacheTest.java	2009-01-05 17:35:41 UTC (rev 7378)
@@ -1,10 +1,10 @@
 package org.jboss.cache;
 
+import org.jboss.cache.util.TestingUtil;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeClass;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.AfterClass;
 import org.testng.annotations.Test;
-import org.jboss.cache.util.TestingUtil;
 
 /**
  * @author Mircea.Markus at jboss.com
@@ -14,7 +14,22 @@
 {
    protected CacheSPI<K, V> cache;
 
+   /**
+    * This method will always be called before {@link #create()}.  If you override this, make sure you annotate the
+    * overridden method with {@link org.testng.annotations.BeforeClass}.
+    *
+    * @throws Exception Just in case
+    */
    @BeforeClass
+   public void preCreate() throws Exception
+   {
+      // no op, made for overriding.
+   }
+
+   // Due to some weirdness with TestNG, it always appends the package and class name to the method names
+   // provided on dependsOnMethods unless it thinks there already is a package.  This does accept regular expressions
+   // though so .*. works.  Otherwise it won't detect overridden methods in subclasses.
+   @BeforeClass(dependsOnMethods = "org.jboss.*.preCreate")
    protected void create() throws Exception
    {
       cache = createCache();

Modified: core/trunk/src/test/java/org/jboss/cache/loader/CacheLoaderTestsBase.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/loader/CacheLoaderTestsBase.java	2009-01-05 17:29:33 UTC (rev 7377)
+++ core/trunk/src/test/java/org/jboss/cache/loader/CacheLoaderTestsBase.java	2009-01-05 17:35:41 UTC (rev 7378)
@@ -2,11 +2,16 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.jboss.cache.*;
-import org.jboss.cache.factories.UnitTestCacheConfigurationFactory;
+import org.jboss.cache.AbstractSingleCacheTest;
+import org.jboss.cache.CacheException;
+import org.jboss.cache.CacheSPI;
+import org.jboss.cache.Fqn;
+import org.jboss.cache.Modification;
+import org.jboss.cache.Node;
+import org.jboss.cache.NodeSPI;
+import org.jboss.cache.UnitTestCacheFactory;
 import org.jboss.cache.buddyreplication.BuddyManager;
 import org.jboss.cache.config.Configuration;
-import org.jboss.cache.config.CacheLoaderConfig;
 import org.jboss.cache.statetransfer.DefaultStateTransferManager;
 import org.jboss.cache.transaction.TransactionSetup;
 import org.jboss.cache.util.TestingUtil;
@@ -14,15 +19,21 @@
 import org.jboss.util.stream.MarshalledValueOutputStream;
 import static org.testng.AssertJUnit.*;
 import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
-import org.testng.annotations.BeforeMethod;
 
 import javax.transaction.Transaction;
 import javax.transaction.TransactionManager;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.Serializable;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.CopyOnWriteArraySet;
 import java.util.concurrent.CountDownLatch;
 
@@ -61,6 +72,7 @@
    {
       CacheSPI<Object, Object> result = (CacheSPI<Object, Object>) new UnitTestCacheFactory<Object, Object>().createCache(false, getClass());
       Configuration c = result.getConfiguration();
+      c.setEvictionConfig(null);
       c.setCacheMode(Configuration.CacheMode.LOCAL);
       c.setTransactionManagerLookupClass(TransactionSetup.getManagerLookup());
       configureCache(result);
@@ -76,7 +88,9 @@
    /**
     * Subclass if you need any further cfg after the cache starts.
     */
-   protected void postConfigure() { }
+   protected void postConfigure()
+   {
+   }
 
    abstract protected void configureCache(CacheSPI cache) throws Exception;
 
@@ -893,8 +907,6 @@
 
    public void testRemoveData4() throws Exception
    {
-
-
       Set keys;
       Fqn key = Fqn.fromString("/x/y/z/");
       cache.put(key, "keyA", "valA");
@@ -1527,7 +1539,8 @@
          {
             assertTrue(set.contains(names[i]));
          }
-      } else
+      }
+      else
       {
          assertNull(set);
       }
@@ -1946,7 +1959,8 @@
          if (nested == null)
          {
             return super.hashCode();
-         } else
+         }
+         else
          {
             return 13 + nested.hashCode();
          }
@@ -2258,7 +2272,8 @@
             fqns.add(f);
             loader.put(f, "k", "v");
          }
-      } else
+      }
+      else
       {
          loader.put(fqn, "k", "v");
       }
@@ -2406,7 +2421,7 @@
 
       return cache.getConfiguration().getRuntimeConfig().getTransactionManager();
    }
-   
+
    public void testSetData() throws Exception
    {
       log.info("testSetData");
@@ -2422,7 +2437,7 @@
       log.info("GET");
       loaderMap = loader.get(key);
       assertEquals(map, loaderMap);
-      
+
       assertNull(cache.get(key, "x"));
       assertEquals("c", cache.get(key, "c"));
       assertEquals("a", cache.get(key, "a"));

Modified: core/trunk/src/test/java/org/jboss/cache/loader/TcpCacheLoaderTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/loader/TcpCacheLoaderTest.java	2009-01-05 17:29:33 UTC (rev 7377)
+++ core/trunk/src/test/java/org/jboss/cache/loader/TcpCacheLoaderTest.java	2009-01-05 17:35:41 UTC (rev 7378)
@@ -1,255 +1,315 @@
 package org.jboss.cache.loader;
 
+import org.jboss.cache.CacheException;
+import org.jboss.cache.CacheSPI;
+import org.jboss.cache.DefaultCacheFactory;
+import org.jboss.cache.config.CacheLoaderConfig;
+import org.jboss.cache.config.Configuration;
+import org.jboss.cache.factories.UnitTestCacheConfigurationFactory;
+import org.jboss.cache.interceptors.OrderedSynchronizationHandler;
+import org.jboss.cache.loader.tcp.TcpCacheServer;
+import org.jboss.cache.notifications.annotation.CacheListener;
+import org.jboss.cache.notifications.annotation.NodeCreated;
+import org.jboss.cache.notifications.event.Event;
+import org.jboss.cache.transaction.GlobalTransaction;
+import org.jboss.cache.util.TestingUtil;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
 import org.testng.annotations.Test;
 
+import javax.transaction.Synchronization;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
 /**
  * Tests the TcpDelegatingCacheLoader
  *
  * @author Bela Ban
  * @version $Id$
  */
-@Test(groups = "functional", enabled = false, testName = "loader.TcpCacheLoaderTest")
-// TODO re-enable!!
-public class TcpCacheLoaderTest //extends CacheLoaderTestsBase
+@Test(groups = "functional", enabled = true, testName = "loader.TcpCacheLoaderTest", sequential = true)
+public class TcpCacheLoaderTest extends CacheLoaderTestsBase
 {
-//   protected static final int CACHE_SERVER_RESTART_DELAY_MS = 1000;
-//   protected static final int TCP_CACHE_LOADER_TIMEOUT_MS = 2000;
-//   protected static int START_COUNT = 0;
-//   static TcpCacheServer cacheServer = null;
-//
-//   @BeforeClass
-//   public static void startCacheServer()
-//   {
-//      final CountDownLatch startedSignal = new CountDownLatch(1);
-//
-//      Thread t = new Thread()
-//      {
-//         public void run()
-//         {
-//            try
-//            {
-//               cacheServer = new TcpCacheServer();
-//               cacheServer.setBindAddress("127.0.0.1");
-//               cacheServer.setPort(12121);
-//               Configuration config = UnitTestCacheConfigurationFactory.createConfiguration(Configuration.CacheMode.LOCAL, true);
-//               CacheSPI cache = (CacheSPI) new DefaultCacheFactory<Object, Object>().createCache(config);
-//               cacheServer.setCache(cache);
-//               cacheServer.create();
-//               cacheServer.start();
-//               START_COUNT++;
-//               startedSignal.countDown();
-//            }
-//            catch (Exception ex)
-//            {
-//               ex.printStackTrace();
-//            }
-//         }
-//
-//      };
-//      t.setDaemon(true);
-//      t.start();
-//
-//      // Wait for the cache server to start up.
-//      boolean started = false;
-//      try
-//      {
-//         started = startedSignal.await(120, TimeUnit.SECONDS);
-//      }
-//      catch (InterruptedException e)
-//      {
-//         // do nothing
-//      }
-//
-//      if (!started)
-//      {
-//         // the TcpCacheServer was unable to start up for some reason!!
-//         throw new RuntimeException("Unable to start the TcpCacheServer after 120 seconds!!");
-//      }
-//   }
-//
-//   @AfterClass
-//   public static void stopCacheServer()
-//   {
-//      if (cacheServer != null)
-//      {
-//         cacheServer.stop();
-//      }
-//   }
-//
-//   protected static void restartCacheServer()
-//   {
-//      stopCacheServer();
-//      startCacheServer();
-//   }
-//
-//   @Override
-//   public void testPartialLoadAndStore()
-//   {
-//      // do nothing
-//   }
-//
-//   @Override
-//   public void testBuddyBackupStore()
-//   {
-//      // do nothing
-//   }
-//
-//   protected void configureCache() throws Exception
-//   {
-//      cache.getConfiguration().setCacheLoaderConfig(getSingleCacheLoaderConfig("",
-//            TcpDelegatingCacheLoader.class.getName(),
-//            "host=127.0.0.1\nport=12121\ntimeout=" + TCP_CACHE_LOADER_TIMEOUT_MS, false, true, false));
-//   }
-//
-//   // restart tests
-//   public void testCacheServerRestartMidCall() throws Exception
-//   {
-//      CacheServerRestarter restarter = new CacheServerRestarter();
-//      restarter.restart = true;
-//      cache.addCacheListener(restarter);
-//      int oldStartCount = START_COUNT;
-//      // a restart of the cache server will happen before the cache loader interceptor is called.
-//      cache.put(FQN, "key", "value");
-//
-//      assert oldStartCount + 1 == START_COUNT : "Cache server should have restarted!";
-//      assert loader.get(FQN).equals(Collections.singletonMap("key", "value"));
-//   }
-//
-//   public void testCacheServerDelayedRestartMidCall() throws Exception
-//   {
-//      CacheServerRestarter restarter = new CacheServerRestarter();
-//      restarter.restart = false;
-//      restarter.delayedRestart = true;
-//      restarter.startAfter = CACHE_SERVER_RESTART_DELAY_MS;
-//      cache.addCacheListener(restarter);
-//      int oldStartCount = START_COUNT;
-//
-//      // the cache server will STOP before the cache laoder interceptor is called.
-//      // it will be restarted in a separate thread, startAfter millis later.
-//      // this should be less than the TcpCacheLoader timeout.
-//      cache.put(FQN, "key", "value");
-//
-//      assert oldStartCount + 1 == START_COUNT : "Cache server should have restarted!";
-//      assert loader.get(FQN).equals(Collections.singletonMap("key", "value"));
-//   }
-//
-//   public void testCacheServerTimeoutMidCall() throws Exception
-//   {
-//      CacheServerRestarter restarter = new CacheServerRestarter();
-//      restarter.restart = false;
-//      restarter.delayedRestart = true;
-//      restarter.startAfter = -1;
-//      cache.addCacheListener(restarter);
-//      int oldStartCount = START_COUNT;
-//
-//      // the cache server will STOP before the cache laoder interceptor is called.
-//      // it will be restarted in a separate thread, startAfter millis later.
-//      // this should be less than the TcpCacheLoader timeout.
-//      try
-//      {
-//         cache.put(FQN, "key", "value");
-//         assert false : "Should have failed";
-//      }
-//      catch (CacheException expected)
-//      {
-//
-//      }
-//
-//      assert oldStartCount == START_COUNT : "Cache server should NOT have restarted!";
-//      // start the TCP server again
-//      startCacheServer();
-//      assert loader.get(FQN) == null;
-//   }
-//
-//   public void testCacheServerRestartMidTransaction() throws Exception
-//   {
-//      int oldStartCount = START_COUNT;
-//      cache.getTransactionManager().begin();
-//      cache.put(FQN, "key", "value");
-//      restartCacheServer();
-//      cache.put(FQN, "key2", "value2");
-//      cache.getTransactionManager().commit();
-//
-//      Map m = new HashMap();
-//      m.put("key", "value");
-//      m.put("key2", "value2");
-//
-//      assert oldStartCount + 1 == START_COUNT : "Cache server should have restarted!";
-//      assert loader.get(FQN).equals(m);
-//   }
-//
-//   public void testCacheServerRestartMidTransactionAfterPrepare() throws Exception
-//   {
-//      int oldStartCount = START_COUNT;
-//      cache.getTransactionManager().begin();
-//
-//      cache.put(FQN, "key", "value");
-//      cache.put(FQN, "key2", "value2");
-//
-//      GlobalTransaction gtx = cache.getTransactionTable().get(cache.getTransactionManager().getTransaction());
-//      OrderedSynchronizationHandler osh = cache.getTransactionTable().get(gtx).getOrderedSynchronizationHandler();
-//
-////      OrderedSynchronizationHandler.getInstance(cache.getTransactionManager().getTransaction()).registerAtTail(
-//      osh.registerAtTail(
-//            new Synchronization()
-//            {
-//
-//               public void beforeCompletion()
-//               {
-//                  // this will be called after the cache's prepare() phase.  Restart the cache server.
-//                  restartCacheServer();
-//               }
-//
-//               public void afterCompletion(int i)
-//               {
-//                  // do nothing
-//               }
-//            }
-//      );
-//
-//      cache.getTransactionManager().commit();
-//
-//      Map m = new HashMap();
-//      m.put("key", "value");
-//      m.put("key2", "value2");
-//
-//      assert oldStartCount + 1 == START_COUNT : "Cache server should have restarted!";
-//      assert loader.get(FQN).equals(m);
-//
-//   }
-//
-//   @CacheListener
-//   public static class CacheServerRestarter
-//   {
-//      boolean restart;
-//      boolean delayedRestart;
-//      int startAfter;
-//
-//      @NodeCreated
-//      public void restart(Event e)
-//      {
-//         if (e.isPre())
-//         {
-//            if (restart)
-//            {
-//               restartCacheServer();
-//            }
-//            else if (delayedRestart)
-//            {
-//               stopCacheServer();
-//               new Thread()
-//               {
-//                  public void run()
-//                  {
-//                     if (startAfter > 0)
-//                     {
-//                        TestingUtil.sleepThread(startAfter);
-//                        startCacheServer();
-//                     }
-//                  }
-//               }.start();
-//            }
-//         }
-//      }
-//   }
+   protected static final String TCP_CACHE_SERVER_HOST = "127.0.0.1";
+   protected static final int TCP_CACHE_SERVER_PORT = 12121;
+   protected static final int CACHE_SERVER_RESTART_DELAY_MS = 250;
+   protected static final int TCP_CACHE_LOADER_TIMEOUT_MS = 1000;
+   protected static int START_COUNT = 0;
+   static volatile TcpCacheServer cacheServer = null;
+
+   @Override
+   @BeforeClass
+   public void preCreate()
+   {
+      if (cacheServer != null) stopCacheServer();
+      startCacheServer();
+   }
+
+   private static void startCacheServer()
+   {
+      final CountDownLatch startedSignal = new CountDownLatch(1);
+
+      Thread t = new Thread()
+      {
+         public void run()
+         {
+            try
+            {
+               cacheServer = new TcpCacheServer();
+               cacheServer.setBindAddress(TCP_CACHE_SERVER_HOST);
+               cacheServer.setPort(TCP_CACHE_SERVER_PORT);
+               Configuration config = UnitTestCacheConfigurationFactory.createConfiguration(Configuration.CacheMode.LOCAL, true);
+               // disable eviction!!
+               config.setEvictionConfig(null);
+               CacheSPI cache = (CacheSPI) new DefaultCacheFactory<Object, Object>().createCache(config);
+               cacheServer.setCache(cache);
+               cacheServer.create();
+               cacheServer.start();
+               START_COUNT++;
+               startedSignal.countDown();
+            }
+            catch (Exception ex)
+            {
+               ex.printStackTrace();
+            }
+         }
+
+      };
+      t.setDaemon(true);
+      t.start();
+
+      // Wait for the cache server to start up.
+      boolean started = false;
+      try
+      {
+         started = startedSignal.await(120, TimeUnit.SECONDS);
+      }
+      catch (InterruptedException e)
+      {
+         // do nothing
+      }
+
+      if (!started)
+      {
+         // the TcpCacheServer was unable to start up for some reason!!
+         throw new RuntimeException("Unable to start the TcpCacheServer after 120 seconds!!");
+      }
+   }
+
+   @AfterClass
+   public static void stopCacheServer()
+   {
+      if (cacheServer != null)
+      {
+         cacheServer.stop();
+         cacheServer = null;
+      }
+   }
+
+   @AfterMethod
+   public void removeRestarters()
+   {
+      if (cache != null)
+      {
+         Set<Object> restarters = new HashSet<Object>();
+         for (Object listener : cache.getCacheListeners())
+         {
+            if (listener instanceof CacheServerRestarter) restarters.add(listener);
+         }
+         try
+         {
+            for (Object restarter : restarters) cache.removeCacheListener(restarter);
+         }
+         catch (Exception ignored)
+         {
+            // ignored
+         }
+      }
+   }
+
+   protected static void restartCacheServer()
+   {
+      stopCacheServer();
+      startCacheServer();
+   }
+
+   @Override
+   public void testPartialLoadAndStore()
+   {
+      // do nothing
+   }
+
+   @Override
+   public void testBuddyBackupStore()
+   {
+      // do nothing
+   }
+
+   protected void configureCache(CacheSPI cache) throws Exception
+   {
+      CacheLoaderConfig clc = new CacheLoaderConfig();
+      TcpDelegatingCacheLoaderConfig tcpCfg = new TcpDelegatingCacheLoaderConfig(TCP_CACHE_SERVER_HOST, TCP_CACHE_SERVER_PORT, TCP_CACHE_LOADER_TIMEOUT_MS);
+      tcpCfg.setReconnectWaitTime(CACHE_SERVER_RESTART_DELAY_MS);
+      tcpCfg.setFetchPersistentState(false);
+      clc.addIndividualCacheLoaderConfig(tcpCfg);
+      cache.getConfiguration().setCacheLoaderConfig(clc);
+   }
+
+   // restart tests
+   public void testCacheServerRestartMidCall() throws Exception
+   {
+      CacheServerRestarter restarter = new CacheServerRestarter();
+      restarter.restart = true;
+      cache.addCacheListener(restarter);
+      int oldStartCount = START_COUNT;
+      // a restart of the cache server will happen before the cache loader interceptor is called.
+      cache.put(FQN, "key", "value");
+
+      assert oldStartCount + 1 == START_COUNT : "Cache server should have restarted!";
+      assert loader.get(FQN).equals(Collections.singletonMap("key", "value"));
+   }
+
+   public void testCacheServerDelayedRestartMidCall() throws Exception
+   {
+      CacheServerRestarter restarter = new CacheServerRestarter();
+      restarter.restart = false;
+      restarter.delayedRestart = true;
+      restarter.startAfter = CACHE_SERVER_RESTART_DELAY_MS;
+      cache.addCacheListener(restarter);
+      int oldStartCount = START_COUNT;
+
+      // the cache server will STOP before the cache loader interceptor is called.
+      // it will be restarted in a separate thread, startAfter millis later.
+      // this should be less than the TcpCacheLoader timeout.
+      cache.put(FQN, "key", "value");
+
+      assert oldStartCount < START_COUNT : "Cache server should have restarted! old = " + oldStartCount + " and count = " + START_COUNT;
+      assert loader.get(FQN).equals(Collections.singletonMap("key", "value"));
+   }
+
+   public void testCacheServerTimeoutMidCall() throws Exception
+   {
+      CacheServerRestarter restarter = new CacheServerRestarter();
+      restarter.restart = false;
+      restarter.delayedRestart = true;
+      restarter.startAfter = -1;
+      cache.addCacheListener(restarter);
+      int oldStartCount = START_COUNT;
+
+      // the cache server will STOP before the cache loader interceptor is called.
+      // it will be restarted in a separate thread, startAfter millis later.
+      // this should be less than the TcpCacheLoader timeout.
+      try
+      {
+         cache.put(FQN, "key", "value");
+         assert false : "Should have failed";
+      }
+      catch (CacheException expected)
+      {
+
+      }
+
+      assert oldStartCount == START_COUNT : "Cache server should NOT have restarted!";
+      // start the TCP server again
+      startCacheServer();
+      assert loader.get(FQN) == null;
+   }
+
+   public void testCacheServerRestartMidTransaction() throws Exception
+   {
+      int oldStartCount = START_COUNT;
+      cache.getTransactionManager().begin();
+      cache.put(FQN, "key", "value");
+      restartCacheServer();
+      cache.put(FQN, "key2", "value2");
+      cache.getTransactionManager().commit();
+
+      Map m = new HashMap();
+      m.put("key", "value");
+      m.put("key2", "value2");
+
+      assert oldStartCount < START_COUNT : "Cache server should have restarted!";
+      assert loader.get(FQN).equals(m);
+   }
+
+   public void testCacheServerRestartMidTransactionAfterPrepare() throws Exception
+   {
+      int oldStartCount = START_COUNT;
+      cache.getTransactionManager().begin();
+
+      cache.put(FQN, "key", "value");
+      cache.put(FQN, "key2", "value2");
+
+      GlobalTransaction gtx = cache.getTransactionTable().get(cache.getTransactionManager().getTransaction());
+      OrderedSynchronizationHandler osh = cache.getTransactionTable().get(gtx).getOrderedSynchronizationHandler();
+
+//      OrderedSynchronizationHandler.getInstance(cache.getTransactionManager().getTransaction()).registerAtTail(
+      osh.registerAtTail(
+            new Synchronization()
+            {
+
+               public void beforeCompletion()
+               {
+                  // this will be called after the cache's prepare() phase.  Restart the cache server.
+                  restartCacheServer();
+               }
+
+               public void afterCompletion(int i)
+               {
+                  // do nothing
+               }
+            }
+      );
+
+      cache.getTransactionManager().commit();
+
+      Map m = new HashMap();
+      m.put("key", "value");
+      m.put("key2", "value2");
+
+      assert oldStartCount + 1 == START_COUNT : "Cache server should have restarted!";
+      assert loader.get(FQN).equals(m);
+
+   }
+
+   @CacheListener
+   public static class CacheServerRestarter
+   {
+      boolean restart;
+      boolean delayedRestart;
+      int startAfter;
+
+      @NodeCreated
+      public void restart(Event e)
+      {
+         if (e.isPre())
+         {
+            if (restart)
+            {
+               restartCacheServer();
+            }
+            else if (delayedRestart)
+            {
+               stopCacheServer();
+               new Thread()
+               {
+                  public void run()
+                  {
+                     if (startAfter > 0)
+                     {
+                        TestingUtil.sleepThread(startAfter);
+                        startCacheServer();
+                     }
+                  }
+               }.start();
+            }
+         }
+      }
+   }
 }




More information about the jbosscache-commits mailing list