[jboss-svn-commits] JBL Code SVN: r26225 - in labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta: tests/src/org/jboss/internal/soa/esb/services/registry and 1 other directory.
jboss-svn-commits at lists.jboss.org
jboss-svn-commits at lists.jboss.org
Thu Apr 23 09:17:59 EDT 2009
Author: kevin.conner at jboss.com
Date: 2009-04-23 09:17:59 -0400 (Thu, 23 Apr 2009)
New Revision: 26225
Modified:
labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/services/registry/CachingRegistryInterceptor.java
labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/tests/src/org/jboss/internal/soa/esb/services/registry/CachingRegistryInterceptorUnitTest.java
Log:
Relax synchronization in caching registry interceptor: JBESB-2528
Modified: labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/services/registry/CachingRegistryInterceptor.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/services/registry/CachingRegistryInterceptor.java 2009-04-23 12:16:58 UTC (rev 26224)
+++ labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/src/org/jboss/internal/soa/esb/services/registry/CachingRegistryInterceptor.java 2009-04-23 13:17:59 UTC (rev 26225)
@@ -19,9 +19,13 @@
*/
package org.jboss.internal.soa.esb.services.registry;
-import java.util.ArrayList;
-import java.util.Collections;
+import java.util.Arrays;
+import java.util.Iterator;
import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.commons.collections.map.LRUMap;
import org.apache.log4j.Logger;
@@ -67,12 +71,24 @@
*/
private final LRUMap serviceInfoMap = new LRUMap(MAX_CACHE_SIZE) ;
+ /**
+ * Find all Services assigned to the Red Hat/JBossESB organization.
+ * @return Collection of Strings containing the service names.
+ * @throws RegistryException
+ */
public List<String> findAllServices() throws RegistryException
{
// Do not cache, go direct to the registry
return getRegistry().findAllServices() ;
}
+ /**
+ * Find all services that belong to the supplied category.
+ *
+ * @param serviceCategoryName - name of the category to which the service belongs.
+ * @return Collection of Strings containing the service names
+ * @throws RegistryException
+ */
public List<String> findServices(final String category)
throws RegistryException
{
@@ -80,127 +96,280 @@
return getRegistry().findServices(category) ;
}
+ /**
+ * Returns the first EPR in the list that belong to a specific category and service combination.
+ *
+ * @param serviceCategoryName - name of the category to which the service belongs.
+ * @param serviceName - name of the service to which the EPS belongs.
+ * @return EPR.
+ * @throws RegistryException
+ */
public EPR findEPR(final String category, final String name)
throws RegistryException, ServiceNotFoundException
{
- final List<EPR> eprs = findEPRs(category, name) ;
- return (eprs.isEmpty() ? null : eprs.get(0)) ;
+ final Service service = new Service(category, name) ;
+ final ConcurrentMap<EPR, EPR> eprs = getEPRs(service) ;
+ final Iterator<EPR> eprIter = eprs.keySet().iterator() ;
+ if (eprIter.hasNext())
+ {
+ return eprIter.next() ;
+ }
+ else
+ {
+ return null;
+ }
}
+ /**
+ * Finds all the EPRs that belong to a specific category and service combination.
+ *
+ * @param serviceCategoryName - name of the category to which the service belongs.
+ * @param serviceName - name of the service to which the EPS belongs.
+ * @return Collection of EPRs.
+ * @throws RegistryException
+ */
public List<EPR> findEPRs(final String category, final String name)
throws RegistryException, ServiceNotFoundException
{
final Service service = new Service(category, name) ;
- return Collections.unmodifiableList(getEPRs(service)) ;
+ final ConcurrentMap<EPR, EPR> eprs = getEPRs(service) ;
+ return Arrays.asList(eprs.keySet().toArray(new EPR[0])) ;
}
+ /**
+ * Registers an EPR under the specified category and service. If the specified service does
+ * not exist, it will be created at the same time.
+ *
+ * @param serviceCategoryName - name of the category to which the service belongs.
+ * @param serviceName - name of the service to which the EPS belongs.
+ * @param serviceDescription - human readable description of the service,
+ * only used when it the service does not yet exist.
+ * @param epr - the EndPointReference (EPR) that needs to be registered.
+ * @param eprDescription - human readable description of the EPR
+ * @throws RegistryException
+ */
public void registerEPR(final String category, final String name,
final String serviceDescription, final EPR epr, final String eprDescription)
throws RegistryException
{
final Service service = new Service(category, name) ;
- synchronized(this)
+ final ServiceInfo serviceInfo = getServiceInfo(service) ;
+ if (serviceInfo != null)
{
+ serviceInfo.acquireWriteLock() ;
+ }
+ try
+ {
getRegistry().registerEPR(category, name, serviceDescription, epr, eprDescription) ;
- final List<EPR> eprs = getCurrentEPRs(service) ;
- if (eprs != null)
+ if (serviceInfo != null)
{
- eprs.add(epr) ;
+ final ConcurrentMap<EPR, EPR> eprs = serviceInfo.getEPRs() ;
+ if (eprs != null)
+ {
+ eprs.put(epr, epr) ;
+ }
}
}
+ finally
+ {
+ if (serviceInfo != null)
+ {
+ serviceInfo.releaseWriteLock() ;
+ }
+ }
}
+ /**
+ * Removes an EPR from the Registry.
+ * @param serviceCategoryName - name of the category to which the service belongs.
+ * @param serviceName - name of the service to which the EPS belongs.
+ * @param epr - the EndPointReference (EPR) that needs to be unregistered.
+ * @throws RegistryException
+ */
public void unRegisterEPR(final String category, final String name,
final EPR epr) throws RegistryException, ServiceNotFoundException
{
final Service service = new Service(category, name) ;
- synchronized(this)
+ final ServiceInfo serviceInfo = getServiceInfo(service) ;
+ if (serviceInfo != null)
{
- final List<EPR> eprs = getCurrentEPRs(service) ;
- if ((eprs != null) && eprs.remove(epr) && (eprs.size() == 0))
+ serviceInfo.acquireWriteLock() ;
+ }
+ try
+ {
+ getRegistry().unRegisterEPR(category, name, epr) ;
+ if (serviceInfo != null)
{
- serviceInfoMap.remove(service) ;
- if (LOGGER.isInfoEnabled())
+ final ConcurrentMap<EPR, EPR> eprs = serviceInfo.getEPRs() ;
+ if (eprs != null)
{
- LOGGER.debug("Cache removing service " + service) ;
+ eprs.remove(epr) ;
}
}
- getRegistry().unRegisterEPR(category, name, epr) ;
}
+ finally
+ {
+ if (serviceInfo != null)
+ {
+ serviceInfo.releaseWriteLock() ;
+ }
+ }
}
+ /**
+ * Removes a service from the Registry along with all the ServiceBindings underneath it.
+ *
+ * @param category - name of the service category, for example 'transformation'.
+ * @param serviceName - name of the service, for example 'smooks'.
+ * @throws RegistryException
+ */
public void unRegisterService(final String category, final String name)
throws RegistryException, ServiceNotFoundException
{
final Service service = new Service(category, name) ;
- synchronized(this)
+ final ServiceInfo serviceInfo = getServiceInfo(service) ;
+ if (serviceInfo != null)
{
- serviceInfoMap.remove(service) ;
- if (LOGGER.isInfoEnabled())
+ serviceInfo.acquireWriteLock() ;
+ }
+ try
+ {
+ getRegistry().unRegisterService(category, name) ;
+ removeServiceInfo(service) ;
+ if (LOGGER.isDebugEnabled())
{
LOGGER.debug("Cache removing service " + service) ;
}
- getRegistry().unRegisterService(category, name) ;
}
+ finally
+ {
+ if (serviceInfo != null)
+ {
+ serviceInfo.releaseWriteLock() ;
+ }
+ }
}
- private synchronized List<EPR> getEPRs(final Service service)
- throws RegistryException, ServiceNotFoundException
+ /**
+ * Get the service information if it is still valid.
+ * @param service The service information.
+ * @return The service information or null
+ */
+ private synchronized ServiceInfo getServiceInfo(final Service service)
{
- final List<EPR> eprs = getCurrentEPRs(service) ;
- if (eprs != null)
+ final ServiceInfo serviceInfo = (ServiceInfo)serviceInfoMap.get(service) ;
+ if (serviceInfo != null)
{
- return eprs ;
+ if (serviceInfo.isValid())
+ {
+ return serviceInfo ;
+ }
+ removeServiceInfo(service) ;
}
+ return null ;
+ }
+
+ /**
+ * Create new service information or return current information if present.
+ * @param service The service information.
+ * @return The service information
+ */
+ private synchronized ServiceInfo createServiceInfo(final Service service)
+ {
+ final ServiceInfo serviceInfo = new ServiceInfo() ;
+ final ServiceInfo origServiceInfo = (ServiceInfo)serviceInfoMap.put(service, serviceInfo) ;
+ if ((origServiceInfo != null) && origServiceInfo.isValid())
+ {
+ serviceInfoMap.put(service, origServiceInfo) ;
+ return origServiceInfo ;
+ }
else
{
- final List<EPR> currentEPRs = getRegistry().findEPRs(service.getCategory(), service.getName()) ;
- final List<EPR> copyEPRs = new ArrayList<EPR>(currentEPRs) ;
- final ServiceInfo serviceInfo = new ServiceInfo(copyEPRs) ;
- serviceInfoMap.put(service, serviceInfo) ;
- if (LOGGER.isInfoEnabled())
- {
- LOGGER.debug("Cache reloaded for service " + service) ;
- }
- return copyEPRs ;
+ return serviceInfo ;
}
}
-
- private synchronized List<EPR> getCurrentEPRs(final Service service)
+
+ /**
+ * Remove the service information from map.
+ * @param service The service information
+ */
+ private synchronized void removeServiceInfo(final Service service)
{
- final ServiceInfo serviceInfo = (ServiceInfo)serviceInfoMap.get(service) ;
+ serviceInfoMap.remove(service) ;
+ }
+
+ /**
+ * Get the EPRs assocaited with the service, updating the cache if necessary
+ * @param service The service to query.
+ * @return The map of EPRs.
+ * @throws RegistryException For errors accessing the registry delegate.
+ * @throws ServiceNotFoundException If the service is not in the registry.
+ */
+ private ConcurrentMap<EPR, EPR> getEPRs(final Service service)
+ throws RegistryException, ServiceNotFoundException
+ {
+ final ServiceInfo serviceInfo = getServiceInfo(service) ;
if (serviceInfo != null)
{
- final boolean infoEnabled = LOGGER.isInfoEnabled() ;
- if (serviceInfo.isValid())
+ serviceInfo.acquireReadLock() ;
+ try
{
- if (infoEnabled)
+ final ConcurrentMap<EPR, EPR> eprs = serviceInfo.getEPRs() ;
+ if (eprs != null)
{
- LOGGER.debug("Cache hit for service " + service) ;
+ return eprs ;
}
- return serviceInfo.getEPRs() ;
}
+ finally
+ {
+ serviceInfo.releaseReadLock() ;
+ }
+ }
+ final ServiceInfo newServiceInfo = createServiceInfo(service) ;
+ newServiceInfo.acquireWriteLock() ;
+ try
+ {
+ final ConcurrentMap<EPR, EPR> eprs = newServiceInfo.getEPRs() ;
+ if (eprs != null)
+ {
+ return eprs ;
+ }
else
{
- if (infoEnabled)
+ final List<EPR> currentEPRs = getRegistry().findEPRs(service.getCategory(), service.getName()) ;
+ final ConcurrentMap<EPR, EPR> newEPRs = new ConcurrentHashMap<EPR, EPR>() ;
+ for(EPR epr: currentEPRs)
{
- LOGGER.debug("Cache expiry for service " + service) ;
+ newEPRs.put(epr, epr) ;
}
- serviceInfoMap.remove(service) ;
+ newServiceInfo.setEPRs(newEPRs) ;
+ if (LOGGER.isDebugEnabled())
+ {
+ LOGGER.debug("Cache reloaded for service " + service) ;
+ }
+ return newEPRs ;
}
}
- return null ;
+ finally
+ {
+ newServiceInfo.releaseWriteLock() ;
+ }
}
+ /**
+ * Class representing the service information
+ * @author kevin
+ */
private static class ServiceInfo
{
private final long expiryTime ;
- private List<EPR> eprs ;
+ private ConcurrentMap<EPR, EPR> eprs ;
- private ServiceInfo(final List<EPR> eprs)
+ private ReadWriteLock lock = new ReentrantReadWriteLock() ;
+
+ private ServiceInfo()
{
- this.eprs = eprs ;
if (VALIDITY_PERIOD > 0)
{
expiryTime = System.currentTimeMillis() + VALIDITY_PERIOD ;
@@ -216,10 +385,35 @@
return System.currentTimeMillis() < expiryTime ;
}
- List<EPR> getEPRs()
+ ConcurrentMap<EPR, EPR> getEPRs()
{
return eprs ;
}
+
+ void setEPRs(final ConcurrentMap<EPR, EPR> eprs)
+ {
+ this.eprs = eprs ;
+ }
+
+ void acquireWriteLock()
+ {
+ lock.writeLock().lock() ;
+ }
+
+ void releaseWriteLock()
+ {
+ lock.writeLock().unlock() ;
+ }
+
+ void acquireReadLock()
+ {
+ lock.readLock().lock() ;
+ }
+
+ void releaseReadLock()
+ {
+ lock.readLock().unlock() ;
+ }
}
static
Modified: labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/tests/src/org/jboss/internal/soa/esb/services/registry/CachingRegistryInterceptorUnitTest.java
===================================================================
--- labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/tests/src/org/jboss/internal/soa/esb/services/registry/CachingRegistryInterceptorUnitTest.java 2009-04-23 12:16:58 UTC (rev 26224)
+++ labs/jbossesb/branches/JBESB_4_4_GA_CP/product/rosetta/tests/src/org/jboss/internal/soa/esb/services/registry/CachingRegistryInterceptorUnitTest.java 2009-04-23 13:17:59 UTC (rev 26225)
@@ -20,6 +20,10 @@
package org.jboss.internal.soa.esb.services.registry;
import java.util.List;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import junit.framework.TestCase;
@@ -96,9 +100,6 @@
assertEquals("New EPR count", 2, registerNewEPRs.size()) ;
// unRegister goes to registry and removes from cache
- // If the EPR list is empty after unRegister then the
- // service details expire and the next access will defer
- // to the registry
assertEquals("findEPRs count", 1, stats.getFindEPRsCount()) ;
assertEquals("unRegisterEPR count", 0, stats.getUnRegisterEPRCount()) ;
final List<EPR> unRegisterOrigEPRs = interceptor.findEPRs(category, name) ;
@@ -114,13 +115,13 @@
interceptor.unRegisterEPR(category, name, epr1) ;
assertEquals("unRegisterEPR count", 2, stats.getUnRegisterEPRCount()) ;
final List<EPR> finalEPRs = interceptor.findEPRs(category, name) ;
- assertEquals("findEPRs count", 2, stats.getFindEPRsCount()) ;
+ assertEquals("findEPRs count", 1, stats.getFindEPRsCount()) ;
assertNotNull("Final EPRs", finalEPRs) ;
assertEquals("Final EPR count", 0, finalEPRs.size()) ;
final EPR finalEPR = interceptor.findEPR(category, name) ;
assertNull("Final EPR", finalEPR) ;
assertEquals("findEPR count", 0, stats.getFindEPRCount()) ;
- assertEquals("findEPRs count", 2, stats.getFindEPRsCount()) ;
+ assertEquals("findEPRs count", 1, stats.getFindEPRsCount()) ;
// Need to fix MockRegistry as it does not correctly handle
// service information. We cannot use unRegisterService unless
@@ -132,7 +133,54 @@
interceptor.unRegisterService(category, name) ;
assertEquals("unRegisterService count", 1, stats.getUnRegisterServiceCount()) ;
}
-
+
+ /**
+ * Test concurrent access to next interceptor in chain.
+ * @throws Exception
+ */
+ public void testConcurrency()
+ throws Exception
+ {
+ final int numServices = 5 ;
+ final String category = "category" ;
+ final String name = "name" ;
+
+ final CachingRegistryInterceptor interceptor = new CachingRegistryInterceptor() ;
+ final CacheRegistryConcurrencyInterceptor concurrency = new CacheRegistryConcurrencyInterceptor(new CyclicBarrier(numServices)) ;
+ final MockRegistry registry = new MockRegistry() ;
+ concurrency.setRegistry(registry) ;
+ interceptor.setRegistry(concurrency) ;
+
+ final String[] names = new String[numServices] ;
+ for (int count = 0 ; count < numServices ; count++)
+ {
+ names[count] = name + count ;
+ registry.registerEPR(category, names[count], "description", new EPR(), "EPR description") ;
+ }
+ final int numThreads = numServices*2 ;
+
+ final CyclicBarrier barrier = new CyclicBarrier(numThreads) ;
+ final ConcurrencyTest[] tests = new ConcurrencyTest[numThreads] ;
+ for(int count = 0 ; count < numThreads ; count++)
+ {
+ tests[count] = new ConcurrencyTest(barrier, interceptor, category, names[count%numServices]) ;
+ tests[count].start() ;
+ }
+
+ for(int count = 0 ; count < numThreads ; count++)
+ {
+ tests[count].join();
+ }
+
+ for(int count = 0 ; count < numThreads ; count++)
+ {
+ assertNull("Throwable occurred", tests[count].getThrowable()) ;
+ }
+
+ assertEquals("Registry findEPRs invocation", numServices, concurrency.getFindEPRsCount()) ;
+ assertFalse("Barrier timeout", concurrency.isTimeout()) ;
+ }
+
private static final class CacheRegistryStatsInterceptor implements RegistryInterceptor
{
private Registry registry ;
@@ -235,4 +283,134 @@
this.registry = registry ;
}
}
+
+ private static final class ConcurrencyTest extends Thread
+ {
+ private final CyclicBarrier barrier ;
+ private final Registry registry ;
+ private final String category ;
+ private final String name ;
+
+ private Throwable throwable ;
+
+ ConcurrencyTest(final CyclicBarrier barrier, final Registry registry,
+ final String category, final String name)
+ {
+ this.barrier = barrier ;
+ this.registry = registry ;
+ this.category = category ;
+ this.name = name ;
+ }
+
+ public void run()
+ {
+ try
+ {
+ barrier.await() ;
+ registry.findEPRs(category, name) ;
+ }
+ catch (final Throwable th)
+ {
+ throwable = th ;
+ }
+ }
+
+ Throwable getThrowable()
+ {
+ return throwable ;
+ }
+ }
+
+ private static final class CacheRegistryConcurrencyInterceptor implements RegistryInterceptor
+ {
+ private final CyclicBarrier barrier ;
+
+ private volatile boolean timeout ;
+ private Registry registry ;
+ private final AtomicInteger findEPRsCount = new AtomicInteger() ;
+
+ public CacheRegistryConcurrencyInterceptor(final CyclicBarrier barrier)
+ {
+ this.barrier = barrier ;
+ }
+
+ public List<String> findAllServices()
+ throws RegistryException
+ {
+ return registry.findAllServices() ;
+ }
+
+ public EPR findEPR(final String serviceCategoryName, final String serviceName)
+ throws RegistryException, ServiceNotFoundException
+ {
+ return registry.findEPR(serviceCategoryName, serviceName) ;
+ }
+
+ public List<EPR> findEPRs(final String serviceCategoryName, final String serviceName)
+ throws RegistryException, ServiceNotFoundException
+ {
+ findEPRsCount.incrementAndGet() ;
+ if (!timeout)
+ {
+ try
+ {
+ barrier.await(10, TimeUnit.SECONDS) ;
+ }
+ catch (final TimeoutException te)
+ {
+ timeout = true ;
+ }
+ catch (final InterruptedException ie)
+ {
+ throw new RegistryException("Interrupted", ie) ;
+ }
+ catch (final BrokenBarrierException bbe)
+ {
+ throw new RegistryException("Broken barrier", bbe) ;
+ }
+ }
+ return registry.findEPRs(serviceCategoryName, serviceName) ;
+ }
+
+ public int getFindEPRsCount()
+ {
+ return findEPRsCount.get() ;
+ }
+
+ public boolean isTimeout()
+ {
+ return timeout ;
+ }
+
+ public List<String> findServices(final String serviceCategoryName)
+ throws RegistryException
+ {
+ return registry.findServices(serviceCategoryName) ;
+ }
+
+ public void registerEPR(final String serviceCategoryName, final String serviceName,
+ final String serviceDescription, final EPR epr, final String eprDescription)
+ throws RegistryException
+ {
+ registry.registerEPR(serviceCategoryName, serviceName, serviceDescription, epr, eprDescription) ;
+ }
+
+ public void unRegisterEPR(final String serviceCategoryName,
+ final String serviceName, final EPR epr)
+ throws RegistryException, ServiceNotFoundException
+ {
+ registry.unRegisterEPR(serviceCategoryName, serviceName, epr) ;
+ }
+
+ public void unRegisterService(final String category, final String serviceName)
+ throws RegistryException, ServiceNotFoundException
+ {
+ registry.unRegisterService(category, serviceName) ;
+ }
+
+ public void setRegistry(final Registry registry)
+ {
+ this.registry = registry ;
+ }
+ }
}
More information about the jboss-svn-commits
mailing list