[infinispan-commits] Infinispan SVN: r2436 - in trunk/cachestore/cassandra: src/main/java/org/infinispan/loaders/cassandra and 1 other directory.

infinispan-commits at lists.jboss.org
Sun Sep 26 16:48:35 EDT 2010


Author: NadirX
Date: 2010-09-26 16:48:35 -0400 (Sun, 26 Sep 2010)
New Revision: 2436

Modified:
   trunk/cachestore/cassandra/pom.xml
   trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStore.java
   trunk/cachestore/cassandra/src/test/resources/storage-conf.xml
Log:
All tests are green!

Modified: trunk/cachestore/cassandra/pom.xml
===================================================================
--- trunk/cachestore/cassandra/pom.xml	2010-09-26 16:26:47 UTC (rev 2435)
+++ trunk/cachestore/cassandra/pom.xml	2010-09-26 20:48:35 UTC (rev 2436)
@@ -28,6 +28,11 @@
 			<version>${version.slf4j}</version>
 			<scope>test</scope>
 		</dependency>
+		<dependency>
+			<groupId>com.eaio.uuid</groupId>
+			<artifactId>uuid</artifactId>
+			<version>3.2</version>
+		</dependency>
 	</dependencies>
 
 	<build>

Modified: trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStore.java
===================================================================
--- trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStore.java	2010-09-26 16:26:47 UTC (rev 2435)
+++ trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStore.java	2010-09-26 20:48:35 UTC (rev 2436)
@@ -7,10 +7,10 @@
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.UUID;
 
 import net.dataforte.cassandra.pool.ConnectionPool;
 
@@ -27,6 +27,7 @@
 import org.apache.cassandra.thrift.NotFoundException;
 import org.apache.cassandra.thrift.SlicePredicate;
 import org.apache.cassandra.thrift.SliceRange;
+import org.apache.cassandra.thrift.SuperColumn;
 import org.infinispan.Cache;
 import org.infinispan.config.ConfigurationException;
 import org.infinispan.container.entries.InternalCacheEntry;
@@ -64,6 +65,7 @@
 
 	private ColumnPath entryColumnPath;
 	private ColumnParent entryColumnParent;
+	private ColumnParent expirationColumnParent;
 
 	static private byte emptyByteArray[] = {};
 
@@ -84,6 +86,7 @@
 			pool = new ConnectionPool(config.getPoolProperties());
 			entryColumnPath = new ColumnPath(config.entryColumnFamily).setColumn(ENTRY_COLUMN_NAME.getBytes("UTF-8"));
 			entryColumnParent = new ColumnParent(config.entryColumnFamily);
+			expirationColumnParent = new ColumnParent(config.expirationColumnFamily);			
 		} catch (Exception e) {
 			throw new ConfigurationException(e);
 		}
@@ -132,38 +135,47 @@
 			SlicePredicate slicePredicate = new SlicePredicate();
 			slicePredicate.setSlice_range(new SliceRange(entryColumnPath.getColumn(), emptyByteArray, false, 1));
 			String startKey = "";
-			boolean complete = false;
-			
+
 			// Get the keys in SLICE_SIZE blocks
 			int sliceSize = Math.min(SLICE_SIZE, numEntries);
-			while (!complete) {
+			for(boolean complete = false; !complete; ) {
 				KeyRange keyRange = new KeyRange(sliceSize);
 				keyRange.setStart_token(startKey);
 				keyRange.setEnd_token("");
-				List<KeySlice> keySlices = cassandraClient.get_range_slices(config.keySpace, entryColumnParent, slicePredicate, keyRange, ConsistencyLevel.ONE);
-				if (keySlices.size() < sliceSize) {
-					// Cassandra has returned less keys than what we asked for. Assume we have finished
-					complete = true;
-				} else {
-					// Cassandra has returned exactly the amount of keys we asked for. Assume we need to cycle again starting from the last returned key (excluded)
-					startKey = keySlices.get(keySlices.size() - 1).getKey();
-					sliceSize = Math.min(SLICE_SIZE, numEntries - s.size());
-					if (sliceSize == 0) {
-						complete = true;
-					}
-				}
+				List<KeySlice> keySlices = cassandraClient.get_range_slices(config.keySpace, entryColumnParent, slicePredicate, keyRange, ConsistencyLevel.ONE);				
 
 				// Cycle through all the keys
 				for (KeySlice keySlice : keySlices) {
 					String key = unhashKey(keySlice.getKey());
 					List<ColumnOrSuperColumn> columns = keySlice.getColumns();
 					if (columns.size() > 0) {
-						log.debug("COLUMN = " + new String(columns.get(0).getColumn().getName()));
+						if (log.isDebugEnabled()) {
+							log.debug("Loading {0}", key);
+						}
 						byte[] value = columns.get(0).getColumn().getValue();
 						InternalCacheEntry ice = unmarshall(value, key);
 						s.add(ice);
+					} else if (log.isDebugEnabled()) {
+						log.debug("Skipping empty key {0}", key);
 					}
 				}
+				if (keySlices.size() < sliceSize) {
+					// Cassandra has returned less keys than what we asked for.
+					// Assume we have finished
+					complete = true;
+				} else {
+					// Cassandra has returned exactly the amount of keys we
+					// asked for. If we haven't reached the required quota yet, 
+					// assume we need to cycle again starting from
+					// the last returned key (excluded)					
+					sliceSize = Math.min(SLICE_SIZE, numEntries - s.size());
+					if (sliceSize == 0) {
+						complete = true;
+					} else {
+						startKey = keySlices.get(keySlices.size() - 1).getKey();
+					}
+				}
+				
 			}
 			return s;
 		} catch (Exception e) {
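
For readability, the paging logic above stands on its own: ask for up to sliceSize keys per round trip, stop on a short page or once the requested quota is met, otherwise continue from the last key returned. A minimal standalone sketch of the same termination rules, where fetchPage is a hypothetical stand-in for get_range_slices and is not part of the cache store:

import java.util.ArrayList;
import java.util.List;

public class PagingSketch {
	static final int SLICE_SIZE = 100;

	// Hypothetical stand-in for cassandraClient.get_range_slices(...):
	// returns at most 'size' keys starting from 'startKey'
	static List<String> fetchPage(String startKey, int size) {
		return new ArrayList<String>();
	}

	static List<String> loadKeys(int numEntries) {
		List<String> out = new ArrayList<String>();
		String startKey = "";
		int sliceSize = Math.min(SLICE_SIZE, numEntries);
		for (boolean complete = false; !complete;) {
			List<String> page = fetchPage(startKey, sliceSize);
			out.addAll(page);
			if (page.size() < sliceSize) {
				// Short page: the server has no more keys, we are done
				complete = true;
			} else {
				// Full page: keep going only if the quota has not been
				// reached yet, restarting from the last key returned
				sliceSize = Math.min(SLICE_SIZE, numEntries - out.size());
				if (sliceSize == 0) {
					complete = true;
				} else {
					startKey = page.get(page.size() - 1);
				}
			}
		}
		return out;
	}
}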
@@ -196,9 +208,11 @@
 				}
 
 				for (KeySlice keySlice : keySlices) {
-					String key = unhashKey(keySlice.getKey());
-					if (keysToExclude == null || !keysToExclude.contains(key))
-						s.add(key);
+					if(keySlice.getColumnsSize()>0) {
+						String key = unhashKey(keySlice.getKey());
+						if (keysToExclude == null || !keysToExclude.contains(key))
+							s.add(key);
+					}
 				}
 			}
 			return s;
@@ -259,7 +273,7 @@
 	@Override
 	public boolean remove(Object key) throws CacheLoaderException {
 		if (trace)
-			log.trace("remove() " + key);
+			log.trace("remove(\"{0}\") ", key);
 		Cassandra.Iface cassandraClient = null;
 		try {
 			cassandraClient = pool.getConnection();
@@ -277,7 +291,6 @@
 
 	private void remove0(String key, Map<String, Map<String, List<Mutation>>> mutationMap) {
 		addMutation(mutationMap, key, config.entryColumnFamily, null, null);
-		addMutation(mutationMap, key, config.expirationColumnFamily, null, null);
 	}
 
 	private byte[] marshall(InternalCacheEntry entry) throws IOException {
@@ -299,6 +312,7 @@
 			cassandraClient = pool.getConnection();
 			Map<String, Map<String, List<Mutation>>> mutationMap = new HashMap<String, Map<String, List<Mutation>>>(2);
 			store0(entry, mutationMap);
+			
 			cassandraClient.batch_mutate(config.keySpace, mutationMap, ConsistencyLevel.ONE);
 		} catch (Exception e) {
 			throw new CacheLoaderException(e);
@@ -309,14 +323,22 @@
 
 	private void store0(InternalCacheEntry entry, Map<String, Map<String, List<Mutation>>> mutationMap) throws IOException {
 		Object key = entry.getKey();
-
+		if (trace)
+			log.trace("store(\"{0}\") ", key);
 		String cassandraKey = CassandraCacheStore.hashKey(key);
 		addMutation(mutationMap, cassandraKey, config.entryColumnFamily, entryColumnPath.getColumn(), marshall(entry));
 		if (entry.canExpire()) {
-			addMutation(mutationMap, cassandraKey, config.expirationColumnFamily, longToBytes(entry.getExpiryTime()), emptyByteArray);
-
+			addExpiryEntry(cassandraKey, entry.getExpiryTime(), mutationMap);
 		}
 	}
+	
+	private void addExpiryEntry(String cassandraKey, long expiryTime, Map<String, Map<String, List<Mutation>>> mutationMap) {		
+		try {
+			addMutation(mutationMap, EXPIRATION_KEY, config.expirationColumnFamily, longToBytes(expiryTime), cassandraKey.getBytes("UTF-8"), emptyByteArray);
+		} catch (Exception e) {
+			// Should not happen
+		}
+	}
 
 	/**
 	 * Writes to a stream the number of entries (long) then the entries
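
Regarding store0/addExpiryEntry above: every expirable entry now also records its expiry under a single bookkeeping row keyed by EXPIRATION_KEY in the expiration column family. Schematically, the layout that gets built up looks like this (entryKeyA/B/C and t1/t2 are placeholder names, not actual values):

EXPIRATION_KEY row (expiration column family, ColumnType="Super"):
	SuperColumn name = longToBytes(t1)
		column name = entryKeyA as UTF-8 bytes, value = empty
	SuperColumn name = longToBytes(t2)
		column name = entryKeyB as UTF-8 bytes, value = empty
		column name = entryKeyC as UTF-8 bytes, value = empty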
@@ -359,11 +381,42 @@
 
 	/**
 	 * Purge expired entries.
+	 * Expiration entries are stored in a single key (EXPIRATION_KEY) within a specific ColumnFamily (set by configuration).
+	 * The entries are grouped by expiration timestamp in SuperColumns within which each entry's key is mapped to a column
 	 */
 	@Override
 	protected void purgeInternal() throws CacheLoaderException {
-		log.trace("purgeInternal");
-		// TODO: implement
+		if (trace)
+			log.trace("purgeInternal");
+		Cassandra.Iface cassandraClient = null;
+		try {
+			cassandraClient = pool.getConnection();
+			// We need to get all supercolumns from the beginning of time until now, in SLICE_SIZE chunks
+			SlicePredicate predicate = new SlicePredicate();
+			predicate.setSlice_range(new SliceRange(emptyByteArray, longToBytes(System.currentTimeMillis()), false, SLICE_SIZE));
+			Map<String, Map<String, List<Mutation>>> mutationMap = new HashMap<String, Map<String, List<Mutation>>>();
+			for(boolean complete=false; !complete; ) {
+				// Get all columns
+				List<ColumnOrSuperColumn> slice = cassandraClient.get_slice(config.keySpace, EXPIRATION_KEY, expirationColumnParent, predicate, ConsistencyLevel.ONE);
+				complete = slice.size() < SLICE_SIZE;
+				// Delete all keys returned by the slice
+				for(ColumnOrSuperColumn crumb : slice) {
+					SuperColumn scol = crumb.getSuper_column();					
+					for(Iterator<Column> i = scol.getColumnsIterator(); i.hasNext(); ) {
+						Column col = i.next();
+						// Remove the entry row
+						remove0(new String(col.getName(), "UTF-8"), mutationMap);
+					}
+					// Remove the expiration supercolumn
+					addMutation(mutationMap, EXPIRATION_KEY, config.expirationColumnFamily, scol.getName(), null, null);
+				}				
+			}
+			cassandraClient.batch_mutate(config.keySpace, mutationMap, ConsistencyLevel.ONE);
+		} catch (Exception e) {
+			throw new CacheLoaderException(e);
+		} finally {
+			pool.release(cassandraClient);
+		}
 
 	}
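
purgeInternal reads back that same row: it slices the super columns whose names (encoded timestamps) fall between the beginning of time and now, then schedules two kinds of deletions per super column before issuing a single batch_mutate. A simplified sketch of the per-super-column handling, with hypothetical types standing in for the Thrift objects and the mutation map:

import java.util.List;
import java.util.Map;

class PurgeSketch {
	// Hypothetical operations that would be collected into one batch_mutate call
	interface CacheStoreOps {
		void deleteEntryRow(String key);
		void deleteExpirationSuperColumn(long expiryTimestamp);
	}

	// 'expired' maps an expiry timestamp (a super column name, already <= now)
	// to the entry keys filed under it
	static void purge(Map<Long, List<String>> expired, CacheStoreOps ops) {
		for (Map.Entry<Long, List<String>> sc : expired.entrySet()) {
			for (String entryKey : sc.getValue()) {
				ops.deleteEntryRow(entryKey); // drop the stored entry
			}
			ops.deleteExpirationSuperColumn(sc.getKey()); // drop the bookkeeping super column
		}
	}
}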
 
@@ -413,11 +466,11 @@
 		return key.substring(ENTRY_KEY_PREFIX.length());
 	}
 
-	public static String expirationColumn(long timestamp) {
-		return String.format("expiration%013d", timestamp);
+	private static void addMutation(Map<String, Map<String, List<Mutation>>> mutationMap, String key, String columnFamily, byte[] column, byte[] value) {
+		addMutation(mutationMap, key, columnFamily, null, column, value);
 	}
 
-	private static void addMutation(Map<String, Map<String, List<Mutation>>> mutationMap, String key, String columnFamily, byte[] column, byte[] value) {
+	private static void addMutation(Map<String, Map<String, List<Mutation>>> mutationMap, String key, String columnFamily, byte[] superColumn, byte[] column, byte[] value) {
 		Map<String, List<Mutation>> keyMutations = mutationMap.get(key);
 		// If the key doesn't exist yet, create the mutation holder
 		if (keyMutations == null) {
@@ -433,24 +486,27 @@
 
 		if (value == null) { // Delete
 			Deletion deletion = new Deletion(System.currentTimeMillis());
+			if(superColumn!=null) {
+				deletion.setSuper_column(superColumn);
+			}
 			if (column != null) { // Single column delete
 				deletion.setPredicate(new SlicePredicate().setColumn_names(Arrays.asList(new byte[][] { column })));
-			} // else Delete entire column family
+			} // else Delete entire column family or supercolumn
 			columnFamilyMutations.add(new Mutation().setDeletion(deletion));
 		} else { // Insert/update
 			ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
-			cosc.setColumn(new Column(column, value, System.currentTimeMillis()));
+			if(superColumn!=null) {
+				List<Column> columns = new ArrayList<Column>();
+				columns.add(new Column(column, value, System.currentTimeMillis()));
+				cosc.setSuper_column(new SuperColumn(superColumn, columns));
+			} else {
+				cosc.setColumn(new Column(column, value, System.currentTimeMillis()));
+			}
 			columnFamilyMutations.add(new Mutation().setColumn_or_supercolumn(cosc));
 		}
 	}
 
-	public static UUID getTimeBasedUUID(long timestamp) {
-		long lsb = 0;
-		long msb = 0;
-		return new UUID(msb, lsb);
-	}
-
-	private static final byte[] longToBytes(long v) throws IOException {
+	private static final byte[] longToBytes(long v) {
 		byte b[] = new byte[8];
 		b[0] = (byte) (v >>> 56);
 		b[1] = (byte) (v >>> 48);
@@ -462,5 +518,4 @@
 		b[7] = (byte) (v >>> 0);
 		return b;
 	}
-
 }
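
longToBytes writes the long big-endian (most significant byte first), presumably so that the LongType comparator configured for the expiration column family sees the numeric timestamp when ordering super column names. A small standalone check of the encoding (using java.nio only for comparison):

import java.nio.ByteBuffer;
import java.util.Arrays;

public class LongToBytesCheck {
	public static void main(String[] args) {
		long v = System.currentTimeMillis();
		byte[] manual = new byte[8];
		for (int i = 0; i < 8; i++) {
			manual[i] = (byte) (v >>> (56 - 8 * i)); // same shifts as longToBytes
		}
		// ByteBuffer defaults to big-endian, so this yields the same bytes
		byte[] viaNio = ByteBuffer.allocate(8).putLong(v).array();
		System.out.println(Arrays.equals(manual, viaNio)); // prints: true
	}
}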

Modified: trunk/cachestore/cassandra/src/test/resources/storage-conf.xml
===================================================================
--- trunk/cachestore/cassandra/src/test/resources/storage-conf.xml	2010-09-26 16:26:47 UTC (rev 2435)
+++ trunk/cachestore/cassandra/src/test/resources/storage-conf.xml	2010-09-26 20:48:35 UTC (rev 2436)
@@ -79,7 +79,7 @@
       -->
 
       <ColumnFamily CompareWith="BytesType" Name="InfinispanEntries" KeysCached="10%" />
-      <ColumnFamily CompareWith="LongType" Name="InfinispanExpiration" KeysCached="10%" />
+      <ColumnFamily CompareWith="LongType" Name="InfinispanExpiration" KeysCached="10%" ColumnType="Super" CompareSubcolumnsWith="BytesType"/>
 
       <!--
        ~ Strategy: Setting this to the class that implements
@@ -343,5 +343,5 @@
    ~ elapsed, even in the face of hardware failures.  The default value is
    ~ ten days.
   -->
-  <GCGraceSeconds>864000</GCGraceSeconds>
+  <GCGraceSeconds>86400</GCGraceSeconds>
 </Storage>
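
For scale: 86400 seconds is exactly one day (24 × 3600), while the previous value of 864000 seconds matched the ten-day default described in the comment above (10 × 86400).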


