how many tasks?
The Executor that rejects the task is configured externally. If 3 threads are not enough
you should either increase the number of threads or use a different rejection policy.
simple tasks quickly drains out a thread pool of size 3.
------
14:41:43,061 WARN [org.infinispan.distexec.DefaultExecutorService]
(notification-thread-0) ISPN000007: Failed local execution :
java.util.concurrent.RejectedExecutionException: Current thread pool executor queue:
[java.util.concurrent.FutureTask@cee8427, java.util.concurrent.FutureTask@7f262312,
java.util.concurrent.FutureTask@20c8b3f5]
at
org.jboss.as.capedwarf.services.SimpleThreadsHandler.rejectedExecution(SimpleThreadsHandler.java:35)
[jboss-as-capedwarf-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:767)
[classes.jar:1.6.0_37]
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:658)
[classes.jar:1.6.0_37]
at
java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:92)
[classes.jar:1.6.0_37]
at
org.infinispan.distexec.DefaultExecutorService$DistributedTaskPart.invokeLocally(DefaultExecutorService.java:1110)
[infinispan-core-5.2.0.Beta6.jar:5.2.0.Beta6]
at
org.infinispan.distexec.DefaultExecutorService$DistributedTaskPart.execute(DefaultExecutorService.java:1067)
[infinispan-core-5.2.0.Beta6.jar:5.2.0.Beta6]
at
org.infinispan.distexec.DefaultExecutorService.submit(DefaultExecutorService.java:486)
[infinispan-core-5.2.0.Beta6.jar:5.2.0.Beta6]
at
org.infinispan.distexec.DefaultExecutorService.submit(DefaultExecutorService.java:472)
[infinispan-core-5.2.0.Beta6.jar:5.2.0.Beta6]
at
org.jboss.capedwarf.common.infinispan.InfinispanUtils.distribute(InfinispanUtils.java:103)
[capedwarf-common-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
at
org.jboss.capedwarf.common.infinispan.InfinispanUtils.fire(InfinispanUtils.java:92)
[capedwarf-common-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
at
org.jboss.capedwarf.datastore.notifications.AbstractCacheListener.executeCallable(AbstractCacheListener.java:67)
[capedwarf-datastore-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
at
org.jboss.capedwarf.datastore.stats.AbstractEagerListener.executeCallables(AbstractEagerListener.java:52)
[capedwarf-datastore-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
at
org.jboss.capedwarf.datastore.stats.AbstractEagerListener.onPreRemove(AbstractEagerListener.java:47)
[capedwarf-datastore-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
at
org.jboss.capedwarf.datastore.notifications.AbstractPutRemoveCacheListener.onRemove(AbstractPutRemoveCacheListener.java:62)
[capedwarf-datastore-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
[classes.jar:1.6.0_37]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39)
[classes.jar:1.6.0_37]
at
sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25)
[classes.jar:1.6.0_37]
at java.lang.reflect.Method.invoke(Method.java:597) [classes.jar:1.6.0_37]
at
org.infinispan.notifications.AbstractListenerImpl$ListenerInvocation$1.run(AbstractListenerImpl.java:200)
[infinispan-core-5.2.0.Beta6.jar:5.2.0.Beta6]
at
java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886)
[classes.jar:1.6.0_37]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908)
[classes.jar:1.6.0_37]
at java.lang.Thread.run(Thread.java:680) [classes.jar:1.6.0_37]
this is the code that invokes DistService:
/**
* Submit the task to distributed execution env, it could be a fire-n-forget way.
*/
public static <R> Future<R> fire(final String appId, final CacheName
template, final Callable<R> task, Object... keys) {
return distribute(appId, template, task, false, keys);
}
private static <R> Future<R> distribute(final String appId, final
CacheName template, final Callable<R> task, final boolean direct, final Object...
keys) {
if (cacheManager == null)
throw new IllegalArgumentException("CacheManager is null, should not be
here?!");
final Cache cache = getCache(appId, template);
try {
final ExecutorService executor = (direct ? ExecutorFactory.getDirectExecutor()
: ExecutorFactory.getInstance());
final DistributedExecutorService des = new DefaultExecutorService(cache,
executor);
return des.submit(task, keys);
} catch (Exception e) {
throw (e instanceof RuntimeException) ? (RuntimeException) e : new
RuntimeException(e);
}
}
where the non-direct executor service is
int maxPoolSize =
Integer.parseInt(System.getProperty("jboss.capedwarf.maxPoolSize",
"3"));
executor = new ThreadPoolExecutor(1, maxPoolSize, 10, TimeUnit.SECONDS, new
ArrayBlockingQueue<Runnable>(maxPoolSize), this);
and the task are something like this:
public abstract class AbstractUpdateTask<V> extends BaseTxTask<String, V,
Entity> {
private final CapedwarfEnvironment env;
private final Update update;
public AbstractUpdateTask(Update update) {
this.env = CapedwarfEnvironment.getThreadLocalInstance();
this.update = update;
}
protected Entity callInTx() throws Exception {
CapedwarfEnvironment previous = CapedwarfEnvironment.setThreadLocalInstance(env);
try {
return callInTxInternal();
} finally {
if (previous != null) {
CapedwarfEnvironment.setThreadLocalInstance(previous);
} else {
CapedwarfEnvironment.clearThreadLocalInstance();
}
}
}
private Entity callInTxInternal() throws Exception {
final DatastoreService service = DatastoreServiceFactory.getDatastoreService();
final String cacheKey = update.statsKind() + update.statsNamespace();
lock(cacheKey);
V value = getCache().get(cacheKey);
Key key = provideKey(value);
Entity entity;
String oldNamespace = NamespaceManager.get();
NamespaceManager.set(update.statsNamespace());
try {
entity = getOrCreateEntity(service, key);
entity = update.update(entity);
key = service.put(entity);
} finally {
NamespaceManager.set(oldNamespace);
}
getCache().put(cacheKey, updateValue(value, key));
return entity;
}
private void lock(String cacheKey) {
AdvancedCache<String, V> ac = getCache().getAdvancedCache();
if (ac.lock(cacheKey) == false)
throw new IllegalArgumentException("Cannot get a lock on key for " +
cacheKey);
}
private Entity getOrCreateEntity(DatastoreService service, Key key) {
if (key == null) {
return createNewEntity();
} else {
Map<Key, Entity> map = service.get(null, Collections.singleton(key));
if (map.isEmpty()) {
return createNewEntity();
} else {
return map.values().iterator().next();
}
}
}
private Entity createNewEntity() {
Entity entity = new Entity(update.statsKind());
update.initialize(entity);
return entity;
}
protected abstract Key provideKey(V value);
protected abstract V updateValue(V value, Key key);
}
_______________________________________________
infinispan-dev mailing list
infinispan-dev(a)lists.jboss.org
https://lists.jboss.org/mailman/listinfo/infinispan-dev