Author: chris.laprun(a)jboss.com
Date: 2011-09-20 13:32:45 -0400 (Tue, 20 Sep 2011)
New Revision: 7469
Modified:
components/wsrp/trunk/consumer/src/main/java/org/gatein/wsrp/consumer/registry/AbstractConsumerRegistry.java
components/wsrp/trunk/consumer/src/main/java/org/gatein/wsrp/consumer/registry/ConsumerCache.java
components/wsrp/trunk/consumer/src/main/java/org/gatein/wsrp/consumer/registry/InMemoryConsumerRegistry.java
components/wsrp/trunk/hibernate-impl/src/main/java/org/gatein/wsrp/consumer/registry/hibernate/HibernateConsumerRegistry.java
components/wsrp/trunk/jcr-impl/src/main/java/org/gatein/wsrp/consumer/registry/JCRConsumerRegistry.java
Log:
- More clustering work: smarter ConsumerCache in AbstractConsumerRegistry, which should now
work properly in a clustered environment.
- Removed some commented-out code.
Modified:
components/wsrp/trunk/consumer/src/main/java/org/gatein/wsrp/consumer/registry/AbstractConsumerRegistry.java
===================================================================
---
components/wsrp/trunk/consumer/src/main/java/org/gatein/wsrp/consumer/registry/AbstractConsumerRegistry.java 2011-09-20
17:20:48 UTC (rev 7468)
+++
components/wsrp/trunk/consumer/src/main/java/org/gatein/wsrp/consumer/registry/AbstractConsumerRegistry.java 2011-09-20
17:32:45 UTC (rev 7469)
@@ -1,6 +1,6 @@
/*
* JBoss, a division of Red Hat
- * Copyright 2010, Red Hat Middleware, LLC, and individual
+ * Copyright 2011, Red Hat Middleware, LLC, and individual
* contributors as indicated by the @authors tag. See the
* copyright.txt in the distribution for a full listing of
* individual contributors.
@@ -42,10 +42,10 @@
import java.util.AbstractCollection;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
/**
* @author <a href="mailto:chris.laprun@jboss.com">Chris
Laprun</a>
@@ -68,7 +68,7 @@
protected ConsumerCache consumers = new InMemoryConsumerCache();
- public void setConsumerCache(ConsumerCache consumers)
+ public synchronized void setConsumerCache(ConsumerCache consumers)
{
if (consumers == null)
{
@@ -77,7 +77,7 @@
this.consumers = consumers;
}
- public void setSessionRegistry(SessionRegistry sessionRegistry)
+ public synchronized void setSessionRegistry(SessionRegistry sessionRegistry)
{
if (sessionRegistry == null)
{
@@ -96,7 +96,7 @@
return federatingPortletInvoker;
}
- public void setSessionEventBroadcaster(SessionEventBroadcaster
sessionEventBroadcaster)
+ public synchronized void setSessionEventBroadcaster(SessionEventBroadcaster
sessionEventBroadcaster)
{
if (sessionEventBroadcaster == null)
{
@@ -110,7 +110,7 @@
return migrationService;
}
- public void setMigrationService(MigrationService migrationService)
+ public synchronized void setMigrationService(MigrationService migrationService)
{
if (migrationService == null)
{
@@ -190,7 +190,7 @@
createConsumerFrom(info);
}
- public void setFederatingPortletInvoker(FederatingPortletInvoker
federatingPortletInvoker)
+ public synchronized void setFederatingPortletInvoker(FederatingPortletInvoker
federatingPortletInvoker)
{
this.federatingPortletInvoker = federatingPortletInvoker;
}
@@ -240,7 +240,7 @@
startOrStopConsumer(consumer, false, true);
}
- public String updateProducerInfo(ProducerInfo producerInfo)
+ public synchronized String updateProducerInfo(ProducerInfo producerInfo)
{
ParameterValidation.throwIllegalArgExceptionIfNull(producerInfo,
"ProducerInfo");
@@ -273,18 +273,7 @@
public void reloadConsumers()
{
- consumers.clear();
-
- Iterator<ProducerInfo> producerInfos = getProducerInfosFromStorage();
-
- // load the configured producers
- ProducerInfo producerInfo;
- while (producerInfos.hasNext())
- {
- producerInfo = producerInfos.next();
-
- createConsumerFrom(producerInfo);
- }
+ consumers.initFromStorage();
}
public void stop() throws Exception
@@ -503,35 +492,65 @@
}
}
- protected static class InMemoryConsumerCache implements ConsumerCache
+ protected class InMemoryConsumerCache implements ConsumerCache
{
- private Map<String, WSRPConsumer> consumers = new HashMap<String,
WSRPConsumer>(11);
+ private Map<String, WSRPConsumer> consumers = new
ConcurrentHashMap<String, WSRPConsumer>(11);
private boolean invalidated;
+ private long lastModified;
+ public void initFromStorage()
+ {
+ clear();
+ Iterator<ProducerInfo> infosFromStorage = getProducerInfosFromStorage();
+ while (infosFromStorage.hasNext())
+ {
+ ProducerInfo info = infosFromStorage.next();
+ consumers.put(info.getId(), createConsumerFrom(info));
+ }
+ lastModified = System.currentTimeMillis();
+ setInvalidated(false);
+ }
+
public Collection<WSRPConsumer> getConsumers()
{
+ refreshIfNeeded();
return consumers.values();
}
public WSRPConsumer getConsumer(String id)
{
- return consumers.get(id);
+ // try cache first
+ WSRPConsumer consumer = consumers.get(id);
+
+ // if we didn't find the consumer in cache, try to load it from JCR
+ if (consumer == null)
+ {
+ ProducerInfo info = loadProducerInfo(id);
+ if (info != null)
+ {
+ consumer = createConsumerFrom(info);
+ }
+ }
+ return consumer;
}
public WSRPConsumer removeConsumer(String id)
{
+ lastModified = System.currentTimeMillis();
return consumers.remove(id);
}
public void putConsumer(String id, WSRPConsumer consumer)
{
consumers.put(id, consumer);
+ lastModified = System.currentTimeMillis();
}
public void clear()
{
consumers.clear();
invalidated = true;
+ lastModified = System.currentTimeMillis();
}
public boolean isInvalidated()
@@ -543,5 +562,40 @@
{
this.invalidated = invalidated;
}
+
+ public long getLastModified()
+ {
+ return lastModified;
+ }
+
+ protected void refreshIfNeeded()
+ {
+ AbstractConsumerRegistry registry = AbstractConsumerRegistry.this;
+ // check if we need to refresh the local cache
+ if (isInvalidated() || registry.producerInfosGotModifiedSince(lastModified))
+ {
+ for (String id : registry.getConfiguredConsumersIds())
+ {
+ // only recreate the consumer if it's not in the cache or it's
been modified after we've been last modified
+ ProducerInfo info =
registry.getUpdatedProducerInfoIfModifiedSinceOrNull(id, lastModified);
+ if (consumers.get(id) == null)
+ {
+ if (info == null)
+ {
+ info = loadProducerInfo(id);
+ }
+ consumers.put(id, createConsumerFrom(info));
+ }
+ }
+
+ lastModified = System.currentTimeMillis();
+ setInvalidated(false);
+ }
+
+ }
}
+
+ protected abstract ProducerInfo getUpdatedProducerInfoIfModifiedSinceOrNull(String id,
long lastModified);
+
+ protected abstract boolean producerInfosGotModifiedSince(long lastModified);
}
Modified:
components/wsrp/trunk/consumer/src/main/java/org/gatein/wsrp/consumer/registry/ConsumerCache.java
===================================================================
---
components/wsrp/trunk/consumer/src/main/java/org/gatein/wsrp/consumer/registry/ConsumerCache.java 2011-09-20
17:20:48 UTC (rev 7468)
+++
components/wsrp/trunk/consumer/src/main/java/org/gatein/wsrp/consumer/registry/ConsumerCache.java 2011-09-20
17:32:45 UTC (rev 7469)
@@ -43,4 +43,8 @@
boolean isInvalidated();
void setInvalidated(boolean invalidated);
+
+ long getLastModified();
+
+ void initFromStorage();
}
Modified:
components/wsrp/trunk/consumer/src/main/java/org/gatein/wsrp/consumer/registry/InMemoryConsumerRegistry.java
===================================================================
---
components/wsrp/trunk/consumer/src/main/java/org/gatein/wsrp/consumer/registry/InMemoryConsumerRegistry.java 2011-09-20
17:20:48 UTC (rev 7468)
+++
components/wsrp/trunk/consumer/src/main/java/org/gatein/wsrp/consumer/registry/InMemoryConsumerRegistry.java 2011-09-20
17:32:45 UTC (rev 7469)
@@ -140,6 +140,18 @@
keysToIds = null;
}
+ @Override
+ protected ProducerInfo getUpdatedProducerInfoIfModifiedSinceOrNull(String id, long
lastModified)
+ {
+ return null;
+ }
+
+ @Override
+ protected boolean producerInfosGotModifiedSince(long lastModified)
+ {
+ return false;
+ }
+
protected void initConsumers(SortedMap<String, WSRPConsumer> consumers)
{
if (!ParameterValidation.existsAndIsNotEmpty(consumers))
Modified:
components/wsrp/trunk/hibernate-impl/src/main/java/org/gatein/wsrp/consumer/registry/hibernate/HibernateConsumerRegistry.java
===================================================================
---
components/wsrp/trunk/hibernate-impl/src/main/java/org/gatein/wsrp/consumer/registry/hibernate/HibernateConsumerRegistry.java 2011-09-20
17:20:48 UTC (rev 7468)
+++
components/wsrp/trunk/hibernate-impl/src/main/java/org/gatein/wsrp/consumer/registry/hibernate/HibernateConsumerRegistry.java 2011-09-20
17:32:45 UTC (rev 7469)
@@ -133,4 +133,16 @@
sessionFactory = null;
super.stop();
}
+
+ @Override
+ protected ProducerInfo getUpdatedProducerInfoIfModifiedSinceOrNull(String id, long
lastModified)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ protected boolean producerInfosGotModifiedSince(long lastModified)
+ {
+ throw new UnsupportedOperationException();
+ }
}
Modified:
components/wsrp/trunk/jcr-impl/src/main/java/org/gatein/wsrp/consumer/registry/JCRConsumerRegistry.java
===================================================================
---
components/wsrp/trunk/jcr-impl/src/main/java/org/gatein/wsrp/consumer/registry/JCRConsumerRegistry.java 2011-09-20
17:20:48 UTC (rev 7468)
+++
components/wsrp/trunk/jcr-impl/src/main/java/org/gatein/wsrp/consumer/registry/JCRConsumerRegistry.java 2011-09-20
17:32:45 UTC (rev 7469)
@@ -69,7 +69,6 @@
public static final List<Class> mappingClasses = new ArrayList<Class>(6);
private InputStream configurationIS;
private long lastModified;
-// private Map<String, ProducerInfo> infoCache;
static
{
@@ -167,35 +166,32 @@
ChromatticSession session = persister.getSession();
- synchronized (this)
+ final long now = System.currentTimeMillis();
+
+ ProducerInfoMapping pim = session.findById(ProducerInfoMapping.class, key);
+ if (pim == null)
{
- final long now = System.currentTimeMillis();
+ throw new IllegalArgumentException("Couldn't find ProducerInfoMapping
associated with key " + key);
+ }
+ oldId = pim.getId();
+ newId = producerInfo.getId();
+ producerInfo.setLastModified(now);
+ pim.initFrom(producerInfo);
- ProducerInfoMapping pim = session.findById(ProducerInfoMapping.class, key);
- if (pim == null)
- {
- throw new IllegalArgumentException("Couldn't find
ProducerInfoMapping associated with key " + key);
- }
- oldId = pim.getId();
- newId = producerInfo.getId();
- producerInfo.setLastModified(now);
- pim.initFrom(producerInfo);
+ idUnchanged = oldId.equals(newId);
- idUnchanged = oldId.equals(newId);
+ if (!idUnchanged)
+ {
+ ProducerInfosMapping pims = getProducerInfosMapping(session);
+ Map<String, ProducerInfoMapping> nameToProducerInfoMap =
pims.getNameToProducerInfoMap();
+ nameToProducerInfoMap.put(pim.getId(), pim);
- if (!idUnchanged)
- {
- ProducerInfosMapping pims = getProducerInfosMapping(session);
- Map<String, ProducerInfoMapping> nameToProducerInfoMap =
pims.getNameToProducerInfoMap();
- nameToProducerInfoMap.put(pim.getId(), pim);
-
- pims.setLastModified(now);
- lastModified = now;
- }
-
- persister.closeSession(true);
+ pims.setLastModified(now);
+ lastModified = now;
}
+ persister.closeSession(true);
+
// if the consumer's id has changed, return the old one so that state can be
updated
return idUnchanged ? null : oldId;
}
@@ -204,111 +200,32 @@
{
ChromatticSession session = persister.getSession();
- Collection<WSRPConsumer> consumers =
getRefreshedInfoCache(session).getConsumers();
-
- // GTNWSRP-239
- // Kindof crappy place to put this, but we need to be able to retrieve the mixin
from the jcr so that it can be used to
- // configure the ProducerInfo
- /*Iterator<WSRPConsumer> consumersIterator = consumers.iterator();
- while (consumersIterator.hasNext())
+ try
{
- ProducerInfo pi = consumersIterator.next().getProducerInfo();
- String key = pi.getKey();
- ProducerInfoMapping pim = session.findById(ProducerInfoMapping.class, key);
- if (pim == null)
+ List<ProducerInfoMapping> pims =
getProducerInfosMapping(session).getProducerInfos();
+ List<ProducerInfo> infos = new
ArrayList<ProducerInfo>(pims.size());
+ for (ProducerInfoMapping pim : pims)
{
- throw new IllegalArgumentException("Couldn't find
ProducerInfoMapping associated with key " + key);
+ infos.add(pim.toModel(null, this));
}
- WSSEndpointEnabled wssee = getMixin(pim.getEndpointInfo(), session,
WSSEndpointEnabled.class);
- if (wssee != null)
- {
- pi.getEndpointConfigurationInfo().setWSSEnabled(wssee.getWSSEnabled());
- }
- }*/
- final Iterator<ProducerInfo> iterator = new
ProducerInfoIterator(consumers.iterator());
- persister.closeSession(false);
- return iterator;
- }
-
- /*private Map<String, ProducerInfo> getRefreshedInfoCache(ChromatticSession
session)
- {
- ProducerInfosMapping producerInfosMapping = getProducerInfosMapping(session);
-
- // check if we need to refresh the local cache
- if (lastModified < getMixin(producerInfosMapping, session,
LastModified.class).getLastModified())
- {
- List<ProducerInfoMapping> mappings =
producerInfosMapping.getProducerInfos();
-
-
- for (ProducerInfoMapping mapping : mappings)
- {
- infoCache.put(mapping.getId(), mapping.toModel(null, this));
- }
-
- lastModified = System.currentTimeMillis();
+ return infos.iterator();
}
-
- return infoCache;
- }*/
-
- private ConsumerCache getRefreshedInfoCache(ChromatticSession session)
- {
- ProducerInfosMapping producerInfosMapping = getProducerInfosMapping(session);
-
- // check if we need to refresh the local cache
- if (consumers.isInvalidated() || lastModified <
producerInfosMapping.getLastModified())
+ finally
{
- List<ProducerInfoMapping> mappings =
producerInfosMapping.getProducerInfos();
-
- for (ProducerInfoMapping pim : mappings)
- {
- String id = pim.getId();
- // only recreate the consumer if it's not in the cache or it's been
modified after we've been last modified
- if (consumers.getConsumer(id) == null || lastModified <
pim.getLastModified())
- {
- consumers.putConsumer(id, createConsumerFrom(pim.toModel(null, this)));
- }
- }
-
- lastModified = System.currentTimeMillis();
- consumers.setInvalidated(false);
+ persister.closeSession(false);
}
-
- return consumers;
}
public ProducerInfo loadProducerInfo(String id)
{
- ChromatticSession session = persister.getSession();
try
{
- ProducerInfoMapping pim = session.findByPath(ProducerInfoMapping.class,
getPathFor(id));
-
+ ChromatticSession session = persister.getSession();
+ ProducerInfoMapping pim = getProducerInfoMapping(id, session);
if (pim != null)
{
- WSRPConsumer consumer = getRefreshedInfoCache(session).getConsumer(id);
-
- if (consumer == null)
- {
- return null;
- }
- else
- {
- return consumer.getProducerInfo();
- /*ProducerInfo producerInfo = consumer.getProducerInfo();
- if(producerInfo == null || producerInfo.getLastModified() <
getMixin(pim, session, LastModified.class).getLastModified())
- {
- producerInfo = pim.toModel(producerInfo, this);
- getRefreshedInfoCache(session).put(id, producerInfo);
- return producerInfo;
- }
- else
- {
- return producerInfo;
- }*/
- }
-
+ return pim.toModel(null, this);
}
else
{
@@ -319,19 +236,13 @@
{
persister.closeSession(false);
}
+
}
- /*private <M extends BaseMixin> M getMixin(Object objectToCheck,
ChromatticSession session, Class<M> type)
+ private ProducerInfoMapping getProducerInfoMapping(String id, ChromatticSession
session)
{
- M mixin = session.getEmbedded(objectToCheck, type);
- if (mixin == null)
- {
- mixin = session.create(type);
- session.setEmbedded(objectToCheck, type, mixin);
- mixin.initializeValue();
- }
- return mixin;
- }*/
+ return session.findByPath(ProducerInfoMapping.class, getPathFor(id));
+ }
@Override
public boolean containsConsumer(String id)
@@ -417,6 +328,41 @@
}
}
+ @Override
+ protected ProducerInfo getUpdatedProducerInfoIfModifiedSinceOrNull(String id, long
lastModified)
+ {
+ try
+ {
+ ChromatticSession session = persister.getSession();
+ ProducerInfoMapping pim = getProducerInfoMapping(id, session);
+ if (lastModified < pim.getLastModified())
+ {
+ return pim.toModel(null, this);
+ }
+ else
+ {
+ return null;
+ }
+ }
+ finally
+ {
+ persister.closeSession(false);
+ }
+ }
+
+ @Override
+ protected boolean producerInfosGotModifiedSince(long lastModified)
+ {
+ try
+ {
+ return lastModified <
getProducerInfosMapping(persister.getSession()).getLastModified();
+ }
+ finally
+ {
+ persister.closeSession(false);
+ }
+ }
+
private ProducerInfosMapping getProducerInfosMapping(ChromatticSession session)
{
ProducerInfosMapping producerInfosMapping =
session.findByPath(ProducerInfosMapping.class, PRODUCER_INFOS_PATH);
@@ -513,31 +459,4 @@
{
return persister;
}
-
- private static class MappingToProducerInfoIterator implements
Iterator<ProducerInfo>
- {
- private Iterator<ProducerInfoMapping> mappings;
- private final JCRConsumerRegistry registry;
-
- public MappingToProducerInfoIterator(Iterator<ProducerInfoMapping>
infoMappingIterator, JCRConsumerRegistry jcrConsumerRegistry)
- {
- this.mappings = infoMappingIterator;
- this.registry = jcrConsumerRegistry;
- }
-
- public boolean hasNext()
- {
- return mappings.hasNext();
- }
-
- public ProducerInfo next()
- {
- return mappings.next().toModel(null, registry);
- }
-
- public void remove()
- {
- throw new UnsupportedOperationException("Remove not supported!");
- }
- }
}