[infinispan-commits] Infinispan SVN: r2497 - in branches/4.2.x/cachestore: cassandra and 14 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Mon Oct 11 04:49:34 EDT 2010


Author: NadirX
Date: 2010-10-11 04:49:33 -0400 (Mon, 11 Oct 2010)
New Revision: 2497

Added:
   branches/4.2.x/cachestore/cassandra/
   branches/4.2.x/cachestore/cassandra/pom.xml
   branches/4.2.x/cachestore/cassandra/src/
   branches/4.2.x/cachestore/cassandra/src/main/
   branches/4.2.x/cachestore/cassandra/src/main/java/
   branches/4.2.x/cachestore/cassandra/src/main/java/org/
   branches/4.2.x/cachestore/cassandra/src/main/java/org/infinispan/
   branches/4.2.x/cachestore/cassandra/src/main/java/org/infinispan/loaders/
   branches/4.2.x/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/
   branches/4.2.x/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStore.java
   branches/4.2.x/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStoreConfig.java
   branches/4.2.x/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/package-info.java
   branches/4.2.x/cachestore/cassandra/src/main/resources/
   branches/4.2.x/cachestore/cassandra/src/test/
   branches/4.2.x/cachestore/cassandra/src/test/java/
   branches/4.2.x/cachestore/cassandra/src/test/java/org/
   branches/4.2.x/cachestore/cassandra/src/test/java/org/infinispan/
   branches/4.2.x/cachestore/cassandra/src/test/java/org/infinispan/loaders/
   branches/4.2.x/cachestore/cassandra/src/test/java/org/infinispan/loaders/cassandra/
   branches/4.2.x/cachestore/cassandra/src/test/java/org/infinispan/loaders/cassandra/CassandraCacheStoreTest.java
   branches/4.2.x/cachestore/cassandra/src/test/java/org/infinispan/loaders/cassandra/CassandraServiceDataCleaner.java
   branches/4.2.x/cachestore/cassandra/src/test/resources/
   branches/4.2.x/cachestore/cassandra/src/test/resources/log4j.properties
   branches/4.2.x/cachestore/cassandra/src/test/resources/storage-conf.xml
Log:
Bring in Cassandra Cache Store from trunk

Added: branches/4.2.x/cachestore/cassandra/pom.xml
===================================================================
--- branches/4.2.x/cachestore/cassandra/pom.xml	                        (rev 0)
+++ branches/4.2.x/cachestore/cassandra/pom.xml	2010-10-11 08:49:33 UTC (rev 2497)
@@ -0,0 +1,63 @@
+<?xml version="1.0"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+	<parent>
+		<groupId>org.infinispan</groupId>
+		<artifactId>infinispan-cachestore-parent</artifactId>
+		<version>4.2.0-SNAPSHOT</version>
+		<relativePath>../pom.xml</relativePath>
+	</parent>
+	<artifactId>infinispan-cachestore-cassandra</artifactId>
+	<name>Infinispan CassandraCacheStore</name>
+	<description>Infinispan CassandraCacheStore module</description>
+
+	<properties>
+		<test.src.dir>src/test/java</test.src.dir>
+	</properties>
+
+	<dependencies>
+		<dependency>
+			<groupId>net.dataforte.cassandra</groupId>
+			<artifactId>cassandra-connection-pool</artifactId>
+			<version>0.1.0</version>
+		</dependency>
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-log4j12</artifactId>
+			<version>${version.slf4j}</version>
+			<scope>test</scope>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<testSourceDirectory>${test.src.dir}</testSourceDirectory>
+		<testResources>
+			<testResource>
+				<directory>src/test/resources</directory>
+				<filtering>true</filtering>
+			</testResource>
+		</testResources>
+		<plugins>
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-surefire-plugin</artifactId>
+				<version>2.4.3</version>
+				<configuration>
+					<forkMode>once</forkMode>
+					<parallel>false</parallel>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+
+	<repositories>
+		<repository>
+			<id>dataforte</id>
+			<url>http://www.dataforte.net/listing/maven/releases</url>
+			<snapshots>
+				<enabled>false</enabled>
+			</snapshots>
+		</repository>
+	</repositories>
+</project>


Property changes on: branches/4.2.x/cachestore/cassandra/pom.xml
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Added: branches/4.2.x/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStore.java
===================================================================
--- branches/4.2.x/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStore.java	                        (rev 0)
+++ branches/4.2.x/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStore.java	2010-10-11 08:49:33 UTC (rev 2497)
@@ -0,0 +1,524 @@
+package org.infinispan.loaders.cassandra;
+
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import net.dataforte.cassandra.pool.ConnectionPool;
+
+import org.apache.cassandra.thrift.Cassandra;
+import org.apache.cassandra.thrift.Column;
+import org.apache.cassandra.thrift.ColumnOrSuperColumn;
+import org.apache.cassandra.thrift.ColumnParent;
+import org.apache.cassandra.thrift.ColumnPath;
+import org.apache.cassandra.thrift.ConsistencyLevel;
+import org.apache.cassandra.thrift.Deletion;
+import org.apache.cassandra.thrift.KeyRange;
+import org.apache.cassandra.thrift.KeySlice;
+import org.apache.cassandra.thrift.Mutation;
+import org.apache.cassandra.thrift.NotFoundException;
+import org.apache.cassandra.thrift.SlicePredicate;
+import org.apache.cassandra.thrift.SliceRange;
+import org.apache.cassandra.thrift.SuperColumn;
+import org.infinispan.Cache;
+import org.infinispan.config.ConfigurationException;
+import org.infinispan.container.entries.InternalCacheEntry;
+import org.infinispan.container.entries.InternalCacheValue;
+import org.infinispan.loaders.AbstractCacheStore;
+import org.infinispan.loaders.CacheLoaderConfig;
+import org.infinispan.loaders.CacheLoaderException;
+import org.infinispan.loaders.CacheLoaderMetadata;
+import org.infinispan.loaders.modifications.Modification;
+import org.infinispan.loaders.modifications.Remove;
+import org.infinispan.loaders.modifications.Store;
+import org.infinispan.marshall.StreamingMarshaller;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
+
+/**
+ * A persistent <code>CacheLoader</code> based on the Apache Cassandra project. See
+ * http://cassandra.apache.org/
+ * 
+ * @author Tristan Tarrant
+ */
+@CacheLoaderMetadata(configurationClass = CassandraCacheStoreConfig.class)
+public class CassandraCacheStore extends AbstractCacheStore {
+
+	private static final String ENTRY_KEY_PREFIX = "entry_";
+	private static final String ENTRY_COLUMN_NAME = "entry";
+	private static final String EXPIRATION_KEY = "expiration";
+	private static final int SLICE_SIZE = 100;
+	private static final Log log = LogFactory.getLog(CassandraCacheStore.class);
+	private static final boolean trace = log.isTraceEnabled();
+
+	private CassandraCacheStoreConfig config;
+
+	private ConnectionPool pool;
+
+	private ColumnPath entryColumnPath;
+	private ColumnParent entryColumnParent;
+	private ColumnParent expirationColumnParent;
+
+	static private byte emptyByteArray[] = {};
+
+	public Class<? extends CacheLoaderConfig> getConfigurationClass() {
+		return CassandraCacheStoreConfig.class;
+	}
+
+	@Override
+	public void init(CacheLoaderConfig clc, Cache<?, ?> cache, StreamingMarshaller m) throws CacheLoaderException {
+		super.init(clc, cache, m);
+		this.config = (CassandraCacheStoreConfig) clc;
+	}
+
+	@Override
+	public void start() throws CacheLoaderException {
+
+		try {
+			pool = new ConnectionPool(config.getPoolProperties());
+			entryColumnPath = new ColumnPath(config.entryColumnFamily).setColumn(ENTRY_COLUMN_NAME.getBytes("UTF-8"));
+			entryColumnParent = new ColumnParent(config.entryColumnFamily);
+			expirationColumnParent = new ColumnParent(config.expirationColumnFamily);			
+		} catch (Exception e) {
+			throw new ConfigurationException(e);
+		}
+
+		log.debug("cleaning up expired entries...");
+		purgeInternal();
+
+		log.debug("started");
+		super.start();
+	}
+
+	@Override
+	public InternalCacheEntry load(Object key) throws CacheLoaderException {
+		String hashKey = CassandraCacheStore.hashKey(key);
+		Cassandra.Iface cassandraClient = null;
+		try {
+			cassandraClient = pool.getConnection();
+			ColumnOrSuperColumn column = cassandraClient.get(config.keySpace, hashKey, entryColumnPath, ConsistencyLevel.ONE);
+			InternalCacheEntry ice = unmarshall(column.getColumn().getValue(), key);
+			if (ice != null && ice.isExpired()) {
+				remove(key);
+				return null;
+			}
+			return ice;
+		} catch (NotFoundException nfe) {
+			log.debug("Key '{0}' not found", hashKey);
+			return null;
+		} catch (Exception e) {
+			throw new CacheLoaderException(e);
+		} finally {
+			pool.release(cassandraClient);
+		}
+	}
+
+	@Override
+	public Set<InternalCacheEntry> loadAll() throws CacheLoaderException {
+		return load(Integer.MAX_VALUE);
+	}
+
+	@Override
+	public Set<InternalCacheEntry> load(int numEntries) throws CacheLoaderException {
+		Cassandra.Iface cassandraClient = null;
+		try {
+			cassandraClient = pool.getConnection();
+			Set<InternalCacheEntry> s = new HashSet<InternalCacheEntry>();
+			SlicePredicate slicePredicate = new SlicePredicate();
+			slicePredicate.setSlice_range(new SliceRange(entryColumnPath.getColumn(), emptyByteArray, false, 1));
+			String startKey = "";
+
+			// Get the keys in SLICE_SIZE blocks
+			int sliceSize = Math.min(SLICE_SIZE, numEntries);
+			for(boolean complete = false; !complete; ) {
+				KeyRange keyRange = new KeyRange(sliceSize);
+				keyRange.setStart_token(startKey);
+				keyRange.setEnd_token("");
+				List<KeySlice> keySlices = cassandraClient.get_range_slices(config.keySpace, entryColumnParent, slicePredicate, keyRange, ConsistencyLevel.ONE);				
+
+				// Cycle through all the keys
+				for (KeySlice keySlice : keySlices) {
+					String key = unhashKey(keySlice.getKey());
+					if(key==null) // Skip invalid keys
+						continue;
+					List<ColumnOrSuperColumn> columns = keySlice.getColumns();
+					if (columns.size() > 0) {
+						if (log.isDebugEnabled()) {
+							log.debug("Loading {0}", key);
+						}
+						byte[] value = columns.get(0).getColumn().getValue();
+						InternalCacheEntry ice = unmarshall(value, key);
+						s.add(ice);
+					} else if (log.isDebugEnabled()) {
+						log.debug("Skipping empty key {0}", key);
+					}
+				}
+				if (keySlices.size() < sliceSize) {
+					// Cassandra has returned less keys than what we asked for.
+					// Assume we have finished
+					complete = true;
+				} else {
+					// Cassandra has returned exactly the amount of keys we
+					// asked for. If we haven't reached the required quota yet, 
+					// assume we need to cycle again starting from
+					// the last returned key (excluded)					
+					sliceSize = Math.min(SLICE_SIZE, numEntries - s.size());
+					if (sliceSize == 0) {
+						complete = true;
+					} else {
+						startKey = keySlices.get(keySlices.size() - 1).getKey();
+					}
+				}
+				
+			}
+			return s;
+		} catch (Exception e) {
+			throw new CacheLoaderException(e);
+		} finally {
+			pool.release(cassandraClient);
+		}
+	}
+
+	@Override
+	public Set<Object> loadAllKeys(Set<Object> keysToExclude) throws CacheLoaderException {
+		Cassandra.Iface cassandraClient = null;
+		try {
+			cassandraClient = pool.getConnection();
+			Set<Object> s = new HashSet<Object>();
+			SlicePredicate slicePredicate = new SlicePredicate();
+			slicePredicate.setSlice_range(new SliceRange(entryColumnPath.getColumn(), emptyByteArray, false, 1));
+			String startKey = "";
+			boolean complete = false;
+			// Get the keys in SLICE_SIZE blocks
+			while (!complete) {
+				KeyRange keyRange = new KeyRange(SLICE_SIZE);
+				keyRange.setStart_token(startKey);
+				keyRange.setEnd_token("");
+				List<KeySlice> keySlices = cassandraClient.get_range_slices(config.keySpace, entryColumnParent, slicePredicate, keyRange, ConsistencyLevel.ONE);
+				if (keySlices.size() < SLICE_SIZE) {
+					complete = true;
+				} else {
+					startKey = keySlices.get(keySlices.size() - 1).getKey();
+				}
+
+				for (KeySlice keySlice : keySlices) {
+					if(keySlice.getColumnsSize()>0) {
+						String key = unhashKey(keySlice.getKey());
+						if (key!=null && (keysToExclude == null || !keysToExclude.contains(key)))
+							s.add(key);
+					}
+				}
+			}
+			return s;
+		} catch (Exception e) {
+			throw new CacheLoaderException(e);
+		} finally {
+			pool.release(cassandraClient);
+		}
+	}
+
+	/**
+	 * Stops the cache store by closing the Cassandra connection pool,
+	 * releasing any pooled connections.
+	 */
+	@Override
+	public void stop() {
+		pool.close();
+	}
+
+	@Override
+	public void clear() throws CacheLoaderException {
+		Cassandra.Iface cassandraClient = null;
+		try {
+			cassandraClient = pool.getConnection();
+			SlicePredicate slicePredicate = new SlicePredicate();
+			slicePredicate.setSlice_range(new SliceRange(entryColumnPath.getColumn(), emptyByteArray, false, 1));
+			String startKey = "";
+			boolean complete = false;
+			// Get the keys in SLICE_SIZE blocks
+			while (!complete) {
+				KeyRange keyRange = new KeyRange(SLICE_SIZE);
+				keyRange.setStart_token(startKey);
+				keyRange.setEnd_token("");
+				List<KeySlice> keySlices = cassandraClient.get_range_slices(config.keySpace, entryColumnParent, slicePredicate, keyRange, ConsistencyLevel.ONE);
+				if (keySlices.size() < SLICE_SIZE) {
+					complete = true;
+				} else {
+					startKey = keySlices.get(keySlices.size() - 1).getKey();
+				}
+				Map<String, Map<String, List<Mutation>>> mutationMap = new HashMap<String, Map<String, List<Mutation>>>();
+
+				for (KeySlice keySlice : keySlices) {
+					String cassandraKey = keySlice.getKey();
+					remove0(cassandraKey, mutationMap);
+				}
+				cassandraClient.batch_mutate(config.keySpace, mutationMap, ConsistencyLevel.ALL);
+			}
+		} catch (Exception e) {
+			throw new CacheLoaderException(e);
+		} finally {
+			pool.release(cassandraClient);
+		}
+
+	}
+
+	@Override
+	public boolean remove(Object key) throws CacheLoaderException {
+		if (trace)
+			log.trace("remove(\"{0}\") ", key);
+		Cassandra.Iface cassandraClient = null;
+		try {
+			cassandraClient = pool.getConnection();
+			Map<String, Map<String, List<Mutation>>> mutationMap = new HashMap<String, Map<String, List<Mutation>>>();
+			remove0(CassandraCacheStore.hashKey(key), mutationMap);
+			cassandraClient.batch_mutate(config.keySpace, mutationMap, ConsistencyLevel.ONE);
+			return true;
+		} catch (Exception e) {
+			log.error("Exception while removing " + key, e);
+			return false;
+		} finally {
+			pool.release(cassandraClient);
+		}
+	}
+
+	private void remove0(String key, Map<String, Map<String, List<Mutation>>> mutationMap) {
+		addMutation(mutationMap, key, config.entryColumnFamily, null, null);
+	}
+
+	private byte[] marshall(InternalCacheEntry entry) throws IOException {
+		return getMarshaller().objectToByteBuffer(entry.toInternalCacheValue());
+	}
+
+	private InternalCacheEntry unmarshall(Object o, Object key) throws IOException, ClassNotFoundException {
+		if (o == null)
+			return null;
+		byte b[] = (byte[]) o;
+		InternalCacheValue v = (InternalCacheValue) getMarshaller().objectFromByteBuffer(b);
+		return v.toInternalCacheEntry(key);
+	}
+
+	public void store(InternalCacheEntry entry) throws CacheLoaderException {
+		Cassandra.Iface cassandraClient = null;
+
+		try {
+			cassandraClient = pool.getConnection();
+			Map<String, Map<String, List<Mutation>>> mutationMap = new HashMap<String, Map<String, List<Mutation>>>(2);
+			store0(entry, mutationMap);
+			
+			cassandraClient.batch_mutate(config.keySpace, mutationMap, ConsistencyLevel.ONE);
+		} catch (Exception e) {
+			throw new CacheLoaderException(e);
+		} finally {
+			pool.release(cassandraClient);
+		}
+	}
+
+	private void store0(InternalCacheEntry entry, Map<String, Map<String, List<Mutation>>> mutationMap) throws IOException {
+		Object key = entry.getKey();
+		if (trace)
+			log.trace("store(\"{0}\") ", key);
+		String cassandraKey = CassandraCacheStore.hashKey(key);
+		addMutation(mutationMap, cassandraKey, config.entryColumnFamily, entryColumnPath.getColumn(), marshall(entry));
+		if (entry.canExpire()) {
+			addExpiryEntry(cassandraKey, entry.getExpiryTime(), mutationMap);
+		}
+	}
+	
+	private void addExpiryEntry(String cassandraKey, long expiryTime, Map<String, Map<String, List<Mutation>>> mutationMap) {		
+		try {
+			addMutation(mutationMap, EXPIRATION_KEY, config.expirationColumnFamily, longToBytes(expiryTime), cassandraKey.getBytes("UTF-8"), emptyByteArray);
+		} catch (Exception e) {
+			// Should not happen
+		}
+	}
+
+	/**
+	 * Writes to a stream all entries in the store, followed by a null
+	 * marker signalling the end of the stream.
+	 */
+	public void toStream(ObjectOutput out) throws CacheLoaderException {
+		try {
+			Set<InternalCacheEntry> loadAll = loadAll();
+			int count = 0;
+			for (InternalCacheEntry entry : loadAll) {
+				getMarshaller().objectToObjectStream(entry, out);
+				count++;
+			}
+			getMarshaller().objectToObjectStream(null, out);
+		} catch (IOException e) {
+			throw new CacheLoaderException(e);
+		}
+	}
+
+	/**
+	 * Reads entries from a stream until a null marker is encountered,
+	 * storing each entry as it is read.
+	 */
+	public void fromStream(ObjectInput in) throws CacheLoaderException {
+		try {
+			int count = 0;
+			while (true) {
+				count++;
+				InternalCacheEntry entry = (InternalCacheEntry) getMarshaller().objectFromObjectStream(in);
+				if (entry == null)
+					break;
+				store(entry);
+			}
+		} catch (IOException e) {
+			throw new CacheLoaderException(e);
+		} catch (ClassNotFoundException e) {
+			throw new CacheLoaderException(e);
+		}
+	}
+
+	/**
+	 * Purge expired entries.
+	 * Expiration entries are stored in a single key (EXPIRATION_KEY) within a specific ColumnFamily (set by configuration).
+	 * The entries are grouped by expiration timestamp in SuperColumns within which each entry's key is mapped to a column
+	 */
+	@Override
+	protected void purgeInternal() throws CacheLoaderException {
+		if (trace)
+			log.trace("purgeInternal");
+		Cassandra.Iface cassandraClient = null;
+		try {
+			cassandraClient = pool.getConnection();
+			// We need to get all supercolumns from the beginning of time until now, in SLICE_SIZE chunks
+			SlicePredicate predicate = new SlicePredicate();
+			predicate.setSlice_range(new SliceRange(emptyByteArray, longToBytes(System.currentTimeMillis()), false, SLICE_SIZE));
+			Map<String, Map<String, List<Mutation>>> mutationMap = new HashMap<String, Map<String, List<Mutation>>>();
+			for(boolean complete=false; !complete; ) {
+				// Get all columns
+				List<ColumnOrSuperColumn> slice = cassandraClient.get_slice(config.keySpace, EXPIRATION_KEY, expirationColumnParent, predicate, ConsistencyLevel.ONE);
+				complete = slice.size() < SLICE_SIZE;
+				// Delete all keys returned by the slice
+				for(ColumnOrSuperColumn crumb : slice) {
+					SuperColumn scol = crumb.getSuper_column();					
+					for(Iterator<Column> i = scol.getColumnsIterator(); i.hasNext(); ) {
+						Column col = i.next();
+						// Remove the entry row
+						remove0(new String(col.getName(), "UTF-8"), mutationMap);
+					}
+					// Remove the expiration supercolumn
+					addMutation(mutationMap, EXPIRATION_KEY, config.expirationColumnFamily, scol.getName(), null, null);
+				}				
+			}
+			cassandraClient.batch_mutate(config.keySpace, mutationMap, ConsistencyLevel.ONE);
+		} catch (Exception e) {
+			throw new CacheLoaderException(e);
+		} finally {
+			pool.release(cassandraClient);
+		}
+
+	}
+
+	@Override
+	protected void applyModifications(List<? extends Modification> mods) throws CacheLoaderException {
+		Cassandra.Iface cassandraClient = null;
+
+		try {
+			cassandraClient = pool.getConnection();
+			Map<String, Map<String, List<Mutation>>> mutationMap = new HashMap<String, Map<String, List<Mutation>>>();
+
+			for (Modification m : mods) {
+				switch (m.getType()) {
+				case STORE:
+					store0(((Store) m).getStoredEntry(), mutationMap);
+					break;
+				case CLEAR:
+					clear();
+					break;
+				case REMOVE:
+					remove0(hashKey(((Remove) m).getKey()), mutationMap);
+					break;
+				default:
+					throw new AssertionError();
+				}
+			}
+
+			cassandraClient.batch_mutate(config.keySpace, mutationMap, ConsistencyLevel.ONE);
+		} catch (Exception e) {
+			throw new CacheLoaderException(e);
+		} finally {
+			pool.release(cassandraClient);
+		}
+
+	}
+
+	@Override
+	public String toString() {
+		return "CassandraCacheStore";
+	}
+
+	private static String hashKey(Object key) {
+		return ENTRY_KEY_PREFIX + key.toString();
+	}
+
+	private static String unhashKey(String key) {
+		if(key.startsWith(ENTRY_KEY_PREFIX))
+			return key.substring(ENTRY_KEY_PREFIX.length());
+		else
+			return null;
+	}
+
+	private static void addMutation(Map<String, Map<String, List<Mutation>>> mutationMap, String key, String columnFamily, byte[] column, byte[] value) {
+		addMutation(mutationMap, key, columnFamily, null, column, value);
+	}
+
+	private static void addMutation(Map<String, Map<String, List<Mutation>>> mutationMap, String key, String columnFamily, byte[] superColumn, byte[] column, byte[] value) {
+		Map<String, List<Mutation>> keyMutations = mutationMap.get(key);
+		// If the key doesn't exist yet, create the mutation holder
+		if (keyMutations == null) {
+			keyMutations = new HashMap<String, List<Mutation>>();
+			mutationMap.put(key, keyMutations);
+		}
+		// If the columnfamily doesn't exist yet, create the mutation holder
+		List<Mutation> columnFamilyMutations = keyMutations.get(columnFamily);
+		if (columnFamilyMutations == null) {
+			columnFamilyMutations = new ArrayList<Mutation>();
+			keyMutations.put(columnFamily, columnFamilyMutations);
+		}
+
+		if (value == null) { // Delete
+			Deletion deletion = new Deletion(System.currentTimeMillis());
+			if(superColumn!=null) {
+				deletion.setSuper_column(superColumn);
+			}
+			if (column != null) { // Single column delete
+				deletion.setPredicate(new SlicePredicate().setColumn_names(Arrays.asList(new byte[][] { column })));
+			} // else Delete entire column family or supercolumn
+			columnFamilyMutations.add(new Mutation().setDeletion(deletion));
+		} else { // Insert/update
+			ColumnOrSuperColumn cosc = new ColumnOrSuperColumn();
+			if(superColumn!=null) {
+				List<Column> columns = new ArrayList<Column>();
+				columns.add(new Column(column, value, System.currentTimeMillis()));
+				cosc.setSuper_column(new SuperColumn(superColumn, columns));
+			} else {
+				cosc.setColumn(new Column(column, value, System.currentTimeMillis()));
+			}
+			columnFamilyMutations.add(new Mutation().setColumn_or_supercolumn(cosc));
+		}
+	}
+
+	private static final byte[] longToBytes(long v) {
+		byte b[] = new byte[8];
+		b[0] = (byte) (v >>> 56);
+		b[1] = (byte) (v >>> 48);
+		b[2] = (byte) (v >>> 40);
+		b[3] = (byte) (v >>> 32);
+		b[4] = (byte) (v >>> 24);
+		b[5] = (byte) (v >>> 16);
+		b[6] = (byte) (v >>> 8);
+		b[7] = (byte) (v >>> 0);
+		return b;
+	}
+}


Property changes on: branches/4.2.x/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStore.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Added: branches/4.2.x/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStoreConfig.java
===================================================================
--- branches/4.2.x/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStoreConfig.java	                        (rev 0)
+++ branches/4.2.x/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStoreConfig.java	2010-10-11 08:49:33 UTC (rev 2497)
@@ -0,0 +1,334 @@
+package org.infinispan.loaders.cassandra;
+
+import net.dataforte.cassandra.pool.PoolProperties;
+
+import org.infinispan.loaders.LockSupportCacheStoreConfig;
+
+/**
+ * Configures {@link CassandraCacheStore}.
+ */
+public class CassandraCacheStoreConfig extends LockSupportCacheStoreConfig {
+	
+	
+
+	/**
+	 * @configRef desc="The Cassandra keyspace"
+	 */
+	String keySpace = "Infinispan";
+
+	/**
+	 * @configRef desc="The Cassandra column family for entries"
+	 */
+	String entryColumnFamily = "InfinispanEntries";
+
+	/**
+	 * @configRef desc="The Cassandra column family for expirations"
+	 */
+	String expirationColumnFamily = "InfinispanExpiration";
+
+	PoolProperties poolProperties;
+
+	public CassandraCacheStoreConfig() {
+		setCacheLoaderClassName(CassandraCacheStore.class.getName());
+		poolProperties = new PoolProperties();
+	}
+
+	public String getKeySpace() {
+		return keySpace;
+	}
+
+	public void setKeySpace(String keySpace) {
+		this.keySpace = keySpace;
+	}
+
+	public String getEntryColumnFamily() {
+		return entryColumnFamily;
+	}
+
+	public void setEntryColumnFamily(String entryColumnFamily) {
+		this.entryColumnFamily = entryColumnFamily;
+	}
+
+	public String getExpirationColumnFamily() {
+		return expirationColumnFamily;
+	}
+
+	public void setExpirationColumnFamily(String expirationColumnFamily) {
+		this.expirationColumnFamily = expirationColumnFamily;
+	}
+
+	public PoolProperties getPoolProperties() {
+		return poolProperties;
+	}
+
+	public void setHost(String host) {
+		poolProperties.setHost(host);
+	}
+
+	public String getHost() {
+		return poolProperties.getHost();
+	}
+
+	public void setPort(int port) {
+		poolProperties.setPort(port);
+	}
+
+	public int getPort() {
+		return poolProperties.getPort();
+	}
+
+	
+	public int getAbandonWhenPercentageFull() {
+		return poolProperties.getAbandonWhenPercentageFull();
+	}
+
+	
+	public boolean getFramed() {
+		return poolProperties.getFramed();
+	}
+
+	
+	public int getInitialSize() {
+		return poolProperties.getInitialSize();
+	}
+
+	
+	public int getMaxActive() {
+		return poolProperties.getMaxActive();
+	}
+
+	
+	public long getMaxAge() {
+		return poolProperties.getMaxAge();
+	}
+
+	
+	public int getMaxIdle() {
+		return poolProperties.getMaxIdle();
+	}
+
+	
+	public int getMaxWait() {
+		return poolProperties.getMaxWait();
+	}
+
+	
+	public int getMinEvictableIdleTimeMillis() {
+		return poolProperties.getMinEvictableIdleTimeMillis();
+	}
+
+	
+	public int getMinIdle() {
+		return poolProperties.getMinIdle();
+	}
+
+	
+	public String getName() {
+		return poolProperties.getName();
+	}
+
+	
+	public int getNumTestsPerEvictionRun() {
+		return poolProperties.getNumTestsPerEvictionRun();
+	}
+
+	
+	public String getPassword() {
+		return poolProperties.getPassword();
+	}
+	
+	public int getRemoveAbandonedTimeout() {
+		return poolProperties.getRemoveAbandonedTimeout();
+	}
+
+	
+	public int getSuspectTimeout() {
+		return poolProperties.getSuspectTimeout();
+	}
+
+	
+	public int getTimeBetweenEvictionRunsMillis() {
+		return poolProperties.getTimeBetweenEvictionRunsMillis();
+	}
+
+	
+	public boolean getUseLock() {
+		return poolProperties.getUseLock();
+	}
+
+	
+	public String getUsername() {
+		return poolProperties.getUsername();
+	}
+
+	
+	public long getValidationInterval() {
+		return poolProperties.getValidationInterval();
+	}
+	
+	public boolean isFairQueue() {
+		return poolProperties.isFairQueue();
+	}
+
+	
+	public boolean isJmxEnabled() {
+		return poolProperties.isJmxEnabled();
+	}
+
+	
+	public boolean isLogAbandoned() {
+		return poolProperties.isLogAbandoned();
+	}
+
+	
+	public boolean isRemoveAbandoned() {
+		return poolProperties.isRemoveAbandoned();
+	}
+
+	
+	public boolean isTestOnBorrow() {
+		return poolProperties.isTestOnBorrow();
+	}
+
+	
+	public boolean isTestOnConnect() {
+		return poolProperties.isTestOnConnect();
+	}
+
+	
+	public boolean isTestOnReturn() {
+		return poolProperties.isTestOnReturn();
+	}
+
+	
+	public boolean isTestWhileIdle() {
+		return poolProperties.isTestWhileIdle();
+	}
+
+	
+	public void setAbandonWhenPercentageFull(int percentage) {
+		poolProperties.setAbandonWhenPercentageFull(percentage);
+
+	}
+	
+	public void setFairQueue(boolean fairQueue) {
+		poolProperties.setFairQueue(fairQueue);
+
+	}
+
+	
+	public void setFramed(boolean framed) {
+		poolProperties.setFramed(framed);
+
+	}
+
+	
+	public void setInitialSize(int initialSize) {
+		poolProperties.setInitialSize(initialSize);
+
+	}
+
+	
+	public void setJmxEnabled(boolean jmxEnabled) {
+		poolProperties.setJmxEnabled(jmxEnabled);
+	}
+
+	
+	public void setLogAbandoned(boolean logAbandoned) {
+		poolProperties.setLogAbandoned(logAbandoned);
+	}
+
+	
+	public void setMaxActive(int maxActive) {
+		poolProperties.setMaxActive(maxActive);
+
+	}
+
+	
+	public void setMaxAge(long maxAge) {
+		poolProperties.setMaxAge(maxAge);
+
+	}
+
+	
+	public void setMaxIdle(int maxIdle) {
+		poolProperties.setMaxIdle(maxIdle);
+
+	}
+
+	
+	public void setMaxWait(int maxWait) {
+		poolProperties.setMaxWait(maxWait);
+
+	}
+
+	
+	public void setMinEvictableIdleTimeMillis(int minEvictableIdleTimeMillis) {
+		poolProperties.setMinEvictableIdleTimeMillis(minEvictableIdleTimeMillis);
+
+	}
+
+	public void setMinIdle(int minIdle) {
+		poolProperties.setMinIdle(minIdle);
+
+	}
+
+	public void setName(String name) {
+		poolProperties.setName(name);
+	}
+
+	public void setNumTestsPerEvictionRun(int numTestsPerEvictionRun) {
+		poolProperties.setNumTestsPerEvictionRun(numTestsPerEvictionRun);
+
+	}
+
+	public void setPassword(String password) {
+		poolProperties.setPassword(password);
+	}
+
+	public void setRemoveAbandoned(boolean removeAbandoned) {
+		poolProperties.setRemoveAbandoned(removeAbandoned);
+	}
+
+	
+	public void setRemoveAbandonedTimeout(int removeAbandonedTimeout) {
+		poolProperties.setRemoveAbandonedTimeout(removeAbandonedTimeout);
+
+	}
+
+	public void setSuspectTimeout(int seconds) {
+		poolProperties.setSuspectTimeout(seconds);
+
+	}
+
+	public void setTestOnBorrow(boolean testOnBorrow) {
+		poolProperties.setTestOnBorrow(testOnBorrow);
+
+	}
+
+	public void setTestOnConnect(boolean testOnConnect) {
+		poolProperties.setTestOnConnect(testOnConnect);
+
+	}
+
+	public void setTestOnReturn(boolean testOnReturn) {
+		poolProperties.setTestOnReturn(testOnReturn);
+	}
+
+	public void setTestWhileIdle(boolean testWhileIdle) {
+		poolProperties.setTestWhileIdle(testWhileIdle);
+	}
+
+	public void setTimeBetweenEvictionRunsMillis(int timeBetweenEvictionRunsMillis) {
+		poolProperties.setTimeBetweenEvictionRunsMillis(timeBetweenEvictionRunsMillis);
+
+	}
+
+	public void setUsername(String username) {
+		poolProperties.setUsername(username);
+	}
+
+	public void setValidationInterval(long validationInterval) {
+		poolProperties.setValidationInterval(validationInterval);
+	}
+}


Property changes on: branches/4.2.x/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/CassandraCacheStoreConfig.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Added: branches/4.2.x/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/package-info.java
===================================================================
--- branches/4.2.x/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/package-info.java	                        (rev 0)
+++ branches/4.2.x/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/package-info.java	2010-10-11 08:49:33 UTC (rev 2497)
@@ -0,0 +1,5 @@
+/**
+ * This package contains a {@link org.infinispan.loaders.CacheStore} implementation based on
+ * persisting to Apache Cassandra.
+ */
+package org.infinispan.loaders.cassandra;
\ No newline at end of file


Property changes on: branches/4.2.x/cachestore/cassandra/src/main/java/org/infinispan/loaders/cassandra/package-info.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Added: branches/4.2.x/cachestore/cassandra/src/test/java/org/infinispan/loaders/cassandra/CassandraCacheStoreTest.java
===================================================================
--- branches/4.2.x/cachestore/cassandra/src/test/java/org/infinispan/loaders/cassandra/CassandraCacheStoreTest.java	                        (rev 0)
+++ branches/4.2.x/cachestore/cassandra/src/test/java/org/infinispan/loaders/cassandra/CassandraCacheStoreTest.java	2010-10-11 08:49:33 UTC (rev 2497)
@@ -0,0 +1,53 @@
+package org.infinispan.loaders.cassandra;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+
+import org.apache.cassandra.service.EmbeddedCassandraService;
+import org.apache.thrift.transport.TTransportException;
+import org.infinispan.loaders.BaseCacheStoreTest;
+import org.infinispan.loaders.CacheStore;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+@Test(groups = "unit", testName = "loaders.cassandra.CassandraCacheStoreTest")
+public class CassandraCacheStoreTest extends BaseCacheStoreTest {
+	private static EmbeddedCassandraService cassandra;
+
+	/**
+	 * Sets embedded Cassandra up and spawns it in a new daemon thread, so the
+	 * JVM can still exit if the service is running when the suite finishes.
+	 * 
+	 * @throws TTransportException
+	 * @throws IOException
+	 * @throws InterruptedException
+	 */
+	@BeforeClass
+	public static void setup() throws TTransportException, IOException, InterruptedException {
+		// Tell cassandra where the configuration files are:
+		// point the "storage-config" system property at the directory holding
+		// the test storage-conf.xml found on the classpath.
+		URL resource = Thread.currentThread().getContextClassLoader().getResource("storage-conf.xml");
+		// NOTE(review): URL.getPath() is not URL-decoded, so this breaks if the
+		// classpath location contains spaces (%20) — consider new File(resource.toURI()).
+		String configPath = resource.getPath().substring(0, resource.getPath().lastIndexOf(File.separatorChar));
+
+		System.setProperty("storage-config", configPath);
+
+		// Start from clean data/commit-log directories, then boot Cassandra.
+		CassandraServiceDataCleaner cleaner = new CassandraServiceDataCleaner();
+		cleaner.prepare();
+		cassandra = new EmbeddedCassandraService();
+		cassandra.init();
+		Thread t = new Thread(cassandra);
+		t.setDaemon(true);
+		t.start();
+	}
+
+	/**
+	 * Creates and starts a {@link CassandraCacheStore} pointing at the embedded
+	 * Cassandra instance on localhost.
+	 */
+	@Override
+	protected CacheStore createCacheStore() throws Exception {
+		CassandraCacheStore cs = new CassandraCacheStore();
+		CassandraCacheStoreConfig clc = new CassandraCacheStoreConfig();
+		clc.setHost("localhost");
+		cs.init(clc, getCache(), getMarshaller());
+		cs.start();
+		return cs;
+	}
+
+}


Property changes on: branches/4.2.x/cachestore/cassandra/src/test/java/org/infinispan/loaders/cassandra/CassandraCacheStoreTest.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Added: branches/4.2.x/cachestore/cassandra/src/test/java/org/infinispan/loaders/cassandra/CassandraServiceDataCleaner.java
===================================================================
--- branches/4.2.x/cachestore/cassandra/src/test/java/org/infinispan/loaders/cassandra/CassandraServiceDataCleaner.java	                        (rev 0)
+++ branches/4.2.x/cachestore/cassandra/src/test/java/org/infinispan/loaders/cassandra/CassandraServiceDataCleaner.java	2010-10-11 08:49:33 UTC (rev 2497)
@@ -0,0 +1,72 @@
+package org.infinispan.loaders.cassandra;
+
+import java.io.IOException;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.cassandra.config.DatabaseDescriptor;
+import org.apache.cassandra.io.util.FileUtils;
+import org.infinispan.test.TestingUtil;
+
+public class CassandraServiceDataCleaner {
+	/**
+	 * Creates all data dirs if they don't exist and cleans them
+	 * 
+	 * @throws IOException
+	 */
+	public void prepare() throws IOException {
+		makeDirsIfNotExist();
+		cleanupDataDirectories();
+	}
+
+	/**
+	 * Deletes all data from cassandra data directories, including the commit
+	 * log.
+	 * 
+	 * @throws IOException
+	 *             in case of permissions error etc.
+	 */
+	public void cleanupDataDirectories() throws IOException {
+		for (String s : getDataDirs()) {
+			TestingUtil.recursiveFileRemove(s);
+		}
+	}
+
+	/**
+	 * Creates the data directories, if they didn't exist.
+	 * 
+	 * @throws IOException
+	 *             if directories cannot be created (permissions etc).
+	 */
+	public void makeDirsIfNotExist() throws IOException {
+		for (String s : getDataDirs()) {
+			mkdir(s);
+		}
+	}
+
+	/**
+	 * Collects all data dirs and returns a set of String paths on the file
+	 * system.
+	 * 
+	 * @return the configured data file locations plus the commit-log location
+	 */
+	private Set<String> getDataDirs() {
+		Set<String> dirs = new HashSet<String>();
+		for (String s : DatabaseDescriptor.getAllDataFileLocations()) {
+			dirs.add(s);
+		}
+		dirs.add(DatabaseDescriptor.getLogFileLocation());
+		return dirs;
+	}
+
+	/**
+	 * Creates a directory
+	 * 
+	 * @param dir
+	 *            path of the directory to create
+	 * @throws IOException
+	 *             if the directory cannot be created
+	 */
+	private void mkdir(String dir) throws IOException {
+		FileUtils.createDirectory(dir);
+	}
+
+}


Property changes on: branches/4.2.x/cachestore/cassandra/src/test/java/org/infinispan/loaders/cassandra/CassandraServiceDataCleaner.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Added: branches/4.2.x/cachestore/cassandra/src/test/resources/log4j.properties
===================================================================
--- branches/4.2.x/cachestore/cassandra/src/test/resources/log4j.properties	                        (rev 0)
+++ branches/4.2.x/cachestore/cassandra/src/test/resources/log4j.properties	2010-10-11 08:49:33 UTC (rev 2497)
@@ -0,0 +1,6 @@
+log4j.rootCategory=INFO, STDOUT
+
+log4j.appender.STDOUT = org.apache.log4j.ConsoleAppender
+log4j.appender.STDOUT.layout = org.apache.log4j.PatternLayout
+log4j.appender.STDOUT.layout.ConversionPattern = %d{yyyy-MM-dd HH:mm:ss} %c{1} [%p] %m%n
+

Added: branches/4.2.x/cachestore/cassandra/src/test/resources/storage-conf.xml
===================================================================
--- branches/4.2.x/cachestore/cassandra/src/test/resources/storage-conf.xml	                        (rev 0)
+++ branches/4.2.x/cachestore/cassandra/src/test/resources/storage-conf.xml	2010-10-11 08:49:33 UTC (rev 2497)
@@ -0,0 +1,347 @@
+<Storage>
+  <!--======================================================================-->
+  <!-- Basic Configuration                                                  -->
+  <!--======================================================================-->
+
+  <!--
+   ~ The name of this cluster.  This is mainly used to prevent machines in
+   ~ one logical cluster from joining another.
+  -->
+  <ClusterName>Infinispan</ClusterName>
+
+  <!--
+   ~ Turn on to make new [non-seed] nodes automatically migrate the right data
+   ~ to themselves.  (If no InitialToken is specified, they will pick one
+   ~ such that they will get half the range of the most-loaded node.)
+   ~ If a node starts up without bootstrapping, it will mark itself bootstrapped
+   ~ so that you can't subsequently accidentally bootstrap a node with
+   ~ data on it.  (You can reset this by wiping your data and commitlog
+   ~ directories.)
+   ~
+   ~ Off by default so that new clusters and upgraders from 0.4 don't
+   ~ bootstrap immediately.  You should turn this on when you start adding
+   ~ new nodes to a cluster that already has data on it.  (If you are upgrading
+   ~ from 0.4, start your cluster with it off once before changing it to true.
+   ~ Otherwise, no data will be lost but you will incur a lot of unnecessary
+   ~ I/O before your cluster starts up.)
+  -->
+  <AutoBootstrap>false</AutoBootstrap>
+
+  <!--
+   ~ Keyspaces and ColumnFamilies:
+   ~ A ColumnFamily is the Cassandra concept closest to a relational
+   ~ table.  Keyspaces are separate groups of ColumnFamilies.  Except in
+   ~ very unusual circumstances you will have one Keyspace per application.
+
+   ~ There is an implicit keyspace named 'system' for Cassandra internals.
+  -->
+  <Keyspaces>
+    <Keyspace Name="Infinispan">
+      <!--
+       ~ ColumnFamily definitions have one required attribute (Name)
+       ~ and several optional ones.
+       ~
+       ~ The CompareWith attribute tells Cassandra how to sort the columns
+       ~ for slicing operations.  The default is BytesType, which is a
+       ~ straightforward lexical comparison of the bytes in each column.
+       ~ Other options are AsciiType, UTF8Type, LexicalUUIDType, TimeUUIDType,
+       ~ and LongType.  You can also specify the fully-qualified class
+       ~ name to a class of your choice extending
+       ~ org.apache.cassandra.db.marshal.AbstractType.
+       ~
+       ~ SuperColumns have a similar CompareSubcolumnsWith attribute.
+       ~
+       ~ BytesType: Simple sort by byte value.  No validation is performed.
+       ~ AsciiType: Like BytesType, but validates that the input can be
+       ~            parsed as US-ASCII.
+       ~ UTF8Type: A string encoded as UTF8
+       ~ LongType: A 64bit long
+       ~ LexicalUUIDType: A 128bit UUID, compared lexically (by byte value)
+       ~ TimeUUIDType: a 128bit version 1 UUID, compared by timestamp
+       ~
+       ~ (To get the closest approximation to 0.3-style supercolumns, you
+       ~ would use CompareWith=UTF8Type CompareSubcolumnsWith=LongType.)
+       ~
+       ~ An optional `Comment` attribute may be used to attach additional
+       ~ human-readable information about the column family to its definition.
+       ~
+       ~ The optional KeysCached attribute specifies
+       ~ the number of keys per sstable whose locations we keep in
+       ~ memory in "mostly LRU" order.  (JUST the key locations, NOT any
+       ~ column values.) Specify a fraction (value less than 1), a percentage
+       ~ (ending in a % sign) or an absolute number of keys to cache.
+       ~
+       ~ The optional RowsCached attribute specifies the number of rows
+       ~ whose entire contents we cache in memory. Do not use this on
+       ~ ColumnFamilies with large rows, or ColumnFamilies with high write:read
+       ~ ratios. Specify a fraction (value less than 1), a percentage (ending in
+       ~ a % sign) or an absolute number of rows to cache.
+      -->
+
+      <ColumnFamily CompareWith="BytesType" Name="InfinispanEntries" KeysCached="10%" />
+      <ColumnFamily CompareWith="LongType" Name="InfinispanExpiration" KeysCached="10%" ColumnType="Super" CompareSubcolumnsWith="BytesType"/>
+
+      <!--
+       ~ Strategy: Setting this to the class that implements
+       ~ IReplicaPlacementStrategy will change the way the node picker works.
+       ~ Out of the box, Cassandra provides
+       ~ org.apache.cassandra.locator.RackUnawareStrategy and
+       ~ org.apache.cassandra.locator.RackAwareStrategy (place one replica in
+       ~ a different datacenter, and the others on different racks in the same
+       ~ one.)
+      -->
+      <ReplicaPlacementStrategy>org.apache.cassandra.locator.RackUnawareStrategy</ReplicaPlacementStrategy>
+
+      <!-- Number of replicas of the data -->
+      <ReplicationFactor>1</ReplicationFactor>
+
+      <!--
+       ~ EndPointSnitch: Setting this to the class that implements
+       ~ AbstractEndpointSnitch, which lets Cassandra know enough
+       ~ about your network topology to route requests efficiently.
+       ~ Out of the box, Cassandra provides org.apache.cassandra.locator.EndPointSnitch,
+       ~ and PropertyFileEndPointSnitch is available in contrib/.
+      -->
+      <EndPointSnitch>org.apache.cassandra.locator.EndPointSnitch</EndPointSnitch>
+
+    </Keyspace>
+  </Keyspaces>
+
+  <!--
+   ~ Authenticator: any IAuthenticator may be used, including your own as long
+   ~ as it is on the classpath.  Out of the box, Cassandra provides
+   ~ org.apache.cassandra.auth.AllowAllAuthenticator and,
+   ~ org.apache.cassandra.auth.SimpleAuthenticator
+   ~ (SimpleAuthenticator uses access.properties and passwd.properties by
+   ~ default).
+   ~
+   ~ If you don't specify an authenticator, AllowAllAuthenticator is used.
+  -->
+  <Authenticator>org.apache.cassandra.auth.AllowAllAuthenticator</Authenticator>
+
+  <!--
+   ~ Partitioner: any IPartitioner may be used, including your own as long
+   ~ as it is on the classpath.  Out of the box, Cassandra provides
+   ~ org.apache.cassandra.dht.RandomPartitioner,
+   ~ org.apache.cassandra.dht.OrderPreservingPartitioner, and
+   ~ org.apache.cassandra.dht.CollatingOrderPreservingPartitioner.
+   ~ (CollatingOPP collates according to EN,US rules, not naive byte
+   ~ ordering.  Use this as an example if you need locale-aware collation.)
+   ~ Range queries require using an order-preserving partitioner.
+   ~
+   ~ Achtung!  Changing this parameter requires wiping your data
+   ~ directories, since the partitioner can modify the sstable on-disk
+   ~ format.
+  -->
+  <Partitioner>org.apache.cassandra.dht.OrderPreservingPartitioner</Partitioner>
+
+  <!--
+   ~ If you are using an order-preserving partitioner and you know your key
+   ~ distribution, you can specify the token for this node to use. (Keys
+   ~ are sent to the node with the "closest" token, so distributing your
+   ~ tokens equally along the key distribution space will spread keys
+   ~ evenly across your cluster.)  This setting is only checked the first
+   ~ time a node is started.
+
+   ~ This can also be useful with RandomPartitioner to force equal spacing
+   ~ of tokens around the hash space, especially for clusters with a small
+   ~ number of nodes.
+  -->
+  <InitialToken></InitialToken>
+
+  <!--
+   ~ Directories: Specify where Cassandra should store different data on
+   ~ disk.  Keep the data disks and the CommitLog disks separate for best
+   ~ performance
+  -->
+  <CommitLogDirectory>${java.io.tmpdir}/infinispan-cassandra-cachestore/commitlog</CommitLogDirectory>
+  <DataFileDirectories>
+      <DataFileDirectory>${java.io.tmpdir}/infinispan-cassandra-cachestore/data</DataFileDirectory>
+  </DataFileDirectories>
+  <CalloutLocation>${java.io.tmpdir}/infinispan-cassandra-cachestore/callouts</CalloutLocation>
+  <StagingFileDirectory>${java.io.tmpdir}/infinispan-cassandra-cachestore/staging</StagingFileDirectory>
+
+
+  <!--
+   ~ Addresses of hosts that are deemed contact points. Cassandra nodes
+   ~ use this list of hosts to find each other and learn the topology of
+   ~ the ring. You must change this if you are running multiple nodes!
+  -->
+  <Seeds>
+      <Seed>127.0.0.1</Seed>
+  </Seeds>
+
+
+  <!-- Miscellaneous -->
+
+  <!-- Time to wait for a reply from other nodes before failing the command -->
+  <RpcTimeoutInMillis>10000</RpcTimeoutInMillis>
+  <!-- Size to allow commitlog to grow to before creating a new segment -->
+  <CommitLogRotationThresholdInMB>128</CommitLogRotationThresholdInMB>
+
+
+  <!-- Local hosts and ports -->
+
+  <!--
+   ~ Address to bind to and tell other nodes to connect to.  You _must_
+   ~ change this if you want multiple nodes to be able to communicate!
+   ~
+   ~ Leaving it blank leaves it up to InetAddress.getLocalHost(). This
+   ~ will always do the Right Thing *if* the node is properly configured
+   ~ (hostname, name resolution, etc), and the Right Thing is to use the
+   ~ address associated with the hostname (it might not be).
+  -->
+  <ListenAddress></ListenAddress>
+  <!-- internal communications port -->
+  <StoragePort>7000</StoragePort>
+
+  <!--
+   ~ The address to bind the Thrift RPC service to. Unlike ListenAddress
+   ~ above, you *can* specify 0.0.0.0 here if you want Thrift to listen on
+   ~ all interfaces.
+   ~
+   ~ Leaving this blank has the same effect it does for ListenAddress,
+   ~ (i.e. it will be based on the configured hostname of the node).
+  -->
+  <ThriftAddress>localhost</ThriftAddress>
+  <!-- Thrift RPC port (the port clients connect to). -->
+  <ThriftPort>9160</ThriftPort>
+  <!--
+   ~ Whether or not to use a framed transport for Thrift. If this option
+   ~ is set to true then you must also use a framed transport on the
+   ~ client-side, (framed and non-framed transports are not compatible).
+  -->
+  <ThriftFramedTransport>false</ThriftFramedTransport>
+
+
+  <!--======================================================================-->
+  <!-- Memory, Disk, and Performance                                        -->
+  <!--======================================================================-->
+
+  <!--
+   ~ Access mode.  mmapped i/o is substantially faster, but only practical on
+   ~ a 64bit machine (which notably does not include EC2 "small" instances)
+   ~ or relatively small datasets.  "auto", the safe choice, will enable
+   ~ mmapping on a 64bit JVM.  Other values are "mmap", "mmap_index_only"
+   ~ (which may allow you to get part of the benefits of mmap on a 32bit
+   ~ machine by mmapping only index files) and "standard".
+   ~ (The buffer size settings that follow only apply to standard,
+   ~ non-mmapped i/o.)
+   -->
+  <DiskAccessMode>auto</DiskAccessMode>
+
+  <!--
+   ~ Size of compacted row above which to log a warning.  (If compacted
+   ~ rows do not fit in memory, Cassandra will crash.  This is explained
+   ~ in http://wiki.apache.org/cassandra/CassandraLimitations and is
+   ~ scheduled to be fixed in 0.7.)
+  -->
+  <RowWarningThresholdInMB>512</RowWarningThresholdInMB>
+
+  <!--
+   ~ Buffer size to use when performing contiguous column slices. Increase
+   ~ this to the size of the column slices you typically perform.
+   ~ (Name-based queries are performed with a buffer size of
+   ~ ColumnIndexSizeInKB.)
+  -->
+  <SlicedBufferSizeInKB>64</SlicedBufferSizeInKB>
+
+  <!--
+   ~ Buffer size to use when flushing memtables to disk. (Only one
+   ~ memtable is ever flushed at a time.) Increase (decrease) the index
+   ~ buffer size relative to the data buffer if you have few (many)
+   ~ columns per key.  Bigger is only better _if_ your memtables get large
+   ~ enough to use the space. (Check in your data directory after your
+   ~ app has been running long enough.) -->
+  <FlushDataBufferSizeInMB>32</FlushDataBufferSizeInMB>
+  <FlushIndexBufferSizeInMB>8</FlushIndexBufferSizeInMB>
+
+  <!--
+   ~ Add column indexes to a row after its contents reach this size.
+   ~ Increase if your column values are large, or if you have a very large
+   ~ number of columns.  The competing causes are, Cassandra has to
+   ~ deserialize this much of the row to read a single column, so you want
+   ~ it to be small - at least if you do many partial-row reads - but all
+   ~ the index data is read for each access, so you don't want to generate
+   ~ that wastefully either.
+  -->
+  <ColumnIndexSizeInKB>64</ColumnIndexSizeInKB>
+
+  <!--
+   ~ Flush memtable after this much data has been inserted, including
+   ~ overwritten data.  There is one memtable per column family, and
+   ~ this threshold is based solely on the amount of data stored, not
+   ~ actual heap memory usage (there is some overhead in indexing the
+   ~ columns).
+  -->
+  <MemtableThroughputInMB>64</MemtableThroughputInMB>
+  <!--
+   ~ Throughput setting for Binary Memtables.  Typically these are
+   ~ used for bulk load so you want them to be larger.
+  -->
+  <BinaryMemtableThroughputInMB>256</BinaryMemtableThroughputInMB>
+  <!--
+   ~ The maximum number of columns in millions to store in memory per
+   ~ ColumnFamily before flushing to disk.  This is also a per-memtable
+   ~ setting.  Use with MemtableThroughputInMB to tune memory usage.
+  -->
+  <MemtableOperationsInMillions>0.3</MemtableOperationsInMillions>
+  <!--
+   ~ The maximum time to leave a dirty memtable unflushed.
+   ~ (While any affected columnfamilies have unflushed data from a
+   ~ commit log segment, that segment cannot be deleted.)
+   ~ This needs to be large enough that it won't cause a flush storm
+   ~ of all your memtables flushing at once because none has hit
+   ~ the size or count thresholds yet.  For production, a larger
+   ~ value such as 1440 is recommended.
+  -->
+  <MemtableFlushAfterMinutes>60</MemtableFlushAfterMinutes>
+
+  <!--
+   ~ Unlike most systems, in Cassandra writes are faster than reads, so
+   ~ you can afford more of those in parallel.  A good rule of thumb is 2
+   ~ concurrent reads per processor core.  Increase ConcurrentWrites to
+   ~ the number of clients writing at once if you enable CommitLogSync +
+   ~ CommitLogSyncDelay. -->
+  <ConcurrentReads>8</ConcurrentReads>
+  <ConcurrentWrites>32</ConcurrentWrites>
+
+  <!--
+   ~ CommitLogSync may be either "periodic" or "batch."  When in batch
+   ~ mode, Cassandra won't ack writes until the commit log has been
+   ~ fsynced to disk.  It will wait up to CommitLogSyncBatchWindowInMS
+   ~ milliseconds for other writes, before performing the sync.
+
+   ~ This is less necessary in Cassandra than in traditional databases
+   ~ since replication reduces the odds of losing data from a failure
+   ~ after writing the log entry but before it actually reaches the disk.
+   ~ So the other option is "timed," where writes may be acked immediately
+   ~ and the CommitLog is simply synced every CommitLogSyncPeriodInMS
+   ~ milliseconds.
+  -->
+  <CommitLogSync>periodic</CommitLogSync>
+  <!--
+   ~ Interval at which to perform syncs of the CommitLog in periodic mode.
+   ~ Usually the default of 10000ms is fine; increase it if your i/o
+   ~ load is such that syncs are taking excessively long times.
+  -->
+  <CommitLogSyncPeriodInMS>10000</CommitLogSyncPeriodInMS>
+  <!--
+   ~ Delay (in milliseconds) during which additional commit log entries
+   ~ may be written before fsync in batch mode.  This will increase
+   ~ latency slightly, but can vastly improve throughput where there are
+   ~ many writers.  Set to zero to disable (each entry will be synced
+   ~ individually).  Reasonable values range from a minimal 0.1 to 10 or
+   ~ even more if throughput matters more than latency.
+  -->
+  <!-- <CommitLogSyncBatchWindowInMS>1</CommitLogSyncBatchWindowInMS> -->
+
+  <!--
+   ~ Time to wait before garbage-collection deletion markers.  Set this to
+   ~ a large enough value that you are confident that the deletion marker
+   ~ will be propagated to all replicas by the time this many seconds has
+   ~ elapsed, even in the face of hardware failures.  The default value is
+   ~ ten days.
+  -->
+  <GCGraceSeconds>86400</GCGraceSeconds>
+</Storage>


Property changes on: branches/4.2.x/cachestore/cassandra/src/test/resources/storage-conf.xml
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF



More information about the infinispan-commits mailing list