JBoss Cache SVN: r6497 - core/trunk/src/main/java/org/jboss/cache/marshall.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2008-08-04 10:22:30 -0400 (Mon, 04 Aug 2008)
New Revision: 6497
Modified:
core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java
Log:
Compat with older marshaller
Modified: core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java 2008-08-04 14:21:20 UTC (rev 6496)
+++ core/trunk/src/main/java/org/jboss/cache/marshall/AbstractMarshaller.java 2008-08-04 14:22:30 UTC (rev 6497)
@@ -104,7 +104,7 @@
public Object objectFromByteBuffer(byte[] bytes) throws Exception
{
- throw new RuntimeException("Needs to be overridden!");
+ return objectFromByteBuffer(bytes, 0, bytes.length);
}
public Object objectFromByteBuffer(byte[] bytes, int offset, int len) throws Exception
16 years, 5 months
JBoss Cache SVN: r6496 - benchmarks/benchmark-fwk/trunk/cache-products/coherence-3.3.1.
by jbosscache-commits@lists.jboss.org
Author: mircea.markus
Date: 2008-08-04 10:21:20 -0400 (Mon, 04 Aug 2008)
New Revision: 6496
Modified:
benchmarks/benchmark-fwk/trunk/cache-products/coherence-3.3.1/config.sh
Log:
increased memory size
Modified: benchmarks/benchmark-fwk/trunk/cache-products/coherence-3.3.1/config.sh
===================================================================
--- benchmarks/benchmark-fwk/trunk/cache-products/coherence-3.3.1/config.sh 2008-08-04 14:20:21 UTC (rev 6495)
+++ benchmarks/benchmark-fwk/trunk/cache-products/coherence-3.3.1/config.sh 2008-08-04 14:21:20 UTC (rev 6496)
@@ -15,6 +15,6 @@
#--classpath was set
#additional JVM options
-export JVM_OPTIONS="$JVM_OPTIONS -Djava.net.preferIPv4Stack=true -Dtangosol.coherence.localhost=${MYTESTIP_1}"
+export JVM_OPTIONS="$JVM_OPTIONS -Xmx2048m -Djava.net.preferIPv4Stack=true -Dtangosol.coherence.localhost=${MYTESTIP_1}"
export JVM_OPTIONS="$JVM_OPTIONS -Dtangosol.coherence.cacheconfig=cache-config.xml"
export JVM_OPTIONS="$JVM_OPTIONS -DcacheBenchFwk.cacheWrapperClassName=org.cachebench.cachewrappers.Coherence331Wrapper"
16 years, 5 months
JBoss Cache SVN: r6495 - benchmarks/benchmark-fwk/trunk/cache-products/jbosscache-3.0.0.
by jbosscache-commits@lists.jboss.org
Author: mircea.markus
Date: 2008-08-04 10:20:21 -0400 (Mon, 04 Aug 2008)
New Revision: 6495
Modified:
benchmarks/benchmark-fwk/trunk/cache-products/jbosscache-3.0.0/config.sh
Log:
increased memory size
Modified: benchmarks/benchmark-fwk/trunk/cache-products/jbosscache-3.0.0/config.sh
===================================================================
--- benchmarks/benchmark-fwk/trunk/cache-products/jbosscache-3.0.0/config.sh 2008-08-04 13:34:30 UTC (rev 6494)
+++ benchmarks/benchmark-fwk/trunk/cache-products/jbosscache-3.0.0/config.sh 2008-08-04 14:20:21 UTC (rev 6495)
@@ -15,5 +15,5 @@
#--classpath was set
#additional JVM options
-JVM_OPTIONS="$JVM_OPTIONS -Djava.net.preferIPv4Stack=true -Djbosscache.config.validate=false"
+JVM_OPTIONS="$JVM_OPTIONS -Djava.net.preferIPv4Stack=true -Djbosscache.config.validate=false -Xmx2048m"
JVM_OPTIONS="$JVM_OPTIONS -DcacheBenchFwk.cacheWrapperClassName=org.cachebench.cachewrappers.JBossCache300Wrapper"
16 years, 5 months
JBoss Cache SVN: r6494 - in core/trunk/src/main/java/org/jboss/cache: notifications/annotation and 1 other directory.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2008-08-04 09:34:30 -0400 (Mon, 04 Aug 2008)
New Revision: 6494
Modified:
core/trunk/src/main/java/org/jboss/cache/config/Configuration.java
core/trunk/src/main/java/org/jboss/cache/notifications/annotation/CacheListener.java
Log:
JBCACHE-1108 Javadocs
Modified: core/trunk/src/main/java/org/jboss/cache/config/Configuration.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/config/Configuration.java 2008-08-04 13:27:06 UTC (rev 6493)
+++ core/trunk/src/main/java/org/jboss/cache/config/Configuration.java 2008-08-04 13:34:30 UTC (rev 6494)
@@ -412,6 +412,14 @@
this.syncRollbackPhase = syncRollbackPhase;
}
+ /**
+ * Sets the size of the asynchronous listener notification thread pool. Defaults to 1, and if set to below 1,
+ * all async listeners (specified with {@link org.jboss.cache.notifications.annotation.CacheListener#sync()}) are notified
+ * synchronously.
+ *
+ * @param listenerAsyncPoolSize number of threads in pool
+ * @since 3.0
+ */
public void setListenerAsyncPoolSize(int listenerAsyncPoolSize)
{
testImmutability("asyncListenerPoolSize");
@@ -721,7 +729,12 @@
}
/**
- * @return the size of the async listener thread pool. If this is < 1, all async listeners should be treated as sync listeners.
+ * Gets the size of the asynchronous listener notification thread pool. Defaults to 1, and if set to below 1,
+ * all async listeners (specified with {@link org.jboss.cache.notifications.annotation.CacheListener#sync()}) are notified
+ * synchronously.
+ *
+ * @return thread pool size
+ * @since 3.0
*/
public int getListenerAsyncPoolSize()
{
Modified: core/trunk/src/main/java/org/jboss/cache/notifications/annotation/CacheListener.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/notifications/annotation/CacheListener.java 2008-08-04 13:27:06 UTC (rev 6493)
+++ core/trunk/src/main/java/org/jboss/cache/notifications/annotation/CacheListener.java 2008-08-04 13:34:30 UTC (rev 6494)
@@ -58,8 +58,11 @@
* as locks may be held longer than necessary or intended to and may cause deadlocking in certain situations. See above paragraph
* on long-running tasks that should be run in a separate thread.
* </p>
+ * <b>Note</b>: Since 3.0, a new parameter, <tt>sync</tt>, has been introduced on this annotation. This defaults to <tt>true</tt>
+ * which provides the above semantics. Alternatively, if you set <tt>sync</tt> to <tt>false</tt>, then invocations are made in a
+ * <i>separate</i> thread, which will not cause any blocking on the caller or network thread. The separate thread is taken
+ * from a pool, which can be configured using {@link org.jboss.cache.config.Configuration#setListenerAsyncPoolSize(int)}.
* <p/>
- * <p/>
* <b>Summary of Notification Annotations</b>
* <table border="1" cellpadding="1" cellspacing="1" summary="Summary of notification annotations">
* <tr>
@@ -298,5 +301,12 @@
@Target(ElementType.TYPE)
public @interface CacheListener
{
+ /**
+ * Specifies whether callbacks on any class annotated with this annotation happen synchronously (in the caller's thread)
+ * or asynchronously (using a separate thread). Defaults to <tt>true</tt>.
+ *
+ * @return true if the expectation is that callbacks are called using the caller's thread; false if they are to be made in a separate thread.
+ * @since 3.0
+ */
boolean sync() default true;
}
16 years, 5 months
JBoss Cache SVN: r6493 - in core/trunk/src: main/java/org/jboss/cache/notifications and 3 other directories.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2008-08-04 09:27:06 -0400 (Mon, 04 Aug 2008)
New Revision: 6493
Added:
core/trunk/src/main/java/org/jboss/cache/util/concurrent/WithinThreadExecutor.java
core/trunk/src/test/java/org/jboss/cache/notifications/AsyncNotificationTest.java
Modified:
core/trunk/src/main/java/org/jboss/cache/config/Configuration.java
core/trunk/src/main/java/org/jboss/cache/notifications/NotifierImpl.java
core/trunk/src/main/java/org/jboss/cache/notifications/annotation/CacheListener.java
Log:
JBCACHE-1108 Move CacheListeners to their own thread pool, optionally enabled using an attribute on @CacheListener
Modified: core/trunk/src/main/java/org/jboss/cache/config/Configuration.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/config/Configuration.java 2008-08-04 12:49:34 UTC (rev 6492)
+++ core/trunk/src/main/java/org/jboss/cache/config/Configuration.java 2008-08-04 13:27:06 UTC (rev 6493)
@@ -204,7 +204,7 @@
private List<CustomInterceptorConfig> customInterceptors = Collections.emptyList();
private boolean writeSkewCheck = false;
private int concurrencyLevel = 500;
- private int listenerAsyncPoolSize = 0;
+ private int listenerAsyncPoolSize = 1;
@Start(priority = 1)
private void correctIsolationLevels()
Modified: core/trunk/src/main/java/org/jboss/cache/notifications/NotifierImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/notifications/NotifierImpl.java 2008-08-04 12:49:34 UTC (rev 6492)
+++ core/trunk/src/main/java/org/jboss/cache/notifications/NotifierImpl.java 2008-08-04 13:27:06 UTC (rev 6493)
@@ -25,6 +25,7 @@
import org.jboss.cache.notifications.event.*;
import static org.jboss.cache.notifications.event.Event.Type.*;
import org.jboss.cache.util.ImmutableMapCopy;
+import org.jboss.cache.util.concurrent.WithinThreadExecutor;
import org.jgroups.View;
import javax.transaction.Transaction;
@@ -39,6 +40,10 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
/**
* Helper class that handles all notifications to registered listeners.
@@ -90,6 +95,10 @@
private Cache cache;
private boolean useMarshalledValueMaps;
private Configuration config;
+ // two separate executor services, one for sync and one for async listeners
+ private ExecutorService syncProcessor;
+ private ExecutorService asyncProcessor;
+ private static final AtomicInteger asyncNotifierThreadNumber = new AtomicInteger(0);
public NotifierImpl()
{
@@ -126,6 +135,13 @@
this.config = config;
}
+ @Stop
+ private void stop()
+ {
+ syncProcessor.shutdownNow();
+ asyncProcessor.shutdownNow();
+ }
+
@Destroy
protected void destroy()
{
@@ -136,6 +152,22 @@
protected void start()
{
useMarshalledValueMaps = config.isUseLazyDeserialization();
+ syncProcessor = new WithinThreadExecutor();
+ if (config.getListenerAsyncPoolSize() > 0)
+ {
+ asyncProcessor = Executors.newFixedThreadPool(config.getListenerAsyncPoolSize(), new ThreadFactory()
+ {
+ public Thread newThread(Runnable r)
+ {
+ return new Thread(r, "AsyncNotifier-" + asyncNotifierThreadNumber.getAndIncrement());
+ }
+ });
+ }
+ else
+ {
+ // use the same sync executor
+ asyncProcessor = syncProcessor;
+ }
}
/**
@@ -147,7 +179,7 @@
@SuppressWarnings("unchecked")
private void validateAndAddListenerInvocation(Object listener)
{
- testListenerClassValidity(listener.getClass());
+ boolean sync = testListenerClassValidity(listener.getClass());
boolean foundMethods = false;
// now try all methods on the listener for anything that we like. Note that only PUBLIC methods are scanned.
@@ -159,7 +191,7 @@
if (m.isAnnotationPresent(allowedMethodAnnotations[i]))
{
testListenerMethodValidity(m, parameterTypes[i], allowedMethodAnnotations[i].getName());
- addListenerInvocation(allowedMethodAnnotations[i], new ListenerInvocation(listener, m));
+ addListenerInvocation(allowedMethodAnnotations[i], new ListenerInvocation(listener, m, sync));
foundMethods = true;
}
}
@@ -169,12 +201,22 @@
log.warn("Attempted to register listener of class " + listener.getClass() + ", but no valid, public methods annotated with method-level event annotations found! Ignoring listener.");
}
- private static void testListenerClassValidity(Class<?> listenerClass)
+ /**
+ * Tests if a class is properly annotated as a CacheListener and returns whether callbacks on this class should be invoked
+ * synchronously or asynchronously.
+ *
+ * @param listenerClass class to inspect
+ * @return true if callbacks on this class should use the syncProcessor; false if it should use the asyncProcessor.
+ */
+ private static boolean testListenerClassValidity(Class<?> listenerClass)
{
- if (!listenerClass.isAnnotationPresent(CacheListener.class))
+ CacheListener cl = listenerClass.getAnnotation(CacheListener.class);
+ if (cl == null)
throw new IncorrectCacheListenerException("Cache listener class MUST be annotated with org.jboss.cache.notifications.annotation.CacheListener");
if (!Modifier.isPublic(listenerClass.getModifiers()))
throw new IncorrectCacheListenerException("Cache listener class MUST be public!");
+ return cl.sync();
+
}
private static void testListenerMethodValidity(Method m, Class allowedParameter, String annotationName)
@@ -616,36 +658,49 @@
*/
class ListenerInvocation
{
-
private final Object target;
-
private final Method method;
+ private final boolean sync;
- public ListenerInvocation(Object target, Method method)
+ public ListenerInvocation(Object target, Method method, boolean sync)
{
this.target = target;
this.method = method;
+ this.sync = sync;
}
- public void invoke(Event e)
+ public void invoke(final Event e)
{
- try
+ Runnable r = new Runnable()
{
- method.invoke(target, e);
- }
- catch (InvocationTargetException e1)
- {
- Throwable cause = e1.getCause();
- if (cause != null)
- throw new CacheException("Caught exception invoking method " + method + " on listener instance " + target, cause);
- else
- throw new CacheException("Caught exception invoking method " + method + " on listener instance " + target, e1);
- }
- catch (IllegalAccessException e1)
- {
- log.warn("Unable to invoke method " + method + " on Object instance " + target + " - removing this target object from list of listeners!", e1);
- removeCacheListener(this.target);
- }
+
+ public void run()
+ {
+ try
+ {
+ method.invoke(target, e);
+ }
+ catch (InvocationTargetException exception)
+ {
+ Throwable cause = exception.getCause();
+ if (cause != null)
+ throw new CacheException("Caught exception invoking method " + method + " on listener instance " + target, cause);
+ else
+ throw new CacheException("Caught exception invoking method " + method + " on listener instance " + target, exception);
+ }
+ catch (IllegalAccessException exception)
+ {
+ log.warn("Unable to invoke method " + method + " on Object instance " + target + " - removing this target object from list of listeners!", exception);
+ removeCacheListener(target);
+ }
+ }
+ };
+
+ if (sync)
+ syncProcessor.execute(r);
+ else
+ asyncProcessor.execute(r);
+
}
}
Modified: core/trunk/src/main/java/org/jboss/cache/notifications/annotation/CacheListener.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/notifications/annotation/CacheListener.java 2008-08-04 12:49:34 UTC (rev 6492)
+++ core/trunk/src/main/java/org/jboss/cache/notifications/annotation/CacheListener.java 2008-08-04 13:27:06 UTC (rev 6493)
@@ -298,4 +298,5 @@
@Target(ElementType.TYPE)
public @interface CacheListener
{
+ boolean sync() default true;
}
Added: core/trunk/src/main/java/org/jboss/cache/util/concurrent/WithinThreadExecutor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/util/concurrent/WithinThreadExecutor.java (rev 0)
+++ core/trunk/src/main/java/org/jboss/cache/util/concurrent/WithinThreadExecutor.java 2008-08-04 13:27:06 UTC (rev 6493)
@@ -0,0 +1,89 @@
+package org.jboss.cache.util.concurrent;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * An executor that works within the current thread.
+ *
+ * @author Manik Surtani (<a href="mailto:manik@jboss.org">manik(a)jboss.org</a>)
+ * @see <a href="http://jcip.net/">Java Concurrency In Practice</a>
+ * @since 3.0
+ */
+public class WithinThreadExecutor implements ExecutorService
+{
+ boolean shutDown = false;
+
+ public void execute(Runnable command)
+ {
+ command.run();
+ }
+
+ public void shutdown()
+ {
+ shutDown = true;
+ }
+
+ public List<Runnable> shutdownNow()
+ {
+ shutDown = true;
+ return Collections.emptyList();
+ }
+
+ public boolean isShutdown()
+ {
+ return shutDown;
+ }
+
+ public boolean isTerminated()
+ {
+ return shutDown;
+ }
+
+ public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
+ {
+ return shutDown;
+ }
+
+ public <T> Future<T> submit(Callable<T> task)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public <T> Future<T> submit(Runnable task, T result)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public Future<?> submit(Runnable task)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public <T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks) throws InterruptedException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public <T> List<Future<T>> invokeAll(Collection<Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public <T> T invokeAny(Collection<Callable<T>> tasks) throws InterruptedException, ExecutionException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public <T> T invokeAny(Collection<Callable<T>> tasks, long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException
+ {
+ throw new UnsupportedOperationException();
+ }
+}
Added: core/trunk/src/test/java/org/jboss/cache/notifications/AsyncNotificationTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/notifications/AsyncNotificationTest.java (rev 0)
+++ core/trunk/src/test/java/org/jboss/cache/notifications/AsyncNotificationTest.java 2008-08-04 13:27:06 UTC (rev 6493)
@@ -0,0 +1,87 @@
+package org.jboss.cache.notifications;
+
+import org.jboss.cache.Cache;
+import org.jboss.cache.DefaultCacheFactory;
+import org.jboss.cache.notifications.annotation.CacheListener;
+import org.jboss.cache.notifications.annotation.NodeCreated;
+import org.jboss.cache.notifications.event.NodeCreatedEvent;
+import org.jboss.cache.util.TestingUtil;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.CountDownLatch;
+
+@Test(groups = "functional")
+public class AsyncNotificationTest
+{
+ public void testAsyncNotification() throws InterruptedException
+ {
+ Cache<String, String> c = null;
+ try
+ {
+ c = new DefaultCacheFactory<String, String>().createCache();
+ CountDownLatch latch = new CountDownLatch(2);
+ AbstractListener syncListener = new Listener(latch);
+ AbstractListener asyncListener = new AsyncListener(latch);
+ c.addCacheListener(syncListener);
+ c.addCacheListener(asyncListener);
+ c.put("/a", "k", "v");
+ latch.await();
+ assert syncListener.caller == Thread.currentThread();
+ assert asyncListener.caller != Thread.currentThread();
+ }
+ finally
+ {
+ TestingUtil.killCaches(c);
+ }
+ }
+
+ public abstract static class AbstractListener
+ {
+ Thread caller;
+ CountDownLatch latch;
+
+ protected AbstractListener(CountDownLatch latch)
+ {
+ this.latch = latch;
+ }
+ }
+
+ @CacheListener(sync = true)
+ public static class Listener extends AbstractListener
+ {
+ public Listener(CountDownLatch latch)
+ {
+ super(latch);
+ }
+
+ @NodeCreated
+ public void handle(NodeCreatedEvent e)
+ {
+ if (e.isPre())
+ {
+ caller = Thread.currentThread();
+ latch.countDown();
+ }
+ }
+ }
+
+ @CacheListener(sync = false)
+ public static class AsyncListener extends AbstractListener
+ {
+ public AsyncListener(CountDownLatch latch)
+ {
+ super(latch);
+ }
+
+ @NodeCreated
+ public void handle(NodeCreatedEvent e)
+ {
+ if (e.isPre())
+ {
+ caller = Thread.currentThread();
+ latch.countDown();
+ }
+ }
+ }
+
+}
16 years, 5 months
JBoss Cache SVN: r6492 - in core/trunk/src: main/java/org/jboss/cache/config/parsing and 2 other directories.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2008-08-04 08:49:34 -0400 (Mon, 04 Aug 2008)
New Revision: 6492
Modified:
core/trunk/src/main/java/org/jboss/cache/config/Configuration.java
core/trunk/src/main/java/org/jboss/cache/config/parsing/XmlConfigurationParser.java
core/trunk/src/test/java/org/jboss/cache/config/parsing/XmlConfigurationParserTest.java
core/trunk/src/test/java/org/jboss/cache/config/parsing/XmlConfigurationSchemaTest.java
core/trunk/src/test/resources/configs/parser-test.xml
Log:
Added config elements for JBCACHE-1108 ( Move CacheListeners to their own thread pool, optionally enabled using an attribute on @CacheListener )
Modified: core/trunk/src/main/java/org/jboss/cache/config/Configuration.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/config/Configuration.java 2008-08-04 12:38:14 UTC (rev 6491)
+++ core/trunk/src/main/java/org/jboss/cache/config/Configuration.java 2008-08-04 12:49:34 UTC (rev 6492)
@@ -203,7 +203,8 @@
private int objectOutputStreamPoolSize = 50;
private List<CustomInterceptorConfig> customInterceptors = Collections.emptyList();
private boolean writeSkewCheck = false;
- private int concurrencyLevel = 50;
+ private int concurrencyLevel = 500;
+ private int listenerAsyncPoolSize = 0;
@Start(priority = 1)
private void correctIsolationLevels()
@@ -411,6 +412,12 @@
this.syncRollbackPhase = syncRollbackPhase;
}
+ public void setListenerAsyncPoolSize(int listenerAsyncPoolSize)
+ {
+ testImmutability("asyncListenerPoolSize");
+ this.listenerAsyncPoolSize = listenerAsyncPoolSize;
+ }
+
public void setBuddyReplicationConfig(BuddyReplicationConfig config)
{
testImmutability("buddyReplicationConfig");
@@ -713,6 +720,14 @@
return syncRollbackPhase;
}
+ /**
+ * @return the size of the async listener thread pool. If this is < 1, all async listeners should be treated as sync listeners.
+ */
+ public int getListenerAsyncPoolSize()
+ {
+ return listenerAsyncPoolSize;
+ }
+
public BuddyReplicationConfig getBuddyReplicationConfig()
{
return buddyReplicationConfig;
@@ -855,6 +870,7 @@
if (shutdownHookBehavior != that.shutdownHookBehavior) return false;
if (transactionManagerLookupClass != null ? !transactionManagerLookupClass.equals(that.transactionManagerLookupClass) : that.transactionManagerLookupClass != null)
return false;
+ if (listenerAsyncPoolSize != that.listenerAsyncPoolSize) return false;
return true;
}
@@ -950,6 +966,7 @@
*
* @return List of cutom interceptors, never null
*/
+ @SuppressWarnings("unchecked")
public List<CustomInterceptorConfig> getCustomInterceptors()
{
return customInterceptors == null ? Collections.EMPTY_LIST : customInterceptors;
Modified: core/trunk/src/main/java/org/jboss/cache/config/parsing/XmlConfigurationParser.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/config/parsing/XmlConfigurationParser.java 2008-08-04 12:38:14 UTC (rev 6491)
+++ core/trunk/src/main/java/org/jboss/cache/config/parsing/XmlConfigurationParser.java 2008-08-04 12:49:34 UTC (rev 6492)
@@ -167,6 +167,7 @@
configureEviction(getSingleElement("eviction"));
configureCacheLoaders(getSingleElement("loaders"));
configureCustomInterceptors(getSingleElement("customInterceptors"));
+ configureListeners(getSingleElement("listeners"));
}
catch (Exception e)
{
@@ -225,6 +226,23 @@
config.setCustomInterceptors(interceptorConfigList);
}
+ private void configureListeners(Element element)
+ {
+ if (element == null) return; //this element is optional
+ String asyncPoolSizeStr = getAttributeValue(element, "asyncPoolSize");
+ if (asyncPoolSizeStr != null && !asyncPoolSizeStr.trim().equals(""))
+ {
+ try
+ {
+ config.setListenerAsyncPoolSize(Integer.parseInt(asyncPoolSizeStr));
+ }
+ catch (NumberFormatException nfe)
+ {
+ throw new ConfigurationException("Unable to parse the asyncPoolSize attribute of the listeners element. Was [" + asyncPoolSizeStr + "]");
+ }
+ }
+ }
+
private void configureBuddyReplication(Element element)
{
if (element == null) return;//buddy config might not exist, expect that
Modified: core/trunk/src/test/java/org/jboss/cache/config/parsing/XmlConfigurationParserTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/config/parsing/XmlConfigurationParserTest.java 2008-08-04 12:38:14 UTC (rev 6491)
+++ core/trunk/src/test/java/org/jboss/cache/config/parsing/XmlConfigurationParserTest.java 2008-08-04 12:49:34 UTC (rev 6492)
@@ -264,4 +264,9 @@
assert !config.isWriteSkewCheck();
assert config.getConcurrencyLevel() == 21;
}
+
+ public void testListenerAsyncThreads()
+ {
+ assert config.getListenerAsyncPoolSize() == 5;
+ }
}
Modified: core/trunk/src/test/java/org/jboss/cache/config/parsing/XmlConfigurationSchemaTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/config/parsing/XmlConfigurationSchemaTest.java 2008-08-04 12:38:14 UTC (rev 6491)
+++ core/trunk/src/test/java/org/jboss/cache/config/parsing/XmlConfigurationSchemaTest.java 2008-08-04 12:49:34 UTC (rev 6492)
@@ -10,12 +10,12 @@
import java.util.List;
/**
- * Tests that all the xml file used within tests are correct with respect to the schema definition.
- *
+ * Tests that all the XML files used within tests are correct with respect to the schema definition.
+ *
* @author Mircea.Markus(a)jboss.com
* @since 3.0
*/
-@Test (groups = "functional")
+@Test(groups = "functional")
public class XmlConfigurationSchemaTest
{
public static final String BASE_DIR_FOR_CONFIG = "./configs";
@@ -33,20 +33,21 @@
"policyPerRegion-eviction.xml",
"replSync.xml",
"string-property-replaced.xml"
- };
+ };
/**
* Simple test to prove that validation works.
*/
public void testSimpleFile()
{
- EceptionCountingErrorHanlder handler = new EceptionCountingErrorHanlder();
+ ExceptionCountingErrorHandler handler = new ExceptionCountingErrorHandler();
XmlConfigurationParser parser = new XmlConfigurationParser(handler);
for (String file : testFiles)
{
System.out.println("file = " + file);
parser.parseFile(BASE_DIR_FOR_CONFIG + File.separator + file);
- assert handler.noErrors() : "error during parsing";
+ for (Exception e : handler.exceptionList) e.printStackTrace();
+ assert handler.noErrors() : "error during parsing (file " + file + ")";
}
}
@@ -67,7 +68,7 @@
assert parser.isValidating();
}
- private static class EceptionCountingErrorHanlder implements ErrorHandler
+ private static class ExceptionCountingErrorHandler implements ErrorHandler
{
List<SAXParseException> exceptionList = new ArrayList<SAXParseException>();
@@ -88,7 +89,7 @@
private void handleDefault(SAXParseException exception)
{
- System.out.println("Error :" + exception.getMessage());
+ System.out.println("Error :" + exception.getMessage());
exceptionList.add(exception);
}
Modified: core/trunk/src/test/resources/configs/parser-test.xml
===================================================================
--- core/trunk/src/test/resources/configs/parser-test.xml 2008-08-04 12:38:14 UTC (rev 6491)
+++ core/trunk/src/test/resources/configs/parser-test.xml 2008-08-04 12:49:34 UTC (rev 6492)
@@ -110,4 +110,7 @@
<interceptor after="org.jboss.cache.interceptors.CallInterceptor"
class="org.jboss.cache.config.parsing.custominterceptors.AaaCustomInterceptor"/>
</customInterceptors>
+
+ <!-- the number of threads to use for asynchronous cache listeners - defaults to 1 -->
+ <listeners asyncPoolSize="5"/>
</jbosscache>
16 years, 5 months
JBoss Cache SVN: r6491 - benchmarks/benchmark-fwk/trunk/cache-products/jbosscache-3.0.0/lib.
by jbosscache-commits@lists.jboss.org
Author: mircea.markus
Date: 2008-08-04 08:38:14 -0400 (Mon, 04 Aug 2008)
New Revision: 6491
Modified:
benchmarks/benchmark-fwk/trunk/cache-products/jbosscache-3.0.0/lib/jbosscache-core.jar
benchmarks/benchmark-fwk/trunk/cache-products/jbosscache-3.0.0/lib/jgroups.jar
Log:
updated jboss cache version
Modified: benchmarks/benchmark-fwk/trunk/cache-products/jbosscache-3.0.0/lib/jbosscache-core.jar
===================================================================
(Binary files differ)
Modified: benchmarks/benchmark-fwk/trunk/cache-products/jbosscache-3.0.0/lib/jgroups.jar
===================================================================
(Binary files differ)
16 years, 5 months
JBoss Cache SVN: r6490 - core/trunk/src/main/resources.
by jbosscache-commits@lists.jboss.org
Author: mircea.markus
Date: 2008-08-04 08:28:44 -0400 (Mon, 04 Aug 2008)
New Revision: 6490
Modified:
core/trunk/src/main/resources/jbosscache-config-3.0.xsd
Log:
added a todo
Modified: core/trunk/src/main/resources/jbosscache-config-3.0.xsd
===================================================================
--- core/trunk/src/main/resources/jbosscache-config-3.0.xsd 2008-08-04 11:20:19 UTC (rev 6489)
+++ core/trunk/src/main/resources/jbosscache-config-3.0.xsd 2008-08-04 12:28:44 UTC (rev 6490)
@@ -1,6 +1,7 @@
<?xml version="1.0" encoding="UTF-8"?>
<xs:schema attributeFormDefault="unqualified" elementFormDefault="unqualified"
xmlns:xs="http://www.w3.org/2001/XMLSchema">
+ <!-- todo mmarkus - maxOccurs can be added for some elements, revisit that -->
<xs:element name="jbosscache">
<xs:complexType>
<xs:all>
16 years, 5 months
JBoss Cache SVN: r6489 - in core/trunk/src: test/java/org/jboss/cache/loader and 1 other directory.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2008-08-04 07:20:19 -0400 (Mon, 04 Aug 2008)
New Revision: 6489
Modified:
core/trunk/src/main/java/org/jboss/cache/loader/AsyncCacheLoader.java
core/trunk/src/main/java/org/jboss/cache/loader/AsyncCacheLoaderConfig.java
core/trunk/src/test/java/org/jboss/cache/loader/AsyncFileCacheLoaderTest.java
Log:
Refactored async cache loader
Modified: core/trunk/src/main/java/org/jboss/cache/loader/AsyncCacheLoader.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/loader/AsyncCacheLoader.java 2008-08-04 11:19:50 UTC (rev 6488)
+++ core/trunk/src/main/java/org/jboss/cache/loader/AsyncCacheLoader.java 2008-08-04 11:20:19 UTC (rev 6489)
@@ -18,6 +18,11 @@
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -68,6 +73,9 @@
* expiration of messages within a separate thread and keeping other
* operations synchronous for reliability.
* </dd>
+ * <dt>cache.async.threadPoolSize</dt>
+ * <dd>The size of the async processor thread pool. Defaults to <tt>1</tt>. This
+ * property is new in JBoss Cache 3.0.</dd>
* </dl>
* For increased performance for many smaller transactions, use higher values
* for <code>cache.async.batchSize</code> and
@@ -90,9 +98,10 @@
private static final int DEFAULT_QUEUE_SIZE = 10000;
private AsyncCacheLoaderConfig config;
- private AsyncProcessor processor;
+ private ExecutorService executor;
private AtomicBoolean stopped = new AtomicBoolean(true);
private BlockingQueue<Modification> queue = new ArrayBlockingQueue<Modification>(DEFAULT_QUEUE_SIZE);
+ private List<Future> processorFutures;
public AsyncCacheLoader()
{
@@ -237,32 +246,52 @@
if (log.isInfoEnabled()) log.info("Async cache loader starting: " + this);
stopped.set(false);
super.start();
- processor = new AsyncProcessor();
- processor.start();
+ executor = Executors.newFixedThreadPool(config.getThreadPoolSize(), new ThreadFactory()
+ {
+ public Thread newThread(Runnable r)
+ {
+ Thread t = new Thread(r, "AsyncCacheLoader-" + threadId.getAndIncrement());
+ t.setDaemon(true);
+ return t;
+ }
+ });
+ processorFutures = new ArrayList<Future>(config.getThreadPoolSize());
+ for (int i = 0; i < config.getThreadPoolSize(); i++) processorFutures.add(executor.submit(new AsyncProcessor()));
}
@Override
public void stop()
{
stopped.set(true);
- if (processor != null)
+ if (executor != null)
{
- processor.stop();
+ for (Future f : processorFutures) f.cancel(true);
+ executor.shutdown();
+ try
+ {
+ boolean terminated = executor.isTerminated();
+ while (!terminated)
+ {
+ terminated = executor.awaitTermination(60, TimeUnit.SECONDS);
+ }
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
}
+ executor = null;
super.stop();
}
- private void enqueue(Modification mod)
+ private void enqueue(final Modification mod)
throws CacheException, InterruptedException
{
if (stopped.get())
{
throw new CacheException("AsyncCacheLoader stopped; no longer accepting more entries.");
}
- if (trace)
- {
- log.trace("Enqueuing modification " + mod);
- }
+ if (trace) log.trace("Enqueuing modification " + mod);
queue.put(mod);
}
@@ -273,40 +302,9 @@
*/
private class AsyncProcessor implements Runnable
{
- private Thread t;
-
// Modifications to invoke as a single put
private final List<Modification> mods = new ArrayList<Modification>(config.getBatchSize());
- public void start()
- {
- if (t == null || !t.isAlive())
- {
- t = new Thread(this, "AsyncCacheLoader-" + threadId.getAndIncrement());
- t.setDaemon(true);
- t.start();
- }
- }
-
- public void stop()
- {
- if (t != null)
- {
- t.interrupt();
- try
- {
- t.join();
- }
- catch (InterruptedException e)
- {
- }
- }
- if (!queue.isEmpty())
- {
- log.warn("Async queue not yet empty, possibly interrupted");
- }
- }
-
public void run()
{
while (!Thread.interrupted())
@@ -364,16 +362,9 @@
catch (Exception e)
{
if (log.isWarnEnabled()) log.warn("Failed to process async modifications: " + e);
- log.debug("Exception: ", e);
+ if (log.isDebugEnabled()) log.debug("Exception: ", e);
}
}
-
- @Override
- public String toString()
- {
- return "TQ t=" + t;
- }
-
}
@Override
@@ -381,11 +372,11 @@
{
return super.toString() +
" delegate=[" + super.getCacheLoader() + "]" +
- " processor=" + processor +
" stopped=" + stopped +
" batchSize=" + config.getBatchSize() +
" returnOld=" + config.getReturnOld() +
" asyncPut=" + config.getUseAsyncPut() +
+ " threadPoolSize=" + config.getThreadPoolSize() +
" queue.remainingCapacity()=" + queue.remainingCapacity() +
" queue.peek()=" + queue.peek();
}
Modified: core/trunk/src/main/java/org/jboss/cache/loader/AsyncCacheLoaderConfig.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/loader/AsyncCacheLoaderConfig.java 2008-08-04 11:19:50 UTC (rev 6488)
+++ core/trunk/src/main/java/org/jboss/cache/loader/AsyncCacheLoaderConfig.java 2008-08-04 11:20:19 UTC (rev 6489)
@@ -15,6 +15,7 @@
private boolean returnOld = true;
private int queueSize = 0;
private boolean useAsyncPut = true;
+ private int threadPoolSize = 1;
/**
* Default constructor.
@@ -35,6 +36,17 @@
populateFromBaseConfig(base);
}
+ public int getThreadPoolSize()
+ {
+ return threadPoolSize;
+ }
+
+ public void setThreadPoolSize(int threadPoolSize)
+ {
+ testImmutability("threadPoolSize");
+ this.threadPoolSize = threadPoolSize;
+ }
+
public int getBatchSize()
{
return batchSize;
@@ -86,32 +98,24 @@
String s;
s = props.getProperty("cache.async.batchSize");
- if (s != null)
- {
- batchSize = Integer.parseInt(s);
- }
- if (batchSize <= 0)
- {
- throw new IllegalArgumentException("Invalid size: " + batchSize);
- }
+ if (s != null) batchSize = Integer.parseInt(s);
+ if (batchSize <= 0) throw new IllegalArgumentException("Invalid batch size: " + batchSize);
+ s = props.getProperty("cache.async.threadPoolSize");
+ if (s != null) threadPoolSize = Integer.parseInt(s);
+ if (threadPoolSize <= 0) throw new IllegalArgumentException("Invalid thread pool size: " + threadPoolSize);
+
+
s = props.getProperty("cache.async.returnOld");
- if (s != null)
- {
- returnOld = Boolean.valueOf(s);
- }
+ if (s != null) returnOld = Boolean.valueOf(s);
s = props.getProperty("cache.async.queueSize");
- if (s != null)
- {
- queueSize = Integer.parseInt(s);
- }
+ if (s != null) queueSize = Integer.parseInt(s);
s = props.getProperty("cache.async.put");
- if (s != null)
- {
- useAsyncPut = Boolean.valueOf(s);
- }
+ if (s != null) useAsyncPut = Boolean.valueOf(s);
+
+
}
@Override
Modified: core/trunk/src/test/java/org/jboss/cache/loader/AsyncFileCacheLoaderTest.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/loader/AsyncFileCacheLoaderTest.java 2008-08-04 11:19:50 UTC (rev 6488)
+++ core/trunk/src/test/java/org/jboss/cache/loader/AsyncFileCacheLoaderTest.java 2008-08-04 11:20:19 UTC (rev 6489)
@@ -12,12 +12,14 @@
import static org.testng.AssertJUnit.assertEquals;
import static org.testng.AssertJUnit.assertTrue;
import org.testng.annotations.AfterMethod;
+import org.testng.annotations.Test;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.util.Collections;
import java.util.HashMap;
+@Test(groups = "functional")
public class AsyncFileCacheLoaderTest extends AbstractCacheLoaderTestBase
{
private CacheSPI<Object, Object> cache;
@@ -150,4 +152,23 @@
assertEquals("X found", true, loader.exists(X));
loader.remove(X);
}
+
+ public void testMultipleThreads() throws Exception
+ {
+ configureCache(
+ "cache.async.queueSize=1\n" +
+ "cache.async.pollWait=10\n" +
+ "cache.async.threadPoolSize=5");
+ CacheLoader loader = cache.getCacheLoaderManager().getCacheLoader();
+ Fqn fqn = Fqn.fromString("/bound");
+ loader.remove(fqn);
+ // You can't really see it block though :-/
+ for (int i = 0; i < 50; i++)
+ {
+ cache.put(fqn, "key" + i, "value1");
+ }
+ Thread.sleep(1000);
+ assertEquals(50, loader.get(fqn).size());
+ loader.remove(fqn);
+ }
}
16 years, 5 months
JBoss Cache SVN: r6488 - core/trunk/src/main/java/org/jboss/cache/remoting/jgroups.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2008-08-04 07:19:50 -0400 (Mon, 04 Aug 2008)
New Revision: 6488
Modified:
core/trunk/src/main/java/org/jboss/cache/remoting/jgroups/ChannelMessageListener.java
Log:
Moved exposedBAOS
Modified: core/trunk/src/main/java/org/jboss/cache/remoting/jgroups/ChannelMessageListener.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/remoting/jgroups/ChannelMessageListener.java 2008-08-04 11:19:36 UTC (rev 6487)
+++ core/trunk/src/main/java/org/jboss/cache/remoting/jgroups/ChannelMessageListener.java 2008-08-04 11:19:50 UTC (rev 6488)
@@ -7,9 +7,9 @@
import org.jboss.cache.config.Configuration;
import org.jboss.cache.factories.annotations.Inject;
import org.jboss.cache.factories.annotations.NonVolatile;
+import org.jboss.cache.io.ExposedByteArrayOutputStream;
import org.jboss.cache.statetransfer.DefaultStateTransferManager;
import org.jboss.cache.statetransfer.StateTransferManager;
-import org.jboss.cache.util.ExposedByteArrayOutputStream;
import org.jboss.util.stream.MarshalledValueInputStream;
import org.jboss.util.stream.MarshalledValueOutputStream;
import org.jgroups.ExtendedMessageListener;
16 years, 5 months