[infinispan-commits] Infinispan SVN: r2402 - in trunk/cachestore/cassandra: src/main/java/org/infinispan/loaders/cassandra and 1 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Fri Sep 17 04:16:57 EDT 2010
Author: NadirX
Date: 2010-09-17 04:16:56 -0400 (Fri, 17 Sep 2010)
New Revision: 2402
Added:
trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStore.java
trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStoreConfig.java
trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/package-info.java
trunk/cachestore/cassandra/src/test/java/org/infinispan/loaders/cassandra/CassandraCacheStoreTest.java
trunk/cachestore/cassandra/src/test/java/org/infinispan/loaders/cassandra/CassandraServiceDataCleaner.java
Modified:
trunk/cachestore/cassandra/
trunk/cachestore/cassandra/pom.xml
Log:
ISPN-653: Initial import of the Cassandra cachestore (still needs work for key expiration)
Property changes on: trunk/cachestore/cassandra
___________________________________________________________________
Name: svn:ignore
+ target
.settings
.classpath
.project
Modified: trunk/cachestore/cassandra/pom.xml
===================================================================
--- trunk/cachestore/cassandra/pom.xml 2010-09-17 07:54:40 UTC (rev 2401)
+++ trunk/cachestore/cassandra/pom.xml 2010-09-17 08:16:56 UTC (rev 2402)
@@ -19,7 +19,7 @@
<dependencies>
<dependency>
- <groupId>net.dataforte</groupId>
+ <groupId>net.dataforte.cassandra</groupId>
<artifactId>cassandra-connection-pool</artifactId>
<version>0.1.0</version>
</dependency>
Added: trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStore.java
===================================================================
--- trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStore.java (rev 0)
+++ trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStore.java 2010-09-17 08:16:56 UTC (rev 2402)
@@ -0,0 +1,464 @@
+package org.infinispan.loaders.cassandra;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+
+import net.dataforte.cassandra.pool.ConnectionPool;
+
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.Column;
+import org.apache.cassandra.thrift.ColumnOrSuperColumn;
+import org.apache.cassandra.thrift.ColumnParent;
+import org.apache.cassandra.thrift.ColumnPath;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.Deletion;
+import org.apache.cassandra.thrift.KeySlice;
+import org.apache.cassandra.thrift.Mutation;
+import org.apache.cassandra.thrift.NotFoundException;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.thrift.SliceRange;
+import org.infinispan.Cache;
+import org.infinispan.config.ConfigurationException;
+import org.infinispan.container.entries.InternalCacheEntry;
+import org.infinispan.container.entries.InternalCacheValue;
+import org.infinispan.loaders.AbstractCacheStore;
+import org.infinispan.loaders.CacheLoaderConfig;
+import org.infinispan.loaders.CacheLoaderException;
+import org.infinispan.loaders.CacheLoaderMetadata;
+import org.infinispan.loaders.modifications.Modification;
+import org.infinispan.loaders.modifications.Remove;
+import org.infinispan.loaders.modifications.Store;
+import org.infinispan.marshall.StreamingMarshaller;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
+/**
+ * A persistent <code>CacheLoader</code> based on Apache Cassandra project. See
+ * http://cassandra.apache.org/
+ *
+ * @author Tristan Tarrant
+ */
+ at CacheLoaderMetadata(configurationClass = CassandraCacheStoreConfig.class)
+public class CassandraCacheStore extends AbstractCacheStore {
+
+ private static final String ENTRY_KEY_PREFIX = "entry_";
+ private static final String ENTRY_COLUMN_NAME = "entry";
+ private static final String EXPIRATION_KEY = "expiration";
+ private static final int SLICE_SIZE = 100;
+ private static final Log log = LogFactory.getLog(CassandraCacheStore.class);
+ private static final boolean trace = log.isTraceEnabled();
+
+ private CassandraCacheStoreConfig config;
+
+ private ConnectionPool pool;
+
+ private ColumnPath entryColumnPath;
+ private ColumnParent entryColumnParent;
+
+ static private byte emptyByteArray[] = {};
+
+ public Class<? extends CacheLoaderConfig> getConfigurationClass() {
+ return CassandraCacheStoreConfig.class;
+ }
+
+ @Override
+ public void init(CacheLoaderConfig clc, Cache<?, ?> cache, StreamingMarshaller m) throws CacheLoaderException {
+ super.init(clc, cache, m);
+ this.config = (CassandraCacheStoreConfig) clc;
+ }
+
+ @Override
+ public void start() throws CacheLoaderException {
+
+ try {
+ pool = new ConnectionPool(config.getPoolProperties());
+ entryColumnPath = new ColumnPath(config.entryColumnFamily).setColumn(ENTRY_COLUMN_NAME.getBytes("UTF-8"));
+ entryColumnParent = new ColumnParent(config.entryColumnFamily);
+ } catch (Exception e) {
+ throw new ConfigurationException(e);
+ }
+
+ log.debug("cleaning up expired entries...");
+ purgeInternal();
+
+ log.debug("started");
+ super.start();
+ }
+
+ public InternalCacheEntry load(Object key) throws CacheLoaderException {
+ String hashKey = hashKey(key);
+ Cassandra.Iface cassandraClient = null;
+ try {
+ cassandraClient = pool.getConnection();
+ ColumnOrSuperColumn column = cassandraClient.get(config.keySpace, hashKey, entryColumnPath, ConsistencyLevel.ONE);
+ InternalCacheEntry ice = unmarshall(column.getColumn().getValue(), key);
+ if (ice != null && ice.isExpired()) {
+ remove(key);
+ return null;
+ }
+ return ice;
+ } catch (NotFoundException nfe) {
+ log.debug("Key '{0}' not found", hashKey);
+ return null;
+ } catch (Exception e) {
+ throw new CacheLoaderException(e);
+ } finally {
+ pool.release(cassandraClient);
+ }
+ }
+
+ @Override
+ public Set<InternalCacheEntry> loadAll() throws CacheLoaderException {
+ return load(Integer.MAX_VALUE);
+ }
+
+ @Override
+ public Set<InternalCacheEntry> load(int numEntries) throws CacheLoaderException {
+ Cassandra.Iface cassandraClient = null;
+ try {
+ cassandraClient = pool.getConnection();
+ Set<InternalCacheEntry> s = new HashSet<InternalCacheEntry>();
+ SlicePredicate slicePredicate = new SlicePredicate();
+ slicePredicate.setSlice_range(new SliceRange(entryColumnPath.getColumn(), emptyByteArray, false, 1));
+ String startKey = "";
+ boolean complete = false;
+ // Get the keys in SLICE_SIZE blocks
+ int sliceSize = Math.min(SLICE_SIZE, numEntries);
+ while (!complete) {
+ List<KeySlice> keySlices = cassandraClient.get_range_slice(config.keySpace, entryColumnParent, slicePredicate, startKey, "", sliceSize,
+ ConsistencyLevel.ONE);
+ if (keySlices.size() < sliceSize) {
+ complete = true;
+ } else {
+ startKey = keySlices.get(keySlices.size() - 1).getKey();
+ sliceSize = Math.min(SLICE_SIZE, numEntries - s.size());
+ if(sliceSize==0) {
+ complete = true;
+ }
+ }
+
+ for (KeySlice keySlice : keySlices) {
+ String key = keySlice.getKey();
+ List<ColumnOrSuperColumn> columns = keySlice.getColumns();
+ if(columns.size()>0) {
+ byte[] value = columns.get(0).getColumn().getValue();
+ InternalCacheEntry ice = unmarshall(value, key);
+ s.add(ice);
+ }
+ }
+ }
+ return s;
+ } catch (Exception e) {
+ throw new CacheLoaderException(e);
+ } finally {
+ pool.release(cassandraClient);
+ }
+ }
+
+ @Override
+ public Set<Object> loadAllKeys(Set<Object> keysToExclude) throws CacheLoaderException {
+ Cassandra.Iface cassandraClient = null;
+ try {
+ cassandraClient = pool.getConnection();
+ Set<Object> s = new HashSet<Object>();
+ SlicePredicate slicePredicate = new SlicePredicate();
+ slicePredicate.setSlice_range(new SliceRange(entryColumnPath.getColumn(), emptyByteArray, false, 1));
+ String startKey = "";
+ boolean complete = false;
+ // Get the keys in SLICE_SIZE blocks
+ while (!complete) {
+ List<KeySlice> keySlices = cassandraClient.get_range_slice(config.keySpace, entryColumnParent, slicePredicate, startKey, "", SLICE_SIZE,
+ ConsistencyLevel.ONE);
+ if (keySlices.size() < SLICE_SIZE) {
+ complete = true;
+ } else {
+ startKey = keySlices.get(keySlices.size() - 1).getKey();
+ }
+
+ for (KeySlice keySlice : keySlices) {
+ String key = keySlice.getKey();
+ if (keysToExclude == null || !keysToExclude.contains(key))
+ s.add(key);
+ }
+ }
+ return s;
+ } catch (Exception e) {
+ throw new CacheLoaderException(e);
+ } finally {
+ pool.release(cassandraClient);
+ }
+ }
+
+ /**
+ * Closes all databases, ignoring exceptions, and nulls references to all
+ * database related information.
+ */
+ @Override
+ public void stop() {
+ pool.close();
+ }
+
+ public void clear() throws CacheLoaderException {
+ if (trace)
+ log.trace("clear()");
+ Cassandra.Iface cassandraClient = null;
+ try {
+ cassandraClient = pool.getConnection();
+ SlicePredicate slicePredicate = new SlicePredicate();
+ slicePredicate.setSlice_range(new SliceRange(entryColumnPath.getColumn(), emptyByteArray, false, 1));
+ String startKey = "";
+ boolean complete = false;
+ // Get the keys in SLICE_SIZE blocks
+ while (!complete) {
+ List<KeySlice> keySlices = cassandraClient.get_range_slice(config.keySpace, entryColumnParent, slicePredicate, startKey, "", SLICE_SIZE,
+ ConsistencyLevel.ONE);
+ if (keySlices.size() < SLICE_SIZE) {
+ complete = true;
+ } else {
+ startKey = keySlices.get(keySlices.size() - 1).getKey();
+ }
+ Map<String, Map<String, List<Mutation>>> mutationMap = new HashMap<String, Map<String, List<Mutation>>>();
+
+ for (KeySlice keySlice : keySlices) {
+ String cassandraKey = keySlice.getKey();
+ addMutation(mutationMap, cassandraKey, config.entryColumnFamily, entryColumnPath.getColumn(), null);
+ addMutation(mutationMap, cassandraKey, config.expirationColumnFamily, null, null);
+ }
+ cassandraClient.batch_mutate(config.keySpace, mutationMap, ConsistencyLevel.ONE);
+ }
+ } catch (Exception e) {
+ throw new CacheLoaderException(e);
+ } finally {
+ pool.release(cassandraClient);
+ }
+
+ }
+
+ public boolean remove(Object key) throws CacheLoaderException {
+ if (trace)
+ log.trace("remove() " + key);
+ Cassandra.Iface cassandraClient = null;
+ try {
+ cassandraClient = pool.getConnection();
+ Map<String, Map<String, List<Mutation>>> mutationMap = new HashMap<String, Map<String, List<Mutation>>>();
+ remove0(key, mutationMap);
+ cassandraClient.batch_mutate(config.keySpace, mutationMap, ConsistencyLevel.ONE);
+ return true;
+ } catch (Exception e) {
+ log.error("Exception while removing " + key, e);
+ return false;
+ } finally {
+ pool.release(cassandraClient);
+ }
+ }
+
+ private void remove0(Object key, Map<String, Map<String, List<Mutation>>> mutationMap) {
+ String cassandraKey = CassandraCacheStore.hashKey(key);
+ addMutation(mutationMap, cassandraKey, config.entryColumnFamily, entryColumnPath.getColumn(), null);
+ addMutation(mutationMap, cassandraKey, config.expirationColumnFamily, null, null);
+ }
+
+ private byte[] marshall(InternalCacheEntry entry) throws IOException {
+ return getMarshaller().objectToByteBuffer(entry.toInternalCacheValue());
+ }
+
+ private InternalCacheEntry unmarshall(Object o, Object key) throws IOException, ClassNotFoundException {
+ if (o == null)
+ return null;
+ byte b[] = (byte[]) o;
+ InternalCacheValue v = (InternalCacheValue) getMarshaller().objectFromByteBuffer(b);
+ return v.toInternalCacheEntry(key);
+ }
+
+ public void store(InternalCacheEntry entry) throws CacheLoaderException {
+ Cassandra.Iface cassandraClient = null;
+
+ try {
+ if (trace)
+ log.trace("storing " + entry.getKey());
+
+ cassandraClient = pool.getConnection();
+ Map<String, Map<String, List<Mutation>>> mutationMap = new HashMap<String, Map<String, List<Mutation>>>();
+ store0(entry, mutationMap);
+ cassandraClient.batch_mutate(config.keySpace, mutationMap, ConsistencyLevel.ONE);
+
+ if (trace)
+ log.trace("stored " + entry.getKey());
+ } catch (Exception e) {
+ throw new CacheLoaderException(e);
+ } finally {
+ pool.release(cassandraClient);
+ }
+ }
+
+ private void store0(InternalCacheEntry entry, Map<String, Map<String, List<Mutation>>> mutationMap) throws IOException {
+ Object key = entry.getKey();
+
+
+ String cassandraKey = CassandraCacheStore.hashKey(key);
+ addMutation(mutationMap, cassandraKey, config.entryColumnFamily, entryColumnPath.getColumn(), marshall(entry));
+ if (entry.canExpire()) {
+ addMutation(mutationMap, cassandraKey, config.expirationColumnFamily, longToBytes(entry.getExpiryTime()), emptyByteArray);
+ }
+
+
+
+ }
+
+ /**
+ * Writes to a stream the number of entries (long) then the entries
+ * themselves.
+ */
+ public void toStream(ObjectOutput out) throws CacheLoaderException {
+ try {
+ Set<InternalCacheEntry> loadAll = loadAll();
+ log.debug("toStream() entries");
+ int count = 0;
+ for (InternalCacheEntry entry : loadAll) {
+ getMarshaller().objectToObjectStream(entry, out);
+ count++;
+ }
+ getMarshaller().objectToObjectStream(null, out);
+ log.debug("wrote " + count + " entries");
+ } catch (IOException e) {
+ throw new CacheLoaderException(e);
+ }
+ }
+
+ /**
+ * Reads from a stream the number of entries (long) then the entries
+ * themselves.
+ */
+ public void fromStream(ObjectInput in) throws CacheLoaderException {
+ try {
+ log.debug("fromStream()");
+ int count = 0;
+ while (true) {
+ count++;
+ InternalCacheEntry entry = (InternalCacheEntry) getMarshaller().objectFromObjectStream(in);
+ if (entry == null)
+ break;
+ store(entry);
+ }
+ log.debug("read " + count + " entries");
+ } catch (IOException e) {
+ throw new CacheLoaderException(e);
+ } catch (ClassNotFoundException e) {
+ throw new CacheLoaderException(e);
+ }
+ }
+
+ /**
+ * Purge expired entries.
+ */
+ @Override
+ protected void purgeInternal() throws CacheLoaderException {
+ log.trace("purgeInternal");
+ // TODO: implement
+
+ }
+
+ @Override
+ protected void applyModifications(List<? extends Modification> mods) throws CacheLoaderException {
+ Cassandra.Iface cassandraClient = null;
+
+ try {
+ cassandraClient = pool.getConnection();
+ Map<String, Map<String, List<Mutation>>> mutationMap = new HashMap<String, Map<String, List<Mutation>>>();
+
+ for (Modification m : mods) {
+ switch (m.getType()) {
+ case STORE:
+ store0(((Store) m).getStoredEntry(), mutationMap);
+ break;
+ case CLEAR:
+ clear();
+ break;
+ case REMOVE:
+ remove0(((Remove) m).getKey(), mutationMap);
+ break;
+ default:
+ throw new AssertionError();
+ }
+ }
+
+ cassandraClient.batch_mutate(config.keySpace, mutationMap, ConsistencyLevel.ONE);
+ } catch (Exception e) {
+ throw new CacheLoaderException(e);
+ } finally {
+ pool.release(cassandraClient);
+ }
+
+
+ }
+
+ @Override
+ public String toString() {
+ return "CassandraCacheStore";
+ }
+
+ public static String hashKey(Object key) {
+ return ENTRY_KEY_PREFIX+key.toString();
+ }
+
+ public static String expirationColumn(long timestamp) {
+ return String.format("expiration%013d", timestamp);
+ }
+
+ private static void addMutation(Map<String, Map<String, List<Mutation>>> mutationMap, String key, String columnFamily, byte[] column, byte[] value) {
+ Map<String, List<Mutation>> keyMutations = mutationMap.get(key);
+ // If the key doesn't exist yet, create the mutation holder
+ if (keyMutations == null) {
+ keyMutations = new HashMap<String, List<Mutation>>();
+ mutationMap.put(key, keyMutations);
+ }
+ List<Mutation> columnFamilyMutations = keyMutations.get(columnFamily);
+ if (columnFamilyMutations == null) {
+ columnFamilyMutations = new ArrayList<Mutation>();
+ keyMutations.put(columnFamily, columnFamilyMutations);
+ }
+
+ if (value == null) { // Delete
+ log.debug("Delete '{0}'", key);
+ Deletion deletion = new Deletion(System.currentTimeMillis());
+ if (column != null) { // Single column delete
+ deletion.setPredicate(new SlicePredicate().setColumn_names(Arrays.asList(new byte[][] { column })));
+ } // else Delete entire column family
+ columnFamilyMutations.add(new Mutation().setDeletion(deletion));
+ } else { // Insert/update
+ log.debug("Insert/update '{0}', size={1}", key, value.length);
+ ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
+ cosc.setColumn(new Column(column, value, System.currentTimeMillis()));
+ columnFamilyMutations.add(new Mutation().setColumn_or_supercolumn(cosc));
+ }
+ }
+
+ public static UUID getTimeBasedUUID(long timestamp) {
+ long lsb = 0;
+ long msb = 0;
+ return new UUID(msb, lsb);
+ }
+
+ private static final byte[] longToBytes(long v) throws IOException {
+ byte b[] = new byte[8];
+ b[0] = (byte) (v >>> 56);
+ b[1] = (byte) (v >>> 48);
+ b[2] = (byte) (v >>> 40);
+ b[3] = (byte) (v >>> 32);
+ b[4] = (byte) (v >>> 24);
+ b[5] = (byte) (v >>> 16);
+ b[6] = (byte) (v >>> 8);
+ b[7] = (byte) (v >>> 0);
+ return b;
+ }
+
+}
Added: trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStoreConfig.java
===================================================================
--- trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStoreConfig.java (rev 0)
+++ trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStoreConfig.java 2010-09-17 08:16:56 UTC (rev 2402)
@@ -0,0 +1,331 @@
+package org.infinispan.loaders.cassandra;
+
+import net.dataforte.cassandra.pool.PoolProperties;
+
+import org.infinispan.loaders.LockSupportCacheStoreConfig;
+
+/**
+ * Configures {@link CassandraCacheStore}.
+ */
+public class CassandraCacheStoreConfig extends LockSupportCacheStoreConfig {
+
+ /**
+ * @configRef desc="The Cassandra keyspace"
+ */
+ String keySpace = "Infinispan";
+
+ /**
+ * @configRef desc="The Cassandra column family for entries"
+ */
+ String entryColumnFamily = "InfinispanEntries";
+
+ /**
+ * @configRef desc="The Cassandra column family for expirations"
+ */
+ String expirationColumnFamily = "InfinispanExpiration";
+
+ PoolProperties poolProperties;
+
+ public CassandraCacheStoreConfig() {
+ poolProperties = new PoolProperties();
+ }
+
+ public String getKeySpace() {
+ return keySpace;
+ }
+
+ public void setKeySpace(String keySpace) {
+ this.keySpace = keySpace;
+ }
+
+ public String getEntryColumnFamily() {
+ return entryColumnFamily;
+ }
+
+ public void setEntryColumnFamily(String entryColumnFamily) {
+ this.entryColumnFamily = entryColumnFamily;
+ }
+
+ public String getExpirationColumnFamily() {
+ return expirationColumnFamily;
+ }
+
+ public void setExpirationColumnFamily(String expirationColumnFamily) {
+ this.expirationColumnFamily = expirationColumnFamily;
+ }
+
+ public PoolProperties getPoolProperties() {
+ return poolProperties;
+ }
+
+ public void setHost(String host) {
+ poolProperties.setHost(host);
+ }
+
+ public String getHost() {
+ return poolProperties.getHost();
+ }
+
+ public void setPort(int port) {
+ poolProperties.setPort(port);
+ }
+
+ public int getPort() {
+ return poolProperties.getPort();
+ }
+
+
+ public int getAbandonWhenPercentageFull() {
+ return poolProperties.getAbandonWhenPercentageFull();
+ }
+
+
+ public boolean getFramed() {
+ return poolProperties.getFramed();
+ }
+
+
+ public int getInitialSize() {
+ return poolProperties.getInitialSize();
+ }
+
+
+ public int getMaxActive() {
+ return poolProperties.getMaxActive();
+ }
+
+
+ public long getMaxAge() {
+ return poolProperties.getMaxAge();
+ }
+
+
+ public int getMaxIdle() {
+ return poolProperties.getMaxIdle();
+ }
+
+
+ public int getMaxWait() {
+ return poolProperties.getMaxWait();
+ }
+
+
+ public int getMinEvictableIdleTimeMillis() {
+ return poolProperties.getMinEvictableIdleTimeMillis();
+ }
+
+
+ public int getMinIdle() {
+ return poolProperties.getMinIdle();
+ }
+
+
+ public String getName() {
+ return poolProperties.getName();
+ }
+
+
+ public int getNumTestsPerEvictionRun() {
+ return poolProperties.getNumTestsPerEvictionRun();
+ }
+
+
+ public String getPassword() {
+ return poolProperties.getPassword();
+ }
+
+ public int getRemoveAbandonedTimeout() {
+ return poolProperties.getRemoveAbandonedTimeout();
+ }
+
+
+ public int getSuspectTimeout() {
+ return poolProperties.getSuspectTimeout();
+ }
+
+
+ public int getTimeBetweenEvictionRunsMillis() {
+ return poolProperties.getTimeBetweenEvictionRunsMillis();
+ }
+
+
+ public boolean getUseLock() {
+ return poolProperties.getUseLock();
+ }
+
+
+ public String getUsername() {
+ return poolProperties.getUsername();
+ }
+
+
+ public long getValidationInterval() {
+ return poolProperties.getValidationInterval();
+ }
+
+ public boolean isFairQueue() {
+ return poolProperties.isFairQueue();
+ }
+
+
+ public boolean isJmxEnabled() {
+ return poolProperties.isJmxEnabled();
+ }
+
+
+ public boolean isLogAbandoned() {
+ return poolProperties.isLogAbandoned();
+ }
+
+
+ public boolean isRemoveAbandoned() {
+ return poolProperties.isRemoveAbandoned();
+ }
+
+
+ public boolean isTestOnBorrow() {
+ return poolProperties.isTestOnBorrow();
+ }
+
+
+ public boolean isTestOnConnect() {
+ return poolProperties.isTestOnConnect();
+ }
+
+
+ public boolean isTestOnReturn() {
+ return poolProperties.isTestOnReturn();
+ }
+
+
+ public boolean isTestWhileIdle() {
+ return poolProperties.isTestWhileIdle();
+ }
+
+
+ public void setAbandonWhenPercentageFull(int percentage) {
+ poolProperties.setAbandonWhenPercentageFull(percentage);
+
+ }
+
+ public void setFairQueue(boolean fairQueue) {
+ poolProperties.setFairQueue(fairQueue);
+
+ }
+
+
+ public void setFramed(boolean framed) {
+ poolProperties.setFramed(framed);
+
+ }
+
+
+ public void setInitialSize(int initialSize) {
+ poolProperties.setInitialSize(initialSize);
+
+ }
+
+
+ public void setJmxEnabled(boolean jmxEnabled) {
+ poolProperties.setJmxEnabled(jmxEnabled);
+ }
+
+
+ public void setLogAbandoned(boolean logAbandoned) {
+ poolProperties.setLogAbandoned(logAbandoned);
+ }
+
+
+ public void setMaxActive(int maxActive) {
+ poolProperties.setMaxActive(maxActive);
+
+ }
+
+
+ public void setMaxAge(long maxAge) {
+ poolProperties.setMaxAge(maxAge);
+
+ }
+
+
+ public void setMaxIdle(int maxIdle) {
+ poolProperties.setMaxIdle(maxIdle);
+
+ }
+
+
+ public void setMaxWait(int maxWait) {
+ poolProperties.setMaxWait(maxWait);
+
+ }
+
+
+ public void setMinEvictableIdleTimeMillis(int minEvictableIdleTimeMillis) {
+ poolProperties.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
+
+ }
+
+ public void setMinIdle(int minIdle) {
+ poolProperties.setMinIdle(minIdle);
+
+ }
+
+ public void setName(String name) {
+ poolProperties.setName(name);
+ }
+
+ public void setNumTestsPerEvictionRun(int numTestsPerEvictionRun) {
+ poolProperties.setNumTestsPerEvictionRun(numTestsPerEvictionRun);
+
+ }
+
+ public void setPassword(String password) {
+ poolProperties.setPassword(password);
+ }
+
+ public void setRemoveAbandoned(boolean removeAbandoned) {
+ poolProperties.setRemoveAbandoned(removeAbandoned);
+ }
+
+
+ public void setRemoveAbandonedTimeout(int removeAbandonedTimeout) {
+ poolProperties.setRemoveAbandonedTimeout(removeAbandonedTimeout);
+
+ }
+
+ public void setSuspectTimeout(int seconds) {
+ poolProperties.setSuspectTimeout(seconds);
+
+ }
+
+ public void setTestOnBorrow(boolean testOnBorrow) {
+ poolProperties.setTestOnBorrow(testOnBorrow);
+
+ }
+
+ public void setTestOnConnect(boolean testOnConnect) {
+ poolProperties.setTestOnConnect(testOnConnect);
+
+ }
+
+ public void setTestOnReturn(boolean testOnReturn) {
+ poolProperties.setTestOnReturn(testOnReturn);
+ }
+
+ public void setTestWhileIdle(boolean testWhileIdle) {
+ poolProperties.setTestWhileIdle(testWhileIdle);
+ }
+
+ public void setTimeBetweenEvictionRunsMillis(int timeBetweenEvictionRunsMillis) {
+ poolProperties.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
+
+ }
+
+ public void setUsername(String username) {
+ poolProperties.setUsername(username);
+ }
+
+ public void setValidationInterval(long validationInterval) {
+ poolProperties.setValidationInterval(validationInterval);
+ }
+}
Added: trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/package-info.java
===================================================================
--- trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/package-info.java (rev 0)
+++ trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/package-info.java 2010-09-17 08:16:56 UTC (rev 2402)
@@ -0,0 +1,5 @@
+/**
+ * This package contains a {@link org.infinispan.loaders.CacheStore} implementation based on
+ * persisting to JDBM.
+ */
+package org.infinispan.loaders.cassandra;
\ No newline at end of file
Added: trunk/cachestore/cassandra/src/test/java/org/infinispan/loaders/cassandra/CassandraCacheStoreTest.java
===================================================================
--- trunk/cachestore/cassandra/src/test/java/org/infinispan/loaders/cassandra/CassandraCacheStoreTest.java (rev 0)
+++ trunk/cachestore/cassandra/src/test/java/org/infinispan/loaders/cassandra/CassandraCacheStoreTest.java 2010-09-17 08:16:56 UTC (rev 2402)
@@ -0,0 +1,53 @@
+package org.infinispan.loaders.cassandra;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+
+import org.apache.cassandra.service.EmbeddedCassandraService;
+import org.apache.thrift.transport.TTransportException;
+import org.infinispan.loaders.BaseCacheStoreTest;
+import org.infinispan.loaders.CacheStore;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+ at Test(groups = "unit", testName = "loaders.cassandra.CassandraCacheStoreTest")
+public class CassandraCacheStoreTest extends BaseCacheStoreTest {
+ private static EmbeddedCassandraService cassandra;
+
+ /**
+ * Set embedded cassandra up and spawn it in a new thread.
+ *
+ * @throws TTransportException
+ * @throws IOException
+ * @throws InterruptedException
+ */
+ @BeforeClass
+ public static void setup() throws TTransportException, IOException, InterruptedException {
+ // Tell cassandra where the configuration files are.
+ // Use the test configuration file.
+ URL resource = Thread.currentThread().getContextClassLoader().getResource("storage-conf.xml");
+ String configPath = resource.getPath().substring(0, resource.getPath().lastIndexOf(File.separatorChar));
+
+ System.setProperty("storage-config", configPath);
+
+ CassandraServiceDataCleaner cleaner = new CassandraServiceDataCleaner();
+ cleaner.prepare();
+ cassandra = new EmbeddedCassandraService();
+ cassandra.init();
+ Thread t = new Thread(cassandra);
+ t.setDaemon(true);
+ t.start();
+ }
+
+ @Override
+ protected CacheStore createCacheStore() throws Exception {
+ CassandraCacheStore cs = new CassandraCacheStore();
+ CassandraCacheStoreConfig clc = new CassandraCacheStoreConfig();
+ clc.setHost("localhost");
+ cs.init(clc, getCache(), getMarshaller());
+ cs.start();
+ return cs;
+ }
+
+}
Added: trunk/cachestore/cassandra/src/test/java/org/infinispan/loaders/cassandra/CassandraServiceDataCleaner.java
===================================================================
--- trunk/cachestore/cassandra/src/test/java/org/infinispan/loaders/cassandra/CassandraServiceDataCleaner.java (rev 0)
+++ trunk/cachestore/cassandra/src/test/java/org/infinispan/loaders/cassandra/CassandraServiceDataCleaner.java 2010-09-17 08:16:56 UTC (rev 2402)
@@ -0,0 +1,96 @@
+package org.infinispan.loaders.cassandra;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.util.FileUtils;
+
+public class CassandraServiceDataCleaner {
+ /**
+ * Creates all data dir if they don't exist and cleans them
+ *
+ * @throws IOException
+ */
+ public void prepare() throws IOException {
+ makeDirsIfNotExist();
+ cleanupDataDirectories();
+ }
+
+ /**
+ * Deletes all data from cassandra data directories, including the commit
+ * log.
+ *
+ * @throws IOException
+ * in case of permissions error etc.
+ */
+ public void cleanupDataDirectories() throws IOException {
+ for (String s : getDataDirs()) {
+ cleanDir(s);
+ }
+ }
+
+ /**
+ * Creates the data diurectories, if they didn't exist.
+ *
+ * @throws IOException
+ * if directories cannot be created (permissions etc).
+ */
+ public void makeDirsIfNotExist() throws IOException {
+ for (String s : getDataDirs()) {
+ mkdir(s);
+ }
+ }
+
+ /**
+ * Collects all data dirs and returns a set of String paths on the file
+ * system.
+ *
+ * @return
+ */
+ private Set<String> getDataDirs() {
+ Set<String> dirs = new HashSet<String>();
+ for (String s : DatabaseDescriptor.getAllDataFileLocations()) {
+ dirs.add(s);
+ }
+ dirs.add(DatabaseDescriptor.getLogFileLocation());
+ return dirs;
+ }
+
+ /**
+ * Creates a directory
+ *
+ * @param dir
+ * @throws IOException
+ */
+ private void mkdir(String dir) throws IOException {
+ FileUtils.createDirectory(dir);
+ }
+
+ /**
+ * Removes all directory content from file the system
+ *
+ * @param dir
+ * @throws IOException
+ */
+ private void cleanDir(String dir) throws IOException {
+ File dirFile = new File(dir);
+ if (dirFile.exists() && dirFile.isDirectory()) {
+ deleteDir(dirFile);
+ }
+ }
+
+ public static void deleteDir(File dir) throws IOException {
+ if (dir.isDirectory()) {
+ String[] children = dir.list();
+ for (int i = 0; i < children.length; i++) {
+ deleteDir(new File(dir, children[i]));
+ }
+ }
+
+ dir.delete();
+ }
+
+}
More information about the infinispan-commits
mailing list