[jbosscache-commits] JBoss Cache SVN: r7463 - in core/trunk/src/main/java/org/jboss/cache: factories and 3 other directories.
jbosscache-commits at lists.jboss.org
jbosscache-commits at lists.jboss.org
Wed Jan 14 09:49:42 EST 2009
Author: manik.surtani at jboss.com
Date: 2009-01-14 09:49:42 -0500 (Wed, 14 Jan 2009)
New Revision: 7463
Added:
core/trunk/src/main/java/org/jboss/cache/Lifecycle.java
core/trunk/src/main/java/org/jboss/cache/util/concurrent/SynchronizedRestarter.java
Modified:
core/trunk/src/main/java/org/jboss/cache/Cache.java
core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java
core/trunk/src/main/java/org/jboss/cache/loader/CacheLoader.java
core/trunk/src/main/java/org/jboss/cache/loader/TcpDelegatingCacheLoader.java
core/trunk/src/main/java/org/jboss/cache/loader/tcp/TcpCacheServer.java
core/trunk/src/main/java/org/jboss/cache/loader/tcp/TcpCacheServerMBean.java
Log:
Added a Lifecycle interface and a SynchronizedRestarter class to provide thread-safe resource restarting for the TCP cache loader and server
Modified: core/trunk/src/main/java/org/jboss/cache/Cache.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/Cache.java 2009-01-14 14:15:11 UTC (rev 7462)
+++ core/trunk/src/main/java/org/jboss/cache/Cache.java 2009-01-14 14:49:42 UTC (rev 7463)
@@ -94,7 +94,7 @@
* @since 2.0.0
*/
@ThreadSafe
-public interface Cache<K, V>
+public interface Cache<K, V> extends Lifecycle
{
/**
* Retrieves the configuration of this cache.
Added: core/trunk/src/main/java/org/jboss/cache/Lifecycle.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/Lifecycle.java (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/Lifecycle.java 2009-01-14 14:49:42 UTC (rev 7463)
@@ -0,0 +1,17 @@
+package org.jboss.cache;
+
+/**
+ * Contract for components that are driven through create, start, stop and destroy lifecycle operations.
+ * @author Manik Surtani
+ */
+public interface Lifecycle
+{
+   /** Lifecycle method, called when the component is created. */
+   void create() throws Exception;
+   /** Lifecycle method, called when the component is started. */
+   void start() throws Exception;
+   /** Lifecycle method, called when the component is stopped. */
+   void stop();
+   /** Lifecycle method, called when the component is destroyed. */
+   void destroy();
+}
Modified: core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java 2009-01-14 14:15:11 UTC (rev 7462)
+++ core/trunk/src/main/java/org/jboss/cache/factories/ComponentRegistry.java 2009-01-14 14:49:42 UTC (rev 7463)
@@ -26,6 +26,7 @@
import org.jboss.cache.CacheException;
import org.jboss.cache.CacheSPI;
import org.jboss.cache.CacheStatus;
+import org.jboss.cache.Lifecycle;
import org.jboss.cache.Version;
import org.jboss.cache.config.Configuration;
import org.jboss.cache.config.ConfigurationException;
@@ -78,7 +79,7 @@
* @since 2.1.0
*/
@NonVolatile
-public class ComponentRegistry
+public class ComponentRegistry implements Lifecycle
{
/**
* Contains class definitions of component factories that can be used to construct certain components
@@ -220,7 +221,9 @@
if (old != null && old.instance.equals(component))
{
if (trace)
+ {
log.trace("Attempting to register a component equal to one that already exists under the same name (" + name + "). Not doing anything.");
+ }
return;
}
@@ -340,7 +343,9 @@
if (defaultFactories == null) scanDefaultFactories();
Class<? extends ComponentFactory> cfClass = defaultFactories.get(componentClass);
if (cfClass == null)
+ {
throw new ConfigurationException("No registered default factory for component " + componentClass + " found!");
+ }
// a component factory is a component too! See if one has been created and exists in the registry
ComponentFactory cf = getComponent(cfClass);
if (cf == null)
@@ -348,7 +353,9 @@
// hasn't yet been created. Create and put in registry
cf = instantiateFactory(cfClass);
if (cf == null)
+ {
throw new ConfigurationException("Unable to locate component factory for component " + componentClass);
+ }
// we simply register this factory. Registration will take care of constructing any dependencies.
registerComponent(cf, cfClass);
}
@@ -356,7 +363,9 @@
// ensure the component factory is in the STARTED state!
Component c = componentLookup.get(cfClass.getName());
if (c.instance != cf)
+ {
throw new ConfigurationException("Component factory " + cfClass + " incorrectly registered!");
+ }
return cf;
}
@@ -417,7 +426,9 @@
protected <T> T getFromConfiguration(Class<T> componentClass)
{
if (log.isDebugEnabled())
+ {
log.debug("Looking in configuration for an instance of " + componentClass + " that may have been injected from an external source.");
+ }
Method getter = BeanUtils.getterMethod(Configuration.class, componentClass);
T returnValue = null;
@@ -569,9 +580,13 @@
if (!state.createAllowed())
{
if (state.needToDestroyFailedCache())
+ {
destroy();
+ }
else
+ {
return;
+ }
}
try
@@ -594,7 +609,9 @@
if (!state.startAllowed())
{
if (state.needToDestroyFailedCache())
+ {
destroy(); // this will take us back to DESTROYED
+ }
if (state.needCreateBeforeStart())
{
@@ -602,7 +619,9 @@
createdInStart = true;
}
else
+ {
return;
+ }
}
try
@@ -670,7 +689,9 @@
}
}
else
+ {
return;
+ }
}
try
@@ -698,13 +719,21 @@
{
state = CacheStatus.FAILED;
if (t instanceof CacheException)
+ {
throw (CacheException) t;
+ }
else if (t instanceof RuntimeException)
+ {
throw (RuntimeException) t;
+ }
else if (t instanceof Error)
+ {
throw (Error) t;
+ }
else
+ {
throw new CacheException(t);
+ }
}
/**
@@ -760,7 +789,9 @@
if (registerShutdownHook)
{
if (log.isTraceEnabled())
+ {
log.trace("Registering a shutdown hook. Configured behavior = " + getConfiguration().getShutdownHookBehavior());
+ }
shutdownHook = new Thread()
{
@Override
@@ -783,7 +814,9 @@
else
{
if (log.isTraceEnabled())
+ {
log.trace("Not registering a shutdown hook. Configured behavior = " + getConfiguration().getShutdownHookBehavior());
+ }
}
}
@@ -892,7 +925,9 @@
// check if we have started.
if (!state.allowInvocations())
+ {
throw new IllegalStateException("Cache not in STARTED state, even after waiting " + getConfiguration().getStateRetrievalTimeout() + " millis.");
+ }
}
/**
Modified: core/trunk/src/main/java/org/jboss/cache/loader/CacheLoader.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/loader/CacheLoader.java 2009-01-14 14:15:11 UTC (rev 7462)
+++ core/trunk/src/main/java/org/jboss/cache/loader/CacheLoader.java 2009-01-14 14:49:42 UTC (rev 7463)
@@ -24,6 +24,7 @@
import net.jcip.annotations.ThreadSafe;
import org.jboss.cache.CacheSPI;
import org.jboss.cache.Fqn;
+import org.jboss.cache.Lifecycle;
import org.jboss.cache.Modification;
import org.jboss.cache.RegionManager;
import org.jboss.cache.config.CacheLoaderConfig.IndividualCacheLoaderConfig;
@@ -62,7 +63,7 @@
* @since 2.0.0
*/
@ThreadSafe
-public interface CacheLoader
+public interface CacheLoader extends Lifecycle
{
/**
* Sets the configuration. This is called before {@link #create()} and {@link #start()}.
@@ -341,29 +342,4 @@
* @param manager the region manager to use, or <code>null</code>.
*/
void setRegionManager(RegionManager manager);
-
- /**
- * Lifecycle method, called when the cache loader is created.
- *
- * @throws java.lang.Exception
- */
- void create() throws java.lang.Exception;
-
- /**
- * Lifecycle method, called when the cache loader is started.
- *
- * @throws java.lang.Exception
- */
- void start() throws java.lang.Exception;
-
- /**
- * Lifecycle method, called when the cache loader is stopped.
- */
- void stop();
-
- /**
- * Lifecycle method, called when the cache loader is destroyed.
- */
- void destroy();
-
}
Modified: core/trunk/src/main/java/org/jboss/cache/loader/TcpDelegatingCacheLoader.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/loader/TcpDelegatingCacheLoader.java 2009-01-14 14:15:11 UTC (rev 7462)
+++ core/trunk/src/main/java/org/jboss/cache/loader/TcpDelegatingCacheLoader.java 2009-01-14 14:49:42 UTC (rev 7463)
@@ -28,6 +28,7 @@
import org.jboss.cache.Modification;
import org.jboss.cache.config.CacheLoaderConfig.IndividualCacheLoaderConfig;
import org.jboss.cache.loader.tcp.TcpCacheOperations;
+import org.jboss.cache.util.concurrent.SynchronizedRestarter;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
@@ -64,6 +65,7 @@
volatile ObjectOutputStream out;
private static final Log log = LogFactory.getLog(TcpDelegatingCacheLoader.class);
private static final boolean trace = log.isTraceEnabled();
+ private final SynchronizedRestarter restarter = new SynchronizedRestarter();
private static Method GET_CHILDREN_METHOD, GET_METHOD, PUT_KEY_METHOD, PUT_DATA_METHOD, REMOVE_KEY_METHOD, REMOVE_METHOD, PUT_MODS_METHOD, EXISTS_METHOD, REMOVE_DATA_METHOD;
static
@@ -141,16 +143,16 @@
// sleep 250 ms
if (log.isDebugEnabled()) log.debug("Caught IOException. Retrying.", e);
Thread.sleep(config.getReconnectWaitTime());
- restart();
+ restarter.restartComponent(this);
}
- catch (IOException e1)
- {
- if (trace) log.trace("Unable to reconnect", e1);
- }
catch (InterruptedException e1)
{
Thread.currentThread().interrupt();
}
+ catch (Exception e1)
+ {
+ if (trace) log.trace("Unable to reconnect", e1);
+ }
}
else
{
@@ -436,12 +438,6 @@
}
}
- protected void restart() throws IOException
- {
- stop();
- start();
- }
-
@Override
public void loadEntireState(ObjectOutputStream os) throws Exception
{
Modified: core/trunk/src/main/java/org/jboss/cache/loader/tcp/TcpCacheServer.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/loader/tcp/TcpCacheServer.java 2009-01-14 14:15:11 UTC (rev 7462)
+++ core/trunk/src/main/java/org/jboss/cache/loader/tcp/TcpCacheServer.java 2009-01-14 14:49:42 UTC (rev 7463)
@@ -32,6 +32,8 @@
import org.jboss.cache.Node;
import org.jboss.cache.NodeSPI;
import org.jboss.cache.jmx.CacheJmxWrapperMBean;
+import org.jboss.cache.util.concurrent.ConcurrentHashSet;
+import org.jboss.cache.util.concurrent.SynchronizedRestarter;
import java.io.BufferedInputStream;
import java.io.BufferedOutputStream;
@@ -46,7 +48,6 @@
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashSet;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -67,7 +68,8 @@
private CacheJmxWrapperMBean wrapper;
private String config;
private boolean running = true;
- private final List<Connection> conns = Collections.synchronizedList(new LinkedList<Connection>());
+ private final Set<Connection> conns = new ConcurrentHashSet<Connection>();
+ private final SynchronizedRestarter restarter = new SynchronizedRestarter();
/**
* whether or not to start the server thread as a daemon. Should be false if started from the command line, true if started as an MBean.
*/
@@ -197,12 +199,11 @@
{
try
{
- TcpCacheServer.this.stop();
- TcpCacheServer.this.start();
+ restarter.restartComponent(TcpCacheServer.this);
}
catch (Exception e)
{
- throw new CacheException("Caught exception trying to restart cache server", e);
+ throw new CacheException("Unable to restart TcpCacheServer", e);
}
}
}
@@ -220,10 +221,7 @@
// Connection.close() removes conn from the list,
// so copy off the list first to avoid ConcurrentModificationException
List<Connection> copy = new ArrayList<Connection>(conns);
- for (Connection conn : copy)
- {
- conn.close();
- }
+ for (Connection conn : copy) conn.close();
conns.clear();
}
Modified: core/trunk/src/main/java/org/jboss/cache/loader/tcp/TcpCacheServerMBean.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/loader/tcp/TcpCacheServerMBean.java 2009-01-14 14:15:11 UTC (rev 7462)
+++ core/trunk/src/main/java/org/jboss/cache/loader/tcp/TcpCacheServerMBean.java 2009-01-14 14:49:42 UTC (rev 7463)
@@ -22,6 +22,7 @@
package org.jboss.cache.loader.tcp;
import org.jboss.cache.Cache;
+import org.jboss.cache.Lifecycle;
import org.jboss.cache.jmx.CacheJmxWrapperMBean;
import java.net.UnknownHostException;
@@ -33,16 +34,8 @@
* @author Brian Stansberry
* @version $Id$
*/
-public interface TcpCacheServerMBean
+public interface TcpCacheServerMBean extends Lifecycle
{
- void create() throws Exception;
-
- void start() throws Exception;
-
- void stop();
-
- void destroy();
-
String getBindAddress();
void setBindAddress(String bind_addr) throws UnknownHostException;
Added: core/trunk/src/main/java/org/jboss/cache/util/concurrent/SynchronizedRestarter.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/util/concurrent/SynchronizedRestarter.java (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/util/concurrent/SynchronizedRestarter.java 2009-01-14 14:49:42 UTC (rev 7463)
@@ -0,0 +1,71 @@
+package org.jboss.cache.util.concurrent;
+
+import org.jboss.cache.Lifecycle;
+
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.LockSupport;
+
+/**
+ * A class that handles restarts of components via multiple threads. Specifically, if a component needs to be restarted
+ * and several threads may demand a restart but only one thread should be allowed to restart the component, then use this
+ * class.
+ * <p/>
+ * What this class guarantees is that several threads may come in while a component is being restarted, but they will
+ * block until the restart is complete.
+ * <p/>
+ * This is different from other techniques in that:
+ * <ul>
+ * <li>A simple compare-and-swap to check whether another thread is already performing a restart will result in the
+ * requesting thread returning immediately and potentially attempting to use the resource being restarted.</li>
+ * <li>A synchronized method or use of a lock would result in the thread waiting for the restart to complete, but on
+ * completion will attempt to restart the component again.</li>
+ * </ul>
+ * This implementation combines a compare-and-swap to detect a concurrent restart, as well as registering for notification
+ * for when the restart completes and then parking the thread while the CAS variable still indicates a restart in progress,
+ * and finally deregistering itself in the end.
+ *
+ * @author Manik Surtani
+ */
+public class SynchronizedRestarter
+{
+   private final AtomicBoolean restartInProgress = new AtomicBoolean(false);
+   private final ConcurrentHashSet<Thread> restartWaiters = new ConcurrentHashSet<Thread>();
+
+   /**
+    * Stops and then starts the component, unless another thread is already restarting through this instance, in
+    * which case the calling thread waits for that restart to finish instead of restarting the component again.
+    *
+    * @param component the component to restart
+    * @throws Exception if stopping or starting the component fails in the calling thread
+    */
+   public void restartComponent(Lifecycle component) throws Exception
+   {
+      // only one thread wins this CAS; it atomically sets the flag so concurrent callers take the waiting branch
+      if (restartInProgress.compareAndSet(false, true))
+      {
+         try
+         {
+            component.stop();
+            component.start();
+         }
+         finally
+         {
+            // clear the flag before waking the waiters so that their re-check of the flag lets them proceed
+            restartInProgress.set(false);
+            // LockSupport.unpark does not throw, so every waiter is woken without per-thread error handling
+            for (Thread waiter : restartWaiters) LockSupport.unpark(waiter);
+         }
+      }
+      else
+      {
+         Thread self = Thread.currentThread();
+         // register interest in being notified after the restart
+         restartWaiters.add(self);
+         // park() may return spuriously, so re-check the flag in a loop; stop waiting once interrupted (leaving the
+         // interrupt status set for the caller), since park() returns immediately for an interrupted thread
+         while (restartInProgress.get() && !self.isInterrupted()) LockSupport.park();
+         // de-register interest in notification
+         restartWaiters.remove(self);
+      }
+   }
+}
More information about the jbosscache-commits
mailing list