[infinispan-commits] Infinispan SVN: r2436 - in trunk/cachestore/cassandra: src/main/java/org/infinispan/loaders/cassandra and 1 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Sun Sep 26 16:48:35 EDT 2010
Author: NadirX
Date: 2010-09-26 16:48:35 -0400 (Sun, 26 Sep 2010)
New Revision: 2436
Modified:
trunk/cachestore/cassandra/pom.xml
trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStore.java
trunk/cachestore/cassandra/src/test/resources/storage-conf.xml
Log:
All tests are green !
Modified: trunk/cachestore/cassandra/pom.xml
===================================================================
--- trunk/cachestore/cassandra/pom.xml 2010-09-26 16:26:47 UTC (rev 2435)
+++ trunk/cachestore/cassandra/pom.xml 2010-09-26 20:48:35 UTC (rev 2436)
@@ -28,6 +28,11 @@
<version>${version.slf4j}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>com.eaio.uuid</groupId>
+ <artifactId>uuid</artifactId>
+ <version>3.2</version>
+ </dependency>
</dependencies>
<build>
Modified: trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStore.java
===================================================================
--- trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStore.java 2010-09-26 16:26:47 UTC (rev 2435)
+++ trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStore.java 2010-09-26 20:48:35 UTC (rev 2436)
@@ -7,10 +7,10 @@
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.UUID;
import net.dataforte.cassandra.pool.ConnectionPool;
@@ -27,6 +27,7 @@
import org.apache.cassandra.thrift.NotFoundException;
import org.apache.cassandra.thrift.SlicePredicate;
import org.apache.cassandra.thrift.SliceRange;
+import org.apache.cassandra.thrift.SuperColumn;
import org.infinispan.Cache;
import org.infinispan.config.ConfigurationException;
import org.infinispan.container.entries.InternalCacheEntry;
@@ -64,6 +65,7 @@
private ColumnPath entryColumnPath;
private ColumnParent entryColumnParent;
+ private ColumnParent expirationColumnParent;
static private byte emptyByteArray[] = {};
@@ -84,6 +86,7 @@
pool = new ConnectionPool(config.getPoolProperties());
entryColumnPath = new ColumnPath(config.entryColumnFamily).setColumn(ENTRY_COLUMN_NAME.getBytes("UTF-8"));
entryColumnParent = new ColumnParent(config.entryColumnFamily);
+ expirationColumnParent = new ColumnParent(config.expirationColumnFamily);
} catch (Exception e) {
throw new ConfigurationException(e);
}
@@ -132,38 +135,47 @@
SlicePredicate slicePredicate = new SlicePredicate();
slicePredicate.setSlice_range(new SliceRange(entryColumnPath.getColumn(), emptyByteArray, false, 1));
String startKey = "";
- boolean complete = false;
-
+
// Get the keys in SLICE_SIZE blocks
int sliceSize = Math.min(SLICE_SIZE, numEntries);
- while (!complete) {
+ for(boolean complete = false; !complete; ) {
KeyRange keyRange = new KeyRange(sliceSize);
keyRange.setStart_token(startKey);
keyRange.setEnd_token("");
- List<KeySlice> keySlices = cassandraClient.get_range_slices(config.keySpace, entryColumnParent, slicePredicate, keyRange, ConsistencyLevel.ONE);
- if (keySlices.size() < sliceSize) {
- // Cassandra has returned less keys than what we asked for. Assume we have finished
- complete = true;
- } else {
- // Cassandra has returned exactly the amount of keys we asked for. Assume we need to cycle again starting from the last returned key (excluded)
- startKey = keySlices.get(keySlices.size() - 1).getKey();
- sliceSize = Math.min(SLICE_SIZE, numEntries - s.size());
- if (sliceSize == 0) {
- complete = true;
- }
- }
+ List<KeySlice> keySlices = cassandraClient.get_range_slices(config.keySpace, entryColumnParent, slicePredicate, keyRange, ConsistencyLevel.ONE);
// Cycle through all the keys
for (KeySlice keySlice : keySlices) {
String key = unhashKey(keySlice.getKey());
List<ColumnOrSuperColumn> columns = keySlice.getColumns();
if (columns.size() > 0) {
- log.debug("COLUMN = " + new String(columns.get(0).getColumn().getName()));
+ if (log.isDebugEnabled()) {
+ log.debug("Loading {0}", key);
+ }
byte[] value = columns.get(0).getColumn().getValue();
InternalCacheEntry ice = unmarshall(value, key);
s.add(ice);
+ } else if (log.isDebugEnabled()) {
+ log.debug("Skipping empty key {0}", key);
}
}
+ if (keySlices.size() < sliceSize) {
+ // Cassandra has returned less keys than what we asked for.
+ // Assume we have finished
+ complete = true;
+ } else {
+ // Cassandra has returned exactly the amount of keys we
+ // asked for. If we haven't reached the required quota yet,
+ // assume we need to cycle again starting from
+ // the last returned key (excluded)
+ sliceSize = Math.min(SLICE_SIZE, numEntries - s.size());
+ if (sliceSize == 0) {
+ complete = true;
+ } else {
+ startKey = keySlices.get(keySlices.size() - 1).getKey();
+ }
+ }
+
}
return s;
} catch (Exception e) {
@@ -196,9 +208,11 @@
}
for (KeySlice keySlice : keySlices) {
- String key = unhashKey(keySlice.getKey());
- if (keysToExclude == null || !keysToExclude.contains(key))
- s.add(key);
+ if(keySlice.getColumnsSize()>0) {
+ String key = unhashKey(keySlice.getKey());
+ if (keysToExclude == null || !keysToExclude.contains(key))
+ s.add(key);
+ }
}
}
return s;
@@ -259,7 +273,7 @@
@Override
public boolean remove(Object key) throws CacheLoaderException {
if (trace)
- log.trace("remove() " + key);
+ log.trace("remove(\"{0}\") ", key);
Cassandra.Iface cassandraClient = null;
try {
cassandraClient = pool.getConnection();
@@ -277,7 +291,6 @@
private void remove0(String key, Map<String, Map<String, List<Mutation>>> mutationMap) {
addMutation(mutationMap, key, config.entryColumnFamily, null, null);
- addMutation(mutationMap, key, config.expirationColumnFamily, null, null);
}
private byte[] marshall(InternalCacheEntry entry) throws IOException {
@@ -299,6 +312,7 @@
cassandraClient = pool.getConnection();
Map<String, Map<String, List<Mutation>>> mutationMap = new HashMap<String, Map<String, List<Mutation>>>(2);
store0(entry, mutationMap);
+
cassandraClient.batch_mutate(config.keySpace, mutationMap, ConsistencyLevel.ONE);
} catch (Exception e) {
throw new CacheLoaderException(e);
@@ -309,14 +323,22 @@
private void store0(InternalCacheEntry entry, Map<String, Map<String, List<Mutation>>> mutationMap) throws IOException {
Object key = entry.getKey();
-
+ if (trace)
+ log.trace("store(\"{0}\") ", key);
String cassandraKey = CassandraCacheStore.hashKey(key);
addMutation(mutationMap, cassandraKey, config.entryColumnFamily, entryColumnPath.getColumn(), marshall(entry));
if (entry.canExpire()) {
- addMutation(mutationMap, cassandraKey, config.expirationColumnFamily, longToBytes(entry.getExpiryTime()), emptyByteArray);
-
+ addExpiryEntry(cassandraKey, entry.getExpiryTime(), mutationMap);
}
}
+
+ private void addExpiryEntry(String cassandraKey, long expiryTime, Map<String, Map<String, List<Mutation>>> mutationMap) {
+ try {
+ addMutation(mutationMap, EXPIRATION_KEY, config.expirationColumnFamily, longToBytes(expiryTime), cassandraKey.getBytes("UTF-8"), emptyByteArray);
+ } catch (Exception e) {
+ // Should not happen
+ }
+ }
/**
* Writes to a stream the number of entries (long) then the entries
@@ -359,11 +381,42 @@
/**
* Purge expired entries.
+ * Expiration entries are stored in a single key (EXPIRATION_KEY) within a specific ColumnFamily (set by configuration).
+ * The entries are grouped by expiration timestamp in SuperColumns within which each entry's key is mapped to a column
*/
@Override
protected void purgeInternal() throws CacheLoaderException {
- log.trace("purgeInternal");
- // TODO: implement
+ if (trace)
+ log.trace("purgeInternal");
+ Cassandra.Iface cassandraClient = null;
+ try {
+ cassandraClient = pool.getConnection();
+ // We need to get all supercolumns from the beginning of time until now, in SLICE_SIZE chunks
+ SlicePredicate predicate = new SlicePredicate();
+ predicate.setSlice_range(new SliceRange(emptyByteArray, longToBytes(System.currentTimeMillis()), false, SLICE_SIZE));
+ Map<String, Map<String, List<Mutation>>> mutationMap = new HashMap<String, Map<String, List<Mutation>>>();
+ for(boolean complete=false; !complete; ) {
+ // Get all columns
+ List<ColumnOrSuperColumn> slice = cassandraClient.get_slice(config.keySpace, EXPIRATION_KEY, expirationColumnParent, predicate, ConsistencyLevel.ONE);
+ complete = slice.size() < SLICE_SIZE;
+ // Delete all keys returned by the slice
+ for(ColumnOrSuperColumn crumb : slice) {
+ SuperColumn scol = crumb.getSuper_column();
+ for(Iterator<Column> i = scol.getColumnsIterator(); i.hasNext(); ) {
+ Column col = i.next();
+ // Remove the entry row
+ remove0(new String(col.getName(), "UTF-8"), mutationMap);
+ }
+ // Remove the expiration supercolumn
+ addMutation(mutationMap, EXPIRATION_KEY, config.expirationColumnFamily, scol.getName(), null, null);
+ }
+ }
+ cassandraClient.batch_mutate(config.keySpace, mutationMap, ConsistencyLevel.ONE);
+ } catch (Exception e) {
+ throw new CacheLoaderException(e);
+ } finally {
+ pool.release(cassandraClient);
+ }
}
@@ -413,11 +466,11 @@
return key.substring(ENTRY_KEY_PREFIX.length());
}
- public static String expirationColumn(long timestamp) {
- return String.format("expiration%013d", timestamp);
+ private static void addMutation(Map<String, Map<String, List<Mutation>>> mutationMap, String key, String columnFamily, byte[] column, byte[] value) {
+ addMutation(mutationMap, key, columnFamily, null, column, value);
}
- private static void addMutation(Map<String, Map<String, List<Mutation>>> mutationMap, String key, String columnFamily, byte[] column, byte[] value) {
+ private static void addMutation(Map<String, Map<String, List<Mutation>>> mutationMap, String key, String columnFamily, byte[] superColumn, byte[] column, byte[] value) {
Map<String, List<Mutation>> keyMutations = mutationMap.get(key);
// If the key doesn't exist yet, create the mutation holder
if (keyMutations == null) {
@@ -433,24 +486,27 @@
if (value == null) { // Delete
Deletion deletion = new Deletion(System.currentTimeMillis());
+ if(superColumn!=null) {
+ deletion.setSuper_column(superColumn);
+ }
if (column != null) { // Single column delete
deletion.setPredicate(new SlicePredicate().setColumn_names(Arrays.asList(new byte[][] { column })));
- } // else Delete entire column family
+ } // else Delete entire column family or supercolumn
columnFamilyMutations.add(new Mutation().setDeletion(deletion));
} else { // Insert/update
ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
- cosc.setColumn(new Column(column, value, System.currentTimeMillis()));
+ if(superColumn!=null) {
+ List<Column> columns = new ArrayList<Column>();
+ columns.add(new Column(column, value, System.currentTimeMillis()));
+ cosc.setSuper_column(new SuperColumn(superColumn, columns));
+ } else {
+ cosc.setColumn(new Column(column, value, System.currentTimeMillis()));
+ }
columnFamilyMutations.add(new Mutation().setColumn_or_supercolumn(cosc));
}
}
- public static UUID getTimeBasedUUID(long timestamp) {
- long lsb = 0;
- long msb = 0;
- return new UUID(msb, lsb);
- }
-
- private static final byte[] longToBytes(long v) throws IOException {
+ private static final byte[] longToBytes(long v) {
byte b[] = new byte[8];
b[0] = (byte) (v >>> 56);
b[1] = (byte) (v >>> 48);
@@ -462,5 +518,4 @@
b[7] = (byte) (v >>> 0);
return b;
}
-
}
Modified: trunk/cachestore/cassandra/src/test/resources/storage-conf.xml
===================================================================
--- trunk/cachestore/cassandra/src/test/resources/storage-conf.xml 2010-09-26 16:26:47 UTC (rev 2435)
+++ trunk/cachestore/cassandra/src/test/resources/storage-conf.xml 2010-09-26 20:48:35 UTC (rev 2436)
@@ -79,7 +79,7 @@
-->
<ColumnFamily CompareWith="BytesType" Name="InfinispanEntries" KeysCached="10%" />
- <ColumnFamily CompareWith="LongType" Name="InfinispanExpiration" KeysCached="10%" />
+ <ColumnFamily CompareWith="LongType" Name="InfinispanExpiration" KeysCached="10%" ColumnType="Super" CompareSubcolumnsWith="BytesType"/>
<!--
~ Strategy: Setting this to the class that implements
@@ -343,5 +343,5 @@
~ elapsed, even in the face of hardware failures. The default value is
~ ten days.
-->
- <GCGraceSeconds>864000</GCGraceSeconds>
+ <GCGraceSeconds>86400</GCGraceSeconds>
</Storage>
More information about the infinispan-commits
mailing list