[jboss-cvs] JBossAS SVN: r106958 - projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/web/tomcat/service/session/distributedcache/impl.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Jul 20 17:52:30 EDT 2010


Author: pferraro
Date: 2010-07-20 17:52:30 -0400 (Tue, 20 Jul 2010)
New Revision: 106958

Added:
   projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/web/tomcat/service/session/distributedcache/impl/RetryingCacheInvoker.java
Modified:
   projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/web/tomcat/service/session/distributedcache/impl/CacheInvoker.java
   projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/web/tomcat/service/session/distributedcache/impl/DistributedCacheManagerFactoryImpl.java
   projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/web/tomcat/service/session/distributedcache/impl/DistributedCacheManagerImpl.java
Log:
Extract cache invoker from DistributedCacheManagerImpl, for ease of testing

Modified: projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/web/tomcat/service/session/distributedcache/impl/CacheInvoker.java
===================================================================
--- projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/web/tomcat/service/session/distributedcache/impl/CacheInvoker.java	2010-07-20 21:50:48 UTC (rev 106957)
+++ projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/web/tomcat/service/session/distributedcache/impl/CacheInvoker.java	2010-07-20 21:52:30 UTC (rev 106958)
@@ -30,8 +30,10 @@
  */
 public interface CacheInvoker
 {
-   <R> R invoke(Operation<R> operation);
+   <R> R invoke(Cache<String, AtomicMap<Object, Object>> cache, Operation<R> operation);
    
+   void setForceSynchronous(boolean forceSynchronous);
+   
    interface Operation<R>
    {
       R invoke(Cache<String, AtomicMap<Object, Object>> cache);

Modified: projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/web/tomcat/service/session/distributedcache/impl/DistributedCacheManagerFactoryImpl.java
===================================================================
--- projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/web/tomcat/service/session/distributedcache/impl/DistributedCacheManagerFactoryImpl.java	2010-07-20 21:50:48 UTC (rev 106957)
+++ projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/web/tomcat/service/session/distributedcache/impl/DistributedCacheManagerFactoryImpl.java	2010-07-20 21:52:30 UTC (rev 106958)
@@ -50,7 +50,7 @@
       CacheContainer container = this.registry.getCacheContainer(this.cacheContainerName);
       SessionAttributeStorage<T> storage = this.getSessionAttributeStorage(manager);
       
-      return new DistributedCacheManagerImpl<T>(manager, container, storage, 10, 100);
+      return new DistributedCacheManagerImpl<T>(manager, container, storage, new RetryingCacheInvoker(10, 100));
    }
 
    @SuppressWarnings("unchecked")

Modified: projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/web/tomcat/service/session/distributedcache/impl/DistributedCacheManagerImpl.java
===================================================================
--- projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/web/tomcat/service/session/distributedcache/impl/DistributedCacheManagerImpl.java	2010-07-20 21:50:48 UTC (rev 106957)
+++ projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/web/tomcat/service/session/distributedcache/impl/DistributedCacheManagerImpl.java	2010-07-20 21:52:30 UTC (rev 106958)
@@ -42,9 +42,7 @@
 import org.infinispan.notifications.cachelistener.event.CacheEntryActivatedEvent;
 import org.infinispan.notifications.cachelistener.event.CacheEntryModifiedEvent;
 import org.infinispan.notifications.cachelistener.event.CacheEntryRemovedEvent;
-import org.infinispan.remoting.transport.jgroups.SuspectException;
 import org.infinispan.transaction.tm.BatchModeTransactionManager;
-import org.infinispan.util.concurrent.TimeoutException;
 import org.jboss.logging.Logger;
 import org.jboss.web.tomcat.service.session.distributedcache.spi.BatchingManager;
 import org.jboss.web.tomcat.service.session.distributedcache.spi.DistributableSessionMetadata;
@@ -58,7 +56,7 @@
  * @author Paul Ferraro
  */
 @Listener
-public class DistributedCacheManagerImpl<T extends OutgoingDistributableSessionData> implements DistributedCacheManager<T>, CacheInvoker
+public class DistributedCacheManagerImpl<T extends OutgoingDistributableSessionData> implements DistributedCacheManager<T>
 {
    static String mask(String sessionId)
    {
@@ -83,20 +81,18 @@
    private final LocalDistributableSessionManager manager;
    private final CacheContainer container;
    private final SessionAttributeStorage<T> attributeStorage;
-   private final int[] backOffIntervals;
+   private final CacheInvoker invoker;
    
    private Cache<String, AtomicMap<Object, Object>> cache;
    private BatchingManager batchingManager;
    private boolean passivationEnabled = false;
    
-   private volatile boolean forceSynchronous = false;
-   
-   public DistributedCacheManagerImpl(LocalDistributableSessionManager manager, CacheContainer container, SessionAttributeStorage<T> attributeStorage, int... backOffIntervals)
+   public DistributedCacheManagerImpl(LocalDistributableSessionManager manager, CacheContainer container, SessionAttributeStorage<T> attributeStorage, CacheInvoker invoker)
    {
       this.manager = manager;
       this.container = container;
       this.attributeStorage = attributeStorage;
-      this.backOffIntervals = backOffIntervals;
+      this.invoker = invoker;
    }
 
    @Override
@@ -188,7 +184,7 @@
          }
       };
       
-      this.invoke(operation);
+      this.invoker.invoke(this.cache, operation);
    }
    
    @Override
@@ -214,7 +210,7 @@
          }
       };
       
-      AtomicMap<Object, Object> data = this.invoke(operation);
+      AtomicMap<Object, Object> data = this.invoker.invoke(this.cache, operation);
       
       // If requested session is no longer in the cache; return null
       if (data == null) return null;
@@ -270,7 +266,7 @@
          }
       };
       
-      this.invoke(operation);
+      this.invoker.invoke(this.cache, operation);
    }
 
    /**
@@ -290,7 +286,7 @@
          }
       };
       
-      this.invoke(operation);
+      this.invoker.invoke(this.cache, operation);
    }
 
    /**
@@ -323,7 +319,7 @@
          }
       };
       
-      this.invoke(operation);
+      this.invoker.invoke(this.cache, operation);
    }
 
    /**
@@ -361,61 +357,9 @@
    @Override
    public void setForceSynchronous(boolean forceSynchronous)
    {
-      this.forceSynchronous = forceSynchronous;
+      this.invoker.setForceSynchronous(forceSynchronous);
    }
-   
-   /**
-    * {@inheritDoc}
-    * @see org.jboss.ha.web.tomcat.service.session.distributedcache.impl.CacheInvoker#invoke(org.jboss.ha.web.tomcat.service.session.distributedcache.impl.CacheInvoker.Operation)
-    */
-   @Override
-   public <R> R invoke(CacheInvoker.Operation<R> operation)
-   {
-      Exception exception = null;
 
-      for (int i = 0; i <= this.backOffIntervals.length; ++i)
-      {
-         if (this.forceSynchronous)
-         {
-            this.cache.getAdvancedCache().withFlags(Flag.FORCE_SYNCHRONOUS);
-         }
-         
-         try
-         {
-            return operation.invoke(this.cache);
-         }
-         catch (TimeoutException e)
-         {
-            exception = e;
-         }
-         catch (SuspectException e)
-         {
-            exception = e;
-         }
-         
-         if (i < this.backOffIntervals.length)
-         {
-            int delay = this.backOffIntervals[i];
-            
-            try
-            {
-               if (this.log.isTraceEnabled())
-               {
-                  this.log.trace(String.format("Cache operation failed.  Retrying in %d ms", Integer.valueOf(delay)), exception);
-               }
-               
-               Thread.sleep(delay);
-            }
-            catch (InterruptedException e)
-            {
-               Thread.currentThread().interrupt();
-            }
-         }
-      }
-      
-      throw new RuntimeException(String.format("Aborting cache operation after %d retries.", Integer.valueOf(this.backOffIntervals.length + 1)), exception);
-   }
-
    /**
     * {@inheritDoc}
     * @see org.jboss.web.tomcat.service.session.distributedcache.spi.DistributedCacheManager#getSessionOwnershipSupport()

Added: projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/web/tomcat/service/session/distributedcache/impl/RetryingCacheInvoker.java
===================================================================
--- projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/web/tomcat/service/session/distributedcache/impl/RetryingCacheInvoker.java	                        (rev 0)
+++ projects/cluster/ha-server-cache-ispn/trunk/src/main/java/org/jboss/ha/web/tomcat/service/session/distributedcache/impl/RetryingCacheInvoker.java	2010-07-20 21:52:30 UTC (rev 106958)
@@ -0,0 +1,109 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2010, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.ha.web.tomcat.service.session.distributedcache.impl;
+
+import org.infinispan.Cache;
+import org.infinispan.atomic.AtomicMap;
+import org.infinispan.context.Flag;
+import org.infinispan.remoting.transport.jgroups.SuspectException;
+import org.infinispan.util.concurrent.TimeoutException;
+import org.jboss.logging.Logger;
+
+/**
+ * A {@link CacheInvoker} that runs an operation against a cache and, when the operation fails with a {@link TimeoutException} or {@link SuspectException}, sleeps for the next configured back-off interval and tries again; once every attempt has failed, a {@link RuntimeException} wrapping the last failure is thrown.
+ * @author Paul Ferraro
+ */
+public class RetryingCacheInvoker implements CacheInvoker
+{
+   private static final Logger log = Logger.getLogger(RetryingCacheInvoker.class);
+   
+   private final int[] backOffIntervals; // delays handed to Thread.sleep (milliseconds) between attempts; the array length is the number of retries allowed after the first attempt
+
+   private volatile boolean forceSynchronous = false; // written by setForceSynchronous and re-read before every attempt in invoke
+   
+   public RetryingCacheInvoker(int... backOffIntervals) // the varargs array is stored as-is (no defensive copy); passing no intervals means a single attempt with no retry
+   {
+      this.backOffIntervals = backOffIntervals;
+   }
+   
+   /**
+    * {@inheritDoc} Makes up to {@code backOffIntervals.length + 1} attempts; only {@link TimeoutException} and {@link SuspectException} trigger a retry, any other exception thrown by the operation propagates immediately, and a {@link RuntimeException} carrying the last caught exception as its cause is thrown when all attempts fail.
+    * @see org.jboss.ha.web.tomcat.service.session.distributedcache.impl.CacheInvoker#invoke(org.infinispan.Cache, org.jboss.ha.web.tomcat.service.session.distributedcache.impl.CacheInvoker.Operation)
+    */
+   @Override
+   public <R> R invoke(Cache<String, AtomicMap<Object, Object>> cache, Operation<R> operation)
+   {
+      Exception exception = null; // most recent retryable failure; used for trace logging and as the cause of the final RuntimeException
+
+      for (int i = 0; i <= this.backOffIntervals.length; ++i) // "<=" on purpose: one initial attempt plus one retry per back-off interval
+      {
+         if (this.forceSynchronous) // checked on every attempt, so a change made between attempts takes effect on the next one
+         {
+            cache.getAdvancedCache().withFlags(Flag.FORCE_SYNCHRONOUS); // NOTE(review): the returned cache is discarded; this presumably relies on withFlags affecting the next call made on this cache by the current thread - confirm against the Infinispan version in use
+         }
+         
+         try
+         {
+            return operation.invoke(cache); // success ends the loop immediately with the operation's result
+         }
+         catch (TimeoutException e)
+         {
+            exception = e; // remember the failure and fall through to the back-off below
+         }
+         catch (SuspectException e)
+         {
+            exception = e; // remember the failure and fall through to the back-off below
+         }
+         
+         if (i < this.backOffIntervals.length) // no sleep after the final attempt; the loop then exits and the failure is reported
+         {
+            int delay = this.backOffIntervals[i];
+            
+            try
+            {
+               if (log.isTraceEnabled())
+               {
+                  log.trace(String.format("Cache operation failed.  Retrying in %d ms", Integer.valueOf(delay)), exception);
+               }
+               
+               Thread.sleep(delay);
+            }
+            catch (InterruptedException e)
+            {
+               Thread.currentThread().interrupt(); // restores the interrupt status; the loop is not aborted, so the next attempt still runs
+            }
+         }
+      }
+      
+      throw new RuntimeException(String.format("Aborting cache operation after %d retries.", Integer.valueOf(this.backOffIntervals.length + 1)), exception); // the number reported is the total count of attempts (intervals + 1)
+   }
+
+   /**
+    * {@inheritDoc} Stores the value in a volatile field that {@code invoke} reads before each attempt to decide whether to request {@link Flag#FORCE_SYNCHRONOUS}.
+    * @see org.jboss.ha.web.tomcat.service.session.distributedcache.impl.CacheInvoker#setForceSynchronous(boolean)
+    */
+   @Override
+   public void setForceSynchronous(boolean forceSynchronous)
+   {
+      this.forceSynchronous = forceSynchronous;
+   }
+}



More information about the jboss-cvs-commits mailing list