[infinispan-commits] Infinispan SVN: r2425 - trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Wed Sep 22 06:49:55 EDT 2010


Author: NadirX
Date: 2010-09-22 06:49:55 -0400 (Wed, 22 Sep 2010)
New Revision: 2425

Modified:
   trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStore.java
Log:
Use get_range_slices

Modified: trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStore.java
===================================================================
--- trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStore.java	2010-09-21 19:29:34 UTC (rev 2424)
+++ trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStore.java	2010-09-22 10:49:55 UTC (rev 2425)
@@ -21,6 +21,7 @@
 import org.apache.cassandra.thrift.ColumnPath;
 import org.apache.cassandra.thrift.ConsistencyLevel;
 import org.apache.cassandra.thrift.Deletion;
+import org.apache.cassandra.thrift.KeyRange;
 import org.apache.cassandra.thrift.KeySlice;
 import org.apache.cassandra.thrift.Mutation;
 import org.apache.cassandra.thrift.NotFoundException;
@@ -78,7 +79,7 @@
 
 	@Override
 	public void start() throws CacheLoaderException {
-		
+
 		try {
 			pool = new ConnectionPool(config.getPoolProperties());
 			entryColumnPath = new ColumnPath(config.entryColumnFamily).setColumn(ENTRY_COLUMN_NAME.getBytes("UTF-8"));
@@ -132,26 +133,32 @@
 			slicePredicate.setSlice_range(new SliceRange(entryColumnPath.getColumn(), emptyByteArray, false, 1));
 			String startKey = "";
 			boolean complete = false;
+			
 			// Get the keys in SLICE_SIZE blocks
 			int sliceSize = Math.min(SLICE_SIZE, numEntries);
 			while (!complete) {
-				List<KeySlice> keySlices = cassandraClient.get_range_slice(config.keySpace, entryColumnParent, slicePredicate, startKey, "", sliceSize,
-						ConsistencyLevel.ONE);
+				KeyRange keyRange = new KeyRange(sliceSize);
+				keyRange.setStart_token(startKey);
+				keyRange.setEnd_token("");
+				List<KeySlice> keySlices = cassandraClient.get_range_slices(config.keySpace, entryColumnParent, slicePredicate, keyRange, ConsistencyLevel.ONE);
 				if (keySlices.size() < sliceSize) {
+					// Cassandra has returned fewer keys than we asked for. Assume we have finished.
 					complete = true;
 				} else {
+					// Cassandra has returned exactly the number of keys we asked for. Assume we need to cycle again, starting from the last returned key (excluded).
 					startKey = keySlices.get(keySlices.size() - 1).getKey();
 					sliceSize = Math.min(SLICE_SIZE, numEntries - s.size());
-					if(sliceSize==0) {
+					if (sliceSize == 0) {
 						complete = true;
 					}
 				}
 
+				// Cycle through all the keys
 				for (KeySlice keySlice : keySlices) {
 					String key = unhashKey(keySlice.getKey());
 					List<ColumnOrSuperColumn> columns = keySlice.getColumns();
-					if(columns.size()>0) {
-						log.debug("COLUMN = "+new String(columns.get(0).getColumn().getName()));
+					if (columns.size() > 0) {
+						log.debug("COLUMN = " + new String(columns.get(0).getColumn().getName()));
 						byte[] value = columns.get(0).getColumn().getValue();
 						InternalCacheEntry ice = unmarshall(value, key);
 						s.add(ice);
@@ -178,8 +185,10 @@
 			boolean complete = false;
 			// Get the keys in SLICE_SIZE blocks
 			while (!complete) {
-				List<KeySlice> keySlices = cassandraClient.get_range_slice(config.keySpace, entryColumnParent, slicePredicate, startKey, "", SLICE_SIZE,
-						ConsistencyLevel.ONE);
+				KeyRange keyRange = new KeyRange(SLICE_SIZE);
+				keyRange.setStart_token(startKey);
+				keyRange.setEnd_token("");
+				List<KeySlice> keySlices = cassandraClient.get_range_slices(config.keySpace, entryColumnParent, slicePredicate, keyRange, ConsistencyLevel.ONE);
 				if (keySlices.size() < SLICE_SIZE) {
 					complete = true;
 				} else {
@@ -222,8 +231,10 @@
 			boolean complete = false;
 			// Get the keys in SLICE_SIZE blocks
 			while (!complete) {
-				List<KeySlice> keySlices = cassandraClient.get_range_slice(config.keySpace, entryColumnParent, slicePredicate, startKey, "", SLICE_SIZE,
-						ConsistencyLevel.ONE);
+				KeyRange keyRange = new KeyRange(SLICE_SIZE);
+				keyRange.setStart_token(startKey);
+				keyRange.setEnd_token("");
+				List<KeySlice> keySlices = cassandraClient.get_range_slices(config.keySpace, entryColumnParent, slicePredicate, keyRange, ConsistencyLevel.ONE);
 				if (keySlices.size() < SLICE_SIZE) {
 					complete = true;
 				} else {
@@ -281,14 +292,14 @@
 		return v.toInternalCacheEntry(key);
 	}
 
-	public void store(InternalCacheEntry entry) throws CacheLoaderException {		
+	public void store(InternalCacheEntry entry) throws CacheLoaderException {
 		Cassandra.Iface cassandraClient = null;
-		
+
 		try {
 			cassandraClient = pool.getConnection();
 			Map<String, Map<String, List<Mutation>>> mutationMap = new HashMap<String, Map<String, List<Mutation>>>(2);
 			store0(entry, mutationMap);
-			cassandraClient.batch_mutate(config.keySpace, mutationMap, ConsistencyLevel.ONE);			
+			cassandraClient.batch_mutate(config.keySpace, mutationMap, ConsistencyLevel.ONE);
 		} catch (Exception e) {
 			throw new CacheLoaderException(e);
 		} finally {
@@ -296,13 +307,14 @@
 		}
 	}
 
-	private void store0(InternalCacheEntry entry, Map<String, Map<String, List<Mutation>>> mutationMap) throws IOException  {
+	private void store0(InternalCacheEntry entry, Map<String, Map<String, List<Mutation>>> mutationMap) throws IOException {
 		Object key = entry.getKey();
-			
+
 		String cassandraKey = CassandraCacheStore.hashKey(key);
 		addMutation(mutationMap, cassandraKey, config.entryColumnFamily, entryColumnPath.getColumn(), marshall(entry));
 		if (entry.canExpire()) {
 			addMutation(mutationMap, cassandraKey, config.expirationColumnFamily, longToBytes(entry.getExpiryTime()), emptyByteArray);
+
 		}
 	}
 
@@ -312,7 +324,7 @@
 	 */
 	public void toStream(ObjectOutput out) throws CacheLoaderException {
 		try {
-			Set<InternalCacheEntry> loadAll = loadAll();		
+			Set<InternalCacheEntry> loadAll = loadAll();
 			int count = 0;
 			for (InternalCacheEntry entry : loadAll) {
 				getMarshaller().objectToObjectStream(entry, out);
@@ -358,11 +370,11 @@
 	@Override
 	protected void applyModifications(List<? extends Modification> mods) throws CacheLoaderException {
 		Cassandra.Iface cassandraClient = null;
-		
-		try {					
+
+		try {
 			cassandraClient = pool.getConnection();
 			Map<String, Map<String, List<Mutation>>> mutationMap = new HashMap<String, Map<String, List<Mutation>>>();
-			
+
 			for (Modification m : mods) {
 				switch (m.getType()) {
 				case STORE:
@@ -378,14 +390,13 @@
 					throw new AssertionError();
 				}
 			}
-			
-			cassandraClient.batch_mutate(config.keySpace, mutationMap, ConsistencyLevel.ONE);				
+
+			cassandraClient.batch_mutate(config.keySpace, mutationMap, ConsistencyLevel.ONE);
 		} catch (Exception e) {
 			throw new CacheLoaderException(e);
 		} finally {
 			pool.release(cassandraClient);
 		}
-		
 
 	}
 
@@ -395,13 +406,13 @@
 	}
 
 	public static String hashKey(Object key) {
-		return ENTRY_KEY_PREFIX+key.toString();
+		return ENTRY_KEY_PREFIX + key.toString();
 	}
-	
+
 	public static String unhashKey(String key) {
 		return key.substring(ENTRY_KEY_PREFIX.length());
 	}
-	
+
 	public static String expirationColumn(long timestamp) {
 		return String.format("expiration%013d", timestamp);
 	}
@@ -426,13 +437,13 @@
 				deletion.setPredicate(new SlicePredicate().setColumn_names(Arrays.asList(new byte[][] { column })));
 			} // else Delete entire column family
 			columnFamilyMutations.add(new Mutation().setDeletion(deletion));
-		} else { // Insert/update			
+		} else { // Insert/update
 			ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
 			cosc.setColumn(new Column(column, value, System.currentTimeMillis()));
 			columnFamilyMutations.add(new Mutation().setColumn_or_supercolumn(cosc));
 		}
 	}
-	
+
 	public static UUID getTimeBasedUUID(long timestamp) {
 		long lsb = 0;
 		long msb = 0;



More information about the infinispan-commits mailing list