Author: manik.surtani(a)jboss.com
Date: 2008-01-09 10:02:25 -0500 (Wed, 09 Jan 2008)
New Revision: 5104
Modified:
core/trunk/src/main/java/org/jboss/cache/interceptors/EvictionInterceptor.java
core/trunk/src/main/java/org/jboss/cache/loader/TcpDelegatingCacheLoader.java
core/trunk/src/main/java/org/jboss/cache/loader/TcpDelegatingCacheLoaderConfig.java
core/trunk/src/test/java/org/jboss/cache/loader/TcpCacheLoaderTest.java
Log:
JBCACHE-1260 - TcpDelegatingCacheLoader to be made tolerant of TcpCacheServer restarts
Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/EvictionInterceptor.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/interceptors/EvictionInterceptor.java 2008-01-09
11:52:25 UTC (rev 5103)
+++
core/trunk/src/main/java/org/jboss/cache/interceptors/EvictionInterceptor.java 2008-01-09
15:02:25 UTC (rev 5104)
@@ -41,7 +41,6 @@
@Inject
void setRegionManager(RegionManager regionManager)
{
- if (trace) log.trace("Having region manager " + regionManager + "
injected.");
this.regionManager = regionManager;
}
Modified: core/trunk/src/main/java/org/jboss/cache/loader/TcpDelegatingCacheLoader.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/loader/TcpDelegatingCacheLoader.java 2008-01-09
11:52:25 UTC (rev 5103)
+++
core/trunk/src/main/java/org/jboss/cache/loader/TcpDelegatingCacheLoader.java 2008-01-09
15:02:25 UTC (rev 5104)
@@ -6,6 +6,9 @@
*/
package org.jboss.cache.loader;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.cache.CacheException;
import org.jboss.cache.Fqn;
import org.jboss.cache.Modification;
import org.jboss.cache.config.CacheLoaderConfig.IndividualCacheLoaderConfig;
@@ -16,7 +19,10 @@
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
import java.net.Socket;
+import java.net.SocketException;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -41,27 +47,31 @@
private TcpDelegatingCacheLoaderConfig config;
ObjectInputStream in;
ObjectOutputStream out;
+ private static Log log = LogFactory.getLog(TcpDelegatingCacheLoader.class);
+ private static Method GET_CHILDREN_METHOD, GET_METHOD, PUT_KEY_METHOD,
PUT_DATA_METHOD, REMOVE_KEY_METHOD, REMOVE_METHOD, PUT_MODS_METHOD, EXISTS_METHOD,
REMOVE_DATA_METHOD;
-
- /**
- * Default constructor.
- */
- public TcpDelegatingCacheLoader()
+ static
{
- // Empty.
- }
+ try
+ {
+ GET_CHILDREN_METHOD =
TcpDelegatingCacheLoader.class.getDeclaredMethod("_getChildrenNames",
Fqn.class);
+ GET_METHOD = TcpDelegatingCacheLoader.class.getDeclaredMethod("_get",
Fqn.class);
+ EXISTS_METHOD =
TcpDelegatingCacheLoader.class.getDeclaredMethod("_exists", Fqn.class);
+ PUT_KEY_METHOD =
TcpDelegatingCacheLoader.class.getDeclaredMethod("_put", Fqn.class,
Object.class, Object.class);
+ PUT_DATA_METHOD =
TcpDelegatingCacheLoader.class.getDeclaredMethod("_put", Fqn.class, Map.class);
+ REMOVE_KEY_METHOD =
TcpDelegatingCacheLoader.class.getDeclaredMethod("_remove", Fqn.class,
Object.class);
+ REMOVE_DATA_METHOD =
TcpDelegatingCacheLoader.class.getDeclaredMethod("_removeData", Fqn.class);
+ REMOVE_METHOD =
TcpDelegatingCacheLoader.class.getDeclaredMethod("_remove", Fqn.class);
+ PUT_MODS_METHOD =
TcpDelegatingCacheLoader.class.getDeclaredMethod("_put", List.class);
- /**
- * Allows programmatic configuration.
- *
- * @param host The host on which to look up the remote object.
- * @param port The port on which to look up the remote object.
- */
- public TcpDelegatingCacheLoader(String host, int port)
- {
- this.config = new TcpDelegatingCacheLoaderConfig(host, port);
+ }
+ catch (Exception e)
+ {
+ log.fatal("Unable to initialise reflection methods", e);
+ }
}
+
/**
* Allows configuration via XML config file.
*/
@@ -82,9 +92,105 @@
return config;
}
+ /**
+ * Invokes the specified Method with the specified parameters, catching
SocketExceptions and attempting to reconnect
+ * to the TcpCacheServer if necessary.
+ *
+ * @param m method to invoke
+ * @param params parameters
+ * @return method return value
+ */
+ protected Object invokeWithRetries(Method m, Object... params)
+ {
+ long endTime = System.currentTimeMillis() + config.getTimeout();
+ do
+ {
+ try
+ {
+ return m.invoke(this, params);
+ }
+ catch (IllegalAccessException e)
+ {
+ log.error("Should never get here!", e);
+ }
+ catch (InvocationTargetException e)
+ {
+ if (e.getCause() instanceof SocketException)
+ {
+ try
+ {
+ // sleep 250 ms
+ Thread.sleep(config.getReconnectWaitTime());
+ restart();
+ }
+ catch (IOException e1)
+ {
+ // IOException starting; sleep a bit and retry
+ }
+ catch (InterruptedException e1)
+ {
+ // do nothing
+ }
+ }
+ }
+ } while (System.currentTimeMillis() < endTime);
+ throw new CacheException("Unable to communicate with TCPCacheServer(" +
config.getHost() + ":" + config.getPort() + ") after " +
config.getTimeout() + " millis, with reconnects every " +
config.getReconnectWaitTime() + " millis.");
+ }
+
+ // ------------------ CacheLoader interface methods, which delegate to retry-aware
methods
+
public Set<?> getChildrenNames(Fqn fqn) throws Exception
{
- Set cn = null;
+
+ return (Set<?>) invokeWithRetries(GET_CHILDREN_METHOD, fqn);
+ }
+
+ public Map<Object, Object> get(Fqn name) throws Exception
+ {
+ return (Map<Object, Object>) invokeWithRetries(GET_METHOD, name);
+ }
+
+ public boolean exists(Fqn name) throws Exception
+ {
+ return (Boolean) invokeWithRetries(EXISTS_METHOD, name);
+ }
+
+ public Object put(Fqn name, Object key, Object value) throws Exception
+ {
+ return invokeWithRetries(PUT_KEY_METHOD, name, key, value);
+ }
+
+ public void put(Fqn name, Map<Object, Object> attributes) throws Exception
+ {
+ invokeWithRetries(PUT_DATA_METHOD, name, attributes);
+ }
+
+ @Override
+ public void put(List<Modification> modifications) throws Exception
+ {
+ invokeWithRetries(PUT_MODS_METHOD, modifications);
+ }
+
+ public Object remove(Fqn fqn, Object key) throws Exception
+ {
+ return invokeWithRetries(REMOVE_KEY_METHOD, fqn, key);
+ }
+
+ public void remove(Fqn fqn) throws Exception
+ {
+ invokeWithRetries(REMOVE_METHOD, fqn);
+ }
+
+ public void removeData(Fqn fqn) throws Exception
+ {
+ invokeWithRetries(REMOVE_DATA_METHOD, fqn);
+ }
+
+ // ------------------ Retry-aware CacheLoader interface method counterparts
+
+ protected Set<?> _getChildrenNames(Fqn fqn) throws Exception
+ {
+ Set cn;
synchronized (out)
{
out.reset();
@@ -104,7 +210,7 @@
return cn;
}
- public Map<Object, Object> get(Fqn name) throws Exception
+ protected Map<Object, Object> _get(Fqn name) throws Exception
{
synchronized (out)
{
@@ -122,7 +228,7 @@
}
}
- public boolean exists(Fqn name) throws Exception
+ protected boolean _exists(Fqn name) throws Exception
{
synchronized (out)
{
@@ -140,7 +246,7 @@
}
}
- public Object put(Fqn name, Object key, Object value) throws Exception
+ protected Object _put(Fqn name, Object key, Object value) throws Exception
{
synchronized (out)
{
@@ -160,7 +266,7 @@
}
}
- public void put(Fqn name, Map<Object, Object> attributes) throws Exception
+ protected void _put(Fqn name, Map<Object, Object> attributes) throws Exception
{
synchronized (out)
{
@@ -178,51 +284,8 @@
}
}
- @Override
- public void start() throws Exception
+ protected void _put(List<Modification> modifications) throws Exception
{
- init();
- }
-
- @Override
- public void stop()
- {
- try
- {
- if (in != null) in.close();
- }
- catch (IOException e)
- {
- }
- try
- {
- if (out != null) out.close();
- }
- catch (IOException e)
- {
- }
- try
- {
- if (sock != null) sock.close();
- }
- catch (IOException e)
- {
- }
- }
-
-
- private void init() throws IOException
- {
- sock = new Socket(config.getHost(), config.getPort());
- out = new ObjectOutputStream(new BufferedOutputStream(sock.getOutputStream()));
- out.flush();
- in = new ObjectInputStream(new BufferedInputStream(sock.getInputStream()));
- }
-
-
- @Override
- public void put(List<Modification> modifications) throws Exception
- {
synchronized (out)
{
out.reset();
@@ -230,7 +293,7 @@
out.writeByte(TcpCacheOperations.PUT_LIST);
int length = modifications.size();
out.writeInt(length);
- for (Modification m : modifications)
+ for (Modification m : modifications)
{
m.writeExternal(out);
}
@@ -243,7 +306,7 @@
}
}
- public Object remove(Fqn fqn, Object key) throws Exception
+ protected Object _remove(Fqn fqn, Object key) throws Exception
{
synchronized (out)
{
@@ -262,7 +325,7 @@
}
}
- public void remove(Fqn fqn) throws Exception
+ protected void _remove(Fqn fqn) throws Exception
{
synchronized (out)
{
@@ -279,7 +342,7 @@
}
}
- public void removeData(Fqn fqn) throws Exception
+ protected void _removeData(Fqn fqn) throws Exception
{
synchronized (out)
{
@@ -296,7 +359,51 @@
}
}
+ // ----------------- Lifecycle and no-op methods
+
+
@Override
+ public void start() throws IOException
+ {
+ sock = new Socket(config.getHost(), config.getPort());
+ out = new ObjectOutputStream(new BufferedOutputStream(sock.getOutputStream()));
+ out.flush();
+ in = new ObjectInputStream(new BufferedInputStream(sock.getInputStream()));
+ }
+
+ @Override
+ public void stop()
+ {
+ try
+ {
+ if (in != null) in.close();
+ }
+ catch (IOException e)
+ {
+ }
+ try
+ {
+ if (out != null) out.close();
+ }
+ catch (IOException e)
+ {
+ }
+ try
+ {
+ if (sock != null) sock.close();
+ }
+ catch (IOException e)
+ {
+ }
+ }
+
+ protected void restart() throws IOException
+ {
+ stop();
+ start();
+ }
+
+ @Override
public void loadEntireState(ObjectOutputStream os) throws Exception
{
throw new UnsupportedOperationException("operation is not currently supported
- need to define semantics first");
@@ -319,5 +426,4 @@
{
throw new UnsupportedOperationException("operation is not currently supported
- need to define semantics first");
}
-
}
Modified:
core/trunk/src/main/java/org/jboss/cache/loader/TcpDelegatingCacheLoaderConfig.java
===================================================================
---
core/trunk/src/main/java/org/jboss/cache/loader/TcpDelegatingCacheLoaderConfig.java 2008-01-09
11:52:25 UTC (rev 5103)
+++
core/trunk/src/main/java/org/jboss/cache/loader/TcpDelegatingCacheLoaderConfig.java 2008-01-09
15:02:25 UTC (rev 5104)
@@ -13,6 +13,8 @@
private String host = "localhost";
private int port = 7500;
+ private int timeout = 5000;
+ private int reconnectWaitTime = 500;
public TcpDelegatingCacheLoaderConfig()
{
@@ -33,14 +35,16 @@
/**
* For use by {@link TcpDelegatingCacheLoader}.
*
- * @param host hostname of the delegate
- * @param port port the delegate is listening on
+ * @param host hostname of the delegate
+ * @param port port the delegate is listening on
+ * @param timeout after which to throw an IOException
*/
- TcpDelegatingCacheLoaderConfig(String host, int port)
+ TcpDelegatingCacheLoaderConfig(String host, int port, int timeout)
{
setClassName(TcpDelegatingCacheLoader.class.getName());
this.host = host;
this.port = port;
+ this.timeout = timeout;
}
public String getHost()
@@ -65,6 +69,29 @@
this.port = port;
}
+ public int getTimeout()
+ {
+ return timeout;
+ }
+
+ public void setTimeout(int timeout)
+ {
+ testImmutability("timeout");
+ this.timeout = timeout;
+ }
+
+ public int getReconnectWaitTime()
+ {
+ return reconnectWaitTime;
+ }
+
+ public void setReconnectWaitTime(int reconnectWaitTime)
+ {
+ testImmutability("reconnectWaitTime");
+ this.reconnectWaitTime = reconnectWaitTime;
+ }
+
+ @Override
public void setProperties(Properties props)
{
super.setProperties(props);
@@ -78,8 +105,21 @@
{
this.port = Integer.parseInt(s);
}
+
+ s = props.getProperty("timeout");
+ if (s != null && s.length() > 0)
+ {
+ this.timeout = Integer.parseInt(s);
+ }
+
+ s = props.getProperty("reconnectWaitTime");
+ if (s != null && s.length() > 0)
+ {
+ this.reconnectWaitTime = Integer.parseInt(s);
+ }
}
+ @Override
public boolean equals(Object obj)
{
if (obj instanceof TcpDelegatingCacheLoaderConfig &&
equalsExcludingProperties(obj))
@@ -87,16 +127,19 @@
TcpDelegatingCacheLoaderConfig other = (TcpDelegatingCacheLoaderConfig) obj;
return safeEquals(host, other.host)
- && (port == other.port);
+ && (port == other.port) && (timeout == other.timeout)
&& (reconnectWaitTime == other.reconnectWaitTime);
}
return false;
}
+ @Override
public int hashCode()
{
int result = hashCodeExcludingProperties();
result = 31 * result + (host == null ? 0 : host.hashCode());
result = 31 * result + port;
+ result = 31 * result + timeout;
+ result = 31 * result + reconnectWaitTime;
return result;
}
@@ -106,6 +149,4 @@
{
return (TcpDelegatingCacheLoaderConfig) super.clone();
}
-
-
}
\ No newline at end of file
Modified: core/trunk/src/test/java/org/jboss/cache/loader/TcpCacheLoaderTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/loader/TcpCacheLoaderTest.java 2008-01-09
11:52:25 UTC (rev 5103)
+++ core/trunk/src/test/java/org/jboss/cache/loader/TcpCacheLoaderTest.java 2008-01-09
15:02:25 UTC (rev 5104)
@@ -1,21 +1,39 @@
package org.jboss.cache.loader;
+import org.jboss.cache.CacheException;
+import org.jboss.cache.interceptors.OrderedSynchronizationHandler;
import org.jboss.cache.loader.tcp.TcpCacheServer;
import org.jboss.cache.misc.TestingUtil;
+import org.jboss.cache.notifications.annotation.CacheListener;
+import org.jboss.cache.notifications.annotation.NodeCreated;
+import org.jboss.cache.notifications.event.Event;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+import javax.transaction.Synchronization;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
/**
* Tests the TcpDelegatingCacheLoader
*
* @author Bela Ban
* @version $Id$
*/
+@Test(groups = "functional")
public class TcpCacheLoaderTest extends CacheLoaderTestsBase
{
+ protected static final int CACHE_SERVER_RESTART_DELAY_MS = 1000;
+ protected static final int TCP_CACHE_LOADER_TIMEOUT_MS = 2000;
+ protected static int START_COUNT = 0;
static TcpCacheServer cache_server = null;
- static
+ @BeforeClass
+ public static void startCacheServer()
{
- Thread runner = new Thread()
+ Thread t = new Thread()
{
public void run()
{
@@ -25,9 +43,10 @@
cache_server = new TcpCacheServer();
cache_server.setBindAddress("127.0.0.1");
cache_server.setPort(12121);
- cache_server.setConfig("META-INF/local-service.xml"); // must be
in classpath (./etc/META-INF)
+ cache_server.setConfig("META-INF/local-service.xml"); // must be
in classpath
cache_server.create();
cache_server.start();
+ START_COUNT++;
}
catch (Exception ex)
{
@@ -35,41 +54,194 @@
}
}
};
+ t.setDaemon(true);
+ t.start();
+ // give the cache server 2 secs to start up
+ TestingUtil.sleepThread(2000);
+ }
- Runtime.getRuntime().addShutdownHook(new Thread()
+ @AfterClass
+ public static void stopCacheServer()
+ {
+ if (cache_server != null)
{
- public void run()
- {
- if (cache_server != null)
- {
- System.out.println("Stopping TcpCacheServer");
- cache_server.stop();
- }
- }
- });
+ System.out.println("Stopping TcpCacheServer");
+ cache_server.stop();
+ }
+ }
- runner.start();
+ protected static void restartCacheServer()
+ {
+ stopCacheServer();
+ startCacheServer();
}
+ @Override
public void testPartialLoadAndStore()
{
// do nothing
}
+ @Override
public void testBuddyBackupStore()
{
// do nothing
}
-
protected void configureCache() throws Exception
{
cache.getConfiguration().setCacheLoaderConfig(getSingleCacheLoaderConfig("",
- "org.jboss.cache.loader.TcpDelegatingCacheLoader",
- "host=127.0.0.1\nport=12121", false, true, false));
+ TcpDelegatingCacheLoader.class.getName(),
+ "host=127.0.0.1\nport=12121\ntimeout=" +
TCP_CACHE_LOADER_TIMEOUT_MS, false, true, false));
// give the tcp cache server time to start up
- TestingUtil.sleepThread(2000);
+ //TestingUtil.sleepThread(2000);
}
+ // restart tests
+
+ public void testCacheServerRestartMidCall() throws Exception
+ {
+ CacheServerRestarter restarter = new CacheServerRestarter();
+ restarter.restart = true;
+ cache.addCacheListener(restarter);
+ int oldStartCount = START_COUNT;
+ // a restart of the cache server will happen before the cache loader interceptor is
called.
+ cache.put(FQN, "key", "value");
+
+ assert oldStartCount + 1 == START_COUNT : "Cache server should have
restarted!";
+ assert loader.get(FQN).equals(Collections.singletonMap("key",
"value"));
+ }
+
+ public void testCacheServerDelayedRestartMidCall() throws Exception
+ {
+ CacheServerRestarter restarter = new CacheServerRestarter();
+ restarter.restart = false;
+ restarter.delayedRestart = true;
+ restarter.startAfter = CACHE_SERVER_RESTART_DELAY_MS;
+ cache.addCacheListener(restarter);
+ int oldStartCount = START_COUNT;
+
+ // the cache server will STOP before the cache laoder interceptor is called.
+ // it will be restarted in a separate thread, startAfter millis later.
+ // this should be less than the TcpCacheLoader timeout.
+ cache.put(FQN, "key", "value");
+
+ assert oldStartCount + 1 == START_COUNT : "Cache server should have
restarted!";
+ assert loader.get(FQN).equals(Collections.singletonMap("key",
"value"));
+ }
+
+ public void testCacheServerTimeoutMidCall() throws Exception
+ {
+ CacheServerRestarter restarter = new CacheServerRestarter();
+ restarter.restart = false;
+ restarter.delayedRestart = true;
+ restarter.startAfter = -1;
+ cache.addCacheListener(restarter);
+ int oldStartCount = START_COUNT;
+
+ // the cache server will STOP before the cache laoder interceptor is called.
+ // it will be restarted in a separate thread, startAfter millis later.
+ // this should be less than the TcpCacheLoader timeout.
+ try
+ {
+ cache.put(FQN, "key", "value");
+ assert false : "Should have failed";
+ }
+ catch (CacheException expected)
+ {
+
+ }
+
+ assert oldStartCount == START_COUNT : "Cache server should NOT have
restarted!";
+ // start the TCP server again
+ startCacheServer();
+ assert loader.get(FQN) == null;
+ }
+
+ public void testCacheServerRestartMidTransaction() throws Exception
+ {
+ int oldStartCount = START_COUNT;
+ cache.getTransactionManager().begin();
+ cache.put(FQN, "key", "value");
+ restartCacheServer();
+ cache.put(FQN, "key2", "value2");
+ cache.getTransactionManager().commit();
+
+ Map m = new HashMap();
+ m.put("key", "value");
+ m.put("key2", "value2");
+
+ assert oldStartCount + 1 == START_COUNT : "Cache server should have
restarted!";
+ assert loader.get(FQN).equals(m);
+ }
+
+ public void testCacheServerRestartMidTransactionAfterPrepare() throws Exception
+ {
+ int oldStartCount = START_COUNT;
+ cache.getTransactionManager().begin();
+
OrderedSynchronizationHandler.getInstance(cache.getTransactionManager().getTransaction()).registerAtTail(
+ new Synchronization()
+ {
+
+ public void beforeCompletion()
+ {
+ // this will be called after the cache's prepare() phase. Restart
the cache server.
+ restartCacheServer();
+ }
+
+ public void afterCompletion(int i)
+ {
+ // do nothing
+ }
+ }
+ );
+
+ cache.put(FQN, "key", "value");
+ cache.put(FQN, "key2", "value2");
+ cache.getTransactionManager().commit();
+
+ Map m = new HashMap();
+ m.put("key", "value");
+ m.put("key2", "value2");
+
+ assert oldStartCount + 1 == START_COUNT : "Cache server should have
restarted!";
+ assert loader.get(FQN).equals(m);
+
+ }
+
+ @CacheListener
+ public static class CacheServerRestarter
+ {
+ boolean restart;
+ boolean delayedRestart;
+ int startAfter;
+
+ @NodeCreated
+ public void restart(Event e)
+ {
+ if (e.isPre())
+ {
+ if (restart)
+ {
+ restartCacheServer();
+ }
+ else if (delayedRestart)
+ {
+ stopCacheServer();
+ new Thread()
+ {
+ public void run()
+ {
+ if (startAfter > 0)
+ {
+ TestingUtil.sleepThread(startAfter);
+ startCacheServer();
+ }
+ }
+ }.start();
+ }
+ }
+ }
+ }
}
\ No newline at end of file