<html><head></head><body style="word-wrap: break-word; -webkit-nbsp-mode: space; -webkit-line-break: after-white-space; "><div><div><div>On 8 Jan 2013, at 14:00, Ales Justin wrote:</div><br class="Apple-interchange-newline"><blockquote type="cite"><div>Invoking a few</div></blockquote><div>how many tasks?</div>The Executor that rejects the task is configured externally. If 3 threads are not enough you should either increase the number of threads or use a different rejection policy.&nbsp;<div>&nbsp;</div><br><blockquote type="cite"><div> simple tasks quickly drains out a thread pool of size 3.<br><br>------<br><br>14:41:43,061 WARN &nbsp;[org.infinispan.distexec.DefaultExecutorService] (notification-thread-0) ISPN000007: Failed local execution : java.util.concurrent.RejectedExecutionException: Current thread pool executor queue: [java.util.concurrent.FutureTask@cee8427, java.util.concurrent.FutureTask@7f262312, java.util.concurrent.FutureTask@20c8b3f5]<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;at org.jboss.as.capedwarf.services.SimpleThreadsHandler.rejectedExecution(SimpleThreadsHandler.java:35) [jboss-as-capedwarf-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:767) [classes.jar:1.6.0_37]<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:658) [classes.jar:1.6.0_37]<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:92) [classes.jar:1.6.0_37]<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;at org.infinispan.distexec.DefaultExecutorService$DistributedTaskPart.invokeLocally(DefaultExecutorService.java:1110) [infinispan-core-5.2.0.Beta6.jar:5.2.0.Beta6]<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;at org.infinispan.distexec.DefaultExecutorService$DistributedTaskPart.execute(DefaultExecutorService.java:1067) [infinispan-core-5.2.0.Beta6.jar:5.2.0.Beta6]<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;at org.infinispan.distexec.DefaultExecutorService.submit(DefaultExecutorService.java:486) [infinispan-core-5.2.0.Beta6.jar:5.2.0.Beta6]<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;at org.infinispan.distexec.DefaultExecutorService.submit(DefaultExecutorService.java:472) [infinispan-core-5.2.0.Beta6.jar:5.2.0.Beta6]<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;at org.jboss.capedwarf.common.infinispan.InfinispanUtils.distribute(InfinispanUtils.java:103) [capedwarf-common-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;at org.jboss.capedwarf.common.infinispan.InfinispanUtils.fire(InfinispanUtils.java:92) [capedwarf-common-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;at org.jboss.capedwarf.datastore.notifications.AbstractCacheListener.executeCallable(AbstractCacheListener.java:67) [capedwarf-datastore-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;at org.jboss.capedwarf.datastore.stats.AbstractEagerListener.executeCallables(AbstractEagerListener.java:52) [capedwarf-datastore-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;at org.jboss.capedwarf.datastore.stats.AbstractEagerListener.onPreRemove(AbstractEagerListener.java:47) [capedwarf-datastore-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;at org.jboss.capedwarf.datastore.notifications.AbstractPutRemoveCacheListener.onRemove(AbstractPutRemoveCacheListener.java:62) [capedwarf-datastore-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) [classes.jar:1.6.0_37]<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) [classes.jar:1.6.0_37]<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) [classes.jar:1.6.0_37]<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;at java.lang.reflect.Method.invoke(Method.java:597) [classes.jar:1.6.0_37]<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;at org.infinispan.notifications.AbstractListenerImpl$ListenerInvocation$1.run(AbstractListenerImpl.java:200) [infinispan-core-5.2.0.Beta6.jar:5.2.0.Beta6]<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) [classes.jar:1.6.0_37]<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) [classes.jar:1.6.0_37]<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;at java.lang.Thread.run(Thread.java:680) [classes.jar:1.6.0_37]<br><br>this is the code that invokes DistService:<br><br> &nbsp;&nbsp;&nbsp;/**<br> &nbsp;&nbsp;&nbsp;&nbsp;* Submit the task to distributed execution env, it could be a fire-n-forget way.<br> &nbsp;&nbsp;&nbsp;&nbsp;*/<br> &nbsp;&nbsp;&nbsp;public static &lt;R&gt; Future&lt;R&gt; fire(final String appId, final CacheName template, final Callable&lt;R&gt; task, Object... keys) {<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return distribute(appId, template, task, false, keys);<br> &nbsp;&nbsp;&nbsp;}<br><br> &nbsp;&nbsp;&nbsp;private static &lt;R&gt; Future&lt;R&gt; distribute(final String appId, final CacheName template, final Callable&lt;R&gt; task, final boolean direct, final Object... keys) {<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if (cacheManager == null)<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;throw new IllegalArgumentException("CacheManager is null, should not be here?!");<br><br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;final Cache cache = getCache(appId, template);<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;try {<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;final ExecutorService executor = (direct ? ExecutorFactory.getDirectExecutor() : ExecutorFactory.getInstance());<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;final DistributedExecutorService des = new DefaultExecutorService(cache, executor);<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return des.submit(task, keys);<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;} catch (Exception e) {<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;throw (e instanceof RuntimeException) ? (RuntimeException) e : new RuntimeException(e);<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}<br> &nbsp;&nbsp;&nbsp;}<br><br>where the non-direct executor service is<br><br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;int maxPoolSize = Integer.parseInt(System.getProperty("jboss.capedwarf.maxPoolSize", "3"));<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;executor = new ThreadPoolExecutor(1, maxPoolSize, 10, TimeUnit.SECONDS, new ArrayBlockingQueue&lt;Runnable&gt;(maxPoolSize), this);<br><br>and the task are something like this:<br><br>public abstract class AbstractUpdateTask&lt;V&gt; extends BaseTxTask&lt;String, V, Entity&gt; {<br> &nbsp;&nbsp;&nbsp;private final CapedwarfEnvironment env;<br> &nbsp;&nbsp;&nbsp;private final Update update;<br><br> &nbsp;&nbsp;&nbsp;public AbstractUpdateTask(Update update) {<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;this.env = CapedwarfEnvironment.getThreadLocalInstance();<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;this.update = update;<br> &nbsp;&nbsp;&nbsp;}<br><br> &nbsp;&nbsp;&nbsp;protected Entity callInTx() throws Exception {<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;CapedwarfEnvironment previous = CapedwarfEnvironment.setThreadLocalInstance(env);<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;try {<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return callInTxInternal();<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;} finally {<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if (previous != null) {<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;CapedwarfEnvironment.setThreadLocalInstance(previous);<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;} else {<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;CapedwarfEnvironment.clearThreadLocalInstance();<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}<br> &nbsp;&nbsp;&nbsp;}<br><br> &nbsp;&nbsp;&nbsp;private Entity callInTxInternal() throws Exception {<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;final DatastoreService service = DatastoreServiceFactory.getDatastoreService();<br><br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;final String cacheKey = update.statsKind() + update.statsNamespace();<br><br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;lock(cacheKey);<br><br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;V value = getCache().get(cacheKey);<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Key key = provideKey(value);<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Entity entity;<br><br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;String oldNamespace = NamespaceManager.get();<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;NamespaceManager.set(update.statsNamespace());<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;try {<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;entity = getOrCreateEntity(service, key);<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;entity = update.update(entity);<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;key = service.put(entity);<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;} finally {<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;NamespaceManager.set(oldNamespace);<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}<br><br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;getCache().put(cacheKey, updateValue(value, key));<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return entity;<br> &nbsp;&nbsp;&nbsp;}<br><br> &nbsp;&nbsp;&nbsp;private void lock(String cacheKey) {<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;AdvancedCache&lt;String, V&gt; ac = getCache().getAdvancedCache();<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if (ac.lock(cacheKey) == false)<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;throw new IllegalArgumentException("Cannot get a lock on key for " + cacheKey);<br> &nbsp;&nbsp;&nbsp;}<br><br> &nbsp;&nbsp;&nbsp;private Entity getOrCreateEntity(DatastoreService service, Key key) {<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if (key == null) {<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return createNewEntity();<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;} else {<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Map&lt;Key, Entity&gt; map = service.get(null, Collections.singleton(key));<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if (map.isEmpty()) {<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return createNewEntity();<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;} else {<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return map.values().iterator().next();<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}<br> &nbsp;&nbsp;&nbsp;}<br><br> &nbsp;&nbsp;&nbsp;private Entity createNewEntity() {<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Entity entity = new Entity(update.statsKind());<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;update.initialize(entity);<br> &nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;return entity;<br> &nbsp;&nbsp;&nbsp;}<br><br> &nbsp;&nbsp;&nbsp;protected abstract Key provideKey(V value);<br><br> &nbsp;&nbsp;&nbsp;protected abstract V updateValue(V value, Key key);<br>}<br>_______________________________________________<br>infinispan-dev mailing list<br><a href="mailto:infinispan-dev@lists.jboss.org">infinispan-dev@lists.jboss.org</a><br>https://lists.jboss.org/mailman/listinfo/infinispan-dev<br></div></blockquote></div><br><div apple-content-edited="true">
<span class="Apple-style-span" style="border-collapse: separate; color: rgb(0, 0, 0); font-family: Helvetica; font-style: normal; font-variant: normal; font-weight: normal; letter-spacing: normal; line-height: normal; orphans: 2; text-align: -webkit-auto; text-indent: 0px; text-transform: none; white-space: normal; widows: 2; word-spacing: 0px; -webkit-border-horizontal-spacing: 0px; -webkit-border-vertical-spacing: 0px; -webkit-text-decorations-in-effect: none; -webkit-text-size-adjust: auto; -webkit-text-stroke-width: 0px; font-size: medium; "><span class="Apple-style-span" style="border-collapse: separate; color: rgb(0, 0, 0); font-family: Helvetica; font-style: normal; font-variant: normal; font-weight: normal; letter-spacing: normal; line-height: normal; orphans: 2; text-align: -webkit-auto; text-indent: 0px; text-transform: none; white-space: normal; widows: 2; word-spacing: 0px; -webkit-border-horizontal-spacing: 0px; -webkit-border-vertical-spacing: 0px; -webkit-text-decorations-in-effect: none; -webkit-text-size-adjust: auto; -webkit-text-stroke-width: 0px; font-size: medium; "><div style="word-wrap: break-word; -webkit-nbsp-mode: space; -webkit-line-break: after-white-space; "><span class="Apple-style-span" style="border-collapse: separate; color: rgb(0, 0, 0); font-family: Helvetica; font-style: normal; font-variant: normal; font-weight: normal; letter-spacing: normal; line-height: normal; orphans: 2; text-align: -webkit-auto; text-indent: 0px; text-transform: none; white-space: normal; widows: 2; word-spacing: 0px; -webkit-border-horizontal-spacing: 0px; -webkit-border-vertical-spacing: 0px; -webkit-text-decorations-in-effect: none; -webkit-text-size-adjust: auto; -webkit-text-stroke-width: 0px; font-size: medium; "><div style="word-wrap: break-word; -webkit-nbsp-mode: space; -webkit-line-break: after-white-space; "><span class="Apple-style-span" style="border-collapse: separate; color: rgb(0, 0, 0); font-family: Helvetica; font-style: normal; font-variant: normal; font-weight: normal; letter-spacing: normal; line-height: normal; orphans: 2; text-align: -webkit-auto; text-indent: 0px; text-transform: none; white-space: normal; widows: 2; word-spacing: 0px; -webkit-border-horizontal-spacing: 0px; -webkit-border-vertical-spacing: 0px; -webkit-text-decorations-in-effect: none; -webkit-text-size-adjust: auto; -webkit-text-stroke-width: 0px; font-size: medium; "><div style="word-wrap: break-word; -webkit-nbsp-mode: space; -webkit-line-break: after-white-space; "><span class="Apple-style-span" style="border-collapse: separate; color: rgb(0, 0, 0); font-family: Helvetica; font-style: normal; font-variant: normal; font-weight: normal; letter-spacing: normal; line-height: normal; orphans: 2; text-align: -webkit-auto; text-indent: 0px; text-transform: none; white-space: normal; widows: 2; word-spacing: 0px; -webkit-border-horizontal-spacing: 0px; -webkit-border-vertical-spacing: 0px; -webkit-text-decorations-in-effect: none; -webkit-text-size-adjust: auto; -webkit-text-stroke-width: 0px; font-size: medium; "><div style="word-wrap: break-word; -webkit-nbsp-mode: space; -webkit-line-break: after-white-space; "><div>Cheers,</div><div>--&nbsp;<br>Mircea Markus</div><div>Infinispan lead&nbsp;(<a href="http://www.infinispan.org">www.infinispan.org</a>)</div><div><br></div></div></span></div></span></div></span></div></span><br class="Apple-interchange-newline"></span><br class="Apple-interchange-newline">
</div>
<br></div></body></html>