[jbosscache-commits] JBoss Cache SVN: r7463 - in core/trunk/src/main/java/org/jboss/cache: factories and 3 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Wed Jan 14 09:49:42 EST 2009


Author: manik.surtani at jboss.com
Date: 2009-01-14 09:49:42 -0500 (Wed, 14 Jan 2009)
New Revision: 7463

Added:
   core/trunk/src/main/java/org/jboss/cache/Lifecycle.java
   core/trunk/src/main/java/org/jboss/cache/util/concurrent/SynchronizedRestarter.java
Modified:
   core/trunk/src/main/java/org/jboss/cache/Cache.java
   core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java
   core/trunk/src/main/java/org/jboss/cache/loader/CacheLoader.java
   core/trunk/src/main/java/org/jboss/cache/loader/TcpDelegatingCacheLoader.java
   core/trunk/src/main/java/org/jboss/cache/loader/tcp/TcpCacheServer.java
   core/trunk/src/main/java/org/jboss/cache/loader/tcp/TcpCacheServerMBean.java
Log:
Added a Lifecycle interface and a SynchronizedRestarter class, which provides thread-safe resource restarting for the TCP cache loader and server

Modified: core/trunk/src/main/java/org/jboss/cache/Cache.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/Cache.java	2009-01-14 14:15:11 UTC (rev 7462)
+++ core/trunk/src/main/java/org/jboss/cache/Cache.java	2009-01-14 14:49:42 UTC (rev 7463)
@@ -94,7 +94,7 @@
  * @since 2.0.0
  */
 @ThreadSafe
-public interface Cache<K, V>
+public interface Cache<K, V> extends Lifecycle
 {
    /**
     * Retrieves the configuration of this cache.

Added: core/trunk/src/main/java/org/jboss/cache/Lifecycle.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/Lifecycle.java	                        (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/Lifecycle.java	2009-01-14 14:49:42 UTC (rev 7463)
@@ -0,0 +1,17 @@
+package org.jboss.cache;
+
+/**
+ * Defines lifecycle operations for various components
+ *
+ * @author Manik Surtani
+ */
+public interface Lifecycle
+{
+   void create() throws Exception; // lifecycle method, called when the component is created
+
+   void start() throws Exception; // lifecycle method, called when the component is started
+
+   void stop(); // lifecycle method, called when the component is stopped
+
+   void destroy(); // lifecycle method, called when the component is destroyed
+}

Modified: core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java	2009-01-14 14:15:11 UTC (rev 7462)
+++ core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java	2009-01-14 14:49:42 UTC (rev 7463)
@@ -26,6 +26,7 @@
 import org.jboss.cache.CacheException;
 import org.jboss.cache.CacheSPI;
 import org.jboss.cache.CacheStatus;
+import org.jboss.cache.Lifecycle;
 import org.jboss.cache.Version;
 import org.jboss.cache.config.Configuration;
 import org.jboss.cache.config.ConfigurationException;
@@ -78,7 +79,7 @@
  * @since 2.1.0
  */
 @NonVolatile
-public class ComponentRegistry
+public class ComponentRegistry implements Lifecycle
 {
    /**
     * Contains class definitions of component factories that can be used to construct certain components
@@ -220,7 +221,9 @@
       if (old != null && old.instance.equals(component))
       {
          if (trace)
+         {
             log.trace("Attempting to register a component equal to one that already exists under the same name (" + name + ").  Not doing anything.");
+         }
          return;
       }
 
@@ -340,7 +343,9 @@
       if (defaultFactories == null) scanDefaultFactories();
       Class<? extends ComponentFactory> cfClass = defaultFactories.get(componentClass);
       if (cfClass == null)
+      {
          throw new ConfigurationException("No registered default factory for component " + componentClass + " found!");
+      }
       // a component factory is a component too!  See if one has been created and exists in the registry
       ComponentFactory cf = getComponent(cfClass);
       if (cf == null)
@@ -348,7 +353,9 @@
          // hasn't yet been created.  Create and put in registry
          cf = instantiateFactory(cfClass);
          if (cf == null)
+         {
             throw new ConfigurationException("Unable to locate component factory for component " + componentClass);
+         }
          // we simply register this factory.  Registration will take care of constructing any dependencies.
          registerComponent(cf, cfClass);
       }
@@ -356,7 +363,9 @@
       // ensure the component factory is in the STARTED state!
       Component c = componentLookup.get(cfClass.getName());
       if (c.instance != cf)
+      {
          throw new ConfigurationException("Component factory " + cfClass + " incorrectly registered!");
+      }
       return cf;
    }
 
@@ -417,7 +426,9 @@
    protected <T> T getFromConfiguration(Class<T> componentClass)
    {
       if (log.isDebugEnabled())
+      {
          log.debug("Looking in configuration for an instance of " + componentClass + " that may have been injected from an external source.");
+      }
       Method getter = BeanUtils.getterMethod(Configuration.class, componentClass);
       T returnValue = null;
 
@@ -569,9 +580,13 @@
       if (!state.createAllowed())
       {
          if (state.needToDestroyFailedCache())
+         {
             destroy();
+         }
          else
+         {
             return;
+         }
       }
 
       try
@@ -594,7 +609,9 @@
       if (!state.startAllowed())
       {
          if (state.needToDestroyFailedCache())
+         {
             destroy(); // this will take us back to DESTROYED
+         }
 
          if (state.needCreateBeforeStart())
          {
@@ -602,7 +619,9 @@
             createdInStart = true;
          }
          else
+         {
             return;
+         }
       }
 
       try
@@ -670,7 +689,9 @@
             }
          }
          else
+         {
             return;
+         }
       }
 
       try
@@ -698,13 +719,21 @@
    {
       state = CacheStatus.FAILED;
       if (t instanceof CacheException)
+      {
          throw (CacheException) t;
+      }
       else if (t instanceof RuntimeException)
+      {
          throw (RuntimeException) t;
+      }
       else if (t instanceof Error)
+      {
          throw (Error) t;
+      }
       else
+      {
          throw new CacheException(t);
+      }
    }
 
    /**
@@ -760,7 +789,9 @@
       if (registerShutdownHook)
       {
          if (log.isTraceEnabled())
+         {
             log.trace("Registering a shutdown hook.  Configured behavior = " + getConfiguration().getShutdownHookBehavior());
+         }
          shutdownHook = new Thread()
          {
             @Override
@@ -783,7 +814,9 @@
       else
       {
          if (log.isTraceEnabled())
+         {
             log.trace("Not registering a shutdown hook.  Configured behavior = " + getConfiguration().getShutdownHookBehavior());
+         }
       }
    }
 
@@ -892,7 +925,9 @@
 
       // check if we have started.
       if (!state.allowInvocations())
+      {
          throw new IllegalStateException("Cache not in STARTED state, even after waiting " + getConfiguration().getStateRetrievalTimeout() + " millis.");
+      }
    }
 
    /**

Modified: core/trunk/src/main/java/org/jboss/cache/loader/CacheLoader.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/loader/CacheLoader.java	2009-01-14 14:15:11 UTC (rev 7462)
+++ core/trunk/src/main/java/org/jboss/cache/loader/CacheLoader.java	2009-01-14 14:49:42 UTC (rev 7463)
@@ -24,6 +24,7 @@
 import net.jcip.annotations.ThreadSafe;
 import org.jboss.cache.CacheSPI;
 import org.jboss.cache.Fqn;
+import org.jboss.cache.Lifecycle;
 import org.jboss.cache.Modification;
 import org.jboss.cache.RegionManager;
 import org.jboss.cache.config.CacheLoaderConfig.IndividualCacheLoaderConfig;
@@ -62,7 +63,7 @@
  * @since 2.0.0
  */
 @ThreadSafe
-public interface CacheLoader
+public interface CacheLoader extends Lifecycle
 {
    /**
     * Sets the configuration.  This is called before {@link #create()} and {@link #start()}.
@@ -341,29 +342,4 @@
     * @param manager the region manager to use, or <code>null</code>.
     */
    void setRegionManager(RegionManager manager);
-
-   /**
-    * Lifecycle method, called when the cache loader is created.
-    *
-    * @throws java.lang.Exception
-    */
-   void create() throws java.lang.Exception;
-
-   /**
-    * Lifecycle method, called when the cache loader is started.
-    *
-    * @throws java.lang.Exception
-    */
-   void start() throws java.lang.Exception;
-
-   /**
-    * Lifecycle method, called when the cache loader is stopped.
-    */
-   void stop();
-
-   /**
-    * Lifecycle method, called when the cache loader is destroyed.
-    */
-   void destroy();
-
 }

Modified: core/trunk/src/main/java/org/jboss/cache/loader/TcpDelegatingCacheLoader.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/loader/TcpDelegatingCacheLoader.java	2009-01-14 14:15:11 UTC (rev 7462)
+++ core/trunk/src/main/java/org/jboss/cache/loader/TcpDelegatingCacheLoader.java	2009-01-14 14:49:42 UTC (rev 7463)
@@ -28,6 +28,7 @@
 import org.jboss.cache.Modification;
 import org.jboss.cache.config.CacheLoaderConfig.IndividualCacheLoaderConfig;
 import org.jboss.cache.loader.tcp.TcpCacheOperations;
+import org.jboss.cache.util.concurrent.SynchronizedRestarter;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
@@ -64,6 +65,7 @@
    volatile ObjectOutputStream out;
    private static final Log log = LogFactory.getLog(TcpDelegatingCacheLoader.class);
    private static final boolean trace = log.isTraceEnabled();
+   private final SynchronizedRestarter restarter = new SynchronizedRestarter();
    private static Method GET_CHILDREN_METHOD, GET_METHOD, PUT_KEY_METHOD, PUT_DATA_METHOD, REMOVE_KEY_METHOD, REMOVE_METHOD, PUT_MODS_METHOD, EXISTS_METHOD, REMOVE_DATA_METHOD;
 
    static
@@ -141,16 +143,16 @@
                   // sleep 250 ms
                   if (log.isDebugEnabled()) log.debug("Caught IOException.  Retrying.", e);
                   Thread.sleep(config.getReconnectWaitTime());
-                  restart();
+                  restarter.restartComponent(this);
                }
-               catch (IOException e1)
-               {
-                  if (trace) log.trace("Unable to reconnect", e1);
-               }
                catch (InterruptedException e1)
                {
                   Thread.currentThread().interrupt();
                }
+               catch (Exception e1)
+               {
+                  if (trace) log.trace("Unable to reconnect", e1);
+               }
             }
             else
             {
@@ -436,12 +438,6 @@
       }
    }
 
-   protected void restart() throws IOException
-   {
-      stop();
-      start();
-   }
-
    @Override
    public void loadEntireState(ObjectOutputStream os) throws Exception
    {

Modified: core/trunk/src/main/java/org/jboss/cache/loader/tcp/TcpCacheServer.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/loader/tcp/TcpCacheServer.java	2009-01-14 14:15:11 UTC (rev 7462)
+++ core/trunk/src/main/java/org/jboss/cache/loader/tcp/TcpCacheServer.java	2009-01-14 14:49:42 UTC (rev 7463)
@@ -32,6 +32,8 @@
 import org.jboss.cache.Node;
 import org.jboss.cache.NodeSPI;
 import org.jboss.cache.jmx.CacheJmxWrapperMBean;
+import org.jboss.cache.util.concurrent.ConcurrentHashSet;
+import org.jboss.cache.util.concurrent.SynchronizedRestarter;
 
 import java.io.BufferedInputStream;
 import java.io.BufferedOutputStream;
@@ -46,7 +48,6 @@
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -67,7 +68,8 @@
    private CacheJmxWrapperMBean wrapper;
    private String config;
    private boolean running = true;
-   private final List<Connection> conns = Collections.synchronizedList(new LinkedList<Connection>());
+   private final Set<Connection> conns = new ConcurrentHashSet<Connection>();
+   private final SynchronizedRestarter restarter = new SynchronizedRestarter();
    /**
     * whether or not to start the server thread as a daemon.  Should be false if started from the command line, true if started as an MBean.
     */
@@ -197,12 +199,11 @@
             {
                try
                {
-                  TcpCacheServer.this.stop();
-                  TcpCacheServer.this.start();
+                  restarter.restartComponent(TcpCacheServer.this);
                }
                catch (Exception e)
                {
-                  throw new CacheException("Caught exception trying to restart cache server", e);
+                  throw new CacheException("Unable to restart TcpCacheServer", e);
                }
             }
          }
@@ -220,10 +221,7 @@
          // Connection.close() removes conn from the list,
          // so copy off the list first to avoid ConcurrentModificationException
          List<Connection> copy = new ArrayList<Connection>(conns);
-         for (Connection conn : copy)
-         {
-            conn.close();
-         }
+         for (Connection conn : copy) conn.close();
          conns.clear();
       }
 

Modified: core/trunk/src/main/java/org/jboss/cache/loader/tcp/TcpCacheServerMBean.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/loader/tcp/TcpCacheServerMBean.java	2009-01-14 14:15:11 UTC (rev 7462)
+++ core/trunk/src/main/java/org/jboss/cache/loader/tcp/TcpCacheServerMBean.java	2009-01-14 14:49:42 UTC (rev 7463)
@@ -22,6 +22,7 @@
 package org.jboss.cache.loader.tcp;
 
 import org.jboss.cache.Cache;
+import org.jboss.cache.Lifecycle;
 import org.jboss.cache.jmx.CacheJmxWrapperMBean;
 
 import java.net.UnknownHostException;
@@ -33,16 +34,8 @@
  * @author Brian Stansberry
  * @version $Id$
  */
-public interface TcpCacheServerMBean
+public interface TcpCacheServerMBean extends Lifecycle
 {
-   void create() throws Exception;
-
-   void start() throws Exception;
-
-   void stop();
-
-   void destroy();
-
    String getBindAddress();
 
    void setBindAddress(String bind_addr) throws UnknownHostException;

Added: core/trunk/src/main/java/org/jboss/cache/util/concurrent/SynchronizedRestarter.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/util/concurrent/SynchronizedRestarter.java	                        (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/util/concurrent/SynchronizedRestarter.java	2009-01-14 14:49:42 UTC (rev 7463)
@@ -0,0 +1,71 @@
+package org.jboss.cache.util.concurrent;
+
+import org.jboss.cache.Lifecycle;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.LockSupport;
+
+/**
+ * A class that handles restarts of components via multiple threads.  Specifically, if a component needs to be restarted
+ * and several threads may demand a restart but only one thread should be allowed to restart the component, then use this
+ * class.
+ * <p/>
+ * What this class guarantees is that several threads may come in while a component is being restarted, but they will
+ * block until the restart is complete.
+ * <p/>
+ * This is different from other techniques in that:
+ * <ul>
+ * <li>A simple compare-and-swap to check whether another thread is already performing a restart will result in the
+ * requesting thread returning immediately and potentially attempting to use the resource being restarted.</li>
+ * <li>A synchronized method or use of a lock would result in the thread waiting for the restart to complete, but on
+ * completion will attempt to restart the component again.</li>
+ * </ul>
+ * This implementation combines a compare-and-swap to detect a concurrent restart, as well as registering for notification
+ * for when the restart completes and then parking the thread if the CAS variable still indicates a restart in progress,
+ * and finally deregistering itself in the end.
+ *
+ * @author Manik Surtani
+ */
+public class SynchronizedRestarter
+{
+   private final AtomicBoolean restartInProgress = new AtomicBoolean(false);
+   private final ConcurrentHashSet<Thread> restartWaiters = new ConcurrentHashSet<Thread>();
+
+   /**
+    * Stops and then starts the given component.  If another thread is already restarting through this instance,
+    * the caller does not restart again; it waits until that restart finishes (or until it is interrupted, in
+    * which case it returns early with its interrupt status still set).
+    *
+    * @param component component to stop and start
+    * @throws Exception if stopping or starting the component fails in the thread performing the restart
+    */
+   public void restartComponent(Lifecycle component) throws Exception
+   {
+      // only the thread that wins this CAS performs the restart; all others fall through and wait
+      if (restartInProgress.compareAndSet(false, true))
+      {
+         try
+         {
+            component.stop();
+            component.start();
+         }
+         finally
+         {
+            // clear the flag before waking the waiters so they see the restart as finished when they re-check
+            restartInProgress.set(false);
+            for (Thread waiter : restartWaiters) LockSupport.unpark(waiter);
+         }
+      }
+      else
+      {
+         Thread self = Thread.currentThread();
+         // register before re-checking the flag so the restarting thread's unpark cannot be missed
+         restartWaiters.add(self);
+         // park() may return spuriously, so loop until the restart is really over; stop waiting on interrupt
+         // because park() returns immediately for an interrupted thread and the loop would otherwise spin
+         while (restartInProgress.get() && !self.isInterrupted()) LockSupport.park();
+         // de-register interest in notification
+         restartWaiters.remove(self);
+      }
+   }
+}




More information about the jbosscache-commits mailing list