[infinispan-dev] dist exec draining thread pool

Mircea Markus mmarkus at redhat.com
Tue Jan 8 09:58:22 EST 2013


On 8 Jan 2013, at 14:00, Ales Justin wrote:

> Invoking a few
how many tasks?
The Executor that rejects the task is configured externally. If 3 threads are not enough you should either increase the number of threads or use a different rejection policy. 
 

> simple tasks quickly drains out a thread pool of size 3.
> 
> ------
> 
> 14:41:43,061 WARN  [org.infinispan.distexec.DefaultExecutorService] (notification-thread-0) ISPN000007: Failed local execution : java.util.concurrent.RejectedExecutionException: Current thread pool executor queue: [java.util.concurrent.FutureTask at cee8427, java.util.concurrent.FutureTask at 7f262312, java.util.concurrent.FutureTask at 20c8b3f5]
>        at org.jboss.as.capedwarf.services.SimpleThreadsHandler.rejectedExecution(SimpleThreadsHandler.java:35) [jboss-as-capedwarf-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
>        at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:767) [classes.jar:1.6.0_37]
>        at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:658) [classes.jar:1.6.0_37]
>        at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:92) [classes.jar:1.6.0_37]
>        at org.infinispan.distexec.DefaultExecutorService$DistributedTaskPart.invokeLocally(DefaultExecutorService.java:1110) [infinispan-core-5.2.0.Beta6.jar:5.2.0.Beta6]
>        at org.infinispan.distexec.DefaultExecutorService$DistributedTaskPart.execute(DefaultExecutorService.java:1067) [infinispan-core-5.2.0.Beta6.jar:5.2.0.Beta6]
>        at org.infinispan.distexec.DefaultExecutorService.submit(DefaultExecutorService.java:486) [infinispan-core-5.2.0.Beta6.jar:5.2.0.Beta6]
>        at org.infinispan.distexec.DefaultExecutorService.submit(DefaultExecutorService.java:472) [infinispan-core-5.2.0.Beta6.jar:5.2.0.Beta6]
>        at org.jboss.capedwarf.common.infinispan.InfinispanUtils.distribute(InfinispanUtils.java:103) [capedwarf-common-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
>        at org.jboss.capedwarf.common.infinispan.InfinispanUtils.fire(InfinispanUtils.java:92) [capedwarf-common-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
>        at org.jboss.capedwarf.datastore.notifications.AbstractCacheListener.executeCallable(AbstractCacheListener.java:67) [capedwarf-datastore-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
>        at org.jboss.capedwarf.datastore.stats.AbstractEagerListener.executeCallables(AbstractEagerListener.java:52) [capedwarf-datastore-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
>        at org.jboss.capedwarf.datastore.stats.AbstractEagerListener.onPreRemove(AbstractEagerListener.java:47) [capedwarf-datastore-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
>        at org.jboss.capedwarf.datastore.notifications.AbstractPutRemoveCacheListener.onRemove(AbstractPutRemoveCacheListener.java:62) [capedwarf-datastore-1.0.0-SNAPSHOT.jar:1.0.0-SNAPSHOT]
>        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) [classes.jar:1.6.0_37]
>        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:39) [classes.jar:1.6.0_37]
>        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:25) [classes.jar:1.6.0_37]
>        at java.lang.reflect.Method.invoke(Method.java:597) [classes.jar:1.6.0_37]
>        at org.infinispan.notifications.AbstractListenerImpl$ListenerInvocation$1.run(AbstractListenerImpl.java:200) [infinispan-core-5.2.0.Beta6.jar:5.2.0.Beta6]
>        at java.util.concurrent.ThreadPoolExecutor$Worker.runTask(ThreadPoolExecutor.java:886) [classes.jar:1.6.0_37]
>        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:908) [classes.jar:1.6.0_37]
>        at java.lang.Thread.run(Thread.java:680) [classes.jar:1.6.0_37]
> 
> this is the code that invokes DistService:
> 
>    /**
>     * Submit the task to distributed execution env, it could be a fire-n-forget way.
>     */
>    public static <R> Future<R> fire(final String appId, final CacheName template, final Callable<R> task, Object... keys) {
>        return distribute(appId, template, task, false, keys);
>    }
> 
>    private static <R> Future<R> distribute(final String appId, final CacheName template, final Callable<R> task, final boolean direct, final Object... keys) {
>        if (cacheManager == null)
>            throw new IllegalArgumentException("CacheManager is null, should not be here?!");
> 
>        final Cache cache = getCache(appId, template);
>        try {
>            final ExecutorService executor = (direct ? ExecutorFactory.getDirectExecutor() : ExecutorFactory.getInstance());
>            final DistributedExecutorService des = new DefaultExecutorService(cache, executor);
>            return des.submit(task, keys);
>        } catch (Exception e) {
>            throw (e instanceof RuntimeException) ? (RuntimeException) e : new RuntimeException(e);
>        }
>    }
> 
> where the non-direct executor service is
> 
>            int maxPoolSize = Integer.parseInt(System.getProperty("jboss.capedwarf.maxPoolSize", "3"));
>            executor = new ThreadPoolExecutor(1, maxPoolSize, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<Runnable>(maxPoolSize), this);
> 
> and the task are something like this:
> 
> public abstract class AbstractUpdateTask<V> extends BaseTxTask<String, V, Entity> {
>    private final CapedwarfEnvironment env;
>    private final Update update;
> 
>    public AbstractUpdateTask(Update update) {
>        this.env = CapedwarfEnvironment.getThreadLocalInstance();
>        this.update = update;
>    }
> 
>    protected Entity callInTx() throws Exception {
>        CapedwarfEnvironment previous = CapedwarfEnvironment.setThreadLocalInstance(env);
>        try {
>            return callInTxInternal();
>        } finally {
>            if (previous != null) {
>                CapedwarfEnvironment.setThreadLocalInstance(previous);
>            } else {
>                CapedwarfEnvironment.clearThreadLocalInstance();
>            }
>        }
>    }
> 
>    private Entity callInTxInternal() throws Exception {
>        final DatastoreService service = DatastoreServiceFactory.getDatastoreService();
> 
>        final String cacheKey = update.statsKind() + update.statsNamespace();
> 
>        lock(cacheKey);
> 
>        V value = getCache().get(cacheKey);
>        Key key = provideKey(value);
>        Entity entity;
> 
>        String oldNamespace = NamespaceManager.get();
>        NamespaceManager.set(update.statsNamespace());
>        try {
>            entity = getOrCreateEntity(service, key);
>            entity = update.update(entity);
>            key = service.put(entity);
>        } finally {
>            NamespaceManager.set(oldNamespace);
>        }
> 
>        getCache().put(cacheKey, updateValue(value, key));
>        return entity;
>    }
> 
>    private void lock(String cacheKey) {
>        AdvancedCache<String, V> ac = getCache().getAdvancedCache();
>        if (ac.lock(cacheKey) == false)
>            throw new IllegalArgumentException("Cannot get a lock on key for " + cacheKey);
>    }
> 
>    private Entity getOrCreateEntity(DatastoreService service, Key key) {
>        if (key == null) {
>            return createNewEntity();
>        } else {
>            Map<Key, Entity> map = service.get(null, Collections.singleton(key));
>            if (map.isEmpty()) {
>                return createNewEntity();
>            } else {
>                return map.values().iterator().next();
>            }
>        }
>    }
> 
>    private Entity createNewEntity() {
>        Entity entity = new Entity(update.statsKind());
>        update.initialize(entity);
>        return entity;
>    }
> 
>    protected abstract Key provideKey(V value);
> 
>    protected abstract V updateValue(V value, Key key);
> }
> _______________________________________________
> infinispan-dev mailing list
> infinispan-dev at lists.jboss.org
> https://lists.jboss.org/mailman/listinfo/infinispan-dev

Cheers,
-- 
Mircea Markus
Infinispan lead (www.infinispan.org)




-------------- next part --------------
An HTML attachment was scrubbed...
URL: http://lists.jboss.org/pipermail/infinispan-dev/attachments/20130108/64a6893c/attachment-0001.html 


More information about the infinispan-dev mailing list