[infinispan-commits] Infinispan SVN: r2402 - in trunk/cachestore/cassandra: src/main/java/org/infinispan/loaders/cassandra and 1 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Fri Sep 17 04:16:57 EDT 2010


Author: NadirX
Date: 2010-09-17 04:16:56 -0400 (Fri, 17 Sep 2010)
New Revision: 2402

Added:
   trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStore.java
   trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStoreConfig.java
   trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/package-info.java
   trunk/cachestore/cassandra/src/test/java/org/infinispan/loaders/cassandra/CassandraCacheStoreTest.java
   trunk/cachestore/cassandra/src/test/java/org/infinispan/loaders/cassandra/CassandraServiceDataCleaner.java
Modified:
   trunk/cachestore/cassandra/
   trunk/cachestore/cassandra/pom.xml
Log:
ISPN-653: Initial import of the Cassandra cachestore (still needs work for key expiration)


Property changes on: trunk/cachestore/cassandra
___________________________________________________________________
Name: svn:ignore
   + target
.settings
.classpath
.project


Modified: trunk/cachestore/cassandra/pom.xml
===================================================================
--- trunk/cachestore/cassandra/pom.xml	2010-09-17 07:54:40 UTC (rev 2401)
+++ trunk/cachestore/cassandra/pom.xml	2010-09-17 08:16:56 UTC (rev 2402)
@@ -19,7 +19,7 @@
 
    <dependencies>
    	<dependency>
-   		<groupId>net.dataforte</groupId>
+   		<groupId>net.dataforte.cassandra</groupId>
    		<artifactId>cassandra-connection-pool</artifactId>
    		<version>0.1.0</version>
    	</dependency>

Added: trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStore.java
===================================================================
--- trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStore.java	                        (rev 0)
+++ trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStore.java	2010-09-17 08:16:56 UTC (rev 2402)
@@ -0,0 +1,464 @@
+package org.infinispan.loaders.cassandra;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import net.dataforte.cassandra.pool.ConnectionPool;
+
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.Column;
+import org.apache.cassandra.thrift.ColumnOrSuperColumn;
+import org.apache.cassandra.thrift.ColumnParent;
+import org.apache.cassandra.thrift.ColumnPath;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.Deletion;
+import org.apache.cassandra.thrift.KeySlice;
+import org.apache.cassandra.thrift.Mutation;
+import org.apache.cassandra.thrift.NotFoundException;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.thrift.SliceRange;
+import org.infinispan.Cache;
+import org.infinispan.config.ConfigurationException;
+import org.infinispan.container.entries.InternalCacheEntry;
+import org.infinispan.container.entries.InternalCacheValue;
+import org.infinispan.loaders.AbstractCacheStore;
+import org.infinispan.loaders.CacheLoaderConfig;
+import org.infinispan.loaders.CacheLoaderException;
+import org.infinispan.loaders.CacheLoaderMetadata;
+import org.infinispan.loaders.modifications.Modification;
+import org.infinispan.loaders.modifications.Remove;
+import org.infinispan.loaders.modifications.Store;
+import org.infinispan.marshall.StreamingMarshaller;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
+/**
+ * A persistent <code>CacheLoader</code> that keeps its entries in Apache
+ * Cassandra. See http://cassandra.apache.org/
+ * 
+ * @author Tristan Tarrant
+ */
+@CacheLoaderMetadata(configurationClass = CassandraCacheStoreConfig.class)
+public class CassandraCacheStore extends AbstractCacheStore {
+
+	private static final String ENTRY_KEY_PREFIX = "entry_";
+	private static final String ENTRY_COLUMN_NAME = "entry";
+	private static final String EXPIRATION_KEY = "expiration";
+	private static final int SLICE_SIZE = 100;
+	private static final Log log = LogFactory.getLog(CassandraCacheStore.class);
+	private static final boolean trace = log.isTraceEnabled();
+
+	private CassandraCacheStoreConfig config;
+
+	private ConnectionPool pool;
+
+	private ColumnPath entryColumnPath;
+	private ColumnParent entryColumnParent;
+
+	static private byte emptyByteArray[] = {};
+
+	/**
+	 * Returns the configuration class used by this store.
+	 *
+	 * @return always {@link CassandraCacheStoreConfig}
+	 */
+	public Class<? extends CacheLoaderConfig> getConfigurationClass() {
+		return CassandraCacheStoreConfig.class;
+	}
+
+	/**
+	 * Initializes the store through the superclass and keeps the supplied
+	 * configuration, cast to {@link CassandraCacheStoreConfig}, for later use.
+	 *
+	 * @param clc   loader configuration; the cast fails if it is not a {@link CassandraCacheStoreConfig}
+	 * @param cache cache handed on to the superclass
+	 * @param m     marshaller handed on to the superclass
+	 * @throws CacheLoaderException if the superclass initialization fails
+	 */
+	@Override
+	public void init(CacheLoaderConfig clc, Cache<?, ?> cache, StreamingMarshaller m) throws CacheLoaderException {
+		super.init(clc, cache, m);
+		this.config = (CassandraCacheStoreConfig) clc;
+	}
+
+	@Override
+	public void start() throws CacheLoaderException {
+		
+		try {
+			pool = new ConnectionPool(config.getPoolProperties());
+			entryColumnPath = new ColumnPath(config.entryColumnFamily).setColumn(ENTRY_COLUMN_NAME.getBytes("UTF-8"));
+			entryColumnParent = new ColumnParent(config.entryColumnFamily);
+		} catch (Exception e) {
+			throw new ConfigurationException(e);
+		}
+
+		log.debug("cleaning up expired entries...");
+		purgeInternal();
+
+		log.debug("started");
+		super.start();
+	}
+
+	public InternalCacheEntry load(Object key) throws CacheLoaderException {
+		String hashKey = hashKey(key);
+		Cassandra.Iface cassandraClient = null;
+		try {
+			cassandraClient = pool.getConnection();
+			ColumnOrSuperColumn column = cassandraClient.get(config.keySpace, hashKey, entryColumnPath, ConsistencyLevel.ONE);
+			InternalCacheEntry ice = unmarshall(column.getColumn().getValue(), key);
+			if (ice != null && ice.isExpired()) {
+				remove(key);
+				return null;
+			}
+			return ice;
+		} catch (NotFoundException nfe) {
+			log.debug("Key '{0}' not found", hashKey);
+			return null;
+		} catch (Exception e) {
+			throw new CacheLoaderException(e);
+		} finally {
+			pool.release(cassandraClient);
+		}
+	}
+
+	/**
+	 * Loads every entry of the store by delegating to {@link #load(int)} with
+	 * <code>Integer.MAX_VALUE</code> as the limit.
+	 *
+	 * @return the loaded entries
+	 * @throws CacheLoaderException if loading fails
+	 */
+	@Override
+	public Set<InternalCacheEntry> loadAll() throws CacheLoaderException {
+		return load(Integer.MAX_VALUE);
+	}
+
+	/**
+	 * Loads at most <code>numEntries</code> entries by paging through the entry
+	 * column family in blocks of at most <code>SLICE_SIZE</code> rows.
+	 * <p>
+	 * Paging restarts each block at the last row key of the previous block.
+	 * Because the start key of a range slice is inclusive, that row is returned
+	 * again and is skipped here. Rows that come back without columns (deleted
+	 * rows) are ignored and do not count towards the limit.
+	 * <p>
+	 * NOTE(review): the cache key is rebuilt by stripping the row-key prefix, so
+	 * it is always a <code>String</code>; keys of other types cannot be
+	 * reconstructed from the row key alone.
+	 *
+	 * @param numEntries maximum number of entries to return; values &lt;= 0 yield an empty set
+	 * @return the loaded entries, never more than <code>numEntries</code>
+	 * @throws CacheLoaderException if reading or unmarshalling fails
+	 */
+	@Override
+	public Set<InternalCacheEntry> load(int numEntries) throws CacheLoaderException {
+		Set<InternalCacheEntry> s = new HashSet<InternalCacheEntry>();
+		if (numEntries <= 0) {
+			return s;
+		}
+		Cassandra.Iface cassandraClient = null;
+		try {
+			cassandraClient = pool.getConnection();
+			SlicePredicate slicePredicate = new SlicePredicate();
+			slicePredicate.setSlice_range(new SliceRange(entryColumnPath.getColumn(), emptyByteArray, false, 1));
+			String startKey = "";
+			boolean firstPage = true;
+			while (s.size() < numEntries) {
+				int remaining = numEntries - s.size();
+				// Follow-up pages start with the already processed startKey, so one extra
+				// row is requested. Written this way to avoid overflowing remaining + 1.
+				int sliceSize = firstPage ? Math.min(SLICE_SIZE, remaining) : Math.min(SLICE_SIZE - 1, remaining) + 1;
+				List<KeySlice> keySlices = cassandraClient.get_range_slice(config.keySpace, entryColumnParent, slicePredicate, startKey, "", sliceSize,
+						ConsistencyLevel.ONE);
+
+				for (KeySlice keySlice : keySlices) {
+					String key = keySlice.getKey();
+					if (!firstPage && key.equals(startKey)) {
+						continue; // already handled as the last row of the previous page
+					}
+					List<ColumnOrSuperColumn> columns = keySlice.getColumns();
+					if (columns.size() > 0) {
+						byte[] value = columns.get(0).getColumn().getValue();
+						// Undo hashKey(): entries must carry the cache key, not the row key
+						Object entryKey = key.startsWith(ENTRY_KEY_PREFIX) ? key.substring(ENTRY_KEY_PREFIX.length()) : key;
+						InternalCacheEntry ice = unmarshall(value, entryKey);
+						s.add(ice);
+					}
+				}
+
+				if (keySlices.size() < sliceSize) {
+					break; // short page: no more rows
+				}
+				startKey = keySlices.get(keySlices.size() - 1).getKey();
+				firstPage = false;
+			}
+			return s;
+		} catch (Exception e) {
+			throw new CacheLoaderException(e);
+		} finally {
+			pool.release(cassandraClient);
+		}
+	}
+
+	/**
+	 * Collects the keys of all stored entries, leaving out the ones listed in
+	 * <code>keysToExclude</code>.
+	 * <p>
+	 * Rows are read in blocks of <code>SLICE_SIZE</code>. Rows that come back
+	 * without columns (deleted rows) are skipped, in line with
+	 * {@link #load(int)}. The row-key prefix added by {@link #hashKey(Object)}
+	 * is stripped before the exclusion check so that the comparison is made on
+	 * cache keys.
+	 * <p>
+	 * NOTE(review): returned keys are always <code>String</code>s; keys of other
+	 * types cannot be reconstructed from the row key alone.
+	 *
+	 * @param keysToExclude keys to omit from the result; may be <code>null</code>
+	 * @return the keys found in the store
+	 * @throws CacheLoaderException if reading fails
+	 */
+	@Override
+	public Set<Object> loadAllKeys(Set<Object> keysToExclude) throws CacheLoaderException {
+		Cassandra.Iface cassandraClient = null;
+		try {
+			cassandraClient = pool.getConnection();
+			Set<Object> s = new HashSet<Object>();
+			SlicePredicate slicePredicate = new SlicePredicate();
+			slicePredicate.setSlice_range(new SliceRange(entryColumnPath.getColumn(), emptyByteArray, false, 1));
+			String startKey = "";
+			boolean complete = false;
+			// Get the keys in SLICE_SIZE blocks
+			while (!complete) {
+				List<KeySlice> keySlices = cassandraClient.get_range_slice(config.keySpace, entryColumnParent, slicePredicate, startKey, "", SLICE_SIZE,
+						ConsistencyLevel.ONE);
+				if (keySlices.size() < SLICE_SIZE) {
+					complete = true;
+				} else {
+					// The start key is inclusive, so this row shows up again on the next
+					// page; the result set absorbs the duplicate.
+					startKey = keySlices.get(keySlices.size() - 1).getKey();
+				}
+
+				for (KeySlice keySlice : keySlices) {
+					if (keySlice.getColumns().size() == 0) {
+						continue; // deleted row, nothing stored under this key any more
+					}
+					String rowKey = keySlice.getKey();
+					String key = rowKey.startsWith(ENTRY_KEY_PREFIX) ? rowKey.substring(ENTRY_KEY_PREFIX.length()) : rowKey;
+					if (keysToExclude == null || !keysToExclude.contains(key)) {
+						s.add(key);
+					}
+				}
+			}
+			return s;
+		} catch (Exception e) {
+			throw new CacheLoaderException(e);
+		} finally {
+			pool.release(cassandraClient);
+		}
+	}
+
+	/**
+	 * Stops the store: the superclass is stopped first, mirroring the
+	 * <code>super.start()</code> call in {@link #start()}, and the Cassandra
+	 * connection pool is closed afterwards.
+	 * <p>
+	 * A failure while stopping the superclass is logged rather than propagated
+	 * so that the pool is closed in every case. Calling this method on a store
+	 * that was never started is a no-op for the pool.
+	 */
+	@Override
+	public void stop() {
+		try {
+			super.stop();
+		} catch (Exception e) {
+			// This signature declares no checked exception; report and keep shutting down
+			log.error("Exception while stopping the cache store", e);
+		} finally {
+			if (pool != null) {
+				pool.close();
+			}
+		}
+	}
+
+	/**
+	 * Removes everything from the store.
+	 * <p>
+	 * The rows of the entry column family are read in blocks of
+	 * <code>SLICE_SIZE</code>. For every row of a block, a deletion of the
+	 * entry column and a deletion of the matching row in the expiration column
+	 * family are queued, and each block is sent as one batch.
+	 *
+	 * @throws CacheLoaderException if reading or deleting fails
+	 */
+	public void clear() throws CacheLoaderException {
+		if (trace) {
+			log.trace("clear()");
+		}
+		Cassandra.Iface client = null;
+		try {
+			client = pool.getConnection();
+			SliceRange entryRange = new SliceRange(entryColumnPath.getColumn(), emptyByteArray, false, 1);
+			SlicePredicate predicate = new SlicePredicate();
+			predicate.setSlice_range(entryRange);
+
+			String cursor = "";
+			boolean morePages = true;
+			while (morePages) {
+				List<KeySlice> page = client.get_range_slice(config.keySpace, entryColumnParent, predicate, cursor, "", SLICE_SIZE,
+						ConsistencyLevel.ONE);
+				// A full page means there may be more rows; continue from its last key
+				morePages = page.size() >= SLICE_SIZE;
+				if (morePages) {
+					cursor = page.get(page.size() - 1).getKey();
+				}
+
+				Map<String, Map<String, List<Mutation>>> deletions = new HashMap<String, Map<String, List<Mutation>>>();
+				for (KeySlice row : page) {
+					String rowKey = row.getKey();
+					addMutation(deletions, rowKey, config.entryColumnFamily, entryColumnPath.getColumn(), null);
+					addMutation(deletions, rowKey, config.expirationColumnFamily, null, null);
+				}
+				client.batch_mutate(config.keySpace, deletions, ConsistencyLevel.ONE);
+			}
+		} catch (Exception cause) {
+			throw new CacheLoaderException(cause);
+		} finally {
+			pool.release(client);
+		}
+	}
+
+	public boolean remove(Object key) throws CacheLoaderException {
+		if (trace)
+			log.trace("remove() " + key);
+		Cassandra.Iface cassandraClient = null;
+		try {
+			cassandraClient = pool.getConnection();
+			Map<String, Map<String, List<Mutation>>> mutationMap = new HashMap<String, Map<String, List<Mutation>>>();
+			remove0(key, mutationMap);
+			cassandraClient.batch_mutate(config.keySpace, mutationMap, ConsistencyLevel.ONE);
+			return true;
+		} catch (Exception e) {
+			log.error("Exception while removing " + key, e);
+			return false;
+		} finally {
+			pool.release(cassandraClient);
+		}
+	}
+
+	private void remove0(Object key, Map<String, Map<String, List<Mutation>>> mutationMap) {
+		String cassandraKey = CassandraCacheStore.hashKey(key);
+		addMutation(mutationMap, cassandraKey, config.entryColumnFamily, entryColumnPath.getColumn(), null);
+		addMutation(mutationMap, cassandraKey, config.expirationColumnFamily, null, null);
+	}
+
+	/**
+	 * Serializes the value part of an entry with the store's marshaller.
+	 *
+	 * @param entry entry to serialize
+	 * @return the marshalled internal cache value of the entry
+	 * @throws IOException if marshalling fails
+	 */
+	private byte[] marshall(InternalCacheEntry entry) throws IOException {
+		InternalCacheValue value = entry.toInternalCacheValue();
+		return getMarshaller().objectToByteBuffer(value);
+	}
+
+	/**
+	 * Rebuilds an entry from the bytes written by {@link #marshall(InternalCacheEntry)}.
+	 *
+	 * @param o   marshalled value as a <code>byte[]</code>, or <code>null</code>
+	 * @param key key to attach to the rebuilt entry
+	 * @return the entry for <code>key</code>, or <code>null</code> if <code>o</code> is <code>null</code>
+	 * @throws IOException            if unmarshalling fails
+	 * @throws ClassNotFoundException if a class of the marshalled value cannot be resolved
+	 */
+	private InternalCacheEntry unmarshall(Object o, Object key) throws IOException, ClassNotFoundException {
+		if (o != null) {
+			byte[] raw = (byte[]) o;
+			Object decoded = getMarshaller().objectFromByteBuffer(raw);
+			return ((InternalCacheValue) decoded).toInternalCacheEntry(key);
+		}
+		return null;
+	}
+
+	/**
+	 * Writes a single entry to Cassandra as a one-entry batch.
+	 *
+	 * @param entry entry to persist
+	 * @throws CacheLoaderException if marshalling or writing fails
+	 */
+	public void store(InternalCacheEntry entry) throws CacheLoaderException {
+		Cassandra.Iface client = null;
+		try {
+			if (trace) {
+				log.trace("storing " + entry.getKey());
+			}
+
+			client = pool.getConnection();
+			Map<String, Map<String, List<Mutation>>> batch = new HashMap<String, Map<String, List<Mutation>>>();
+			store0(entry, batch);
+			client.batch_mutate(config.keySpace, batch, ConsistencyLevel.ONE);
+
+			if (trace) {
+				log.trace("stored " + entry.getKey());
+			}
+		} catch (Exception cause) {
+			throw new CacheLoaderException(cause);
+		} finally {
+			pool.release(client);
+		}
+	}
+
+	private void store0(InternalCacheEntry entry, Map<String, Map<String, List<Mutation>>> mutationMap) throws IOException  {
+		Object key = entry.getKey();
+				
+			
+		String cassandraKey = CassandraCacheStore.hashKey(key);
+		addMutation(mutationMap, cassandraKey, config.entryColumnFamily, entryColumnPath.getColumn(), marshall(entry));
+		if (entry.canExpire()) {
+			addMutation(mutationMap, cassandraKey, config.expirationColumnFamily, longToBytes(entry.getExpiryTime()), emptyByteArray);
+		}
+			
+		
+		
+	}
+
+	/**
+	 * Writes all entries of the store to the stream, one marshalled entry after
+	 * the other, followed by a marshalled <code>null</code> that marks the end.
+	 *
+	 * @param out stream to write to
+	 * @throws CacheLoaderException if loading the entries or writing to the stream fails
+	 */
+	public void toStream(ObjectOutput out) throws CacheLoaderException {
+		try {
+			Set<InternalCacheEntry> entries = loadAll();
+			log.debug("toStream() entries");
+			int written = 0;
+			for (InternalCacheEntry current : entries) {
+				getMarshaller().objectToObjectStream(current, out);
+				written += 1;
+			}
+			// End-of-data marker expected by fromStream()
+			getMarshaller().objectToObjectStream(null, out);
+			log.debug("wrote " + written + " entries");
+		} catch (IOException ioe) {
+			throw new CacheLoaderException(ioe);
+		}
+	}
+
+	/**
+	 * Reads marshalled entries from the stream and stores each of them until a
+	 * <code>null</code> end marker is read, i.e. the format written by
+	 * {@link #toStream(ObjectOutput)}.
+	 *
+	 * @param in stream to read from
+	 * @throws CacheLoaderException if reading, unmarshalling or storing fails
+	 */
+	public void fromStream(ObjectInput in) throws CacheLoaderException {
+		try {
+			log.debug("fromStream()");
+			int count = 0;
+			InternalCacheEntry entry;
+			while ((entry = (InternalCacheEntry) getMarshaller().objectFromObjectStream(in)) != null) {
+				store(entry);
+				// Count only real entries, not the terminating null marker
+				count++;
+			}
+			log.debug("read " + count + " entries");
+		} catch (IOException e) {
+			throw new CacheLoaderException(e);
+		} catch (ClassNotFoundException e) {
+			throw new CacheLoaderException(e);
+		}
+	}
+
+	/**
+	 * Purge expired entries. Not implemented yet: the method currently only
+	 * emits a trace message and removes nothing.
+	 */
+	@Override
+	protected void purgeInternal() throws CacheLoaderException {
+		log.trace("purgeInternal");
+		// TODO: implement
+
+	}
+
+	/**
+	 * Applies a list of modifications in order. Stores and removes are
+	 * collected into one batch that is sent at the end; a clear is executed
+	 * immediately through {@link #clear()}.
+	 * <p>
+	 * Mutations collected before a clear are discarded when the clear is
+	 * reached. They are superseded by it, and sending them afterwards would
+	 * bring back entries that the clear was meant to remove.
+	 * <p>
+	 * NOTE(review): {@link #clear()} takes its own connection from the pool
+	 * while this method is still holding one.
+	 *
+	 * @param mods modifications to apply, in order
+	 * @throws CacheLoaderException if any modification cannot be applied
+	 */
+	@Override
+	protected void applyModifications(List<? extends Modification> mods) throws CacheLoaderException {
+		Cassandra.Iface cassandraClient = null;
+
+		try {
+			cassandraClient = pool.getConnection();
+			Map<String, Map<String, List<Mutation>>> mutationMap = new HashMap<String, Map<String, List<Mutation>>>();
+
+			for (Modification m : mods) {
+				switch (m.getType()) {
+				case STORE:
+					store0(((Store) m).getStoredEntry(), mutationMap);
+					break;
+				case CLEAR:
+					// Everything queued so far is wiped by the clear anyway
+					mutationMap.clear();
+					clear();
+					break;
+				case REMOVE:
+					remove0(((Remove) m).getKey(), mutationMap);
+					break;
+				default:
+					throw new AssertionError("Unsupported modification type: " + m.getType());
+				}
+			}
+
+			// Avoid a pointless round trip when nothing is left to send
+			if (!mutationMap.isEmpty()) {
+				cassandraClient.batch_mutate(config.keySpace, mutationMap, ConsistencyLevel.ONE);
+			}
+		} catch (Exception e) {
+			throw new CacheLoaderException(e);
+		} finally {
+			pool.release(cassandraClient);
+		}
+	}
+
+	/**
+	 * Returns a fixed name for this store.
+	 *
+	 * @return the string <code>"CassandraCacheStore"</code>
+	 */
+	@Override
+	public String toString() {
+		return "CassandraCacheStore";
+	}
+
+	/**
+	 * Builds the Cassandra row key for a cache key: the result of the key's
+	 * <code>toString()</code> prefixed with <code>ENTRY_KEY_PREFIX</code>.
+	 * Keys with the same string form therefore map to the same row.
+	 *
+	 * @param key cache key; must not be <code>null</code>
+	 * @return the row key used for <code>key</code>
+	 */
+	public static String hashKey(Object key) {
+		return ENTRY_KEY_PREFIX+key.toString();
+	}
+	
+	/**
+	 * Formats a timestamp as an expiration column name: the word
+	 * <code>expiration</code> followed by the timestamp, zero-padded to 13
+	 * digits so that names sort in timestamp order.
+	 * <p>
+	 * A fixed locale is used for formatting so that the digits are always ASCII,
+	 * whatever the default locale of the JVM is.
+	 *
+	 * @param timestamp timestamp to encode
+	 * @return the column name, e.g. <code>expiration0000000000005</code> for 5
+	 */
+	public static String expirationColumn(long timestamp) {
+		return String.format(java.util.Locale.ENGLISH, "expiration%013d", timestamp);
+	}
+
+	/**
+	 * Adds one mutation to a batch, creating the per-key and per-column-family
+	 * containers on demand.
+	 * <ul>
+	 * <li><code>value != null</code>: insert or update of <code>column</code> with that value;</li>
+	 * <li><code>value == null</code> and <code>column != null</code>: deletion of that single column;</li>
+	 * <li><code>value == null</code> and <code>column == null</code>: deletion without a column predicate.</li>
+	 * </ul>
+	 * Mutations are stamped with the current time in milliseconds.
+	 *
+	 * @param mutationMap  batch to extend, keyed by row key and then by column family
+	 * @param key          row key
+	 * @param columnFamily column family the mutation applies to
+	 * @param column       column name, or <code>null</code> (deletions only)
+	 * @param value        column value, or <code>null</code> to delete
+	 */
+	private static void addMutation(Map<String, Map<String, List<Mutation>>> mutationMap, String key, String columnFamily, byte[] column, byte[] value) {
+		// Locate, or create, the list of mutations for this row and column family
+		Map<String, List<Mutation>> perRow = mutationMap.get(key);
+		if (perRow == null) {
+			perRow = new HashMap<String, List<Mutation>>();
+			mutationMap.put(key, perRow);
+		}
+		List<Mutation> perFamily = perRow.get(columnFamily);
+		if (perFamily == null) {
+			perFamily = new ArrayList<Mutation>();
+			perRow.put(columnFamily, perFamily);
+		}
+
+		Mutation mutation = new Mutation();
+		if (value != null) {
+			// Insert/update
+			log.debug("Insert/update '{0}', size={1}", key, value.length);
+			ColumnOrSuperColumn holder = new ColumnOrSuperColumn();
+			holder.setColumn(new Column(column, value, System.currentTimeMillis()));
+			mutation.setColumn_or_supercolumn(holder);
+		} else {
+			// Delete
+			log.debug("Delete '{0}'", key);
+			Deletion deletion = new Deletion(System.currentTimeMillis());
+			if (column != null) {
+				// Restrict the deletion to the one named column
+				SlicePredicate onlyThisColumn = new SlicePredicate();
+				onlyThisColumn.setColumn_names(Arrays.asList(column));
+				deletion.setPredicate(onlyThisColumn);
+			}
+			mutation.setDeletion(deletion);
+		}
+		perFamily.add(mutation);
+	}
+	
+	/**
+	 * Builds a version 1 (time-based) UUID for the given timestamp.
+	 * <p>
+	 * The timestamp is converted to the UUID time base (100-nanosecond
+	 * intervals since 1582-10-15) and spread over the time fields of the most
+	 * significant half. The clock sequence and node are left at zero, with only
+	 * the variant bits set, so the result is the same for equal timestamps.
+	 *
+	 * @param timestamp milliseconds since the Unix epoch
+	 * @return a version 1 UUID whose {@link UUID#timestamp()} corresponds to <code>timestamp</code>
+	 */
+	public static UUID getTimeBasedUUID(long timestamp) {
+		// 0x01B21DD213814000 = number of 100ns intervals between 1582-10-15 and 1970-01-01
+		long time = timestamp * 10000L + 0x01B21DD213814000L;
+		long msb = (time << 32)                          // time_low
+				| ((time & 0x0000FFFF00000000L) >>> 16)  // time_mid
+				| 0x1000L                                // version 1
+				| ((time >>> 48) & 0x0FFFL);             // time_hi
+		long lsb = 0x8000000000000000L;                  // variant bits, zero clock sequence and node
+		return new UUID(msb, lsb);
+	}
+
+	/**
+	 * Encodes a <code>long</code> as 8 bytes, most significant byte first.
+	 *
+	 * @param v value to encode
+	 * @return the big-endian representation of <code>v</code>
+	 */
+	private static final byte[] longToBytes(long v) throws IOException {
+		byte[] encoded = new byte[8];
+		// Fill from the least significant byte backwards, shifting as we go
+		for (int i = encoded.length - 1; i >= 0; i--) {
+			encoded[i] = (byte) v;
+			v >>>= 8;
+		}
+		return encoded;
+	}
+
+}

Added: trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStoreConfig.java
===================================================================
--- trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStoreConfig.java	                        (rev 0)
+++ trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStoreConfig.java	2010-09-17 08:16:56 UTC (rev 2402)
@@ -0,0 +1,331 @@
+package org.infinispan.loaders.cassandra;
+
+import net.dataforte.cassandra.pool.PoolProperties;
+
+import org.infinispan.loaders.LockSupportCacheStoreConfig;
+
+/**
+ * Configures {@link CassandraCacheStore}.
+ * <p>
+ * Besides the keyspace and the two column families used by the store, this
+ * class exposes the settings of the underlying connection pool. All pool
+ * related accessors simply delegate to the wrapped {@link PoolProperties}.
+ */
+public class CassandraCacheStoreConfig extends LockSupportCacheStoreConfig {
+
+	/**
+	 * @configRef desc="The Cassandra keyspace"
+	 */
+	String keySpace = "Infinispan";
+
+	/**
+	 * @configRef desc="The Cassandra column family for entries"
+	 */
+	String entryColumnFamily = "InfinispanEntries";
+
+	/**
+	 * @configRef desc="The Cassandra column family for expirations"
+	 */
+	String expirationColumnFamily = "InfinispanExpiration";
+
+	/** Settings of the connection pool; the target of all pool related accessors below. */
+	PoolProperties poolProperties;
+
+	/**
+	 * Creates a configuration with default pool properties and registers
+	 * {@link CassandraCacheStore} as the cache loader class, so that a
+	 * configuration built in code resolves to the right store.
+	 */
+	public CassandraCacheStoreConfig() {
+		setCacheLoaderClassName(CassandraCacheStore.class.getName());
+		poolProperties = new PoolProperties();
+	}
+
+	// ---- store settings ----
+
+	public String getKeySpace() {
+		return keySpace;
+	}
+
+	public void setKeySpace(String keySpace) {
+		this.keySpace = keySpace;
+	}
+
+	public String getEntryColumnFamily() {
+		return entryColumnFamily;
+	}
+
+	public void setEntryColumnFamily(String entryColumnFamily) {
+		this.entryColumnFamily = entryColumnFamily;
+	}
+
+	public String getExpirationColumnFamily() {
+		return expirationColumnFamily;
+	}
+
+	public void setExpirationColumnFamily(String expirationColumnFamily) {
+		this.expirationColumnFamily = expirationColumnFamily;
+	}
+
+	// ---- connection pool settings (delegated to PoolProperties) ----
+
+	/**
+	 * @return the pool properties backing this configuration (the live object, not a copy)
+	 */
+	public PoolProperties getPoolProperties() {
+		return poolProperties;
+	}
+
+	public String getHost() {
+		return poolProperties.getHost();
+	}
+
+	public void setHost(String host) {
+		poolProperties.setHost(host);
+	}
+
+	public int getPort() {
+		return poolProperties.getPort();
+	}
+
+	public void setPort(int port) {
+		poolProperties.setPort(port);
+	}
+
+	public int getAbandonWhenPercentageFull() {
+		return poolProperties.getAbandonWhenPercentageFull();
+	}
+
+	public void setAbandonWhenPercentageFull(int percentage) {
+		poolProperties.setAbandonWhenPercentageFull(percentage);
+	}
+
+	public boolean isFairQueue() {
+		return poolProperties.isFairQueue();
+	}
+
+	public void setFairQueue(boolean fairQueue) {
+		poolProperties.setFairQueue(fairQueue);
+	}
+
+	public boolean getFramed() {
+		return poolProperties.getFramed();
+	}
+
+	public void setFramed(boolean framed) {
+		poolProperties.setFramed(framed);
+	}
+
+	public int getInitialSize() {
+		return poolProperties.getInitialSize();
+	}
+
+	public void setInitialSize(int initialSize) {
+		poolProperties.setInitialSize(initialSize);
+	}
+
+	public boolean isJmxEnabled() {
+		return poolProperties.isJmxEnabled();
+	}
+
+	public void setJmxEnabled(boolean jmxEnabled) {
+		poolProperties.setJmxEnabled(jmxEnabled);
+	}
+
+	public boolean isLogAbandoned() {
+		return poolProperties.isLogAbandoned();
+	}
+
+	public void setLogAbandoned(boolean logAbandoned) {
+		poolProperties.setLogAbandoned(logAbandoned);
+	}
+
+	public int getMaxActive() {
+		return poolProperties.getMaxActive();
+	}
+
+	public void setMaxActive(int maxActive) {
+		poolProperties.setMaxActive(maxActive);
+	}
+
+	public long getMaxAge() {
+		return poolProperties.getMaxAge();
+	}
+
+	public void setMaxAge(long maxAge) {
+		poolProperties.setMaxAge(maxAge);
+	}
+
+	public int getMaxIdle() {
+		return poolProperties.getMaxIdle();
+	}
+
+	public void setMaxIdle(int maxIdle) {
+		poolProperties.setMaxIdle(maxIdle);
+	}
+
+	public int getMaxWait() {
+		return poolProperties.getMaxWait();
+	}
+
+	public void setMaxWait(int maxWait) {
+		poolProperties.setMaxWait(maxWait);
+	}
+
+	public int getMinEvictableIdleTimeMillis() {
+		return poolProperties.getMinEvictableIdleTimeMillis();
+	}
+
+	public void setMinEvictableIdleTimeMillis(int minEvictableIdleTimeMillis) {
+		poolProperties.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
+	}
+
+	public int getMinIdle() {
+		return poolProperties.getMinIdle();
+	}
+
+	public void setMinIdle(int minIdle) {
+		poolProperties.setMinIdle(minIdle);
+	}
+
+	public String getName() {
+		return poolProperties.getName();
+	}
+
+	public void setName(String name) {
+		poolProperties.setName(name);
+	}
+
+	public int getNumTestsPerEvictionRun() {
+		return poolProperties.getNumTestsPerEvictionRun();
+	}
+
+	public void setNumTestsPerEvictionRun(int numTestsPerEvictionRun) {
+		poolProperties.setNumTestsPerEvictionRun(numTestsPerEvictionRun);
+	}
+
+	public String getPassword() {
+		return poolProperties.getPassword();
+	}
+
+	public void setPassword(String password) {
+		poolProperties.setPassword(password);
+	}
+
+	public boolean isRemoveAbandoned() {
+		return poolProperties.isRemoveAbandoned();
+	}
+
+	public void setRemoveAbandoned(boolean removeAbandoned) {
+		poolProperties.setRemoveAbandoned(removeAbandoned);
+	}
+
+	public int getRemoveAbandonedTimeout() {
+		return poolProperties.getRemoveAbandonedTimeout();
+	}
+
+	public void setRemoveAbandonedTimeout(int removeAbandonedTimeout) {
+		poolProperties.setRemoveAbandonedTimeout(removeAbandonedTimeout);
+	}
+
+	public int getSuspectTimeout() {
+		return poolProperties.getSuspectTimeout();
+	}
+
+	public void setSuspectTimeout(int seconds) {
+		poolProperties.setSuspectTimeout(seconds);
+	}
+
+	public boolean isTestOnBorrow() {
+		return poolProperties.isTestOnBorrow();
+	}
+
+	public void setTestOnBorrow(boolean testOnBorrow) {
+		poolProperties.setTestOnBorrow(testOnBorrow);
+	}
+
+	public boolean isTestOnConnect() {
+		return poolProperties.isTestOnConnect();
+	}
+
+	public void setTestOnConnect(boolean testOnConnect) {
+		poolProperties.setTestOnConnect(testOnConnect);
+	}
+
+	public boolean isTestOnReturn() {
+		return poolProperties.isTestOnReturn();
+	}
+
+	public void setTestOnReturn(boolean testOnReturn) {
+		poolProperties.setTestOnReturn(testOnReturn);
+	}
+
+	public boolean isTestWhileIdle() {
+		return poolProperties.isTestWhileIdle();
+	}
+
+	public void setTestWhileIdle(boolean testWhileIdle) {
+		poolProperties.setTestWhileIdle(testWhileIdle);
+	}
+
+	public int getTimeBetweenEvictionRunsMillis() {
+		return poolProperties.getTimeBetweenEvictionRunsMillis();
+	}
+
+	public void setTimeBetweenEvictionRunsMillis(int timeBetweenEvictionRunsMillis) {
+		poolProperties.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
+	}
+
+	/** Read-only here: this configuration offers no setter for the value. */
+	public boolean getUseLock() {
+		return poolProperties.getUseLock();
+	}
+
+	public String getUsername() {
+		return poolProperties.getUsername();
+	}
+
+	public void setUsername(String username) {
+		poolProperties.setUsername(username);
+	}
+
+	public long getValidationInterval() {
+		return poolProperties.getValidationInterval();
+	}
+
+	public void setValidationInterval(long validationInterval) {
+		poolProperties.setValidationInterval(validationInterval);
+	}
+}

Added: trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/package-info.java
===================================================================
--- trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/package-info.java	                        (rev 0)
+++ trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/package-info.java	2010-09-17 08:16:56 UTC (rev 2402)
@@ -0,0 +1,5 @@
+/**
+ * This package contains a {@link org.infinispan.loaders.CacheStore} implementation based on
+ * persisting to Apache Cassandra.
+ */
+package org.infinispan.loaders.cassandra;
\ No newline at end of file

Added: trunk/cachestore/cassandra/src/test/java/org/infinispan/loaders/cassandra/CassandraCacheStoreTest.java
===================================================================
--- trunk/cachestore/cassandra/src/test/java/org/infinispan/loaders/cassandra/CassandraCacheStoreTest.java	                        (rev 0)
+++ trunk/cachestore/cassandra/src/test/java/org/infinispan/loaders/cassandra/CassandraCacheStoreTest.java	2010-09-17 08:16:56 UTC (rev 2402)
@@ -0,0 +1,53 @@
+package org.infinispan.loaders.cassandra;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+
+import org.apache.cassandra.service.EmbeddedCassandraService;
+import org.apache.thrift.transport.TTransportException;
+import org.infinispan.loaders.BaseCacheStoreTest;
+import org.infinispan.loaders.CacheStore;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Test(groups = "unit", testName = "loaders.cassandra.CassandraCacheStoreTest")
+public class CassandraCacheStoreTest extends BaseCacheStoreTest {
+	private static EmbeddedCassandraService cassandra;
+
+	/**
+	 * Set embedded cassandra up and spawn it in a new thread.
+	 * <p>
+	 * The directory that contains <code>storage-conf.xml</code> on the test
+	 * classpath is handed to Cassandra through the <code>storage-config</code>
+	 * system property, and the data directories are prepared before the
+	 * service is started on a daemon thread.
+	 * 
+	 * @throws TTransportException
+	 * @throws IOException
+	 * @throws InterruptedException
+	 * @throws IllegalStateException if <code>storage-conf.xml</code> is not on the classpath
+	 */
+	@BeforeClass
+	public static void setup() throws TTransportException, IOException, InterruptedException {
+		// Tell cassandra where the configuration files are.
+		// Use the test configuration file.
+		URL resource = Thread.currentThread().getContextClassLoader().getResource("storage-conf.xml");
+		if (resource == null) {
+			throw new IllegalStateException("storage-conf.xml not found on the test classpath");
+		}
+		// URL paths are always '/'-separated, whatever the platform's file separator is
+		String resourcePath = resource.getPath();
+		String configPath = resourcePath.substring(0, resourcePath.lastIndexOf('/'));
+
+		System.setProperty("storage-config", configPath);
+
+		CassandraServiceDataCleaner cleaner = new CassandraServiceDataCleaner();
+		cleaner.prepare();
+		cassandra = new EmbeddedCassandraService();
+		cassandra.init();
+		Thread t = new Thread(cassandra);
+		t.setDaemon(true);
+		t.start();
+	}
+
+	/**
+	 * Creates a started {@link CassandraCacheStore} that talks to the embedded
+	 * Cassandra instance on localhost.
+	 */
+	@Override
+	protected CacheStore createCacheStore() throws Exception {
+		CassandraCacheStore cs = new CassandraCacheStore();
+		CassandraCacheStoreConfig clc = new CassandraCacheStoreConfig();
+		clc.setHost("localhost");
+		cs.init(clc, getCache(), getMarshaller());
+		cs.start();
+		return cs;
+	}
+
+}

Added: trunk/cachestore/cassandra/src/test/java/org/infinispan/loaders/cassandra/CassandraServiceDataCleaner.java
===================================================================
--- trunk/cachestore/cassandra/src/test/java/org/infinispan/loaders/cassandra/CassandraServiceDataCleaner.java	                        (rev 0)
+++ trunk/cachestore/cassandra/src/test/java/org/infinispan/loaders/cassandra/CassandraServiceDataCleaner.java	2010-09-17 08:16:56 UTC (rev 2402)
@@ -0,0 +1,96 @@
+package org.infinispan.loaders.cassandra;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.util.FileUtils;
+
+/**
+ * Test helper that prepares the data directories of an embedded Cassandra
+ * instance: it makes sure they exist and empties them.
+ */
+public class CassandraServiceDataCleaner {
+	/**
+	 * Creates all data directories if they don't exist and empties them.
+	 * 
+	 * @throws IOException
+	 *             if a directory cannot be created or emptied
+	 */
+	public void prepare() throws IOException {
+		makeDirsIfNotExist();
+		cleanupDataDirectories();
+	}
+
+	/**
+	 * Deletes all data from the Cassandra data directories, including the
+	 * commit log. The directories themselves are kept.
+	 * 
+	 * @throws IOException
+	 *             in case of permissions error etc.
+	 */
+	public void cleanupDataDirectories() throws IOException {
+		for (String s : getDataDirs()) {
+			cleanDir(s);
+		}
+	}
+
+	/**
+	 * Creates the data directories, if they don't exist yet.
+	 * 
+	 * @throws IOException
+	 *             if directories cannot be created (permissions etc).
+	 */
+	public void makeDirsIfNotExist() throws IOException {
+		for (String s : getDataDirs()) {
+			mkdir(s);
+		}
+	}
+
+	/**
+	 * Collects all data directories, plus the commit log location, as reported
+	 * by {@link DatabaseDescriptor}.
+	 * 
+	 * @return the set of directory paths on the file system
+	 */
+	private Set<String> getDataDirs() {
+		Set<String> dirs = new HashSet<String>();
+		for (String s : DatabaseDescriptor.getAllDataFileLocations()) {
+			dirs.add(s);
+		}
+		dirs.add(DatabaseDescriptor.getLogFileLocation());
+		return dirs;
+	}
+
+	/**
+	 * Creates a directory.
+	 * 
+	 * @param dir
+	 *            path of the directory to create
+	 * @throws IOException
+	 *             if the directory cannot be created
+	 */
+	private void mkdir(String dir) throws IOException {
+		FileUtils.createDirectory(dir);
+	}
+
+	/**
+	 * Removes all content of a directory from the file system. The directory
+	 * itself is kept, so that directories created by
+	 * {@link #makeDirsIfNotExist()} are still there after cleaning.
+	 * 
+	 * @param dir
+	 *            path of the directory to empty; ignored if it does not exist
+	 *            or is not a directory
+	 * @throws IOException
+	 *             if the content cannot be listed or deleted
+	 */
+	private void cleanDir(String dir) throws IOException {
+		File dirFile = new File(dir);
+		if (dirFile.exists() && dirFile.isDirectory()) {
+			File[] children = dirFile.listFiles();
+			if (children == null) {
+				throw new IOException("Unable to list the contents of " + dirFile);
+			}
+			for (File child : children) {
+				deleteDir(child);
+			}
+		}
+	}
+
+	/**
+	 * Deletes a file, or a directory together with everything below it.
+	 * 
+	 * @param dir
+	 *            file or directory to delete
+	 * @throws IOException
+	 *             if a directory cannot be listed or something cannot be deleted
+	 */
+	public static void deleteDir(File dir) throws IOException {
+		if (dir.isDirectory()) {
+			// list() returns null when the directory cannot be read
+			String[] children = dir.list();
+			if (children == null) {
+				throw new IOException("Unable to list the contents of " + dir);
+			}
+			for (String child : children) {
+				deleteDir(new File(dir, child));
+			}
+		}
+
+		// A failed delete of something that is already gone is not an error
+		if (!dir.delete() && dir.exists()) {
+			throw new IOException("Unable to delete " + dir);
+		}
+	}
+
+}



More information about the infinispan-commits mailing list