[infinispan-commits] Infinispan SVN: r2656 - trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Wed Nov 3 05:03:56 EDT 2010
Author: NadirX
Date: 2010-11-03 05:03:55 -0400 (Wed, 03 Nov 2010)
New Revision: 2656
Modified:
trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStore.java
trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStoreConfig.java
Log:
Implement ISPN-757
Implement ISPN-758
Modified: trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStore.java
===================================================================
--- trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStore.java 2010-11-02 18:32:46 UTC (rev 2655)
+++ trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStore.java 2010-11-03 09:03:55 UTC (rev 2656)
@@ -12,9 +12,10 @@
import java.util.Map;
import java.util.Set;
-import net.dataforte.cassandra.pool.ConnectionPool;
+import net.dataforte.cassandra.pool.DataSource;
import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.CassandraThriftDataSource;
import org.apache.cassandra.thrift.Column;
import org.apache.cassandra.thrift.ColumnOrSuperColumn;
import org.apache.cassandra.thrift.ColumnParent;
@@ -60,12 +61,18 @@
private static final boolean trace = log.isTraceEnabled();
private CassandraCacheStoreConfig config;
+
+ private CassandraThriftDataSource dataSource;
+
+ private ConsistencyLevel readConsistencyLevel;
+ private ConsistencyLevel writeConsistencyLevel;
- private ConnectionPool pool;
-
+ private String cacheName;
private ColumnPath entryColumnPath;
private ColumnParent entryColumnParent;
private ColumnParent expirationColumnParent;
+ private String entryKeyPrefix;
+ private String expirationKey;
static private byte emptyByteArray[] = {};
@@ -76,6 +83,7 @@
@Override
public void init(CacheLoaderConfig clc, Cache<?, ?> cache, StreamingMarshaller m) throws CacheLoaderException {
super.init(clc, cache, m);
+ this.cacheName = cache.getName();
this.config = (CassandraCacheStoreConfig) clc;
}
@@ -83,10 +91,14 @@
public void start() throws CacheLoaderException {
try {
- pool = new ConnectionPool(config.getPoolProperties());
+ dataSource = new DataSource(config.getPoolProperties());
+ readConsistencyLevel = ConsistencyLevel.valueOf(config.readConsistencyLevel);
+ writeConsistencyLevel = ConsistencyLevel.valueOf(config.writeConsistencyLevel);
entryColumnPath = new ColumnPath(config.entryColumnFamily).setColumn(ENTRY_COLUMN_NAME.getBytes("UTF-8"));
entryColumnParent = new ColumnParent(config.entryColumnFamily);
- expirationColumnParent = new ColumnParent(config.expirationColumnFamily);
+ entryKeyPrefix = ENTRY_KEY_PREFIX+(config.isSharedKeyspace()?cacheName+"_":"");
+ expirationColumnParent = new ColumnParent(config.expirationColumnFamily);
+ expirationKey = EXPIRATION_KEY+(config.isSharedKeyspace()?"_"+cacheName:"");
} catch (Exception e) {
throw new ConfigurationException(e);
}
@@ -100,11 +112,11 @@
@Override
public InternalCacheEntry load(Object key) throws CacheLoaderException {
- String hashKey = CassandraCacheStore.hashKey(key);
+ String hashKey = hashKey(key);
Cassandra.Client cassandraClient = null;
try {
- cassandraClient = pool.getConnection();
- ColumnOrSuperColumn column = cassandraClient.get(config.keySpace, hashKey, entryColumnPath, ConsistencyLevel.ONE);
+ cassandraClient = dataSource.getConnection();
+ ColumnOrSuperColumn column = cassandraClient.get(config.keySpace, hashKey, entryColumnPath, readConsistencyLevel);
InternalCacheEntry ice = unmarshall(column.getColumn().getValue(), key);
if (ice != null && ice.isExpired()) {
remove(key);
@@ -117,7 +129,7 @@
} catch (Exception e) {
throw new CacheLoaderException(e);
} finally {
- pool.release(cassandraClient);
+ dataSource.releaseConnection(cassandraClient);
}
}
@@ -130,7 +142,7 @@
public Set<InternalCacheEntry> load(int numEntries) throws CacheLoaderException {
Cassandra.Client cassandraClient = null;
try {
- cassandraClient = pool.getConnection();
+ cassandraClient = dataSource.getConnection();
Set<InternalCacheEntry> s = new HashSet<InternalCacheEntry>();
SlicePredicate slicePredicate = new SlicePredicate();
slicePredicate.setSlice_range(new SliceRange(entryColumnPath.getColumn(), emptyByteArray, false, 1));
@@ -142,7 +154,7 @@
KeyRange keyRange = new KeyRange(sliceSize);
keyRange.setStart_token(startKey);
keyRange.setEnd_token("");
- List<KeySlice> keySlices = cassandraClient.get_range_slices(config.keySpace, entryColumnParent, slicePredicate, keyRange, ConsistencyLevel.ONE);
+ List<KeySlice> keySlices = cassandraClient.get_range_slices(config.keySpace, entryColumnParent, slicePredicate, keyRange, readConsistencyLevel);
// Cycle through all the keys
for (KeySlice keySlice : keySlices) {
@@ -183,7 +195,7 @@
} catch (Exception e) {
throw new CacheLoaderException(e);
} finally {
- pool.release(cassandraClient);
+ dataSource.releaseConnection(cassandraClient);
}
}
@@ -191,7 +203,7 @@
public Set<Object> loadAllKeys(Set<Object> keysToExclude) throws CacheLoaderException {
Cassandra.Client cassandraClient = null;
try {
- cassandraClient = pool.getConnection();
+ cassandraClient = dataSource.getConnection();
Set<Object> s = new HashSet<Object>();
SlicePredicate slicePredicate = new SlicePredicate();
slicePredicate.setSlice_range(new SliceRange(entryColumnPath.getColumn(), emptyByteArray, false, 1));
@@ -202,7 +214,7 @@
KeyRange keyRange = new KeyRange(SLICE_SIZE);
keyRange.setStart_token(startKey);
keyRange.setEnd_token("");
- List<KeySlice> keySlices = cassandraClient.get_range_slices(config.keySpace, entryColumnParent, slicePredicate, keyRange, ConsistencyLevel.ONE);
+ List<KeySlice> keySlices = cassandraClient.get_range_slices(config.keySpace, entryColumnParent, slicePredicate, keyRange, readConsistencyLevel);
if (keySlices.size() < SLICE_SIZE) {
complete = true;
} else {
@@ -221,7 +233,7 @@
} catch (Exception e) {
throw new CacheLoaderException(e);
} finally {
- pool.release(cassandraClient);
+ dataSource.releaseConnection(cassandraClient);
}
}
@@ -231,14 +243,14 @@
*/
@Override
public void stop() {
- pool.close();
+
}
@Override
public void clear() throws CacheLoaderException {
Cassandra.Client cassandraClient = null;
try {
- cassandraClient = pool.getConnection();
+ cassandraClient = dataSource.getConnection();
SlicePredicate slicePredicate = new SlicePredicate();
slicePredicate.setSlice_range(new SliceRange(entryColumnPath.getColumn(), emptyByteArray, false, 1));
String startKey = "";
@@ -248,7 +260,7 @@
KeyRange keyRange = new KeyRange(SLICE_SIZE);
keyRange.setStart_token(startKey);
keyRange.setEnd_token("");
- List<KeySlice> keySlices = cassandraClient.get_range_slices(config.keySpace, entryColumnParent, slicePredicate, keyRange, ConsistencyLevel.ONE);
+ List<KeySlice> keySlices = cassandraClient.get_range_slices(config.keySpace, entryColumnParent, slicePredicate, keyRange, readConsistencyLevel);
if (keySlices.size() < SLICE_SIZE) {
complete = true;
} else {
@@ -265,7 +277,7 @@
} catch (Exception e) {
throw new CacheLoaderException(e);
} finally {
- pool.release(cassandraClient);
+ dataSource.releaseConnection(cassandraClient);
}
}
@@ -276,16 +288,16 @@
log.trace("remove(\"{0}\") ", key);
Cassandra.Client cassandraClient = null;
try {
- cassandraClient = pool.getConnection();
+ cassandraClient = dataSource.getConnection();
Map<String, Map<String, List<Mutation>>> mutationMap = new HashMap<String, Map<String, List<Mutation>>>();
- remove0(CassandraCacheStore.hashKey(key), mutationMap);
- cassandraClient.batch_mutate(config.keySpace, mutationMap, ConsistencyLevel.ONE);
+ remove0(hashKey(key), mutationMap);
+ cassandraClient.batch_mutate(config.keySpace, mutationMap, writeConsistencyLevel);
return true;
} catch (Exception e) {
log.error("Exception while removing " + key, e);
return false;
} finally {
- pool.release(cassandraClient);
+ dataSource.releaseConnection(cassandraClient);
}
}
@@ -309,15 +321,15 @@
Cassandra.Client cassandraClient = null;
try {
- cassandraClient = pool.getConnection();
+ cassandraClient = dataSource.getConnection();
Map<String, Map<String, List<Mutation>>> mutationMap = new HashMap<String, Map<String, List<Mutation>>>(2);
store0(entry, mutationMap);
- cassandraClient.batch_mutate(config.keySpace, mutationMap, ConsistencyLevel.ONE);
+ cassandraClient.batch_mutate(config.keySpace, mutationMap, writeConsistencyLevel);
} catch (Exception e) {
throw new CacheLoaderException(e);
} finally {
- pool.release(cassandraClient);
+ dataSource.releaseConnection(cassandraClient);
}
}
@@ -325,7 +337,7 @@
Object key = entry.getKey();
if (trace)
log.trace("store(\"{0}\") ", key);
- String cassandraKey = CassandraCacheStore.hashKey(key);
+ String cassandraKey = hashKey(key);
try {
addMutation(mutationMap, cassandraKey, config.entryColumnFamily, entryColumnPath.getColumn(), marshall(entry));
if (entry.canExpire()) {
@@ -339,7 +351,7 @@
private void addExpiryEntry(String cassandraKey, long expiryTime, Map<String, Map<String, List<Mutation>>> mutationMap) {
try {
- addMutation(mutationMap, EXPIRATION_KEY, config.expirationColumnFamily, longToBytes(expiryTime), cassandraKey.getBytes("UTF-8"), emptyByteArray);
+ addMutation(mutationMap, expirationKey, config.expirationColumnFamily, longToBytes(expiryTime), cassandraKey.getBytes("UTF-8"), emptyByteArray);
} catch (Exception e) {
// Should not happen
}
@@ -389,7 +401,7 @@
/**
* Purge expired entries.
- * Expiration entries are stored in a single key (EXPIRATION_KEY) within a specific ColumnFamily (set by configuration).
+ * Expiration entries are stored in a single key (expirationKey) within a specific ColumnFamily (set by configuration).
* The entries are grouped by expiration timestamp in SuperColumns within which each entry's key is mapped to a column
*/
@Override
@@ -398,14 +410,14 @@
log.trace("purgeInternal");
Cassandra.Client cassandraClient = null;
try {
- cassandraClient = pool.getConnection();
+ cassandraClient = dataSource.getConnection();
// We need to get all supercolumns from the beginning of time until now, in SLICE_SIZE chunks
SlicePredicate predicate = new SlicePredicate();
predicate.setSlice_range(new SliceRange(emptyByteArray, longToBytes(System.currentTimeMillis()), false, SLICE_SIZE));
Map<String, Map<String, List<Mutation>>> mutationMap = new HashMap<String, Map<String, List<Mutation>>>();
for(boolean complete=false; !complete; ) {
// Get all columns
- List<ColumnOrSuperColumn> slice = cassandraClient.get_slice(config.keySpace, EXPIRATION_KEY, expirationColumnParent, predicate, ConsistencyLevel.ONE);
+ List<ColumnOrSuperColumn> slice = cassandraClient.get_slice(config.keySpace, expirationKey, expirationColumnParent, predicate, readConsistencyLevel);
complete = slice.size() < SLICE_SIZE;
// Delete all keys returned by the slice
for(ColumnOrSuperColumn crumb : slice) {
@@ -416,14 +428,14 @@
remove0(new String(col.getName(), "UTF-8"), mutationMap);
}
// Remove the expiration supercolumn
- addMutation(mutationMap, EXPIRATION_KEY, config.expirationColumnFamily, scol.getName(), null, null);
+ addMutation(mutationMap, expirationKey, config.expirationColumnFamily, scol.getName(), null, null);
}
}
- cassandraClient.batch_mutate(config.keySpace, mutationMap, ConsistencyLevel.ONE);
+ cassandraClient.batch_mutate(config.keySpace, mutationMap, writeConsistencyLevel);
} catch (Exception e) {
throw new CacheLoaderException(e);
} finally {
- pool.release(cassandraClient);
+ dataSource.releaseConnection(cassandraClient);
}
}
@@ -433,7 +445,7 @@
Cassandra.Client cassandraClient = null;
try {
- cassandraClient = pool.getConnection();
+ cassandraClient = dataSource.getConnection();
Map<String, Map<String, List<Mutation>>> mutationMap = new HashMap<String, Map<String, List<Mutation>>>();
for (Modification m : mods) {
@@ -452,11 +464,11 @@
}
}
- cassandraClient.batch_mutate(config.keySpace, mutationMap, ConsistencyLevel.ONE);
+ cassandraClient.batch_mutate(config.keySpace, mutationMap, writeConsistencyLevel);
} catch (Exception e) {
throw new CacheLoaderException(e);
} finally {
- pool.release(cassandraClient);
+ dataSource.releaseConnection(cassandraClient);
}
}
@@ -466,13 +478,13 @@
return "CassandraCacheStore";
}
- private static String hashKey(Object key) {
- return ENTRY_KEY_PREFIX + key.toString();
+ private String hashKey(Object key) {
+ return entryKeyPrefix + key.toString();
}
- private static String unhashKey(String key) {
- if(key.startsWith(ENTRY_KEY_PREFIX))
- return key.substring(ENTRY_KEY_PREFIX.length());
+ private String unhashKey(String key) {
+ if(key.startsWith(entryKeyPrefix))
+ return key.substring(entryKeyPrefix.length());
else
return null;
}
Modified: trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStoreConfig.java
===================================================================
--- trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStoreConfig.java 2010-11-02 18:32:46 UTC (rev 2655)
+++ trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStoreConfig.java 2010-11-03 09:03:55 UTC (rev 2656)
@@ -8,8 +8,6 @@
* Configures {@link CassandraCacheStore}.
*/
public class CassandraCacheStoreConfig extends LockSupportCacheStoreConfig {
-
-
/**
* @configRef desc="The Cassandra keyspace"
@@ -25,8 +23,17 @@
* @configRef desc="The Cassandra column family for expirations"
*/
String expirationColumnFamily = "InfinispanExpiration";
+
+ /**
+ * @configRef desc="Whether the keySpace is shared between multiple caches"
+ */
+ boolean sharedKeyspace = false;
+
+ String readConsistencyLevel = "ONE";
+
+ String writeConsistencyLevel = "ONE";
- PoolProperties poolProperties;
+ protected PoolProperties poolProperties;
public CassandraCacheStoreConfig() {
setCacheLoaderClassName(CassandraCacheStore.class.getName());
@@ -57,6 +64,30 @@
this.expirationColumnFamily = expirationColumnFamily;
}
+ public boolean isSharedKeyspace() {
+ return sharedKeyspace;
+ }
+
+ public void setSharedKeyspace(boolean sharedKeyspace) {
+ this.sharedKeyspace = sharedKeyspace;
+ }
+
+ public String getReadConsistencyLevel() {
+ return readConsistencyLevel;
+ }
+
+ public void setReadConsistencyLevel(String readConsistencyLevel) {
+ this.readConsistencyLevel = readConsistencyLevel;
+ }
+
+ public String getWriteConsistencyLevel() {
+ return writeConsistencyLevel;
+ }
+
+ public void setWriteConsistencyLevel(String writeConsistencyLevel) {
+ this.writeConsistencyLevel = writeConsistencyLevel;
+ }
+
public PoolProperties getPoolProperties() {
return poolProperties;
}
@@ -77,258 +108,37 @@
return poolProperties.getPort();
}
-
- public int getAbandonWhenPercentageFull() {
- return poolProperties.getAbandonWhenPercentageFull();
- }
-
-
public boolean isFramed() {
return poolProperties.isFramed();
}
-
- public int getInitialSize() {
- return poolProperties.getInitialSize();
- }
-
-
- public int getMaxActive() {
- return poolProperties.getMaxActive();
- }
-
-
- public long getMaxAge() {
- return poolProperties.getMaxAge();
- }
-
-
- public int getMaxIdle() {
- return poolProperties.getMaxIdle();
- }
-
-
- public int getMaxWait() {
- return poolProperties.getMaxWait();
- }
-
-
- public int getMinEvictableIdleTimeMillis() {
- return poolProperties.getMinEvictableIdleTimeMillis();
- }
-
-
- public int getMinIdle() {
- return poolProperties.getMinIdle();
- }
-
-
- public String getName() {
- return poolProperties.getName();
- }
-
-
- public int getNumTestsPerEvictionRun() {
- return poolProperties.getNumTestsPerEvictionRun();
- }
-
-
public String getPassword() {
return poolProperties.getPassword();
}
-
- public int getRemoveAbandonedTimeout() {
- return poolProperties.getRemoveAbandonedTimeout();
- }
-
- public int getSuspectTimeout() {
- return poolProperties.getSuspectTimeout();
- }
-
-
- public int getTimeBetweenEvictionRunsMillis() {
- return poolProperties.getTimeBetweenEvictionRunsMillis();
- }
-
-
- public boolean getUseLock() {
- return poolProperties.getUseLock();
- }
-
-
public String getUsername() {
return poolProperties.getUsername();
}
-
- public long getValidationInterval() {
- return poolProperties.getValidationInterval();
- }
-
- public boolean isFairQueue() {
- return poolProperties.isFairQueue();
- }
-
-
- public boolean isJmxEnabled() {
- return poolProperties.isJmxEnabled();
- }
-
-
- public boolean isLogAbandoned() {
- return poolProperties.isLogAbandoned();
- }
-
-
- public boolean isRemoveAbandoned() {
- return poolProperties.isRemoveAbandoned();
- }
-
-
- public boolean isTestOnBorrow() {
- return poolProperties.isTestOnBorrow();
- }
-
-
- public boolean isTestOnConnect() {
- return poolProperties.isTestOnConnect();
- }
-
-
- public boolean isTestOnReturn() {
- return poolProperties.isTestOnReturn();
- }
-
-
- public boolean isTestWhileIdle() {
- return poolProperties.isTestWhileIdle();
- }
-
-
- public void setAbandonWhenPercentageFull(int percentage) {
- poolProperties.setAbandonWhenPercentageFull(percentage);
-
- }
-
- public void setFairQueue(boolean fairQueue) {
- poolProperties.setFairQueue(fairQueue);
-
- }
-
-
public void setFramed(boolean framed) {
poolProperties.setFramed(framed);
}
-
- public void setInitialSize(int initialSize) {
- poolProperties.setInitialSize(initialSize);
-
- }
-
-
- public void setJmxEnabled(boolean jmxEnabled) {
- poolProperties.setJmxEnabled(jmxEnabled);
- }
-
-
- public void setLogAbandoned(boolean logAbandoned) {
- poolProperties.setLogAbandoned(logAbandoned);
- }
-
-
- public void setMaxActive(int maxActive) {
- poolProperties.setMaxActive(maxActive);
-
- }
-
-
- public void setMaxAge(long maxAge) {
- poolProperties.setMaxAge(maxAge);
-
- }
-
-
- public void setMaxIdle(int maxIdle) {
- poolProperties.setMaxIdle(maxIdle);
-
- }
-
-
- public void setMaxWait(int maxWait) {
- poolProperties.setMaxWait(maxWait);
-
- }
-
-
- public void setMinEvictableIdleTimeMillis(int minEvictableIdleTimeMillis) {
- poolProperties.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
-
- }
-
- public void setMinIdle(int minIdle) {
- poolProperties.setMinIdle(minIdle);
-
- }
-
- public void setName(String name) {
- poolProperties.setName(name);
- }
-
- public void setNumTestsPerEvictionRun(int numTestsPerEvictionRun) {
- poolProperties.setNumTestsPerEvictionRun(numTestsPerEvictionRun);
-
- }
-
public void setPassword(String password) {
poolProperties.setPassword(password);
}
- public void setRemoveAbandoned(boolean removeAbandoned) {
- poolProperties.setRemoveAbandoned(removeAbandoned);
+ public void setUsername(String username) {
+ poolProperties.setUsername(username);
}
-
- public void setRemoveAbandonedTimeout(int removeAbandonedTimeout) {
- poolProperties.setRemoveAbandonedTimeout(removeAbandonedTimeout);
-
+ public void setDatasourceJndiLocation(String location) {
+ poolProperties.setDataSourceJNDI(location);
}
-
- public void setSuspectTimeout(int seconds) {
- poolProperties.setSuspectTimeout(seconds);
-
+
+ public String getDatasourceJndiLocation() {
+ return poolProperties.getDataSourceJNDI();
}
- public void setTestOnBorrow(boolean testOnBorrow) {
- poolProperties.setTestOnBorrow(testOnBorrow);
-
- }
-
- public void setTestOnConnect(boolean testOnConnect) {
- poolProperties.setTestOnConnect(testOnConnect);
-
- }
-
- public void setTestOnReturn(boolean testOnReturn) {
- poolProperties.setTestOnReturn(testOnReturn);
- }
-
- public void setTestWhileIdle(boolean testWhileIdle) {
- poolProperties.setTestWhileIdle(testWhileIdle);
- }
-
- public void setTimeBetweenEvictionRunsMillis(int timeBetweenEvictionRunsMillis) {
- poolProperties.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
-
- }
-
- public void setUsername(String username) {
- poolProperties.setUsername(username);
- }
-
- public void setValidationInterval(long validationInterval) {
- poolProperties.setValidationInterval(validationInterval);
- }
}
More information about the infinispan-commits mailing list