[infinispan-commits] Infinispan SVN: r2657 - branches/4.2.x/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Wed Nov 3 05:13:36 EDT 2010
Author: NadirX
Date: 2010-11-03 05:13:35 -0400 (Wed, 03 Nov 2010)
New Revision: 2657
Modified:
branches/4.2.x/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStore.java
branches/4.2.x/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStoreConfig.java
Log:
Implementation of ISPN-757 and ISPN-758
Modified: branches/4.2.x/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStore.java
===================================================================
--- branches/4.2.x/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStore.java 2010-11-03 09:03:55 UTC (rev 2656)
+++ branches/4.2.x/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStore.java 2010-11-03 09:13:35 UTC (rev 2657)
@@ -63,10 +63,16 @@
private CassandraCacheStoreConfig config;
private CassandraThriftDataSource dataSource;
+
+ private ConsistencyLevel readConsistencyLevel;
+ private ConsistencyLevel writeConsistencyLevel;
+ private String cacheName;
private ColumnPath entryColumnPath;
private ColumnParent entryColumnParent;
private ColumnParent expirationColumnParent;
+ private String entryKeyPrefix;
+ private String expirationKey;
static private byte emptyByteArray[] = {};
@@ -77,6 +83,7 @@
@Override
public void init(CacheLoaderConfig clc, Cache<?, ?> cache, StreamingMarshaller m) throws CacheLoaderException {
super.init(clc, cache, m);
+ this.cacheName = cache.getName();
this.config = (CassandraCacheStoreConfig) clc;
}
@@ -84,10 +91,14 @@
public void start() throws CacheLoaderException {
try {
- dataSource = new DataSource(config.getPoolProperties());
+ dataSource = new DataSource(config.getPoolProperties());
+ readConsistencyLevel = ConsistencyLevel.valueOf(config.readConsistencyLevel);
+ writeConsistencyLevel = ConsistencyLevel.valueOf(config.writeConsistencyLevel);
entryColumnPath = new ColumnPath(config.entryColumnFamily).setColumn(ENTRY_COLUMN_NAME.getBytes("UTF-8"));
entryColumnParent = new ColumnParent(config.entryColumnFamily);
- expirationColumnParent = new ColumnParent(config.expirationColumnFamily);
+ entryKeyPrefix = ENTRY_KEY_PREFIX+(config.isSharedKeyspace()?cacheName+"_":"");
+ expirationColumnParent = new ColumnParent(config.expirationColumnFamily);
+ expirationKey = EXPIRATION_KEY+(config.isSharedKeyspace()?"_"+cacheName:"");
} catch (Exception e) {
throw new ConfigurationException(e);
}
@@ -101,11 +112,11 @@
@Override
public InternalCacheEntry load(Object key) throws CacheLoaderException {
- String hashKey = CassandraCacheStore.hashKey(key);
+ String hashKey = hashKey(key);
Cassandra.Client cassandraClient = null;
try {
cassandraClient = dataSource.getConnection();
- ColumnOrSuperColumn column = cassandraClient.get(config.keySpace, hashKey, entryColumnPath, ConsistencyLevel.ONE);
+ ColumnOrSuperColumn column = cassandraClient.get(config.keySpace, hashKey, entryColumnPath, readConsistencyLevel);
InternalCacheEntry ice = unmarshall(column.getColumn().getValue(), key);
if (ice != null && ice.isExpired()) {
remove(key);
@@ -143,7 +154,7 @@
KeyRange keyRange = new KeyRange(sliceSize);
keyRange.setStart_token(startKey);
keyRange.setEnd_token("");
- List<KeySlice> keySlices = cassandraClient.get_range_slices(config.keySpace, entryColumnParent, slicePredicate, keyRange, ConsistencyLevel.ONE);
+ List<KeySlice> keySlices = cassandraClient.get_range_slices(config.keySpace, entryColumnParent, slicePredicate, keyRange, readConsistencyLevel);
// Cycle through all the keys
for (KeySlice keySlice : keySlices) {
@@ -203,7 +214,7 @@
KeyRange keyRange = new KeyRange(SLICE_SIZE);
keyRange.setStart_token(startKey);
keyRange.setEnd_token("");
- List<KeySlice> keySlices = cassandraClient.get_range_slices(config.keySpace, entryColumnParent, slicePredicate, keyRange, ConsistencyLevel.ONE);
+ List<KeySlice> keySlices = cassandraClient.get_range_slices(config.keySpace, entryColumnParent, slicePredicate, keyRange, readConsistencyLevel);
if (keySlices.size() < SLICE_SIZE) {
complete = true;
} else {
@@ -249,7 +260,7 @@
KeyRange keyRange = new KeyRange(SLICE_SIZE);
keyRange.setStart_token(startKey);
keyRange.setEnd_token("");
- List<KeySlice> keySlices = cassandraClient.get_range_slices(config.keySpace, entryColumnParent, slicePredicate, keyRange, ConsistencyLevel.ONE);
+ List<KeySlice> keySlices = cassandraClient.get_range_slices(config.keySpace, entryColumnParent, slicePredicate, keyRange, readConsistencyLevel);
if (keySlices.size() < SLICE_SIZE) {
complete = true;
} else {
@@ -279,8 +290,8 @@
try {
cassandraClient = dataSource.getConnection();
Map<String, Map<String, List<Mutation>>> mutationMap = new HashMap<String, Map<String, List<Mutation>>>();
- remove0(CassandraCacheStore.hashKey(key), mutationMap);
- cassandraClient.batch_mutate(config.keySpace, mutationMap, ConsistencyLevel.ONE);
+ remove0(hashKey(key), mutationMap);
+ cassandraClient.batch_mutate(config.keySpace, mutationMap, writeConsistencyLevel);
return true;
} catch (Exception e) {
log.error("Exception while removing " + key, e);
@@ -314,7 +325,7 @@
Map<String, Map<String, List<Mutation>>> mutationMap = new HashMap<String, Map<String, List<Mutation>>>(2);
store0(entry, mutationMap);
- cassandraClient.batch_mutate(config.keySpace, mutationMap, ConsistencyLevel.ONE);
+ cassandraClient.batch_mutate(config.keySpace, mutationMap, writeConsistencyLevel);
} catch (Exception e) {
throw new CacheLoaderException(e);
} finally {
@@ -326,7 +337,7 @@
Object key = entry.getKey();
if (trace)
log.trace("store(\"{0}\") ", key);
- String cassandraKey = CassandraCacheStore.hashKey(key);
+ String cassandraKey = hashKey(key);
try {
addMutation(mutationMap, cassandraKey, config.entryColumnFamily, entryColumnPath.getColumn(), marshall(entry));
if (entry.canExpire()) {
@@ -340,7 +351,7 @@
private void addExpiryEntry(String cassandraKey, long expiryTime, Map<String, Map<String, List<Mutation>>> mutationMap) {
try {
- addMutation(mutationMap, EXPIRATION_KEY, config.expirationColumnFamily, longToBytes(expiryTime), cassandraKey.getBytes("UTF-8"), emptyByteArray);
+ addMutation(mutationMap, expirationKey, config.expirationColumnFamily, longToBytes(expiryTime), cassandraKey.getBytes("UTF-8"), emptyByteArray);
} catch (Exception e) {
// Should not happen
}
@@ -390,7 +401,7 @@
/**
* Purge expired entries.
- * Expiration entries are stored in a single key (EXPIRATION_KEY) within a specific ColumnFamily (set by configuration).
+ * Expiration entries are stored in a single key (expirationKey) within a specific ColumnFamily (set by configuration).
* The entries are grouped by expiration timestamp in SuperColumns within which each entry's key is mapped to a column
*/
@Override
@@ -406,7 +417,7 @@
Map<String, Map<String, List<Mutation>>> mutationMap = new HashMap<String, Map<String, List<Mutation>>>();
for(boolean complete=false; !complete; ) {
// Get all columns
- List<ColumnOrSuperColumn> slice = cassandraClient.get_slice(config.keySpace, EXPIRATION_KEY, expirationColumnParent, predicate, ConsistencyLevel.ONE);
+ List<ColumnOrSuperColumn> slice = cassandraClient.get_slice(config.keySpace, expirationKey, expirationColumnParent, predicate, readConsistencyLevel);
complete = slice.size() < SLICE_SIZE;
// Delete all keys returned by the slice
for(ColumnOrSuperColumn crumb : slice) {
@@ -417,10 +428,10 @@
remove0(new String(col.getName(), "UTF-8"), mutationMap);
}
// Remove the expiration supercolumn
- addMutation(mutationMap, EXPIRATION_KEY, config.expirationColumnFamily, scol.getName(), null, null);
+ addMutation(mutationMap, expirationKey, config.expirationColumnFamily, scol.getName(), null, null);
}
}
- cassandraClient.batch_mutate(config.keySpace, mutationMap, ConsistencyLevel.ONE);
+ cassandraClient.batch_mutate(config.keySpace, mutationMap, writeConsistencyLevel);
} catch (Exception e) {
throw new CacheLoaderException(e);
} finally {
@@ -453,7 +464,7 @@
}
}
- cassandraClient.batch_mutate(config.keySpace, mutationMap, ConsistencyLevel.ONE);
+ cassandraClient.batch_mutate(config.keySpace, mutationMap, writeConsistencyLevel);
} catch (Exception e) {
throw new CacheLoaderException(e);
} finally {
@@ -467,13 +478,13 @@
return "CassandraCacheStore";
}
- private static String hashKey(Object key) {
- return ENTRY_KEY_PREFIX + key.toString();
+ private String hashKey(Object key) {
+ return entryKeyPrefix + key.toString();
}
- private static String unhashKey(String key) {
- if(key.startsWith(ENTRY_KEY_PREFIX))
- return key.substring(ENTRY_KEY_PREFIX.length());
+ private String unhashKey(String key) {
+ if(key.startsWith(entryKeyPrefix))
+ return key.substring(entryKeyPrefix.length());
else
return null;
}
Modified: branches/4.2.x/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStoreConfig.java
===================================================================
--- branches/4.2.x/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStoreConfig.java 2010-11-03 09:03:55 UTC (rev 2656)
+++ branches/4.2.x/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStoreConfig.java 2010-11-03 09:13:35 UTC (rev 2657)
@@ -23,6 +23,15 @@
* @configRef desc="The Cassandra column family for expirations"
*/
String expirationColumnFamily = "InfinispanExpiration";
+
+ /**
+ * @configRef desc="Whether the keySpace is shared between multiple caches"
+ */
+ boolean sharedKeyspace = false;
+
+ String readConsistencyLevel = "ONE";
+
+ String writeConsistencyLevel = "ONE";
protected PoolProperties poolProperties;
@@ -55,6 +64,30 @@
this.expirationColumnFamily = expirationColumnFamily;
}
+ public boolean isSharedKeyspace() {
+ return sharedKeyspace;
+ }
+
+ public void setSharedKeyspace(boolean sharedKeyspace) {
+ this.sharedKeyspace = sharedKeyspace;
+ }
+
+ public String getReadConsistencyLevel() {
+ return readConsistencyLevel;
+ }
+
+ public void setReadConsistencyLevel(String readConsistencyLevel) {
+ this.readConsistencyLevel = readConsistencyLevel;
+ }
+
+ public String getWriteConsistencyLevel() {
+ return writeConsistencyLevel;
+ }
+
+ public void setWriteConsistencyLevel(String writeConsistencyLevel) {
+ this.writeConsistencyLevel = writeConsistencyLevel;
+ }
+
public PoolProperties getPoolProperties() {
return poolProperties;
}
More information about the infinispan-commits mailing list