[infinispan-commits] Infinispan SVN: r2657 - branches/4.2.x/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Wed Nov 3 05:13:36 EDT 2010


Author: NadirX
Date: 2010-11-03 05:13:35 -0400 (Wed, 03 Nov 2010)
New Revision: 2657

Modified:
   branches/4.2.x/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStore.java
   branches/4.2.x/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStoreConfig.java
Log:
Implementation of ISPN-757 and ISPN-758

Modified: branches/4.2.x/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStore.java
===================================================================
--- branches/4.2.x/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStore.java	2010-11-03 09:03:55 UTC (rev 2656)
+++ branches/4.2.x/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStore.java	2010-11-03 09:13:35 UTC (rev 2657)
@@ -63,10 +63,16 @@
 	private CassandraCacheStoreConfig config;
 	
 	private CassandraThriftDataSource dataSource;
+	
+	private ConsistencyLevel readConsistencyLevel;
+	private ConsistencyLevel writeConsistencyLevel;
 
+	private String cacheName;
 	private ColumnPath entryColumnPath;
 	private ColumnParent entryColumnParent;
 	private ColumnParent expirationColumnParent;
+	private String entryKeyPrefix;
+	private String expirationKey;
 
 	static private byte emptyByteArray[] = {};
 
@@ -77,6 +83,7 @@
 	@Override
 	public void init(CacheLoaderConfig clc, Cache<?, ?> cache, StreamingMarshaller m) throws CacheLoaderException {
 		super.init(clc, cache, m);
+		this.cacheName = cache.getName();
 		this.config = (CassandraCacheStoreConfig) clc;
 	}
 
@@ -84,10 +91,14 @@
 	public void start() throws CacheLoaderException {
 
 		try {
-			dataSource = new DataSource(config.getPoolProperties());						
+			dataSource = new DataSource(config.getPoolProperties());
+			readConsistencyLevel = ConsistencyLevel.valueOf(config.readConsistencyLevel);
+			writeConsistencyLevel = ConsistencyLevel.valueOf(config.writeConsistencyLevel);
 			entryColumnPath = new ColumnPath(config.entryColumnFamily).setColumn(ENTRY_COLUMN_NAME.getBytes("UTF-8"));
 			entryColumnParent = new ColumnParent(config.entryColumnFamily);
-			expirationColumnParent = new ColumnParent(config.expirationColumnFamily);			
+			entryKeyPrefix = ENTRY_KEY_PREFIX+(config.isSharedKeyspace()?cacheName+"_":"");
+			expirationColumnParent = new ColumnParent(config.expirationColumnFamily);
+			expirationKey = EXPIRATION_KEY+(config.isSharedKeyspace()?"_"+cacheName:"");
 		} catch (Exception e) {
 			throw new ConfigurationException(e);
 		}
@@ -101,11 +112,11 @@
 
 	@Override
 	public InternalCacheEntry load(Object key) throws CacheLoaderException {
-		String hashKey = CassandraCacheStore.hashKey(key);
+		String hashKey = hashKey(key);
 		Cassandra.Client cassandraClient = null;
 		try {
 			cassandraClient = dataSource.getConnection();
-			ColumnOrSuperColumn column = cassandraClient.get(config.keySpace, hashKey, entryColumnPath, ConsistencyLevel.ONE);
+			ColumnOrSuperColumn column = cassandraClient.get(config.keySpace, hashKey, entryColumnPath, readConsistencyLevel);
 			InternalCacheEntry ice = unmarshall(column.getColumn().getValue(), key);
 			if (ice != null && ice.isExpired()) {
 				remove(key);
@@ -143,7 +154,7 @@
 				KeyRange keyRange = new KeyRange(sliceSize);
 				keyRange.setStart_token(startKey);
 				keyRange.setEnd_token("");
-				List<KeySlice> keySlices = cassandraClient.get_range_slices(config.keySpace, entryColumnParent, slicePredicate, keyRange, ConsistencyLevel.ONE);				
+				List<KeySlice> keySlices = cassandraClient.get_range_slices(config.keySpace, entryColumnParent, slicePredicate, keyRange, readConsistencyLevel);				
 
 				// Cycle through all the keys
 				for (KeySlice keySlice : keySlices) {
@@ -203,7 +214,7 @@
 				KeyRange keyRange = new KeyRange(SLICE_SIZE);
 				keyRange.setStart_token(startKey);
 				keyRange.setEnd_token("");
-				List<KeySlice> keySlices = cassandraClient.get_range_slices(config.keySpace, entryColumnParent, slicePredicate, keyRange, ConsistencyLevel.ONE);
+				List<KeySlice> keySlices = cassandraClient.get_range_slices(config.keySpace, entryColumnParent, slicePredicate, keyRange, readConsistencyLevel);
 				if (keySlices.size() < SLICE_SIZE) {
 					complete = true;
 				} else {
@@ -249,7 +260,7 @@
 				KeyRange keyRange = new KeyRange(SLICE_SIZE);
 				keyRange.setStart_token(startKey);
 				keyRange.setEnd_token("");
-				List<KeySlice> keySlices = cassandraClient.get_range_slices(config.keySpace, entryColumnParent, slicePredicate, keyRange, ConsistencyLevel.ONE);
+				List<KeySlice> keySlices = cassandraClient.get_range_slices(config.keySpace, entryColumnParent, slicePredicate, keyRange, readConsistencyLevel);
 				if (keySlices.size() < SLICE_SIZE) {
 					complete = true;
 				} else {
@@ -279,8 +290,8 @@
 		try {
 			cassandraClient = dataSource.getConnection();
 			Map<String, Map<String, List<Mutation>>> mutationMap = new HashMap<String, Map<String, List<Mutation>>>();
-			remove0(CassandraCacheStore.hashKey(key), mutationMap);
-			cassandraClient.batch_mutate(config.keySpace, mutationMap, ConsistencyLevel.ONE);
+			remove0(hashKey(key), mutationMap);
+			cassandraClient.batch_mutate(config.keySpace, mutationMap, writeConsistencyLevel);
 			return true;
 		} catch (Exception e) {
 			log.error("Exception while removing " + key, e);
@@ -314,7 +325,7 @@
 			Map<String, Map<String, List<Mutation>>> mutationMap = new HashMap<String, Map<String, List<Mutation>>>(2);
 			store0(entry, mutationMap);
 			
-			cassandraClient.batch_mutate(config.keySpace, mutationMap, ConsistencyLevel.ONE);
+			cassandraClient.batch_mutate(config.keySpace, mutationMap, writeConsistencyLevel);
 		} catch (Exception e) {
 			throw new CacheLoaderException(e);
 		} finally {
@@ -326,7 +337,7 @@
 		Object key = entry.getKey();
 		if (trace)
 			log.trace("store(\"{0}\") ", key);
-		String cassandraKey = CassandraCacheStore.hashKey(key);
+		String cassandraKey = hashKey(key);
       try {
          addMutation(mutationMap, cassandraKey, config.entryColumnFamily, entryColumnPath.getColumn(), marshall(entry));
          if (entry.canExpire()) {
@@ -340,7 +351,7 @@
 	
 	private void addExpiryEntry(String cassandraKey, long expiryTime, Map<String, Map<String, List<Mutation>>> mutationMap) {		
 		try {
-			addMutation(mutationMap, EXPIRATION_KEY, config.expirationColumnFamily, longToBytes(expiryTime), cassandraKey.getBytes("UTF-8"), emptyByteArray);
+			addMutation(mutationMap, expirationKey, config.expirationColumnFamily, longToBytes(expiryTime), cassandraKey.getBytes("UTF-8"), emptyByteArray);
 		} catch (Exception e) {
 			// Should not happen
 		}
@@ -390,7 +401,7 @@
 
 	/**
 	 * Purge expired entries.
-	 * Expiration entries are stored in a single key (EXPIRATION_KEY) within a specific ColumnFamily (set by configuration).
+	 * Expiration entries are stored in a single key (expirationKey) within a specific ColumnFamily (set by configuration).
 	 * The entries are grouped by expiration timestamp in SuperColumns within which each entry's key is mapped to a column
 	 */
 	@Override
@@ -406,7 +417,7 @@
 			Map<String, Map<String, List<Mutation>>> mutationMap = new HashMap<String, Map<String, List<Mutation>>>();
 			for(boolean complete=false; !complete; ) {
 				// Get all columns
-				List<ColumnOrSuperColumn> slice = cassandraClient.get_slice(config.keySpace, EXPIRATION_KEY, expirationColumnParent, predicate, ConsistencyLevel.ONE);
+				List<ColumnOrSuperColumn> slice = cassandraClient.get_slice(config.keySpace, expirationKey, expirationColumnParent, predicate, readConsistencyLevel);
 				complete = slice.size() < SLICE_SIZE;
 				// Delete all keys returned by the slice
 				for(ColumnOrSuperColumn crumb : slice) {
@@ -417,10 +428,10 @@
 						remove0(new String(col.getName(), "UTF-8"), mutationMap);
 					}
 					// Remove the expiration supercolumn
-					addMutation(mutationMap, EXPIRATION_KEY, config.expirationColumnFamily, scol.getName(), null, null);
+					addMutation(mutationMap, expirationKey, config.expirationColumnFamily, scol.getName(), null, null);
 				}				
 			}
-			cassandraClient.batch_mutate(config.keySpace, mutationMap, ConsistencyLevel.ONE);
+			cassandraClient.batch_mutate(config.keySpace, mutationMap, writeConsistencyLevel);
 		} catch (Exception e) {
 			throw new CacheLoaderException(e);
 		} finally {
@@ -453,7 +464,7 @@
 				}
 			}
 
-			cassandraClient.batch_mutate(config.keySpace, mutationMap, ConsistencyLevel.ONE);
+			cassandraClient.batch_mutate(config.keySpace, mutationMap, writeConsistencyLevel);
 		} catch (Exception e) {
 			throw new CacheLoaderException(e);
 		} finally {
@@ -467,13 +478,13 @@
 		return "CassandraCacheStore";
 	}
 
-	private static String hashKey(Object key) {
-		return ENTRY_KEY_PREFIX + key.toString();
+	private String hashKey(Object key) {
+		return entryKeyPrefix + key.toString();
 	}
 
-	private static String unhashKey(String key) {
-		if(key.startsWith(ENTRY_KEY_PREFIX))
-			return key.substring(ENTRY_KEY_PREFIX.length());
+	private String unhashKey(String key) {
+		if(key.startsWith(entryKeyPrefix))
+			return key.substring(entryKeyPrefix.length());
 		else
 			return null;
 	}

Modified: branches/4.2.x/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStoreConfig.java
===================================================================
--- branches/4.2.x/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStoreConfig.java	2010-11-03 09:03:55 UTC (rev 2656)
+++ branches/4.2.x/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStoreConfig.java	2010-11-03 09:13:35 UTC (rev 2657)
@@ -23,6 +23,15 @@
 	 * @configRef desc="The Cassandra column family for expirations"
 	 */
 	String expirationColumnFamily = "InfinispanExpiration";
+	
+	/**
+	 * @configRef desc="Whether the keySpace is shared between multiple caches"
+	 */
+	boolean sharedKeyspace = false;
+	
+	String readConsistencyLevel = "ONE";
+	
+	String writeConsistencyLevel = "ONE";
 
 	protected PoolProperties poolProperties;
 
@@ -55,6 +64,30 @@
 		this.expirationColumnFamily = expirationColumnFamily;
 	}
 
+	public boolean isSharedKeyspace() {
+		return sharedKeyspace;
+	}
+
+	public void setSharedKeyspace(boolean sharedKeyspace) {
+		this.sharedKeyspace = sharedKeyspace;
+	}
+
+	public String getReadConsistencyLevel() {
+		return readConsistencyLevel;
+	}
+
+	public void setReadConsistencyLevel(String readConsistencyLevel) {
+		this.readConsistencyLevel = readConsistencyLevel;
+	}
+
+	public String getWriteConsistencyLevel() {
+		return writeConsistencyLevel;
+	}
+
+	public void setWriteConsistencyLevel(String writeConsistencyLevel) {
+		this.writeConsistencyLevel = writeConsistencyLevel;
+	}
+
 	public PoolProperties getPoolProperties() {
 		return poolProperties;
 	}



More information about the infinispan-commits mailing list