[jbosscache-commits] JBoss Cache SVN: r5104 - in core/trunk/src: main/java/org/jboss/cache/loader and 1 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Wed Jan 9 10:02:25 EST 2008


Author: manik.surtani at jboss.com
Date: 2008-01-09 10:02:25 -0500 (Wed, 09 Jan 2008)
New Revision: 5104

Modified:
   core/trunk/src/main/java/org/jboss/cache/interceptors/EvictionInterceptor.java
   core/trunk/src/main/java/org/jboss/cache/loader/TcpDelegatingCacheLoader.java
   core/trunk/src/main/java/org/jboss/cache/loader/TcpDelegatingCacheLoaderConfig.java
   core/trunk/src/test/java/org/jboss/cache/loader/TcpCacheLoaderTest.java
Log:
JBCACHE-1260 - TcpDelegatingCacheLoader to be made tolerant of TcpCacheServer restarts

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/EvictionInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/EvictionInterceptor.java	2008-01-09 11:52:25 UTC (rev 5103)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/EvictionInterceptor.java	2008-01-09 15:02:25 UTC (rev 5104)
@@ -41,7 +41,6 @@
    @Inject
    void setRegionManager(RegionManager regionManager)
    {
-      if (trace) log.trace("Having region manager " + regionManager + " injected.");
       this.regionManager = regionManager;
    }
 

Modified: core/trunk/src/main/java/org/jboss/cache/loader/TcpDelegatingCacheLoader.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/loader/TcpDelegatingCacheLoader.java	2008-01-09 11:52:25 UTC (rev 5103)
+++ core/trunk/src/main/java/org/jboss/cache/loader/TcpDelegatingCacheLoader.java	2008-01-09 15:02:25 UTC (rev 5104)
@@ -6,6 +6,9 @@
  */
 package org.jboss.cache.loader;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.cache.CacheException;
 import org.jboss.cache.Fqn;
 import org.jboss.cache.Modification;
 import org.jboss.cache.config.CacheLoaderConfig.IndividualCacheLoaderConfig;
@@ -16,7 +19,10 @@
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.io.ObjectOutputStream;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
 import java.net.Socket;
+import java.net.SocketException;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -41,27 +47,31 @@
    private TcpDelegatingCacheLoaderConfig config;
    ObjectInputStream in;
    ObjectOutputStream out;
+   private static Log log = LogFactory.getLog(TcpDelegatingCacheLoader.class);
+   private static Method GET_CHILDREN_METHOD, GET_METHOD, PUT_KEY_METHOD, PUT_DATA_METHOD, REMOVE_KEY_METHOD, REMOVE_METHOD, PUT_MODS_METHOD, EXISTS_METHOD, REMOVE_DATA_METHOD;
 
-
-   /**
-    * Default constructor.
-    */
-   public TcpDelegatingCacheLoader()
+   static
    {
-      // Empty.
-   }
+      try
+      {
+         GET_CHILDREN_METHOD = TcpDelegatingCacheLoader.class.getDeclaredMethod("_getChildrenNames", Fqn.class);
+         GET_METHOD = TcpDelegatingCacheLoader.class.getDeclaredMethod("_get", Fqn.class);
+         EXISTS_METHOD = TcpDelegatingCacheLoader.class.getDeclaredMethod("_exists", Fqn.class);
+         PUT_KEY_METHOD = TcpDelegatingCacheLoader.class.getDeclaredMethod("_put", Fqn.class, Object.class, Object.class);
+         PUT_DATA_METHOD = TcpDelegatingCacheLoader.class.getDeclaredMethod("_put", Fqn.class, Map.class);
+         REMOVE_KEY_METHOD = TcpDelegatingCacheLoader.class.getDeclaredMethod("_remove", Fqn.class, Object.class);
+         REMOVE_DATA_METHOD = TcpDelegatingCacheLoader.class.getDeclaredMethod("_removeData", Fqn.class);
+         REMOVE_METHOD = TcpDelegatingCacheLoader.class.getDeclaredMethod("_remove", Fqn.class);
+         PUT_MODS_METHOD = TcpDelegatingCacheLoader.class.getDeclaredMethod("_put", List.class);
 
-   /**
-    * Allows programmatic configuration.
-    *
-    * @param host The host on which to look up the remote object.
-    * @param port The port on which to look up the remote object.
-    */
-   public TcpDelegatingCacheLoader(String host, int port)
-   {
-      this.config = new TcpDelegatingCacheLoaderConfig(host, port);
+      }
+      catch (Exception e)
+      {
+         log.fatal("Unable to initialise reflection methods", e);
+      }
    }
 
+
    /**
     * Allows configuration via XML config file.
     */
@@ -82,9 +92,105 @@
       return config;
    }
 
+   /**
+    * Invokes the specified Method with the specified parameters, catching SocketExceptions and attempting to reconnect
+    * to the TcpCacheServer if necessary.
+    *
+    * @param m      method to invoke
+    * @param params parameters
+    * @return method return value
+    */
+   protected Object invokeWithRetries(Method m, Object... params)
+   {
+      long endTime = System.currentTimeMillis() + config.getTimeout();
+      do
+      {
+         try
+         {
+            return m.invoke(this, params);
+         }
+         catch (IllegalAccessException e)
+         {
+            log.error("Should never get here!", e);
+         }
+         catch (InvocationTargetException e)
+         {
+            if (e.getCause() instanceof SocketException)
+            {
+               try
+               {
+                  // sleep 250 ms
+                  Thread.sleep(config.getReconnectWaitTime());
+                  restart();
+               }
+               catch (IOException e1)
+               {
+                  // IOException starting; sleep a bit and retry
+               }
+               catch (InterruptedException e1)
+               {
+                  // do nothing
+               }
+            }
+         }
+      } while (System.currentTimeMillis() < endTime);
+      throw new CacheException("Unable to communicate with TCPCacheServer(" + config.getHost() + ":" + config.getPort() + ") after " + config.getTimeout() + " millis, with reconnects every " + config.getReconnectWaitTime() + " millis.");
+   }
+
+   // ------------------ CacheLoader interface methods, which delegate to retry-aware methods
+
    public Set<?> getChildrenNames(Fqn fqn) throws Exception
    {
-      Set cn = null;
+
+      return (Set<?>) invokeWithRetries(GET_CHILDREN_METHOD, fqn);
+   }
+
+   public Map<Object, Object> get(Fqn name) throws Exception
+   {
+      return (Map<Object, Object>) invokeWithRetries(GET_METHOD, name);
+   }
+
+   public boolean exists(Fqn name) throws Exception
+   {
+      return (Boolean) invokeWithRetries(EXISTS_METHOD, name);
+   }
+
+   public Object put(Fqn name, Object key, Object value) throws Exception
+   {
+      return invokeWithRetries(PUT_KEY_METHOD, name, key, value);
+   }
+
+   public void put(Fqn name, Map<Object, Object> attributes) throws Exception
+   {
+      invokeWithRetries(PUT_DATA_METHOD, name, attributes);
+   }
+
+   @Override
+   public void put(List<Modification> modifications) throws Exception
+   {
+      invokeWithRetries(PUT_MODS_METHOD, modifications);
+   }
+
+   public Object remove(Fqn fqn, Object key) throws Exception
+   {
+      return invokeWithRetries(REMOVE_KEY_METHOD, fqn, key);
+   }
+
+   public void remove(Fqn fqn) throws Exception
+   {
+      invokeWithRetries(REMOVE_METHOD, fqn);
+   }
+
+   public void removeData(Fqn fqn) throws Exception
+   {
+      invokeWithRetries(REMOVE_DATA_METHOD, fqn);
+   }
+
+   // ------------------ Retry-aware CacheLoader interface method counterparts
+
+   protected Set<?> _getChildrenNames(Fqn fqn) throws Exception
+   {
+      Set cn;
       synchronized (out)
       {
          out.reset();
@@ -104,7 +210,7 @@
       return cn;
    }
 
-   public Map<Object, Object> get(Fqn name) throws Exception
+   protected Map<Object, Object> _get(Fqn name) throws Exception
    {
       synchronized (out)
       {
@@ -122,7 +228,7 @@
       }
    }
 
-   public boolean exists(Fqn name) throws Exception
+   protected boolean _exists(Fqn name) throws Exception
    {
       synchronized (out)
       {
@@ -140,7 +246,7 @@
       }
    }
 
-   public Object put(Fqn name, Object key, Object value) throws Exception
+   protected Object _put(Fqn name, Object key, Object value) throws Exception
    {
       synchronized (out)
       {
@@ -160,7 +266,7 @@
       }
    }
 
-   public void put(Fqn name, Map<Object, Object> attributes) throws Exception
+   protected void _put(Fqn name, Map<Object, Object> attributes) throws Exception
    {
       synchronized (out)
       {
@@ -178,51 +284,8 @@
       }
    }
 
-   @Override
-   public void start() throws Exception
+   protected void _put(List<Modification> modifications) throws Exception
    {
-      init();
-   }
-
-   @Override
-   public void stop()
-   {
-      try
-      {
-         if (in != null) in.close();
-      }
-      catch (IOException e)
-      {
-      }
-      try
-      {
-         if (out != null) out.close();
-      }
-      catch (IOException e)
-      {
-      }
-      try
-      {
-         if (sock != null) sock.close();
-      }
-      catch (IOException e)
-      {
-      }
-   }
-
-
-   private void init() throws IOException
-   {
-      sock = new Socket(config.getHost(), config.getPort());
-      out = new ObjectOutputStream(new BufferedOutputStream(sock.getOutputStream()));
-      out.flush();
-      in = new ObjectInputStream(new BufferedInputStream(sock.getInputStream()));
-   }
-
-
-   @Override
-   public void put(List<Modification> modifications) throws Exception
-   {
       synchronized (out)
       {
          out.reset();
@@ -230,7 +293,7 @@
          out.writeByte(TcpCacheOperations.PUT_LIST);
          int length = modifications.size();
          out.writeInt(length);
-         for (Modification m : modifications) 
+         for (Modification m : modifications)
          {
             m.writeExternal(out);
          }
@@ -243,7 +306,7 @@
       }
    }
 
-   public Object remove(Fqn fqn, Object key) throws Exception
+   protected Object _remove(Fqn fqn, Object key) throws Exception
    {
       synchronized (out)
       {
@@ -262,7 +325,7 @@
       }
    }
 
-   public void remove(Fqn fqn) throws Exception
+   protected void _remove(Fqn fqn) throws Exception
    {
       synchronized (out)
       {
@@ -279,7 +342,7 @@
       }
    }
 
-   public void removeData(Fqn fqn) throws Exception
+   protected void _removeData(Fqn fqn) throws Exception
    {
       synchronized (out)
       {
@@ -296,7 +359,51 @@
       }
    }
 
+   // ----------------- Lifecycle and no-op methods
+
+
    @Override
+   public void start() throws IOException
+   {
+      sock = new Socket(config.getHost(), config.getPort());
+      out = new ObjectOutputStream(new BufferedOutputStream(sock.getOutputStream()));
+      out.flush();
+      in = new ObjectInputStream(new BufferedInputStream(sock.getInputStream()));
+   }
+
+   @Override
+   public void stop()
+   {
+      try
+      {
+         if (in != null) in.close();
+      }
+      catch (IOException e)
+      {
+      }
+      try
+      {
+         if (out != null) out.close();
+      }
+      catch (IOException e)
+      {
+      }
+      try
+      {
+         if (sock != null) sock.close();
+      }
+      catch (IOException e)
+      {
+      }
+   }
+
+   protected void restart() throws IOException
+   {
+      stop();
+      start();
+   }
+
+   @Override
    public void loadEntireState(ObjectOutputStream os) throws Exception
    {
       throw new UnsupportedOperationException("operation is not currently supported - need to define semantics first");
@@ -319,5 +426,4 @@
    {
       throw new UnsupportedOperationException("operation is not currently supported - need to define semantics first");
    }
-
 }

Modified: core/trunk/src/main/java/org/jboss/cache/loader/TcpDelegatingCacheLoaderConfig.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/loader/TcpDelegatingCacheLoaderConfig.java	2008-01-09 11:52:25 UTC (rev 5103)
+++ core/trunk/src/main/java/org/jboss/cache/loader/TcpDelegatingCacheLoaderConfig.java	2008-01-09 15:02:25 UTC (rev 5104)
@@ -13,6 +13,8 @@
 
    private String host = "localhost";
    private int port = 7500;
+   private int timeout = 5000;
+   private int reconnectWaitTime = 500;
 
    public TcpDelegatingCacheLoaderConfig()
    {
@@ -33,14 +35,16 @@
    /**
     * For use by {@link TcpDelegatingCacheLoader}.
     *
-    * @param host hostname of the delegate
-    * @param port port the delegate is listening on
+    * @param host    hostname of the delegate
+    * @param port    port the delegate is listening on
+    * @param timeout after which to throw an IOException
     */
-   TcpDelegatingCacheLoaderConfig(String host, int port)
+   TcpDelegatingCacheLoaderConfig(String host, int port, int timeout)
    {
       setClassName(TcpDelegatingCacheLoader.class.getName());
       this.host = host;
       this.port = port;
+      this.timeout = timeout;
    }
 
    public String getHost()
@@ -65,6 +69,29 @@
       this.port = port;
    }
 
+   public int getTimeout()
+   {
+      return timeout;
+   }
+
+   public void setTimeout(int timeout)
+   {
+      testImmutability("timeout");
+      this.timeout = timeout;
+   }
+
+   public int getReconnectWaitTime()
+   {
+      return reconnectWaitTime;
+   }
+
+   public void setReconnectWaitTime(int reconnectWaitTime)
+   {
+      testImmutability("reconnectWaitTime");
+      this.reconnectWaitTime = reconnectWaitTime;
+   }
+
+   @Override
    public void setProperties(Properties props)
    {
       super.setProperties(props);
@@ -78,8 +105,21 @@
       {
          this.port = Integer.parseInt(s);
       }
+
+      s = props.getProperty("timeout");
+      if (s != null && s.length() > 0)
+      {
+         this.timeout = Integer.parseInt(s);
+      }
+
+      s = props.getProperty("reconnectWaitTime");
+      if (s != null && s.length() > 0)
+      {
+         this.reconnectWaitTime = Integer.parseInt(s);
+      }
    }
 
+   @Override
    public boolean equals(Object obj)
    {
       if (obj instanceof TcpDelegatingCacheLoaderConfig && equalsExcludingProperties(obj))
@@ -87,16 +127,19 @@
          TcpDelegatingCacheLoaderConfig other = (TcpDelegatingCacheLoaderConfig) obj;
 
          return safeEquals(host, other.host)
-                 && (port == other.port);
+               && (port == other.port) && (timeout == other.timeout) && (reconnectWaitTime == other.reconnectWaitTime);
       }
       return false;
    }
 
+   @Override
    public int hashCode()
    {
       int result = hashCodeExcludingProperties();
       result = 31 * result + (host == null ? 0 : host.hashCode());
       result = 31 * result + port;
+      result = 31 * result + timeout;
+      result = 31 * result + reconnectWaitTime;
 
       return result;
    }
@@ -106,6 +149,4 @@
    {
       return (TcpDelegatingCacheLoaderConfig) super.clone();
    }
-
-
 }
\ No newline at end of file

Modified: core/trunk/src/test/java/org/jboss/cache/loader/TcpCacheLoaderTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/loader/TcpCacheLoaderTest.java	2008-01-09 11:52:25 UTC (rev 5103)
+++ core/trunk/src/test/java/org/jboss/cache/loader/TcpCacheLoaderTest.java	2008-01-09 15:02:25 UTC (rev 5104)
@@ -1,21 +1,39 @@
 package org.jboss.cache.loader;
 
+import org.jboss.cache.CacheException;
+import org.jboss.cache.interceptors.OrderedSynchronizationHandler;
 import org.jboss.cache.loader.tcp.TcpCacheServer;
 import org.jboss.cache.misc.TestingUtil;
+import org.jboss.cache.notifications.annotation.CacheListener;
+import org.jboss.cache.notifications.annotation.NodeCreated;
+import org.jboss.cache.notifications.event.Event;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
 
+import javax.transaction.Synchronization;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
 /**
  * Tests the TcpDelegatingCacheLoader
  *
  * @author Bela Ban
  * @version $Id$
  */
+ at Test(groups = "functional")
 public class TcpCacheLoaderTest extends CacheLoaderTestsBase
 {
+   protected static final int CACHE_SERVER_RESTART_DELAY_MS = 1000;
+   protected static final int TCP_CACHE_LOADER_TIMEOUT_MS = 2000;
+   protected static int START_COUNT = 0;
    static TcpCacheServer cache_server = null;
 
-   static
+   @BeforeClass
+   public static void startCacheServer()
    {
-      Thread runner = new Thread()
+      Thread t = new Thread()
       {
          public void run()
          {
@@ -25,9 +43,10 @@
                cache_server = new TcpCacheServer();
                cache_server.setBindAddress("127.0.0.1");
                cache_server.setPort(12121);
-               cache_server.setConfig("META-INF/local-service.xml"); // must be in classpath (./etc/META-INF)
+               cache_server.setConfig("META-INF/local-service.xml"); // must be in classpath
                cache_server.create();
                cache_server.start();
+               START_COUNT++;
             }
             catch (Exception ex)
             {
@@ -35,41 +54,194 @@
             }
          }
       };
+      t.setDaemon(true);
+      t.start();
+      // give the cache server 2 secs to start up
+      TestingUtil.sleepThread(2000);
+   }
 
-      Runtime.getRuntime().addShutdownHook(new Thread()
+   @AfterClass
+   public static void stopCacheServer()
+   {
+      if (cache_server != null)
       {
-         public void run()
-         {
-            if (cache_server != null)
-            {
-               System.out.println("Stopping TcpCacheServer");
-               cache_server.stop();
-            }
-         }
-      });
+         System.out.println("Stopping TcpCacheServer");
+         cache_server.stop();
+      }
+   }
 
-      runner.start();
+   protected static void restartCacheServer()
+   {
+      stopCacheServer();
+      startCacheServer();
    }
 
+   @Override
    public void testPartialLoadAndStore()
    {
       // do nothing
    }
 
+   @Override
    public void testBuddyBackupStore()
    {
       // do nothing
    }
 
-
    protected void configureCache() throws Exception
    {
       cache.getConfiguration().setCacheLoaderConfig(getSingleCacheLoaderConfig("",
-              "org.jboss.cache.loader.TcpDelegatingCacheLoader",
-              "host=127.0.0.1\nport=12121", false, true, false));
+            TcpDelegatingCacheLoader.class.getName(),
+            "host=127.0.0.1\nport=12121\ntimeout=" + TCP_CACHE_LOADER_TIMEOUT_MS, false, true, false));
 
       // give the tcp cache server time to start up
-      TestingUtil.sleepThread(2000);
+      //TestingUtil.sleepThread(2000);
    }
 
+   // restart tests
+
+   public void testCacheServerRestartMidCall() throws Exception
+   {
+      CacheServerRestarter restarter = new CacheServerRestarter();
+      restarter.restart = true;
+      cache.addCacheListener(restarter);
+      int oldStartCount = START_COUNT;
+      // a restart of the cache server will happen before the cache loader interceptor is called.
+      cache.put(FQN, "key", "value");
+
+      assert oldStartCount + 1 == START_COUNT : "Cache server should have restarted!";
+      assert loader.get(FQN).equals(Collections.singletonMap("key", "value"));
+   }
+
+   public void testCacheServerDelayedRestartMidCall() throws Exception
+   {
+      CacheServerRestarter restarter = new CacheServerRestarter();
+      restarter.restart = false;
+      restarter.delayedRestart = true;
+      restarter.startAfter = CACHE_SERVER_RESTART_DELAY_MS;
+      cache.addCacheListener(restarter);
+      int oldStartCount = START_COUNT;
+
+      // the cache server will STOP before the cache laoder interceptor is called.
+      // it will be restarted in a separate thread, startAfter millis later.
+      // this should be less than the TcpCacheLoader timeout.
+      cache.put(FQN, "key", "value");
+
+      assert oldStartCount + 1 == START_COUNT : "Cache server should have restarted!";
+      assert loader.get(FQN).equals(Collections.singletonMap("key", "value"));
+   }
+
+   public void testCacheServerTimeoutMidCall() throws Exception
+   {
+      CacheServerRestarter restarter = new CacheServerRestarter();
+      restarter.restart = false;
+      restarter.delayedRestart = true;
+      restarter.startAfter = -1;
+      cache.addCacheListener(restarter);
+      int oldStartCount = START_COUNT;
+
+      // the cache server will STOP before the cache laoder interceptor is called.
+      // it will be restarted in a separate thread, startAfter millis later.
+      // this should be less than the TcpCacheLoader timeout.
+      try
+      {
+         cache.put(FQN, "key", "value");
+         assert false : "Should have failed";
+      }
+      catch (CacheException expected)
+      {
+
+      }
+
+      assert oldStartCount == START_COUNT : "Cache server should NOT have restarted!";
+      // start the TCP server again
+      startCacheServer();
+      assert loader.get(FQN) == null;
+   }
+
+   public void testCacheServerRestartMidTransaction() throws Exception
+   {
+      int oldStartCount = START_COUNT;
+      cache.getTransactionManager().begin();
+      cache.put(FQN, "key", "value");
+      restartCacheServer();
+      cache.put(FQN, "key2", "value2");
+      cache.getTransactionManager().commit();
+
+      Map m = new HashMap();
+      m.put("key", "value");
+      m.put("key2", "value2");
+
+      assert oldStartCount + 1 == START_COUNT : "Cache server should have restarted!";
+      assert loader.get(FQN).equals(m);
+   }
+
+   public void testCacheServerRestartMidTransactionAfterPrepare() throws Exception
+   {
+      int oldStartCount = START_COUNT;
+      cache.getTransactionManager().begin();
+      OrderedSynchronizationHandler.getInstance(cache.getTransactionManager().getTransaction()).registerAtTail(
+            new Synchronization()
+            {
+
+               public void beforeCompletion()
+               {
+                  // this will be called after the cache's prepare() phase.  Restart the cache server.
+                  restartCacheServer();
+               }
+
+               public void afterCompletion(int i)
+               {
+                  // do nothing
+               }
+            }
+      );
+
+      cache.put(FQN, "key", "value");
+      cache.put(FQN, "key2", "value2");
+      cache.getTransactionManager().commit();
+
+      Map m = new HashMap();
+      m.put("key", "value");
+      m.put("key2", "value2");
+
+      assert oldStartCount + 1 == START_COUNT : "Cache server should have restarted!";
+      assert loader.get(FQN).equals(m);
+
+   }
+
+   @CacheListener
+   public static class CacheServerRestarter
+   {
+      boolean restart;
+      boolean delayedRestart;
+      int startAfter;
+
+      @NodeCreated
+      public void restart(Event e)
+      {
+         if (e.isPre())
+         {
+            if (restart)
+            {
+               restartCacheServer();
+            }
+            else if (delayedRestart)
+            {
+               stopCacheServer();
+               new Thread()
+               {
+                  public void run()
+                  {
+                     if (startAfter > 0)
+                     {
+                        TestingUtil.sleepThread(startAfter);
+                        startCacheServer();
+                     }
+                  }
+               }.start();
+            }
+         }
+      }
+   }
 }
\ No newline at end of file




More information about the jbosscache-commits mailing list