[jboss-cvs] JBossAS SVN: r106958 - projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/web/tomcat/service/session/distributedcache/impl.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Jul 20 17:52:30 EDT 2010
Author: pferraro
Date: 2010-07-20 17:52:30 -0400 (Tue, 20 Jul 2010)
New Revision: 106958
Added:
projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/web/tomcat/service/session/distributedcache/impl/RetryingCacheInvoker.java
Modified:
projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/web/tomcat/service/session/distributedcache/impl/CacheInvoker.java
projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/web/tomcat/service/session/distributedcache/impl/DistributedCacheManagerFactoryImpl.java
projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/web/tomcat/service/session/distributedcache/impl/DistributedCacheManagerImpl.java
Log:
Extract cache invoker from DistributedCacheManagerImpl, for ease of testing
Modified: projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/web/tomcat/service/session/distributedcache/impl/CacheInvoker.java
===================================================================
--- projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/web/tomcat/service/session/distributedcache/impl/CacheInvoker.java 2010-07-20 21:50:48 UTC (rev 106957)
+++ projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/web/tomcat/service/session/distributedcache/impl/CacheInvoker.java 2010-07-20 21:52:30 UTC (rev 106958)
@@ -30,8 +30,10 @@
*/
public interface CacheInvoker
{
- <R> R invoke(Operation<R> operation);
+ <R> R invoke(Cache<String, AtomicMap<Object, Object>> cache, Operation<R> operation);
+ void setForceSynchronous(boolean forceSynchronous);
+
interface Operation<R>
{
R invoke(Cache<String, AtomicMap<Object, Object>> cache);
Modified: projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/web/tomcat/service/session/distributedcache/impl/DistributedCacheManagerFactoryImpl.java
===================================================================
--- projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/web/tomcat/service/session/distributedcache/impl/DistributedCacheManagerFactoryImpl.java 2010-07-20 21:50:48 UTC (rev 106957)
+++ projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/web/tomcat/service/session/distributedcache/impl/DistributedCacheManagerFactoryImpl.java 2010-07-20 21:52:30 UTC (rev 106958)
@@ -50,7 +50,7 @@
CacheContainer container = this.registry.getCacheContainer(this.cacheContainerName);
SessionAttributeStorage<T> storage = this.getSessionAttributeStorage(manager);
- return new DistributedCacheManagerImpl<T>(manager, container, storage, 10, 100);
+ return new DistributedCacheManagerImpl<T>(manager, container, storage, new RetryingCacheInvoker(10, 100));
}
@SuppressWarnings("unchecked")
Modified: projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/web/tomcat/service/session/distributedcache/impl/DistributedCacheManagerImpl.java
===================================================================
--- projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/web/tomcat/service/session/distributedcache/impl/DistributedCacheManagerImpl.java 2010-07-20 21:50:48 UTC (rev 106957)
+++ projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/web/tomcat/service/session/distributedcache/impl/DistributedCacheManagerImpl.java 2010-07-20 21:52:30 UTC (rev 106958)
@@ -42,9 +42,7 @@
import org.infinispan.notifications.cachelistener.event.CacheEntryActivatedEvent;
import org.infinispan.notifications.cachelistener.event.CacheEntryModifiedEvent;
import org.infinispan.notifications.cachelistener.event.CacheEntryRemovedEvent;
-import org.infinispan.remoting.transport.jgroups.SuspectException;
import org.infinispan.transaction.tm.BatchModeTransactionManager;
-import org.infinispan.util.concurrent.TimeoutException;
import org.jboss.logging.Logger;
import org.jboss.web.tomcat.service.session.distributedcache.spi.BatchingManager;
import org.jboss.web.tomcat.service.session.distributedcache.spi.DistributableSessionMetadata;
@@ -58,7 +56,7 @@
* @author Paul Ferraro
*/
@Listener
-public class DistributedCacheManagerImpl<T extends OutgoingDistributableSessionData> implements DistributedCacheManager<T>, CacheInvoker
+public class DistributedCacheManagerImpl<T extends OutgoingDistributableSessionData> implements DistributedCacheManager<T>
{
static String mask(String sessionId)
{
@@ -83,20 +81,18 @@
private final LocalDistributableSessionManager manager;
private final CacheContainer container;
private final SessionAttributeStorage<T> attributeStorage;
- private final int[] backOffIntervals;
+ private final CacheInvoker invoker;
private Cache<String, AtomicMap<Object, Object>> cache;
private BatchingManager batchingManager;
private boolean passivationEnabled = false;
- private volatile boolean forceSynchronous = false;
-
- public DistributedCacheManagerImpl(LocalDistributableSessionManager manager, CacheContainer container, SessionAttributeStorage<T> attributeStorage, int... backOffIntervals)
+ public DistributedCacheManagerImpl(LocalDistributableSessionManager manager, CacheContainer container, SessionAttributeStorage<T> attributeStorage, CacheInvoker invoker)
{
this.manager = manager;
this.container = container;
this.attributeStorage = attributeStorage;
- this.backOffIntervals = backOffIntervals;
+ this.invoker = invoker;
}
@Override
@@ -188,7 +184,7 @@
}
};
- this.invoke(operation);
+ this.invoker.invoke(this.cache, operation);
}
@Override
@@ -214,7 +210,7 @@
}
};
- AtomicMap<Object, Object> data = this.invoke(operation);
+ AtomicMap<Object, Object> data = this.invoker.invoke(this.cache, operation);
// If requested session is no longer in the cache; return null
if (data == null) return null;
@@ -270,7 +266,7 @@
}
};
- this.invoke(operation);
+ this.invoker.invoke(this.cache, operation);
}
/**
@@ -290,7 +286,7 @@
}
};
- this.invoke(operation);
+ this.invoker.invoke(this.cache, operation);
}
/**
@@ -323,7 +319,7 @@
}
};
- this.invoke(operation);
+ this.invoker.invoke(this.cache, operation);
}
/**
@@ -361,61 +357,9 @@
@Override
public void setForceSynchronous(boolean forceSynchronous)
{
- this.forceSynchronous = forceSynchronous;
+ this.invoker.setForceSynchronous(forceSynchronous);
}
-
- /**
- * {@inheritDoc}
- * @see org.jboss.ha.web.tomcat.service.session.distributedcache.impl.CacheInvoker#invoke(org.jboss.ha.web.tomcat.service.session.distributedcache.impl.CacheInvoker.Operation)
- */
- @Override
- public <R> R invoke(CacheInvoker.Operation<R> operation)
- {
- Exception exception = null;
- for (int i = 0; i <= this.backOffIntervals.length; ++i)
- {
- if (this.forceSynchronous)
- {
- this.cache.getAdvancedCache().withFlags(Flag.FORCE_SYNCHRONOUS);
- }
-
- try
- {
- return operation.invoke(this.cache);
- }
- catch (TimeoutException e)
- {
- exception = e;
- }
- catch (SuspectException e)
- {
- exception = e;
- }
-
- if (i < this.backOffIntervals.length)
- {
- int delay = this.backOffIntervals[i];
-
- try
- {
- if (this.log.isTraceEnabled())
- {
- this.log.trace(String.format("Cache operation failed. Retrying in %d ms", Integer.valueOf(delay)), exception);
- }
-
- Thread.sleep(delay);
- }
- catch (InterruptedException e)
- {
- Thread.currentThread().interrupt();
- }
- }
- }
-
- throw new RuntimeException(String.format("Aborting cache operation after %d retries.", Integer.valueOf(this.backOffIntervals.length + 1)), exception);
- }
-
/**
* {@inheritDoc}
* @see org.jboss.web.tomcat.service.session.distributedcache.spi.DistributedCacheManager#getSessionOwnershipSupport()
Added: projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/web/tomcat/service/session/distributedcache/impl/RetryingCacheInvoker.java
===================================================================
--- projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/web/tomcat/service/session/distributedcache/impl/RetryingCacheInvoker.java (rev 0)
+++ projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/web/tomcat/service/session/distributedcache/impl/RetryingCacheInvoker.java 2010-07-20 21:52:30 UTC (rev 106958)
@@ -0,0 +1,109 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.ha.web.tomcat.service.session.distributedcache.impl;
+
+import org.infinispan.Cache;
+import org.infinispan.atomic.AtomicMap;
+import org.infinispan.context.Flag;
+import org.infinispan.remoting.transport.jgroups.SuspectException;
+import org.infinispan.util.concurrent.TimeoutException;
+import org.jboss.logging.Logger;
+
+/**
+ * A {@link CacheInvoker} that re-attempts an operation when it fails with a {@link TimeoutException} or {@link SuspectException}, sleeping for the next configured back-off interval between attempts.
+ * @author Paul Ferraro
+ */
+public class RetryingCacheInvoker implements CacheInvoker
+{
+ private static final Logger log = Logger.getLogger(RetryingCacheInvoker.class);
+
+ private final int[] backOffIntervals; // delays (ms, passed to Thread.sleep) between attempts; its length is the maximum number of retries
+
+ private volatile boolean forceSynchronous = false; // when true, Flag.FORCE_SYNCHRONOUS is requested before every attempt
+
+ public RetryingCacheInvoker(int... backOffIntervals)
+ {
+ this.backOffIntervals = backOffIntervals; // the caller's array is stored by reference, not copied
+ }
+
+ /**
+ * {@inheritDoc} Makes up to {@code backOffIntervals.length + 1} attempts; once all have failed, throws a RuntimeException whose cause is the last TimeoutException or SuspectException caught. Any other exception from the operation propagates immediately.
+ * @see org.jboss.ha.web.tomcat.service.session.distributedcache.impl.CacheInvoker#invoke(org.infinispan.Cache, org.jboss.ha.web.tomcat.service.session.distributedcache.impl.CacheInvoker.Operation)
+ */
+ @Override
+ public <R> R invoke(Cache<String, AtomicMap<Object, Object>> cache, Operation<R> operation)
+ {
+ Exception exception = null;
+
+ for (int i = 0; i <= this.backOffIntervals.length; ++i) // one initial attempt plus one retry per back-off interval
+ {
+ if (this.forceSynchronous)
+ {
+ cache.getAdvancedCache().withFlags(Flag.FORCE_SYNCHRONOUS); // NOTE(review): return value is discarded - presumably relies on withFlags affecting the next call made through this cache; confirm against the Infinispan version in use
+ }
+
+ try
+ {
+ return operation.invoke(cache);
+ }
+ catch (TimeoutException e)
+ {
+ exception = e;
+ }
+ catch (SuspectException e)
+ {
+ exception = e;
+ }
+
+ if (i < this.backOffIntervals.length) // no sleep after the final attempt
+ {
+ int delay = this.backOffIntervals[i];
+
+ try
+ {
+ if (log.isTraceEnabled())
+ {
+ log.trace(String.format("Cache operation failed. Retrying in %d ms", Integer.valueOf(delay)), exception);
+ }
+
+ Thread.sleep(delay);
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt(); // restores the interrupt status; the loop still proceeds to the next attempt
+ }
+ }
+ }
+
+ throw new RuntimeException(String.format("Aborting cache operation after %d retries.", Integer.valueOf(this.backOffIntervals.length + 1)), exception); // the reported number is the total attempt count (retries + 1)
+ }
+
+ /**
+ * {@inheritDoc} The stored flag is re-read at the start of every attempt made by {@code invoke}.
+ * @see org.jboss.ha.web.tomcat.service.session.distributedcache.impl.CacheInvoker#setForceSynchronous(boolean)
+ */
+ @Override
+ public void setForceSynchronous(boolean forceSynchronous)
+ {
+ this.forceSynchronous = forceSynchronous;
+ }
+}
More information about the jboss-cvs-commits
mailing list