[infinispan-commits] Infinispan SVN: r2497 - in branches/4.2.x/cachestore: cassandra and 14 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Mon Oct 11 04:49:34 EDT 2010
Author: NadirX
Date: 2010-10-11 04:49:33 -0400 (Mon, 11 Oct 2010)
New Revision: 2497
Added:
branches/4.2.x/cachestore/cassandra/
branches/4.2.x/cachestore/cassandra/pom.xml
branches/4.2.x/cachestore/cassandra/src/
branches/4.2.x/cachestore/cassandra/src/main/
branches/4.2.x/cachestore/cassandra/src/main/java/
branches/4.2.x/cachestore/cassandra/src/main/java/org/
branches/4.2.x/cachestore/cassandra/src/main/java/org/infinispan/
branches/4.2.x/cachestore/cassandra/src/main/java/org/infinispan/loaders/
branches/4.2.x/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/
branches/4.2.x/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStore.java
branches/4.2.x/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStoreConfig.java
branches/4.2.x/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/package-info.java
branches/4.2.x/cachestore/cassandra/src/main/resources/
branches/4.2.x/cachestore/cassandra/src/test/
branches/4.2.x/cachestore/cassandra/src/test/java/
branches/4.2.x/cachestore/cassandra/src/test/java/org/
branches/4.2.x/cachestore/cassandra/src/test/java/org/infinispan/
branches/4.2.x/cachestore/cassandra/src/test/java/org/infinispan/loaders/
branches/4.2.x/cachestore/cassandra/src/test/java/org/infinispan/loaders/cassandra/
branches/4.2.x/cachestore/cassandra/src/test/java/org/infinispan/loaders/cassandra/CassandraCacheStoreTest.java
branches/4.2.x/cachestore/cassandra/src/test/java/org/infinispan/loaders/cassandra/CassandraServiceDataCleaner.java
branches/4.2.x/cachestore/cassandra/src/test/resources/
branches/4.2.x/cachestore/cassandra/src/test/resources/log4j.properties
branches/4.2.x/cachestore/cassandra/src/test/resources/storage-conf.xml
Log:
Bring in Cassandra Cache Store from trunk
Added: branches/4.2.x/cachestore/cassandra/pom.xml
===================================================================
--- branches/4.2.x/cachestore/cassandra/pom.xml (rev 0)
+++ branches/4.2.x/cachestore/cassandra/pom.xml 2010-10-11 08:49:33 UTC (rev 2497)
@@ -0,0 +1,63 @@
+<?xml version="1.0"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.infinispan</groupId>
+ <artifactId>infinispan-cachestore-parent</artifactId>
+ <version>4.2.0-SNAPSHOT</version>
+ <relativePath>../pom.xml</relativePath>
+ </parent>
+ <artifactId>infinispan-cachestore-cassandra</artifactId>
+ <name>Infinispan CassandraCacheStore</name>
+ <description>Infinispan CassandraCacheStore module</description>
+
+ <properties>
+ <test.src.dir>src/test/java</test.src.dir>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>net.dataforte.cassandra</groupId>
+ <artifactId>cassandra-connection-pool</artifactId>
+ <version>0.1.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-log4j12</artifactId>
+ <version>${version.slf4j}</version>
+ <scope>test</scope>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <testSourceDirectory>${test.src.dir}</testSourceDirectory>
+ <testResources>
+ <testResource>
+ <directory>src/test/resources</directory>
+ <filtering>true</filtering>
+ </testResource>
+ </testResources>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <version>2.4.3</version>
+ <configuration>
+ <forkMode>once</forkMode>
+ <parallel>false</parallel>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+ <repositories>
+ <repository>
+ <id>dataforte</id>
+ <url>http://www.dataforte.net/listing/maven/releases</url>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
+ </repository>
+ </repositories>
+</project>
Property changes on: branches/4.2.x/cachestore/cassandra/pom.xml
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added: branches/4.2.x/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStore.java
===================================================================
--- branches/4.2.x/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStore.java (rev 0)
+++ branches/4.2.x/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStore.java 2010-10-11 08:49:33 UTC (rev 2497)
@@ -0,0 +1,524 @@
+package org.infinispan.loaders.cassandra;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import net.dataforte.cassandra.pool.ConnectionPool;
+
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.Column;
+import org.apache.cassandra.thrift.ColumnOrSuperColumn;
+import org.apache.cassandra.thrift.ColumnParent;
+import org.apache.cassandra.thrift.ColumnPath;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.Deletion;
+import org.apache.cassandra.thrift.KeyRange;
+import org.apache.cassandra.thrift.KeySlice;
+import org.apache.cassandra.thrift.Mutation;
+import org.apache.cassandra.thrift.NotFoundException;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.thrift.SliceRange;
+import org.apache.cassandra.thrift.SuperColumn;
+import org.infinispan.Cache;
+import org.infinispan.config.ConfigurationException;
+import org.infinispan.container.entries.InternalCacheEntry;
+import org.infinispan.container.entries.InternalCacheValue;
+import org.infinispan.loaders.AbstractCacheStore;
+import org.infinispan.loaders.CacheLoaderConfig;
+import org.infinispan.loaders.CacheLoaderException;
+import org.infinispan.loaders.CacheLoaderMetadata;
+import org.infinispan.loaders.modifications.Modification;
+import org.infinispan.loaders.modifications.Remove;
+import org.infinispan.loaders.modifications.Store;
+import org.infinispan.marshall.StreamingMarshaller;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
+/**
+ * A persistent <code>CacheLoader</code> based on Apache Cassandra project. See
+ * http://cassandra.apache.org/
+ *
+ * @author Tristan Tarrant
+ */
+@CacheLoaderMetadata(configurationClass = CassandraCacheStoreConfig.class)
+public class CassandraCacheStore extends AbstractCacheStore {
+
+ private static final String ENTRY_KEY_PREFIX = "entry_";
+ private static final String ENTRY_COLUMN_NAME = "entry";
+ private static final String EXPIRATION_KEY = "expiration";
+ private static final int SLICE_SIZE = 100;
+ private static final Log log = LogFactory.getLog(CassandraCacheStore.class);
+ private static final boolean trace = log.isTraceEnabled();
+
+ private CassandraCacheStoreConfig config;
+
+ private ConnectionPool pool;
+
+ private ColumnPath entryColumnPath;
+ private ColumnParent entryColumnParent;
+ private ColumnParent expirationColumnParent;
+
+ static private byte emptyByteArray[] = {};
+
+ public Class<? extends CacheLoaderConfig> getConfigurationClass() {
+ return CassandraCacheStoreConfig.class;
+ }
+
+ @Override
+ public void init(CacheLoaderConfig clc, Cache<?, ?> cache, StreamingMarshaller m) throws CacheLoaderException {
+ super.init(clc, cache, m);
+ this.config = (CassandraCacheStoreConfig) clc;
+ }
+
+ @Override
+ public void start() throws CacheLoaderException {
+
+ try {
+ pool = new ConnectionPool(config.getPoolProperties());
+ entryColumnPath = new ColumnPath(config.entryColumnFamily).setColumn(ENTRY_COLUMN_NAME.getBytes("UTF-8"));
+ entryColumnParent = new ColumnParent(config.entryColumnFamily);
+ expirationColumnParent = new ColumnParent(config.expirationColumnFamily);
+ } catch (Exception e) {
+ throw new ConfigurationException(e);
+ }
+
+ log.debug("cleaning up expired entries...");
+ purgeInternal();
+
+ log.debug("started");
+ super.start();
+ }
+
+ @Override
+ public InternalCacheEntry load(Object key) throws CacheLoaderException {
+ String hashKey = CassandraCacheStore.hashKey(key);
+ Cassandra.Iface cassandraClient = null;
+ try {
+ cassandraClient = pool.getConnection();
+ ColumnOrSuperColumn column = cassandraClient.get(config.keySpace, hashKey, entryColumnPath, ConsistencyLevel.ONE);
+ InternalCacheEntry ice = unmarshall(column.getColumn().getValue(), key);
+ if (ice != null && ice.isExpired()) {
+ remove(key);
+ return null;
+ }
+ return ice;
+ } catch (NotFoundException nfe) {
+ log.debug("Key '{0}' not found", hashKey);
+ return null;
+ } catch (Exception e) {
+ throw new CacheLoaderException(e);
+ } finally {
+ pool.release(cassandraClient);
+ }
+ }
+
+ @Override
+ public Set<InternalCacheEntry> loadAll() throws CacheLoaderException {
+ return load(Integer.MAX_VALUE);
+ }
+
+ @Override
+ public Set<InternalCacheEntry> load(int numEntries) throws CacheLoaderException {
+ Cassandra.Iface cassandraClient = null;
+ try {
+ cassandraClient = pool.getConnection();
+ Set<InternalCacheEntry> s = new HashSet<InternalCacheEntry>();
+ SlicePredicate slicePredicate = new SlicePredicate();
+ slicePredicate.setSlice_range(new SliceRange(entryColumnPath.getColumn(), emptyByteArray, false, 1));
+ String startKey = "";
+
+ // Get the keys in SLICE_SIZE blocks
+ int sliceSize = Math.min(SLICE_SIZE, numEntries);
+ for(boolean complete = false; !complete; ) {
+ KeyRange keyRange = new KeyRange(sliceSize);
+ keyRange.setStart_token(startKey);
+ keyRange.setEnd_token("");
+ List<KeySlice> keySlices = cassandraClient.get_range_slices(config.keySpace, entryColumnParent, slicePredicate, keyRange, ConsistencyLevel.ONE);
+
+ // Cycle through all the keys
+ for (KeySlice keySlice : keySlices) {
+ String key = unhashKey(keySlice.getKey());
+ if(key==null) // Skip invalid keys
+ continue;
+ List<ColumnOrSuperColumn> columns = keySlice.getColumns();
+ if (columns.size() > 0) {
+ if (log.isDebugEnabled()) {
+ log.debug("Loading {0}", key);
+ }
+ byte[] value = columns.get(0).getColumn().getValue();
+ InternalCacheEntry ice = unmarshall(value, key);
+ s.add(ice);
+ } else if (log.isDebugEnabled()) {
+ log.debug("Skipping empty key {0}", key);
+ }
+ }
+ if (keySlices.size() < sliceSize) {
+ // Cassandra has returned less keys than what we asked for.
+ // Assume we have finished
+ complete = true;
+ } else {
+ // Cassandra has returned exactly the amount of keys we
+ // asked for. If we haven't reached the required quota yet,
+ // assume we need to cycle again starting from
+ // the last returned key (excluded)
+ sliceSize = Math.min(SLICE_SIZE, numEntries - s.size());
+ if (sliceSize == 0) {
+ complete = true;
+ } else {
+ startKey = keySlices.get(keySlices.size() - 1).getKey();
+ }
+ }
+
+ }
+ return s;
+ } catch (Exception e) {
+ throw new CacheLoaderException(e);
+ } finally {
+ pool.release(cassandraClient);
+ }
+ }
+
+ @Override
+ public Set<Object> loadAllKeys(Set<Object> keysToExclude) throws CacheLoaderException {
+ Cassandra.Iface cassandraClient = null;
+ try {
+ cassandraClient = pool.getConnection();
+ Set<Object> s = new HashSet<Object>();
+ SlicePredicate slicePredicate = new SlicePredicate();
+ slicePredicate.setSlice_range(new SliceRange(entryColumnPath.getColumn(), emptyByteArray, false, 1));
+ String startKey = "";
+ boolean complete = false;
+ // Get the keys in SLICE_SIZE blocks
+ while (!complete) {
+ KeyRange keyRange = new KeyRange(SLICE_SIZE);
+ keyRange.setStart_token(startKey);
+ keyRange.setEnd_token("");
+ List<KeySlice> keySlices = cassandraClient.get_range_slices(config.keySpace, entryColumnParent, slicePredicate, keyRange, ConsistencyLevel.ONE);
+ if (keySlices.size() < SLICE_SIZE) {
+ complete = true;
+ } else {
+ startKey = keySlices.get(keySlices.size() - 1).getKey();
+ }
+
+ for (KeySlice keySlice : keySlices) {
+ if(keySlice.getColumnsSize()>0) {
+ String key = unhashKey(keySlice.getKey());
+ if (key!=null && (keysToExclude == null || !keysToExclude.contains(key)))
+ s.add(key);
+ }
+ }
+ }
+ return s;
+ } catch (Exception e) {
+ throw new CacheLoaderException(e);
+ } finally {
+ pool.release(cassandraClient);
+ }
+ }
+
+ /**
+ * Stops this cache store by closing the Cassandra connection pool that was
+ * created in {@link #start()}.
+ *
+ * NOTE(review): this override does not call <code>super.stop()</code>; confirm
+ * whether the superclass has resources of its own that must be released here.
+ */
+ @Override
+ public void stop() {
+ pool.close();
+ }
+
+ @Override
+ public void clear() throws CacheLoaderException {
+ Cassandra.Iface cassandraClient = null;
+ try {
+ cassandraClient = pool.getConnection();
+ SlicePredicate slicePredicate = new SlicePredicate();
+ slicePredicate.setSlice_range(new SliceRange(entryColumnPath.getColumn(), emptyByteArray, false, 1));
+ String startKey = "";
+ boolean complete = false;
+ // Get the keys in SLICE_SIZE blocks
+ while (!complete) {
+ KeyRange keyRange = new KeyRange(SLICE_SIZE);
+ keyRange.setStart_token(startKey);
+ keyRange.setEnd_token("");
+ List<KeySlice> keySlices = cassandraClient.get_range_slices(config.keySpace, entryColumnParent, slicePredicate, keyRange, ConsistencyLevel.ONE);
+ if (keySlices.size() < SLICE_SIZE) {
+ complete = true;
+ } else {
+ startKey = keySlices.get(keySlices.size() - 1).getKey();
+ }
+ Map<String, Map<String, List<Mutation>>> mutationMap = new HashMap<String, Map<String, List<Mutation>>>();
+
+ for (KeySlice keySlice : keySlices) {
+ String cassandraKey = keySlice.getKey();
+ remove0(cassandraKey, mutationMap);
+ }
+ cassandraClient.batch_mutate(config.keySpace, mutationMap, ConsistencyLevel.ALL);
+ }
+ } catch (Exception e) {
+ throw new CacheLoaderException(e);
+ } finally {
+ pool.release(cassandraClient);
+ }
+
+ }
+
+ @Override
+ public boolean remove(Object key) throws CacheLoaderException {
+ if (trace)
+ log.trace("remove(\"{0}\") ", key);
+ Cassandra.Iface cassandraClient = null;
+ try {
+ cassandraClient = pool.getConnection();
+ Map<String, Map<String, List<Mutation>>> mutationMap = new HashMap<String, Map<String, List<Mutation>>>();
+ remove0(CassandraCacheStore.hashKey(key), mutationMap);
+ cassandraClient.batch_mutate(config.keySpace, mutationMap, ConsistencyLevel.ONE);
+ return true;
+ } catch (Exception e) {
+ log.error("Exception while removing " + key, e);
+ return false;
+ } finally {
+ pool.release(cassandraClient);
+ }
+ }
+
+ private void remove0(String key, Map<String, Map<String, List<Mutation>>> mutationMap) {
+ addMutation(mutationMap, key, config.entryColumnFamily, null, null);
+ }
+
+ private byte[] marshall(InternalCacheEntry entry) throws IOException {
+ return getMarshaller().objectToByteBuffer(entry.toInternalCacheValue());
+ }
+
+ private InternalCacheEntry unmarshall(Object o, Object key) throws IOException, ClassNotFoundException {
+ if (o == null)
+ return null;
+ byte b[] = (byte[]) o;
+ InternalCacheValue v = (InternalCacheValue) getMarshaller().objectFromByteBuffer(b);
+ return v.toInternalCacheEntry(key);
+ }
+
+ public void store(InternalCacheEntry entry) throws CacheLoaderException {
+ Cassandra.Iface cassandraClient = null;
+
+ try {
+ cassandraClient = pool.getConnection();
+ Map<String, Map<String, List<Mutation>>> mutationMap = new HashMap<String, Map<String, List<Mutation>>>(2);
+ store0(entry, mutationMap);
+
+ cassandraClient.batch_mutate(config.keySpace, mutationMap, ConsistencyLevel.ONE);
+ } catch (Exception e) {
+ throw new CacheLoaderException(e);
+ } finally {
+ pool.release(cassandraClient);
+ }
+ }
+
+ private void store0(InternalCacheEntry entry, Map<String, Map<String, List<Mutation>>> mutationMap) throws IOException {
+ Object key = entry.getKey();
+ if (trace)
+ log.trace("store(\"{0}\") ", key);
+ String cassandraKey = CassandraCacheStore.hashKey(key);
+ addMutation(mutationMap, cassandraKey, config.entryColumnFamily, entryColumnPath.getColumn(), marshall(entry));
+ if (entry.canExpire()) {
+ addExpiryEntry(cassandraKey, entry.getExpiryTime(), mutationMap);
+ }
+ }
+
+   /**
+    * Queues a mutation that registers an entry in the expiration index: a column named after the entry's Cassandra
+    * key is added to the supercolumn named after the expiry timestamp, inside the EXPIRATION_KEY row of the
+    * expiration column family.
+    *
+    * @param cassandraKey the (already prefixed) Cassandra row key of the entry
+    * @param expiryTime   the expiry timestamp of the entry, used as the supercolumn name
+    * @param mutationMap  the batch the mutation is appended to
+    */
+   private void addExpiryEntry(String cassandraKey, long expiryTime, Map<String, Map<String, List<Mutation>>> mutationMap) {
+      try {
+         addMutation(mutationMap, EXPIRATION_KEY, config.expirationColumnFamily, longToBytes(expiryTime), cassandraKey.getBytes("UTF-8"), emptyByteArray);
+      } catch (Exception e) {
+         // Not expected (UTF-8 is always available), but do not lose the failure silently: without this index
+         // entry purgeInternal() has no way of finding the row once it expires.
+         log.error("Unable to register expiration for " + cassandraKey, e);
+      }
+   }
+
+   /**
+    * Writes every entry held by this store to the stream, followed by a <code>null</code> marker that signals the
+    * end of the data. No entry count is written.
+    *
+    * @param out the stream to write the entries to
+    * @throws CacheLoaderException if the entries cannot be loaded or the stream cannot be written
+    */
+   public void toStream(ObjectOutput out) throws CacheLoaderException {
+      try {
+         for (InternalCacheEntry entry : loadAll()) {
+            getMarshaller().objectToObjectStream(entry, out);
+         }
+         // End-of-data marker: fromStream() reads entries until it hits this null
+         getMarshaller().objectToObjectStream(null, out);
+      } catch (IOException e) {
+         throw new CacheLoaderException(e);
+      }
+   }
+
+   /**
+    * Reads entries from the stream and stores each of them, stopping at the first <code>null</code> object, which
+    * marks the end of the data. No entry count is read.
+    *
+    * @param in the stream to read the entries from
+    * @throws CacheLoaderException if the stream cannot be read, an entry's class cannot be resolved, or an entry
+    *                              cannot be stored
+    */
+   public void fromStream(ObjectInput in) throws CacheLoaderException {
+      try {
+         InternalCacheEntry entry;
+         while ((entry = (InternalCacheEntry) getMarshaller().objectFromObjectStream(in)) != null) {
+            store(entry);
+         }
+      } catch (IOException e) {
+         throw new CacheLoaderException(e);
+      } catch (ClassNotFoundException e) {
+         throw new CacheLoaderException(e);
+      }
+   }
+
+   /**
+    * Purge expired entries.
+    * Expiration entries are stored in a single key (EXPIRATION_KEY) within a specific ColumnFamily (set by
+    * configuration). The entries are grouped by expiration timestamp in SuperColumns within which each entry's key
+    * is mapped to a column.
+    * <p>
+    * The expiration row is read in pages of SLICE_SIZE supercolumns. Each follow-up page starts at the last
+    * supercolumn of the previous page so that paging always makes progress; the deletions themselves are sent to
+    * Cassandra in a single batch once every page has been read.
+    *
+    * @throws CacheLoaderException if a connection cannot be obtained or any Cassandra call fails
+    */
+   @Override
+   protected void purgeInternal() throws CacheLoaderException {
+      if (trace)
+         log.trace("purgeInternal");
+      Cassandra.Iface cassandraClient = null;
+      try {
+         cassandraClient = pool.getConnection();
+         // We need to get all supercolumns from the beginning of time until now, in SLICE_SIZE chunks
+         byte[] sliceFinish = longToBytes(System.currentTimeMillis());
+         byte[] resumeFrom = null; // name of the last supercolumn of the previous page, null on the first page
+         SlicePredicate predicate = new SlicePredicate();
+         Map<String, Map<String, List<Mutation>>> mutationMap = new HashMap<String, Map<String, List<Mutation>>>();
+         for (boolean complete = false; !complete;) {
+            byte[] sliceStart = resumeFrom == null ? emptyByteArray : resumeFrom;
+            predicate.setSlice_range(new SliceRange(sliceStart, sliceFinish, false, SLICE_SIZE));
+            List<ColumnOrSuperColumn> slice = cassandraClient.get_slice(config.keySpace, EXPIRATION_KEY, expirationColumnParent, predicate, ConsistencyLevel.ONE);
+            complete = slice.size() < SLICE_SIZE;
+            // Delete all keys returned by the slice
+            for (ColumnOrSuperColumn crumb : slice) {
+               SuperColumn scol = crumb.getSuper_column();
+               // The slice start is assumed to be inclusive (Thrift SliceRange contract), so a follow-up page may
+               // begin with the supercolumn that ended the previous one: it has already been queued, skip it.
+               if (resumeFrom != null && Arrays.equals(resumeFrom, scol.getName())) {
+                  continue;
+               }
+               for (Iterator<Column> i = scol.getColumnsIterator(); i.hasNext();) {
+                  Column col = i.next();
+                  // Remove the entry row
+                  remove0(new String(col.getName(), "UTF-8"), mutationMap);
+               }
+               // Remove the expiration supercolumn
+               addMutation(mutationMap, EXPIRATION_KEY, config.expirationColumnFamily, scol.getName(), null, null);
+            }
+            if (!complete) {
+               // Move the window forward; re-issuing the same range would return the same page forever
+               resumeFrom = slice.get(slice.size() - 1).getSuper_column().getName();
+            }
+         }
+         cassandraClient.batch_mutate(config.keySpace, mutationMap, ConsistencyLevel.ONE);
+      } catch (Exception e) {
+         throw new CacheLoaderException(e);
+      } finally {
+         pool.release(cassandraClient);
+      }
+   }
+
+   /**
+    * Applies a list of modifications in order. STORE and REMOVE modifications are collected into one mutation batch
+    * that is sent to Cassandra at the end; a CLEAR is executed immediately through {@link #clear()}.
+    *
+    * @param mods the modifications to apply, in the order they must take effect
+    * @throws CacheLoaderException if a connection cannot be obtained or any modification fails
+    */
+   @Override
+   protected void applyModifications(List<? extends Modification> mods) throws CacheLoaderException {
+      Cassandra.Iface cassandraClient = null;
+
+      try {
+         cassandraClient = pool.getConnection();
+         Map<String, Map<String, List<Mutation>>> mutationMap = new HashMap<String, Map<String, List<Mutation>>>();
+
+         for (Modification m : mods) {
+            switch (m.getType()) {
+               case STORE:
+                  store0(((Store) m).getStoredEntry(), mutationMap);
+                  break;
+               case CLEAR:
+                  // Everything queued so far precedes the CLEAR and must not outlive it. Because the batch is only
+                  // sent after the loop, keeping those mutations would re-apply them on top of the cleared store.
+                  mutationMap.clear();
+                  clear();
+                  break;
+               case REMOVE:
+                  remove0(hashKey(((Remove) m).getKey()), mutationMap);
+                  break;
+               default:
+                  throw new AssertionError();
+            }
+         }
+
+         cassandraClient.batch_mutate(config.keySpace, mutationMap, ConsistencyLevel.ONE);
+      } catch (Exception e) {
+         throw new CacheLoaderException(e);
+      } finally {
+         pool.release(cassandraClient);
+      }
+   }
+
+ @Override
+ public String toString() {
+ return "CassandraCacheStore";
+ }
+
+ private static String hashKey(Object key) {
+ return ENTRY_KEY_PREFIX + key.toString();
+ }
+
+ private static String unhashKey(String key) {
+ if(key.startsWith(ENTRY_KEY_PREFIX))
+ return key.substring(ENTRY_KEY_PREFIX.length());
+ else
+ return null;
+ }
+
+ private static void addMutation(Map<String, Map<String, List<Mutation>>> mutationMap, String key, String columnFamily, byte[] column, byte[] value) {
+ addMutation(mutationMap, key, columnFamily, null, column, value);
+ }
+
+ private static void addMutation(Map<String, Map<String, List<Mutation>>> mutationMap, String key, String columnFamily, byte[] superColumn, byte[] column, byte[] value) {
+ Map<String, List<Mutation>> keyMutations = mutationMap.get(key);
+ // If the key doesn't exist yet, create the mutation holder
+ if (keyMutations == null) {
+ keyMutations = new HashMap<String, List<Mutation>>();
+ mutationMap.put(key, keyMutations);
+ }
+ // If the columnfamily doesn't exist yet, create the mutation holder
+ List<Mutation> columnFamilyMutations = keyMutations.get(columnFamily);
+ if (columnFamilyMutations == null) {
+ columnFamilyMutations = new ArrayList<Mutation>();
+ keyMutations.put(columnFamily, columnFamilyMutations);
+ }
+
+ if (value == null) { // Delete
+ Deletion deletion = new Deletion(System.currentTimeMillis());
+ if(superColumn!=null) {
+ deletion.setSuper_column(superColumn);
+ }
+ if (column != null) { // Single column delete
+ deletion.setPredicate(new SlicePredicate().setColumn_names(Arrays.asList(new byte[][] { column })));
+ } // else Delete entire column family or supercolumn
+ columnFamilyMutations.add(new Mutation().setDeletion(deletion));
+ } else { // Insert/update
+ ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
+ if(superColumn!=null) {
+ List<Column> columns = new ArrayList<Column>();
+ columns.add(new Column(column, value, System.currentTimeMillis()));
+ cosc.setSuper_column(new SuperColumn(superColumn, columns));
+ } else {
+ cosc.setColumn(new Column(column, value, System.currentTimeMillis()));
+ }
+ columnFamilyMutations.add(new Mutation().setColumn_or_supercolumn(cosc));
+ }
+ }
+
+ private static final byte[] longToBytes(long v) {
+ byte b[] = new byte[8];
+ b[0] = (byte) (v >>> 56);
+ b[1] = (byte) (v >>> 48);
+ b[2] = (byte) (v >>> 40);
+ b[3] = (byte) (v >>> 32);
+ b[4] = (byte) (v >>> 24);
+ b[5] = (byte) (v >>> 16);
+ b[6] = (byte) (v >>> 8);
+ b[7] = (byte) (v >>> 0);
+ return b;
+ }
+}
Property changes on: branches/4.2.x/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStore.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added: branches/4.2.x/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStoreConfig.java
===================================================================
--- branches/4.2.x/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStoreConfig.java (rev 0)
+++ branches/4.2.x/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStoreConfig.java 2010-10-11 08:49:33 UTC (rev 2497)
@@ -0,0 +1,334 @@
+package org.infinispan.loaders.cassandra;
+
+import net.dataforte.cassandra.pool.PoolProperties;
+
+import org.infinispan.loaders.LockSupportCacheStoreConfig;
+
+/**
+ * Configures {@link CassandraCacheStore}.
+ */
+public class CassandraCacheStoreConfig extends LockSupportCacheStoreConfig {
+
+
+
+ /**
+ * @configRef desc="The Cassandra keyspace"
+ */
+ String keySpace = "Infinispan";
+
+ /**
+ * @configRef desc="The Cassandra column family for entries"
+ */
+ String entryColumnFamily = "InfinispanEntries";
+
+ /**
+ * @configRef desc="The Cassandra column family for expirations"
+ */
+ String expirationColumnFamily = "InfinispanExpiration";
+
+ PoolProperties poolProperties;
+
+ public CassandraCacheStoreConfig() {
+ setCacheLoaderClassName(CassandraCacheStore.class.getName());
+ poolProperties = new PoolProperties();
+ }
+
+ public String getKeySpace() {
+ return keySpace;
+ }
+
+ public void setKeySpace(String keySpace) {
+ this.keySpace = keySpace;
+ }
+
+ public String getEntryColumnFamily() {
+ return entryColumnFamily;
+ }
+
+ public void setEntryColumnFamily(String entryColumnFamily) {
+ this.entryColumnFamily = entryColumnFamily;
+ }
+
+ public String getExpirationColumnFamily() {
+ return expirationColumnFamily;
+ }
+
+ public void setExpirationColumnFamily(String expirationColumnFamily) {
+ this.expirationColumnFamily = expirationColumnFamily;
+ }
+
+ public PoolProperties getPoolProperties() {
+ return poolProperties;
+ }
+
+ public void setHost(String host) {
+ poolProperties.setHost(host);
+ }
+
+ public String getHost() {
+ return poolProperties.getHost();
+ }
+
+ public void setPort(int port) {
+ poolProperties.setPort(port);
+ }
+
+ public int getPort() {
+ return poolProperties.getPort();
+ }
+
+
+ public int getAbandonWhenPercentageFull() {
+ return poolProperties.getAbandonWhenPercentageFull();
+ }
+
+
+ public boolean getFramed() {
+ return poolProperties.getFramed();
+ }
+
+
+ public int getInitialSize() {
+ return poolProperties.getInitialSize();
+ }
+
+
+ public int getMaxActive() {
+ return poolProperties.getMaxActive();
+ }
+
+
+ public long getMaxAge() {
+ return poolProperties.getMaxAge();
+ }
+
+
+ public int getMaxIdle() {
+ return poolProperties.getMaxIdle();
+ }
+
+
+ public int getMaxWait() {
+ return poolProperties.getMaxWait();
+ }
+
+
+ public int getMinEvictableIdleTimeMillis() {
+ return poolProperties.getMinEvictableIdleTimeMillis();
+ }
+
+
+ public int getMinIdle() {
+ return poolProperties.getMinIdle();
+ }
+
+
+ public String getName() {
+ return poolProperties.getName();
+ }
+
+
+ public int getNumTestsPerEvictionRun() {
+ return poolProperties.getNumTestsPerEvictionRun();
+ }
+
+
+ public String getPassword() {
+ return poolProperties.getPassword();
+ }
+
+ public int getRemoveAbandonedTimeout() {
+ return poolProperties.getRemoveAbandonedTimeout();
+ }
+
+
+ public int getSuspectTimeout() {
+ return poolProperties.getSuspectTimeout();
+ }
+
+
+ public int getTimeBetweenEvictionRunsMillis() {
+ return poolProperties.getTimeBetweenEvictionRunsMillis();
+ }
+
+
+ public boolean getUseLock() {
+ return poolProperties.getUseLock();
+ }
+
+
+ public String getUsername() {
+ return poolProperties.getUsername();
+ }
+
+
+ public long getValidationInterval() {
+ return poolProperties.getValidationInterval();
+ }
+
+ public boolean isFairQueue() {
+ return poolProperties.isFairQueue();
+ }
+
+
+ public boolean isJmxEnabled() {
+ return poolProperties.isJmxEnabled();
+ }
+
+
+ public boolean isLogAbandoned() {
+ return poolProperties.isLogAbandoned();
+ }
+
+
+ public boolean isRemoveAbandoned() {
+ return poolProperties.isRemoveAbandoned();
+ }
+
+
+ public boolean isTestOnBorrow() {
+ return poolProperties.isTestOnBorrow();
+ }
+
+
+ public boolean isTestOnConnect() {
+ return poolProperties.isTestOnConnect();
+ }
+
+
+ public boolean isTestOnReturn() {
+ return poolProperties.isTestOnReturn();
+ }
+
+
+ public boolean isTestWhileIdle() {
+ return poolProperties.isTestWhileIdle();
+ }
+
+
+ public void setAbandonWhenPercentageFull(int percentage) {
+ poolProperties.setAbandonWhenPercentageFull(percentage);
+
+ }
+
+ public void setFairQueue(boolean fairQueue) {
+ poolProperties.setFairQueue(fairQueue);
+
+ }
+
+
+ public void setFramed(boolean framed) {
+ poolProperties.setFramed(framed);
+
+ }
+
+
+ public void setInitialSize(int initialSize) {
+ poolProperties.setInitialSize(initialSize);
+
+ }
+
+
+ public void setJmxEnabled(boolean jmxEnabled) {
+ poolProperties.setJmxEnabled(jmxEnabled);
+ }
+
+
+ public void setLogAbandoned(boolean logAbandoned) {
+ poolProperties.setLogAbandoned(logAbandoned);
+ }
+
+
+ public void setMaxActive(int maxActive) {
+ poolProperties.setMaxActive(maxActive);
+
+ }
+
+
+ public void setMaxAge(long maxAge) {
+ poolProperties.setMaxAge(maxAge);
+
+ }
+
+
+ public void setMaxIdle(int maxIdle) {
+ poolProperties.setMaxIdle(maxIdle);
+
+ }
+
+
+ public void setMaxWait(int maxWait) {
+ poolProperties.setMaxWait(maxWait);
+
+ }
+
+
+ public void setMinEvictableIdleTimeMillis(int minEvictableIdleTimeMillis) {
+ poolProperties.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
+
+ }
+
+ public void setMinIdle(int minIdle) {
+ poolProperties.setMinIdle(minIdle);
+
+ }
+
+ public void setName(String name) {
+ poolProperties.setName(name);
+ }
+
+ public void setNumTestsPerEvictionRun(int numTestsPerEvictionRun) {
+ poolProperties.setNumTestsPerEvictionRun(numTestsPerEvictionRun);
+
+ }
+
+ public void setPassword(String password) {
+ poolProperties.setPassword(password);
+ }
+
+ public void setRemoveAbandoned(boolean removeAbandoned) {
+ poolProperties.setRemoveAbandoned(removeAbandoned);
+ }
+
+
+ public void setRemoveAbandonedTimeout(int removeAbandonedTimeout) {
+ poolProperties.setRemoveAbandonedTimeout(removeAbandonedTimeout);
+
+ }
+
+ public void setSuspectTimeout(int seconds) {
+ poolProperties.setSuspectTimeout(seconds);
+
+ }
+
+ public void setTestOnBorrow(boolean testOnBorrow) {
+ poolProperties.setTestOnBorrow(testOnBorrow);
+
+ }
+
+ public void setTestOnConnect(boolean testOnConnect) {
+ poolProperties.setTestOnConnect(testOnConnect);
+
+ }
+
+ public void setTestOnReturn(boolean testOnReturn) {
+ poolProperties.setTestOnReturn(testOnReturn);
+ }
+
+ public void setTestWhileIdle(boolean testWhileIdle) {
+ poolProperties.setTestWhileIdle(testWhileIdle);
+ }
+
+ public void setTimeBetweenEvictionRunsMillis(int timeBetweenEvictionRunsMillis) {
+ poolProperties.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
+
+ }
+
+ public void setUsername(String username) {
+ poolProperties.setUsername(username);
+ }
+
+ public void setValidationInterval(long validationInterval) {
+ poolProperties.setValidationInterval(validationInterval);
+ }
+}
Property changes on: branches/4.2.x/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStoreConfig.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added: branches/4.2.x/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/package-info.java
===================================================================
--- branches/4.2.x/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/package-info.java (rev 0)
+++ branches/4.2.x/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/package-info.java 2010-10-11 08:49:33 UTC (rev 2497)
@@ -0,0 +1,5 @@
+/**
+ * This package contains a {@link org.infinispan.loaders.CacheStore} implementation based on
+ * persisting to Apache Cassandra.
+ */
+package org.infinispan.loaders.cassandra;
\ No newline at end of file
Property changes on: branches/4.2.x/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/package-info.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added: branches/4.2.x/cachestore/cassandra/src/test/java/org/infinispan/loaders/cassandra/CassandraCacheStoreTest.java
===================================================================
--- branches/4.2.x/cachestore/cassandra/src/test/java/org/infinispan/loaders/cassandra/CassandraCacheStoreTest.java (rev 0)
+++ branches/4.2.x/cachestore/cassandra/src/test/java/org/infinispan/loaders/cassandra/CassandraCacheStoreTest.java 2010-10-11 08:49:33 UTC (rev 2497)
@@ -0,0 +1,53 @@
+package org.infinispan.loaders.cassandra;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+
+import org.apache.cassandra.service.EmbeddedCassandraService;
+import org.apache.thrift.transport.TTransportException;
+import org.infinispan.loaders.BaseCacheStoreTest;
+import org.infinispan.loaders.CacheStore;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+/**
+ * Runs the shared {@link BaseCacheStoreTest} suite against a {@link CassandraCacheStore}, using an
+ * embedded Cassandra service that is started once for the whole class.
+ */
+@Test(groups = "unit", testName = "loaders.cassandra.CassandraCacheStoreTest")
+public class CassandraCacheStoreTest extends BaseCacheStoreTest {
+ private static EmbeddedCassandraService cassandra;
+
+ /**
+ * Points Cassandra at the directory holding the test {@code storage-conf.xml}, prepares the
+ * data directories and spawns the embedded service on a daemon thread.
+ *
+ * @throws TTransportException presumably raised while the embedded service initialises -- confirm
+ * @throws IOException if {@code storage-conf.xml} is not on the classpath or the data
+ * directories cannot be prepared
+ * @throws InterruptedException kept from the original signature for compatibility
+ */
+ @BeforeClass
+ public static void setup() throws TTransportException, IOException, InterruptedException {
+ // Tell cassandra where the configuration files are.
+ // Use the test configuration file.
+ URL resource = Thread.currentThread().getContextClassLoader().getResource("storage-conf.xml");
+ if (resource == null) {
+ throw new IOException("storage-conf.xml not found on the test classpath");
+ }
+ // URL paths are always '/'-separated. Searching for File.separatorChar ('\\' on Windows)
+ // would yield -1 there and make substring() throw, so look for '/' explicitly.
+ String resourcePath = resource.getPath();
+ String configPath = resourcePath.substring(0, resourcePath.lastIndexOf('/'));
+
+ System.setProperty("storage-config", configPath);
+
+ CassandraServiceDataCleaner cleaner = new CassandraServiceDataCleaner();
+ cleaner.prepare();
+ cassandra = new EmbeddedCassandraService();
+ cassandra.init();
+ // Daemon thread so the embedded service does not keep the JVM alive after the tests.
+ Thread t = new Thread(cassandra);
+ t.setDaemon(true);
+ t.start();
+ }
+
+ /**
+ * Creates, initialises and starts a {@link CassandraCacheStore} configured with host
+ * {@code localhost}.
+ */
+ @Override
+ protected CacheStore createCacheStore() throws Exception {
+ CassandraCacheStore cs = new CassandraCacheStore();
+ CassandraCacheStoreConfig clc = new CassandraCacheStoreConfig();
+ clc.setHost("localhost");
+ cs.init(clc, getCache(), getMarshaller());
+ cs.start();
+ return cs;
+ }
+
+}
Property changes on: branches/4.2.x/cachestore/cassandra/src/test/java/org/infinispan/loaders/cassandra/CassandraCacheStoreTest.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added: branches/4.2.x/cachestore/cassandra/src/test/java/org/infinispan/loaders/cassandra/CassandraServiceDataCleaner.java
===================================================================
--- branches/4.2.x/cachestore/cassandra/src/test/java/org/infinispan/loaders/cassandra/CassandraServiceDataCleaner.java (rev 0)
+++ branches/4.2.x/cachestore/cassandra/src/test/java/org/infinispan/loaders/cassandra/CassandraServiceDataCleaner.java 2010-10-11 08:49:33 UTC (rev 2497)
@@ -0,0 +1,72 @@
+package org.infinispan.loaders.cassandra;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.util.FileUtils;
+import org.infinispan.test.TestingUtil;
+
+public class CassandraServiceDataCleaner {
+ /**
+ * Makes sure the Cassandra data directories exist and then cleans them.
+ *
+ * @throws IOException
+ * if a directory cannot be created or cleaned
+ */
+ public void prepare() throws IOException {
+ makeDirsIfNotExist();
+ cleanupDataDirectories();
+ }
+
+ /**
+ * Recursively removes every Cassandra data location, the commit log
+ * location included.
+ *
+ * @throws IOException
+ * in case of permissions error etc.
+ */
+ public void cleanupDataDirectories() throws IOException {
+ final Set<String> locations = getDataDirs();
+ for (String location : locations) {
+ TestingUtil.recursiveFileRemove(location);
+ }
+ }
+
+ /**
+ * Creates the data directories, if they do not exist yet.
+ *
+ * @throws IOException
+ * if directories cannot be created (permissions etc).
+ */
+ public void makeDirsIfNotExist() throws IOException {
+ final Set<String> locations = getDataDirs();
+ for (String location : locations) {
+ FileUtils.createDirectory(location);
+ }
+ }
+
+ /**
+ * Gathers the file-system paths of all data file locations plus the log
+ * file location reported by {@link DatabaseDescriptor}.
+ *
+ * @return the distinct directory paths
+ */
+ private Set<String> getDataDirs() {
+ final Set<String> locations = new HashSet<String>();
+ for (String dataLocation : DatabaseDescriptor.getAllDataFileLocations()) {
+ locations.add(dataLocation);
+ }
+ locations.add(DatabaseDescriptor.getLogFileLocation());
+ return locations;
+ }
+
+}
Property changes on: branches/4.2.x/cachestore/cassandra/src/test/java/org/infinispan/loaders/cassandra/CassandraServiceDataCleaner.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added: branches/4.2.x/cachestore/cassandra/src/test/resources/log4j.properties
===================================================================
--- branches/4.2.x/cachestore/cassandra/src/test/resources/log4j.properties (rev 0)
+++ branches/4.2.x/cachestore/cassandra/src/test/resources/log4j.properties 2010-10-11 08:49:33 UTC (rev 2497)
@@ -0,0 +1,6 @@
+log4j.rootCategory=INFO, STDOUT
+
+log4j.appender.STDOUT = org.apache.log4j.ConsoleAppender
+log4j.appender.STDOUT.layout = org.apache.log4j.PatternLayout
+log4j.appender.STDOUT.layout.ConversionPattern = %d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n
+
Added: branches/4.2.x/cachestore/cassandra/src/test/resources/storage-conf.xml
===================================================================
--- branches/4.2.x/cachestore/cassandra/src/test/resources/storage-conf.xml (rev 0)
+++ branches/4.2.x/cachestore/cassandra/src/test/resources/storage-conf.xml 2010-10-11 08:49:33 UTC (rev 2497)
@@ -0,0 +1,347 @@
+<Storage>
+ <!--======================================================================-->
+ <!-- Basic Configuration -->
+ <!--======================================================================-->
+
+ <!--
+ ~ The name of this cluster. This is mainly used to prevent machines in
+ ~ one logical cluster from joining another.
+ -->
+ <ClusterName>Infinispan</ClusterName>
+
+ <!--
+ ~ Turn on to make new [non-seed] nodes automatically migrate the right data
+ ~ to themselves. (If no InitialToken is specified, they will pick one
+ ~ such that they will get half the range of the most-loaded node.)
+ ~ If a node starts up without bootstrapping, it will mark itself bootstrapped
+ ~ so that you can't subsequently accidentally bootstrap a node with
+ ~ data on it. (You can reset this by wiping your data and commitlog
+ ~ directories.)
+ ~
+ ~ Off by default so that new clusters and upgraders from 0.4 don't
+ ~ bootstrap immediately. You should turn this on when you start adding
+ ~ new nodes to a cluster that already has data on it. (If you are upgrading
+ ~ from 0.4, start your cluster with it off once before changing it to true.
+ ~ Otherwise, no data will be lost but you will incur a lot of unnecessary
+ ~ I/O before your cluster starts up.)
+ -->
+ <AutoBootstrap>false</AutoBootstrap>
+
+ <!--
+ ~ Keyspaces and ColumnFamilies:
+ ~ A ColumnFamily is the Cassandra concept closest to a relational
+ ~ table. Keyspaces are separate groups of ColumnFamilies. Except in
+ ~ very unusual circumstances you will have one Keyspace per application.
+
+ ~ There is an implicit keyspace named 'system' for Cassandra internals.
+ -->
+ <Keyspaces>
+ <Keyspace Name="Infinispan">
+ <!--
+ ~ ColumnFamily definitions have one required attribute (Name)
+ ~ and several optional ones.
+ ~
+ ~ The CompareWith attribute tells Cassandra how to sort the columns
+ ~ for slicing operations. The default is BytesType, which is a
+ ~ straightforward lexical comparison of the bytes in each column.
+ ~ Other options are AsciiType, UTF8Type, LexicalUUIDType, TimeUUIDType,
+ ~ and LongType. You can also specify the fully-qualified class
+ ~ name to a class of your choice extending
+ ~ org.apache.cassandra.db.marshal.AbstractType.
+ ~
+ ~ SuperColumns have a similar CompareSubcolumnsWith attribute.
+ ~
+ ~ BytesType: Simple sort by byte value. No validation is performed.
+ ~ AsciiType: Like BytesType, but validates that the input can be
+ ~ parsed as US-ASCII.
+ ~ UTF8Type: A string encoded as UTF8
+ ~ LongType: A 64bit long
+ ~ LexicalUUIDType: A 128bit UUID, compared lexically (by byte value)
+ ~ TimeUUIDType: a 128bit version 1 UUID, compared by timestamp
+ ~
+ ~ (To get the closest approximation to 0.3-style supercolumns, you
+ ~ would use CompareWith=UTF8Type CompareSubcolumnsWith=LongType.)
+ ~
+ ~ An optional `Comment` attribute may be used to attach additional
+ ~ human-readable information about the column family to its definition.
+ ~
+ ~ The optional KeysCached attribute specifies
+ ~ the number of keys per sstable whose locations we keep in
+ ~ memory in "mostly LRU" order. (JUST the key locations, NOT any
+ ~ column values.) Specify a fraction (value less than 1), a percentage
+ ~ (ending in a % sign) or an absolute number of keys to cache.
+ ~
+ ~ The optional RowsCached attribute specifies the number of rows
+ ~ whose entire contents we cache in memory. Do not use this on
+ ~ ColumnFamilies with large rows, or ColumnFamilies with high write:read
+ ~ ratios. Specify a fraction (value less than 1), a percentage (ending in
+ ~ a % sign) or an absolute number of rows to cache.
+ -->
+
+ <ColumnFamily CompareWith="BytesType" Name="InfinispanEntries" KeysCached="10%" />
+ <ColumnFamily CompareWith="LongType" Name="InfinispanExpiration" KeysCached="10%" ColumnType="Super" CompareSubcolumnsWith="BytesType"/>
+
+ <!--
+ ~ Strategy: Setting this to the class that implements
+ ~ IReplicaPlacementStrategy will change the way the node picker works.
+ ~ Out of the box, Cassandra provides
+ ~ org.apache.cassandra.locator.RackUnawareStrategy and
+ ~ org.apache.cassandra.locator.RackAwareStrategy (place one replica in
+ ~ a different datacenter, and the others on different racks in the same
+ ~ one.)
+ -->
+ <ReplicaPlacementStrategy>org.apache.cassandra.locator.RackUnawareStrategy</ReplicaPlacementStrategy>
+
+ <!-- Number of replicas of the data -->
+ <ReplicationFactor>1</ReplicationFactor>
+
+ <!--
+ ~ EndPointSnitch: Setting this to the class that implements
+ ~ AbstractEndpointSnitch, which lets Cassandra know enough
+ ~ about your network topology to route requests efficiently.
+ ~ Out of the box, Cassandra provides org.apache.cassandra.locator.EndPointSnitch,
+ ~ and PropertyFileEndPointSnitch is available in contrib/.
+ -->
+ <EndPointSnitch>org.apache.cassandra.locator.EndPointSnitch</EndPointSnitch>
+
+ </Keyspace>
+ </Keyspaces>
+
+ <!--
+ ~ Authenticator: any IAuthenticator may be used, including your own as long
+ ~ as it is on the classpath. Out of the box, Cassandra provides
+ ~ org.apache.cassandra.auth.AllowAllAuthenticator and,
+ ~ org.apache.cassandra.auth.SimpleAuthenticator
+ ~ (SimpleAuthenticator uses access.properties and passwd.properties by
+ ~ default).
+ ~
+ ~ If you don't specify an authenticator, AllowAllAuthenticator is used.
+ -->
+ <Authenticator>org.apache.cassandra.auth.AllowAllAuthenticator</Authenticator>
+
+ <!--
+ ~ Partitioner: any IPartitioner may be used, including your own as long
+ ~ as it is on the classpath. Out of the box, Cassandra provides
+ ~ org.apache.cassandra.dht.RandomPartitioner,
+ ~ org.apache.cassandra.dht.OrderPreservingPartitioner, and
+ ~ org.apache.cassandra.dht.CollatingOrderPreservingPartitioner.
+ ~ (CollatingOPP collates according to EN,US rules, not naive byte
+ ~ ordering. Use this as an example if you need locale-aware collation.)
+ ~ Range queries require using an order-preserving partitioner.
+ ~
+ ~ Achtung! Changing this parameter requires wiping your data
+ ~ directories, since the partitioner can modify the sstable on-disk
+ ~ format.
+ -->
+ <Partitioner>org.apache.cassandra.dht.OrderPreservingPartitioner</Partitioner>
+
+ <!--
+ ~ If you are using an order-preserving partitioner and you know your key
+ ~ distribution, you can specify the token for this node to use. (Keys
+ ~ are sent to the node with the "closest" token, so distributing your
+ ~ tokens equally along the key distribution space will spread keys
+ ~ evenly across your cluster.) This setting is only checked the first
+ ~ time a node is started.
+
+ ~ This can also be useful with RandomPartitioner to force equal spacing
+ ~ of tokens around the hash space, especially for clusters with a small
+ ~ number of nodes.
+ -->
+ <InitialToken></InitialToken>
+
+ <!--
+ ~ Directories: Specify where Cassandra should store different data on
+ ~ disk. Keep the data disks and the CommitLog disks separate for best
+ ~ performance
+ -->
+ <CommitLogDirectory>${java.io.tmpdir}/infinispan-cassandra-cachestore/commitlog</CommitLogDirectory>
+ <DataFileDirectories>
+ <DataFileDirectory>${java.io.tmpdir}/infinispan-cassandra-cachestore/data</DataFileDirectory>
+ </DataFileDirectories>
+ <CalloutLocation>${java.io.tmpdir}/infinispan-cassandra-cachestore/callouts</CalloutLocation>
+ <StagingFileDirectory>${java.io.tmpdir}/infinispan-cassandra-cachestore/staging</StagingFileDirectory>
+
+
+ <!--
+ ~ Addresses of hosts that are deemed contact points. Cassandra nodes
+ ~ use this list of hosts to find each other and learn the topology of
+ ~ the ring. You must change this if you are running multiple nodes!
+ -->
+ <Seeds>
+ <Seed>127.0.0.1</Seed>
+ </Seeds>
+
+
+ <!-- Miscellaneous -->
+
+ <!-- Time to wait for a reply from other nodes before failing the command -->
+ <RpcTimeoutInMillis>10000</RpcTimeoutInMillis>
+ <!-- Size to allow commitlog to grow to before creating a new segment -->
+ <CommitLogRotationThresholdInMB>128</CommitLogRotationThresholdInMB>
+
+
+ <!-- Local hosts and ports -->
+
+ <!--
+ ~ Address to bind to and tell other nodes to connect to. You _must_
+ ~ change this if you want multiple nodes to be able to communicate!
+ ~
+ ~ Leaving it blank leaves it up to InetAddress.getLocalHost(). This
+ ~ will always do the Right Thing *if* the node is properly configured
+ ~ (hostname, name resolution, etc), and the Right Thing is to use the
+ ~ address associated with the hostname (it might not be).
+ -->
+ <ListenAddress></ListenAddress>
+ <!-- internal communications port -->
+ <StoragePort>7000</StoragePort>
+
+ <!--
+ ~ The address to bind the Thrift RPC service to. Unlike ListenAddress
+ ~ above, you *can* specify 0.0.0.0 here if you want Thrift to listen on
+ ~ all interfaces.
+ ~
+ ~ Leaving this blank has the same effect it does for ListenAddress,
+ ~ (i.e. it will be based on the configured hostname of the node).
+ -->
+ <ThriftAddress>localhost</ThriftAddress>
+ <!-- Thrift RPC port (the port clients connect to). -->
+ <ThriftPort>9160</ThriftPort>
+ <!--
+ ~ Whether or not to use a framed transport for Thrift. If this option
+ ~ is set to true then you must also use a framed transport on the
+ ~ client-side, (framed and non-framed transports are not compatible).
+ -->
+ <ThriftFramedTransport>false</ThriftFramedTransport>
+
+
+ <!--======================================================================-->
+ <!-- Memory, Disk, and Performance -->
+ <!--======================================================================-->
+
+ <!--
+ ~ Access mode. mmapped i/o is substantially faster, but only practical on
+ ~ a 64bit machine (which notably does not include EC2 "small" instances)
+ ~ or relatively small datasets. "auto", the safe choice, will enable
+ ~ mmapping on a 64bit JVM. Other values are "mmap", "mmap_index_only"
+ ~ (which may allow you to get part of the benefits of mmap on a 32bit
+ ~ machine by mmapping only index files) and "standard".
+ ~ (The buffer size settings that follow only apply to standard,
+ ~ non-mmapped i/o.)
+ -->
+ <DiskAccessMode>auto</DiskAccessMode>
+
+ <!--
+ ~ Size of compacted row above which to log a warning. (If compacted
+ ~ rows do not fit in memory, Cassandra will crash. This is explained
+ ~ in http://wiki.apache.org/cassandra/CassandraLimitations and is
+ ~ scheduled to be fixed in 0.7.)
+ -->
+ <RowWarningThresholdInMB>512</RowWarningThresholdInMB>
+
+ <!--
+ ~ Buffer size to use when performing contiguous column slices. Increase
+ ~ this to the size of the column slices you typically perform.
+ ~ (Name-based queries are performed with a buffer size of
+ ~ ColumnIndexSizeInKB.)
+ -->
+ <SlicedBufferSizeInKB>64</SlicedBufferSizeInKB>
+
+ <!--
+ ~ Buffer size to use when flushing memtables to disk. (Only one
+ ~ memtable is ever flushed at a time.) Increase (decrease) the index
+ ~ buffer size relative to the data buffer if you have few (many)
+ ~ columns per key. Bigger is only better _if_ your memtables get large
+ ~ enough to use the space. (Check in your data directory after your
+ ~ app has been running long enough.) -->
+ <FlushDataBufferSizeInMB>32</FlushDataBufferSizeInMB>
+ <FlushIndexBufferSizeInMB>8</FlushIndexBufferSizeInMB>
+
+ <!--
+ ~ Add column indexes to a row after its contents reach this size.
+ ~ Increase if your column values are large, or if you have a very large
+ ~ number of columns. The competing causes are, Cassandra has to
+ ~ deserialize this much of the row to read a single column, so you want
+ ~ it to be small - at least if you do many partial-row reads - but all
+ ~ the index data is read for each access, so you don't want to generate
+ ~ that wastefully either.
+ -->
+ <ColumnIndexSizeInKB>64</ColumnIndexSizeInKB>
+
+ <!--
+ ~ Flush memtable after this much data has been inserted, including
+ ~ overwritten data. There is one memtable per column family, and
+ ~ this threshold is based solely on the amount of data stored, not
+ ~ actual heap memory usage (there is some overhead in indexing the
+ ~ columns).
+ -->
+ <MemtableThroughputInMB>64</MemtableThroughputInMB>
+ <!--
+ ~ Throughput setting for Binary Memtables. Typically these are
+ ~ used for bulk load so you want them to be larger.
+ -->
+ <BinaryMemtableThroughputInMB>256</BinaryMemtableThroughputInMB>
+ <!--
+ ~ The maximum number of columns in millions to store in memory per
+ ~ ColumnFamily before flushing to disk. This is also a per-memtable
+ ~ setting. Use with MemtableThroughputInMB to tune memory usage.
+ -->
+ <MemtableOperationsInMillions>0.3</MemtableOperationsInMillions>
+ <!--
+ ~ The maximum time to leave a dirty memtable unflushed.
+ ~ (While any affected columnfamilies have unflushed data from a
+ ~ commit log segment, that segment cannot be deleted.)
+ ~ This needs to be large enough that it won't cause a flush storm
+ ~ of all your memtables flushing at once because none has hit
+ ~ the size or count thresholds yet. For production, a larger
+ ~ value such as 1440 is recommended.
+ -->
+ <MemtableFlushAfterMinutes>60</MemtableFlushAfterMinutes>
+
+ <!--
+ ~ Unlike most systems, in Cassandra writes are faster than reads, so
+ ~ you can afford more of those in parallel. A good rule of thumb is 2
+ ~ concurrent reads per processor core. Increase ConcurrentWrites to
+ ~ the number of clients writing at once if you enable CommitLogSync +
+ ~ CommitLogSyncDelay. -->
+ <ConcurrentReads>8</ConcurrentReads>
+ <ConcurrentWrites>32</ConcurrentWrites>
+
+ <!--
+ ~ CommitLogSync may be either "periodic" or "batch." When in batch
+ ~ mode, Cassandra won't ack writes until the commit log has been
+ ~ fsynced to disk. It will wait up to CommitLogSyncBatchWindowInMS
+ ~ milliseconds for other writes, before performing the sync.
+
+ ~ This is less necessary in Cassandra than in traditional databases
+ ~ since replication reduces the odds of losing data from a failure
+ ~ after writing the log entry but before it actually reaches the disk.
+ ~ So the other option is "periodic," where writes may be acked immediately
+ ~ and the CommitLog is simply synced every CommitLogSyncPeriodInMS
+ ~ milliseconds.
+ -->
+ <CommitLogSync>periodic</CommitLogSync>
+ <!--
+ ~ Interval at which to perform syncs of the CommitLog in periodic mode.
+ ~ Usually the default of 10000ms is fine; increase it if your i/o
+ ~ load is such that syncs are taking excessively long times.
+ -->
+ <CommitLogSyncPeriodInMS>10000</CommitLogSyncPeriodInMS>
+ <!--
+ ~ Delay (in milliseconds) during which additional commit log entries
+ ~ may be written before fsync in batch mode. This will increase
+ ~ latency slightly, but can vastly improve throughput where there are
+ ~ many writers. Set to zero to disable (each entry will be synced
+ ~ individually). Reasonable values range from a minimal 0.1 to 10 or
+ ~ even more if throughput matters more than latency.
+ -->
+ <!-- <CommitLogSyncBatchWindowInMS>1</CommitLogSyncBatchWindowInMS> -->
+
+ <!--
+ ~ Time to wait before garbage-collection deletion markers. Set this to
+ ~ a large enough value that you are confident that the deletion marker
+ ~ will be propagated to all replicas by the time this many seconds has
+ ~ elapsed, even in the face of hardware failures. Cassandra's default is
+ ~ ten days (864000 seconds); this test configuration uses one day (86400).
+ -->
+ <GCGraceSeconds>86400</GCGraceSeconds>
+</Storage>
Property changes on: branches/4.2.x/cachestore/cassandra/src/test/resources/storage-conf.xml
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
More information about the infinispan-commits
mailing list