[jbosscache-commits] JBoss Cache SVN: r7378 - in core/trunk/src: main/java/org/jboss/cache/loader/tcp and 2 other directories.
jbosscache-commits at lists.jboss.org
jbosscache-commits at lists.jboss.org
Mon Jan 5 12:35:41 EST 2009
Author: manik.surtani at jboss.com
Date: 2009-01-05 12:35:41 -0500 (Mon, 05 Jan 2009)
New Revision: 7378
Modified:
core/trunk/src/main/java/org/jboss/cache/loader/TcpDelegatingCacheLoader.java
core/trunk/src/main/java/org/jboss/cache/loader/tcp/TcpCacheServer.java
core/trunk/src/test/java/org/jboss/cache/AbstractSingleCacheTest.java
core/trunk/src/test/java/org/jboss/cache/loader/CacheLoaderTestsBase.java
core/trunk/src/test/java/org/jboss/cache/loader/TcpCacheLoaderTest.java
Log:
Fixed TCP cache server, and re-enabled tests
Modified: core/trunk/src/main/java/org/jboss/cache/loader/TcpDelegatingCacheLoader.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/loader/TcpDelegatingCacheLoader.java 2009-01-05 17:29:33 UTC (rev 7377)
+++ core/trunk/src/main/java/org/jboss/cache/loader/TcpDelegatingCacheLoader.java 2009-01-05 17:35:41 UTC (rev 7378)
@@ -36,6 +36,7 @@
import java.io.ObjectOutputStream;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
+import java.net.ConnectException;
import java.net.Socket;
import java.util.List;
import java.util.Map;
@@ -391,11 +392,19 @@
@Override
public void start() throws IOException
{
- sock = new Socket(config.getHost(), config.getPort());
- sock.setSoTimeout(config.getReadTimeout());
- out = new ObjectOutputStream(new BufferedOutputStream(sock.getOutputStream()));
- out.flush();
- in = new ObjectInputStream(new BufferedInputStream(sock.getInputStream()));
+ try
+ {
+ sock = new Socket(config.getHost(), config.getPort());
+ sock.setSoTimeout(config.getReadTimeout());
+ out = new ObjectOutputStream(new BufferedOutputStream(sock.getOutputStream()));
+ out.flush();
+ in = new ObjectInputStream(new BufferedInputStream(sock.getInputStream()));
+ }
+ catch (ConnectException ce)
+ {
+ log.info("Unable to connect to TCP socket on interface " + config.getHost() + " and port " + config.getPort());
+ throw ce;
+ }
}
@Override
Modified: core/trunk/src/main/java/org/jboss/cache/loader/tcp/TcpCacheServer.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/loader/tcp/TcpCacheServer.java 2009-01-05 17:29:33 UTC (rev 7377)
+++ core/trunk/src/main/java/org/jboss/cache/loader/tcp/TcpCacheServer.java 2009-01-05 17:35:41 UTC (rev 7378)
@@ -45,7 +45,6 @@
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
@@ -215,33 +214,30 @@
public void stop()
{
- if (running)
+ running = false;
+ synchronized (conns)
{
- running = false;
- synchronized (conns)
+ // Connection.close() removes conn from the list,
+ // so copy off the list first to avoid ConcurrentModificationException
+ List<Connection> copy = new ArrayList<Connection>(conns);
+ for (Connection conn : copy)
{
- // Connection.close() removes conn from the list,
- // so copy off the list first to avoid ConcurrentModificationException
- List<Connection> copy = new ArrayList<Connection>(conns);
- for (Connection conn : copy)
- {
- conn.close();
- }
- conns.clear();
+ conn.close();
}
+ conns.clear();
+ }
- if (srv_sock != null)
+ if (srv_sock != null)
+ {
+ try
{
- try
- {
- srv_sock.close();
- }
- catch (IOException e)
- {
- // nada
- }
- srv_sock = null;
+ srv_sock.close();
}
+ catch (IOException e)
+ {
+ // nada
+ }
+ srv_sock = null;
}
}
@@ -384,10 +380,6 @@
{
map = Collections.emptyMap();
}
- else
- {
- map = new HashMap(map); // TODO: copy this since FastCopyHashMap has issues with serialization at the moment
- }
output.writeObject(map);
break;
case TcpCacheOperations.EXISTS:
Modified: core/trunk/src/test/java/org/jboss/cache/AbstractSingleCacheTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/AbstractSingleCacheTest.java 2009-01-05 17:29:33 UTC (rev 7377)
+++ core/trunk/src/test/java/org/jboss/cache/AbstractSingleCacheTest.java 2009-01-05 17:35:41 UTC (rev 7378)
@@ -1,10 +1,10 @@
package org.jboss.cache;
+import org.jboss.cache.util.TestingUtil;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeClass;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.AfterClass;
import org.testng.annotations.Test;
-import org.jboss.cache.util.TestingUtil;
/**
* @author Mircea.Markus at jboss.com
@@ -14,7 +14,22 @@
{
protected CacheSPI<K, V> cache;
+ /**
+ * This method will always be called before {@link #create()}. If you override this, make sure you annotate the
+ * overridden method with {@link org.testng.annotations.BeforeClass}.
+ *
+ * @throws Exception Just in case
+ */
@BeforeClass
+ public void preCreate() throws Exception
+ {
+ // no op, made for overriding.
+ }
+
+ // Due to some weirdness with TestNG, it always appends the package and class name to the method names
+ // provided on dependsOnMethods unless it thinks there already is a package. This does accept regular expressions
+ // though so .*. works. Otherwise it won't detect overridden methods in subclasses.
+ @BeforeClass(dependsOnMethods = "org.jboss.*.preCreate")
protected void create() throws Exception
{
cache = createCache();
Modified: core/trunk/src/test/java/org/jboss/cache/loader/CacheLoaderTestsBase.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/loader/CacheLoaderTestsBase.java 2009-01-05 17:29:33 UTC (rev 7377)
+++ core/trunk/src/test/java/org/jboss/cache/loader/CacheLoaderTestsBase.java 2009-01-05 17:35:41 UTC (rev 7378)
@@ -2,11 +2,16 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.jboss.cache.*;
-import org.jboss.cache.factories.UnitTestCacheConfigurationFactory;
+import org.jboss.cache.AbstractSingleCacheTest;
+import org.jboss.cache.CacheException;
+import org.jboss.cache.CacheSPI;
+import org.jboss.cache.Fqn;
+import org.jboss.cache.Modification;
+import org.jboss.cache.Node;
+import org.jboss.cache.NodeSPI;
+import org.jboss.cache.UnitTestCacheFactory;
import org.jboss.cache.buddyreplication.BuddyManager;
import org.jboss.cache.config.Configuration;
-import org.jboss.cache.config.CacheLoaderConfig;
import org.jboss.cache.statetransfer.DefaultStateTransferManager;
import org.jboss.cache.transaction.TransactionSetup;
import org.jboss.cache.util.TestingUtil;
@@ -14,15 +19,21 @@
import org.jboss.util.stream.MarshalledValueOutputStream;
import static org.testng.AssertJUnit.*;
import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
-import org.testng.annotations.BeforeMethod;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.Serializable;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
@@ -61,6 +72,7 @@
{
CacheSPI<Object, Object> result = (CacheSPI<Object, Object>) new UnitTestCacheFactory<Object, Object>().createCache(false, getClass());
Configuration c = result.getConfiguration();
+ c.setEvictionConfig(null);
c.setCacheMode(Configuration.CacheMode.LOCAL);
c.setTransactionManagerLookupClass(TransactionSetup.getManagerLookup());
configureCache(result);
@@ -76,7 +88,9 @@
/**
* Subclass if you need any further cfg after the cache starts.
*/
- protected void postConfigure() { }
+ protected void postConfigure()
+ {
+ }
abstract protected void configureCache(CacheSPI cache) throws Exception;
@@ -893,8 +907,6 @@
public void testRemoveData4() throws Exception
{
-
-
Set keys;
Fqn key = Fqn.fromString("/x/y/z/");
cache.put(key, "keyA", "valA");
@@ -1527,7 +1539,8 @@
{
assertTrue(set.contains(names[i]));
}
- } else
+ }
+ else
{
assertNull(set);
}
@@ -1946,7 +1959,8 @@
if (nested == null)
{
return super.hashCode();
- } else
+ }
+ else
{
return 13 + nested.hashCode();
}
@@ -2258,7 +2272,8 @@
fqns.add(f);
loader.put(f, "k", "v");
}
- } else
+ }
+ else
{
loader.put(fqn, "k", "v");
}
@@ -2406,7 +2421,7 @@
return cache.getConfiguration().getRuntimeConfig().getTransactionManager();
}
-
+
public void testSetData() throws Exception
{
log.info("testSetData");
@@ -2422,7 +2437,7 @@
log.info("GET");
loaderMap = loader.get(key);
assertEquals(map, loaderMap);
-
+
assertNull(cache.get(key, "x"));
assertEquals("c", cache.get(key, "c"));
assertEquals("a", cache.get(key, "a"));
Modified: core/trunk/src/test/java/org/jboss/cache/loader/TcpCacheLoaderTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/loader/TcpCacheLoaderTest.java 2009-01-05 17:29:33 UTC (rev 7377)
+++ core/trunk/src/test/java/org/jboss/cache/loader/TcpCacheLoaderTest.java 2009-01-05 17:35:41 UTC (rev 7378)
@@ -1,255 +1,315 @@
package org.jboss.cache.loader;
+import org.jboss.cache.CacheException;
+import org.jboss.cache.CacheSPI;
+import org.jboss.cache.DefaultCacheFactory;
+import org.jboss.cache.config.CacheLoaderConfig;
+import org.jboss.cache.config.Configuration;
+import org.jboss.cache.factories.UnitTestCacheConfigurationFactory;
+import org.jboss.cache.interceptors.OrderedSynchronizationHandler;
+import org.jboss.cache.loader.tcp.TcpCacheServer;
+import org.jboss.cache.notifications.annotation.CacheListener;
+import org.jboss.cache.notifications.annotation.NodeCreated;
+import org.jboss.cache.notifications.event.Event;
+import org.jboss.cache.transaction.GlobalTransaction;
+import org.jboss.cache.util.TestingUtil;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
+import javax.transaction.Synchronization;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
/**
* Tests the TcpDelegatingCacheLoader
*
* @author Bela Ban
* @version $Id$
*/
-@Test(groups = "functional", enabled = false, testName = "loader.TcpCacheLoaderTest")
-// TODO re-enable!!
-public class TcpCacheLoaderTest //extends CacheLoaderTestsBase
+@Test(groups = "functional", enabled = true, testName = "loader.TcpCacheLoaderTest", sequential = true)
+public class TcpCacheLoaderTest extends CacheLoaderTestsBase
{
-// protected static final int CACHE_SERVER_RESTART_DELAY_MS = 1000;
-// protected static final int TCP_CACHE_LOADER_TIMEOUT_MS = 2000;
-// protected static int START_COUNT = 0;
-// static TcpCacheServer cacheServer = null;
-//
-// @BeforeClass
-// public static void startCacheServer()
-// {
-// final CountDownLatch startedSignal = new CountDownLatch(1);
-//
-// Thread t = new Thread()
-// {
-// public void run()
-// {
-// try
-// {
-// cacheServer = new TcpCacheServer();
-// cacheServer.setBindAddress("127.0.0.1");
-// cacheServer.setPort(12121);
-// Configuration config = UnitTestCacheConfigurationFactory.createConfiguration(Configuration.CacheMode.LOCAL, true);
-// CacheSPI cache = (CacheSPI) new DefaultCacheFactory<Object, Object>().createCache(config);
-// cacheServer.setCache(cache);
-// cacheServer.create();
-// cacheServer.start();
-// START_COUNT++;
-// startedSignal.countDown();
-// }
-// catch (Exception ex)
-// {
-// ex.printStackTrace();
-// }
-// }
-//
-// };
-// t.setDaemon(true);
-// t.start();
-//
-// // Wait for the cache server to start up.
-// boolean started = false;
-// try
-// {
-// started = startedSignal.await(120, TimeUnit.SECONDS);
-// }
-// catch (InterruptedException e)
-// {
-// // do nothing
-// }
-//
-// if (!started)
-// {
-// // the TcpCacheServer was unable to start up for some reason!!
-// throw new RuntimeException("Unable to start the TcpCacheServer after 120 seconds!!");
-// }
-// }
-//
-// @AfterClass
-// public static void stopCacheServer()
-// {
-// if (cacheServer != null)
-// {
-// cacheServer.stop();
-// }
-// }
-//
-// protected static void restartCacheServer()
-// {
-// stopCacheServer();
-// startCacheServer();
-// }
-//
-// @Override
-// public void testPartialLoadAndStore()
-// {
-// // do nothing
-// }
-//
-// @Override
-// public void testBuddyBackupStore()
-// {
-// // do nothing
-// }
-//
-// protected void configureCache() throws Exception
-// {
-// cache.getConfiguration().setCacheLoaderConfig(getSingleCacheLoaderConfig("",
-// TcpDelegatingCacheLoader.class.getName(),
-// "host=127.0.0.1\nport=12121\ntimeout=" + TCP_CACHE_LOADER_TIMEOUT_MS, false, true, false));
-// }
-//
-// // restart tests
-// public void testCacheServerRestartMidCall() throws Exception
-// {
-// CacheServerRestarter restarter = new CacheServerRestarter();
-// restarter.restart = true;
-// cache.addCacheListener(restarter);
-// int oldStartCount = START_COUNT;
-// // a restart of the cache server will happen before the cache loader interceptor is called.
-// cache.put(FQN, "key", "value");
-//
-// assert oldStartCount + 1 == START_COUNT : "Cache server should have restarted!";
-// assert loader.get(FQN).equals(Collections.singletonMap("key", "value"));
-// }
-//
-// public void testCacheServerDelayedRestartMidCall() throws Exception
-// {
-// CacheServerRestarter restarter = new CacheServerRestarter();
-// restarter.restart = false;
-// restarter.delayedRestart = true;
-// restarter.startAfter = CACHE_SERVER_RESTART_DELAY_MS;
-// cache.addCacheListener(restarter);
-// int oldStartCount = START_COUNT;
-//
-// // the cache server will STOP before the cache laoder interceptor is called.
-// // it will be restarted in a separate thread, startAfter millis later.
-// // this should be less than the TcpCacheLoader timeout.
-// cache.put(FQN, "key", "value");
-//
-// assert oldStartCount + 1 == START_COUNT : "Cache server should have restarted!";
-// assert loader.get(FQN).equals(Collections.singletonMap("key", "value"));
-// }
-//
-// public void testCacheServerTimeoutMidCall() throws Exception
-// {
-// CacheServerRestarter restarter = new CacheServerRestarter();
-// restarter.restart = false;
-// restarter.delayedRestart = true;
-// restarter.startAfter = -1;
-// cache.addCacheListener(restarter);
-// int oldStartCount = START_COUNT;
-//
-// // the cache server will STOP before the cache laoder interceptor is called.
-// // it will be restarted in a separate thread, startAfter millis later.
-// // this should be less than the TcpCacheLoader timeout.
-// try
-// {
-// cache.put(FQN, "key", "value");
-// assert false : "Should have failed";
-// }
-// catch (CacheException expected)
-// {
-//
-// }
-//
-// assert oldStartCount == START_COUNT : "Cache server should NOT have restarted!";
-// // start the TCP server again
-// startCacheServer();
-// assert loader.get(FQN) == null;
-// }
-//
-// public void testCacheServerRestartMidTransaction() throws Exception
-// {
-// int oldStartCount = START_COUNT;
-// cache.getTransactionManager().begin();
-// cache.put(FQN, "key", "value");
-// restartCacheServer();
-// cache.put(FQN, "key2", "value2");
-// cache.getTransactionManager().commit();
-//
-// Map m = new HashMap();
-// m.put("key", "value");
-// m.put("key2", "value2");
-//
-// assert oldStartCount + 1 == START_COUNT : "Cache server should have restarted!";
-// assert loader.get(FQN).equals(m);
-// }
-//
-// public void testCacheServerRestartMidTransactionAfterPrepare() throws Exception
-// {
-// int oldStartCount = START_COUNT;
-// cache.getTransactionManager().begin();
-//
-// cache.put(FQN, "key", "value");
-// cache.put(FQN, "key2", "value2");
-//
-// GlobalTransaction gtx = cache.getTransactionTable().get(cache.getTransactionManager().getTransaction());
-// OrderedSynchronizationHandler osh = cache.getTransactionTable().get(gtx).getOrderedSynchronizationHandler();
-//
-//// OrderedSynchronizationHandler.getInstance(cache.getTransactionManager().getTransaction()).registerAtTail(
-// osh.registerAtTail(
-// new Synchronization()
-// {
-//
-// public void beforeCompletion()
-// {
-// // this will be called after the cache's prepare() phase. Restart the cache server.
-// restartCacheServer();
-// }
-//
-// public void afterCompletion(int i)
-// {
-// // do nothing
-// }
-// }
-// );
-//
-// cache.getTransactionManager().commit();
-//
-// Map m = new HashMap();
-// m.put("key", "value");
-// m.put("key2", "value2");
-//
-// assert oldStartCount + 1 == START_COUNT : "Cache server should have restarted!";
-// assert loader.get(FQN).equals(m);
-//
-// }
-//
-// @CacheListener
-// public static class CacheServerRestarter
-// {
-// boolean restart;
-// boolean delayedRestart;
-// int startAfter;
-//
-// @NodeCreated
-// public void restart(Event e)
-// {
-// if (e.isPre())
-// {
-// if (restart)
-// {
-// restartCacheServer();
-// }
-// else if (delayedRestart)
-// {
-// stopCacheServer();
-// new Thread()
-// {
-// public void run()
-// {
-// if (startAfter > 0)
-// {
-// TestingUtil.sleepThread(startAfter);
-// startCacheServer();
-// }
-// }
-// }.start();
-// }
-// }
-// }
-// }
+ protected static final String TCP_CACHE_SERVER_HOST = "127.0.0.1";
+ protected static final int TCP_CACHE_SERVER_PORT = 12121;
+ protected static final int CACHE_SERVER_RESTART_DELAY_MS = 250;
+ protected static final int TCP_CACHE_LOADER_TIMEOUT_MS = 1000;
+ protected static int START_COUNT = 0;
+ static volatile TcpCacheServer cacheServer = null;
+
+ @Override
+ @BeforeClass
+ public void preCreate()
+ {
+ if (cacheServer != null) stopCacheServer();
+ startCacheServer();
+ }
+
+ private static void startCacheServer()
+ {
+ final CountDownLatch startedSignal = new CountDownLatch(1);
+
+ Thread t = new Thread()
+ {
+ public void run()
+ {
+ try
+ {
+ cacheServer = new TcpCacheServer();
+ cacheServer.setBindAddress(TCP_CACHE_SERVER_HOST);
+ cacheServer.setPort(TCP_CACHE_SERVER_PORT);
+ Configuration config = UnitTestCacheConfigurationFactory.createConfiguration(Configuration.CacheMode.LOCAL, true);
+ // disable eviction!!
+ config.setEvictionConfig(null);
+ CacheSPI cache = (CacheSPI) new DefaultCacheFactory<Object, Object>().createCache(config);
+ cacheServer.setCache(cache);
+ cacheServer.create();
+ cacheServer.start();
+ START_COUNT++;
+ startedSignal.countDown();
+ }
+ catch (Exception ex)
+ {
+ ex.printStackTrace();
+ }
+ }
+
+ };
+ t.setDaemon(true);
+ t.start();
+
+ // Wait for the cache server to start up.
+ boolean started = false;
+ try
+ {
+ started = startedSignal.await(120, TimeUnit.SECONDS);
+ }
+ catch (InterruptedException e)
+ {
+ // do nothing
+ }
+
+ if (!started)
+ {
+ // the TcpCacheServer was unable to start up for some reason!!
+ throw new RuntimeException("Unable to start the TcpCacheServer after 120 seconds!!");
+ }
+ }
+
+ @AfterClass
+ public static void stopCacheServer()
+ {
+ if (cacheServer != null)
+ {
+ cacheServer.stop();
+ cacheServer = null;
+ }
+ }
+
+ @AfterMethod
+ public void removeRestarters()
+ {
+ if (cache != null)
+ {
+ Set<Object> restarters = new HashSet<Object>();
+ for (Object listener : cache.getCacheListeners())
+ {
+ if (listener instanceof CacheServerRestarter) restarters.add(listener);
+ }
+ try
+ {
+ for (Object restarter : restarters) cache.removeCacheListener(restarter);
+ }
+ catch (Exception ignored)
+ {
+ // ignored
+ }
+ }
+ }
+
+ protected static void restartCacheServer()
+ {
+ stopCacheServer();
+ startCacheServer();
+ }
+
+ @Override
+ public void testPartialLoadAndStore()
+ {
+ // do nothing
+ }
+
+ @Override
+ public void testBuddyBackupStore()
+ {
+ // do nothing
+ }
+
+ protected void configureCache(CacheSPI cache) throws Exception
+ {
+ CacheLoaderConfig clc = new CacheLoaderConfig();
+ TcpDelegatingCacheLoaderConfig tcpCfg = new TcpDelegatingCacheLoaderConfig(TCP_CACHE_SERVER_HOST, TCP_CACHE_SERVER_PORT, TCP_CACHE_LOADER_TIMEOUT_MS);
+ tcpCfg.setReconnectWaitTime(CACHE_SERVER_RESTART_DELAY_MS);
+ tcpCfg.setFetchPersistentState(false);
+ clc.addIndividualCacheLoaderConfig(tcpCfg);
+ cache.getConfiguration().setCacheLoaderConfig(clc);
+ }
+
+ // restart tests
+ public void testCacheServerRestartMidCall() throws Exception
+ {
+ CacheServerRestarter restarter = new CacheServerRestarter();
+ restarter.restart = true;
+ cache.addCacheListener(restarter);
+ int oldStartCount = START_COUNT;
+ // a restart of the cache server will happen before the cache loader interceptor is called.
+ cache.put(FQN, "key", "value");
+
+ assert oldStartCount + 1 == START_COUNT : "Cache server should have restarted!";
+ assert loader.get(FQN).equals(Collections.singletonMap("key", "value"));
+ }
+
+ public void testCacheServerDelayedRestartMidCall() throws Exception
+ {
+ CacheServerRestarter restarter = new CacheServerRestarter();
+ restarter.restart = false;
+ restarter.delayedRestart = true;
+ restarter.startAfter = CACHE_SERVER_RESTART_DELAY_MS;
+ cache.addCacheListener(restarter);
+ int oldStartCount = START_COUNT;
+
+ // the cache server will STOP before the cache laoder interceptor is called.
+ // it will be restarted in a separate thread, startAfter millis later.
+ // this should be less than the TcpCacheLoader timeout.
+ cache.put(FQN, "key", "value");
+
+ assert oldStartCount < START_COUNT : "Cache server should have restarted! old = " + oldStartCount + " and count = " + START_COUNT;
+ assert loader.get(FQN).equals(Collections.singletonMap("key", "value"));
+ }
+
+ public void testCacheServerTimeoutMidCall() throws Exception
+ {
+ CacheServerRestarter restarter = new CacheServerRestarter();
+ restarter.restart = false;
+ restarter.delayedRestart = true;
+ restarter.startAfter = -1;
+ cache.addCacheListener(restarter);
+ int oldStartCount = START_COUNT;
+
+ // the cache server will STOP before the cache laoder interceptor is called.
+ // it will be restarted in a separate thread, startAfter millis later.
+ // this should be less than the TcpCacheLoader timeout.
+ try
+ {
+ cache.put(FQN, "key", "value");
+ assert false : "Should have failed";
+ }
+ catch (CacheException expected)
+ {
+
+ }
+
+ assert oldStartCount == START_COUNT : "Cache server should NOT have restarted!";
+ // start the TCP server again
+ startCacheServer();
+ assert loader.get(FQN) == null;
+ }
+
+ public void testCacheServerRestartMidTransaction() throws Exception
+ {
+ int oldStartCount = START_COUNT;
+ cache.getTransactionManager().begin();
+ cache.put(FQN, "key", "value");
+ restartCacheServer();
+ cache.put(FQN, "key2", "value2");
+ cache.getTransactionManager().commit();
+
+ Map m = new HashMap();
+ m.put("key", "value");
+ m.put("key2", "value2");
+
+ assert oldStartCount < START_COUNT : "Cache server should have restarted!";
+ assert loader.get(FQN).equals(m);
+ }
+
+ public void testCacheServerRestartMidTransactionAfterPrepare() throws Exception
+ {
+ int oldStartCount = START_COUNT;
+ cache.getTransactionManager().begin();
+
+ cache.put(FQN, "key", "value");
+ cache.put(FQN, "key2", "value2");
+
+ GlobalTransaction gtx = cache.getTransactionTable().get(cache.getTransactionManager().getTransaction());
+ OrderedSynchronizationHandler osh = cache.getTransactionTable().get(gtx).getOrderedSynchronizationHandler();
+
+// OrderedSynchronizationHandler.getInstance(cache.getTransactionManager().getTransaction()).registerAtTail(
+ osh.registerAtTail(
+ new Synchronization()
+ {
+
+ public void beforeCompletion()
+ {
+ // this will be called after the cache's prepare() phase. Restart the cache server.
+ restartCacheServer();
+ }
+
+ public void afterCompletion(int i)
+ {
+ // do nothing
+ }
+ }
+ );
+
+ cache.getTransactionManager().commit();
+
+ Map m = new HashMap();
+ m.put("key", "value");
+ m.put("key2", "value2");
+
+ assert oldStartCount + 1 == START_COUNT : "Cache server should have restarted!";
+ assert loader.get(FQN).equals(m);
+
+ }
+
+ @CacheListener
+ public static class CacheServerRestarter
+ {
+ boolean restart;
+ boolean delayedRestart;
+ int startAfter;
+
+ @NodeCreated
+ public void restart(Event e)
+ {
+ if (e.isPre())
+ {
+ if (restart)
+ {
+ restartCacheServer();
+ }
+ else if (delayedRestart)
+ {
+ stopCacheServer();
+ new Thread()
+ {
+ public void run()
+ {
+ if (startAfter > 0)
+ {
+ TestingUtil.sleepThread(startAfter);
+ startCacheServer();
+ }
+ }
+ }.start();
+ }
+ }
+ }
+ }
}
More information about the jbosscache-commits
mailing list