Author: mircea.markus
Date: 2009-02-25 05:23:49 -0500 (Wed, 25 Feb 2009)
New Revision: 7778
added support for replication queues
core/branches/flat/src/main/java/org/horizon/config/parsing/ 2009-02-24
22:02:12 UTC (rev 7777)
core/branches/flat/src/main/java/org/horizon/config/parsing/ 2009-02-25
10:23:49 UTC (rev 7778)
@@ -276,7 +276,7 @@
throw new ConfigurationException("Unable to configure eviction", e);
- if (p != null && !p.isEmpty()) XmlConfigHelper.setValues(cfg, p, false,
+ if (p != null && !p.isEmpty()) XmlConfigHelper. setValues(cfg, p, false,
Modified: core/branches/flat/src/main/java/org/horizon/loader/
--- core/branches/flat/src/main/java/org/horizon/loader/ 2009-02-24
22:02:12 UTC (rev 7777)
+++ core/branches/flat/src/main/java/org/horizon/loader/ 2009-02-25
10:23:49 UTC (rev 7778)
@@ -1,7 +1,6 @@
package org.horizon.loader;
import org.horizon.Cache;
-import org.horizon.lifecycle.Lifecycle;
import org.horizon.marshall.Marshaller;
import java.util.Set;
@@ -12,7 +11,7 @@
* @author Manik Surtani
* @since 1.0
-public interface CacheLoader extends Lifecycle {
+public interface CacheLoader {
* Used to initialize a cache loader. Typically invoked by the {@link
org.horizon.loader.CacheLoaderManager} when
@@ -50,7 +49,11 @@
boolean containsKey(Object key) throws CacheLoaderException;
+ public void start() throws CacheLoaderException;
+ public void stop() throws CacheLoaderException;
* @return the type of the {@link org.horizon.loader.CacheLoaderConfig} bean used to
configure this implementation of
* {@link org.horizon.loader.CacheLoader}
Modified: core/branches/flat/src/main/java/org/horizon/loader/
core/branches/flat/src/main/java/org/horizon/loader/ 2009-02-24
22:02:12 UTC (rev 7777)
core/branches/flat/src/main/java/org/horizon/loader/ 2009-02-25
10:23:49 UTC (rev 7778)
@@ -130,7 +130,11 @@
public void stop() {
- if (loader != null) loader.stop();
+ if (loader != null) try {
+ loader.stop();
+ } catch (CacheLoaderException e) {
+ throw new CacheException(e);
+ }
loader = null;
Added: core/branches/flat/src/main/java/org/horizon/loader/bucket/
--- core/branches/flat/src/main/java/org/horizon/loader/bucket/
(rev 0)
+++ core/branches/flat/src/main/java/org/horizon/loader/bucket/ 2009-02-25
10:23:49 UTC (rev 7778)
@@ -0,0 +1,83 @@
+package org.horizon.loader.bucket;
+import org.horizon.loader.StoredEntry;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Collection;
+/**
+ * A bucket is where entries are stored. It is a plain holder of {@link StoredEntry} instances keyed by
+ * {@link StoredEntry#getKey()}, together with a name that identifies the bucket to whoever persists it.
+ * <p/>
+ * Only the entries take part in externalization; the bucket name is transient and has to be set again by the
+ * caller after a bucket has been read back.
+ */
+public final class Bucket implements Externalizable {
+
+   /** Entries held by this bucket, keyed by the entry's own key. */
+   private Map<Object, StoredEntry> entries = new HashMap<Object, StoredEntry>();
+
+   /** Name of this bucket; not written by {@link #writeExternal(ObjectOutput)}. */
+   private transient String bucketName;
+
+   /**
+    * Adds an entry, replacing any entry previously stored under the same key.
+    *
+    * @param se entry to add
+    */
+   public final void addEntry(StoredEntry se) {
+      entries.put(se.getKey(), se);
+   }
+
+   /**
+    * Removes the entry stored under the given key.
+    *
+    * @param key key of the entry to remove
+    * @return true if a (non-null) entry was mapped to the key and has been removed, false otherwise
+    */
+   public final boolean removeEntry(Object key) {
+      return entries.remove(key) != null;
+   }
+
+   /**
+    * @param key key to look up
+    * @return the entry stored under the key, or null if there is none
+    */
+   public final StoredEntry getEntry(Object key) {
+      return entries.get(key);
+   }
+
+   /**
+    * Writes the number of entries followed by each entry. Keys are not written separately since they are
+    * recovered from the entries when reading.
+    */
+   public final void writeExternal(ObjectOutput out) throws IOException {
+      out.writeInt(entries.size());
+      for (StoredEntry se : entries.values()) out.writeObject(se);
+   }
+
+   /**
+    * Reads back what {@link #writeExternal(ObjectOutput)} wrote, rebuilding the key to entry mapping from
+    * each entry's key.
+    */
+   public final void readExternal(ObjectInput in) throws IOException,
+         ClassNotFoundException {
+      int sz = in.readInt();
+      entries = new HashMap<Object, StoredEntry>(sz);
+      for (int i = 0; i < sz; i++) {
+         StoredEntry se = (StoredEntry) in.readObject();
+         entries.put(se.getKey(), se);
+      }
+   }
+
+   /**
+    * @return the map backing this bucket (not a copy), so changes made to it are changes to the bucket
+    */
+   public Map<Object, StoredEntry> getEntries() {
+      return entries;
+   }
+
+   public String getBucketName() {
+      return bucketName;
+   }
+
+   public void setBucketName(String bucketName) {
+      this.bucketName = bucketName;
+   }
+
+   /**
+    * Drops every entry that reports itself as expired.
+    *
+    * @return true if at least one entry was removed, false if the bucket was left untouched
+    */
+   public boolean removeExpiredEntries() {
+      boolean result = false;
+      // go through the iterator so that entries can be removed while walking the map
+      Iterator<Map.Entry<Object, StoredEntry>> entryIterator = entries.entrySet().iterator();
+      while (entryIterator.hasNext()) {
+         Map.Entry<Object, StoredEntry> entry = entryIterator.next();
+         if (entry.getValue().isExpired()) {
+            entryIterator.remove();
+            result = true;
+         }
+      }
+      return result;
+   }
+
+   /**
+    * @return a view of the entries currently held by this bucket
+    */
+   public Collection<? extends StoredEntry> getStoredEntries() {
+      return entries.values();
+   }
+
+   @Override
+   public String toString() {
+      return "Bucket{" +
+            "entries=" + entries +
+            ", bucketName='" + bucketName + '\'' +
+            '}';
+   }
+}
Property changes on:
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
--- core/branches/flat/src/main/java/org/horizon/loader/bucket/
(rev 0)
core/branches/flat/src/main/java/org/horizon/loader/bucket/ 2009-02-25
10:23:49 UTC (rev 7778)
@@ -0,0 +1,175 @@
+package org.horizon.loader.bucket;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.horizon.lock.StripedLock;
+import org.horizon.loader.AbstractCacheStore;
+import org.horizon.loader.StoredEntry;
+import org.horizon.loader.CacheLoaderException;
+import org.horizon.loader.CacheLoaderConfig;
+import org.horizon.Cache;
+import org.horizon.marshall.Marshaller;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+/**
+ * Base class for cache stores that keep their entries grouped in {@link Bucket}s. The bucket an entry belongs
+ * to is named after the string form of its key's hash code, so all keys sharing a hash code share a bucket.
+ * <p/>
+ * Access is guarded on two levels: a striped lock keyed by bucket name and a global read/write lock. Subclasses
+ * supply the actual persistence through {@link #insertBucket(Bucket)}, {@link #saveBucket(Bucket)} and
+ * {@link #loadBucket(String)}.
+ *
+ * @author Mircea.Markus
+ * @since 1.0
+ */
+public abstract class BucketBasedCacheStore extends AbstractCacheStore {
+
+   private static final Log log = LogFactory.getLog(BucketBasedCacheStore.class);
+
+   private StripedLock bucketLocks;
+   private BucketBasedCacheStoreConfig config;
+   private long globalLockTimeoutMillis;
+
+   /**
+    * This global lock guards against direct store access via clear() and the stream APIs. Those methods
+    * need exclusive (write) access to this lock while all others can use shared (read) access to this lock
+    * since other methods will use finer grained bucket locks.
+    */
+   private final ReentrantReadWriteLock globalLock = new ReentrantReadWriteLock();
+
+   /**
+    * Remembers the configuration; it is only read later, in {@link #start()}.
+    *
+    * @param config must be a {@link BucketBasedCacheStoreConfig}, otherwise a ClassCastException is thrown
+    */
+   public void init(CacheLoaderConfig config, Cache cache, Marshaller m) {
+      this.config = (BucketBasedCacheStoreConfig) config;
+   }
+
+   /**
+    * Creates the bucket locks and reads the lock timeout from the configuration passed to
+    * {@link #init(CacheLoaderConfig, Cache, Marshaller)}.
+    */
+   public void start() throws CacheLoaderException {
+      bucketLocks = new StripedLock(config.getLockConcurrencyLevel());
+      globalLockTimeoutMillis = config.getLockAcquistionTimeout();
+   }
+
+   /**
+    * Loads the entry stored under the given key.
+    *
+    * @return the entry, or null if its bucket does not exist, the bucket has no such entry, or the entry has
+    *         expired (in which case it is also removed from the bucket and the bucket is saved)
+    * @throws CacheLoaderException if the locks cannot be acquired or the bucket cannot be read or written
+    */
+   public StoredEntry load(Object key) throws CacheLoaderException {
+      if (log.isTraceEnabled()) log.trace("Loading entry " + key);
+      String keyHashCode = String.valueOf(key.hashCode());
+      lockForReading(keyHashCode);
+      try {
+         Bucket bucket = loadBucket(keyHashCode);
+         if (bucket == null) return null;
+         StoredEntry se = bucket.getEntry(key);
+         if (se != null && se.isExpired()) {
+            // NOTE(review): the bucket is modified and saved here while only the shared (non-exclusive)
+            // bucket lock is held - confirm this is acceptable for all implementations
+            bucket.removeEntry(key);
+            saveBucket(bucket);
+            return null;
+         }
+         return se;
+      } catch (Exception e) {
+         throw new CacheLoaderException("Problems loading key " + key, e);
+      } finally {
+         unlock(keyHashCode);
+      }
+   }
+
+   /**
+    * Stores the entry in the bucket matching its key, creating (inserting) the bucket if it does not exist yet.
+    * Null and already expired entries are ignored.
+    *
+    * @throws CacheLoaderException if the locks cannot be acquired or the bucket cannot be read or written
+    */
+   public void store(StoredEntry ed) throws CacheLoaderException {
+      if (ed == null) return;
+      if (ed.isExpired()) {
+         if (log.isTraceEnabled()) log.trace("Entry " + ed + " is expired! Not doing anything.");
+         return;
+      }
+      if (log.isTraceEnabled()) log.trace("Storing entry " + ed);
+      String keyHashCode = String.valueOf(ed.getKey().hashCode());
+      lockForWritting(keyHashCode);
+      try {
+         Bucket bucket = loadBucket(keyHashCode);
+         if (bucket != null) {
+            bucket.addEntry(ed);
+            saveBucket(bucket);
+         } else {
+            bucket = new Bucket();
+            bucket.setBucketName(keyHashCode);
+            bucket.addEntry(ed);
+            insertBucket(bucket);
+         }
+      } catch (Exception ex) {
+         throw new CacheLoaderException("Problems storing entry with key " + ed.getKey(), ex);
+      } finally {
+         unlock(keyHashCode);
+      }
+   }
+
+   /**
+    * Removes the entry stored under the given key.
+    *
+    * @return true if an entry was removed, false if the bucket or the entry did not exist
+    * @throws CacheLoaderException if the locks cannot be acquired or the bucket cannot be read or written
+    */
+   public boolean remove(Object key) throws CacheLoaderException {
+      if (log.isTraceEnabled()) log.trace("Removing key " + key);
+      String keyHashCodeStr = String.valueOf(key.hashCode());
+      // lock outside the try block: if locking fails there is nothing for the finally block to release
+      lockForWritting(keyHashCodeStr);
+      try {
+         Bucket bucket = loadBucket(keyHashCodeStr);
+         if (bucket == null) return false;
+         boolean success = bucket.removeEntry(key);
+         if (success) saveBucket(bucket);
+         return success;
+      } catch (Exception e) {
+         throw new CacheLoaderException("Problems removing key " + key, e);
+      } finally {
+         unlock(keyHashCodeStr);
+      }
+   }
+
+   /**
+    * Releases what {@link #lockForReading(String)} or {@link #lockForWritting(String)} acquired: the bucket
+    * lock first, then the shared global lock.
+    */
+   protected void unlock(String keyHashCode) {
+      bucketLocks.releaseLock(keyHashCode);
+      globalLock.readLock().unlock();
+   }
+
+   /**
+    * Acquires the shared global lock followed by the exclusive lock of the given bucket.
+    *
+    * @throws CacheLoaderException if the global lock cannot be acquired in time or the thread is interrupted
+    *                              while waiting for it; no lock is held in that case
+    */
+   protected void lockForWritting(String keyHashCode) throws CacheLoaderException {
+      acquireSharedGlobalLock();
+      bucketLocks.acquireLock(keyHashCode, true);
+   }
+
+   /**
+    * Acquires the shared global lock followed by the shared lock of the given bucket.
+    *
+    * @throws CacheLoaderException if the global lock cannot be acquired in time or the thread is interrupted
+    *                              while waiting for it; no lock is held in that case
+    */
+   protected void lockForReading(String keyHashCode) throws CacheLoaderException {
+      acquireSharedGlobalLock();
+      bucketLocks.acquireLock(keyHashCode, false);
+   }
+
+   /**
+    * Takes the global lock in shared mode, translating a timeout or an interrupt into a CacheLoaderException
+    * so that callers never proceed (and later unlock) without actually holding the lock.
+    */
+   private void acquireSharedGlobalLock() throws CacheLoaderException {
+      try {
+         acquireGlobalLock(false);
+      } catch (TimeoutException e) {
+         throw new CacheLoaderException(e);
+      } catch (InterruptedException e) {
+         // keep the interrupt visible to whoever called us
+         Thread.currentThread().interrupt();
+         log.warn("Received interrupt signal while waiting for global lock acquisition");
+         throw new CacheLoaderException(e);
+      }
+   }
+
+   /**
+    * Acquires the global lock, waiting at most the configured lock acquisition timeout.
+    *
+    * @param exclusive true for the write lock, false for the read lock
+    * @throws TimeoutException     if the lock is not available within the timeout
+    * @throws InterruptedException if the thread is interrupted while waiting
+    */
+   protected void acquireGlobalLock(boolean exclusive) throws TimeoutException,
+         InterruptedException {
+      Lock l = exclusive ? globalLock.writeLock() : globalLock.readLock();
+      if (!l.tryLock(globalLockTimeoutMillis, TimeUnit.MILLISECONDS))
+         throw new TimeoutException("Timed out trying to acquire " + (exclusive
+               ? "exclusive" : "shared") + " global lock after " +
+               globalLockTimeoutMillis + " millis. Lock is " + l);
+   }
+
+   /**
+    * Releases the global lock.
+    *
+    * @param exclusive true to release the write lock, false to release the read lock
+    */
+   protected void releaseGlobalLock(boolean exclusive) {
+      Lock lock = exclusive ? globalLock.writeLock() : globalLock.readLock();
+      lock.unlock();
+   }
+
+   /**
+    * @return the number of read locks held on the global lock, plus one if it is write locked
+    */
+   public int getGlobalLockCount() {
+      return globalLock.getReadLockCount() + (globalLock.isWriteLocked() ? 1 : 0);
+   }
+
+   /**
+    * @return the total lock count reported by the bucket locks
+    */
+   public int getBucketLockCount() {
+      return bucketLocks.getTotalLockCount();
+   }
+
+   /**
+    * Persists a bucket for the first time. Called by {@link #store(StoredEntry)} when no bucket exists yet
+    * for the entry's key.
+    */
+   protected abstract void insertBucket(Bucket bucket) throws CacheLoaderException;
+
+   /**
+    * This method assumes that the bucket is already persisted in the database.
+    */
+   protected abstract void saveBucket(Bucket bucket) throws CacheLoaderException;
+
+   /**
+    * Loads the bucket with the given name.
+    *
+    * @param keyHashCode bucket name, i.e. the string form of a key's hash code
+    * @return the bucket; callers in this class treat null as "no such bucket"
+    */
+   protected abstract Bucket loadBucket(String keyHashCode) throws CacheLoaderException;
+}
Property changes on:
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
(rev 0)
core/branches/flat/src/main/java/org/horizon/loader/bucket/ 2009-02-25
10:23:49 UTC (rev 7778)
@@ -0,0 +1,32 @@
+package org.horizon.loader.bucket;
+import org.horizon.loader.AbstractCacheLoaderConfig;
+/**
+ * Configuration bean holding the locking related settings of a bucket based cache store: how many stripes the
+ * bucket locks use and how long to wait when acquiring a lock.
+ * <p/>
+ * Both setters go through <tt>testImmutability</tt> before changing the value.
+ */
+public class BucketBasedCacheStoreConfig extends AbstractCacheLoaderConfig {
+
+   /** Concurrency level of the bucket locks; defaults to 2048. */
+   private int lockConcurrencyLevel = 2048;
+
+   /** Lock acquisition timeout; defaults to 60000. */
+   private long lockAcquistionTimeout = 60000;
+
+   public int getLockConcurrencyLevel() {
+      return lockConcurrencyLevel;
+   }
+
+   public void setLockConcurrencyLevel(int lockConcurrencyLevel) {
+      testImmutability("lockConcurrencyLevel");
+      this.lockConcurrencyLevel = lockConcurrencyLevel;
+   }
+
+   public long getLockAcquistionTimeout() {
+      return lockAcquistionTimeout;
+   }
+
+   public void setLockAcquistionTimeout(long lockAcquistionTimeout) {
+      testImmutability("lockAcquistionTimeout");
+      this.lockAcquistionTimeout = lockAcquistionTimeout;
+   }
+}
Property changes on:
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
core/branches/flat/src/main/java/org/horizon/loader/decorators/ 2009-02-24
22:02:12 UTC (rev 7777)
core/branches/flat/src/main/java/org/horizon/loader/decorators/ 2009-02-25
10:23:49 UTC (rev 7778)
@@ -98,11 +98,11 @@
return delegate.getConfigurationClass();
- public void start() {
+ public void start() throws CacheLoaderException {
- public void stop() {
+ public void stop() throws CacheLoaderException {
Modified: core/branches/flat/src/main/java/org/horizon/loader/decorators/
core/branches/flat/src/main/java/org/horizon/loader/decorators/ 2009-02-24
22:02:12 UTC (rev 7777)
core/branches/flat/src/main/java/org/horizon/loader/decorators/ 2009-02-25
10:23:49 UTC (rev 7778)
@@ -91,7 +91,7 @@
- public void start() {
+ public void start() throws CacheLoaderException {
queue = new
LinkedBlockingQueue<Modification>(asyncStoreConfig.getQueueSize());"Async cache loader starting {0}", this);
@@ -109,7 +109,7 @@
- public void stop() {
+ public void stop() throws CacheLoaderException {
if (executor != null) {
for (Future f : processorFutures) f.cancel(true);
core/branches/flat/src/main/java/org/horizon/loader/decorators/ 2009-02-24
22:02:12 UTC (rev 7777)
core/branches/flat/src/main/java/org/horizon/loader/decorators/ 2009-02-25
10:23:49 UTC (rev 7778)
@@ -123,11 +123,11 @@
return null;
- public void start() {
+ public void start() throws CacheLoaderException {
for (CacheLoader l : loaders.keySet()) l.start();
- public void stop() {
+ public void stop() throws CacheLoaderException {
for (CacheLoader l : loaders.keySet()) l.stop();
core/branches/flat/src/main/java/org/horizon/loader/decorators/ 2009-02-24
22:02:12 UTC (rev 7777)
core/branches/flat/src/main/java/org/horizon/loader/decorators/ 2009-02-25
10:23:49 UTC (rev 7778)
@@ -147,7 +147,7 @@
- public void start() {
+ public void start() throws CacheLoaderException {
cacheManager.addListener(new SingletonStoreListener());
Modified: core/branches/flat/src/main/java/org/horizon/loader/file/
core/branches/flat/src/main/java/org/horizon/loader/file/ 2009-02-24
22:02:12 UTC (rev 7777)
core/branches/flat/src/main/java/org/horizon/loader/file/ 2009-02-25
10:23:49 UTC (rev 7778)
@@ -2,27 +2,21 @@
import org.horizon.Cache;
import org.horizon.config.ConfigurationException;
-import org.horizon.loader.AbstractCacheStore;
import org.horizon.loader.CacheLoaderConfig;
import org.horizon.loader.CacheLoaderException;
import org.horizon.loader.StoredEntry;
-import org.horizon.lock.StripedLock;
+import org.horizon.loader.bucket.Bucket;
+import org.horizon.loader.bucket.BucketBasedCacheStore;
import org.horizon.logging.Log;
import org.horizon.logging.LogFactory;
import org.horizon.marshall.Marshaller;
import org.horizon.util.concurrent.WithinThreadExecutor;
-import java.util.HashMap;
import java.util.HashSet;
-import java.util.Map;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
* A filesystem-based implementation of a {@link org.horizon.loader.CacheStore}. This
file store stores stuff in the
@@ -45,138 +39,44 @@
* @author Manik Surtani
* @since 1.0
-public class FileCacheStore extends AbstractCacheStore {
+public class FileCacheStore extends BucketBasedCacheStore {
private static final Log log = LogFactory.getLog(FileCacheStore.class);
- // doesn't matter that we have such a large amount of file buckets since they are
created on disk on demand and
- // take up no extra memory
- private static final int NUM_BUCKETS = Integer.MAX_VALUE;
private int streamBufferSize;
ExecutorService purgerService;
- StripedLock bucketLocks;
- // This global lock guards against direct file system access via clear() and the
stream APIs. These three methods
- // would need exclusive (write) access to this lock while all others can use shared
(read) access to this lock since
- // other methods will use finer grained bucket locks.
- final ReentrantReadWriteLock globalLock = new ReentrantReadWriteLock();
FileCacheStoreConfig cfg;
Cache cache;
Marshaller m;
File root;
- long lockTimeout;
public void init(CacheLoaderConfig config, Cache cache, Marshaller m) {
+ super.init(config, cache, m);
this.cfg = (FileCacheStoreConfig) config;
this.cache = cache;
this.m = m;
- public StoredEntry load(Object key) throws CacheLoaderException {
- log.trace("Loading entry {0}", key);
- Bucket b = null;
- try {
- b = lockAndGetBucket(key, false, false);
- if (b == null) return null;
- StoredEntry se = b.getEntry(key);
- if (se != null && se.isExpired()) {
- b.removeEntry(key);
- saveBucket(b);
- return null;
- } else {
- return se;
- }
- } catch (Exception e) {
- CacheLoaderException cle = (e instanceof CacheLoaderException) ?
(CacheLoaderException) e :
- new CacheLoaderException("Problems loading key " + key, e);
- throw cle;
- } finally {
- unlockBucket(b);
- }
- }
public Set<StoredEntry> loadAll() throws CacheLoaderException {
log.trace("Loading all entries");
- Set<StoredEntry> set = new HashSet<StoredEntry>();
- try {
- for (File f : root.listFiles()) {
- Bucket b = null;
- try {
- b = lockAndGetBucket(f, false, false);
- if (b != null) {
- Set<Object> expiredOnBucket = new HashSet<Object>();
- for (StoredEntry e : b.entries.values()) {
- if (e.isExpired())
- expiredOnBucket.add(e.getKey());
- else
- set.add(e);
- }
- for (Object expired : expiredOnBucket) b.removeEntry(expired);
- saveBucket(b);
+ Set<StoredEntry> result = new HashSet<StoredEntry>();
+ for (File bucketFile : root.listFiles()) {
+ String bucketName = bucketFile.getName();
+ try {
+ lockForReading(bucketName);
+ Bucket bucket = loadBucket(bucketFile);
+ if (bucket != null) {
+ if (bucket.removeExpiredEntries()) {
+ saveBucket(bucket);
- } finally {
- unlockBucket(b);
+ result.addAll(bucket.getStoredEntries());
+ } finally {
+ unlock(bucketName);
- } catch (Exception e) {
- CacheLoaderException cle = (e instanceof CacheLoaderException) ?
(CacheLoaderException) e :
- new CacheLoaderException("Problems loading keys", e);
- throw cle;
- return set;
+ return result;
- public Class<? extends CacheLoaderConfig> getConfigurationClass() {
- return FileCacheStoreConfig.class;
- }
- public void start() {
- String location = cfg.getLocation();
- if (location == null || location.trim().length() == 0) location =
"Horizon-FileCacheStore"; // use relative path!
- location += File.separator + cache.getName();
- root = new File(location);
- if (!root.exists()) {
- if (!root.mkdirs())
- throw new ConfigurationException("Directory " +
root.getAbsolutePath() + " does not exist and cannot be created!");
- }
- if (cfg.isPurgeSynchronously()) {
- purgerService = new WithinThreadExecutor();
- } else {
- purgerService = Executors.newSingleThreadExecutor();
- }
- streamBufferSize = cfg.getStreamBufferSize();
- int lockConcurrencyLevel = cfg.getLockConcurrencyLevel();
- bucketLocks = new StripedLock(lockConcurrencyLevel);
- lockTimeout = cfg.getLockAcquistionTimeout();
- }
- public void stop() {
- purgerService.shutdownNow();
- }
- public void store(StoredEntry ed) throws CacheLoaderException {
- if (ed == null) return;
- if (ed.isExpired()) {
- log.trace("Entry {0} is expired! Not doing anything.", ed);
- return;
- }
- log.trace("Storing entry {0}", ed);
- Bucket b = null;
- try {
- b = lockAndGetBucket(ed.getKey(), true, true);
- b.addEntry(ed);
- saveBucket(b);
- } catch (Exception e) {
- CacheLoaderException cle = (e instanceof CacheLoaderException) ?
(CacheLoaderException) e :
- new CacheLoaderException("Problems storing entry with key " +
ed.getKey(), e);
- throw cle;
- } finally {
- unlockBucket(b);
- }
- }
public void fromStream(InputStream inputStream) throws CacheLoaderException {
ObjectInputStream ois = null;
try {
@@ -213,7 +113,7 @@
throw cle;
finally {
- releaseGlobalLock();
+ releaseGlobalLock(true);
// we should close the stream we created!
if (inputStream != ois) safeClose(ois);
@@ -225,15 +125,14 @@
oos = (outputStream instanceof ObjectOutputStream) ? (ObjectOutputStream)
outputStream :
new ObjectOutputStream(outputStream);
File[] files = root.listFiles();
byte[] buffer = new byte[streamBufferSize];
- int bytesRead, totalBytesRead = 0;
for (File file : files) {
- FileInputStream is = new FileInputStream(file);
- int sz = is.available();
- BufferedInputStream bis = new BufferedInputStream(is);
+ int bytesRead, totalBytesRead = 0;
+ FileInputStream fileInStream = new FileInputStream(file);
+ int sz = fileInStream.available();
+ BufferedInputStream bis = new BufferedInputStream(fileInStream);
@@ -244,12 +143,12 @@
oos.write(buffer, 0, bytesRead);
- is.close();
+ fileInStream.close();
} catch (Exception ioe) {
throw new CacheLoaderException("Problems handling stream", ioe);
} finally {
- releaseGlobalLock();
+ releaseGlobalLock(true);
// we should close the stream we created!
if (oos != outputStream) safeClose(oos);
@@ -265,31 +164,10 @@
} catch (Exception e) {
throw new CacheLoaderException("Problems clearing cache store", e);
} finally {
- releaseGlobalLock();
+ releaseGlobalLock(true);
- public boolean remove(Object key) throws CacheLoaderException {
- log.trace("Removing key {0}", key);
- Bucket b = null;
- try {
- b = lockAndGetBucket(key, false, true);
- if (b == null) {
- return false;
- } else {
- boolean success = b.removeEntry(key);
- if (success) saveBucket(b);
- return success;
- }
- } catch (Exception e) {
- CacheLoaderException cle = (e instanceof CacheLoaderException) ?
(CacheLoaderException) e :
- new CacheLoaderException("Problems removing key " + key, e);
- throw cle;
- } finally {
- unlockBucket(b);
- }
- }
public void purgeExpired() {
purgerService.execute(new Runnable() {
public void run() {
@@ -302,133 +180,91 @@
- //
- // Buckets and bucket manipulators
- //
- private int hash(Object key) {
- int h = key.hashCode();
- h ^= (h >>> 20) ^ (h >>> 12);
- return h ^ (h >>> 7) ^ (h >>> 4);
+ protected Bucket loadBucket(String bucketName) throws CacheLoaderException {
+ return loadBucket(new File(root, bucketName));
- private int index(int h) {
- return h & (NUM_BUCKETS - 1);
+   /**
+    * Reads a bucket from the given file and names it after the file.
+    *
+    * @param bucketFile file to read the bucket from
+    * @return the bucket, or null if the file does not exist
+    * @throws CacheLoaderException if the file exists but cannot be read or deserialized
+    */
+   protected Bucket loadBucket(File bucketFile) throws CacheLoaderException {
+      if (!bucketFile.exists()) return null;
+      Bucket bucket;
+      FileInputStream is = null;
+      ObjectInputStream ois = null;
+      try {
+         is = new FileInputStream(bucketFile);
+         ois = new ObjectInputStream(is);
+         bucket = (Bucket) ois.readObject();
+      } catch (Exception e) {
+         String message = "Error while reading from file: " + bucketFile.getAbsolutePath();
+         // log the cause as well, otherwise the reason for the failure is lost from the log
+         log.error(message, e);
+         throw new CacheLoaderException(message, e);
+      } finally {
+         // close the wrapping stream before the stream it wraps
+         safeClose(ois);
+         safeClose(is);
+      }
+      if (bucket != null) {
+         bucket.setBucketName(bucketFile.getName());
+      }
+      return bucket;
+   }
- final Bucket lockAndGetBucket(Object key, boolean create, boolean exclusiveLock)
throws IOException, ClassNotFoundException, TimeoutException, InterruptedException {
- int bucketNumber = index(hash(key));
- File bucket = new File(root, bucketNumber + ".bucket");
- return lockAndGetBucket(bucket, create, exclusiveLock);
+ protected void insertBucket(Bucket bucket) throws CacheLoaderException {
+ saveBucket(bucket);
- final Bucket lockAndGetBucket(File f, boolean create, boolean exclusiveLock) throws
IOException, ClassNotFoundException, TimeoutException, InterruptedException {
- Bucket b = null;
- String bucketName = f.getName();
- // first get a shared lock on the global lock to make sure no state tfr is going
- acquireGlobalLock(false);
- bucketLocks.acquireLock(bucketName, exclusiveLock);
+ public final void saveBucket(Bucket b) throws CacheLoaderException {
+ File f = new File(root, b.getBucketName());
if (f.exists()) {
- FileInputStream is = new FileInputStream(f);
- ObjectInputStream ois = new ObjectInputStream(is);
- b = (Bucket) ois.readObject();
- ois.close();
- is.close();
- } else if (create) {
- b = new Bucket();
- b.entries = new HashMap<Object, StoredEntry>();
+ if (!f.delete()) log.warn("Had problems removing file {0}", f);
- if (b == null) {
- bucketLocks.releaseLock(bucketName); // don't bother holding locks for null
- releaseGlobalLock();
- } else b.fileName = bucketName;
- return b;
- }
- final void unlockBucket(Bucket b) {
- if (b != null) bucketLocks.releaseLock(b.fileName);
- releaseGlobalLock();
- }
- final void acquireGlobalLock(boolean exclusive) throws TimeoutException,
InterruptedException {
- Lock l = exclusive ? globalLock.writeLock() : globalLock.readLock();
- if (!l.tryLock(lockTimeout, TimeUnit.MILLISECONDS))
- throw new TimeoutException("Timed out trying to acquire " + (exclusive
? "exclusive" : "shared") + " global lock after " +
lockTimeout + " millis. Lock is " + l);
- }
- final void releaseGlobalLock() {
- try {
- globalLock.readLock().unlock();
- } catch (IllegalMonitorStateException imse) {
- // no op
- }
- }
- final void saveBucket(Bucket b) throws IOException, CacheLoaderException {
- if (b.modified) {
- File f = new File(root, b.fileName);
- if (f.exists()) {
- if (!f.delete()) log.warn("Had problems removing file {0}", f);
+ if (!b.getEntries().isEmpty()) {
+ FileOutputStream fos = null;
+ ObjectOutputStream oos = null;
+ try {
+ fos = new FileOutputStream(f);
+ oos = new ObjectOutputStream(fos);
+ oos.writeObject(b);
+ oos.flush();
+ fos.flush();
+ } catch (IOException ex) {
+ log.error("Exception while saving bucket " + b, ex);
+ throw new CacheLoaderException(ex);
- if (!b.entries.isEmpty()) {
- FileOutputStream fos = null;
- ObjectOutputStream oos = null;
- try {
- fos = new FileOutputStream(f);
- oos = new ObjectOutputStream(fos);
- oos.writeObject(b);
- oos.flush();
- fos.flush();
- } finally {
- safeClose(oos);
- safeClose(fos);
- }
+ finally {
+ safeClose(oos);
+ safeClose(fos);
- b.modified = false; // reset this
- /**
- * A bucket is where entries are stored.
- */
- public final static class Bucket implements Externalizable {
- Map<Object, StoredEntry> entries;
- transient String fileName;
- transient boolean modified = false;
+ /**
+  * @return {@link FileCacheStoreConfig}, the type of configuration bean used to configure this cache store
+  */
+ public Class<? extends CacheLoaderConfig> getConfigurationClass() {
+ return FileCacheStoreConfig.class;
+ }
- final void addEntry(StoredEntry se) {
- entries.put(se.getKey(), se);
- modified = true;
+ public void start() throws CacheLoaderException {
+ super.start();
+ String location = cfg.getLocation();
+ if (location == null || location.trim().length() == 0) location =
"Horizon-FileCacheStore"; // use relative path!
+ location += File.separator + cache.getName();
+ root = new File(location);
+ if (!root.exists()) {
+ if (!root.mkdirs())
+ throw new ConfigurationException("Directory " +
root.getAbsolutePath() + " does not exist and cannot be created!");
- final boolean removeEntry(Object key) {
- if (entries.remove(key) != null) {
- modified = true;
- return true;
- }
- return false;
+ if (cfg.isPurgeSynchronously()) {
+ purgerService = new WithinThreadExecutor();
+ } else {
+ purgerService = Executors.newSingleThreadExecutor();
+ streamBufferSize = cfg.getStreamBufferSize();
+ }
- final StoredEntry getEntry(Object key) {
- return entries.get(key);
- }
+   /**
+    * Shuts down the executor used for purging expired entries.
+    */
+   public void stop() {
+      // start() can fail before the purger is created, in which case there is nothing to shut down
+      if (purgerService != null) purgerService.shutdownNow();
+   }
- public final void writeExternal(ObjectOutput out) throws IOException {
- out.writeInt(entries.size());
- for (StoredEntry se : entries.values()) out.writeObject(se);
- }
- public final void readExternal(ObjectInput in) throws IOException,
ClassNotFoundException {
- int sz = in.readInt();
- entries = new HashMap<Object, StoredEntry>(sz);
- for (int i = 0; i < sz; i++) {
- StoredEntry se = (StoredEntry) in.readObject();
- entries.put(se.getKey(), se);
- }
- }
+ public Bucket loadBucketContainingKey(String key) throws CacheLoaderException {
+ return loadBucket(key.hashCode() + "");
core/branches/flat/src/main/java/org/horizon/loader/file/ 2009-02-24
22:02:12 UTC (rev 7777)
core/branches/flat/src/main/java/org/horizon/loader/file/ 2009-02-25
10:23:49 UTC (rev 7778)
@@ -1,6 +1,6 @@
package org.horizon.loader.file;
-import org.horizon.loader.AbstractCacheLoaderConfig;
+import org.horizon.loader.bucket.BucketBasedCacheStoreConfig;
* Configures {@link org.horizon.loader.file.FileCacheStore}. This allows you to tune a
number of characteristics of
@@ -20,12 +20,10 @@
* @author Manik Surtani
* @since 1.0
-public class FileCacheStoreConfig extends AbstractCacheLoaderConfig {
+public class FileCacheStoreConfig extends BucketBasedCacheStoreConfig {
String location = "Horizon-FileCacheStore";
private boolean purgeSynchronously = false;
private int streamBufferSize = 8192;
- private int lockConcurrencyLevel = 2048;
- private long lockAcquistionTimeout = 60000;
public FileCacheStoreConfig() {
@@ -57,22 +55,4 @@
this.streamBufferSize = streamBufferSize;
- public int getLockConcurrencyLevel() {
- return lockConcurrencyLevel;
- }
- public void setLockConcurrencyLevel(int lockConcurrencyLevel) {
- testImmutability("lockConcurrencyLevel");
- this.lockConcurrencyLevel = lockConcurrencyLevel;
- }
- public long getLockAcquistionTimeout() {
- return lockAcquistionTimeout;
- }
- public void setLockAcquistionTimeout(long lockAcquistionTimeout) {
- testImmutability("lockAcquistionTimeout");
- this.lockAcquistionTimeout = lockAcquistionTimeout;
- }
Added: core/branches/flat/src/main/java/org/horizon/loader/jdbc/
--- core/branches/flat/src/main/java/org/horizon/loader/jdbc/
(rev 0)
core/branches/flat/src/main/java/org/horizon/loader/jdbc/ 2009-02-25
10:23:49 UTC (rev 7778)
@@ -0,0 +1,30 @@
+package org.horizon.loader.jdbc;
+import org.horizon.loader.CacheLoaderException;
+import org.horizon.util.Util;
+import java.sql.Connection;
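+/**
+ * Abstract factory for the {@link java.sql.Connection}s used by the JDBC cache store. The concrete
+ * implementation to use is obtained through {@link #getConnectionFactory(JdbcCacheStoreConfig)}.
+ *
+ * TODO: Mircea: Document this!
+ */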
+public abstract class ConnectionFactory {
+ /**
+  * Creates the connection factory to use for the given configuration. Any exception raised while doing so
+  * is rethrown wrapped in a {@link CacheLoaderException}.
+  */
+ public static ConnectionFactory getConnectionFactory(JdbcCacheStoreConfig config)
throws CacheLoaderException {
+ try {
+ // NOTE(review): the expression being cast is missing from this copy of the patch - restore it from the
+ // repository before applying
+ return (ConnectionFactory)
+ } catch (Exception e) {
+ throw new CacheLoaderException(e);
+ }
+ }
+ /** Starts this factory with the given configuration; JdbcCacheStore.start() calls this right after creating the factory. */
+ public abstract void start(JdbcCacheStoreConfig config) throws CacheLoaderException;
+ /** Stops this factory; called at the end of JdbcCacheStore.stop(). */
+ public abstract void stop();
+ /** Presumably hands out a connection to the configured database - exact contract is up to implementations, TODO confirm. */
+ public abstract Connection getConnection() throws CacheLoaderException;
+ /** Presumably takes back a connection obtained from {@link #getConnection()} - TODO confirm whether it is closed or pooled. */
+ public abstract void releaseConnection(Connection conn);
Property changes on:
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added: core/branches/flat/src/main/java/org/horizon/loader/jdbc/
--- core/branches/flat/src/main/java/org/horizon/loader/jdbc/
(rev 0)
core/branches/flat/src/main/java/org/horizon/loader/jdbc/ 2009-02-25
10:23:49 UTC (rev 7778)
@@ -0,0 +1,240 @@
+package org.horizon.loader.jdbc;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.horizon.Cache;
+import org.horizon.loader.CacheLoaderConfig;
+import org.horizon.loader.CacheLoaderException;
+import org.horizon.loader.StoredEntry;
+import org.horizon.loader.bucket.Bucket;
+import org.horizon.loader.bucket.BucketBasedCacheStore;
+import org.horizon.marshall.Marshaller;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Set;
+import java.util.HashSet;
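+/**
+ * {@link BucketBasedCacheStore} implementation that persists buckets through JDBC: the bucket name is
+ * written to the key column and the marshalled bucket to the data column of the configured table.
+ * Connections are obtained from a {@link ConnectionFactory}.
+ *
+ * @author Mircea.Markus(a)
+ */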
+public class JdbcCacheStore extends BucketBasedCacheStore {
+ private static final Log log = LogFactory.getLog(JdbcCacheStore.class);
+ private JdbcCacheStoreConfig config;
+ private ConnectionFactory connectionFactory;
+ private Marshaller marshaller;
+ /**
+  * Initializes this store: delegates to the superclass, then keeps the configuration (cast to
+  * {@link JdbcCacheStoreConfig}) and the marshaller for later use.
+  *
+  * @param config configuration of this store; must be a {@link JdbcCacheStoreConfig}
+  * @param cache  the cache, passed on to the superclass
+  * @param m      marshaller used to (de)serialize buckets
+  */
+ public void init(CacheLoaderConfig config, Cache cache, Marshaller m) {
+ if (log.isTraceEnabled())
+ log.trace("Initializing JdbcCacheStore " + config);
+ super.init(config, cache, m);
+ this.config = (JdbcCacheStoreConfig) config;
+ this.marshaller = m;
+ }
+ /**
+  * Starts this store: starts the superclass, obtains a {@link ConnectionFactory} for the current
+  * configuration and starts it. If {@link JdbcCacheStoreConfig#isCreateTableOnStart()} is set, the
+  * table is created unless {@link TableManipulation#tableExists()} reports that it already exists.
+  *
+  * @throws CacheLoaderException if the factory cannot be obtained or started, or table setup fails
+  */
+ public void start() throws CacheLoaderException {
+ super.start();
+ this.connectionFactory = ConnectionFactory.getConnectionFactory(config);
+ connectionFactory.start(config);
+ //create table if needed
+ if (config.isCreateTableOnStart()) {
+ Connection conn = getConnection();
+ try {
+ TableManipulation tm = new TableManipulation(conn, config);
+ if (!tm.tableExists()) {
+ tm.createTable();
+ }
+ } finally {
+ // always give the connection back, even if the table check or creation failed
+ releaseConnection(conn);
+ }
+ }
+ }
+ /**
+  * Stops this store. If {@link JdbcCacheStoreConfig#isDropTableOnExit()} is set, the table is dropped
+  * first; the connection factory is then stopped. The factory is stopped in a {@code finally} block
+  * so that a failure while obtaining a connection or dropping the table does not leave it running.
+  *
+  * @throws CacheLoaderException if a connection cannot be obtained or the table cannot be dropped
+  */
+ public void stop() throws CacheLoaderException {
+    try {
+       if (config.isDropTableOnExit()) {
+          Connection connection = getConnection();
+          try {
+             TableManipulation tm = new TableManipulation(connection, config);
+             tm.dropTable();
+          } finally {
+             releaseConnection(connection);
+          }
+       }
+    } finally {
+       // stop the factory regardless of whether the drop succeeded
+       connectionFactory.stop();
+    }
+ }
+ /**
+  * Inserts the given bucket as a new row, using the statement from
+  * {@link JdbcCacheStoreConfig#getInsertBucketSql()}. Parameter 1 is the bucket name, parameter 2 the
+  * marshalled bucket as a binary stream.
+  *
+  * @param bucket the bucket to insert
+  * @throws CacheLoaderException if the update count is not exactly 1, or on SQL or marshalling failure
+  */
+ protected void insertBucket(Bucket bucket) throws CacheLoaderException {
+ Connection conn = null;
+ PreparedStatement ps = null;
+ try {
+ String sql = config.getInsertBucketSql();
+ if (log.isTraceEnabled()) {
+ log.trace("Running insertBucket. Sql: '" + sql + "',
on bucket: " + bucket);
+ }
+ conn = getConnection();
+ ps = conn.prepareStatement(sql);
+ ps.setString(1, bucket.getBucketName());
+ ByteBuffer byteBuffer = marshall(bucket);
+ ps.setBinaryStream(2, byteBuffer.getStream(), byteBuffer.getLength());
+ int insertedRows = ps.executeUpdate();
+ if (insertedRows != 1) {
+ throw new CacheLoaderException("Unexpected insert result: '" +
insertedRows + "'. Expected values is 1");
+ }
+ } catch (SQLException ex) {
+ logAndThrow(ex, "sql failure while inserting bucket: " + bucket);
+ } finally {
+ JdbcUtil.safeClose(ps);
+ // NOTE(review): conn is still null here if getConnection() threw
+ releaseConnection(conn);
+ }
+ }
+ /**
+  * Updates the row of an existing bucket, using the statement from
+  * {@link JdbcCacheStoreConfig#getSaveBucketSql()}. Parameter 1 is the marshalled bucket as a binary
+  * stream, parameter 2 the bucket name identifying the row.
+  *
+  * @param bucket the bucket to write back
+  * @throws CacheLoaderException if the update count is not exactly 1, or on SQL or marshalling failure
+  */
+ protected void saveBucket(Bucket bucket) throws CacheLoaderException {
+ Connection conn = null;
+ PreparedStatement ps = null;
+ try {
+ String sql = config.getSaveBucketSql();
+ if (log.isTraceEnabled()) {
+ log.trace("Running saveBucket. Sql: '" + sql + "', on
bucket: " + bucket);
+ }
+ conn = getConnection();
+ ps = conn.prepareStatement(sql);
+ ByteBuffer buffer = marshall(bucket);
+ ps.setBinaryStream(1, buffer.getStream(), buffer.getLength());
+ ps.setString(2, bucket.getBucketName());
+ int updatedRows = ps.executeUpdate();
+ if (updatedRows != 1) {
+ throw new CacheLoaderException("Unexpected update result: '" +
updatedRows + "'. Expected values is 1");
+ }
+ } catch (SQLException e) {
+ logAndThrow(e, "sql failure while updating bucket: " + bucket);
+ } finally {
+ JdbcUtil.safeClose(ps);
+ // NOTE(review): conn is still null here if getConnection() threw
+ releaseConnection(conn);
+ }
+ }
+ protected Bucket loadBucket(String keyHashCode) throws CacheLoaderException {
+ Connection conn = null;
+ PreparedStatement ps = null;
+ ResultSet rs = null;
+ try {
+ String sql = config.getLoadBucketSql();
+ if (log.isTraceEnabled()) {
+ log.trace("Running loadBucket. Sql: '" + sql + "', on
key: " + keyHashCode);
+ }
+ conn = getConnection();
+ ps = conn.prepareStatement(sql);
+ ps.setString(1, keyHashCode);
+ rs = ps.executeQuery();
+ if (! return null;
+ String bucketName = rs.getString(1);
+ InputStream inputStream = rs.getBinaryStream(2);
+ Bucket bucket = unmarshall(inputStream);
+ bucket.setBucketName(bucketName);//bucket name is volatile, so not persisted.
+ return bucket;
+ } catch (SQLException e) {
+ String message = "sql failure while loading key: " + keyHashCode;
+ log.error(message, e);
+ throw new CacheLoaderException(message, e);
+ } finally {
+ JdbcUtil.safeClose(rs);
+ JdbcUtil.safeClose(ps);
+ releaseConnection(conn);
+ }
+ }
+ public Set<StoredEntry> loadAll() throws CacheLoaderException {
+ Connection conn = null;
+ PreparedStatement ps = null;
+ ResultSet rs = null;
+ try {
+ String sql = config.getLoadAllSql();
+ if (log.isTraceEnabled()) {
+ log.trace("Running loadAll. Sql: '" + sql +
+ }
+ conn = getConnection();
+ ps = conn.prepareStatement(sql);
+ rs = ps.executeQuery();
+ Set<StoredEntry> result = new HashSet<StoredEntry>();
+ } catch (SQLException e) {
+ String message = "sql failure while loading key: ";
+ log.error(message, e);
+ throw new CacheLoaderException(message, e);
+ } finally {
+ JdbcUtil.safeClose(rs);
+ JdbcUtil.safeClose(ps);
+ releaseConnection(conn);
+ }
+ return null;
+ }
+ public void fromStream(InputStream inputStream) throws CacheLoaderException {
+ throw new IllegalStateException("TODO - please implement me!!!"); //todo
+ }
+ public void toStream(OutputStream outputStream) throws CacheLoaderException {
+ throw new IllegalStateException("TODO - please implement me!!!"); //todo
+ }
+ public void clear() throws CacheLoaderException {
+ throw new IllegalStateException("TODO - please implement me!!!"); //todo
+ }
+ public void purgeExpired() throws CacheLoaderException {
+ throw new IllegalStateException("TODO - please implement me!!!"); //todo
+ }
+ public Class<? extends CacheLoaderConfig> getConfigurationClass() {
+ return JdbcCacheStoreConfig.class;
+ }
+ private Connection getConnection() throws CacheLoaderException {
+ return connectionFactory.getConnection();
+ }
+ private void releaseConnection(Connection conn) {
+ connectionFactory.releaseConnection(conn);
+ }
+ private ByteBuffer marshall(Bucket bucket) throws CacheLoaderException {
+ try {
+ return marshaller.objectToBuffer(bucket);
+ } catch (IOException e) {
+ String message = "I/O failure while marshalling " + bucket;
+ log.error(message, e);
+ throw new CacheLoaderException(message, e);
+ }
+ }
+ /**
+  * Reads a {@link Bucket} from the given stream using the configured marshaller.
+  *
+  * @param inputStream stream holding the marshalled bucket
+  * @return the unmarshalled bucket
+  * @throws CacheLoaderException wrapping any {@link IOException} or {@link ClassNotFoundException}
+  *                              raised by the marshaller; the failure is logged first
+  */
+ private Bucket unmarshall(InputStream inputStream) throws CacheLoaderException {
+    try {
+       return (Bucket) marshaller.objectFromStream(inputStream);
+    } catch (IOException e) {
+       String message = "I/O error while unmarshalling from stream";
+       log.error(message, e);
+       throw new CacheLoaderException(message, e);
+    } catch (ClassNotFoundException e) {
+       String message = "*UNEXPECTED* ClassNotFoundException. This should not happen as Bucket class exists";
+       log.error(message, e);
+       throw new CacheLoaderException(message, e);
+    }
+ }
+ private void logAndThrow(Exception e, String message) throws CacheLoaderException {
+ log.error(message, e);
+ throw new CacheLoaderException(message, e);
+ }
Property changes on:
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added: core/branches/flat/src/main/java/org/horizon/loader/jdbc/
--- core/branches/flat/src/main/java/org/horizon/loader/jdbc/
(rev 0)
core/branches/flat/src/main/java/org/horizon/loader/jdbc/ 2009-02-25
10:23:49 UTC (rev 7778)
@@ -0,0 +1,181 @@
+package org.horizon.loader.jdbc;
+import org.horizon.loader.bucket.BucketBasedCacheStoreConfig;
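+/**
+ * Configuration bean for {@link JdbcCacheStore}. Holds the connection settings, the name and column
+ * definitions of the table used for persistence, and the SQL statements built from them.
+ *
+ * @author Manik Surtani
+ */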
+public class JdbcCacheStoreConfig extends BucketBasedCacheStoreConfig {
+ /*
+ * following two params manage creation and destruction during start up/shutdown.
+ */
+ boolean createTableOnStart = true;
+ boolean dropTableOnExit = false;
+ private String connectionFactoryClass;
+ /* required by NonManagedConnectionFactory */
+ private String connectionUrl;
+ private String userName;
+ private String password;
+ private String driverClass;
+ /* attributes defining the table where data will be persisted */
+ private String tableName;
+ private String primaryKey;
+ private String keyColumnName;
+ private String keyColumnType;
+ private String dataColumnName;
+ private String dataColumnType;
+ private String insertBucketSql;
+ private String saveBucketSql;
+ private String loadBucketSql;
+ private String loadAllSql;
+ public JdbcCacheStoreConfig() {
+ className = JdbcCacheStore.class.getName();
+ }
+ public void setCreateTableOnStart(boolean createTableOnStart) {
+ this.createTableOnStart = createTableOnStart;
+ }
+ public boolean isCreateTableOnStart() {
+ return createTableOnStart;
+ }
+ public boolean isDropTableOnExit() {
+ return dropTableOnExit;
+ }
+ public void setDropTableOnExit(boolean dropTableOnExit) {
+ this.dropTableOnExit = dropTableOnExit;
+ }
+ public String getConnectionUrl() {
+ return connectionUrl;
+ }
+ public void setConnectionUrl(String connectionUrl) {
+ this.connectionUrl = connectionUrl;
+ }
+ public String getUserName() {
+ return userName;
+ }
+ public void setUserName(String userName) {
+ this.userName = userName;
+ }
+ public String getPassword() {
+ return password;
+ }
+ public void setPassword(String password) {
+ this.password = password;
+ }
+ public String getDriverClass() {
+ return driverClass;
+ }
+ public void setDriverClass(String driverClass) {
+ this.driverClass = driverClass;
+ }
+ public String getConnectionFactoryClass() {
+ return connectionFactoryClass;
+ }
+ public void setConnectionFactoryClass(String connectionFactoryClass) {
+ this.connectionFactoryClass = connectionFactoryClass;
+ }
+ public void setTableName(String tableName) {
+ this.tableName = tableName;
+ }
+ public String getTableName() {
+ return tableName;
+ }
+ public String getPrimaryKey() {
+ return primaryKey;
+ }
+ public void setPrimaryKey(String primaryKey) {
+ this.primaryKey = primaryKey;
+ }
+ public String getKeyColumnName() {
+ return keyColumnName;
+ }
+ public void setKeyColumnName(String keyColumnName) {
+ this.keyColumnName = keyColumnName;
+ }
+ public String getKeyColumnType() {
+ return keyColumnType;
+ }
+ public void setKeyColumnType(String keyColumnType) {
+ this.keyColumnType = keyColumnType;
+ }
+ public String getDataColumnName() {
+ return dataColumnName;
+ }
+ public void setDataColumnName(String dataColumnName) {
+ this.dataColumnName = dataColumnName;
+ }
+ public String getDataColumnType() {
+ return dataColumnType;
+ }
+ public void setDataColumnType(String dataColumnType) {
+ this.dataColumnType = dataColumnType;
+ }
+ /**
+  * Returns a copy of this configuration as produced by {@code super.clone()}.
+  *
+  * @return the cloned configuration
+  */
+ @Override
+ public JdbcCacheStoreConfig clone() {
+ JdbcCacheStoreConfig dolly = (JdbcCacheStoreConfig) super.clone();
+ // No field needs copying by hand: the fields declared in this class are booleans and Strings,
+ // and Strings are immutable.
+ return dolly;
+ }
+ /**
+  * Returns the parameterized INSERT statement for a bucket row, building and caching it on first
+  * use. The first placeholder is the key column, the second the data column, which matches the
+  * order in which {@code JdbcCacheStore.insertBucket} binds its parameters.
+  *
+  * @return the INSERT statement with two {@code ?} placeholders
+  */
+ public String getInsertBucketSql() {
+    if (insertBucketSql == null) {
+       // The VALUES clause is required: without placeholders the statement is not valid SQL and
+       // the two parameters bound by the caller have nothing to bind to.
+       insertBucketSql = "INSERT INTO " + tableName + " (" + keyColumnName + ", " + dataColumnName
+             + ") VALUES (?, ?)";
+    }
+    return insertBucketSql;
+ }
+ /**
+  * Returns the parameterized UPDATE statement for a bucket row, building and caching it on first
+  * use. The first placeholder is the new value of the data column, the second the key column value
+  * selecting the row.
+  *
+  * @return the UPDATE statement
+  */
+ public String getSaveBucketSql() {
+ if (saveBucketSql == null) {
+ saveBucketSql = "UPDATE " + tableName + " SET " +
dataColumnName + " = ? WHERE " + keyColumnName + " = ?";
+ }
+ return saveBucketSql;
+ }
+ /**
+  * Returns the parameterized SELECT statement that loads one bucket row, building and caching it on
+  * first use. It selects the key column followed by the data column; the single placeholder is the
+  * key column value.
+  *
+  * @return the SELECT statement
+  */
+ public String getLoadBucketSql() {
+ if (loadBucketSql == null) {
+ loadBucketSql = "SELECT " + keyColumnName + ", " +
dataColumnName + " FROM " + tableName + " WHERE " + keyColumnName +
" = ?";
+ }
+ return loadBucketSql;
+ }
+ public String getLoadAllSql() {
+ if (loadAllSql == null) {
+ loadAllSql = "SELECT " + dataColumnName + " FROM " +
+ }
+ return loadAllSql;
+ }
Property changes on:
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added: core/branches/flat/src/main/java/org/horizon/loader/jdbc/
--- core/branches/flat/src/main/java/org/horizon/loader/jdbc/
(rev 0)
+++ core/branches/flat/src/main/java/org/horizon/loader/jdbc/ 2009-02-25
10:23:49 UTC (rev 7778)
@@ -0,0 +1,32 @@
+package org.horizon.loader.jdbc;
+import java.sql.Statement;
+import java.sql.SQLException;
+import java.sql.ResultSet;
+/**
+ * Static helpers for closing JDBC statements and result sets without propagating
+ * {@link SQLException}.
+ *
+ * @author
+ */
+public class JdbcUtil {
+ /**
+  * Closes the given statement if it is not {@code null}. A {@link SQLException} raised by
+  * {@code close()} is not rethrown; only its stack trace is printed.
+  *
+  * @param ps the statement to close, may be {@code null}
+  */
+ public static void safeClose(Statement ps) {
+ if (ps != null) {
+ try {
+ ps.close();
+ } catch (SQLException e) {
+ // best effort: report the failure but let the caller carry on
+ e.printStackTrace();
+ }
+ }
+ }
+ /**
+  * Closes the given result set if it is not {@code null}. A {@link SQLException} raised by
+  * {@code close()} is not rethrown; only its stack trace is printed.
+  *
+  * @param rs the result set to close, may be {@code null}
+  */
+ public static void safeClose(ResultSet rs) {
+ if (rs != null) {
+ try {
+ rs.close();
+ } catch (SQLException e) {
+ // best effort: report the failure but let the caller carry on
+ e.printStackTrace();
+ }
+ }
+ }
Property changes on:
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
(rev 0)
core/branches/flat/src/main/java/org/horizon/loader/jdbc/ 2009-02-25
10:23:49 UTC (rev 7778)
@@ -0,0 +1,64 @@
+package org.horizon.loader.jdbc;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.horizon.loader.CacheLoaderException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
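+/**
+ * {@link ConnectionFactory} that opens a new connection through {@link DriverManager} on every
+ * {@link #getConnection()} call and closes it again in {@link #releaseConnection(Connection)}.
+ * The driver class, connection URL, user name and password are read from
+ * {@link JdbcCacheStoreConfig} on start.
+ *
+ * @author
+ */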
+public class NonManagedConnectionFactory extends ConnectionFactory {
+ private static Log log = LogFactory.getLog(NonManagedConnectionFactory.class);
+ private String connectionUrl;
+ private String userName;
+ private String password;
+ public void start(JdbcCacheStoreConfig config) throws CacheLoaderException {
+ loadDriver(config.getDriverClass());
+ this.connectionUrl = config.getConnectionUrl();
+ this.userName = config.getUserName();
+ this.password = config.getPassword();
+ }
+ public void stop() {
+ //do nothing
+ }
+ public Connection getConnection() throws CacheLoaderException {
+ try {
+ return DriverManager.getConnection(connectionUrl, userName, password);
+ } catch (SQLException e) {
+ throw new CacheLoaderException("Could not obtain a new connection",
+ }
+ }
+ /**
+  * Closes the given connection. A failure to close is logged as a warning and not propagated.
+  * <p/>
+  * A {@code null} argument is ignored, so callers may release from a {@code finally} block even
+  * when obtaining the connection failed and their variable was never assigned; otherwise the
+  * resulting NullPointerException would mask the original failure.
+  *
+  * @param conn the connection to close, may be {@code null}
+  */
+ public void releaseConnection(Connection conn) {
+    if (conn == null) {
+       return;
+    }
+    try {
+       conn.close();
+    } catch (SQLException e) {
+       log.warn("Failure while closing the connection to the database ", e);
+    }
+ }
+ private void loadDriver(String driverClass) throws CacheLoaderException {
+ try {
+ if (log.isTraceEnabled()) {
+ log.trace("Attempting to load driver " + driverClass);
+ }
+ Class.forName(driverClass).newInstance();
+ }
+ catch (Throwable th) {
+ String message = "Failed loading driver with class: '" +
driverClass + "'";
+ log.error(message, th);
+ throw new CacheLoaderException(message, th);
+ }
+ }
Property changes on:
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added: core/branches/flat/src/main/java/org/horizon/loader/jdbc/
--- core/branches/flat/src/main/java/org/horizon/loader/jdbc/
(rev 0)
core/branches/flat/src/main/java/org/horizon/loader/jdbc/ 2009-02-25
10:23:49 UTC (rev 7778)
@@ -0,0 +1,133 @@
+package org.horizon.loader.jdbc;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.horizon.loader.CacheLoaderException;
+import java.sql.Connection;
+import java.sql.DatabaseMetaData;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Locale;
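+/**
+ * Helper that checks for, creates and drops the table described by a {@link JdbcCacheStoreConfig},
+ * working on a connection supplied by the caller.
+ *
+ * @author Mircea.Markus(a)
+ */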
+public class TableManipulation {
+ private Connection connection;
+ private JdbcCacheStoreConfig config;
+ private static Log log = LogFactory.getLog(TableManipulation.class);
+ public TableManipulation(Connection connection, JdbcCacheStoreConfig config) {
+ this.connection = connection;
+ this.config = config;
+ }
+ public boolean tableExists() throws CacheLoaderException {
+ return tableExists(config.getTableName());
+ }
+ public boolean tableExists(String tableName) throws CacheLoaderException {
+ assrtNotNull(config.getTableName(), "table name is mandatory");
+ ResultSet rs = null;
+ try {
+ // (a j2ee spec compatible jdbc driver has to fully
+ // implement the DatabaseMetaData)
+ DatabaseMetaData dmd = connection.getMetaData();
+ String catalog = connection.getCatalog();
+ String schema = null;
+ String quote = dmd.getIdentifierQuoteString();
+ if (tableName.startsWith(quote)) {
+ if (!tableName.endsWith(quote)) {
+ throw new IllegalStateException("Mismatched quote in table name:
" + tableName);
+ }
+ int quoteLength = quote.length();
+ tableName = tableName.substring(quoteLength, tableName.length() -
+ if (dmd.storesLowerCaseQuotedIdentifiers()) {
+ tableName = toLowerCase(tableName);
+ } else if (dmd.storesUpperCaseQuotedIdentifiers()) {
+ tableName = toUpperCase(tableName);
+ }
+ } else {
+ if (dmd.storesLowerCaseIdentifiers()) {
+ tableName = toLowerCase(tableName);
+ } else if (dmd.storesUpperCaseIdentifiers()) {
+ tableName = toUpperCase(tableName);
+ }
+ }
+ int dotIndex;
+ if ((dotIndex = tableName.indexOf('.')) != -1) {
+ // Yank out schema name ...
+ schema = tableName.substring(0, dotIndex);
+ tableName = tableName.substring(dotIndex + 1);
+ }
+ rs = dmd.getTables(catalog, schema, tableName, null);
+ return;
+ }
+ catch (SQLException e) {
+ // This should not happen. A J2EE compatible JDBC driver is
+ // required to fully support metadata.
+ throw new IllegalStateException("Error while checking if table aleady
exists " + tableName, e);
+ }
+ finally {
+ JdbcUtil.safeClose(rs);
+ }
+ }
+ public void createTable() throws CacheLoaderException {
+ // removed CONSTRAINT clause as this causes problems with some databases, like
+ assertMandatoryElemenetsPresent();
+ String creatTableDdl = "CREATE TABLE " + config.getTableName() +
"(" + config.getKeyColumnName() + " " + config.getKeyColumnType()
+ + " NOT NULL, " + config.getDataColumnName() + " " +
config.getDataColumnType() +
+ ", PRIMARY KEY (" + config.getKeyColumnName() + "))";
+ if (log.isTraceEnabled())
+ log.trace("Creating table with following DDL: '" + creatTableDdl +
+ executeDdlStatement(creatTableDdl);
+ }
+ private void assertMandatoryElemenetsPresent() throws CacheLoaderException {
+ assrtNotNull(config.getKeyColumnType(), "keyColumnType needed in order to
create table");
+ assrtNotNull(config.getKeyColumnName(), "keyColumnName needed in order to
create table");
+ assrtNotNull(config.getTableName(), "tableName needed in order to create
+ assrtNotNull(config.getDataColumnName(), "dataColumnName needed in order to
create table");
+ assrtNotNull(config.getDataColumnType(), "dataColumnType needed in order to
create table");
+ }
+ private void assrtNotNull(String keyColumnType, String message) throws
CacheLoaderException {
+ if (keyColumnType == null || keyColumnType.trim().length() == 0) throw new
+ }
+ /**
+  * Executes the given DDL statement on this helper's connection. Used for both table creation and
+  * table removal, so the error report names the statement that failed rather than assuming a
+  * CREATE TABLE.
+  *
+  * @param creatTableDdl the DDL statement to run
+  * @throws CacheLoaderException wrapping the {@link SQLException} if the statement fails
+  */
+ private void executeDdlStatement(String creatTableDdl) throws CacheLoaderException {
+    Statement statement = null;
+    try {
+       statement = connection.createStatement();
+       statement.executeUpdate(creatTableDdl);
+    } catch (SQLException e) {
+       String message = "Error while executing DDL statement: '" + creatTableDdl + "'";
+       log.error(message, e);
+       throw new CacheLoaderException(message, e);
+    } finally {
+       JdbcUtil.safeClose(statement);
+    }
+ }
+ public void dropTable() throws CacheLoaderException {
+ String dropTableDdl = "DROP TABLE " + config.getTableName();
+ if (log.isTraceEnabled())
+ log.trace("Dropping table with following DDL '" + dropTableDdl +
+ executeDdlStatement(dropTableDdl);
+ }
+ private static String toLowerCase(String s) {
+ return s.toLowerCase((Locale.ENGLISH));
+ }
+ private static String toUpperCase(String s) {
+ return s.toUpperCase(Locale.ENGLISH);
+ }
Property changes on:
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Modified: core/branches/flat/src/main/java/org/horizon/lock/
--- core/branches/flat/src/main/java/org/horizon/lock/ 2009-02-24 22:02:12
UTC (rev 7777)
+++ core/branches/flat/src/main/java/org/horizon/lock/ 2009-02-25 10:23:49
UTC (rev 7778)
@@ -142,4 +142,16 @@
public void acquireAllLocks(List<Object> keys, boolean exclusive) {
for (Object k : keys) acquireLock(k, exclusive);
+ /**
+  * Returns the total number of locks currently held across all shared locks. For each lock, the
+  * value of {@code getReadLockCount()} is added, plus 1 if the lock is write-locked.
+  *
+  * @return the sum of read lock counts and write-locked locks
+  */
+ public int getTotalLockCount() {
+ int count = 0;
+ for (ReentrantReadWriteLock lock : sharedLocks) {
+ count += lock.getReadLockCount();
+ // a write-locked lock contributes exactly one, whatever its hold count
+ count += lock.isWriteLocked() ? 1 : 0;
+ }
+ return count;
+ }
core/branches/flat/src/test/java/org/horizon/config/parsing/ 2009-02-24
22:02:12 UTC (rev 7777)
core/branches/flat/src/test/java/org/horizon/config/parsing/ 2009-02-25
10:23:49 UTC (rev 7778)
@@ -8,7 +8,7 @@
import org.horizon.eviction.algorithms.fifo.FIFOAlgorithmConfig;
import org.horizon.loader.CacheLoaderConfig;
import org.horizon.loader.decorators.SingletonStoreConfig;
-import org.horizon.loader.jdbc.JDBCCacheStoreConfig;
+import org.horizon.loader.jdbc.JdbcCacheStoreConfig;
import org.horizon.lock.IsolationLevel;
import org.horizon.transaction.GenericTransactionManagerLookup;
import org.testng.annotations.Test;
@@ -152,16 +152,17 @@
assert !c.isUseReplQueue();
+ @Test (enabled = false)
public void testCacheLoaders() throws Exception {
XmlConfigurationParserImpl parser = new XmlConfigurationParserImpl();
String xml = "<loaders passivation=\"true\"
shared=\"true\" preload=\"true\">\n" +
- " <loader
fetchPersistentState=\"true\"\n" +
+ " <loader
fetchPersistentState=\"true\"\n" +
" ignoreModifications=\"false\"
purgeOnStartup=\"false\">\n" +
" <properties>\n" +
" dataSource=HorizonDS\n" +
" tableNamePrefix=horizon\n" +
" createTable=true\n" +
- " dropTable=false\n" +
+ " dropTableOnExit=false\n" +
" </properties>\n" +
" <singletonStore enabled=\"true\"
pushStateWhenCoordinator=\"true\" pushStateTimeout=\"20000\"
/>\n" +
" <async enabled=\"true\"
batchSize=\"15\" />\n" +
@@ -180,7 +181,7 @@
assert clc.isPreload();
CacheLoaderConfig iclc = clc.getFirstCacheLoaderConfig();
- assert
+ assert
assert iclc.getAsyncStoreConfig().isEnabled();
assert iclc.getAsyncStoreConfig().getBatchSize() == 15;
assert iclc.getAsyncStoreConfig().getPollWait() == 100;
@@ -190,11 +191,12 @@
assert !iclc.isIgnoreModifications();
assert !iclc.isPurgeOnStartup();
- JDBCCacheStoreConfig jdbcclc = (JDBCCacheStoreConfig) iclc;
- assert jdbcclc.getDataSource().equals("HorizonDS");
- assert jdbcclc.getTableNamePrefix().equals("horizon");
- assert jdbcclc.isCreateTable();
- assert !jdbcclc.isDropTable();
+ JdbcCacheStoreConfig jdbcclc = (JdbcCacheStoreConfig) iclc;
+// assert jdbcclc.getDataSource().equals("HorizonDS");
+// assert jdbcclc.getTableNamePrefix().equals("horizon");
+ assert false : "todo update test according to config";
+ assert jdbcclc.isCreateTableOnStart();
+ assert !jdbcclc.isDropTableOnExit();
SingletonStoreConfig ssc = iclc.getSingletonStoreConfig();
assert ssc.isSingletonStoreEnabled();
@@ -205,7 +207,7 @@
public void testCacheLoadersDefaults() throws Exception {
XmlConfigurationParserImpl parser = new XmlConfigurationParserImpl();
String xml = "<loaders>\n" +
- " <loader
class=\"org.horizon.loader.jdbc.JDBCCacheStore\">\n" +
+ " <loader
class=\"org.horizon.loader.jdbc.JdbcCacheStore\">\n" +
" <properties />\n" +
" </loader>\n" +
" </loaders>";
@@ -222,7 +224,7 @@
assert !clc.isPreload();
CacheLoaderConfig iclc = clc.getFirstCacheLoaderConfig();
- assert
+ assert
assert !iclc.getAsyncStoreConfig().isEnabled();
assert !iclc.isFetchPersistentState();
assert !iclc.isIgnoreModifications();
Modified: core/branches/flat/src/test/java/org/horizon/loader/
--- core/branches/flat/src/test/java/org/horizon/loader/ 2009-02-24
22:02:12 UTC (rev 7777)
+++ core/branches/flat/src/test/java/org/horizon/loader/ 2009-02-25
10:23:49 UTC (rev 7778)
@@ -29,12 +29,12 @@
// this needs to be here for the test to run in an IDE
public abstract class BaseCacheStoreTest {
- protected abstract CacheStore createCacheStore();
+ protected abstract CacheStore createCacheStore() throws Exception;
protected CacheStore cs;
- public void setUp() {
+ public void setUp() throws Exception {
cs = createCacheStore();
@@ -225,7 +225,7 @@
assert expected.isEmpty();
- public void testPurgeExpired() throws InterruptedException, Exception {
+ public void testPurgeExpired() throws Exception {
long now = System.currentTimeMillis();
long lifespan = 1000; StoredEntry("k1", "v1", now, now + lifespan));
@@ -271,8 +271,8 @@
public void testConcurrency() throws Exception {
- int numThreads = 5;
- final int loops = 1000;
+ int numThreads = 3;
+ final int loops = 500;
final String[] keys = new String[10];
final String[] values = new String[10];
for (int i = 0; i < 10; i++) keys[i] = "k" + i;
@@ -319,7 +319,7 @@
Thread[] threads = new Thread[numThreads];
for (int i = 0; i < numThreads; i++) {
- threads[i] = new Thread() {
+ threads[i] = new Thread(getClass().getSimpleName() + "-" + i) {
public void run() {
for (int i = 0; i < loops; i++) {;
Modified: core/branches/flat/src/test/java/org/horizon/loader/decorators/
core/branches/flat/src/test/java/org/horizon/loader/decorators/ 2009-02-24
22:02:12 UTC (rev 7777)
core/branches/flat/src/test/java/org/horizon/loader/decorators/ 2009-02-25
10:23:49 UTC (rev 7778)
@@ -2,6 +2,7 @@
import org.horizon.CacheException;
import org.horizon.loader.StoredEntry;
+import org.horizon.loader.CacheLoaderException;
import org.horizon.loader.dummy.DummyInMemoryCacheStore;
import org.horizon.test.TestingUtil;
import org.testng.annotations.AfterMethod;
@@ -19,7 +20,7 @@
- public void setUp() {
+ public void setUp() throws CacheLoaderException {
store = new AsyncStore(new DummyInMemoryCacheStore(), new AsyncStoreConfig());
DummyInMemoryCacheStore.Cfg cfg = new DummyInMemoryCacheStore.Cfg();
@@ -29,7 +30,7 @@
- public void tearDown() {
+ public void tearDown() throws CacheLoaderException {
if (store != null) store.stop();
core/branches/flat/src/test/java/org/horizon/loader/decorators/ 2009-02-24
22:02:12 UTC (rev 7777)
core/branches/flat/src/test/java/org/horizon/loader/decorators/ 2009-02-25
10:23:49 UTC (rev 7778)
@@ -33,7 +33,7 @@
DummyInMemoryCacheStore[] stores; // for convenient iteration
private static final long lifespan = 6000000;
- protected CacheStore createCacheStore() {
+ protected CacheStore createCacheStore() throws CacheLoaderException {
ChainingCacheStore store = new ChainingCacheStore();
CacheLoaderConfig cfg;
store1 = new DummyInMemoryCacheStore();
core/branches/flat/src/test/java/org/horizon/loader/file/ 2009-02-24
22:02:12 UTC (rev 7777)
core/branches/flat/src/test/java/org/horizon/loader/file/ 2009-02-25
10:23:49 UTC (rev 7778)
@@ -1,30 +1,43 @@
package org.horizon.loader.file;
import org.horizon.loader.BaseCacheStoreTest;
+import org.horizon.loader.CacheLoaderException;
import org.horizon.loader.CacheStore;
import org.horizon.loader.StoredEntry;
+import org.horizon.loader.bucket.Bucket;
import org.horizon.test.TestingUtil;
+import org.testng.annotations.AfterMethod;
import org.testng.annotations.AfterTest;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
-@Test(groups = "unit", enabled = true, testName =
+@Test(groups = "unit", testName = "loader.file.FileCacheStoreTest")
public class FileCacheStoreTest extends BaseCacheStoreTest {
private final String tmpDirectory = TestingUtil.TEST_FILES + File.separator +
+ private FileCacheStore fcs;
- protected CacheStore createCacheStore() {
- CacheStore cs = new FileCacheStore();
+ protected CacheStore createCacheStore() throws CacheLoaderException {
+ fcs = new FileCacheStore();
FileCacheStoreConfig cfg = new FileCacheStoreConfig();
cfg.setPurgeSynchronously(true); // for more accurate unit testing
- cs.init(cfg, getCache(), getMarshaller());
- cs.start();
- return cs;
+ fcs.init(cfg, getCache(), getMarshaller());
+ fcs.start();
+ return fcs;
+ @AfterMethod
+ public void assertNoLocksHeldAfterTest() {
+ assert fcs.getBucketLockCount() == 0;
+ assert fcs.getGlobalLockCount() == 0;
+ }
public void removeTempDirectory() {
@@ -32,6 +45,11 @@
+ public void testPreload() throws CacheLoaderException {
+ super.testPreload();
+ }
+ @Override
public void testPurgeExpired() throws Exception {
long now = System.currentTimeMillis();
long lifespan = 1000;
@@ -44,36 +62,45 @@
Thread.sleep(lifespan + 100);
FileCacheStore fcs = (FileCacheStore) cs;
- assert fcs.lockAndGetBucket("k1", false, false) == null;
- assert fcs.lockAndGetBucket("k2", false, false) == null;
- assert fcs.lockAndGetBucket("k3", false, false) == null;
- System.out.println("Global lock: " +
+ assert fcs.load("k1") == null;
+ assert fcs.load("k2") == null;
+ assert fcs.load("k3") == null;
public void testBucketRemoval() throws Exception {
- FileCacheStore fcs = (FileCacheStore) cs;
- FileCacheStore.Bucket b = null;
- try {
- b = fcs.lockAndGetBucket("test", true, false);
- assert b != null;
- assert !b.modified;
- b.addEntry(new StoredEntry("test", "value"));
- assert b.modified;
+ Bucket b;
+ StoredEntry se = new StoredEntry("test", "value");
+ b = fcs.loadBucketContainingKey("test");
+ assert b != null;
- fcs.saveBucket(b);
- assert !b.modified;
- assert !b.entries.isEmpty();
+ assert !b.getEntries().isEmpty();
- assert new File(fcs.root, b.fileName).exists();
+ assert new File(fcs.root, b.getBucketName()).exists();
- b.removeEntry("test");
- assert b.entries.isEmpty();
- assert b.modified;
+ b.removeEntry("test");
+ assert b.getEntries().isEmpty();
- fcs.saveBucket(b);
- assert !new File(fcs.root, b.fileName).exists();
+ fcs.saveBucket(b);
+ assert !new File(fcs.root, b.getBucketName()).exists();
+ }
+ public void testToStream() throws Exception {
+ StoredEntry("k1", "v1", -1, -1));
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ cs.toStream(out);
+ out.flush();
+ out.close();
+ ObjectInputStream ois = null;
+ try {
+ ois = new ObjectInputStream(new ByteArrayInputStream(out.toByteArray()));
+ assert ois.readInt() == 1 : "we have 3 different buckets";
+ assert ois.readObject().equals("k1".hashCode() + "");
+ assert ois.readInt() > 0; //size on disk
} finally {
- fcs.unlockBucket(b);
+ if (ois != null) ois.close();
Added: core/branches/flat/src/test/java/org/horizon/loader/jdbc/
--- core/branches/flat/src/test/java/org/horizon/loader/jdbc/
(rev 0)
core/branches/flat/src/test/java/org/horizon/loader/jdbc/ 2009-02-25
10:23:49 UTC (rev 7778)
@@ -0,0 +1,38 @@
+package org.horizon.loader.jdbc;
+import org.horizon.loader.CacheStore;
+import org.testng.annotations.Test;
+ * // TODO: Mircea: Document this!
+ *
+ * @author
+ */
+@Test(groups = "functional", testName =
"loader.jdbc.JdbcCacheStoreTest", enabled = false)
+public class JdbcCacheStoreTest /*extends BaseCacheStoreTest*/ {
+ private JdbcCacheStore jdbcCacheStore;
+ /**
+  * Creates, initializes and starts a {@link JdbcCacheStore} that uses a
+  * {@link NonManagedConnectionFactory} with the connection settings below.
+  * <p/>
+  * Failures propagate unchanged. The previous catch-all cast every {@code Throwable} to
+  * {@code Exception}, which turned any {@code Error} into a ClassCastException.
+  *
+  * @return the started cache store
+  * @throws Exception if the store cannot be started
+  */
+ protected CacheStore createCacheStore() throws Exception {
+    jdbcCacheStore = new JdbcCacheStore();
+    JdbcCacheStoreConfig config = new JdbcCacheStoreConfig();
+    config.setConnectionFactoryClass(NonManagedConnectionFactory.class.getName());
+    config.setConnectionUrl("jdbc:mysql://localhost/horizon");
+    config.setUserName("root");
+    config.setPassword("root");
+    config.setDriverClass("com.mysql.jdbc.Driver");
+    config.setTableName("horizon_jdc");
+    config.setKeyColumnName("key_name");
+    config.setKeyColumnType("varchar(255)");
+    config.setDataColumnName("BUCKET");
+    config.setDataColumnType("BINARY");
+    jdbcCacheStore.init(config, null, null);
+    jdbcCacheStore.start();
+    return jdbcCacheStore;
+ }
Property changes on:
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
--- core/branches/flat/src/test/java/org/horizon/loader/jdbc/
(rev 0)
core/branches/flat/src/test/java/org/horizon/loader/jdbc/ 2009-02-25
10:23:49 UTC (rev 7778)
@@ -0,0 +1,139 @@
+package org.horizon.loader.jdbc;
+import static org.easymock.EasyMock.*;
+import org.horizon.loader.CacheLoaderException;
+import org.testng.annotations.AfterTest;
+import org.testng.annotations.BeforeTest;
+import org.testng.annotations.Test;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Statement;
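+/**
+ * Functional test for {@link TableManipulation} (table creation, existence check and drop); currently disabled.
+ * // TODO: Mircea: Document this!
+ *
+ * @author
+ */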
+@Test(groups = "functional", testName
="loader.jdbc.TableManipulationTest", enabled = false)
+public class TableManipulationTest {
+ Connection connection;
+ TableManipulation tableManipulation;
+ JdbcCacheStoreConfig config;
+ @BeforeTest
+ public void createConnection() throws Exception {
+ Class.forName("com.mysql.jdbc.Driver").newInstance();
+ connection =
DriverManager.getConnection("jdbc:mysql://localhost/horizon", "root",
+ Statement st = connection.createStatement();
+ try {
+ st.executeUpdate("DROP TABLE horizon_test");
+ } catch (SQLException e) {
+ //ignore, might be the table does not exist
+ }
+ JdbcUtil.safeClose(st);
+ config = new JdbcCacheStoreConfig();
+ config.setKeyColumnType("VARCHAR(255)");
+ config.setDataColumnType("BLOB");
+ config.setTableName("horizon_test");
+ config.setKeyColumnName("KEY_HASH");
+ config.setDataColumnName("BUCKET");
+ tableManipulation = new TableManipulation(connection, config);
+ }
+   /**
+    * Closes the JDBC connection used by this test class.
+    *
+    * @throws SQLException if closing the connection fails
+    */
+   @AfterTest
+   public void closeConnection() throws SQLException {
+      this.connection.close();
+   }
+   /**
+    * Expects {@code createTable()} to succeed with a fully populated configuration and to throw a
+    * {@link CacheLoaderException} once any single required setting is nulled or blanked out.
+    *
+    * @throws Exception on unexpected failures
+    */
+   public void testInsufficientConfigParams() throws Exception {
+      JdbcCacheStoreConfig config = new JdbcCacheStoreConfig();
+      config.setKeyColumnType("VARCHAR(255)");
+      config.setDataColumnType("BLOB");
+      config.setTableName("horizon");
+      config.setKeyColumnName("dsadsa");
+      config.setDataColumnName("dsadsa");
+      Connection mockConnection = createMock(Connection.class);
+      Statement mockStatement = createNiceMock(Statement.class);
+      expect(mockConnection.createStatement()).andReturn(mockStatement);
+      replay(mockConnection, mockStatement);
+      TableManipulation other = new TableManipulation(mockConnection, config);
+      try {
+         other.createTable();
+      } catch (CacheLoaderException e) {
+         assert false : "We do not expect a failure here";
+      }
+      // Break one setting at a time, expect a failure, then restore a valid value before the next case.
+      config.setKeyColumnType(null);
+      assertCreateTableFails(other);
+      config.setKeyColumnType("VARCHAR(255)");
+      config.setKeyColumnName("");
+      assertCreateTableFails(other);
+      config.setKeyColumnName("abc");
+      config.setTableName(null);
+      assertCreateTableFails(other);
+      config.setTableName("abc");
+      config.setDataColumnName(null);
+      assertCreateTableFails(other);
+      config.setDataColumnName("abc");
+   }
+   /**
+    * Asserts that {@code createTable()} on the given instance throws a {@link CacheLoaderException}.
+    */
+   private void assertCreateTableFails(TableManipulation manipulation) throws Exception {
+      try {
+         manipulation.createTable();
+         assert false : "missing config param, exception expected";
+      } catch (CacheLoaderException expected) {
+         // expected: the configuration is incomplete
+      }
+   }
+   /**
+    * Verifies that the configured table is absent before {@code createTable()} and present afterwards.
+    *
+    * @throws Exception on unexpected failures
+    */
+   public void testCreateTable() throws Exception {
+      assert !this.existsTable(this.config.getTableName());
+      this.tableManipulation.createTable();
+      assert this.existsTable(this.config.getTableName());
+   }
+   /**
+    * Runs after {@code testCreateTable}: the configured table must be reported as existing, while
+    * a table named "does_not_exist" must not.
+    */
+   @Test(dependsOnMethods = "testCreateTable")
+   public void testExists() throws CacheLoaderException {
+      assert this.tableManipulation.tableExists();
+      assert !this.tableManipulation.tableExists("does_not_exist");
+   }
+   /**
+    * Runs after {@code testExists}: the table exists, is dropped, and is then reported as missing.
+    */
+   @Test(dependsOnMethods = "testExists")
+   public void testDrop() throws CacheLoaderException {
+      assert this.tableManipulation.tableExists();
+      this.tableManipulation.dropTable();
+      assert !this.tableManipulation.tableExists();
+   }
+   /**
+    * Probes for a table by selecting from it over the shared connection.
+    *
+    * @param tableName name of the table to probe
+    * @return true if the select succeeds, false if it raises an {@link SQLException}
+    * @throws Exception if the statement cannot be created
+    */
+   private boolean existsTable(String tableName) throws Exception {
+      Statement statement = connection.createStatement();
+      ResultSet resultSet = null;
+      boolean exists;
+      try {
+         resultSet = statement.executeQuery("select * from " + tableName);
+         exists = true;
+      } catch (SQLException e) {
+         // the query failing is taken to mean the table is not there
+         exists = false;
+      } finally {
+         JdbcUtil.safeClose(resultSet);
+         JdbcUtil.safeClose(statement);
+      }
+      return exists;
+   }
Property changes on:
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added: core/branches/flat/src/test/java/org/horizon/test/
--- core/branches/flat/src/test/java/org/horizon/test/
(rev 0)
core/branches/flat/src/test/java/org/horizon/test/ 2009-02-25
10:23:49 UTC (rev 7778)
@@ -0,0 +1,116 @@
+package org.horizon.test;
+import org.horizon.loader.jdbc.JdbcCacheStoreConfig;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Properties;
+import java.util.StringTokenizer;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
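+/**
+ * Test helper that hands out JDBC cache store configurations pointing at distinct in-memory
+ * databases and offers utilities to shut such databases down and remove their files.
+ *
+ * @author Mircea.Markus(a)
+ */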
+public class UnitTestDatabaseManager {
+   /** Template configuration; callers receive modified clones of it, never this instance. */
+   private static final JdbcCacheStoreConfig realConfig = new JdbcCacheStoreConfig();
+   /** Counter used to build distinct connection URLs. */
+   private static AtomicInteger userIndex = new AtomicInteger(0);
+   static {
+      // connection settings
+      realConfig.setDriverClass("org.hsqldb.jdbcDriver");
+      realConfig.setConnectionUrl("jdbc:hsqldb:mem:jbosscache");
+      realConfig.setUserName("sa");
+      // table layout
+      realConfig.setTableName("horizon");
+      realConfig.setCreateTableOnStart(true);
+      realConfig.setPrimaryKey("horizon_pk");
+      realConfig.setKeyColumnName("key");
+      realConfig.setKeyColumnType("varchar(255)");
+      realConfig.setDataColumnName("bucket");
+      realConfig.setDataColumnType("BINARY");
+   }
+ public static JdbcCacheStoreConfig getUniqueJdbcCacheStoreConfig() {
+ synchronized (realConfig) {
+ return returnBasedOnDifferentInstance();
+ }
+ }
+   /**
+    * Connects to the database through the shutdown URL derived from the given configuration and
+    * issues a {@code SHUTDOWN} statement.
+    *
+    * @param config configuration whose connection URL identifies the database
+    * @throws IllegalStateException wrapping any failure while connecting or executing the statement
+    */
+   public static void shutdownInMemoryDatabase(JdbcCacheStoreConfig config) {
+      Connection conn = null;
+      Statement st = null;
+      try {
+         // getShutdownUrl asserts that the configured connection URL is not null
+         String shutDownConnection = getShutdownUrl(config);
+         conn = DriverManager.getConnection(shutDownConnection);
+         st = conn.createStatement();
+         st.execute("SHUTDOWN");
+      }
+      catch (Throwable e) {
+         throw new IllegalStateException(e);
+      }
+      finally {
+         // Either resource may still be null if the failure happened early. Each close is guarded
+         // separately so that a failing close neither hides the original error nor skips the other
+         // resource; the statement goes first because it depends on the connection.
+         if (st != null) {
+            try {
+               st.close();
+            }
+            catch (SQLException e) {
+               e.printStackTrace();
+            }
+         }
+         if (conn != null) {
+            try {
+               conn.close();
+            }
+            catch (SQLException e) {
+               e.printStackTrace();
+            }
+         }
+      }
+   }
+   /**
+    * Removes the folder named after the database under {@code TestingUtil.TEST_FILES}.
+    *
+    * @param props properties from which the database name is extracted
+    */
+   public static void clearDatabaseFiles(Properties props) {
+      //now delete the disk folder
+      String dbName = getDatabaseName(props);
+      // java.io.File is fully qualified because this file's import list contains no import for it
+      String toDel = TestingUtil.TEST_FILES + java.io.File.separator + dbName;
+      TestingUtil.recursiveFileRemove(toDel);
+   }
+   /**
+    * Extracts the fourth colon-separated token of the "cache.jdbc.url" property, e.g.
+    * "jbosscache" for "jdbc:hsqldb:mem:jbosscache".
+    *
+    * @param prop properties containing "cache.jdbc.url"
+    * @return the fourth token of the URL
+    */
+   public static String getDatabaseName(Properties prop) {
+      //jdbc:hsqldb:mem:jbosscache
+      StringTokenizer parts = new StringTokenizer(prop.getProperty("cache.jdbc.url"), ":");
+      // skip the three leading tokens ("jdbc", "hsqldb", "mem" in the example above)
+      for (int skipped = 0; skipped < 3; skipped++) {
+         parts.nextToken();
+      }
+      return parts.nextToken();
+   }
+   /**
+    * Builds the shutdown URL: everything before the first ';' of the configured connection URL,
+    * followed by ";shutdown=true".
+    *
+    * @param props configuration providing the connection URL (asserted to be non-null)
+    * @return the shutdown URL
+    */
+   private static String getShutdownUrl(JdbcCacheStoreConfig props) {
+      String connectionUrl = props.getConnectionUrl();
+      assert connectionUrl != null;
+      //jdbc:derby:jbossdb;create=true
+      String base = new StringTokenizer(connectionUrl, ";").nextToken();
+      return base + ";" + "shutdown=true";
+   }
+   /**
+    * Clones the template configuration and replaces the "jbosscache" part of its connection URL
+    * with a name that differs on every call.
+    *
+    * @return the cloned configuration carrying the rewritten connection URL
+    */
+   private static JdbcCacheStoreConfig returnBasedOnDifferentInstance() {
+      //jdbc:hsqldb:mem:jbosscache
+      JdbcCacheStoreConfig result = realConfig.clone();
+      String jdbcUrl = result.getConnectionUrl();
+      Pattern pattern = Pattern.compile("jbosscache");
+      Matcher matcher = pattern.matcher(jdbcUrl);
+      boolean found = matcher.find();
+      assert found;
+      // NOTE(review): the right-hand operand of this concatenation was cut off in the listing; the
+      // userIndex counter is assumed because it is otherwise unused in this class - confirm.
+      String uniqueName = extractTestName() + userIndex.incrementAndGet();
+      // quoteReplacement keeps '$' and '\' in the name from being read as group references
+      String newJdbcUrl = matcher.replaceFirst(Matcher.quoteReplacement(uniqueName));
+      result.setConnectionUrl(newJdbcUrl);
+      return result;
+   }
+   /**
+    * Derives a name from the current thread's stack trace. Frames are scanned from the outermost
+    * one inwards (index 0 is never inspected) and the first frame whose class name contains
+    * "org.horizon" wins.
+    *
+    * @return the matching class name with '.' and '$' replaced by '_', followed by '_' and the
+    *         method name; null if the stack is empty or no frame matches
+    */
+   private static String extractTestName() {
+      StackTraceElement[] stack = Thread.currentThread().getStackTrace();
+      if (stack.length == 0) return null;
+      for (int i = stack.length - 1; i > 0; i--) {
+         StackTraceElement e = stack[i];
+         String className = e.getClassName();
+         // This code base lives under org.horizon; the former "org.jboss.cache" filter does not
+         // match its classes, so the method would fall through to null.
+         if (className.indexOf("org.horizon") != -1) {
+            // '$' (nested/anonymous classes) is replaced as well so the result is a plain identifier
+            return className.replace('.', '_').replace('$', '_') + "_" + e.getMethodName();
+         }
+      }
+      return null;
+   }
Property changes on:
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF