[infinispan-commits] Infinispan SVN: r2425 - trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Wed Sep 22 06:49:55 EDT 2010
Author: NadirX
Date: 2010-09-22 06:49:55 -0400 (Wed, 22 Sep 2010)
New Revision: 2425
Modified:
trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStore.java
Log:
Use get_range_slices (with a KeyRange) instead of get_range_slice
Modified: trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStore.java
===================================================================
--- trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStore.java 2010-09-21 19:29:34 UTC (rev 2424)
+++ trunk/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStore.java 2010-09-22 10:49:55 UTC (rev 2425)
@@ -21,6 +21,7 @@
import org.apache.cassandra.thrift.ColumnPath;
import org.apache.cassandra.thrift.ConsistencyLevel;
import org.apache.cassandra.thrift.Deletion;
+import org.apache.cassandra.thrift.KeyRange;
import org.apache.cassandra.thrift.KeySlice;
import org.apache.cassandra.thrift.Mutation;
import org.apache.cassandra.thrift.NotFoundException;
@@ -78,7 +79,7 @@
@Override
public void start() throws CacheLoaderException {
-
+
try {
pool = new ConnectionPool(config.getPoolProperties());
entryColumnPath = new ColumnPath(config.entryColumnFamily).setColumn(ENTRY_COLUMN_NAME.getBytes("UTF-8"));
@@ -132,26 +133,32 @@
slicePredicate.setSlice_range(new SliceRange(entryColumnPath.getColumn(), emptyByteArray, false, 1));
String startKey = "";
boolean complete = false;
+
// Get the keys in SLICE_SIZE blocks
int sliceSize = Math.min(SLICE_SIZE, numEntries);
while (!complete) {
- List<KeySlice> keySlices = cassandraClient.get_range_slice(config.keySpace, entryColumnParent, slicePredicate, startKey, "", sliceSize,
- ConsistencyLevel.ONE);
+ KeyRange keyRange = new KeyRange(sliceSize);
+ keyRange.setStart_token(startKey);
+ keyRange.setEnd_token("");
+ List<KeySlice> keySlices = cassandraClient.get_range_slices(config.keySpace, entryColumnParent, slicePredicate, keyRange, ConsistencyLevel.ONE);
if (keySlices.size() < sliceSize) {
+ // Cassandra has returned less keys than what we asked for. Assume we have finished
complete = true;
} else {
+ // Cassandra has returned exactly the amount of keys we asked for. Assume we need to cycle again starting from the last returned key (excluded)
startKey = keySlices.get(keySlices.size() - 1).getKey();
sliceSize = Math.min(SLICE_SIZE, numEntries - s.size());
- if(sliceSize==0) {
+ if (sliceSize == 0) {
complete = true;
}
}
+ // Cycle through all the keys
for (KeySlice keySlice : keySlices) {
String key = unhashKey(keySlice.getKey());
List<ColumnOrSuperColumn> columns = keySlice.getColumns();
- if(columns.size()>0) {
- log.debug("COLUMN = "+new String(columns.get(0).getColumn().getName()));
+ if (columns.size() > 0) {
+ log.debug("COLUMN = " + new String(columns.get(0).getColumn().getName()));
byte[] value = columns.get(0).getColumn().getValue();
InternalCacheEntry ice = unmarshall(value, key);
s.add(ice);
@@ -178,8 +185,10 @@
boolean complete = false;
// Get the keys in SLICE_SIZE blocks
while (!complete) {
- List<KeySlice> keySlices = cassandraClient.get_range_slice(config.keySpace, entryColumnParent, slicePredicate, startKey, "", SLICE_SIZE,
- ConsistencyLevel.ONE);
+ KeyRange keyRange = new KeyRange(SLICE_SIZE);
+ keyRange.setStart_token(startKey);
+ keyRange.setEnd_token("");
+ List<KeySlice> keySlices = cassandraClient.get_range_slices(config.keySpace, entryColumnParent, slicePredicate, keyRange, ConsistencyLevel.ONE);
if (keySlices.size() < SLICE_SIZE) {
complete = true;
} else {
@@ -222,8 +231,10 @@
boolean complete = false;
// Get the keys in SLICE_SIZE blocks
while (!complete) {
- List<KeySlice> keySlices = cassandraClient.get_range_slice(config.keySpace, entryColumnParent, slicePredicate, startKey, "", SLICE_SIZE,
- ConsistencyLevel.ONE);
+ KeyRange keyRange = new KeyRange(SLICE_SIZE);
+ keyRange.setStart_token(startKey);
+ keyRange.setEnd_token("");
+ List<KeySlice> keySlices = cassandraClient.get_range_slices(config.keySpace, entryColumnParent, slicePredicate, keyRange, ConsistencyLevel.ONE);
if (keySlices.size() < SLICE_SIZE) {
complete = true;
} else {
@@ -281,14 +292,14 @@
return v.toInternalCacheEntry(key);
}
- public void store(InternalCacheEntry entry) throws CacheLoaderException {
+ public void store(InternalCacheEntry entry) throws CacheLoaderException {
Cassandra.Iface cassandraClient = null;
-
+
try {
cassandraClient = pool.getConnection();
Map<String, Map<String, List<Mutation>>> mutationMap = new HashMap<String, Map<String, List<Mutation>>>(2);
store0(entry, mutationMap);
- cassandraClient.batch_mutate(config.keySpace, mutationMap, ConsistencyLevel.ONE);
+ cassandraClient.batch_mutate(config.keySpace, mutationMap, ConsistencyLevel.ONE);
} catch (Exception e) {
throw new CacheLoaderException(e);
} finally {
@@ -296,13 +307,14 @@
}
}
- private void store0(InternalCacheEntry entry, Map<String, Map<String, List<Mutation>>> mutationMap) throws IOException {
+ private void store0(InternalCacheEntry entry, Map<String, Map<String, List<Mutation>>> mutationMap) throws IOException {
Object key = entry.getKey();
-
+
String cassandraKey = CassandraCacheStore.hashKey(key);
addMutation(mutationMap, cassandraKey, config.entryColumnFamily, entryColumnPath.getColumn(), marshall(entry));
if (entry.canExpire()) {
addMutation(mutationMap, cassandraKey, config.expirationColumnFamily, longToBytes(entry.getExpiryTime()), emptyByteArray);
+
}
}
@@ -312,7 +324,7 @@
*/
public void toStream(ObjectOutput out) throws CacheLoaderException {
try {
- Set<InternalCacheEntry> loadAll = loadAll();
+ Set<InternalCacheEntry> loadAll = loadAll();
int count = 0;
for (InternalCacheEntry entry : loadAll) {
getMarshaller().objectToObjectStream(entry, out);
@@ -358,11 +370,11 @@
@Override
protected void applyModifications(List<? extends Modification> mods) throws CacheLoaderException {
Cassandra.Iface cassandraClient = null;
-
- try {
+
+ try {
cassandraClient = pool.getConnection();
Map<String, Map<String, List<Mutation>>> mutationMap = new HashMap<String, Map<String, List<Mutation>>>();
-
+
for (Modification m : mods) {
switch (m.getType()) {
case STORE:
@@ -378,14 +390,13 @@
throw new AssertionError();
}
}
-
- cassandraClient.batch_mutate(config.keySpace, mutationMap, ConsistencyLevel.ONE);
+
+ cassandraClient.batch_mutate(config.keySpace, mutationMap, ConsistencyLevel.ONE);
} catch (Exception e) {
throw new CacheLoaderException(e);
} finally {
pool.release(cassandraClient);
}
-
}
@@ -395,13 +406,13 @@
}
public static String hashKey(Object key) {
- return ENTRY_KEY_PREFIX+key.toString();
+ return ENTRY_KEY_PREFIX + key.toString();
}
-
+
public static String unhashKey(String key) {
return key.substring(ENTRY_KEY_PREFIX.length());
}
-
+
public static String expirationColumn(long timestamp) {
return String.format("expiration%013d", timestamp);
}
@@ -426,13 +437,13 @@
deletion.setPredicate(new SlicePredicate().setColumn_names(Arrays.asList(new byte[][] { column })));
} // else Delete entire column family
columnFamilyMutations.add(new Mutation().setDeletion(deletion));
- } else { // Insert/update
+ } else { // Insert/update
ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
cosc.setColumn(new Column(column, value, System.currentTimeMillis()));
columnFamilyMutations.add(new Mutation().setColumn_or_supercolumn(cosc));
}
}
-
+
public static UUID getTimeBasedUUID(long timestamp) {
long lsb = 0;
long msb = 0;
More information about the infinispan-commits mailing list