Author: manik.surtani(a)jboss.com
Date: 2009-02-10 06:58:28 -0500 (Tue, 10 Feb 2009)
New Revision: 7673
Added:
core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheLoaderOld.java
core/branches/flat/src/main/java/org/horizon/loader/AbstractDelegatingCacheLoaderOld.java
core/branches/flat/src/main/java/org/horizon/loader/AsyncCacheLoaderOld.java
core/branches/flat/src/main/java/org/horizon/loader/CacheLoader.java
core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderOld.java
core/branches/flat/src/main/java/org/horizon/loader/CacheStore.java
core/branches/flat/src/main/java/org/horizon/loader/ChainingCacheLoaderOld.java
core/branches/flat/src/main/java/org/horizon/loader/ClusteredCacheLoaderOld.java
core/branches/flat/src/main/java/org/horizon/loader/FileCacheLoaderOld.java
core/branches/flat/src/main/java/org/horizon/loader/ReadOnlyDelegatingCacheLoaderOld.java
core/branches/flat/src/main/java/org/horizon/loader/SingletonStoreCacheLoaderOld.java
core/branches/flat/src/main/java/org/horizon/loader/StoredEntry.java
Removed:
core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheLoader.java
core/branches/flat/src/main/java/org/horizon/loader/AbstractDelegatingCacheLoader.java
core/branches/flat/src/main/java/org/horizon/loader/AsyncCacheLoader.java
core/branches/flat/src/main/java/org/horizon/loader/CacheLoader.java
core/branches/flat/src/main/java/org/horizon/loader/ChainingCacheLoader.java
core/branches/flat/src/main/java/org/horizon/loader/ClusteredCacheLoader.java
core/branches/flat/src/main/java/org/horizon/loader/FileCacheLoader.java
core/branches/flat/src/main/java/org/horizon/loader/ReadOnlyDelegatingCacheLoader.java
core/branches/flat/src/main/java/org/horizon/loader/SingletonStoreCacheLoader.java
Modified:
core/branches/flat/src/main/docbook/userguide/en/modules/basic_api.xml
core/branches/flat/src/main/java/org/horizon/config/CacheLoaderConfig.java
core/branches/flat/src/main/java/org/horizon/container/UnsortedDataContainer.java
core/branches/flat/src/main/java/org/horizon/factories/InterceptorChainFactory.java
core/branches/flat/src/main/java/org/horizon/interceptors/CacheLoaderInterceptor.java
core/branches/flat/src/main/java/org/horizon/interceptors/CacheStoreInterceptor.java
core/branches/flat/src/main/java/org/horizon/loader/AsyncCacheLoaderConfig.java
core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderManager.java
core/branches/flat/src/main/java/org/horizon/loader/ClusteredCacheLoaderConfig.java
core/branches/flat/src/main/java/org/horizon/loader/FileCacheLoaderConfig.java
core/branches/flat/src/main/java/org/horizon/loader/SingletonStoreDefaultConfig.java
core/branches/flat/src/main/java/org/horizon/lock/StripedLock.java
core/branches/flat/src/main/java/org/horizon/notifications/cachelistener/annotation/CacheEntryLoaded.java
core/branches/flat/src/main/resources/config-samples/all.xml
Log:
Loaders - work in progress
Modified: core/branches/flat/src/main/docbook/userguide/en/modules/basic_api.xml
===================================================================
--- core/branches/flat/src/main/docbook/userguide/en/modules/basic_api.xml 2009-02-10 11:54:44 UTC (rev 7672)
+++ core/branches/flat/src/main/docbook/userguide/en/modules/basic_api.xml 2009-02-10 11:58:28 UTC (rev 7673)
@@ -664,7 +664,7 @@
</footnote>
</listitem>
<listitem>
- <literal>org.horizon.loader.ClusteredCacheLoader</literal>
+ <literal>org.horizon.loader.ClusteredCacheLoaderOld</literal>
- used as a "read-only" CacheLoader, where other nodes in the
cluster are queried for state.
</listitem>
</itemizedlist>
Modified: core/branches/flat/src/main/java/org/horizon/config/CacheLoaderConfig.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/config/CacheLoaderConfig.java 2009-02-10
11:54:44 UTC (rev 7672)
+++ core/branches/flat/src/main/java/org/horizon/config/CacheLoaderConfig.java 2009-02-10
11:58:28 UTC (rev 7673)
@@ -21,8 +21,8 @@
*/
package org.horizon.config;
-import org.horizon.loader.CacheLoader;
-import org.horizon.loader.SingletonStoreCacheLoader;
+import org.horizon.loader.CacheLoaderOld;
+import org.horizon.loader.SingletonStoreCacheLoaderOld;
import org.horizon.util.Util;
import java.util.ArrayList;
@@ -168,7 +168,7 @@
private boolean purgeOnStartup;
private SingletonStoreConfig singletonStoreConfig;
- private transient CacheLoader<Object, Object> cacheLoader;
+ private transient CacheLoaderOld<Object, Object> cacheLoader;
protected void populateFromBaseConfig(IndividualCacheLoaderConfig base) {
if (base != null) {
@@ -233,7 +233,7 @@
* @return cache loader, if one exists
* @since 1.0
*/
- public CacheLoader<Object, Object> getCacheLoader() {
+ public CacheLoaderOld<Object, Object> getCacheLoader() {
return cacheLoader;
}
@@ -244,7 +244,7 @@
* @param cacheLoader cacheLoader to set
* @since 1.0
*/
- public void setCacheLoader(CacheLoader<Object, Object> cacheLoader) {
+ public void setCacheLoader(CacheLoaderOld<Object, Object> cacheLoader) {
this.cacheLoader = cacheLoader;
}
@@ -320,7 +320,7 @@
public SingletonStoreConfig() {
// default value
- className = SingletonStoreCacheLoader.class.getName();
+ className = SingletonStoreCacheLoaderOld.class.getName();
}
public boolean isSingletonStoreEnabled() {
Modified:
core/branches/flat/src/main/java/org/horizon/container/UnsortedDataContainer.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/container/UnsortedDataContainer.java 2009-02-10
11:54:44 UTC (rev 7672)
+++
core/branches/flat/src/main/java/org/horizon/container/UnsortedDataContainer.java 2009-02-10
11:58:28 UTC (rev 7673)
@@ -22,8 +22,8 @@
package org.horizon.container;
import org.horizon.factories.annotations.Inject;
-import org.horizon.loader.CacheLoader;
import org.horizon.loader.CacheLoaderManager;
+import org.horizon.loader.CacheLoaderOld;
import java.util.AbstractSet;
import java.util.ArrayList;
@@ -48,7 +48,7 @@
final ConcurrentMap<K, ExpirableCachedValue<V>> expirableData = new ConcurrentHashMap<K, ExpirableCachedValue<V>>();
private static final Object NULL = new Object();
private CacheLoaderManager clm;
- private CacheLoader cacheLoader;
+ private CacheLoaderOld cacheLoader;
@Inject
public void injectDependencies(CacheLoaderManager clm) {
Modified:
core/branches/flat/src/main/java/org/horizon/factories/InterceptorChainFactory.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/factories/InterceptorChainFactory.java 2009-02-10
11:54:44 UTC (rev 7672)
+++
core/branches/flat/src/main/java/org/horizon/factories/InterceptorChainFactory.java 2009-02-10
11:58:28 UTC (rev 7673)
@@ -98,7 +98,7 @@
//Nothing...
}
- // TODO: Uncomment once the CacheLoader has been moved to Starobrno
+ // TODO: Uncomment once the CacheLoaderOld has been moved to Starobrno
if (configuration.isUsingCacheLoaders()) {
// if (configuration.getCacheLoaderConfig().isPassivation())
// {
@@ -111,7 +111,7 @@
}
interceptorChain.appendIntereceptor(createInterceptor(LockingInterceptor.class));
- // TODO: Uncomment once the CacheLoader has been moved to Starobrno
+ // TODO: Uncomment once the CacheLoaderOld has been moved to Starobrno
if (configuration.isUsingCacheLoaders()) {
// if (configuration.getCacheLoaderConfig().isPassivation())
// {
Modified:
core/branches/flat/src/main/java/org/horizon/interceptors/CacheLoaderInterceptor.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/interceptors/CacheLoaderInterceptor.java 2009-02-10
11:54:44 UTC (rev 7672)
+++
core/branches/flat/src/main/java/org/horizon/interceptors/CacheLoaderInterceptor.java 2009-02-10
11:58:28 UTC (rev 7673)
@@ -35,8 +35,8 @@
import org.horizon.factories.annotations.Inject;
import org.horizon.factories.annotations.Start;
import org.horizon.interceptors.base.JmxStatsCommandInterceptor;
-import org.horizon.loader.CacheLoader;
import org.horizon.loader.CacheLoaderManager;
+import org.horizon.loader.CacheLoaderOld;
import org.horizon.notifications.cachelistener.CacheNotifier;
import org.horizon.transaction.TransactionTable;
@@ -55,7 +55,7 @@
private CacheLoaderManager clm;
protected TransactionTable txTable = null;
- protected CacheLoader<Object, Object> loader;
+ protected CacheLoaderOld<Object, Object> loader;
protected DataContainer<Object, Object> dataContainer;
protected CacheNotifier notifier;
protected EntryFactory entryFactory;
Modified:
core/branches/flat/src/main/java/org/horizon/interceptors/CacheStoreInterceptor.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/interceptors/CacheStoreInterceptor.java 2009-02-10
11:54:44 UTC (rev 7672)
+++
core/branches/flat/src/main/java/org/horizon/interceptors/CacheStoreInterceptor.java 2009-02-10
11:58:28 UTC (rev 7673)
@@ -39,8 +39,8 @@
import org.horizon.invocation.Options;
import org.horizon.jmx.annotations.ManagedAttribute;
import org.horizon.jmx.annotations.ManagedOperation;
-import org.horizon.loader.CacheLoader;
import org.horizon.loader.CacheLoaderManager;
+import org.horizon.loader.CacheLoaderOld;
import org.horizon.loader.Modification;
import org.horizon.loader.Modification.ModificationType;
import org.horizon.logging.LogFactory;
@@ -69,7 +69,7 @@
private HashMap<GlobalTransaction, Integer> txStores = new HashMap<GlobalTransaction, Integer>();
private Map<GlobalTransaction, Set<Object>> preparingTxs = new ConcurrentHashMap<GlobalTransaction, Set<Object>>();
private long cacheStores = 0;
- CacheLoader<Object, Object> loader;
+ CacheLoaderOld<Object, Object> loader;
private CacheLoaderManager loaderManager;
private boolean statsEnabled;
@@ -80,7 +80,7 @@
@Inject
protected void init(CacheLoaderManager loaderManager, TransactionManager txManager,
CacheLoaderConfig clConfig) {
- // never inject a CacheLoader at this stage - only a CacheLoaderManager, since the CacheLoaderManager only creates a CacheLoader instance when it @Starts.
+ // never inject a CacheLoaderOld at this stage - only a CacheLoaderManager, since the CacheLoaderManager only creates a CacheLoaderOld instance when it @Starts.
this.loaderManager = loaderManager;
this.loaderConfig = clConfig;
txMgr = txManager;
@@ -88,7 +88,7 @@
@Start
protected void start() {
- // this should only happen after the CacheLoaderManager has started, since the CacheLoaderManager only creates the CacheLoader instance in its @Start method.
+ // this should only happen after the CacheLoaderManager has started, since the CacheLoaderManager only creates the CacheLoaderOld instance in its @Start method.
loader = loaderManager.getCacheLoader();
this.setStatisticsEnabled(configuration.isExposeManagementStatistics());
}
Deleted: core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheLoader.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheLoader.java 2009-02-10
11:54:44 UTC (rev 7672)
+++
core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheLoader.java 2009-02-10
11:58:28 UTC (rev 7673)
@@ -1,167 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-package org.horizon.loader;
-
-import org.horizon.Cache;
-import org.horizon.CacheException;
-import org.horizon.logging.Log;
-import org.horizon.logging.LogFactory;
-import org.horizon.marshall.EntryData;
-import org.horizon.marshall.EntryDataExceptionMarker;
-import org.horizon.marshall.EntryDataMarker;
-import org.horizon.marshall.Marshaller;
-
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * A convenience abstract implementation of a {@link CacheLoader}. Specific methods to
note are methods like {@link
- * #storeState(Fqn,java.io.ObjectInputStream)}, {@link
#loadState(Fqn,java.io.ObjectOutputStream)}, {@link
- * #storeEntireState(java.io.ObjectInputStream)} and {@link
#loadEntireState(java.io.ObjectOutputStream)} which have
- * convenience implementations here.
- * <p/>
- * Also useful to note is the implementation of {@link #put(java.util.List)}, used during
the prepare phase of a
- * transaction.
- * <p/>
- *
- * @author <a href="mailto:manik@jboss.org">Manik Surtani
(manik(a)jboss.org)</a>
- * @author <a href="mailto:galder.zamarreno@jboss.com">Galder
Zamarreno</a>
- * @since 1.0
- */
-public abstract class AbstractCacheLoader<K, V> implements CacheLoader<K, V>
{
- protected Cache<K, V> cache;
- private static final Log log = LogFactory.getLog(AbstractCacheLoader.class);
- private static final boolean trace = log.isTraceEnabled();
- /**
- * HashMap<Object,List<Modification>>. List of open transactions. Note
that this is purely transient, as we don't use
- * a log, recovery is not available
- */
- protected Map<Object, List<Modification>> transactions = new
ConcurrentHashMap<Object, List<Modification>>();
-
- public void storeEntireState(ObjectInputStream in) {
- // store new state
- Object objectFromStream = null;
- // TODO fix me
-// try {
-// objectFromStream = cache.getMarshaller().objectFromObjectStream(in);
-// }
-// catch (Exception e) {
-// throw new CacheException(e.getMessage(), e);
-// }
-
- if (objectFromStream instanceof EntryDataMarker) {
- // no persistent state sent across; return?
- if (trace) log.trace("Empty persistent stream?");
- return;
- }
- if (objectFromStream instanceof EntryDataExceptionMarker) {
- EntryDataExceptionMarker ndem = (EntryDataExceptionMarker) objectFromStream;
- throw new CacheException("State provider cacheloader at node " +
ndem.getKey()
- + " threw exception during loadState (see Caused by)",
ndem.getCause());
- }
-
- List<EntryData<K, V>> data = (List<EntryData<K, V>>)
objectFromStream;
- for (EntryData<K, V> datem : data) {
- put(datem.getKey(), datem.getValue());
- }
- }
-
- public void loadEntireState(ObjectOutputStream os) {
- List<EntryData<K, V>> list = getAllEntries();
- if (trace) log.trace("Loading state of " + list.size() + " nodes
into stream");
- // TODO fix me
-// try {
-// cache.getMarshaller().objectToObjectStream(list, os);
-// }
-// catch (Exception e) {
-// throw new CacheException(e.getMessage(), e);
-// }
- }
-
-
- public void setCache(Cache<K, V> c) {
- this.cache = c;
- }
-
- public void put(List<Modification> modifications) {
- for (Modification m : modifications) {
- switch (m.getType()) {
- case PUT:
- put((K) m.getKey(), (V) m.getValue());
- break;
- case REMOVE:
- remove(m.getKey());
- break;
- case CLEAR:
- clear();
- break;
- default:
- throw new CacheException("Unknown modification " +
m.getType());
- }
- }
- }
-
- protected Marshaller getMarshaller() {
- return null;
- // todo fix me
-// return cache.getMarshaller();
- }
-
- // empty implementations for loaders that do not wish to implement lifecycle.
- public void create() {
- }
-
- public void start() {
- }
-
- public void stop() {
- }
-
- public void destroy() {
- }
-
- // Adds simple transactional capabilities to cache loaders that are inherently
non-transactional. If your cache loader implementation
- // is tansactional though, then override these.
-
- public void prepare(Object tx, List<Modification> modifications, boolean
one_phase) {
- if (one_phase) {
- put(modifications);
- } else {
- transactions.put(tx, modifications);
- }
- }
-
- public void commit(Object tx) {
- List<Modification> modifications = transactions.remove(tx);
- if (modifications == null) {
- throw new CacheException("transaction " + tx + " not found in
transaction table");
- }
- put(modifications);
- }
-
- public void rollback(Object tx) {
- transactions.remove(tx);
- }
-}
Copied: core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheLoaderOld.java
(from rev 7669,
core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheLoader.java)
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheLoaderOld.java
(rev 0)
+++
core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheLoaderOld.java 2009-02-10
11:58:28 UTC (rev 7673)
@@ -0,0 +1,167 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.horizon.loader;
+
+import org.horizon.Cache;
+import org.horizon.CacheException;
+import org.horizon.logging.Log;
+import org.horizon.logging.LogFactory;
+import org.horizon.marshall.EntryData;
+import org.horizon.marshall.EntryDataExceptionMarker;
+import org.horizon.marshall.EntryDataMarker;
+import org.horizon.marshall.Marshaller;
+
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * A convenience abstract implementation of a {@link CacheLoaderOld}. Specific methods to note are methods like {@link
+ * #storeState(Fqn,java.io.ObjectInputStream)}, {@link #loadState(Fqn,java.io.ObjectOutputStream)}, {@link
+ * #storeEntireState(java.io.ObjectInputStream)} and {@link #loadEntireState(java.io.ObjectOutputStream)} which have
+ * convenience implementations here.
+ * <p/>
+ * Also useful to note is the implementation of {@link #put(java.util.List)}, used during the prepare phase of a
+ * transaction.
+ * <p/>
+ *
+ * @author <a href="mailto:manik@jboss.org">Manik Surtani
(manik(a)jboss.org)</a>
+ * @author <a href="mailto:galder.zamarreno@jboss.com">Galder
Zamarreno</a>
+ * @since 1.0
+ */
+public abstract class AbstractCacheLoaderOld<K, V> implements CacheLoaderOld<K, V> {
+ protected Cache<K, V> cache;
+ private static final Log log = LogFactory.getLog(AbstractCacheLoaderOld.class);
+ private static final boolean trace = log.isTraceEnabled();
+ /**
+ * HashMap<Object,List<Modification>>. List of open transactions. Note
that this is purely transient, as we don't use
+ * a log, recovery is not available
+ */
+ protected Map<Object, List<Modification>> transactions = new
ConcurrentHashMap<Object, List<Modification>>();
+
+ public void storeEntireState(ObjectInputStream in) {
+ // store new state
+ Object objectFromStream = null;
+ // TODO fix me
+// try {
+// objectFromStream = cache.getMarshaller().objectFromObjectStream(in);
+// }
+// catch (Exception e) {
+// throw new CacheException(e.getMessage(), e);
+// }
+
+ if (objectFromStream instanceof EntryDataMarker) {
+ // no persistent state sent across; return?
+ if (trace) log.trace("Empty persistent stream?");
+ return;
+ }
+ if (objectFromStream instanceof EntryDataExceptionMarker) {
+ EntryDataExceptionMarker ndem = (EntryDataExceptionMarker) objectFromStream;
+ throw new CacheException("State provider cacheloader at node " +
ndem.getKey()
+ + " threw exception during loadState (see Caused by)",
ndem.getCause());
+ }
+
+ List<EntryData<K, V>> data = (List<EntryData<K, V>>)
objectFromStream;
+ for (EntryData<K, V> datem : data) {
+ put(datem.getKey(), datem.getValue());
+ }
+ }
+
+ public void loadEntireState(ObjectOutputStream os) {
+ List<EntryData<K, V>> list = getAllEntries();
+ if (trace) log.trace("Loading state of " + list.size() + " nodes
into stream");
+ // TODO fix me
+// try {
+// cache.getMarshaller().objectToObjectStream(list, os);
+// }
+// catch (Exception e) {
+// throw new CacheException(e.getMessage(), e);
+// }
+ }
+
+
+ public void setCache(Cache<K, V> c) {
+ this.cache = c;
+ }
+
+ public void put(List<Modification> modifications) {
+ for (Modification m : modifications) {
+ switch (m.getType()) {
+ case PUT:
+ put((K) m.getKey(), (V) m.getValue());
+ break;
+ case REMOVE:
+ remove(m.getKey());
+ break;
+ case CLEAR:
+ clear();
+ break;
+ default:
+ throw new CacheException("Unknown modification " +
m.getType());
+ }
+ }
+ }
+
+ protected Marshaller getMarshaller() {
+ return null;
+ // todo fix me
+// return cache.getMarshaller();
+ }
+
+ // empty implementations for loaders that do not wish to implement lifecycle.
+ public void create() {
+ }
+
+ public void start() {
+ }
+
+ public void stop() {
+ }
+
+ public void destroy() {
+ }
+
+ // Adds simple transactional capabilities to cache loaders that are inherently non-transactional. If your cache loader implementation
+ // is tansactional though, then override these.
+
+ public void prepare(Object tx, List<Modification> modifications, boolean
one_phase) {
+ if (one_phase) {
+ put(modifications);
+ } else {
+ transactions.put(tx, modifications);
+ }
+ }
+
+ public void commit(Object tx) {
+ List<Modification> modifications = transactions.remove(tx);
+ if (modifications == null) {
+ throw new CacheException("transaction " + tx + " not found in
transaction table");
+ }
+ put(modifications);
+ }
+
+ public void rollback(Object tx) {
+ transactions.remove(tx);
+ }
+}
Property changes on:
core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheLoaderOld.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Deleted:
core/branches/flat/src/main/java/org/horizon/loader/AbstractDelegatingCacheLoader.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/loader/AbstractDelegatingCacheLoader.java 2009-02-10
11:54:44 UTC (rev 7672)
+++
core/branches/flat/src/main/java/org/horizon/loader/AbstractDelegatingCacheLoader.java 2009-02-10
11:58:28 UTC (rev 7673)
@@ -1,133 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-package org.horizon.loader;
-
-import org.horizon.Cache;
-import org.horizon.config.CacheLoaderConfig.IndividualCacheLoaderConfig;
-import org.horizon.marshall.EntryData;
-
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.List;
-
-/**
- * AbstractDelegatingCacheLoader provides standard functionality for a cache loader that
simply delegates each operation
- * defined in the cache loader interface to the underlying cache loader, basically acting
as a proxy to the real cache
- * loader.
- * <p/>
- * Any cache loader implementation that extends this class would be required to override
any of the methods in order to
- * provide a different or added behaviour.
- *
- * @author <a href="mailto:galder.zamarreno@jboss.com">Galder
Zamarreno</a>
- * @since 1.0
- */
-public abstract class AbstractDelegatingCacheLoader<K, V> extends
AbstractCacheLoader<K, V> {
- private CacheLoader<K, V> cacheLoader;
-
- public AbstractDelegatingCacheLoader(CacheLoader<K, V> cacheLoader) {
- this.cacheLoader = cacheLoader;
- }
-
- public void clear() {
- cacheLoader.clear();
- }
-
- public void commit(Object tx) {
- cacheLoader.commit(tx);
- }
-
- public void create() {
- cacheLoader.create();
- }
-
- public void destroy() {
- cacheLoader.destroy();
- }
-
- public boolean exists(Object key) {
- return cacheLoader.exists(key);
- }
-
- public V get(Object key) {
- return cacheLoader.get(key);
- }
-
- public List<EntryData<K, V>> getAllEntries() {
- return cacheLoader.getAllEntries();
- }
-
- public IndividualCacheLoaderConfig getConfig() {
- return cacheLoader.getConfig();
- }
-
- public void loadEntireState(ObjectOutputStream os) {
- cacheLoader.loadEntireState(os);
- }
-
- public void prepare(Object tx, List<Modification> modifications, boolean
one_phase) {
- cacheLoader.prepare(tx, modifications, one_phase);
- }
-
- public V put(K key, V value) {
- return cacheLoader.put(key, value);
- }
-
- public void put(List<Modification> modifications) {
- cacheLoader.put(modifications);
- }
-
- public V remove(Object key) {
- return cacheLoader.remove(key);
- }
-
- public void rollback(Object tx) {
- cacheLoader.rollback(tx);
- }
-
- public void setCache(Cache<K, V> c) {
- cacheLoader.setCache(c);
- }
-
- public void setConfig(IndividualCacheLoaderConfig config) {
- cacheLoader.setConfig(config);
- }
-
- public void start() {
- cacheLoader.start();
- }
-
- public void stop() {
- cacheLoader.stop();
- }
-
- public void storeEntireState(ObjectInputStream is) {
- cacheLoader.storeEntireState(is);
- }
-
- public CacheLoader<K, V> getCacheLoader() {
- return cacheLoader;
- }
-
- public void setCacheLoader(CacheLoader<K, V> cacheLoader) {
- this.cacheLoader = cacheLoader;
- }
-}
Copied:
core/branches/flat/src/main/java/org/horizon/loader/AbstractDelegatingCacheLoaderOld.java
(from rev 7669,
core/branches/flat/src/main/java/org/horizon/loader/AbstractDelegatingCacheLoader.java)
===================================================================
---
core/branches/flat/src/main/java/org/horizon/loader/AbstractDelegatingCacheLoaderOld.java
(rev 0)
+++
core/branches/flat/src/main/java/org/horizon/loader/AbstractDelegatingCacheLoaderOld.java 2009-02-10
11:58:28 UTC (rev 7673)
@@ -0,0 +1,133 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.horizon.loader;
+
+import org.horizon.Cache;
+import org.horizon.config.CacheLoaderConfig.IndividualCacheLoaderConfig;
+import org.horizon.marshall.EntryData;
+
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.List;
+
+/**
+ * AbstractDelegatingCacheLoader provides standard functionality for a cache loader that simply delegates each operation
+ * defined in the cache loader interface to the underlying cache loader, basically acting as a proxy to the real cache
+ * loader.
+ * <p/>
+ * Any cache loader implementation that extends this class would be required to override any of the methods in order to
+ * provide a different or added behaviour.
+ *
+ * @author <a href="mailto:galder.zamarreno@jboss.com">Galder
Zamarreno</a>
+ * @since 1.0
+ */
+public abstract class AbstractDelegatingCacheLoaderOld<K, V> extends AbstractCacheLoaderOld<K, V> {
+ private CacheLoaderOld<K, V> cacheLoader;
+
+ public AbstractDelegatingCacheLoaderOld(CacheLoaderOld<K, V> cacheLoader) {
+ this.cacheLoader = cacheLoader;
+ }
+
+ public void clear() {
+ cacheLoader.clear();
+ }
+
+ public void commit(Object tx) {
+ cacheLoader.commit(tx);
+ }
+
+ public void create() {
+ cacheLoader.create();
+ }
+
+ public void destroy() {
+ cacheLoader.destroy();
+ }
+
+ public boolean exists(Object key) {
+ return cacheLoader.exists(key);
+ }
+
+ public V get(Object key) {
+ return cacheLoader.get(key);
+ }
+
+ public List<EntryData<K, V>> getAllEntries() {
+ return cacheLoader.getAllEntries();
+ }
+
+ public IndividualCacheLoaderConfig getConfig() {
+ return cacheLoader.getConfig();
+ }
+
+ public void loadEntireState(ObjectOutputStream os) {
+ cacheLoader.loadEntireState(os);
+ }
+
+ public void prepare(Object tx, List<Modification> modifications, boolean
one_phase) {
+ cacheLoader.prepare(tx, modifications, one_phase);
+ }
+
+ public V put(K key, V value) {
+ return cacheLoader.put(key, value);
+ }
+
+ public void put(List<Modification> modifications) {
+ cacheLoader.put(modifications);
+ }
+
+ public V remove(Object key) {
+ return cacheLoader.remove(key);
+ }
+
+ public void rollback(Object tx) {
+ cacheLoader.rollback(tx);
+ }
+
+ public void setCache(Cache<K, V> c) {
+ cacheLoader.setCache(c);
+ }
+
+ public void setConfig(IndividualCacheLoaderConfig config) {
+ cacheLoader.setConfig(config);
+ }
+
+ public void start() {
+ cacheLoader.start();
+ }
+
+ public void stop() {
+ cacheLoader.stop();
+ }
+
+ public void storeEntireState(ObjectInputStream is) {
+ cacheLoader.storeEntireState(is);
+ }
+
+ public CacheLoaderOld<K, V> getCacheLoader() {
+ return cacheLoader;
+ }
+
+ public void setCacheLoader(CacheLoaderOld<K, V> cacheLoader) {
+ this.cacheLoader = cacheLoader;
+ }
+}
Property changes on:
core/branches/flat/src/main/java/org/horizon/loader/AbstractDelegatingCacheLoaderOld.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Deleted: core/branches/flat/src/main/java/org/horizon/loader/AsyncCacheLoader.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/AsyncCacheLoader.java 2009-02-10
11:54:44 UTC (rev 7672)
+++ core/branches/flat/src/main/java/org/horizon/loader/AsyncCacheLoader.java 2009-02-10
11:58:28 UTC (rev 7673)
@@ -1,269 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-package org.horizon.loader;
-
-import org.horizon.CacheException;
-import org.horizon.config.CacheLoaderConfig.IndividualCacheLoaderConfig;
-import org.horizon.logging.Log;
-import org.horizon.logging.LogFactory;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ArrayBlockingQueue;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * The AsyncCacheLoader is a delegating cache loader that extends
AbstractDelegatingCacheLoader overriding methods to
- * that should not just delegate the operation to the underlying cache loader.
- * <p/>
- * Read operations are done synchronously, while write (CRUD - Create, Remove, Update,
Delete) operations are done
- * asynchronously. There is no provision for exception handling at the moment for
problems encountered with the
- * underlying CacheLoader during a CRUD operation, and the exception is just logged.
- * <p/>
- * When configuring the CacheLoader, use the following attribute:
- * <p/>
- * <code> <attribute
name="CacheLoaderAsynchronous">true</attribute>
</code>
- * <p/>
- * to define whether cache loader operations are to be asynchronous. If not specified, a
cache loader operation is
- * assumed synchronous.
- * <p/>
- * <p/>
- * The following additional parameters are available: <dl>
<dt>cache.async.batchSize</dt> <dd>Number of modifications to
- * commit in one transaction, default is 100. The minimum batch size is 1.</dd>
<dt>cache.async.pollWait</dt> <dd>How
- * long to wait before processing an incomplete batch, in milliseconds. Default is 100.
Set this to 0 to not wait
- * before processing available records.</dd>
<dt>cache.async.returnOld</dt> <dd>If <code>true</code>,
this loader
- * returns the old values from {@link #put} and {@link #remove} methods. Otherwise,
these methods always return null.
- * Default is true. <code>false</code> improves the performance of these
operations.</dd>
- * <dt>cache.async.queueSize</dt> <dd>Maximum number of entries to
enqueue for asynchronous processing. Lowering this
- * size may help prevent out-of-memory conditions. It also may help to prevent less
records lost in the case of JVM
- * failure. Default is 10,000 operations.</dd>
<dt>cache.async.put</dt> <dd>If set to false, all {@link #put}
- * operations will be processed synchronously, and then only the {@link #remove}
operations will be processed
- * asynchronously. This mode may be useful for processing expiration of messages within a
separate thread and keeping
- * other operations synchronous for reliability. </dd>
<dt>cache.async.threadPoolSize</dt> <dd>The size of the async
- * processor thread pool. Defaults to <tt>1</tt>. This property is new in
JBoss Cache 3.0.</dd> </dl> For increased
- * performance for many smaller transactions, use higher values for
<code>cache.async.batchSize</code> and
- * <code>cache.async.pollWait</code>. For larger sized records, use a
smaller value for
- * <code>cache.async.queueSize</code>.
- *
- * @author Manik Surtani (manik.surtani(a)jboss.com)
- * @since 1.0
- */
-public class AsyncCacheLoader<K, V> extends AbstractDelegatingCacheLoader<K,
V> {
-
- private static final Log log = LogFactory.getLog(AsyncCacheLoader.class);
- private static final boolean trace = log.isTraceEnabled();
-
- private static AtomicInteger threadId = new AtomicInteger(0);
-
- /**
- * Default limit on entries to process asynchronously.
- */
- private static final int DEFAULT_QUEUE_SIZE = 10000;
-
- private AsyncCacheLoaderConfig config;
- private ExecutorService executor;
- private AtomicBoolean stopped = new AtomicBoolean(true);
- private BlockingQueue<Modification> queue = new
ArrayBlockingQueue<Modification>(DEFAULT_QUEUE_SIZE);
- private List<Future> processorFutures;
-
- public AsyncCacheLoader() {
- super(null);
- }
-
- public AsyncCacheLoader(CacheLoader<K, V> cacheLoader) {
- super(cacheLoader);
- }
-
- @Override
- public void setConfig(IndividualCacheLoaderConfig base) {
- if (base instanceof AsyncCacheLoaderConfig) {
- config = (AsyncCacheLoaderConfig) base;
- } else {
- config = new AsyncCacheLoaderConfig(base);
- }
-
- if (config.getQueueSize() > 0) {
- queue = new ArrayBlockingQueue<Modification>(config.getQueueSize());
- }
-
- super.setConfig(base);
- }
-
- @Override
- public V put(K key, V value) {
- if (config.getUseAsyncPut()) {
- V oldValue = get(key);
- Modification mod = new Modification(Modification.ModificationType.PUT, key,
value);
- enqueue(mod);
- return oldValue;
- } else {
- return super.put(key, value);
- }
- }
-
- @Override
- public void put(List<Modification> modifications) {
- if (config.getUseAsyncPut()) {
- for (Modification modification : modifications) {
- enqueue(modification);
- }
- } else {
- super.put(modifications);
- }
- }
-
- @Override
- public V remove(Object key) {
- V oldValue = get(key);
- Modification mod = new Modification(Modification.ModificationType.REMOVE, key,
null);
- enqueue(mod);
- return oldValue;
- }
-
- @Override
- public void start() {
- if (log.isInfoEnabled()) log.info("Async cache loader starting: " +
this);
- stopped.set(false);
- super.start();
- executor = Executors.newFixedThreadPool(config.getThreadPoolSize(), new
ThreadFactory() {
- public Thread newThread(Runnable r) {
- Thread t = new Thread(r, "AsyncCacheLoader-" +
threadId.getAndIncrement());
- t.setDaemon(true);
- return t;
- }
- });
- processorFutures = new ArrayList<Future>(config.getThreadPoolSize());
- for (int i = 0; i < config.getThreadPoolSize(); i++)
processorFutures.add(executor.submit(new AsyncProcessor()));
- }
-
- @Override
- public void stop() {
- stopped.set(true);
- if (executor != null) {
- for (Future f : processorFutures) f.cancel(true);
- executor.shutdown();
- try {
- boolean terminated = executor.isTerminated();
- while (!terminated) {
- terminated = executor.awaitTermination(60, TimeUnit.SECONDS);
- }
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- executor = null;
- super.stop();
- }
-
- private void enqueue(final Modification mod) {
- if (stopped.get()) {
- throw new CacheException("AsyncCacheLoader stopped; no longer accepting
more entries.");
- }
- if (trace) log.trace("Enqueuing modification " + mod);
- try {
- queue.put(mod);
- }
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
-
- /**
- * Processes (by batch if possible) a queue of {@link Modification}s.
- *
- * @author manik surtani
- */
- private class AsyncProcessor implements Runnable {
- // Modifications to invoke as a single put
- private final List<Modification> mods = new
ArrayList<Modification>(config.getBatchSize());
-
- public void run() {
- while (!Thread.interrupted()) {
- try {
- run0();
- }
- catch (InterruptedException e) {
- break;
- }
- }
-
- try {
- if (trace) log.trace("process remaining batch " + mods.size());
- put(mods);
- if (trace) log.trace("process remaining queued " + queue.size());
- while (!queue.isEmpty()) {
- run0();
- }
- }
- catch (InterruptedException e) {
- log.trace("remaining interrupted");
- }
- }
-
- private void run0() throws InterruptedException {
- log.trace("Checking for modifications");
- int i = queue.drainTo(mods, config.getBatchSize());
- if (i == 0) {
- Modification m = queue.take();
- mods.add(m);
- }
-
- if (trace) {
- log.trace("Calling put(List) with " + mods.size() + "
modifications");
- }
- put(mods);
- mods.clear();
- }
-
- private void put(List<Modification> mods) {
- try {
- AsyncCacheLoader.super.put(mods);
- }
- catch (Exception e) {
- if (log.isWarnEnabled()) log.warn("Failed to process async
modifications: " + e);
- if (log.isDebugEnabled()) log.debug("Exception: ", e);
- }
- }
- }
-
- @Override
- public String toString() {
- return super.toString() +
- " delegate=[" + super.getCacheLoader() + "]" +
- " stopped=" + stopped +
- " batchSize=" + config.getBatchSize() +
- " returnOld=" + config.getReturnOld() +
- " asyncPut=" + config.getUseAsyncPut() +
- " threadPoolSize=" + config.getThreadPoolSize() +
- " queue.remainingCapacity()=" + queue.remainingCapacity() +
- " queue.peek()=" + queue.peek();
- }
-
-}
Modified: core/branches/flat/src/main/java/org/horizon/loader/AsyncCacheLoaderConfig.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/loader/AsyncCacheLoaderConfig.java 2009-02-10
11:54:44 UTC (rev 7672)
+++
core/branches/flat/src/main/java/org/horizon/loader/AsyncCacheLoaderConfig.java 2009-02-10
11:58:28 UTC (rev 7673)
@@ -41,16 +41,16 @@
* Default constructor.
*/
public AsyncCacheLoaderConfig() {
- setClassName(AsyncCacheLoader.class.getName());
+ setClassName(AsyncCacheLoaderOld.class.getName());
}
/**
- * For use by {@link AsyncCacheLoader}.
+ * For use by {@link AsyncCacheLoaderOld}.
*
* @param base generic config object created by XML parsing.
*/
AsyncCacheLoaderConfig(IndividualCacheLoaderConfig base) {
- setClassName(AsyncCacheLoader.class.getName());
+ setClassName(AsyncCacheLoaderOld.class.getName());
populateFromBaseConfig(base);
}
Copied: core/branches/flat/src/main/java/org/horizon/loader/AsyncCacheLoaderOld.java (from
rev 7666, core/branches/flat/src/main/java/org/horizon/loader/AsyncCacheLoader.java)
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/AsyncCacheLoaderOld.java (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/loader/AsyncCacheLoaderOld.java 2009-02-10 11:58:28 UTC (rev 7673)
@@ -0,0 +1,269 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.horizon.loader;
+
+import org.horizon.CacheException;
+import org.horizon.config.CacheLoaderConfig.IndividualCacheLoaderConfig;
+import org.horizon.logging.Log;
+import org.horizon.logging.LogFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
+/**
+ * The AsyncCacheLoader is a delegating cache loader that extends AbstractDelegatingCacheLoader, overriding methods
+ * that should not just delegate the operation to the underlying cache loader.
+ * <p/>
+ * Read operations are done synchronously, while write (CRUD - Create, Remove, Update, Delete) operations are done
+ * asynchronously. There is no provision for exception handling at the moment for problems encountered with the
+ * underlying CacheLoader during a CRUD operation, and the exception is just logged.
+ * <p/>
+ * When configuring the CacheLoader, use the following attribute:
+ * <p/>
+ * <code> &lt;attribute name="CacheLoaderAsynchronous"&gt;true&lt;/attribute&gt; </code>
+ * <p/>
+ * to define whether cache loader operations are to be asynchronous. If not specified, a cache loader operation is
+ * assumed synchronous.
+ * <p/>
+ * <p/>
+ * The following additional parameters are available: <dl> <dt>cache.async.batchSize</dt> <dd>Number of modifications to
+ * commit in one transaction, default is 100. The minimum batch size is 1.</dd> <dt>cache.async.pollWait</dt> <dd>How
+ * long to wait before processing an incomplete batch, in milliseconds. Default is 100. Set this to 0 to not wait
+ * before processing available records.</dd> <dt>cache.async.returnOld</dt> <dd>If <code>true</code>, this loader
+ * returns the old values from {@link #put} and {@link #remove} methods. Otherwise, these methods always return null.
+ * Default is true. <code>false</code> improves the performance of these operations.</dd>
+ * <dt>cache.async.queueSize</dt> <dd>Maximum number of entries to enqueue for asynchronous processing. Lowering this
+ * size may help prevent out-of-memory conditions. It may also help reduce the number of records lost in the case of JVM
+ * failure. Default is 10,000 operations.</dd> <dt>cache.async.put</dt> <dd>If set to false, all {@link #put}
+ * operations will be processed synchronously, and then only the {@link #remove} operations will be processed
+ * asynchronously. This mode may be useful for processing expiration of messages within a separate thread and keeping
+ * other operations synchronous for reliability. </dd> <dt>cache.async.threadPoolSize</dt> <dd>The size of the async
+ * processor thread pool. Defaults to <tt>1</tt>. This property is new in JBoss Cache 3.0.</dd> </dl> For increased
+ * performance for many smaller transactions, use higher values for <code>cache.async.batchSize</code> and
+ * <code>cache.async.pollWait</code>. For larger sized records, use a smaller value for
+ * <code>cache.async.queueSize</code>.
+ *
+ * @author Manik Surtani (manik.surtani(a)jboss.com)
+ * @since 1.0
+ */
+public class AsyncCacheLoaderOld<K, V> extends AbstractDelegatingCacheLoaderOld<K, V> {
+
+ private static final Log log = LogFactory.getLog(AsyncCacheLoaderOld.class);
+ private static final boolean trace = log.isTraceEnabled();
+
+ private static AtomicInteger threadId = new AtomicInteger(0);
+
+ /**
+ * Default limit on entries to process asynchronously.
+ */
+ private static final int DEFAULT_QUEUE_SIZE = 10000;
+
+ private AsyncCacheLoaderConfig config;
+ private ExecutorService executor;
+ private AtomicBoolean stopped = new AtomicBoolean(true);
+ private BlockingQueue<Modification> queue = new ArrayBlockingQueue<Modification>(DEFAULT_QUEUE_SIZE);
+ private List<Future> processorFutures;
+
+ public AsyncCacheLoaderOld() {
+ super(null);
+ }
+
+ public AsyncCacheLoaderOld(CacheLoaderOld<K, V> cacheLoader) {
+ super(cacheLoader);
+ }
+
+ @Override
+ public void setConfig(IndividualCacheLoaderConfig base) {
+ if (base instanceof AsyncCacheLoaderConfig) {
+ config = (AsyncCacheLoaderConfig) base;
+ } else {
+ config = new AsyncCacheLoaderConfig(base);
+ }
+
+ if (config.getQueueSize() > 0) {
+ queue = new ArrayBlockingQueue<Modification>(config.getQueueSize());
+ }
+
+ super.setConfig(base);
+ }
+
+ @Override
+ public V put(K key, V value) {
+ if (config.getUseAsyncPut()) {
+ V oldValue = get(key);
+ Modification mod = new Modification(Modification.ModificationType.PUT, key, value);
+ enqueue(mod);
+ return oldValue;
+ } else {
+ return super.put(key, value);
+ }
+ }
+
+ @Override
+ public void put(List<Modification> modifications) {
+ if (config.getUseAsyncPut()) {
+ for (Modification modification : modifications) {
+ enqueue(modification);
+ }
+ } else {
+ super.put(modifications);
+ }
+ }
+
+ @Override
+ public V remove(Object key) {
+ V oldValue = get(key);
+ Modification mod = new Modification(Modification.ModificationType.REMOVE, key, null);
+ enqueue(mod);
+ return oldValue;
+ }
+
+ @Override
+ public void start() {
+ if (log.isInfoEnabled()) log.info("Async cache loader starting: " + this);
+ stopped.set(false);
+ super.start();
+ executor = Executors.newFixedThreadPool(config.getThreadPoolSize(), new ThreadFactory() {
+ public Thread newThread(Runnable r) {
+ Thread t = new Thread(r, "AsyncCacheLoaderOld-" + threadId.getAndIncrement());
+ t.setDaemon(true);
+ return t;
+ }
+ });
+ processorFutures = new ArrayList<Future>(config.getThreadPoolSize());
+ for (int i = 0; i < config.getThreadPoolSize(); i++) processorFutures.add(executor.submit(new AsyncProcessor()));
+ }
+
+ @Override
+ public void stop() {
+ stopped.set(true);
+ if (executor != null) {
+ for (Future f : processorFutures) f.cancel(true);
+ executor.shutdown();
+ try {
+ boolean terminated = executor.isTerminated();
+ while (!terminated) {
+ terminated = executor.awaitTermination(60, TimeUnit.SECONDS);
+ }
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ executor = null;
+ super.stop();
+ }
+
+ private void enqueue(final Modification mod) {
+ if (stopped.get()) {
+ throw new CacheException("AsyncCacheLoaderOld stopped; no longer accepting more entries.");
+ }
+ if (trace) log.trace("Enqueuing modification " + mod);
+ try {
+ queue.put(mod);
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /**
+ * Processes (by batch if possible) a queue of {@link Modification}s.
+ *
+ * @author manik surtani
+ */
+ private class AsyncProcessor implements Runnable {
+ // Modifications to invoke as a single put
+ private final List<Modification> mods = new ArrayList<Modification>(config.getBatchSize());
+
+ public void run() {
+ while (!Thread.interrupted()) {
+ try {
+ run0();
+ }
+ catch (InterruptedException e) {
+ break;
+ }
+ }
+
+ try {
+ if (trace) log.trace("process remaining batch " + mods.size());
+ put(mods);
+ if (trace) log.trace("process remaining queued " + queue.size());
+ while (!queue.isEmpty()) {
+ run0();
+ }
+ }
+ catch (InterruptedException e) {
+ log.trace("remaining interrupted");
+ }
+ }
+
+ private void run0() throws InterruptedException {
+ log.trace("Checking for modifications");
+ int i = queue.drainTo(mods, config.getBatchSize());
+ if (i == 0) {
+ Modification m = queue.take();
+ mods.add(m);
+ }
+
+ if (trace) {
+ log.trace("Calling put(List) with " + mods.size() + " modifications");
+ }
+ put(mods);
+ mods.clear();
+ }
+
+ private void put(List<Modification> mods) {
+ try {
+ AsyncCacheLoaderOld.super.put(mods);
+ }
+ catch (Exception e) {
+ if (log.isWarnEnabled()) log.warn("Failed to process async modifications: " + e);
+ if (log.isDebugEnabled()) log.debug("Exception: ", e);
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ return super.toString() +
+ " delegate=[" + super.getCacheLoader() + "]" +
+ " stopped=" + stopped +
+ " batchSize=" + config.getBatchSize() +
+ " returnOld=" + config.getReturnOld() +
+ " asyncPut=" + config.getUseAsyncPut() +
+ " threadPoolSize=" + config.getThreadPoolSize() +
+ " queue.remainingCapacity()=" + queue.remainingCapacity() +
+ " queue.peek()=" + queue.peek();
+ }
+
+}
Property changes on:
core/branches/flat/src/main/java/org/horizon/loader/AsyncCacheLoaderOld.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Deleted: core/branches/flat/src/main/java/org/horizon/loader/CacheLoader.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/CacheLoader.java 2009-02-10
11:54:44 UTC (rev 7672)
+++ core/branches/flat/src/main/java/org/horizon/loader/CacheLoader.java 2009-02-10
11:58:28 UTC (rev 7673)
@@ -1,236 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-package org.horizon.loader;
-
-import net.jcip.annotations.ThreadSafe;
-import org.horizon.Cache;
-import org.horizon.config.CacheLoaderConfig.IndividualCacheLoaderConfig;
-import org.horizon.factories.scopes.Scope;
-import org.horizon.factories.scopes.Scopes;
-import org.horizon.marshall.EntryData;
-
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.List;
-import java.util.Map;
-
-/**
- * A {@link CacheLoader} implementation persists and load keys to and from secondary
storage, such as a database or
- * filesystem. Typically, implementations store a series of keys and values (an entire
{@link Map}) under a single
- * {@link Fqn}. Loading and saving properties of an entire {@link Map} should be
atomic.
- * <p/>
- * Lifecycle: First an instance of the loader is created, then the configuration ({@link
- * #setConfig(CacheLoaderConfig.IndividualCacheLoaderConfig)}) and cache ({@link
#setCache(CacheSPI)}) are set. After
- * this, {@link #create()} is called. Then {@link #start()} is called. When re-deployed,
{@link #stop()} will be called,
- * followed by another {@link #start()}. Finally, when shut down, {@link #destroy()} is
called, after which the loader
- * is unusable.
- * <p/>
- * An {@link AbstractCacheLoader} is provided as a convenient starting place when
implementing your own {@link
- * CacheLoader}.
- * <p/>
- * It is important to note that all implementations are thread safe, as concurrent reads
and writes, potentially even to
- * the same {@link Fqn}, are possible.
- * <p/>
- *
- * @author <a href="mailto:manik@jboss.org">Manik Surtani
(manik(a)jboss.org)</a>
- * @since 1.0
- */
-@ThreadSafe
-@Scope(Scopes.NAMED_CACHE)
-public interface CacheLoader<K, V> {
- /**
- * Sets the configuration. This is called before {@link #create()} and {@link
#start()}.
- *
- * @param config May be an instance of the {@link
CacheLoaderConfig.IndividualCacheLoaderConfig} base class, in which
- * case the cache loader should use the {@link
CacheLoaderConfig.IndividualCacheLoaderConfig#getProperties()}
- * method to find configuration information. Alternatively, may be a
type-specific subclass of {@link
- * CacheLoaderConfig.IndividualCacheLoaderConfig}, if there is one.
- */
- void setConfig(IndividualCacheLoaderConfig config);
-
- /**
- * Gets the configuration.
- *
- * @return the configuration, represented by a {@link
CacheLoaderConfig.IndividualCacheLoaderConfig} object.
- */
- IndividualCacheLoaderConfig getConfig();
-
- /**
- * Sets the {@link Cache} that is maintaining this CacheLoader. This method allows
this CacheLoader to set a
- * reference to the {@link Cache}. This method is called be called after the
CacheLoader instance has been
- * constructed.
- *
- * @param c The cache on which this loader works
- */
- void setCache(Cache<K, V> c);
-
- /**
- * Returns all keys and values from the persistent store, given a {@link Fqn}
- *
- * @param name the {@link Fqn} to search for.
- * @return Map<Object,Object> keys and values for the given node. Returns null
if the node is not found. If the node
- * is found but has no attributes, this method returns an empty Map.
- */
- V get(Object key);
-
-
- /**
- * Returns true if the CacheLoader has a node with a {@link Fqn}.
- *
- * @return true if node exists, false otherwise
- */
- boolean exists(Object key);
-
- /**
- * Puts a key and value into the attribute map of a given node. If the node does not
exist, all parent nodes from
- * the root down are created automatically. Returns the old value.
- */
- V put(K key, V value);
-
- /**
- * Removes everything from this cache-loader
- */
- void clear();
-
- /**
- * Removes the given node and all its subnodes, does nothing if the node does not
exist.
- *
- * @param fqn the {@link Fqn} of the node
- */
- V remove(Object key);
-
- /**
- * Retrieves all entries stored in this cache loader
- *
- * @return All entries
- * @throws Exception on error
- */
- List<EntryData<K, V>> getAllEntries();
-
-
- /**
- * Applies all modifications to the backend store. Changes may be applied in a single
operation.
- *
- * @param modifications A List<Modification> of modifications
- */
- void put(List<Modification> modifications);
-
- /**
- * Prepares a list of modifications. For example, for a DB-based CacheLoader:
<ol> <li>Create a local (JDBC)
- * transaction <li>Associate the local transaction with
<code>tx</code> (tx is the key) <li>Execute the corresponding
- * SQL statements against the DB (statements derived from modifications) </ol>
For non-transactional CacheLoader
- * (e.g. file-based), the implementation could attempt to implement its own
transactional logic, attempting to write
- * data to a temp location (or memory) and writing it to the proper location upon
commit.
- *
- * @param tx The transaction, indended to be used by implementations as an
identifier of the transaction
- * (and not necessarily a JTA {@link
javax.transaction.Transaction} object)
- * @param modifications A {@link List} containing {@link
org.horizon.loader.Modification}s, for the given
- * transaction
- * @param one_phase Persist immediately and (for example) commit the local JDBC
transaction as well. When true,
- * we won't get a {@link #commit(Object)} or {@link
#rollback(Object)} method call later
- * @throws Exception
- */
- void prepare(Object tx, List<Modification> modifications, boolean one_phase);
-
- /**
- * Commits the transaction. A DB-based CacheLoader would look up the local JDBC
transaction asociated with
- * <code>tx</code> and commit that transaction. Non-transactional
CacheLoaders could simply write the data that was
- * previously saved transiently under the given <code>tx</code> key, to
(for example) a file system.
- * <p/>
- * <b>Note</b> this only holds if the previous prepare() did not define
<pre>one_phase=true</pre>
- *
- * @param tx transaction to commit
- */
- void commit(Object tx);
-
- /**
- * Rolls the transaction back. A DB-based CacheLoader would look up the local JDBC
transaction asociated with
- * <code>tx</code> and roll back that transaction.
- *
- * @param tx transaction to roll back
- */
- void rollback(Object tx);
-
- /**
- * Fetches the entire state for this cache from secondary storage (disk, database) and
writes it to a provided
- * ObjectOutputStream. State written to the provided ObjectOutputStream parameter is
used for initialization of a new
- * CacheImpl instance. When the state gets transferred to the new cache instance its
cacheloader calls {@link
- * #storeEntireState(ObjectInputStream)}
- * <p/>
- * Implementations of this method should not catch any exception or close the given
ObjectOutputStream parameter. In
- * order to ensure cacheloader interoperability contents of the cache are written to
the ObjectOutputStream as a
- * sequence of NodeData objects.
- * <p/>
- * Default implementation is provided by {@link AbstractCacheLoader} and ensures
cacheloader interoperability.
- * Implementors are encouraged to consider extending AbstractCacheLoader prior to
implementing completely custom
- * cacheloader.
- *
- * @param os ObjectOutputStream to write state
- * @see AbstractCacheLoader#loadEntireState(ObjectOutputStream)
- * @see org.horizon.marshall.NodeData
- */
- void loadEntireState(ObjectOutputStream os);
-
- /**
- * Stores the entire state for this cache by reading it from a provided
ObjectInputStream. The state was provided to
- * this cache by calling {@link #loadEntireState(ObjectOutputStream)}} on some other
cache instance. State currently
- * in storage gets overwritten.
- * <p/>
- * Implementations of this method should not catch any exception or close the given
ObjectInputStream parameter. In
- * order to ensure cacheloader interoperability contents of the cache are read from
the ObjectInputStream as a
- * sequence of NodeData objects.
- * <p/>
- * Default implementation is provided by {@link AbstractCacheLoader} and ensures
cacheloader interoperability.
- * Implementors are encouraged to consider extending AbstractCacheLoader prior to
implementing completely custom
- * cacheloader.
- *
- * @param is ObjectInputStream to read state
- * @see AbstractCacheLoader#storeEntireState(ObjectInputStream)
- * @see org.horizon.marshall.NodeData
- */
- void storeEntireState(ObjectInputStream is);
-
- /**
- * Lifecycle method, called when the cache loader is created.
- *
- * @throws java.lang.Exception
- */
- void create();
-
- /**
- * Lifecycle method, called when the cache loader is started.
- *
- * @throws java.lang.Exception
- */
- void start();
-
- /**
- * Lifecycle method, called when the cache loader is stopped.
- */
- void stop();
-
- /**
- * Lifecycle method, called when the cache loader is destroyed.
- */
- void destroy();
-
-}
Added: core/branches/flat/src/main/java/org/horizon/loader/CacheLoader.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/CacheLoader.java (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/loader/CacheLoader.java 2009-02-10 11:58:28 UTC (rev 7673)
@@ -0,0 +1,54 @@
+package org.horizon.loader;
+
+import org.horizon.Cache;
+import org.horizon.config.CacheLoaderConfig;
+import org.horizon.lifecycle.Lifecycle;
+import org.horizon.marshall.Marshaller;
+
+import javax.transaction.TransactionManager;
+import java.util.Collection;
+import java.util.Set;
+
+/**
+ * Responsible for loading cache data from an external source.
+ *
+ * @author Manik Surtani
+ * @since 1.0
+ */
+public interface CacheLoader<K, V> extends Lifecycle {
+
+ /**
+ * Used to initialize a cache loader. Typically invoked by the {@link
org.horizon.loader.CacheLoaderManager} when
+ * setting up cache loaders.
+ *
+ * @param config the cache loader configuration bean
+ * @param cache cache associated with this cache loader. Implementations may use this
to determine cache name when
+ * selecting where to refer to state in storage, for example, a different
database table name.
+ * @param tm transaction manager associated with the cache. Note that it is the
cache loader's responsibility to
+ * be aware of and participate in transactions accordingly.
+ * @param m marshaller to use when loading state from a stream, if supported by
the implementation.
+ */
+ void init(CacheLoaderConfig.IndividualCacheLoaderConfig config, Cache cache,
TransactionManager tm, Marshaller m);
+
+ /**
+ * Loads an entry mapped to by a given key. Should return null if the entry does not
exist.
+ *
+ * @param key key of the entry to load
+ * @return the entry mapped to the key, or null if it does not exist
+ */
+ StoredEntry<K, V> load(K key);
+
+ /**
+ * Loads a set of entries based on a collection of keys. Should return an empty set
if none of the keys exist.
+ *
+ * @param keys keys of the entries to load
+ * @return the entries found; an empty set if none of the keys exist
+ */
+ Set<StoredEntry<K, V>> loadAll(Collection<? extends K> keys);
+
+ /**
+ * @param key key to test
+ * @return true if the key exists, false otherwise
+ */
+ boolean containsKey(K key);
+}
Modified: core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderManager.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderManager.java 2009-02-10
11:54:44 UTC (rev 7672)
+++ core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderManager.java 2009-02-10
11:58:28 UTC (rev 7673)
@@ -58,7 +58,7 @@
private static final Log log = LogFactory.getLog(CacheLoaderManager.class);
private CacheLoaderConfig config;
private Cache<Object, Object> cache;
- private CacheLoader<Object, Object> loader;
+ private CacheLoaderOld<Object, Object> loader;
private boolean fetchPersistentState;
private Configuration configuration;
private ComponentRegistry registry;
@@ -111,8 +111,8 @@
* @throws InstantiationException
* @throws ClassNotFoundException
*/
- private CacheLoader<Object, Object> createCacheLoader() throws Exception {
- CacheLoader<Object, Object> tmpLoader;
+ private CacheLoaderOld<Object, Object> createCacheLoader() throws Exception {
+ CacheLoaderOld<Object, Object> tmpLoader;
// if we only have a single cache loader configured in the chaining cacheloader
then
// don't use a chaining cache loader at all.
@@ -122,7 +122,7 @@
// also if we are using passivation then just directly use the first cache loader.
if (config.useChainingCacheLoader()) {
// create chaining cache loader.
- ChainingCacheLoader<Object, Object> ccl = new
ChainingCacheLoader<Object, Object>();
+ ChainingCacheLoaderOld<Object, Object> ccl = new
ChainingCacheLoaderOld<Object, Object>();
tmpLoader = ccl;
Iterator<IndividualCacheLoaderConfig> it =
config.getIndividualCacheLoaderConfigs().iterator();
@@ -140,7 +140,7 @@
assertNotSingletonAndShared(cfg);
- CacheLoader<Object, Object> l = createCacheLoader(cfg, cache);
+ CacheLoaderOld<Object, Object> l = createCacheLoader(cfg, cache);
cfg = l.getConfig();
finalConfigs.add(cfg);
// Only loaders that deal w/ state transfer factor into
@@ -177,21 +177,21 @@
* @return a cache loader
* @throws Exception
*/
- private CacheLoader<Object, Object>
createCacheLoader(CacheLoaderConfig.IndividualCacheLoaderConfig cfg, Cache<Object,
Object> cache) throws Exception {
+ private CacheLoaderOld<Object, Object>
createCacheLoader(CacheLoaderConfig.IndividualCacheLoaderConfig cfg, Cache<Object,
Object> cache) throws Exception {
// create loader
- CacheLoader<Object, Object> tmpLoader = cfg.getCacheLoader() == null ?
createInstance(cfg.getClassName()) : cfg.getCacheLoader();
+ CacheLoaderOld<Object, Object> tmpLoader = cfg.getCacheLoader() == null ?
createInstance(cfg.getClassName()) : cfg.getCacheLoader();
if (tmpLoader != null) {
// async?
if (cfg.isAsync()) {
- CacheLoader<Object, Object> asyncDecorator;
- asyncDecorator = new AsyncCacheLoader<Object, Object>(tmpLoader);
+ CacheLoaderOld<Object, Object> asyncDecorator;
+ asyncDecorator = new AsyncCacheLoaderOld<Object, Object>(tmpLoader);
tmpLoader = asyncDecorator;
}
if (cfg.isIgnoreModifications()) {
- AbstractDelegatingCacheLoader<Object, Object> readOnlyDecorator;
- readOnlyDecorator = new ReadOnlyDelegatingCacheLoader<Object,
Object>(tmpLoader);
+ AbstractDelegatingCacheLoaderOld<Object, Object> readOnlyDecorator;
+ readOnlyDecorator = new ReadOnlyDelegatingCacheLoaderOld<Object,
Object>(tmpLoader);
tmpLoader = readOnlyDecorator;
}
@@ -200,17 +200,17 @@
if (ssc != null && ssc.isSingletonStoreEnabled()) {
Object decorator = createInstance(ssc.getSingletonStoreClass());
- /* class providing singleton store functionality must extend
AbstractDelegatingCacheLoader so that
+ /* class providing singleton store functionality must extend
AbstractDelegatingCacheLoaderOld so that
* underlying cacheloader can be set. */
- if (decorator instanceof AbstractDelegatingCacheLoader) {
+ if (decorator instanceof AbstractDelegatingCacheLoaderOld) {
@SuppressWarnings("unchecked")
- AbstractDelegatingCacheLoader<Object, Object> singletonDecorator =
(AbstractDelegatingCacheLoader<Object, Object>) decorator;
+ AbstractDelegatingCacheLoaderOld<Object, Object> singletonDecorator
= (AbstractDelegatingCacheLoaderOld<Object, Object>) decorator;
/* set the cache loader to where calls will be delegated by the class
providing the singleton
* store functionality. */
singletonDecorator.setCacheLoader(tmpLoader);
tmpLoader = singletonDecorator;
} else {
- throw new Exception("Invalid cache loader configuration!! Singleton
store implementation class must extend
org.horizon.loader.AbstractDelegatingCacheLoader");
+ throw new Exception("Invalid cache loader configuration!! Singleton
store implementation class must extend
org.horizon.loader.AbstractDelegatingCacheLoaderOld");
}
}
@@ -233,15 +233,15 @@
* @param c instance of cache to be set in cache loader
* @param loader cache loader to which assign the cache instance
*/
- protected void setCacheInLoader(Cache<Object, Object> c, CacheLoader<Object,
Object> loader) {
+ protected void setCacheInLoader(Cache<Object, Object> c,
CacheLoaderOld<Object, Object> loader) {
loader.setCache(c);
}
@SuppressWarnings("unchecked")
- private CacheLoader<Object, Object> createInstance(String className) throws
ClassNotFoundException, IllegalAccessException, InstantiationException {
+ private CacheLoaderOld<Object, Object> createInstance(String className) throws
ClassNotFoundException, IllegalAccessException, InstantiationException {
if (log.isTraceEnabled()) log.trace("instantiating class " + className);
Class<?> cl =
Thread.currentThread().getContextClassLoader().loadClass(className);
- return (CacheLoader<Object, Object>) cl.newInstance();
+ return (CacheLoaderOld<Object, Object>) cl.newInstance();
}
/**
@@ -277,7 +277,7 @@
/**
* Returns the cache loader
*/
- public CacheLoader<Object, Object> getCacheLoader() {
+ public CacheLoaderOld<Object, Object> getCacheLoader() {
return loader;
}
@@ -340,8 +340,8 @@
}
public void purgeLoaders(boolean force) throws Exception {
- if ((loader instanceof ChainingCacheLoader) && !force) {
- ((ChainingCacheLoader<?, ?>) loader).purgeIfNecessary();
+ if ((loader instanceof ChainingCacheLoaderOld) && !force) {
+ ((ChainingCacheLoaderOld<?, ?>) loader).purgeIfNecessary();
} else {
CacheLoaderConfig.IndividualCacheLoaderConfig first =
getCacheLoaderConfig().getFirstCacheLoaderConfig();
if (force ||
Copied: core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderOld.java (from rev
7669, core/branches/flat/src/main/java/org/horizon/loader/CacheLoader.java)
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderOld.java
(rev 0)
+++ core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderOld.java 2009-02-10
11:58:28 UTC (rev 7673)
@@ -0,0 +1,236 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.horizon.loader;
+
+import net.jcip.annotations.ThreadSafe;
+import org.horizon.Cache;
+import org.horizon.config.CacheLoaderConfig.IndividualCacheLoaderConfig;
+import org.horizon.factories.scopes.Scope;
+import org.horizon.factories.scopes.Scopes;
+import org.horizon.marshall.EntryData;
+
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * A {@link CacheLoaderOld} implementation persists and loads entries to and from secondary storage, such as a database or
+ * filesystem. The methods declared here address storage by key. NOTE(review): parts of this documentation were written
+ * for a node-based API (a {@link Map} of attributes per node, loaded and saved atomically) - confirm what still applies.
+ * <p/>
+ * Lifecycle: First an instance of the loader is created, then the configuration ({@link
+ * #setConfig(IndividualCacheLoaderConfig)}) and cache ({@link #setCache(Cache)}) are set. After
+ * this, {@link #create()} is called. Then {@link #start()} is called. When re-deployed, {@link #stop()} will be called,
+ * followed by another {@link #start()}. Finally, when shut down, {@link #destroy()} is called, after which the loader
+ * is unusable.
+ * <p/>
+ * An {@link AbstractCacheLoaderOld} is provided as a convenient starting place when implementing your own {@link
+ * CacheLoaderOld}.
+ * <p/>
+ * It is important to note that all implementations must be thread safe, as concurrent reads and writes, potentially even
+ * to the same key, are possible.
+ * <p/>
+ *
+ * @author <a href="mailto:manik@jboss.org">Manik Surtani (manik@jboss.org)</a>
+ * @since 1.0
+ */
+@ThreadSafe
+@Scope(Scopes.NAMED_CACHE)
+public interface CacheLoaderOld<K, V> {
+ /**
+ * Sets the configuration. This is called before {@link #create()} and {@link #start()}.
+ *
+ * @param config May be an instance of the {@link IndividualCacheLoaderConfig} base class, in which
+ * case the cache loader should use the {@link IndividualCacheLoaderConfig#getProperties()}
+ * method to find configuration information. Alternatively, may be a type-specific subclass of {@link
+ * IndividualCacheLoaderConfig}, if there is one.
+ */
+ void setConfig(IndividualCacheLoaderConfig config);
+
+ /**
+ * Gets the configuration.
+ *
+ * @return the configuration, represented by a {@link IndividualCacheLoaderConfig} object.
+ */
+ IndividualCacheLoaderConfig getConfig();
+
+ /**
+ * Sets the {@link Cache} that is maintaining this cache loader. This method allows this cache loader to set a
+ * reference to the {@link Cache}. This method is called after the cache loader instance has been
+ * constructed.
+ *
+ * @param c The cache on which this loader works
+ */
+ void setCache(Cache<K, V> c);
+
+ /**
+ * Returns the value stored under the given key in the persistent store.
+ *
+ * @param key the key to search for.
+ * @return the value stored under the key. NOTE(review): presumably null if the key is not found, mirroring the
+ * earlier node-based wording - confirm against implementations.
+ */
+ V get(Object key);
+
+
+ /**
+ * Returns true if the cache loader holds an entry for the given key.
+ *
+ * @return true if the entry exists, false otherwise
+ */
+ boolean exists(Object key);
+
+ /**
+ * Stores the given value under the given key in the persistent store, replacing any value already
+ * held under that key. Returns the old value.
+ */
+ V put(K key, V value);
+
+ /**
+ * Removes everything from this cache loader.
+ */
+ void clear();
+
+ /**
+ * Removes the entry stored under the given key; does nothing if the key does not exist.
+ *
+ * @param key the key of the entry to remove
+ */
+ V remove(Object key);
+
+ /**
+ * Retrieves all entries stored in this cache loader. No checked exception is declared, so
+ * implementations can only report errors with unchecked exceptions.
+ *
+ * @return All entries
+ */
+ List<EntryData<K, V>> getAllEntries();
+
+
+ /**
+ * Applies all modifications to the backend store. Changes may be applied in a single operation.
+ *
+ * @param modifications A List<Modification> of modifications
+ */
+ void put(List<Modification> modifications);
+
+ /**
+ * Prepares a list of modifications. For example, for a DB-based cache loader: <ol> <li>Create a local (JDBC)
+ * transaction <li>Associate the local transaction with <code>tx</code> (tx is the key) <li>Execute the corresponding
+ * SQL statements against the DB (statements derived from modifications) </ol> For a non-transactional cache loader
+ * (e.g. file-based), the implementation could attempt to implement its own transactional logic, attempting to write
+ * data to a temp location (or memory) and writing it to the proper location upon commit.
+ *
+ * @param tx The transaction, intended to be used by implementations as an identifier of the transaction
+ * (and not necessarily a JTA {@link javax.transaction.Transaction} object)
+ * @param modifications A {@link List} containing {@link org.horizon.loader.Modification}s, for the given
+ * transaction
+ * @param one_phase Persist immediately and (for example) commit the local JDBC transaction as well.
+ * When true, no {@link #commit(Object)} or {@link #rollback(Object)} call will follow
+ * for this transaction.
+ */
+ void prepare(Object tx, List<Modification> modifications, boolean one_phase);
+
+ /**
+ * Commits the transaction. A DB-based cache loader would look up the local JDBC transaction associated with
+ * <code>tx</code> and commit that transaction. Non-transactional cache loaders could simply write the data that was
+ * previously saved transiently under the given <code>tx</code> key, to (for example) a file system.
+ * <p/>
+ * <b>Note</b> this only holds if the previous prepare() did not define <code>one_phase=true</code>.
+ *
+ * @param tx transaction to commit
+ */
+ void commit(Object tx);
+
+ /**
+ * Rolls the transaction back. A DB-based cache loader would look up the local JDBC transaction associated with
+ * <code>tx</code> and roll back that transaction.
+ *
+ * @param tx transaction to roll back
+ */
+ void rollback(Object tx);
+
+ /**
+ * Fetches the entire state for this cache from secondary storage (disk, database) and writes it to a provided
+ * ObjectOutputStream. State written to the provided ObjectOutputStream parameter is used for initialization of a new
+ * cache instance. When the state gets transferred to the new cache instance its cache loader calls {@link
+ * #storeEntireState(ObjectInputStream)}.
+ * <p/>
+ * Implementations of this method should not catch any exception or close the given ObjectOutputStream parameter. In
+ * order to ensure cacheloader interoperability contents of the cache are written to the ObjectOutputStream as a
+ * sequence of NodeData objects (NOTE(review): this file only imports EntryData - confirm the element type).
+ * <p/>
+ * Default implementation is provided by {@link AbstractCacheLoaderOld} and ensures cacheloader interoperability.
+ * Implementors are encouraged to consider extending AbstractCacheLoaderOld prior to implementing a completely custom
+ * cacheloader.
+ *
+ * @param os ObjectOutputStream to write state
+ * @see AbstractCacheLoaderOld#loadEntireState(ObjectOutputStream)
+ * @see org.horizon.marshall.NodeData
+ */
+ void loadEntireState(ObjectOutputStream os);
+
+ /**
+ * Stores the entire state for this cache by reading it from a provided ObjectInputStream. The state was provided to
+ * this cache by calling {@link #loadEntireState(ObjectOutputStream)} on some other cache instance. State currently
+ * in storage gets overwritten.
+ * <p/>
+ * Implementations of this method should not catch any exception or close the given ObjectInputStream parameter. In
+ * order to ensure cacheloader interoperability contents of the cache are read from the ObjectInputStream as a
+ * sequence of NodeData objects (NOTE(review): this file only imports EntryData - confirm the element type).
+ * <p/>
+ * Default implementation is provided by {@link AbstractCacheLoaderOld} and ensures cacheloader interoperability.
+ * Implementors are encouraged to consider extending AbstractCacheLoaderOld prior to implementing a completely custom
+ * cacheloader.
+ *
+ * @param is ObjectInputStream to read state
+ * @see AbstractCacheLoaderOld#storeEntireState(ObjectInputStream)
+ * @see org.horizon.marshall.NodeData
+ */
+ void storeEntireState(ObjectInputStream is);
+
+ /**
+ * Lifecycle method, called when the cache loader is created.
+ * <p/>
+ * No checked exception is declared, so failures can only be reported with unchecked exceptions.
+ */
+ void create();
+
+ /**
+ * Lifecycle method, called when the cache loader is started.
+ * <p/>
+ * No checked exception is declared, so failures can only be reported with unchecked exceptions.
+ */
+ void start();
+
+ /**
+ * Lifecycle method, called when the cache loader is stopped.
+ */
+ void stop();
+
+ /**
+ * Lifecycle method, called when the cache loader is destroyed.
+ */
+ void destroy();
+
+}
Property changes on:
core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderOld.java
___________________________________________________________________
Name: svn:executable
+ *
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added: core/branches/flat/src/main/java/org/horizon/loader/CacheStore.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/CacheStore.java
(rev 0)
+++ core/branches/flat/src/main/java/org/horizon/loader/CacheStore.java 2009-02-10
11:58:28 UTC (rev 7673)
@@ -0,0 +1,76 @@
+package org.horizon.loader;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.Collection;
+
+/**
+ * A specialization of the {@link CacheLoader} interface that can be written to.
+ *
+ * @author Manik Surtani
+ * @since 1.0
+ */
+public interface CacheStore<K, V> extends CacheLoader<K, V> {
+
+ /**
+ * Stores an entry.
+ *
+ * @param ed entry to store
+ */
+ void store(StoredEntry<K, V> ed);
+
+ /**
+ * Stores a collection of entries.
+ *
+ * @param ed entries to store
+ */
+ void storeAll(Collection<StoredEntry<K, V>> ed);
+
+ /**
+ * Writes contents of the stream to the store. Implementations should expect that the
stream contains data in an
+ * implementation-specific format, typically generated using {@link
#load(java.io.OutputStream)}. While not a
+ * requirement, it is recommended that implementations make use of the {@link
org.horizon.marshall.Marshaller} when
+ * dealing with the stream to make use of efficient marshalling.
+ *
+ * @param inputStream stream to read from
+ */
+ void store(InputStream inputStream);
+
+ /**
+ * Loads the entire state into a stream, using whichever format is most efficient for
the cache loader
+ * implementation. Typically read and parsed by {@link #store(java.io.InputStream)}.
+ * <p/>
+ * While not a requirement, it is recommended that implementations make use of the
{@link
+ * org.horizon.marshall.Marshaller} when dealing with the stream to make use of
efficient marshalling.
+ *
+ * @param outputStream stream to write to
+ * @throws java.io.IOException in the event of problems writing to the stream
+ */
+ void load(OutputStream outputStream) throws IOException;
+
+ /**
+ * Clears all entries in the store.
+ */
+ void clear();
+
+ /**
+ * Removes an entry in the store.
+ *
+ * @param key key to remove
+ * @return true if the entry was removed; false if the entry wasn't found.
+ */
+ boolean remove(K key);
+
+ /**
+ * Removes a collection of entries from the store.
+ *
+ * @param keys keys to remove
+ */
+ void removeAll(Collection<? extends K> keys);
+
+ /**
+ * Purges expired entries from the store.
+ */
+ void purgeExpired();
+}
Deleted: core/branches/flat/src/main/java/org/horizon/loader/ChainingCacheLoader.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/loader/ChainingCacheLoader.java 2009-02-10
11:54:44 UTC (rev 7672)
+++
core/branches/flat/src/main/java/org/horizon/loader/ChainingCacheLoader.java 2009-02-10
11:58:28 UTC (rev 7673)
@@ -1,338 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-package org.horizon.loader;
-
-import org.horizon.config.CacheLoaderConfig;
-import org.horizon.config.CacheLoaderConfig.IndividualCacheLoaderConfig;
-import org.horizon.factories.ComponentRegistry;
-import org.horizon.factories.annotations.Inject;
-import org.horizon.marshall.EntryData;
-
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-
-/**
- * This decorator is used whenever more than one cache loader is configured. READ
operations are directed to each of
- * the cache loaders (in the order which they were configured) until a non-null (or
non-empty in the case of retrieving
- * collection objects) result is achieved.
- * <p/>
- * WRITE operations are propagated to ALL registered cacheloaders that specified set
ignoreModifications to false.
- *
- * @author <a href="mailto:manik@jboss.org">Manik Surtani
(manik(a)jboss.org)</a>
- * @since 1.0
- */
-public class ChainingCacheLoader<K, V> extends AbstractCacheLoader<K, V> {
-
- private final List<CacheLoader<K, V>> cacheLoaders = new
ArrayList<CacheLoader<K, V>>(2);
- private final List<CacheLoader<K, V>> writeCacheLoaders = new
ArrayList<CacheLoader<K, V>>(2);
- private final List<CacheLoaderConfig.IndividualCacheLoaderConfig>
cacheLoaderConfigs = new
ArrayList<CacheLoaderConfig.IndividualCacheLoaderConfig>(2);
- private ComponentRegistry registry;
-
- /**
- * Sets the configuration. Will be called before {@link #create()} and {@link
#start()}
- *
- * @param config ignored
- */
- public void setConfig(IndividualCacheLoaderConfig config) {
- // don't do much here?
- }
-
- public IndividualCacheLoaderConfig getConfig() {
- return null;
- }
-
- @Inject
- public void injectDependencies(ComponentRegistry registry) {
- this.registry = registry;
- }
-
-
- public V get(Object key) {
- V answer = null;
- for (CacheLoader<K, V> l : cacheLoaders) {
- answer = l.get(key);
- if (answer != null) break;
- }
- return answer;
- }
-
- /**
- * Checks whether the CacheLoader has a node with Fqn
- *
- * @param name
- * @return True if node exists, false otherwise
- */
- public boolean exists(Object key) {
- boolean answer = false;
- for (CacheLoader<K, V> l : cacheLoaders) {
- answer = l.exists(key);
- if (answer) break;
- }
- return answer;
- }
-
- /**
- * Inserts key and value into the attributes hashmap of the given node. If the node
does not exist, all parent nodes
- * from the root down are created automatically. Returns the old value
- */
- public V put(K key, V value) {
- V answer = null;
- boolean isFirst = true;
- for (CacheLoader<K, V> l : writeCacheLoaders) {
- V tAnswer = l.put(key, value);
- if (isFirst) {
- answer = tAnswer;
- isFirst = false;
- }
-
- }
- return answer;
- }
-
- /**
- * Inserts all modifications to the backend store. Overwrite whatever is already in
the datastore.
- *
- * @param modifications A List<Modification> of modifications
- * @throws Exception
- */
- @Override
- public void put(List<Modification> modifications) {
- for (CacheLoader<K, V> l : writeCacheLoaders) {
- l.put(modifications);
- }
- }
-
- /**
- * Removes the given key and value. No-op if key doesn't exist. Returns the first
response from the loader chain.
- */
- public V remove(Object key) {
- V answer = null;
- boolean isFirst = true;
- for (CacheLoader<K, V> l : writeCacheLoaders) {
- V tAnswer = l.remove(key);
- if (isFirst) {
- answer = tAnswer;
- isFirst = false;
- }
- }
- return answer;
- }
-
- /**
- * Prepare the modifications. For example, for a DB-based CacheLoader: <ol>
<li>Create a local (JDBC) transaction
- * <li>Associate the local transaction with <code>tx</code> (tx is
the key) <li>Execute the coresponding SQL
- * statements against the DB (statements derived from modifications) </ol> For
non-transactional CacheLoader (e.g.
- * file-based), this could be a null operation
- *
- * @param tx The transaction, just used as a hashmap key
- * @param modifications List<Modification>, a list of all modifications within
the given transaction
- * @param one_phase Persist immediately and (for example) commit the local JDBC
transaction as well. When true,
- * we won't get a {@link #commit(Object)} or {@link
#rollback(Object)} method call later
- * @throws Exception
- */
- @Override
- public void prepare(Object tx, List<Modification> modifications, boolean
one_phase) {
- for (CacheLoader<K, V> l : writeCacheLoaders) {
- l.prepare(tx, modifications, one_phase);
- }
- }
-
- /**
- * Commit the transaction. A DB-based CacheLoader would look up the local JDBC
transaction asociated with
- * <code>tx</code> and commit that transaction<br/>
Non-transactional CacheLoaders could simply write the data that
- * was previously saved transiently under the given <code>tx</code> key,
to (for example) a file system (note this
- * only holds if the previous prepare() did not define one_phase=true
- *
- * @param tx
- */
- @Override
- public void commit(Object tx) {
- for (CacheLoader<K, V> l : writeCacheLoaders) {
- l.commit(tx);
- }
- }
-
- /**
- * Roll the transaction back. A DB-based CacheLoader would look up the local JDBC
transaction asociated with
- * <code>tx</code> and roll back that transaction
- *
- * @param tx
- */
- @Override
- public void rollback(Object tx) {
- for (CacheLoader<K, V> l : writeCacheLoaders) {
- l.rollback(tx);
- }
- }
-
-
- /**
- * Creates individual cache loaders.
- *
- * @throws Exception
- */
- @Override
- public void create() {
- Iterator<CacheLoader<K, V>> it = cacheLoaders.iterator();
- Iterator<CacheLoaderConfig.IndividualCacheLoaderConfig> cfgIt =
cacheLoaderConfigs.iterator();
- while (it.hasNext() && cfgIt.hasNext()) {
- CacheLoader<K, V> cl = it.next();
- CacheLoaderConfig.IndividualCacheLoaderConfig cfg = cfgIt.next();
- cl.setConfig(cfg);
- registry.wireDependencies(cl);
- cl.create();
- }
- }
-
- @Override
- public void start() {
- for (CacheLoader<K, V> cacheLoader : cacheLoaders) {
- cacheLoader.start();
- }
- }
-
- @Override
- public void stop() {
- for (CacheLoader<K, V> cacheLoader : cacheLoaders) {
- cacheLoader.stop();
- }
- }
-
- @Override
- public void destroy() {
- for (CacheLoader<K, V> cacheLoader : cacheLoaders) {
- cacheLoader.destroy();
- }
- }
-
- @Override
- public void loadEntireState(ObjectOutputStream os) {
- Iterator<CacheLoader<K, V>> i = cacheLoaders.iterator();
- Iterator<CacheLoaderConfig.IndividualCacheLoaderConfig> cfgs =
cacheLoaderConfigs.iterator();
- while (i.hasNext() && cfgs.hasNext()) {
- CacheLoader<K, V> l = i.next();
- CacheLoaderConfig.IndividualCacheLoaderConfig cfg = cfgs.next();
- if (cfg.isFetchPersistentState()) {
- l.loadEntireState(os);
- break;
- }
- }
- }
-
- @Override
- public void storeEntireState(ObjectInputStream is) {
- Iterator<CacheLoader<K, V>> i = writeCacheLoaders.iterator();
- Iterator<CacheLoaderConfig.IndividualCacheLoaderConfig> cfgs =
cacheLoaderConfigs.iterator();
- while (i.hasNext()) {
- CacheLoader<K, V> l = i.next();
- CacheLoaderConfig.IndividualCacheLoaderConfig cfg = cfgs.next();
- if (cfg.isFetchPersistentState()) {
- l.storeEntireState(is);
- break;
- }
- }
-
- }
-
- /**
- * Returns the number of cache loaders in the chain.
- */
- public int getSize() {
- return cacheLoaders.size();
- }
-
- /**
- * Returns a List<CacheLoader> of individual cache loaders configured.
- */
- public List<CacheLoader<K, V>> getCacheLoaders() {
- return Collections.unmodifiableList(cacheLoaders);
- }
-
- /**
- * Adds a cache loader to the chain (always added at the end of the chain)
- *
- * @param l the cache loader to add
- * @param cfg and its configuration
- */
- public void addCacheLoader(CacheLoader<K, V> l,
CacheLoaderConfig.IndividualCacheLoaderConfig cfg) {
- synchronized (this) {
- cacheLoaderConfigs.add(cfg);
- cacheLoaders.add(l);
-
- if (!cfg.isIgnoreModifications()) {
- writeCacheLoaders.add(l);
- }
- }
- }
-
- @Override
- public String toString() {
- StringBuilder buf = new StringBuilder("ChainingCacheLoader{");
- Iterator<CacheLoader<K, V>> i = cacheLoaders.iterator();
- Iterator<CacheLoaderConfig.IndividualCacheLoaderConfig> c =
cacheLoaderConfigs.iterator();
- int count = 0;
- while (i.hasNext() && c.hasNext()) {
- CacheLoader<K, V> loader = i.next();
- CacheLoaderConfig.IndividualCacheLoaderConfig cfg = c.next();
-
- buf.append(++count);
- buf.append(": IgnoreMods? ");
- buf.append(cfg.isIgnoreModifications());
- buf.append(" CLoader: ");
- buf.append(loader);
- buf.append("; ");
- }
- buf.append("}");
- return buf.toString();
- }
-
- public void purgeIfNecessary() throws Exception {
- Iterator<CacheLoader<K, V>> loaders = cacheLoaders.iterator();
- Iterator<CacheLoaderConfig.IndividualCacheLoaderConfig> configs =
cacheLoaderConfigs.iterator();
-
- while (loaders.hasNext() && configs.hasNext()) {
- CacheLoader<K, V> myLoader = loaders.next();
- CacheLoaderConfig.IndividualCacheLoaderConfig myConfig = configs.next();
-
- if (!myConfig.isIgnoreModifications() && myConfig.isPurgeOnStartup())
myLoader.clear();
- }
- }
-
- public void clear() {
- for (CacheLoader<K, V> l : writeCacheLoaders)
- l.clear();
- }
-
- public List<EntryData<K, V>> getAllEntries() {
- ArrayList<EntryData<K, V>> full = new ArrayList<EntryData<K,
V>>();
-
- for (CacheLoader<K, V> l : writeCacheLoaders)
- full.addAll(l.getAllEntries());
-
- return full;
- }
-
-}
Copied: core/branches/flat/src/main/java/org/horizon/loader/ChainingCacheLoaderOld.java
(from rev 7666,
core/branches/flat/src/main/java/org/horizon/loader/ChainingCacheLoader.java)
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/ChainingCacheLoaderOld.java
(rev 0)
+++
core/branches/flat/src/main/java/org/horizon/loader/ChainingCacheLoaderOld.java 2009-02-10
11:58:28 UTC (rev 7673)
@@ -0,0 +1,338 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.horizon.loader;
+
+import org.horizon.config.CacheLoaderConfig;
+import org.horizon.config.CacheLoaderConfig.IndividualCacheLoaderConfig;
+import org.horizon.factories.ComponentRegistry;
+import org.horizon.factories.annotations.Inject;
+import org.horizon.marshall.EntryData;
+
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+
+/**
+ * This decorator is used whenever more than one cache loader is configured. READ
operations are directed to each of
+ * the cache loaders (in the order which they were configured) until a non-null (or
non-empty in the case of retrieving
+ * collection objects) result is achieved.
+ * <p/>
+ * WRITE operations are propagated to ALL registered cache loaders whose configuration has ignoreModifications set to false.
+ *
+ * @author <a href="mailto:manik@jboss.org">Manik Surtani
(manik(a)jboss.org)</a>
+ * @since 1.0
+ */
+public class ChainingCacheLoaderOld<K, V> extends AbstractCacheLoaderOld<K,
V> {
+
+ private final List<CacheLoaderOld<K, V>> cacheLoaders = new
ArrayList<CacheLoaderOld<K, V>>(2);
+ private final List<CacheLoaderOld<K, V>> writeCacheLoaders = new
ArrayList<CacheLoaderOld<K, V>>(2);
+ private final List<CacheLoaderConfig.IndividualCacheLoaderConfig>
cacheLoaderConfigs = new
ArrayList<CacheLoaderConfig.IndividualCacheLoaderConfig>(2);
+ private ComponentRegistry registry;
+
+ /**
+ * Accepts a configuration for the chain itself. Will be called before {@link #create()} and {@link #start()}.
+ * The argument is neither stored nor read: each chained loader's own configuration is supplied through
+ * {@link #addCacheLoader} and handed to that loader in {@link #create()}.
+ *
+ * @param config ignored
+ */
+ public void setConfig(IndividualCacheLoaderConfig config) {
+ // intentionally empty: the chain keeps no configuration of its own
+ }
+
+ /**
+ * Returns the configuration of the chain itself.
+ *
+ * @return always null, since {@link #setConfig(IndividualCacheLoaderConfig)} does not retain its argument
+ */
+ public IndividualCacheLoaderConfig getConfig() {
+ return null;
+ }
+
+ /**
+ * Receives the component registry. It is used in {@link #create()} to wire dependencies into each chained
+ * loader.
+ *
+ * @param registry the registry to keep
+ */
+ @Inject
+ public void injectDependencies(ComponentRegistry registry) {
+ this.registry = registry;
+ }
+
+
+   /**
+    * Looks up a key by asking each chained loader in turn, in the order in which the loaders were added.
+    *
+    * @param key key to look up
+    * @return the first non-null value produced by a loader, or null if no loader produced one
+    */
+   public V get(Object key) {
+      for (CacheLoaderOld<K, V> loader : cacheLoaders) {
+         V found = loader.get(key);
+         if (found != null) {
+            return found;
+         }
+      }
+      return null;
+   }
+
+   /**
+    * Checks whether any loader in the chain reports the given key as existing. Loaders are asked in the order in
+    * which they were added, and the search stops at the first one that answers true.
+    *
+    * @param key key to test
+    * @return true if at least one loader reports that the key exists, false otherwise
+    */
+   public boolean exists(Object key) {
+      for (CacheLoaderOld<K, V> loader : cacheLoaders) {
+         if (loader.exists(key)) {
+            return true;
+         }
+      }
+      return false;
+   }
+
+   /**
+    * Stores the key/value pair in every loader of the chain that accepts modifications.
+    *
+    * @param key   key to store
+    * @param value value to store
+    * @return whatever the first modifiable loader returned from its own put, or null if the chain has no
+    *         modifiable loader
+    */
+   public V put(K key, V value) {
+      Iterator<CacheLoaderOld<K, V>> writers = writeCacheLoaders.iterator();
+      if (!writers.hasNext()) {
+         return null;
+      }
+      // only the first loader's answer is reported; the others are still written to
+      V firstAnswer = writers.next().put(key, value);
+      while (writers.hasNext()) {
+         writers.next().put(key, value);
+      }
+      return firstAnswer;
+   }
+
+   /**
+    * Passes a list of modifications to every loader of the chain that accepts modifications, in chain order.
+    *
+    * @param modifications the modifications to apply; the same list instance is handed to each loader
+    */
+   @Override
+   public void put(List<Modification> modifications) {
+      for (CacheLoaderOld<K, V> writer : writeCacheLoaders) {
+         writer.put(modifications);
+      }
+   }
+
+   /**
+    * Removes the given key from every loader of the chain that accepts modifications.
+    *
+    * @param key key to remove
+    * @return whatever the first modifiable loader returned from its own remove, or null if the chain has no
+    *         modifiable loader
+    */
+   public V remove(Object key) {
+      Iterator<CacheLoaderOld<K, V>> writers = writeCacheLoaders.iterator();
+      if (!writers.hasNext()) {
+         return null;
+      }
+      // only the first loader's answer is reported; the key is still removed from the others
+      V firstAnswer = writers.next().remove(key);
+      while (writers.hasNext()) {
+         writers.next().remove(key);
+      }
+      return firstAnswer;
+   }
+
+   /**
+    * Forwards the prepare phase of a transaction to every loader of the chain that accepts modifications. What
+    * preparing means is up to each loader: a DB-based loader might open a local (JDBC) transaction, associate it
+    * with <code>tx</code> and execute the corresponding statements, while a non-transactional (e.g. file-based)
+    * loader might do nothing.
+    *
+    * @param tx            the transaction, passed through unchanged (loaders use it as a lookup key)
+    * @param modifications all modifications made within the given transaction
+    * @param one_phase     passed through unchanged; when true, loaders are expected to persist immediately and
+    *                      no {@link #commit(Object)} or {@link #rollback(Object)} call will follow
+    */
+   @Override
+   public void prepare(Object tx, List<Modification> modifications, boolean one_phase) {
+      for (CacheLoaderOld<K, V> writer : writeCacheLoaders) {
+         writer.prepare(tx, modifications, one_phase);
+      }
+   }
+
+   /**
+    * Forwards the commit of a transaction to every loader of the chain that accepts modifications, in chain
+    * order. A DB-based loader would typically look up the local transaction associated with <code>tx</code> and
+    * commit it; a non-transactional loader could write out the data it saved under <code>tx</code> during
+    * prepare (this only applies if the preceding prepare was not one-phase).
+    *
+    * @param tx the transaction, passed through unchanged
+    */
+   @Override
+   public void commit(Object tx) {
+      for (CacheLoaderOld<K, V> writer : writeCacheLoaders) {
+         writer.commit(tx);
+      }
+   }
+
+   /**
+    * Forwards the rollback of a transaction to every loader of the chain that accepts modifications, in chain
+    * order. A DB-based loader would typically look up the local transaction associated with <code>tx</code> and
+    * roll it back.
+    *
+    * @param tx the transaction, passed through unchanged
+    */
+   @Override
+   public void rollback(Object tx) {
+      for (CacheLoaderOld<K, V> writer : writeCacheLoaders) {
+         writer.rollback(tx);
+      }
+   }
+
+
+   /**
+    * Prepares each chained loader: gives it the configuration it was registered with, has the component registry
+    * wire its dependencies, then calls its own create(). Loaders are processed in chain order.
+    */
+   @Override
+   public void create() {
+      Iterator<CacheLoaderOld<K, V>> loaderIt = cacheLoaders.iterator();
+      Iterator<CacheLoaderConfig.IndividualCacheLoaderConfig> configIt = cacheLoaderConfigs.iterator();
+      while (loaderIt.hasNext() && configIt.hasNext()) {
+         CacheLoaderOld<K, V> loader = loaderIt.next();
+         loader.setConfig(configIt.next());
+         registry.wireDependencies(loader);
+         loader.create();
+      }
+   }
+
+   /**
+    * Starts every loader in the chain, read-only ones included, in chain order.
+    */
+   @Override
+   public void start() {
+      for (CacheLoaderOld<K, V> member : cacheLoaders) {
+         member.start();
+      }
+   }
+
+   /**
+    * Stops every loader in the chain, read-only ones included, in chain order (the same order used by start).
+    */
+   @Override
+   public void stop() {
+      for (CacheLoaderOld<K, V> member : cacheLoaders) {
+         member.stop();
+      }
+   }
+
+   /**
+    * Destroys every loader in the chain, read-only ones included, in chain order.
+    */
+   @Override
+   public void destroy() {
+      for (CacheLoaderOld<K, V> member : cacheLoaders) {
+         member.destroy();
+      }
+   }
+
+   /**
+    * Writes persistent state to the given stream using the first loader in the chain whose configuration has
+    * fetchPersistentState enabled. At most one loader is used; if none qualifies, nothing is written.
+    *
+    * @param os stream handed to the selected loader
+    */
+   @Override
+   public void loadEntireState(ObjectOutputStream os) {
+      Iterator<CacheLoaderOld<K, V>> loaderIt = cacheLoaders.iterator();
+      Iterator<CacheLoaderConfig.IndividualCacheLoaderConfig> configIt = cacheLoaderConfigs.iterator();
+      while (loaderIt.hasNext() && configIt.hasNext()) {
+         CacheLoaderOld<K, V> candidate = loaderIt.next();
+         if (configIt.next().isFetchPersistentState()) {
+            candidate.loadEntireState(os);
+            return;
+         }
+      }
+   }
+
+   /**
+    * Reads persistent state from the given stream into the first loader that both accepts modifications and has
+    * fetchPersistentState enabled in its configuration. At most one loader is used; if none qualifies, the
+    * stream is left untouched.
+    * <p/>
+    * Loaders and configurations are walked together over the full chain so that each loader is always judged by
+    * its own configuration. (Pairing the write-only loader list with the full configuration list would
+    * misalign the two as soon as a read-only loader precedes a writable one.)
+    *
+    * @param is stream handed to the selected loader
+    */
+   @Override
+   public void storeEntireState(ObjectInputStream is) {
+      Iterator<CacheLoaderOld<K, V>> loaderIt = cacheLoaders.iterator();
+      Iterator<CacheLoaderConfig.IndividualCacheLoaderConfig> configIt = cacheLoaderConfigs.iterator();
+      while (loaderIt.hasNext() && configIt.hasNext()) {
+         CacheLoaderOld<K, V> candidate = loaderIt.next();
+         CacheLoaderConfig.IndividualCacheLoaderConfig cfg = configIt.next();
+         // read-only loaders never receive state
+         if (cfg.isIgnoreModifications()) {
+            continue;
+         }
+         if (cfg.isFetchPersistentState()) {
+            candidate.storeEntireState(is);
+            return;
+         }
+      }
+   }
+
+ /**
+ * Returns the number of cache loaders in the chain, counting read-only loaders as well as those that accept
+ * modifications.
+ */
+ public int getSize() {
+ return cacheLoaders.size();
+ }
+
+ /**
+ * Returns the individual cache loaders configured, in chain order.
+ *
+ * @return an unmodifiable view of the chain; it cannot be used to add or remove loaders
+ */
+ public List<CacheLoaderOld<K, V>> getCacheLoaders() {
+ return Collections.unmodifiableList(cacheLoaders);
+ }
+
+ /**
+ * Adds a cache loader to the chain (always added at the end of the chain). The loader is also registered as a
+ * target for write operations unless its configuration has ignoreModifications set.
+ * <p/>
+ * NOTE(review): additions are guarded by this object's monitor, but the read paths visible in this class
+ * iterate the same lists without synchronizing - confirm loaders are only added before the chain is used.
+ *
+ * @param l the cache loader to add
+ * @param cfg and its configuration
+ */
+ public void addCacheLoader(CacheLoaderOld<K, V> l,
CacheLoaderConfig.IndividualCacheLoaderConfig cfg) {
+ synchronized (this) {
+ // the config and loader lists are appended to together so that positions in the two lists correspond
+ cacheLoaderConfigs.add(cfg);
+ cacheLoaders.add(l);
+
+ if (!cfg.isIgnoreModifications()) {
+ writeCacheLoaders.add(l);
+ }
+ }
+ }
+
+   /**
+    * Describes the chain: for each loader, its 1-based position, whether its configuration ignores
+    * modifications, and the loader's own string form.
+    */
+   @Override
+   public String toString() {
+      StringBuilder sb = new StringBuilder("ChainingCacheLoaderOld{");
+      Iterator<CacheLoaderOld<K, V>> loaderIt = cacheLoaders.iterator();
+      Iterator<CacheLoaderConfig.IndividualCacheLoaderConfig> configIt = cacheLoaderConfigs.iterator();
+      for (int position = 1; loaderIt.hasNext() && configIt.hasNext(); position++) {
+         CacheLoaderOld<K, V> member = loaderIt.next();
+         CacheLoaderConfig.IndividualCacheLoaderConfig memberConfig = configIt.next();
+         sb.append(position)
+               .append(": IgnoreMods? ").append(memberConfig.isIgnoreModifications())
+               .append(" CLoader: ").append(member)
+               .append("; ");
+      }
+      return sb.append("}").toString();
+   }
+
+ public void purgeIfNecessary() throws Exception {
+ Iterator<CacheLoaderOld<K, V>> loaders = cacheLoaders.iterator();
+ Iterator<CacheLoaderConfig.IndividualCacheLoaderConfig> configs =
cacheLoaderConfigs.iterator();
+
+ while (loaders.hasNext() && configs.hasNext()) {
+ CacheLoaderOld<K, V> myLoader = loaders.next();
+ CacheLoaderConfig.IndividualCacheLoaderConfig myConfig = configs.next();
+
+ if (!myConfig.isIgnoreModifications() && myConfig.isPurgeOnStartup())
myLoader.clear();
+ }
+ }
+
+   /**
+    * Clears every loader of the chain that accepts modifications; read-only loaders are not touched.
+    */
+   public void clear() {
+      for (CacheLoaderOld<K, V> writer : writeCacheLoaders) {
+         writer.clear();
+      }
+   }
+
+   /**
+    * Collects the entries of every loader that accepts modifications into one new list, in chain order. The
+    * per-loader results are simply concatenated, so duplicates are not removed.
+    * <p/>
+    * NOTE(review): read-only loaders are not consulted here, unlike in get and exists - confirm this is intended.
+    *
+    * @return a new list holding the concatenated entries; empty if there is no modifiable loader
+    */
+   public List<EntryData<K, V>> getAllEntries() {
+      List<EntryData<K, V>> combined = new ArrayList<EntryData<K, V>>();
+      for (CacheLoaderOld<K, V> writer : writeCacheLoaders) {
+         combined.addAll(writer.getAllEntries());
+      }
+      return combined;
+   }
+
+}
Property changes on:
core/branches/flat/src/main/java/org/horizon/loader/ChainingCacheLoaderOld.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Deleted: core/branches/flat/src/main/java/org/horizon/loader/ClusteredCacheLoader.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/loader/ClusteredCacheLoader.java 2009-02-10
11:54:44 UTC (rev 7672)
+++
core/branches/flat/src/main/java/org/horizon/loader/ClusteredCacheLoader.java 2009-02-10
11:58:28 UTC (rev 7673)
@@ -1,379 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-package org.horizon.loader;
-
-import net.jcip.annotations.ThreadSafe;
-import org.horizon.ComponentStatus;
-import org.horizon.commands.CommandsFactory;
-import org.horizon.commands.DataCommand;
-import org.horizon.config.CacheLoaderConfig.IndividualCacheLoaderConfig;
-import org.horizon.factories.annotations.Inject;
-import org.horizon.lock.StripedLock;
-import org.horizon.logging.Log;
-import org.horizon.logging.LogFactory;
-import org.horizon.remoting.ResponseFilter;
-import org.horizon.remoting.transport.Address;
-import org.horizon.tree.Fqn;
-
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-/**
- * A cache loader that consults other members in the cluster for values. Does not
propagate update methods since
- * replication should take care of this. A <code>timeout</code> property is
required, a <code>long</code> that
- * specifies in milliseconds how long to wait for results before returning a null.
- *
- * @author <a href="mailto:manik@jboss.org">Manik Surtani
(manik(a)jboss.org)</a> // TODO implement me!!
- * @since 1.0
- */
-@ThreadSafe
-public class ClusteredCacheLoader extends AbstractCacheLoader {
- private static final Log log = LogFactory.getLog(ClusteredCacheLoader.class);
- private static final boolean trace = log.isTraceEnabled();
- private StripedLock lock = new StripedLock();
- private ClusteredCacheLoaderConfig config;
- private CommandsFactory commandsFactory;
-
- /**
- * A test to check whether the cache is in its started state. If not, calls should
not be made as the channel may
- * not have properly started, blocks due to state transfers may be in progress, etc.
- *
- * @return true if the cache is in its STARTED state.
- */
- protected boolean isCacheReady() {
- return cache.getCacheStatus() == ComponentStatus.STARTED;
- }
-
- @Inject
- public void setCommandsFactory(CommandsFactory commandsFactory) {
- this.commandsFactory = commandsFactory;
- }
-
- /**
- * Sets the configuration. A property <code>timeout</code> is used as the
timeout value.
- */
- public void setConfig(IndividualCacheLoaderConfig base) {
-// if (base instanceof ClusteredCacheLoaderConfig)
-// {
-// this.config = (ClusteredCacheLoaderConfig) base;
-// }
-// else
-// {
-// config = new ClusteredCacheLoaderConfig(base);
-// }
- }
-
- public IndividualCacheLoaderConfig getConfig() {
- return config;
- }
-
- public Object get(Object key) {
- return null; // TODO: Manik: Customise this generated block
- }
-
- public boolean exists(Object key) {
- return false; // TODO: Manik: Customise this generated block
- }
-
- public Object put(Object key, Object value) {
- return null; // TODO: Manik: Customise this generated block
- }
-
- public void clear() {
- // TODO: Manik: Customise this generated block
- }
-
- public Object remove(Object key) {
- return null; // TODO: Manik: Customise this generated block
- }
-
- public List getAllEntries() {
- return null; // TODO: Manik: Customise this generated block
- }
-
- private boolean isOriginLocal() {
- return
cache.getAdvancedCache().getInvocationContextContainer().get().isOriginLocal();
- }
-
- public Set getChildrenNames(Fqn fqn) throws Exception {
- if (!isCacheReady() || !isOriginLocal()) return Collections.emptySet();
- lock.acquireLock(fqn, true);
-// try
-// {
-// GetChildrenNamesCommand command =
commandsFactory.buildGetChildrenNamesCommand(fqn);
-// Object resp = callRemote(command);
-// return (Set) resp;
-// }
-// finally
-// {
-// lock.releaseLock(fqn);
-// }
- throw new RuntimeException("Implement me");
- }
-
- private Object callRemote(DataCommand dataCommand) throws Exception {
- if (trace) log.trace("cache=" + cache.getCacheManager().getAddress() +
"; calling with " + dataCommand);
-// ClusteredGetCommand clusteredGet =
commandsFactory.buildClusteredGetCommand(false, dataCommand);
- List resps;
- // JBCACHE-1186
-// resps = cache.getRPCManager().callRemoteMethods(null, clusteredGet,
GroupRequest.GET_ALL, config.getTimeout(), new ResponseValidityFilter(cache.getMembers(),
cache.getLocalAddress()), false);
-
-// if (resps == null)
-// {
-// if (log.isInfoEnabled())
-// log.info("No replies to call " + dataCommand + ". Perhaps
we're alone in the cluster?");
-// throw new ReplicationException("No replies to call " + dataCommand +
". Perhaps we're alone in the cluster?");
-// }
-// else
-// {
-// // test for and remove exceptions
-// Iterator i = resps.iterator();
-// Object result = null;
-// while (i.hasNext())
-// {
-// Object o = i.next();
-// if (o instanceof Exception)
-// {
-// if (log.isDebugEnabled())
-// log.debug("Found remote exception among responses - removing
from responses list", (Exception) o);
-// }
-// else if (o != null)
-// {
-// // keep looping till we find a FOUND answer.
-// List<Boolean> clusteredGetResp = (List<Boolean>) o;
-// // found?
-// if (clusteredGetResp.get(0))
-// {
-// result = clusteredGetResp.get(1);
-// break;
-// }
-// }
-// else if (!cache.getConfiguration().isUseRegionBasedMarshalling())
-// {
-// throw new IllegalStateException("Received unexpected null response
to " + clusteredGet);
-// }
-// // else region was inactive on peer;
-// // keep looping to see if anyone else responded
-// }
-//
-// if (trace) log.trace("got responses " + resps);
-// return result;
-// }
- throw new RuntimeException("Implement me");
- }
-
- public Map get(Fqn name) throws Exception {
- return get0(name);
- }
-
- protected Map get0(Fqn name) throws Exception {
- // DON'T make a remote call if this is a remote call in the first place - leads
to deadlocks - JBCACHE-1103
- if (!isCacheReady() || !isOriginLocal()) return Collections.emptyMap();
- lock.acquireLock(name, true);
-// try
-// {
-// GetDataMapCommand command = commandsFactory.buildGetDataMapCommand(name);
-// Object resp = callRemote(command);
-// return (Map) resp;
-// }
-// finally
-// {
-// lock.releaseLock(name);
-// }
- throw new RuntimeException("Implement me");
- }
-
- public boolean exists(Fqn name) throws Exception {
- // DON'T make a remote call if this is a remote call in the first place - leads
to deadlocks - JBCACHE-1103
- if (!isCacheReady() || !isOriginLocal()) return false;
-
- lock.acquireLock(name, false);
-// try
-// {
-// ExistsCommand command = commandsFactory.buildExistsNodeCommand(name);
-// Object resp = callRemote(command);
-// return resp != null && (Boolean) resp;
-// }
-// finally
-// {
-// lock.releaseLock(name);
-// }
- throw new RuntimeException("Implement me");
- }
-
- public Object put(Fqn name, Object key, Object value) throws Exception {
- // DON'T make a remote call if this is a remote call in the first place - leads
to deadlocks - JBCACHE-1103
- if (!isCacheReady() || !isOriginLocal()) return null;
- lock.acquireLock(name, true);
- try {
-// NodeSPI n = cache.peek(name, false);
-// if (n == null)
-// {
-// GetKeyValueCommand command = commandsFactory.buildGetKeyValueCommand(name,
key, true);
-// return callRemote(command);
-// }
-// else
-// {
-// // dont bother with a remote call
-// return n.getDirect(key);
-// }
- }
- finally {
- lock.releaseLock(name);
- }
- throw new RuntimeException("Implement me");
- }
-
- /**
- * Does nothing; replication handles put.
- */
- public void put(Fqn name, Map attributes) throws Exception {
- }
-
- /**
- * Does nothing; replication handles put.
- */
-// @Override
-// public void put(List<Modification> modifications) throws Exception
-// {
-// }
-
- /**
- * Fetches the remove value, does not remove. Replication handles removal.
- */
- public Object remove(Fqn name, Object key) throws Exception {
- // DON'T make a remote call if this is a remote call in the first place - leads
to deadlocks - JBCACHE-1103
- if (!isCacheReady() || !isOriginLocal()) return false;
- lock.acquireLock(name, true);
- try {
-// NodeSPI n = cache.peek(name, true);
-// if (n == null)
-// {
-// GetKeyValueCommand command = commandsFactory.buildGetKeyValueCommand(name,
key, true);
-// return callRemote(command);
-// }
-// else
-// {
-// // dont bother with a remote call
-// return n.getDirect(key);
-// }
- }
- finally {
- lock.releaseLock(name);
- }
- throw new RuntimeException("Implement me");
- }
-
- /**
- * Does nothing; replication handles removal.
- */
- public void remove(Fqn name) throws Exception {
- // do nothing
- }
-
- /**
- * Does nothing; replication handles removal.
- */
- public void removeData(Fqn name) throws Exception {
- }
-
- /**
- * Does nothing.
- */
-// @Override
-// public void prepare(Object tx, List modifications, boolean one_phase) throws
Exception
-// {
-// }
-
- /**
- * Does nothing.
- */
-// @Override
-// public void commit(Object tx) throws Exception
-// {
-// }
-
- /**
- * Does nothing.
- */
- @Override
- public void rollback(Object tx) {
- }
-
-// @Override
-// public void loadEntireState(ObjectOutputStream os) throws Exception
-// {
-// //intentional no-op
-// }
-
- // @Override
-
- public void loadState(Fqn subtree, ObjectOutputStream os) throws Exception {
- // intentional no-op
- }
-
-// @Override
-// public void storeEntireState(ObjectInputStream is) throws Exception
-// {
-// // intentional no-op
-// }
-
- // @Override
-
- public void storeState(Fqn subtree, ObjectInputStream is) throws Exception {
- // intentional no-op
- }
-
- public static class ResponseValidityFilter implements ResponseFilter {
- private int numValidResponses = 0;
- private List<Address> pendingResponders;
-
- public ResponseValidityFilter(List<Address> expected, Address localAddress)
{
- this.pendingResponders = new ArrayList<Address>(expected);
- // We'll never get a response from ourself
- this.pendingResponders.remove(localAddress);
- }
-
- public boolean isAcceptable(Object object, Address address) {
- pendingResponders.remove(address);
-
- if (object instanceof List) {
- List response = (List) object;
- Boolean foundResult = (Boolean) response.get(0);
- if (foundResult) numValidResponses++;
- }
- // always return true to make sure a response is logged by the JGroups
RpcDispatcher.
- return true;
- }
-
- public boolean needMoreResponses() {
- return numValidResponses < 1 && pendingResponders.size() > 0;
- }
-
- }
-
-}
Modified:
core/branches/flat/src/main/java/org/horizon/loader/ClusteredCacheLoaderConfig.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/loader/ClusteredCacheLoaderConfig.java 2009-02-10
11:54:44 UTC (rev 7672)
+++
core/branches/flat/src/main/java/org/horizon/loader/ClusteredCacheLoaderConfig.java 2009-02-10
11:58:28 UTC (rev 7673)
@@ -37,16 +37,16 @@
private long timeout = 10000;
public ClusteredCacheLoaderConfig() {
- setClassName(ClusteredCacheLoader.class.getName());
+ setClassName(ClusteredCacheLoaderOld.class.getName());
}
/**
- * For use by {@link org.horizon.loader.ClusteredCacheLoader}.
+ * For use by {@link ClusteredCacheLoaderOld}.
*
* @param base generic config object created by XML parsing.
*/
ClusteredCacheLoaderConfig(IndividualCacheLoaderConfig base) {
- setClassName(ClusteredCacheLoader.class.getName());
+ setClassName(ClusteredCacheLoaderOld.class.getName());
populateFromBaseConfig(base);
}
Copied: core/branches/flat/src/main/java/org/horizon/loader/ClusteredCacheLoaderOld.java
(from rev 7669,
core/branches/flat/src/main/java/org/horizon/loader/ClusteredCacheLoader.java)
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/ClusteredCacheLoaderOld.java
(rev 0)
+++
core/branches/flat/src/main/java/org/horizon/loader/ClusteredCacheLoaderOld.java 2009-02-10
11:58:28 UTC (rev 7673)
@@ -0,0 +1,379 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.horizon.loader;
+
+import net.jcip.annotations.ThreadSafe;
+import org.horizon.commands.CommandsFactory;
+import org.horizon.commands.DataCommand;
+import org.horizon.config.CacheLoaderConfig.IndividualCacheLoaderConfig;
+import org.horizon.factories.annotations.Inject;
+import org.horizon.lifecycle.ComponentStatus;
+import org.horizon.lock.StripedLock;
+import org.horizon.logging.Log;
+import org.horizon.logging.LogFactory;
+import org.horizon.remoting.ResponseFilter;
+import org.horizon.remoting.transport.Address;
+import org.horizon.tree.Fqn;
+
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * A cache loader that consults other members in the cluster for values. Does not
propagate update methods since
+ * replication should take care of this. A <code>timeout</code> property is
required, a <code>long</code> that
+ * specifies in milliseconds how long to wait for results before returning a null.
+ *
+ * @author <a href="mailto:manik@jboss.org">Manik Surtani
(manik(a)jboss.org)</a> // TODO implement me!!
+ * @since 1.0
+ */
+@ThreadSafe
+public class ClusteredCacheLoaderOld extends AbstractCacheLoaderOld {
+ private static final Log log = LogFactory.getLog(ClusteredCacheLoaderOld.class);
+ private static final boolean trace = log.isTraceEnabled();
+ private StripedLock lock = new StripedLock();
+ private ClusteredCacheLoaderConfig config;
+ private CommandsFactory commandsFactory;
+
+ /**
+ * A test to check whether the cache is running. If not, calls should not be made as the channel may
+ * not have properly started, blocks due to state transfers may be in progress, etc.
+ *
+ * @return true if the cache status is {@link ComponentStatus#RUNNING}.
+ */
+ protected boolean isCacheReady() {
+ return cache.getStatus() == ComponentStatus.RUNNING;
+ }
+
+ @Inject
+ public void setCommandsFactory(CommandsFactory commandsFactory) {
+ this.commandsFactory = commandsFactory;
+ }
+
+ /**
+ * Intended to set the configuration, from which a <code>timeout</code> property supplies the timeout value.
+ * The assignment below is currently commented out, so this method does nothing and the <code>config</code>
+ * field is not set here.
+ *
+ * @param base currently ignored
+ */
+ public void setConfig(IndividualCacheLoaderConfig base) {
+ // TODO: re-enable once this loader is implemented; until then getConfig() returns the unassigned field
+// if (base instanceof ClusteredCacheLoaderConfig)
+// {
+// this.config = (ClusteredCacheLoaderConfig) base;
+// }
+// else
+// {
+// config = new ClusteredCacheLoaderConfig(base);
+// }
+ }
+
+ public IndividualCacheLoaderConfig getConfig() {
+ return config;
+ }
+
+ public Object get(Object key) {
+ return null; // TODO: Manik: Customise this generated block
+ }
+
+ public boolean exists(Object key) {
+ return false; // TODO: Manik: Customise this generated block
+ }
+
+ public Object put(Object key, Object value) {
+ return null; // TODO: Manik: Customise this generated block
+ }
+
+ public void clear() {
+ // TODO: Manik: Customise this generated block
+ }
+
+ public Object remove(Object key) {
+ return null; // TODO: Manik: Customise this generated block
+ }
+
+ public List getAllEntries() {
+ return null; // TODO: Manik: Customise this generated block
+ }
+
+ private boolean isOriginLocal() {
+ return
cache.getAdvancedCache().getInvocationContextContainer().get().isOriginLocal();
+ }
+
+ public Set getChildrenNames(Fqn fqn) throws Exception {
+ if (!isCacheReady() || !isOriginLocal()) return Collections.emptySet();
+ lock.acquireLock(fqn, true);
+// try
+// {
+// GetChildrenNamesCommand command =
commandsFactory.buildGetChildrenNamesCommand(fqn);
+// Object resp = callRemote(command);
+// return (Set) resp;
+// }
+// finally
+// {
+// lock.releaseLock(fqn);
+// }
+ throw new RuntimeException("Implement me");
+ }
+
+ /**
+  * Placeholder for the clustered lookup of {@code dataCommand}. It logs the call when {@code trace} is set
+  * and then always throws, because the remote-invocation logic below is still commented out pending a port.
+  *
+  * @param dataCommand the command that would be wrapped in a clustered get and sent to the other members
+  * @return nothing at present; the method never returns normally
+  * @throws RuntimeException always ("Implement me")
+  */
+ private Object callRemote(DataCommand dataCommand) throws Exception {
+ if (trace) log.trace("cache=" + cache.getCacheManager().getAddress() +
"; calling with " + dataCommand);
+// ClusteredGetCommand clusteredGet =
commandsFactory.buildClusteredGetCommand(false, dataCommand);
+ // Only the commented-out logic below uses this variable; it is unused until that logic is restored.
+ List resps;
+ // JBCACHE-1186
+// resps = cache.getRPCManager().callRemoteMethods(null, clusteredGet,
GroupRequest.GET_ALL, config.getTimeout(), new ResponseValidityFilter(cache.getMembers(),
cache.getLocalAddress()), false);
+
+// if (resps == null)
+// {
+// if (log.isInfoEnabled())
+// log.info("No replies to call " + dataCommand + ". Perhaps
we're alone in the cluster?");
+// throw new ReplicationException("No replies to call " + dataCommand +
". Perhaps we're alone in the cluster?");
+// }
+// else
+// {
+// // test for and remove exceptions
+// Iterator i = resps.iterator();
+// Object result = null;
+// while (i.hasNext())
+// {
+// Object o = i.next();
+// if (o instanceof Exception)
+// {
+// if (log.isDebugEnabled())
+// log.debug("Found remote exception among responses - removing
from responses list", (Exception) o);
+// }
+// else if (o != null)
+// {
+// // keep looping till we find a FOUND answer.
+// List<Boolean> clusteredGetResp = (List<Boolean>) o;
+// // found?
+// if (clusteredGetResp.get(0))
+// {
+// result = clusteredGetResp.get(1);
+// break;
+// }
+// }
+// else if (!cache.getConfiguration().isUseRegionBasedMarshalling())
+// {
+// throw new IllegalStateException("Received unexpected null response
to " + clusteredGet);
+// }
+// // else region was inactive on peer;
+// // keep looping to see if anyone else responded
+// }
+//
+// if (trace) log.trace("got responses " + resps);
+// return result;
+// }
+ // Fail loudly until the remote call above has been ported.
+ throw new RuntimeException("Implement me");
+ }
+
+ /**
+  * Public entry point for loading the data of {@code name}; delegates directly to {@link #get0(Fqn)}.
+  *
+  * @param name the node to load
+  * @return whatever {@code get0(name)} returns
+  */
+ public Map get(Fqn name) throws Exception {
+ return get0(name);
+ }
+
+ /**
+  * Looks up the data stored under {@code name}. The remote lookup has not been ported yet, so past the
+  * guard this method currently always throws.
+  *
+  * @param name the node to load
+  * @return an empty map when the cache is not ready or the call did not originate locally
+  * @throws RuntimeException always, once past the guard, until the remote call is implemented
+  */
+ protected Map get0(Fqn name) throws Exception {
+    // DON'T make a remote call if this is a remote call in the first place - leads to deadlocks - JBCACHE-1103
+    if (!isCacheReady() || !isOriginLocal()) return Collections.emptyMap();
+    lock.acquireLock(name, true);
+    try {
+       // TODO: restore the remote lookup once the command has been ported:
+//       GetDataMapCommand command = commandsFactory.buildGetDataMapCommand(name);
+//       Object resp = callRemote(command);
+//       return (Map) resp;
+       throw new RuntimeException("Implement me");
+    }
+    finally {
+       // Always release: without this the lock acquired above leaks when the stub throws.
+       lock.releaseLock(name);
+    }
+ }
+
+ /**
+  * Checks whether {@code name} exists. The remote lookup has not been ported yet, so past the guard this
+  * method currently always throws.
+  *
+  * @param name the node to check
+  * @return {@code false} when the cache is not ready or the call did not originate locally
+  * @throws RuntimeException always, once past the guard, until the remote call is implemented
+  */
+ public boolean exists(Fqn name) throws Exception {
+    // DON'T make a remote call if this is a remote call in the first place - leads to deadlocks - JBCACHE-1103
+    if (!isCacheReady() || !isOriginLocal()) return false;
+
+    lock.acquireLock(name, false);
+    try {
+       // TODO: restore the remote lookup once the command has been ported:
+//       ExistsCommand command = commandsFactory.buildExistsNodeCommand(name);
+//       Object resp = callRemote(command);
+//       return resp != null && (Boolean) resp;
+       throw new RuntimeException("Implement me");
+    }
+    finally {
+       // Always release: without this the lock acquired above leaks when the stub throws.
+       lock.releaseLock(name);
+    }
+ }
+
+ /**
+  * Intended to fetch the current value of {@code key} under {@code name}. That lookup is still commented
+  * out, so past the guard the method takes the lock, releases it again and throws.
+  *
+  * @return {@code null} when the cache is not ready or the call did not originate locally
+  * @throws RuntimeException always, once past the guard ("Implement me")
+  */
+ public Object put(Fqn name, Object key, Object value) throws Exception {
+    // DON'T make a remote call if this is a remote call in the first place - leads to deadlocks - JBCACHE-1103
+    if (!isCacheReady() || !isOriginLocal()) return null;
+    lock.acquireLock(name, true);
+    try {
+//       NodeSPI n = cache.peek(name, false);
+//       if (n == null)
+//       {
+//          GetKeyValueCommand command = commandsFactory.buildGetKeyValueCommand(name, key, true);
+//          return callRemote(command);
+//       }
+//       else
+//       {
+//          // dont bother with a remote call
+//          return n.getDirect(key);
+//       }
+       // The lock is released by the finally block before this exception reaches the caller.
+       throw new RuntimeException("Implement me");
+    }
+    finally {
+       lock.releaseLock(name);
+    }
+ }
+
+ /**
+ * Does nothing; replication handles put.
+ *
+ * @param name       ignored
+ * @param attributes ignored
+ */
+ public void put(Fqn name, Map attributes) throws Exception {
+ // intentional no-op
+ }
+
+ /**
+ * Does nothing; replication handles put.
+ */
+// @Override
+// public void put(List<Modification> modifications) throws Exception
+// {
+// }
+
+ /**
+  * Fetches the removed value, does not remove. Replication handles removal.
+  * <p/>
+  * The lookup itself is still commented out, so past the guard the method currently always throws.
+  *
+  * @return {@code null} (no value) when the cache is not ready or the call did not originate locally,
+  *         matching the three-argument {@code put}
+  * @throws RuntimeException always, once past the guard ("Implement me")
+  */
+ public Object remove(Fqn name, Object key) throws Exception {
+    // DON'T make a remote call if this is a remote call in the first place - leads to deadlocks - JBCACHE-1103
+    // Return null rather than Boolean.FALSE: the result is the removed value, not a success flag.
+    if (!isCacheReady() || !isOriginLocal()) return null;
+    lock.acquireLock(name, true);
+    try {
+//       NodeSPI n = cache.peek(name, true);
+//       if (n == null)
+//       {
+//          GetKeyValueCommand command = commandsFactory.buildGetKeyValueCommand(name, key, true);
+//          return callRemote(command);
+//       }
+//       else
+//       {
+//          // dont bother with a remote call
+//          return n.getDirect(key);
+//       }
+    }
+    finally {
+       lock.releaseLock(name);
+    }
+    throw new RuntimeException("Implement me");
+ }
+
+ /**
+ * Does nothing; replication handles removal.
+ *
+ * @param name ignored
+ */
+ public void remove(Fqn name) throws Exception {
+ // do nothing
+ }
+
+ /**
+ * Does nothing; replication handles removal.
+ *
+ * @param name ignored
+ */
+ public void removeData(Fqn name) throws Exception {
+ // intentional no-op
+ }
+
+ /**
+ * Does nothing.
+ */
+// @Override
+// public void prepare(Object tx, List modifications, boolean one_phase) throws
Exception
+// {
+// }
+
+ /**
+ * Does nothing.
+ */
+// @Override
+// public void commit(Object tx) throws Exception
+// {
+// }
+
+ /**
+ * Does nothing.
+ *
+ * @param tx ignored
+ */
+ @Override
+ public void rollback(Object tx) {
+ // intentional no-op
+ }
+
+// @Override
+// public void loadEntireState(ObjectOutputStream os) throws Exception
+// {
+// //intentional no-op
+// }
+
+ // @Override
+
+ /**
+  * Does nothing; no state is written to {@code os}.
+  *
+  * @param subtree ignored
+  * @param os      ignored
+  */
+ public void loadState(Fqn subtree, ObjectOutputStream os) throws Exception {
+ // intentional no-op
+ }
+
+// @Override
+// public void storeEntireState(ObjectInputStream is) throws Exception
+// {
+// // intentional no-op
+// }
+
+ // @Override
+
+ /**
+  * Does nothing; nothing is read from {@code is}.
+  *
+  * @param subtree ignored
+  * @param is      ignored
+  */
+ public void storeState(Fqn subtree, ObjectInputStream is) throws Exception {
+ // intentional no-op
+ }
+
+ /**
+  * Response filter that keeps asking for more responses until either one responder reports a hit (a list
+  * whose first element is {@code Boolean.TRUE}) or every expected responder has answered.
+  */
+ public static class ResponseValidityFilter implements ResponseFilter {
+    private int numValidResponses = 0;
+    private final List<Address> pendingResponders;
+
+    /**
+     * @param expected     addresses a response is expected from; copied, the argument is not modified
+     * @param localAddress this node's own address, removed from the pending set
+     */
+    public ResponseValidityFilter(List<Address> expected, Address localAddress) {
+       this.pendingResponders = new ArrayList<Address>(expected);
+       // We'll never get a response from ourself
+       this.pendingResponders.remove(localAddress);
+    }
+
+    /**
+     * Records that {@code address} has answered and counts the response as valid when it reports a hit.
+     *
+     * @return always {@code true}
+     */
+    public boolean isAcceptable(Object object, Address address) {
+       pendingResponders.remove(address);
+
+       if (object instanceof List) {
+          List response = (List) object;
+          // Tolerate empty or malformed replies instead of throwing from the filter callback:
+          // only a leading Boolean.TRUE counts as "found".
+          if (!response.isEmpty() && Boolean.TRUE.equals(response.get(0))) numValidResponses++;
+       }
+       // always return true to make sure a response is logged by the JGroups RpcDispatcher.
+       return true;
+    }
+
+    /**
+     * @return {@code true} while no hit has been seen and at least one expected responder is outstanding
+     */
+    public boolean needMoreResponses() {
+       return numValidResponses < 1 && !pendingResponders.isEmpty();
+    }
+
+ }
+
+}
Property changes on:
core/branches/flat/src/main/java/org/horizon/loader/ClusteredCacheLoaderOld.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Deleted: core/branches/flat/src/main/java/org/horizon/loader/FileCacheLoader.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/FileCacheLoader.java 2009-02-10
11:54:44 UTC (rev 7672)
+++ core/branches/flat/src/main/java/org/horizon/loader/FileCacheLoader.java 2009-02-10
11:58:28 UTC (rev 7673)
@@ -1,415 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-package org.horizon.loader;
-
-import net.jcip.annotations.ThreadSafe;
-import org.horizon.CacheException;
-import org.horizon.config.CacheLoaderConfig.IndividualCacheLoaderConfig;
-import org.horizon.lock.StripedLock;
-import org.horizon.logging.Log;
-import org.horizon.logging.LogFactory;
-import org.horizon.marshall.EntryData;
-import org.jboss.util.stream.MarshalledValueInputStream;
-
-import java.io.Closeable;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileNotFoundException;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * Simple file-based CacheLoader implementation. Nodes are directories, attributes of a
node is a file in the directory
- * <p/>
- * The FileCacheLoader has some severe limitations which restrict its use in a production
environment, or if used in
- * such an environment, it should be used with due care and sufficient understanding of
these limitations. <ul> <li>Due
- * to the way the FileCacheLoader represents a tree structure on disk (directories and
files) traversal is inefficient
- * for deep trees.</li> <li>Usage on shared filesystems like NFS, Windows
shares, etc. should be avoided as these do not
- * implement proper file locking and can cause data corruption.</li>
<li>Usage with an isolation level of NONE can cause
- * corrupt writes as multiple threads attempt to write to the same file.</li>
<li>File systems are inherently not
- * transactional, so when attempting to use your cache in a transactional context,
failures when writing to the file
- * (which happens during the commit phase) cannot be recovered.</li> </ul>
- * <p/>
- * As a rule of thumb, it is recommended that the FileCacheLoader not be used in a highly
concurrent, transactional or
- * stressful environment, and its use is restricted to testing.
- * <p/>
- * In terms of concurrency, file systems are notoriously inconsistent in their
implementations of concurrent locks. To
- * get around this and to meet the <b>thread safety</b> contracts set out in
{@link CacheLoader}, this implementation
- * uses a {@link StripedLock}
- *
- * @author Bela Ban
- * @author <a href="mailto:galder.zamarreno@jboss.com">Galder
Zamarreno</a>
- * @author <a href="mailto:manik@jboss.org">Manik Surtani</a>
- * @since 1.0
- */
-@ThreadSafe
-public class FileCacheLoader<K, V> extends AbstractCacheLoader<K, V> {
- File root = null;
- String rootPath = null;
- Log log = LogFactory.getLog(getClass());
-
- protected final StripedLock lock = new StripedLock();
-
- private FileCacheLoaderConfig config;
-
- /**
- * For full path, check '*' '<' '>' '|'
'"' '?' Regex: [\*<>|"?]
- */
- public static final Pattern PATH_PATTERN =
Pattern.compile("[\\*<>|\"?]");
-
- /**
- * For fqn, check '*' '<' '>' '|'
'"' '?' and also '\' '/' and ':'
- */
- public static final Pattern KEY_PATTERN =
Pattern.compile("[\\\\\\/:*<>|\"?]");
- private static boolean isOldWindows;
-
- static {
- float osVersion = -1;
- try {
- osVersion =
Float.parseFloat(System.getProperty("os.version").trim());
- }
- catch (Exception e) {
- // ignore
- }
- // 4.x is windows NT/2000 and 5.x is XP.
- isOldWindows =
System.getProperty("os.name").toLowerCase().startsWith("windows")
&& osVersion < 4;
- }
-
- public void setConfig(IndividualCacheLoaderConfig base) {
- if (base instanceof FileCacheLoaderConfig) {
- this.config = (FileCacheLoaderConfig) base;
- } else if (base != null) {
- this.config = new FileCacheLoaderConfig(base);
- }
-
- String location = this.config != null ? this.config.getLocation() : null;
- if (location != null && location.length() > 0) {
- root = new File(location);
- rootPath = root.getAbsolutePath() + File.separator;
- }
- }
-
- public IndividualCacheLoaderConfig getConfig() {
- return config;
- }
-
- @Override
- public void create() {
- if (root == null) {
- String tmpLocation = System.getProperty("java.io.tmpdir",
"C:\\tmp");
- root = new File(tmpLocation);
- rootPath = root.getAbsolutePath() + File.separator;
- }
- if (!root.exists()) {
- if (log.isTraceEnabled()) {
- log.trace("Creating cache loader location " + root);
- }
-
- if (config.isCheckCharacterPortability()) {
- /* Before creating the root, check whether the path is character portable.
Anything that comes after is part
- of the fqn which is inspected later. */
- isCharacterPortableLocation(root.getAbsolutePath());
- }
-
- boolean created = root.mkdirs();
- if (!created) {
- throw new CacheException("Unable to create cache loader location "
+ root);
- }
- }
-
- if (!root.isDirectory()) {
- throw new CacheException("Cache loader location [" + root + "] is
not a directory!");
- }
- }
-
- public V get(Object key) {
- lock(key);
- try {
- try {
- return loadValue(key);
- }
- catch (Exception e) {
- throw new CacheException(e);
- }
- }
- finally {
- unlock(key);
- }
- }
-
- public boolean exists(Object key) {
- lock(key);
- try {
- return getFile(key, false).exists();
- }
- catch (IOException e) {
- throw new CacheException(e);
- }
- finally {
- unlock(key);
- }
- }
-
- public V put(K key, V value) {
- lock(key);
- try {
- V retval;
- try {
- retval = loadValue(key);
- storeValue(key, value);
- return retval;
- }
- catch (Exception e) {
- throw new CacheException(e);
- }
- }
- finally {
- unlock(key);
- }
- }
-
- public V remove(Object key) {
- lock(key);
- try {
- V retval;
- try {
- File file = getFile(key, false);
- if (!file.exists())
- return null;
-
- retval = loadValue(key);
- file.delete();
- return retval;
- }
- catch (Exception e) {
- throw new CacheException(e);
- }
- }
- finally {
- unlock(key);
- }
- }
-
- private void unlock(Object key) {
- lock.releaseLock(key.toString());
- }
-
- private void lock(Object key) {
- lock.acquireLock(key.toString(), true);
- }
-
- private File getDirectory(Object key, boolean create) throws IOException {
- File f = new File(getFullPath(key));
- if (!f.exists()) {
- if (create) {
- boolean make = f.mkdirs();
- if (!make)
- throw new IOException("Unable to mkdirs " + f);
- }
- }
- return f;
- }
-
- private String getFullPath(Object key) {
- return rootPath;
- }
-
- private void safeClose(Closeable closeable) {
- if (closeable == null)
- return;
-
- try {
- closeable.close();
- }
- catch (IOException e) {
- }
- }
-
- private V loadValue(Object key) throws Exception {
- File child = getFile(key, false);
- if (!child.exists())
- return null;
-
- return loadValue(child);
- }
-
- private V loadValue(File file) throws Exception {
- FileInputStream fileIn = null;
- ObjectInputStream input = null;
-
- try {
- fileIn = new FileInputStream(file);
- input = new MarshalledValueInputStream(fileIn);
- return (V) getMarshaller().objectFromObjectStream(input);
- }
- catch (FileNotFoundException fnfe) {
- return null;
- }
- finally {
- safeClose(input);
- safeClose(fileIn);
- }
- }
-
- private EntryData<K, V> loadEntry(File file) throws Exception {
- FileInputStream fileIn = null;
- ObjectInputStream input = null;
-
- try {
- fileIn = new FileInputStream(file);
- input = new MarshalledValueInputStream(fileIn);
- V value = (V) getMarshaller().objectFromObjectStream(input);
- K key = (K) getMarshaller().objectFromObjectStream(input);
- return new EntryData<K, V>(key, value);
- }
- catch (FileNotFoundException fnfe) {
- return null;
- }
- finally {
- safeClose(input);
- safeClose(fileIn);
- }
- }
-
- private File getFile(Object key, boolean create) throws IOException {
- File directory = getDirectory(key, create);
- File child = new File(directory, key.toString());
- if (create && !child.exists()) {
- if (config.isCheckCharacterPortability()) {
- /* Check whether the entire file path (root + fqn + data file name), is
length portable */
- isLengthPortablePath(child.getAbsolutePath());
- /* Check whether the fqn tree we're trying to store could contain non
portable characters */
- isCharacterPortableKey(key);
- }
-
- if (!child.createNewFile()) {
- throw new IOException("Unable to create file: " + child);
- }
- }
-
- return child;
- }
-
- protected void storeValue(Object key, Object value) throws Exception {
- File child = getFile(key, true);
-
- FileOutputStream fileOut = null;
- ObjectOutputStream output = null;
- try {
- fileOut = new FileOutputStream(child);
- output = new ObjectOutputStream(fileOut);
- getMarshaller().objectToObjectStream(value, output);
- getMarshaller().objectToObjectStream(key, output); // For getAllEntries
- }
- finally {
- safeClose(output);
- safeClose(fileOut);
- }
- }
-
- protected boolean isCharacterPortableLocation(String fileAbsolutePath) {
- Matcher matcher = PATH_PATTERN.matcher(fileAbsolutePath);
- if (matcher.find()) {
- log.warn("Cache loader location ( " + fileAbsolutePath + " )
contains one of these characters: '*' '<' '>' '|'
'\"' '?'");
- log.warn("Directories containing these characters are illegal in some
operative systems and could lead to portability issues");
- return false;
- }
-
- return true;
- }
-
- protected boolean isCharacterPortableKey(Object key) {
- // getFullPath converts Object to String via toString(), so we do too
- Matcher matcher = KEY_PATTERN.matcher(key.toString());
- if (matcher.find()) {
- log.warn("The key.toString() contains one of these characters: '*'
'<' '>' '|' '\"' '?' '\\'
'/' ':' ");
- log.warn("Directories containing these characters are illegal in some
operating systems and could lead to portability issues");
- return false;
- }
-
- return true;
- }
-
- protected boolean isLengthPortablePath(String absoluteFqnPath) {
-
- if (isOldWindows && absoluteFqnPath.length() > 255) {
- log.warn("The full absolute path to the fqn that you are trying to store is
bigger than 255 characters, this could lead to problems on certain Windows systems: "
+ absoluteFqnPath);
- return false;
- }
-
- return true;
- }
-
- public void clear() {
- File directory = new File(rootPath);
- if (!directory.exists())
- return;
-
- File[] files = directory.listFiles();
- if (files != null) {
- for (File file : files) {
- try {
- lock(file.getName());
- if (file.exists())
- file.delete();
- }
- finally {
- unlock(file.getName());
- }
- }
- }
- }
-
- public List<EntryData<K, V>> getAllEntries() {
- List<EntryData<K, V>> entries = new LinkedList<EntryData<K,
V>>();
- File directory = new File(rootPath);
- if (!directory.exists())
- return entries;
-
- File[] files = directory.listFiles();
- if (files != null) {
- for (File file : files) {
- try {
- lock(file.getName());
- if (file.exists()) {
- EntryData<K, V> entry = loadEntry(file);
- if (entry != null)
- entries.add(entry);
- }
- }
- catch (Exception e) {
- }
- finally {
- unlock(file.getName());
- }
- }
- }
-
- return entries;
- }
-}
Modified: core/branches/flat/src/main/java/org/horizon/loader/FileCacheLoaderConfig.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/loader/FileCacheLoaderConfig.java 2009-02-10
11:54:44 UTC (rev 7672)
+++
core/branches/flat/src/main/java/org/horizon/loader/FileCacheLoaderConfig.java 2009-02-10
11:58:28 UTC (rev 7673)
@@ -35,16 +35,16 @@
private boolean checkCharacterPortability = true;
public FileCacheLoaderConfig() {
- setClassName(FileCacheLoader.class.getName());
+ setClassName(FileCacheLoaderOld.class.getName());
}
/**
- * For use by {@link FileCacheLoader}.
+ * For use by {@link FileCacheLoaderOld}.
*
* @param base generic config object created by XML parsing.
*/
FileCacheLoaderConfig(IndividualCacheLoaderConfig base) {
- setClassName(FileCacheLoader.class.getName());
+ setClassName(FileCacheLoaderOld.class.getName());
populateFromBaseConfig(base);
}
Copied: core/branches/flat/src/main/java/org/horizon/loader/FileCacheLoaderOld.java (from
rev 7666, core/branches/flat/src/main/java/org/horizon/loader/FileCacheLoader.java)
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/FileCacheLoaderOld.java
(rev 0)
+++ core/branches/flat/src/main/java/org/horizon/loader/FileCacheLoaderOld.java 2009-02-10
11:58:28 UTC (rev 7673)
@@ -0,0 +1,415 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.horizon.loader;
+
+import net.jcip.annotations.ThreadSafe;
+import org.horizon.CacheException;
+import org.horizon.config.CacheLoaderConfig.IndividualCacheLoaderConfig;
+import org.horizon.lock.StripedLock;
+import org.horizon.logging.Log;
+import org.horizon.logging.LogFactory;
+import org.horizon.marshall.EntryData;
+import org.jboss.util.stream.MarshalledValueInputStream;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Simple file-based CacheLoader implementation. Nodes are directories; the attributes of a node are
+ * stored in a file in that directory.
+ * <p/>
+ * The FileCacheLoader has some severe limitations which restrict its use in a production
environment, or if used in
+ * such an environment, it should be used with due care and sufficient understanding of
these limitations. <ul> <li>Due
+ * to the way the FileCacheLoader represents a tree structure on disk (directories and
files) traversal is inefficient
+ * for deep trees.</li> <li>Usage on shared filesystems like NFS, Windows
shares, etc. should be avoided as these do not
+ * implement proper file locking and can cause data corruption.</li>
<li>Usage with an isolation level of NONE can cause
+ * corrupt writes as multiple threads attempt to write to the same file.</li>
<li>File systems are inherently not
+ * transactional, so when attempting to use your cache in a transactional context,
failures when writing to the file
+ * (which happens during the commit phase) cannot be recovered.</li> </ul>
+ * <p/>
+ * As a rule of thumb, it is recommended that the FileCacheLoader not be used in a highly
concurrent, transactional or
+ * stressful environment, and its use is restricted to testing.
+ * <p/>
+ * In terms of concurrency, file systems are notoriously inconsistent in their
implementations of concurrent locks. To
+ * get around this and to meet the <b>thread safety</b> contracts set out in
{@link CacheLoaderOld}, this implementation
+ * uses a {@link StripedLock}
+ *
+ * @author Bela Ban
+ * @author <a href="mailto:galder.zamarreno@jboss.com">Galder
Zamarreno</a>
+ * @author <a href="mailto:manik@jboss.org">Manik Surtani</a>
+ * @since 1.0
+ */
+@ThreadSafe
+public class FileCacheLoaderOld<K, V> extends AbstractCacheLoaderOld<K, V> {
+   File root = null;
+   String rootPath = null;
+   Log log = LogFactory.getLog(getClass());
+
+   protected final StripedLock lock = new StripedLock();
+
+   /** May stay {@code null} when {@link #setConfig} is never called or is called with {@code null}. */
+   private FileCacheLoaderConfig config;
+
+   /**
+    * For full path, check '*' '<' '>' '|' '"' '?' Regex: [\*<>|"?]
+    */
+   public static final Pattern PATH_PATTERN = Pattern.compile("[\\*<>|\"?]");
+
+   /**
+    * For fqn, check '*' '<' '>' '|' '"' '?' and also '\' '/' and ':'
+    */
+   public static final Pattern KEY_PATTERN = Pattern.compile("[\\\\\\/:*<>|\"?]");
+   private static boolean isOldWindows;
+
+   static {
+      float osVersion = -1;
+      try {
+         osVersion = Float.parseFloat(System.getProperty("os.version").trim());
+      }
+      catch (Exception e) {
+         // ignore: a missing or unparsable version leaves osVersion at -1
+      }
+      // 4.x is windows NT/2000 and 5.x is XP.
+      isOldWindows = System.getProperty("os.name").toLowerCase().startsWith("windows") && osVersion < 4;
+   }
+
+   /**
+    * Stores the configuration (wrapping a generic one if needed) and, when it names a non-empty location,
+    * points {@code root} and {@code rootPath} at it.
+    *
+    * @param base configuration to use; {@code null} leaves the current configuration untouched
+    */
+   public void setConfig(IndividualCacheLoaderConfig base) {
+      if (base instanceof FileCacheLoaderConfig) {
+         this.config = (FileCacheLoaderConfig) base;
+      } else if (base != null) {
+         this.config = new FileCacheLoaderConfig(base);
+      }
+
+      String location = this.config != null ? this.config.getLocation() : null;
+      if (location != null && location.length() > 0) {
+         root = new File(location);
+         rootPath = root.getAbsolutePath() + File.separator;
+      }
+   }
+
+   public IndividualCacheLoaderConfig getConfig() {
+      return config;
+   }
+
+   /**
+    * Makes sure the root directory exists, falling back to {@code java.io.tmpdir} when no location was
+    * configured.
+    *
+    * @throws CacheException if the location cannot be created or is not a directory
+    */
+   @Override
+   public void create() {
+      if (root == null) {
+         String tmpLocation = System.getProperty("java.io.tmpdir", "C:\\tmp");
+         root = new File(tmpLocation);
+         rootPath = root.getAbsolutePath() + File.separator;
+      }
+      if (!root.exists()) {
+         if (log.isTraceEnabled()) {
+            log.trace("Creating cache loader location " + root);
+         }
+
+         if (checkCharacterPortability()) {
+            /* Before creating the root, check whether the path is character portable. Anything that comes after is part
+               of the fqn which is inspected later. */
+            isCharacterPortableLocation(root.getAbsolutePath());
+         }
+
+         boolean created = root.mkdirs();
+         if (!created) {
+            throw new CacheException("Unable to create cache loader location " + root);
+         }
+      }
+
+      if (!root.isDirectory()) {
+         throw new CacheException("Cache loader location [" + root + "] is not a directory!");
+      }
+   }
+
+   /**
+    * Null-safe view of the portability flag: {@code config} is legitimately {@code null} when no
+    * configuration was supplied, in which case the checks are skipped.
+    */
+   private boolean checkCharacterPortability() {
+      return config != null && config.isCheckCharacterPortability();
+   }
+
+   /**
+    * Loads the value stored for {@code key} while holding the key's lock.
+    *
+    * @return the stored value, or {@code null} if no file exists for the key
+    * @throws CacheException wrapping any failure while reading
+    */
+   public V get(Object key) {
+      lock(key);
+      try {
+         return loadValue(key);
+      }
+      catch (Exception e) {
+         throw new CacheException(e);
+      }
+      finally {
+         unlock(key);
+      }
+   }
+
+   /**
+    * @return {@code true} if a file exists for {@code key}
+    * @throws CacheException wrapping an {@link IOException} raised while resolving the file
+    */
+   public boolean exists(Object key) {
+      lock(key);
+      try {
+         return getFile(key, false).exists();
+      }
+      catch (IOException e) {
+         throw new CacheException(e);
+      }
+      finally {
+         unlock(key);
+      }
+   }
+
+   /**
+    * Stores {@code value} under {@code key} while holding the key's lock.
+    *
+    * @return the value previously stored for the key, or {@code null} if there was none
+    * @throws CacheException wrapping any failure while reading or writing
+    */
+   public V put(K key, V value) {
+      lock(key);
+      try {
+         V retval = loadValue(key);
+         storeValue(key, value);
+         return retval;
+      }
+      catch (Exception e) {
+         throw new CacheException(e);
+      }
+      finally {
+         unlock(key);
+      }
+   }
+
+   /**
+    * Deletes the file holding {@code key} while holding the key's lock.
+    *
+    * @return the value that was stored, or {@code null} if no file existed for the key
+    * @throws CacheException wrapping any failure, including a file that could not be deleted
+    */
+   public V remove(Object key) {
+      lock(key);
+      try {
+         File file = getFile(key, false);
+         if (!file.exists())
+            return null;
+
+         V retval = loadValue(key);
+         // Surface a failed delete instead of reporting a removal that did not happen.
+         if (!file.delete())
+            throw new IOException("Unable to delete file: " + file);
+         return retval;
+      }
+      catch (Exception e) {
+         throw new CacheException(e);
+      }
+      finally {
+         unlock(key);
+      }
+   }
+
+   private void unlock(Object key) {
+      lock.releaseLock(key.toString());
+   }
+
+   private void lock(Object key) {
+      lock.acquireLock(key.toString(), true);
+   }
+
+   /**
+    * Resolves the directory for {@code key}, creating it on request.
+    *
+    * @throws IOException if the directory is missing, {@code create} is set and it cannot be created
+    */
+   private File getDirectory(Object key, boolean create) throws IOException {
+      File f = new File(getFullPath(key));
+      if (!f.exists() && create && !f.mkdirs()) {
+         throw new IOException("Unable to mkdirs " + f);
+      }
+      return f;
+   }
+
+   /** Every key currently maps to the root path; {@code key} is not consulted. */
+   private String getFullPath(Object key) {
+      return rootPath;
+   }
+
+   /** Closes {@code closeable} if it is non-null, ignoring any {@link IOException} from the close. */
+   private void safeClose(Closeable closeable) {
+      if (closeable == null)
+         return;
+
+      try {
+         closeable.close();
+      }
+      catch (IOException ignored) {
+         // nothing useful can be done when a close fails
+      }
+   }
+
+   private V loadValue(Object key) throws Exception {
+      File child = getFile(key, false);
+      if (!child.exists())
+         return null;
+
+      return loadValue(child);
+   }
+
+   /** Reads the first object (the value) from {@code file}; returns {@code null} if the file cannot be opened. */
+   private V loadValue(File file) throws Exception {
+      FileInputStream fileIn = null;
+      ObjectInputStream input = null;
+
+      try {
+         fileIn = new FileInputStream(file);
+         input = new MarshalledValueInputStream(fileIn);
+         return (V) getMarshaller().objectFromObjectStream(input);
+      }
+      catch (FileNotFoundException fnfe) {
+         return null;
+      }
+      finally {
+         safeClose(input);
+         safeClose(fileIn);
+      }
+   }
+
+   /** Reads value then key, the order {@link #storeValue} writes them; {@code null} if the file cannot be opened. */
+   private EntryData<K, V> loadEntry(File file) throws Exception {
+      FileInputStream fileIn = null;
+      ObjectInputStream input = null;
+
+      try {
+         fileIn = new FileInputStream(file);
+         input = new MarshalledValueInputStream(fileIn);
+         V value = (V) getMarshaller().objectFromObjectStream(input);
+         K key = (K) getMarshaller().objectFromObjectStream(input);
+         return new EntryData<K, V>(key, value);
+      }
+      catch (FileNotFoundException fnfe) {
+         return null;
+      }
+      finally {
+         safeClose(input);
+         safeClose(fileIn);
+      }
+   }
+
+   /**
+    * Resolves the file for {@code key}, named after {@code key.toString()}; when {@code create} is set and
+    * the file is missing, runs the optional portability checks and creates it.
+    *
+    * @throws IOException if the directory or the file cannot be created
+    */
+   private File getFile(Object key, boolean create) throws IOException {
+      File directory = getDirectory(key, create);
+      File child = new File(directory, key.toString());
+      if (create && !child.exists()) {
+         if (checkCharacterPortability()) {
+            /* Check whether the entire file path (root + fqn + data file name), is length portable */
+            isLengthPortablePath(child.getAbsolutePath());
+            /* Check whether the fqn tree we're trying to store could contain non portable characters */
+            isCharacterPortableKey(key);
+         }
+
+         if (!child.createNewFile()) {
+            throw new IOException("Unable to create file: " + child);
+         }
+      }
+
+      return child;
+   }
+
+   /** Writes {@code value} followed by {@code key} to the key's file, creating the file if necessary. */
+   protected void storeValue(Object key, Object value) throws Exception {
+      File child = getFile(key, true);
+
+      FileOutputStream fileOut = null;
+      ObjectOutputStream output = null;
+      try {
+         fileOut = new FileOutputStream(child);
+         output = new ObjectOutputStream(fileOut);
+         getMarshaller().objectToObjectStream(value, output);
+         getMarshaller().objectToObjectStream(key, output); // For getAllEntries
+      }
+      finally {
+         safeClose(output);
+         safeClose(fileOut);
+      }
+   }
+
+   /** @return {@code false}, after logging warnings, if the path matches {@link #PATH_PATTERN} */
+   protected boolean isCharacterPortableLocation(String fileAbsolutePath) {
+      Matcher matcher = PATH_PATTERN.matcher(fileAbsolutePath);
+      if (matcher.find()) {
+         log.warn("Cache loader location ( " + fileAbsolutePath + " ) contains one of these characters: '*' '<' '>' '|' '\"' '?'");
+         log.warn("Directories containing these characters are illegal in some operative systems and could lead to portability issues");
+         return false;
+      }
+
+      return true;
+   }
+
+   /** @return {@code false}, after logging warnings, if {@code key.toString()} matches {@link #KEY_PATTERN} */
+   protected boolean isCharacterPortableKey(Object key) {
+      // getFullPath converts Object to String via toString(), so we do too
+      Matcher matcher = KEY_PATTERN.matcher(key.toString());
+      if (matcher.find()) {
+         log.warn("The key.toString() contains one of these characters: '*' '<' '>' '|' '\"' '?' '\\' '/' ':' ");
+         log.warn("Directories containing these characters are illegal in some operating systems and could lead to portability issues");
+         return false;
+      }
+
+      return true;
+   }
+
+   /** @return {@code false}, after logging a warning, on old Windows when the path exceeds 255 characters */
+   protected boolean isLengthPortablePath(String absoluteFqnPath) {
+
+      if (isOldWindows && absoluteFqnPath.length() > 255) {
+         log.warn("The full absolute path to the fqn that you are trying to store is bigger than 255 characters, this could lead to problems on certain Windows systems: " + absoluteFqnPath);
+         return false;
+      }
+
+      return true;
+   }
+
+   /**
+    * Deletes every file directly under the root path, taking each file's lock in turn. Best effort: a file
+    * that cannot be deleted is reported with a warning and the remaining files are still processed.
+    */
+   public void clear() {
+      File directory = new File(rootPath);
+      if (!directory.exists())
+         return;
+
+      File[] files = directory.listFiles();
+      if (files != null) {
+         for (File file : files) {
+            // Acquire before the try so the finally never releases a lock that was not obtained.
+            lock(file.getName());
+            try {
+               if (file.exists() && !file.delete())
+                  log.warn("Unable to delete file " + file);
+            }
+            finally {
+               unlock(file.getName());
+            }
+         }
+      }
+   }
+
+   /**
+    * Loads every entry stored directly under the root path, taking each file's lock in turn. Best effort:
+    * a file that cannot be read is reported with a warning and skipped.
+    *
+    * @return the entries that could be loaded; never {@code null}
+    */
+   public List<EntryData<K, V>> getAllEntries() {
+      List<EntryData<K, V>> entries = new LinkedList<EntryData<K, V>>();
+      File directory = new File(rootPath);
+      if (!directory.exists())
+         return entries;
+
+      File[] files = directory.listFiles();
+      if (files != null) {
+         for (File file : files) {
+            // Acquire before the try so the finally never releases a lock that was not obtained.
+            lock(file.getName());
+            try {
+               if (file.exists()) {
+                  EntryData<K, V> entry = loadEntry(file);
+                  if (entry != null)
+                     entries.add(entry);
+               }
+            }
+            catch (Exception e) {
+               // Skip the unreadable file, but leave a trace instead of failing silently.
+               log.warn("Unable to load entry from file " + file + ": " + e);
+            }
+            finally {
+               unlock(file.getName());
+            }
+         }
+      }
+
+      return entries;
+   }
+}
Property changes on:
core/branches/flat/src/main/java/org/horizon/loader/FileCacheLoaderOld.java
___________________________________________________________________
Name: svn:executable
+ *
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Deleted:
core/branches/flat/src/main/java/org/horizon/loader/ReadOnlyDelegatingCacheLoader.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/loader/ReadOnlyDelegatingCacheLoader.java 2009-02-10
11:54:44 UTC (rev 7672)
+++
core/branches/flat/src/main/java/org/horizon/loader/ReadOnlyDelegatingCacheLoader.java 2009-02-10
11:58:28 UTC (rev 7673)
@@ -1,84 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-package org.horizon.loader;
-
-import org.horizon.logging.Log;
-import org.horizon.logging.LogFactory;
-
-import java.io.ObjectInputStream;
-import java.util.List;
-
-/**
- * Provides ignoreModifications features to all cache loaders.
- *
- * @author <a href="mailto:manik@jboss.org">Manik Surtani</a>
- * @since 1.0
- */
-public class ReadOnlyDelegatingCacheLoader<K, V> extends
AbstractDelegatingCacheLoader<K, V> {
- private static final Log log =
LogFactory.getLog(ReadOnlyDelegatingCacheLoader.class);
-
- public ReadOnlyDelegatingCacheLoader(CacheLoader cl) {
- super(cl);
- }
-
- @Override
- public V put(K key, V value) {
- log.trace("Not delegating write operation to underlying cache loader");
- return get(key);
- }
-
- @Override
- public void put(List<Modification> modifications) {
- log.trace("Not delegating write operation to underlying cache loader");
- }
-
- @Override
- public V remove(Object key) {
- log.trace("Not delegating write operation to underlying cache loader");
- return get(key);
- }
-
- @Override
- public void prepare(Object tx, List<Modification> modifications, boolean
one_phase) {
- log.trace("Not delegating write operation to underlying cache loader");
- }
-
- @Override
- public void commit(Object tx) {
- log.trace("Not delegating write operation to underlying cache loader");
- }
-
- @Override
- public void rollback(Object tx) {
- log.trace("Not delegating write operation to underlying cache loader");
- }
-
- @Override
- public void storeEntireState(ObjectInputStream is) {
- log.trace("Not delegating write operation to underlying cache loader");
- }
-
- @Override
- public void clear() {
- log.trace("Not delegating write operation to underlying cache loader");
- }
-}
Copied:
core/branches/flat/src/main/java/org/horizon/loader/ReadOnlyDelegatingCacheLoaderOld.java
(from rev 7666,
core/branches/flat/src/main/java/org/horizon/loader/ReadOnlyDelegatingCacheLoader.java)
===================================================================
---
core/branches/flat/src/main/java/org/horizon/loader/ReadOnlyDelegatingCacheLoaderOld.java
(rev 0)
+++
core/branches/flat/src/main/java/org/horizon/loader/ReadOnlyDelegatingCacheLoaderOld.java 2009-02-10
11:58:28 UTC (rev 7673)
@@ -0,0 +1,84 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.horizon.loader;
+
+import org.horizon.logging.Log;
+import org.horizon.logging.LogFactory;
+
+import java.io.ObjectInputStream;
+import java.util.List;
+
+/**
+ * Read-only flavour of a delegating cache loader (the "ignoreModifications" feature): every write-style operation
+ * overridden here is logged at trace level and then dropped instead of being handed to the wrapped loader.
+ * Operations that are not overridden keep the behaviour inherited from AbstractDelegatingCacheLoaderOld.
+ *
+ * @author <a href="mailto:manik@jboss.org">Manik Surtani</a>
+ * @since 1.0
+ */
+public class ReadOnlyDelegatingCacheLoaderOld<K, V> extends AbstractDelegatingCacheLoaderOld<K, V> {
+   private static final Log log = LogFactory.getLog(ReadOnlyDelegatingCacheLoaderOld.class);
+
+   /**
+    * Trace message emitted for every suppressed write.
+    */
+   private static final String WRITE_SUPPRESSED = "Not delegating write operation to underlying cache loader";
+
+   public ReadOnlyDelegatingCacheLoaderOld(CacheLoaderOld cl) {
+      super(cl);
+   }
+
+   /**
+    * Stores nothing; returns whatever {@code get(key)} reports.
+    */
+   @Override
+   public V put(K key, V value) {
+      traceSuppressedWrite();
+      return get(key);
+   }
+
+   /**
+    * Ignores the supplied modifications.
+    */
+   @Override
+   public void put(List<Modification> modifications) {
+      traceSuppressedWrite();
+   }
+
+   /**
+    * Removes nothing; returns whatever {@code get(key)} reports.
+    */
+   @Override
+   public V remove(Object key) {
+      traceSuppressedWrite();
+      return get(key);
+   }
+
+   @Override
+   public void prepare(Object tx, List<Modification> modifications, boolean one_phase) {
+      traceSuppressedWrite();
+   }
+
+   @Override
+   public void commit(Object tx) {
+      traceSuppressedWrite();
+   }
+
+   @Override
+   public void rollback(Object tx) {
+      traceSuppressedWrite();
+   }
+
+   @Override
+   public void storeEntireState(ObjectInputStream is) {
+      traceSuppressedWrite();
+   }
+
+   @Override
+   public void clear() {
+      traceSuppressedWrite();
+   }
+
+   /**
+    * Logs, at trace level, that a write operation was not delegated.
+    */
+   private void traceSuppressedWrite() {
+      log.trace(WRITE_SUPPRESSED);
+   }
+}
Property changes on:
core/branches/flat/src/main/java/org/horizon/loader/ReadOnlyDelegatingCacheLoaderOld.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Deleted:
core/branches/flat/src/main/java/org/horizon/loader/SingletonStoreCacheLoader.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/loader/SingletonStoreCacheLoader.java 2009-02-10
11:54:44 UTC (rev 7672)
+++
core/branches/flat/src/main/java/org/horizon/loader/SingletonStoreCacheLoader.java 2009-02-10
11:58:28 UTC (rev 7673)
@@ -1,506 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-package org.horizon.loader;
-
-import org.horizon.config.CacheLoaderConfig;
-import
org.horizon.config.CacheLoaderConfig.IndividualCacheLoaderConfig.SingletonStoreConfig;
-import org.horizon.logging.Log;
-import org.horizon.logging.LogFactory;
-import org.horizon.notifications.Listener;
-import org.horizon.notifications.cachelistener.event.Event;
-import org.horizon.notifications.cachemanagerlistener.annotation.CacheStarted;
-import org.horizon.notifications.cachemanagerlistener.annotation.CacheStopped;
-import org.horizon.notifications.cachemanagerlistener.annotation.ViewChanged;
-import org.horizon.notifications.cachemanagerlistener.event.ViewChangedEvent;
-import org.horizon.remoting.transport.Address;
-import org.horizon.tree.Fqn;
-
-import java.io.ObjectInputStream;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.Callable;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.Future;
-import java.util.concurrent.ThreadFactory;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-/**
- * SingletonStoreCacheLoader is a delegating cache loader used for situations when only
one cache instance should
- * interact with the underlying store. The coordinator of the cluster will be responsible
for the underlying
- * CacheLoader. SingletonStoreCacheLoader is a simply facade to a real CacheLoader
implementation. It always delegates
- * reads to the real CacheLoader.
- * <p/>
- * Writes are forwarded only if this SingletonStoreCacheLoader is currently the
cordinator. This avoid having all
- * CacheLoaders in a cluster writing the same data to the same underlying store. Although
not incorrect (e.g. a DB will
- * just discard additional INSERTs for the same key, and throw an exception), this will
avoid a lot of redundant
- * work.<br/>
- * <p/>
- * Whenever the current coordinator dies (or leaves), the second in line will take over.
That SingletonStoreCacheLoader
- * will then pass writes through to its underlying CacheLoader. Optionally, when a new
coordinator takes over the
- * Singleton, it can push the in-memory state to the cache cacheLoader, within a time
constraint.
- *
- * @author Bela Ban
- * @author <a href="mailto:galder.zamarreno@jboss.com">Galder
Zamarreno</a>
- * @since 1.0
- */
-public class SingletonStoreCacheLoader extends AbstractDelegatingCacheLoader {
- /**
- * Log instance.
- */
- private static final Log log = LogFactory.getLog(SingletonStoreCacheLoader.class);
- private static final boolean trace = log.isTraceEnabled();
-
- /**
- * Name of thread that should pushing in-memory state to cache loader.
- */
- private static final String THREAD_NAME = "InMemoryToCacheLoaderPusher";
-
- /**
- * Configuration for the SingletonStoreCacheLoader.
- */
- private SingletonStoreDefaultConfig config;
-
- /**
- * Executor service used to submit tasks to push in-memory state.
- */
- private final ExecutorService executor;
-
- /**
- * Future result of the in-memory push state task. This allows
SingletonStoreCacheLoader to check whether there's any
- * push taks on going.
- */
- private Future<?> pushStateFuture; /* FutureTask guarantess a safe publication
of the result */
-
- /**
- * Address instance that allows SingletonStoreCacheLoader to find out whether it
became the coordinator of the
- * cluster, or whether it stopped being it. This dictates whether the
SingletonStoreCacheLoader is active or not.
- */
- private Address localAddress;
-
- /**
- * Whether the the current cache instance is the coordinator and therefore
SingletonStoreCacheLoader is active. Being
- * active means delegating calls to the underlying cache loader.
- */
- private boolean active;
-
- /**
- * Empty constructor so that it can instantiated using reflection.
- */
- public SingletonStoreCacheLoader() {
- super(null);
-
- executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
- public Thread newThread(Runnable r) {
- return new Thread(r, THREAD_NAME);
- }
- });
- }
-
- /**
- * Sets the config for SingletonStoreCacheLoader and for the delegating cache loader.
- */
- @Override
- public void setConfig(CacheLoaderConfig.IndividualCacheLoaderConfig config) {
- super.setConfig(config);
-
- SingletonStoreConfig ssc = config.getSingletonStoreConfig();
- if (ssc instanceof SingletonStoreDefaultConfig) {
- this.config = (SingletonStoreDefaultConfig) ssc;
- } else if (ssc != null) {
- this.config = new SingletonStoreDefaultConfig(ssc);
- } else {
- this.config = new SingletonStoreDefaultConfig();
- }
- }
-
- @Override
- public void create()// throws Exception
- {
- super.create();
-
- cache.addListener(new SingletonStoreListener());
- }
-
- /**
- * Protected constructor which should only be used from unit tests. Production code
should set
- * pushStateWhenCoordinator using setConfig() method instead.
- *
- * @param config configuration instance for SingletonStoreCacheLoader
- */
- protected SingletonStoreCacheLoader(SingletonStoreDefaultConfig config) {
- this();
-
- this.config = config;
- }
-
- /**
- * Returns SingletonStoreCacheLoader's configuration instance. This method has
been defined for convenience reasons
- * when unit testing SingletonStoreCacheLoader's configuration.
- *
- * @return instance of SingletonStoreDefaultConfig
- */
- protected SingletonStoreDefaultConfig getSingletonStoreDefaultConfig() {
- return config;
- }
-
- /**
- * Returns the Future instance of a running in-memory to cache loader push task. This
method has been defined for
- * convenience reasons when unit testing.
- *
- * @return an instance of Future
- */
- protected Future<?> getPushStateFuture() {
- return pushStateFuture;
- }
-
- /**
- * Method called when the cache instance either becomes the coordinator or stops being
the coordinator. If it becomes
- * the coordinator, it can optionally start the in-memory state transfer to the
underlying cache store.
- *
- * @param newActiveState true if the cache instance just became the coordinator, false
if the cache instance stopped
- * being the coordinator.
- */
- protected void activeStatusChanged(boolean newActiveState) throws PushStateException
{
- active = newActiveState;
- log.debug("changed mode: " + this);
- if (active && config.isPushStateWhenCoordinator()) {
- doPushState();
- }
- }
-
- /**
- * Factory method for the creation of a Callable task in charge of pushing in-memory
state to cache loader.
- *
- * @return new instance of Callable<?> whose call() method either throws an
exception or returns null if the task was
- * successfull.
- */
- protected Callable<?> createPushStateTask() {
- return new Callable() {
- public Object call() throws Exception {
- final boolean debugEnabled = log.isDebugEnabled();
-
- if (debugEnabled) log.debug("start pushing in-memory state to cache
cacheLoader");
-// pushState(cache.getRoot());
- if (debugEnabled) log.debug("in-memory state passed to cache cacheLoader
successfully");
-
- return null;
- }
- };
- }
-
- /**
- * Pushes the state of a specific cache entry by reading the entry's data from the
cache and putting in the cache store
- * via the cache loader. This method is call recursively so that it iterates through
the whole cache.
- *
- */
- // TODO implement me
-// protected void pushState(Object key) throws Exception
-// {
-// /* Put the key's data first */
-// }
-
- /**
- * Method that waits for the in-memory to cache loader state to finish. This
method's called in case a push state is
- * already in progress and we need to wait for it to finish.
- *
- * @param future instance of Future representing the on going push task
- * @param timeout time to wait for the push task to finish
- * @param unit instance of TimeUnit representing the unit of timeout
- */
- protected void awaitForPushToFinish(Future future, int timeout, TimeUnit unit) {
- final boolean debugEnabled = log.isDebugEnabled();
- try {
- if (debugEnabled) log.debug("wait for state push to cache loader to
finish");
- future.get(timeout, unit);
- }
- catch (TimeoutException e) {
- if (debugEnabled) log.debug("timed out waiting for state push to cache
loader to finish");
- }
- catch (ExecutionException e) {
- if (debugEnabled) log.debug("exception reported waiting for state push to
cache loader to finish");
- }
- catch (InterruptedException ie) {
- /* Re-assert the thread's interrupted status */
- Thread.currentThread().interrupt();
- if (trace) log.trace("wait for state push to cache loader to finish was
interrupted");
- }
- }
-
- /**
- * Called when the SingletonStoreCacheLoader discovers that the cache instance has
become the coordinator and push in
- * memory state has been enabled. It might not actually push the state if there's
an ongoing push task running, in
- * which case will wait for the push task to finish.
- *
- * @throws PushStateException when the push state task reports an issue.
- */
- private void doPushState() throws PushStateException {
- if (pushStateFuture == null || pushStateFuture.isDone()) {
- Callable<?> task = createPushStateTask();
- pushStateFuture = executor.submit(task);
- try {
- waitForTaskToFinish(pushStateFuture,
config.getPushStateWhenCoordinatorTimeout(), TimeUnit.MILLISECONDS);
- }
- catch (Exception e) {
- throw new PushStateException("unable to complete in memory state push to
cache loader", e);
- }
- } else {
- /* at the most, we wait for push state timeout value. if it push task finishes
earlier, this call
- * will stop when the push task finishes, otherwise a timeout exception will be
reported */
- awaitForPushToFinish(pushStateFuture,
config.getPushStateWhenCoordinatorTimeout(), TimeUnit.MILLISECONDS);
- }
- }
-
- /**
- * Waits, within a time constraint, for a task to finish.
- *
- * @param future represents the task waiting to finish.
- * @param timeout maximum time to wait for the time to finish.
- * @param unit instance of TimeUnit representing the unit of timeout
- * @throws Exception if any issues are reported while waiting for the task to finish
- */
- private void waitForTaskToFinish(Future future, int timeout, TimeUnit unit) throws
Exception {
- try {
- future.get(timeout, unit);
- }
- catch (TimeoutException e) {
- throw new Exception("task timed out", e);
- }
- catch (InterruptedException e) {
- /* Re-assert the thread's interrupted status */
- Thread.currentThread().interrupt();
- if (trace) log.trace("task was interrupted");
- }
- finally {
- /* no-op if task is completed */
- future.cancel(true); /* interrupt if running */
- }
- }
-
- /**
- * Indicates whether the current cache instances is the coordinator of the cluster.
- *
- * @param members new member list
- * @return whether the current cache instance is the coordinator or not.
- */
- private boolean isCoordinator(List<Address> members) {
- if (members != null && localAddress != null) {
- return members.size() > 0 && localAddress.equals(members.get(0));
- }
-
- /* Invalid new view, so previous value returned */
- return active;
- }
-
- /**
- * Calls the underlying cache loader's operation if the current cache instance is
the coordinator.
- */
-// @Override
- public Object put(Fqn name, Object key, Object value) throws Exception {
- if (active) {
-// return super.put(name, key, value);
- }
-
- return null;
- }
-
- /**
- * Calls the underlying cache loader's operation if the current cache instance is
the coordinator.
- */
-// @Override
- public void put(Fqn name, Map attributes) throws Exception {
- // TODO implement me
-// if (active)
-// {
-// super.put(name, attributes);
-// }
- }
-
- /**
- * Calls the underlying cache loader's operation if the current cache instance is
the coordinator.
- */
-// @Override
-// public void put(List<Modification> modifications) throws Exception
-// {
- // TODO implement me
-// if (active)
-// {
-// super.put(modifications);
-// }
-// }
-
- /**
- * Calls the underlying cache loader's operation if the current cache instance is
the coordinator.
- */
-// @Override
- public Object remove(Fqn fqn, Object key) throws Exception {
- if (active) {
-// return super.remove(fqn, key);
- }
-
- return null;
- }
-
- /**
- * Calls the underlying cache loader's operation if the current cache instance is
the coordinator.
- */
-// @Override
- public void remove(Fqn fqn) throws Exception {
- if (active) {
- super.remove(fqn);
- }
- }
-
- /**
- * Calls the underlying cache loader's operation if the current cache instance is
the coordinator.
- */
-// @Override
- public void removeData(Fqn fqn) throws Exception {
- if (active) {
-// super.removeData(fqn);
- }
- }
-
- /**
- * Calls the underlying cache loader's operation if the current cache instance is
the coordinator.
- */
-// @Override
-// public void prepare(Object tx, List<Modification> modifications, boolean
one_phase) throws Exception
-// {
-// if (active)
-// {
-// super.prepare(tx, modifications, one_phase);
-// }
-// }
-
- /**
- * Calls the underlying cache loader's operation if the current cache instance is
the coordinator.
- */
- @Override
- public void commit(Object tx) // throws Exception
- {
- if (active) {
- super.commit(tx);
- }
- }
-
- /**
- * Calls the underlying cache loader's operation if the current cache instance is
the coordinator.
- */
- @Override
- public void rollback(Object tx) {
- if (active) {
- super.rollback(tx);
- }
- }
-
- /**
- * Calls the underlying cache loader's operation if the current cache instance is
the coordinator.
- */
- @Override
- public void storeEntireState(ObjectInputStream is) //throws Exception
- {
- if (active) {
- super.storeEntireState(is);
- }
- }
-
- /**
- * Calls the underlying cache loader's operation if the current cache instance is
the coordinator.
- */
-// @Override
- public void storeState(Fqn subtree, ObjectInputStream is) throws Exception {
- if (active) {
- // super.storeState(subtree, is);
- }
- }
-
- /**
- * Calls the underlying cache loader's operation if the current cache instance is
the coordinator.
- */
- @Override
- public String toString() {
- return "loc_addr=" + localAddress + ", active=" + active;
- }
-
- /**
- * Cache listener that reacts to cluster topology changes to find out whether a new
coordinator is elected.
- * SingletonStoreCacheLoader reacts to these changes in order to decide which cache
instance should interact with the
- * underlying cache store.
- */
- @Listener
- public class SingletonStoreListener {
- /**
- * Cache started, check whether the cache instance is the coordinator and set the
singleton store cache loader's
- * active status.
- */
- @CacheStarted
- public void cacheStarted(Event e) {
- localAddress = cache.getCacheManager().getAddress();
- active = cache.getCacheManager().isCoordinator();
- if (log.isDebugEnabled()) log.debug("cache started: " + this);
- }
-
- @CacheStopped
- public void cacheStopped(Event e) {
- if (log.isDebugEnabled()) log.debug("cache stopped: " + this);
- }
-
- /**
- * The cluster formation changed, so determine whether the current cache instance
stopped being the coordinator or
- * became the coordinator. This method can lead to an optional in memory to cache
loader state push, if the
- * current cache instance became the coordinator. This method will report any
issues that could potentially arise
- * from this push.
- */
- @ViewChanged
- public void viewChange(ViewChangedEvent event) {
- boolean tmp = isCoordinator(event.getNewMemberList());
-
- if (active != tmp) {
- try {
- activeStatusChanged(tmp);
- }
- catch (PushStateException e) {
- log.error("exception reported changing cache instance's active
status", e);
- }
-
- }
- }
- }
-
- /**
- * Exception representing any issues that arise from pushing the in-memory state to
the cache loader.
- */
- public static class PushStateException extends Exception {
- private static final long serialVersionUID = 5542893943730200886L;
-
- public PushStateException(String message, Throwable cause) {
- super(message, cause);
- }
-
- public PushStateException(Throwable cause) {
- super(cause);
- }
- }
-}
\ No newline at end of file
Copied:
core/branches/flat/src/main/java/org/horizon/loader/SingletonStoreCacheLoaderOld.java
(from rev 7669,
core/branches/flat/src/main/java/org/horizon/loader/SingletonStoreCacheLoader.java)
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/SingletonStoreCacheLoaderOld.java
(rev 0)
+++
core/branches/flat/src/main/java/org/horizon/loader/SingletonStoreCacheLoaderOld.java 2009-02-10
11:58:28 UTC (rev 7673)
@@ -0,0 +1,506 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.horizon.loader;
+
+import org.horizon.config.CacheLoaderConfig;
+import
org.horizon.config.CacheLoaderConfig.IndividualCacheLoaderConfig.SingletonStoreConfig;
+import org.horizon.logging.Log;
+import org.horizon.logging.LogFactory;
+import org.horizon.notifications.Listener;
+import org.horizon.notifications.cachelistener.event.Event;
+import org.horizon.notifications.cachemanagerlistener.annotation.CacheStarted;
+import org.horizon.notifications.cachemanagerlistener.annotation.CacheStopped;
+import org.horizon.notifications.cachemanagerlistener.annotation.ViewChanged;
+import org.horizon.notifications.cachemanagerlistener.event.ViewChangedEvent;
+import org.horizon.remoting.transport.Address;
+import org.horizon.tree.Fqn;
+
+import java.io.ObjectInputStream;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * SingletonStoreCacheLoader is a delegating cache loader used for situations when only
one cache instance should
+ * interact with the underlying store. The coordinator of the cluster will be responsible
for the underlying
+ * CacheLoader. SingletonStoreCacheLoader is a simple facade to a real CacheLoader
implementation. It always delegates
+ * reads to the real CacheLoader.
+ * <p/>
+ * Writes are forwarded only if this SingletonStoreCacheLoader is currently the
coordinator. This avoids having all
+ * CacheLoaders in a cluster writing the same data to the same underlying store. Although
not incorrect (e.g. a DB will
+ * just discard additional INSERTs for the same key, and throw an exception), this will
avoid a lot of redundant
+ * work.<br/>
+ * <p/>
+ * Whenever the current coordinator dies (or leaves), the second in line will take over.
That SingletonStoreCacheLoader
+ * will then pass writes through to its underlying CacheLoader. Optionally, when a new
coordinator takes over the
+ * Singleton, it can push the in-memory state to the cache cacheLoader, within a time
constraint.
+ *
+ * @author Bela Ban
+ * @author <a href="mailto:galder.zamarreno@jboss.com">Galder
Zamarreno</a>
+ * @since 1.0
+ */
+public class SingletonStoreCacheLoaderOld extends AbstractDelegatingCacheLoaderOld {
+ /**
+ * Log instance.
+ */
+ private static final Log log = LogFactory.getLog(SingletonStoreCacheLoaderOld.class);
+ private static final boolean trace = log.isTraceEnabled();
+
+ /**
+ * Name of the thread that pushes in-memory state to the cache loader.
+ */
+ private static final String THREAD_NAME = "InMemoryToCacheLoaderPusher";
+
+ /**
+ * Configuration for the SingletonStoreCacheLoader.
+ */
+ private SingletonStoreDefaultConfig config;
+
+ /**
+ * Executor service used to submit tasks to push in-memory state.
+ */
+ private final ExecutorService executor;
+
+ /**
+ * Future result of the in-memory push state task. This allows
SingletonStoreCacheLoader to check whether there's any
+ * push task ongoing.
+ */
+ private Future<?> pushStateFuture; /* FutureTask guarantess a safe publication
of the result */
+
+ /**
+ * Address instance that allows SingletonStoreCacheLoader to find out whether it
became the coordinator of the
+ * cluster, or whether it stopped being it. This dictates whether the
SingletonStoreCacheLoader is active or not.
+ */
+ private Address localAddress;
+
+ /**
+ * Whether the current cache instance is the coordinator and therefore
SingletonStoreCacheLoader is active. Being
+ * active means delegating calls to the underlying cache loader.
+ */
+ private boolean active;
+
+ /**
+  * No-argument constructor, needed so that the loader can be instantiated using reflection. It passes
+  * {@code null} to the superclass constructor and creates the single-threaded executor whose worker thread is
+  * named {@code THREAD_NAME}.
+  */
+ public SingletonStoreCacheLoaderOld() {
+    super(null);
+
+    ThreadFactory pusherThreadFactory = new ThreadFactory() {
+       public Thread newThread(Runnable task) {
+          return new Thread(task, THREAD_NAME);
+       }
+    };
+    executor = Executors.newSingleThreadExecutor(pusherThreadFactory);
+ }
+
+ /**
+  * Passes the configuration on to the superclass and then derives this loader's singleton-store settings from
+  * it: a {@code SingletonStoreDefaultConfig} is used as is, any other non-null {@code SingletonStoreConfig} is
+  * wrapped in a new {@code SingletonStoreDefaultConfig}, and a missing one is replaced by a default instance.
+  *
+  * @param config individual cache loader configuration to apply
+  */
+ @Override
+ public void setConfig(CacheLoaderConfig.IndividualCacheLoaderConfig config) {
+    super.setConfig(config);
+
+    SingletonStoreConfig supplied = config.getSingletonStoreConfig();
+    if (supplied == null) {
+       this.config = new SingletonStoreDefaultConfig();
+       return;
+    }
+
+    if (supplied instanceof SingletonStoreDefaultConfig) {
+       this.config = (SingletonStoreDefaultConfig) supplied;
+    } else {
+       this.config = new SingletonStoreDefaultConfig(supplied);
+    }
+ }
+
+ /**
+  * Runs the inherited {@code create()} step and then registers a new {@code SingletonStoreListener} with the
+  * cache.
+  */
+ @Override
+ public void create() {
+    super.create();
+
+    SingletonStoreListener listener = new SingletonStoreListener();
+    cache.addListener(listener);
+ }
+
+ /**
+ * Protected constructor which should only be used from unit tests. Production code should set
+ * pushStateWhenCoordinator using the setConfig() method instead.
+ * <p/>
+ * Delegates to the no-argument constructor first and then stores the supplied configuration as is.
+ *
+ * @param config configuration instance for SingletonStoreCacheLoader
+ */
+ protected SingletonStoreCacheLoaderOld(SingletonStoreDefaultConfig config) {
+ this();
+
+ this.config = config;
+ }
+
+ /**
+ * Returns SingletonStoreCacheLoader's configuration instance. This method has been defined for convenience
+ * reasons when unit testing SingletonStoreCacheLoader's configuration.
+ *
+ * @return the SingletonStoreDefaultConfig currently held by this loader
+ */
+ protected SingletonStoreDefaultConfig getSingletonStoreDefaultConfig() {
+ return config;
+ }
+
+ /**
+ * Returns the Future held for the in-memory to cache loader push task. This method has been defined for
+ * convenience reasons when unit testing.
+ *
+ * @return the current value of the pushStateFuture field; the constructors never assign it, so it may be null
+ */
+ protected Future<?> getPushStateFuture() {
+ return pushStateFuture;
+ }
+
+ /**
+  * Method called when the cache instance either becomes the coordinator or stops being the coordinator. The new
+  * state is recorded and, if the instance is now active and the configuration asks for it, the in-memory state
+  * push to the underlying cache store is started via {@code doPushState()}.
+  *
+  * @param newActiveState true if the cache instance just became the coordinator, false if the cache instance
+  *                       stopped being the coordinator.
+  * @throws PushStateException if the state push reports a problem
+  */
+ protected void activeStatusChanged(boolean newActiveState) throws PushStateException {
+    active = newActiveState;
+    // Guarded like the other debug statements in this class so the message is only built when needed
+    if (log.isDebugEnabled()) log.debug("changed mode: " + this);
+    if (active && config.isPushStateWhenCoordinator()) {
+       doPushState();
+    }
+ }
+
+ /**
+  * Factory method for the creation of a Callable task in charge of pushing in-memory state to the cache loader.
+  *
+  * @return new instance of {@code Callable<?>} whose call() method either throws an exception or returns null
+  *         if the task was successful.
+  */
+ protected Callable<?> createPushStateTask() {
+    return new Callable<Object>() {
+       public Object call() throws Exception {
+          final boolean debugEnabled = log.isDebugEnabled();
+
+          if (debugEnabled) log.debug("start pushing in-memory state to cache cacheLoader");
+          // TODO implement me: the pushState(cache.getRoot()) call is still disabled, so nothing is pushed yet
+          if (debugEnabled) log.debug("in-memory state passed to cache cacheLoader successfully");
+
+          return null;
+       }
+    };
+ }
+
+ /**
+ * Pushes the state of a specific cache entry by reading the entry's data from the cache and putting it in the
+ * cache store via the cache loader. This method is called recursively so that it iterates through the whole cache.
+ *
+ */
+ // TODO implement me
+// protected void pushState(Object key) throws Exception
+// {
+// /* Put the key's data first */
+// }
+
+ /**
+ * Waits for an in-memory to cache loader state push to finish. This method is called when a push is already in
+ * progress and we need to wait for it to finish.
+ * <p/>
+ * No failure is propagated to the caller: a timeout and an execution failure are only logged at debug level, and
+ * an interruption re-asserts the thread's interrupted status and is logged at trace level.
+ *
+ * @param future instance of Future representing the ongoing push task
+ * @param timeout time to wait for the push task to finish
+ * @param unit instance of TimeUnit representing the unit of timeout
+ */
+ protected void awaitForPushToFinish(Future future, int timeout, TimeUnit unit) {
+ final boolean debugEnabled = log.isDebugEnabled();
+ try {
+ if (debugEnabled) log.debug("wait for state push to cache loader to
finish");
+ future.get(timeout, unit);
+ }
+ catch (TimeoutException e) {
+ // best effort: the push did not finish in time, which is only logged
+ if (debugEnabled) log.debug("timed out waiting for state push to cache
loader to finish");
+ }
+ catch (ExecutionException e) {
+ // the push task itself failed; the cause is not logged or rethrown here
+ if (debugEnabled) log.debug("exception reported waiting for state push to
cache loader to finish");
+ }
+ catch (InterruptedException ie) {
+ /* Re-assert the thread's interrupted status */
+ Thread.currentThread().interrupt();
+ if (trace) log.trace("wait for state push to cache loader to finish was
interrupted");
+ }
+ }
+
+ /**
+ * Called when this cache loader discovers that the cache instance has become the coordinator and pushing the
+ * in-memory state has been enabled. A new push task is only submitted when no previous push exists or the
+ * previous one is done; if a push is still running, this method instead waits for that push to finish.
+ *
+ * @throws PushStateException when a newly submitted push task fails or does not finish within the configured
+ * timeout. Waiting on an already running push never throws.
+ */
+ private void doPushState() throws PushStateException {
+ if (pushStateFuture == null || pushStateFuture.isDone()) {
+ Callable<?> task = createPushStateTask();
+ pushStateFuture = executor.submit(task);
+ try {
+ waitForTaskToFinish(pushStateFuture,
config.getPushStateWhenCoordinatorTimeout(), TimeUnit.MILLISECONDS);
+ }
+ catch (Exception e) {
+ throw new PushStateException("unable to complete in memory state push to
cache loader", e);
+ }
+ } else {
+ /* at most, we wait for the push state timeout value. if the push task finishes earlier, this call
+ * returns as soon as it finishes; otherwise awaitForPushToFinish() logs the timeout at debug level */
+ awaitForPushToFinish(pushStateFuture,
config.getPushStateWhenCoordinatorTimeout(), TimeUnit.MILLISECONDS);
+ }
+ }
+
+ /**
+  * Waits, within a time constraint, for a task to finish. Whatever the outcome, the task is cancelled on the way
+  * out, which is a no-op if it already completed and an interrupt if it is still running.
+  * <p/>
+  * An interruption of the waiting thread is not propagated: the interrupted status is re-asserted and the method
+  * returns normally.
+  *
+  * @param future  represents the task waiting to finish.
+  * @param timeout maximum time to wait for the task to finish.
+  * @param unit    instance of TimeUnit representing the unit of timeout
+  * @throws Exception if the wait times out, or if future.get() fails for any reason other than an interruption
+  */
+ private void waitForTaskToFinish(Future future, int timeout, TimeUnit unit) throws Exception {
+    try {
+       future.get(timeout, unit);
+    }
+    catch (InterruptedException interrupted) {
+       /* Re-assert the thread's interrupted status */
+       Thread.currentThread().interrupt();
+       if (trace) log.trace("task was interrupted");
+    }
+    catch (TimeoutException timedOut) {
+       throw new Exception("task timed out", timedOut);
+    }
+    finally {
+       /* no-op if task is completed */
+       future.cancel(true); /* interrupt if running */
+    }
+ }
+
+ /**
+  * Indicates whether the current cache instance is the coordinator of the cluster, i.e. whether the local address
+  * is the first entry of the given member list.
+  *
+  * @param members new member list
+  * @return whether the current cache instance is the coordinator or not; if either the member list or the local
+  *         address is null, the current active status is returned unchanged.
+  */
+ private boolean isCoordinator(List<Address> members) {
+    if (members == null || localAddress == null) {
+       /* Invalid new view, so previous value returned */
+       return active;
+    }
+
+    return !members.isEmpty() && localAddress.equals(members.get(0));
+ }
+
+ /**
+ * Intended to call the underlying cache loader's operation if the current cache instance is the coordinator.
+ * <p/>
+ * NOTE: the delegating call is currently commented out, so nothing is stored and this method always returns
+ * null, whether or not this instance is the coordinator.
+ *
+ * @return always null at present
+ */
+// @Override
+ public Object put(Fqn name, Object key, Object value) throws Exception {
+ if (active) {
+// return super.put(name, key, value);
+ }
+
+ return null;
+ }
+
+ /**
+ * Intended to call the underlying cache loader's operation if the current cache instance is the coordinator.
+ * <p/>
+ * NOTE: the whole body is currently commented out pending implementation, so this method does nothing.
+ */
+// @Override
+ public void put(Fqn name, Map attributes) throws Exception {
+ // TODO implement me
+// if (active)
+// {
+// super.put(name, attributes);
+// }
+ }
+
+ /**
+ * Calls the underlying cache loader's operation if the current cache instance is
the coordinator.
+ */
+// @Override
+// public void put(List<Modification> modifications) throws Exception
+// {
+ // TODO implement me
+// if (active)
+// {
+// super.put(modifications);
+// }
+// }
+
+ /**
+ * Intended to call the underlying cache loader's operation if the current cache instance is the coordinator.
+ * <p/>
+ * NOTE: the delegating call is currently commented out, so nothing is removed and this method always returns
+ * null, whether or not this instance is the coordinator.
+ *
+ * @return always null at present
+ */
+// @Override
+ public Object remove(Fqn fqn, Object key) throws Exception {
+ if (active) {
+// return super.remove(fqn, key);
+ }
+
+ return null;
+ }
+
+ /**
+  * Removes the given Fqn through the superclass when the current cache instance is the coordinator; otherwise the
+  * call is ignored.
+  */
+// @Override
+ public void remove(Fqn fqn) throws Exception {
+    if (!active) {
+       return; // not the coordinator, so leave the store alone
+    }
+    super.remove(fqn);
+ }
+
+ /**
+ * Intended to call the underlying cache loader's operation if the current cache instance is the coordinator.
+ * <p/>
+ * NOTE: the delegating call is currently commented out, so this method does nothing.
+ */
+// @Override
+ public void removeData(Fqn fqn) throws Exception {
+ if (active) {
+// super.removeData(fqn);
+ }
+ }
+
+ /**
+ * Calls the underlying cache loader's operation if the current cache instance is
the coordinator.
+ */
+// @Override
+// public void prepare(Object tx, List<Modification> modifications, boolean
one_phase) throws Exception
+// {
+// if (active)
+// {
+// super.prepare(tx, modifications, one_phase);
+// }
+// }
+
+ /**
+  * Commits the given transaction through the superclass when the current cache instance is the coordinator;
+  * otherwise the call is ignored.
+  *
+  * @param tx passed to super.commit() untouched
+  */
+ @Override
+ public void commit(Object tx) // throws Exception
+ {
+    if (!active) {
+       return; // not the coordinator
+    }
+    super.commit(tx);
+ }
+
+ /**
+  * Rolls back the given transaction through the superclass when the current cache instance is the coordinator;
+  * otherwise the call is ignored.
+  *
+  * @param tx passed to super.rollback() untouched
+  */
+ @Override
+ public void rollback(Object tx) {
+    if (!active) {
+       return; // not the coordinator
+    }
+    super.rollback(tx);
+ }
+
+ /**
+  * Hands the given stream to the superclass' storeEntireState() when the current cache instance is the
+  * coordinator; otherwise the stream is left untouched.
+  *
+  * @param is stream passed to super.storeEntireState() untouched
+  */
+ @Override
+ public void storeEntireState(ObjectInputStream is) //throws Exception
+ {
+    if (!active) {
+       return; // not the coordinator
+    }
+    super.storeEntireState(is);
+ }
+
+ /**
+ * Intended to call the underlying cache loader's operation if the current cache instance is the coordinator.
+ * <p/>
+ * NOTE: the delegating call is currently commented out, so this method does nothing and the stream is not read.
+ */
+// @Override
+ public void storeState(Fqn subtree, ObjectInputStream is) throws Exception {
+ if (active) {
+ // super.storeState(subtree, is);
+ }
+ }
+
+ /**
+ * Calls the underlying cache loader's operation if the current cache instance is
the coordinator.
+ */
+ @Override
+ public String toString() {
+ return "loc_addr=" + localAddress + ", active=" + active;
+ }
+
+ /**
+  * Cache listener that reacts to cluster topology changes to find out whether a new coordinator is elected.
+  * The enclosing singleton store cache loader reacts to these changes in order to decide which cache instance
+  * should interact with the underlying cache store.
+  */
+ @Listener
+ public class SingletonStoreListener {
+    /**
+     * Cache started: record the local address, check whether the cache instance is the coordinator and set the
+     * singleton store cache loader's active status accordingly.
+     */
+    @CacheStarted
+    public void cacheStarted(Event e) {
+       localAddress = cache.getCacheManager().getAddress();
+       active = cache.getCacheManager().isCoordinator();
+       // Log the enclosing loader (loc_addr/active). A bare "this" here is the listener, which has no toString().
+       if (log.isDebugEnabled()) log.debug("cache started: " + SingletonStoreCacheLoaderOld.this);
+    }
+
+    /**
+     * Cache stopped: only logs the enclosing cache loader's state at debug level.
+     */
+    @CacheStopped
+    public void cacheStopped(Event e) {
+       // Same as in cacheStarted(): report the loader's state rather than the listener's identity.
+       if (log.isDebugEnabled()) log.debug("cache stopped: " + SingletonStoreCacheLoaderOld.this);
+    }
+
+    /**
+     * The cluster formation changed, so determine whether the current cache instance stopped being the
+     * coordinator or became the coordinator. This method can lead to an optional in-memory to cache loader
+     * state push, if the current cache instance became the coordinator. Any issue arising from this push is
+     * logged as an error and not rethrown.
+     */
+    @ViewChanged
+    public void viewChange(ViewChangedEvent event) {
+       boolean tmp = isCoordinator(event.getNewMemberList());
+
+       // Only act on an actual transition of the coordinator status.
+       if (active != tmp) {
+          try {
+             activeStatusChanged(tmp);
+          }
+          catch (PushStateException e) {
+ log.error("exception reported changing cache instance's active
status", e);
+          }
+       }
+    }
+ }
+
+ /**
+ * Exception representing any issues that arise from pushing the in-memory state to the cache loader.
+ */
+ public static class PushStateException extends Exception {
+ private static final long serialVersionUID = 5542893943730200886L;
+
+ /**
+ * @param message description of the push failure
+ * @param cause underlying exception that made the push fail
+ */
+ public PushStateException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ /**
+ * @param cause underlying exception that made the push fail
+ */
+ public PushStateException(Throwable cause) {
+ super(cause);
+ }
+ }
+}
\ No newline at end of file
Property changes on:
core/branches/flat/src/main/java/org/horizon/loader/SingletonStoreCacheLoaderOld.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Modified:
core/branches/flat/src/main/java/org/horizon/loader/SingletonStoreDefaultConfig.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/loader/SingletonStoreDefaultConfig.java 2009-02-10
11:54:44 UTC (rev 7672)
+++
core/branches/flat/src/main/java/org/horizon/loader/SingletonStoreDefaultConfig.java 2009-02-10
11:58:28 UTC (rev 7673)
@@ -61,8 +61,8 @@
/* if we got to this point, we know that singleton store must have been enabled */
setSingletonStoreEnabled(true);
- /* and we also know that the configuration was created by SingletonStoreCacheLoader
*/
- setSingletonStoreClass(SingletonStoreCacheLoader.class.getName());
+ /* and we also know that the configuration was created by
SingletonStoreCacheLoaderOld */
+ setSingletonStoreClass(SingletonStoreCacheLoaderOld.class.getName());
}
/**
@@ -88,7 +88,7 @@
@Override
public String getSingletonStoreClass() {
- return SingletonStoreCacheLoader.class.getName();
+ return SingletonStoreCacheLoaderOld.class.getName();
}
@Override
Added: core/branches/flat/src/main/java/org/horizon/loader/StoredEntry.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/StoredEntry.java
(rev 0)
+++ core/branches/flat/src/main/java/org/horizon/loader/StoredEntry.java 2009-02-10
11:58:28 UTC (rev 7673)
@@ -0,0 +1,12 @@
+package org.horizon.loader;
+
+/**
+ * Holder for a stored entry: its key, its value, and long expiry and created fields (TODO: document their units).
+ *
+ * @author Manik Surtani
+ */
+public class StoredEntry<K, V> {
+ K key;
+ V value;
+ long expiry, created;
+}
Modified: core/branches/flat/src/main/java/org/horizon/lock/StripedLock.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/lock/StripedLock.java 2009-02-10 11:54:44
UTC (rev 7672)
+++ core/branches/flat/src/main/java/org/horizon/lock/StripedLock.java 2009-02-10 11:58:28
UTC (rev 7673)
@@ -28,7 +28,7 @@
/**
* A simple implementation of lock striping, using cache entry keys to lock on, primarily
used to help make {@link
- * org.horizon.loader.CacheLoader} implemtations thread safe.
+ * org.horizon.loader.CacheLoaderOld} implemtations thread safe.
* <p/>
* Backed by a set of {@link java.util.concurrent.locks.ReentrantReadWriteLock}
instances, and using the key hashcodes
* to determine buckets.
Modified:
core/branches/flat/src/main/java/org/horizon/notifications/cachelistener/annotation/CacheEntryLoaded.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/notifications/cachelistener/annotation/CacheEntryLoaded.java 2009-02-10
11:54:44 UTC (rev 7672)
+++
core/branches/flat/src/main/java/org/horizon/notifications/cachelistener/annotation/CacheEntryLoaded.java 2009-02-10
11:58:28 UTC (rev 7673)
@@ -28,7 +28,7 @@
/**
* This annotation should be used on methods that need to be notified when a cache entry
is loaded from a {@link
- * org.horizon.loader.CacheLoader}.
+ * org.horizon.loader.CacheLoaderOld}.
* <p/>
* Methods annotated with this annotation should be public and take in a single
parameter, a {@link
* org.horizon.notifications.cachelistener.event.CacheEntryLoadedEvent} otherwise an
{@link
Modified: core/branches/flat/src/main/resources/config-samples/all.xml
===================================================================
--- core/branches/flat/src/main/resources/config-samples/all.xml 2009-02-10 11:54:44 UTC
(rev 7672)
+++ core/branches/flat/src/main/resources/config-samples/all.xml 2009-02-10 11:58:28 UTC
(rev 7673)
@@ -154,7 +154,7 @@
horizon.jdbc.table.drop=false
</properties>
- <singletonStore enabled="true"
class="org.horizon.loader.SingletonStoreCacheLoader">
+ <singletonStore enabled="true"
class="org.horizon.loader.SingletonStoreCacheLoaderOld">
<properties>
horizon.singletonStore.pushStateWhenCoordinator=true
horizon.singletonStore.pushStateWhenCoordinatorTimeout=20000