[infinispan-commits] Infinispan SVN: r1589 - in trunk: cachestore/cloud/src/main/java/org/infinispan/loaders/cloud and 15 other directories.
infinispan-commits at lists.jboss.org
infinispan-commits at lists.jboss.org
Wed Mar 10 12:06:25 EST 2010
Author: manik.surtani at jboss.com
Date: 2010-03-10 12:06:20 -0500 (Wed, 10 Mar 2010)
New Revision: 1589
Added:
trunk/core/src/main/java/org/infinispan/util/ReadOnlyDataContainerBackedKeySet.java
Modified:
trunk/cachestore/bdbje/src/main/java/org/infinispan/loaders/bdbje/BdbjeCacheStore.java
trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java
trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/DataManipulationHelper.java
trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/TableManipulation.java
trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/binary/JdbcBinaryCacheStore.java
trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/mixed/JdbcMixedCacheStore.java
trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/stringbased/JdbcStringBasedCacheStore.java
trunk/cachestore/jdbm/src/main/java/org/infinispan/loaders/jdbm/JdbmCacheStore.java
trunk/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java
trunk/core/src/main/java/org/infinispan/distribution/LeaveTask.java
trunk/core/src/main/java/org/infinispan/loaders/CacheLoader.java
trunk/core/src/main/java/org/infinispan/loaders/LockSupportCacheStore.java
trunk/core/src/main/java/org/infinispan/loaders/bucket/BucketBasedCacheStore.java
trunk/core/src/main/java/org/infinispan/loaders/cluster/ClusterCacheLoader.java
trunk/core/src/main/java/org/infinispan/loaders/decorators/AbstractDelegatingStore.java
trunk/core/src/main/java/org/infinispan/loaders/decorators/ChainingCacheStore.java
trunk/core/src/main/java/org/infinispan/loaders/file/FileCacheStore.java
trunk/core/src/test/java/org/infinispan/loaders/BaseCacheStoreTest.java
trunk/core/src/test/java/org/infinispan/loaders/dummy/DummyInMemoryCacheStore.java
Log:
[ISPN-311] (CacheLoader.loadKeys(), and performance improvements when rehashing from a cache store)
Modified: trunk/cachestore/bdbje/src/main/java/org/infinispan/loaders/bdbje/BdbjeCacheStore.java
===================================================================
--- trunk/cachestore/bdbje/src/main/java/org/infinispan/loaders/bdbje/BdbjeCacheStore.java 2010-03-10 17:01:14 UTC (rev 1588)
+++ trunk/cachestore/bdbje/src/main/java/org/infinispan/loaders/bdbje/BdbjeCacheStore.java 2010-03-10 17:06:20 UTC (rev 1589)
@@ -450,6 +450,17 @@
}
}
+ @Override
+ public Set<Object> loadAllKeys(Set<Object> keysToExclude) throws CacheLoaderException {
+ try {
+ Set<Object> s = new HashSet<Object>();
+ for (Object o: cacheMap.keySet()) if (keysToExclude == null || !keysToExclude.contains(o)) s.add(o);
+ return s;
+ } catch (RuntimeException caught) {
+ throw convertToCacheLoaderException("error loading all entries", caught);
+ }
+ }
+
/**
* {@inheritDoc} This implementation reads the number of entries to load from the stream, then begins a transaction.
* During that transaction, the cachestore is cleared and replaced with entries from the stream. If there are any
Modified: trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java
===================================================================
--- trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java 2010-03-10 17:01:14 UTC (rev 1588)
+++ trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java 2010-03-10 17:06:20 UTC (rev 1589)
@@ -142,30 +142,12 @@
}
@Override
- protected Set<InternalCacheEntry> loadAllLockSafe() throws CacheLoaderException {
- Set<InternalCacheEntry> result = new HashSet<InternalCacheEntry>();
-
+ protected void loopOverBuckets(BucketHandler handler) throws CacheLoaderException {
for (Map.Entry<String, Blob> entry : ctx.createBlobMap(containerName).entrySet()) {
Bucket bucket = readFromBlob(entry.getValue(), entry.getKey());
- if (bucket.removeExpiredEntries())
- updateBucket(bucket);
- result.addAll(bucket.getStoredEntries());
- }
- return result;
- }
-
- @Override
- protected Set<InternalCacheEntry> loadLockSafe(int maxEntries) throws CacheLoaderException {
- Set<InternalCacheEntry> result = new HashSet<InternalCacheEntry>(maxEntries);
-
- for (Map.Entry<String, Blob> entry : ctx.createBlobMap(containerName).entrySet()) {
- Bucket bucket = readFromBlob(entry.getValue(), entry.getKey());
if (bucket.removeExpiredEntries()) updateBucket(bucket);
- for (Iterator<? extends InternalCacheEntry> i = bucket.getStoredEntries().iterator(); i.hasNext() && result.size() < maxEntries;)
- result.add(i.next());
- if (result.size() >= maxEntries) break;
+ if (handler.handle(bucket)) break;
}
- return result;
}
@Override
Modified: trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/DataManipulationHelper.java
===================================================================
--- trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/DataManipulationHelper.java 2010-03-10 17:01:14 UTC (rev 1588)
+++ trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/DataManipulationHelper.java 2010-03-10 17:06:20 UTC (rev 1589)
@@ -160,7 +160,7 @@
if (filterExpired) ps.setLong(1, System.currentTimeMillis());
rs = ps.executeQuery();
rs.setFetchSize(tableManipulation.getFetchSize());
- Set<InternalCacheEntry> result = new HashSet<InternalCacheEntry>();
+ Set<InternalCacheEntry> result = new HashSet<InternalCacheEntry>(tableManipulation.getFetchSize());
while (rs.next()) {
loadAllProcess(rs, result);
}
@@ -176,6 +176,32 @@
}
}
+ public Set<Object> loadAllKeysSupport(Set<Object> keysToExclude) throws CacheLoaderException {
+ Connection conn = null;
+ PreparedStatement ps = null;
+ ResultSet rs = null;
+ try {
+
+ String sql = getLoadAllKeysSql();
+ if (log.isTraceEnabled()) log.trace("Running sql '" + sql);
+ conn = connectionFactory.getConnection();
+ ps = conn.prepareStatement(sql);
+ rs = ps.executeQuery();
+ rs.setFetchSize(tableManipulation.getFetchSize());
+ Set<Object> result = new HashSet<Object>(tableManipulation.getFetchSize());
+ while (rs.next()) loadAllKeysProcess(rs, result, keysToExclude);
+ return result;
+ } catch (SQLException e) {
+ String message = "SQL error while fetching all StoredEntries";
+ log.error(message, e);
+ throw new CacheLoaderException(message, e);
+ } finally {
+ JdbcUtil.safeClose(rs);
+ JdbcUtil.safeClose(ps);
+ connectionFactory.releaseConnection(conn);
+ }
+ }
+
public final Set<InternalCacheEntry> loadSome(int maxEntries) throws CacheLoaderException {
Connection conn = null;
PreparedStatement ps = null;
@@ -204,15 +230,22 @@
}
}
- public abstract void loadAllProcess(ResultSet rs, Set<InternalCacheEntry> result) throws SQLException, CacheLoaderException;
+ protected boolean includeKey(Object key, Set<Object> keysToExclude) {
+ return keysToExclude == null || !keysToExclude.contains(key);
+ }
- public abstract void toStreamProcess(ResultSet rs, InputStream is, ObjectOutput objectOutput) throws CacheLoaderException, SQLException, IOException;
+ protected abstract String getLoadAllKeysSql();
- public abstract boolean fromStreamProcess(Object objFromStream, PreparedStatement ps, ObjectInput objectInput) throws SQLException, CacheLoaderException, IOException, ClassNotFoundException;
+ protected abstract void loadAllProcess(ResultSet rs, Set<InternalCacheEntry> result) throws SQLException, CacheLoaderException;
+ protected abstract void loadAllKeysProcess(ResultSet rs, Set<Object> keys, Set<Object> keysToExclude) throws SQLException, CacheLoaderException;
+
+ protected abstract void toStreamProcess(ResultSet rs, InputStream is, ObjectOutput objectOutput) throws CacheLoaderException, SQLException, IOException;
+
+ protected abstract boolean fromStreamProcess(Object objFromStream, PreparedStatement ps, ObjectInput objectInput) throws SQLException, CacheLoaderException, IOException, ClassNotFoundException;
+
public static void logAndThrow(Exception e, String message) throws CacheLoaderException {
log.error(message, e);
throw new CacheLoaderException(message, e);
}
-
}
Modified: trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/TableManipulation.java
===================================================================
--- trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/TableManipulation.java 2010-03-10 17:01:14 UTC (rev 1588)
+++ trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/TableManipulation.java 2010-03-10 17:06:20 UTC (rev 1589)
@@ -80,6 +80,8 @@
private String deleteExpiredRowsSql;
private String loadSomeRowsSql;
public DatabaseType databaseType;
+ private String loadAllKeysBinarySql;
+ private String loadAllKeysStringSql;
public TableManipulation(String idColumnName, String idColumnType, String tableNamePrefix, String dataColumnName,
String dataColumnType, String timestampColumnName, String timestampColumnType) {
@@ -429,23 +431,23 @@
switch (getDatabaseType()) {
case ORACLE:
- loadSomeRowsSql = "SELECT " + dataColumnName + "," + idColumnName + " FROM " + getTableName() + " LIMIT ?";
+ loadSomeRowsSql = String.format("SELECT %s, %s FROM (SELECT %s, %s FROM %s) WHERE ROWNUM <= ?", dataColumnName, idColumnName, dataColumnName, idColumnName, getTableName());
break;
case DB2:
- loadSomeRowsSql = "SELECT " + dataColumnName + "," + idColumnName + " FROM " + getTableName() + " LIMIT ?";
+ loadSomeRowsSql = String.format("SELECT %s, %s FROM %s FETCH FIRST ? ROWS ONLY", dataColumnName, idColumnName, getTableName());
break;
case INFORMIX:
case INTERBASE:
case FIREBIRD:
- loadSomeRowsSql = "SELECT " + dataColumnName + "," + idColumnName + " FROM " + getTableName() + " LIMIT ?";
+ loadSomeRowsSql = String.format("SELECT FIRST ? %s, %s FROM %s", dataColumnName, idColumnName, getTableName());
break;
case SQL_SERVER:
case ACCESS:
- loadSomeRowsSql = "SELECT " + dataColumnName + "," + idColumnName + " FROM " + getTableName() + " LIMIT ?";
+ loadSomeRowsSql = String.format("SELECT TOP ? %s, %s FROM %s", dataColumnName, idColumnName, getTableName());
break;
default:
// the MySQL-style LIMIT clause
- loadSomeRowsSql = "SELECT " + dataColumnName + "," + idColumnName + " FROM " + getTableName() + " LIMIT ?";
+ loadSomeRowsSql = String.format("SELECT %s, %s FROM %s LIMIT ?", dataColumnName, idColumnName, getTableName());
break;
}
@@ -453,6 +455,16 @@
return loadSomeRowsSql;
}
+ public String getLoadAllKeysBinarySql() {
+ if (loadAllKeysBinarySql == null) loadAllKeysBinarySql = String.format("SELECT %s FROM %s", dataColumnName, getTableName());
+ return loadAllKeysBinarySql;
+ }
+
+ public String getLoadAllKeysStringSql() {
+ if (loadAllKeysStringSql == null) loadAllKeysStringSql = String.format("SELECT %s FROM %s", idColumnName, getTableName());
+ return loadAllKeysStringSql;
+ }
+
private DatabaseType getDatabaseType() {
if (databaseType == null) {
// need to guess from the database type!
Modified: trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/binary/JdbcBinaryCacheStore.java
===================================================================
--- trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/binary/JdbcBinaryCacheStore.java 2010-03-10 17:01:14 UTC (rev 1588)
+++ trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/binary/JdbcBinaryCacheStore.java 2010-03-10 17:06:20 UTC (rev 1589)
@@ -50,6 +50,8 @@
import java.util.Iterator;
import java.util.Set;
+import static org.infinispan.loaders.jdbc.DataManipulationHelper.logAndThrow;
+
/**
* {@link BucketBasedCacheStore} implementation that will store all the buckets as rows in database, each row
* corresponding to a bucket. This is in contrast to {@link org.infinispan.loaders.jdbc.stringbased.JdbcStringBasedCacheStore}
@@ -99,15 +101,25 @@
}
dmHelper = new DataManipulationHelper(connectionFactory, tableManipulation, marshaller) {
@Override
+ protected String getLoadAllKeysSql() {
+ return tableManipulation.getLoadAllKeysBinarySql();
+ }
+
+ @Override
public void loadAllProcess(ResultSet rs, Set<InternalCacheEntry> result) throws SQLException, CacheLoaderException {
InputStream binaryStream = rs.getBinaryStream(1);
Bucket bucket = (Bucket) JdbcUtil.unmarshall(getMarshaller(), binaryStream);
- for (InternalCacheEntry ice: bucket.getStoredEntries()) {
- if (!ice.isExpired()) result.add(ice);
- }
+ for (InternalCacheEntry ice: bucket.getStoredEntries()) if (!ice.isExpired()) result.add(ice);
}
@Override
+ public void loadAllKeysProcess(ResultSet rs, Set<Object> keys, Set<Object> keysToExclude) throws SQLException, CacheLoaderException {
+ InputStream binaryStream = rs.getBinaryStream(1);
+ Bucket bucket = (Bucket) JdbcUtil.unmarshall(getMarshaller(), binaryStream);
+ for (InternalCacheEntry ice: bucket.getStoredEntries()) if (!ice.isExpired() && includeKey(ice.getKey(), keysToExclude)) keys.add(ice.getKey());
+ }
+
+ @Override
public void toStreamProcess(ResultSet rs, InputStream is, ObjectOutput objectOutput) throws CacheLoaderException, SQLException, IOException {
Bucket bucket = (Bucket) JdbcUtil.unmarshall(getMarshaller(), is);
String bucketName = rs.getString(2);
@@ -156,7 +168,7 @@
throw new CacheLoaderException("Unexpected insert result: '" + insertedRows + "'. Expected values is 1");
}
} catch (SQLException ex) {
- DataManipulationHelper.logAndThrow(ex, "sql failure while inserting bucket: " + bucket);
+ logAndThrow(ex, "sql failure while inserting bucket: " + bucket);
} finally {
JdbcUtil.safeClose(ps);
connectionFactory.releaseConnection(conn);
@@ -182,7 +194,7 @@
throw new CacheLoaderException("Unexpected update result: '" + updatedRows + "'. Expected values is 1");
}
} catch (SQLException e) {
- DataManipulationHelper.logAndThrow(e, "sql failure while updating bucket: " + bucket);
+ logAndThrow(e, "sql failure while updating bucket: " + bucket);
} finally {
JdbcUtil.safeClose(ps);
connectionFactory.releaseConnection(conn);
@@ -219,15 +231,27 @@
}
}
+ @Override
public Set<InternalCacheEntry> loadAllLockSafe() throws CacheLoaderException {
return dmHelper.loadAllSupport(false);
}
@Override
+ public Set<Object> loadAllKeys(Set<Object> keysToExclude) throws CacheLoaderException {
+ return dmHelper.loadAllKeysSupport(keysToExclude);
+ }
+
+ @Override
protected Set<InternalCacheEntry> loadLockSafe(int maxEntries) throws CacheLoaderException {
return dmHelper.loadSome(maxEntries);
}
+ @Override
+ protected void loopOverBuckets(BucketHandler handler) throws CacheLoaderException {
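+ // not a no-op: deliberately unsupported, as this store overrides the load methods and is not expected to iterate over buckets via this hook.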
+ throw new UnsupportedOperationException("Should never be called.");
+ }
+
protected void fromStreamLockSafe(ObjectInput objectInput) throws CacheLoaderException {
dmHelper.fromStreamSupport(objectInput);
}
@@ -271,7 +295,7 @@
//if something happens make sure buckets locks are being release
releaseLocks(expiredBuckets);
connectionFactory.releaseConnection(conn);
- DataManipulationHelper.logAndThrow(ex, "Failed clearing JdbcBinaryCacheStore");
+ logAndThrow(ex, "Failed clearing JdbcBinaryCacheStore");
} finally {
JdbcUtil.safeClose(ps);
JdbcUtil.safeClose(rs);
@@ -315,7 +339,7 @@
//if something happens make sure buckets locks are being release
releaseLocks(emptyBuckets);
connectionFactory.releaseConnection(conn);
- DataManipulationHelper.logAndThrow(ex, "Failed clearing JdbcBinaryCacheStore");
+ logAndThrow(ex, "Failed clearing JdbcBinaryCacheStore");
} finally {
//release locks for the updated buckets.This won't include empty buckets, as these were migrated to emptyBuckets
releaseLocks(expiredBuckets);
@@ -348,7 +372,7 @@
}
} catch (SQLException ex) {
//if something happens make sure buckets locks are being release
- DataManipulationHelper.logAndThrow(ex, "Failed clearing JdbcBinaryCacheStore");
+ logAndThrow(ex, "Failed clearing JdbcBinaryCacheStore");
} finally {
releaseLocks(emptyBuckets);
JdbcUtil.safeClose(ps);
Modified: trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/mixed/JdbcMixedCacheStore.java
===================================================================
--- trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/mixed/JdbcMixedCacheStore.java 2010-03-10 17:01:14 UTC (rev 1588)
+++ trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/mixed/JdbcMixedCacheStore.java 2010-03-10 17:06:20 UTC (rev 1589)
@@ -119,13 +119,21 @@
@Override
public Set<InternalCacheEntry> load(int numEntries) throws CacheLoaderException {
if (numEntries < 0) return loadAll();
- Set<InternalCacheEntry> fromBuckets = binaryCacheStore.load(numEntries);
+ Set<InternalCacheEntry> set = stringBasedCacheStore.load(numEntries);
- if (fromBuckets.size() < numEntries) {
- Set<InternalCacheEntry> fromStrings = stringBasedCacheStore.load(numEntries - fromBuckets.size());
- fromBuckets.addAll(fromStrings);
+ if (set.size() < numEntries) {
+ Set<InternalCacheEntry> otherSet = binaryCacheStore.load(numEntries - set.size());
+ set.addAll(otherSet);
}
+ return set;
+ }
+
+ @Override
+ public Set<Object> loadAllKeys(Set<Object> keysToExclude) throws CacheLoaderException {
+ Set<Object> fromBuckets = binaryCacheStore.loadAllKeys(keysToExclude);
+ Set<Object> fromStrings = stringBasedCacheStore.loadAllKeys(keysToExclude);
+ fromBuckets.addAll(fromStrings);
return fromBuckets;
}
Modified: trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/stringbased/JdbcStringBasedCacheStore.java
===================================================================
--- trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/stringbased/JdbcStringBasedCacheStore.java 2010-03-10 17:01:14 UTC (rev 1588)
+++ trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/stringbased/JdbcStringBasedCacheStore.java 2010-03-10 17:06:20 UTC (rev 1589)
@@ -103,6 +103,11 @@
dmHelper = new DataManipulationHelper(connectionFactory, tableManipulation, marshaller) {
@Override
+ protected String getLoadAllKeysSql() {
+ return tableManipulation.getLoadAllKeysStringSql();
+ }
+
+ @Override
public void loadAllProcess(ResultSet rs, Set<InternalCacheEntry> result) throws SQLException, CacheLoaderException {
InputStream inputStream = rs.getBinaryStream(1);
InternalCacheValue icv = (InternalCacheValue) JdbcUtil.unmarshall(getMarshaller(), inputStream);
@@ -111,6 +116,12 @@
}
@Override
+ public void loadAllKeysProcess(ResultSet rs, Set<Object> keys, Set<Object> keysToExclude) throws SQLException, CacheLoaderException {
+ Object k = rs.getObject(1);
+ if (includeKey(k, keysToExclude)) keys.add(k);
+ }
+
+ @Override
public void toStreamProcess(ResultSet rs, InputStream is, ObjectOutput objectOutput) throws CacheLoaderException, SQLException, IOException {
InternalCacheValue icv = (InternalCacheValue) JdbcUtil.unmarshall(getMarshaller(), is);
Object key = rs.getObject(2);
@@ -225,6 +236,11 @@
}
@Override
+ protected Set<Object> loadAllKeysLockSafe(Set<Object> keysToExclude) throws CacheLoaderException {
+ return dmHelper.loadAllKeysSupport(keysToExclude);
+ }
+
+ @Override
public void purgeInternal() throws CacheLoaderException {
Connection conn = null;
PreparedStatement ps = null;
Modified: trunk/cachestore/jdbm/src/main/java/org/infinispan/loaders/jdbm/JdbmCacheStore.java
===================================================================
--- trunk/cachestore/jdbm/src/main/java/org/infinispan/loaders/jdbm/JdbmCacheStore.java 2010-03-10 17:01:14 UTC (rev 1588)
+++ trunk/cachestore/jdbm/src/main/java/org/infinispan/loaders/jdbm/JdbmCacheStore.java 2010-03-10 17:06:20 UTC (rev 1589)
@@ -4,7 +4,6 @@
import jdbm.RecordManagerFactory;
import jdbm.btree.BTree;
import jdbm.helper.FastIterator;
-import jdbm.helper.Serializer;
import jdbm.helper.Tuple;
import jdbm.helper.TupleBrowser;
import jdbm.htree.HTree;
@@ -33,6 +32,7 @@
import java.util.AbstractSet;
import java.util.ArrayList;
import java.util.Date;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
@@ -88,7 +88,7 @@
locationStr = System.getProperty("java.io.tmpdir");
config.setLocation(locationStr);
}
-
+
expiryEntryQueue = new LinkedBlockingQueue<ExpiryEntry>(config.getExpiryQueueSize());
// JBCACHE-1448 db name parsing fix courtesy of Ciro Cavani
@@ -152,6 +152,19 @@
return new BTreeSet(numEntries);
}
+ @Override
+ public Set<Object> loadAllKeys(Set<Object> keysToExclude) throws CacheLoaderException {
+ try {
+ Set<Object> s = new HashSet<Object>();
+ FastIterator fi = tree.keys();
+ Object o;
+ while ((o = fi.next()) != null) if (keysToExclude == null || !keysToExclude.contains(o)) s.add(o);
+ return s;
+ } catch (IOException e) {
+ throw new CacheLoaderException(e);
+ }
+ }
+
/**
* Opens all databases and initializes database related information.
*/
@@ -255,7 +268,7 @@
store0(entry);
commit();
}
-
+
private byte[] marshall(InternalCacheEntry entry) throws IOException {
return getMarshaller().objectToByteBuffer(entry.toInternalCacheValue());
}
@@ -264,7 +277,7 @@
if (o == null)
return null;
byte b[] = (byte[]) o;
- InternalCacheValue v = (InternalCacheValue)getMarshaller().objectFromByteBuffer(b);
+ InternalCacheValue v = (InternalCacheValue) getMarshaller().objectFromByteBuffer(b);
return v.toInternalCacheEntry(key);
}
@@ -291,7 +304,7 @@
Long at = expiry;
Object key = entry.getKey();
if (trace) log.trace("at " + new SimpleDateFormat(DATE).format(new Date(at)) + " expire " + key);
-
+
try {
expiryEntryQueue.put(new ExpiryEntry(at, key));
} catch (InterruptedException e) {
@@ -311,7 +324,7 @@
getMarshaller().objectToObjectStream(entry, out);
count++;
}
- getMarshaller().objectToObjectStream(null, out);
+ getMarshaller().objectToObjectStream(null, out);
log.debug("wrote " + count + " entries");
} catch (IOException e) {
throw new CacheLoaderException(e);
@@ -356,7 +369,8 @@
/**
* Find all times less than current time. Build a list of keys for those times. Then purge those keys, assuming those
* keys' expiry has not changed.
- * @throws ClassNotFoundException
+ *
+ * @throws ClassNotFoundException
*/
private void purgeInternal0() throws Exception {
// Drain queue and update expiry tree
@@ -492,7 +506,7 @@
if (!hasNext())
throw new NoSuchElementException();
try {
- entriesReturned ++;
+ entriesReturned++;
return current;
} finally {
current = null;
@@ -516,11 +530,11 @@
return size;
}
}
-
+
private static final class ExpiryEntry {
private final Long expiry;
private final Object key;
-
+
private ExpiryEntry(long expiry, Object key) {
this.expiry = expiry;
this.key = key;
Modified: trunk/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java 2010-03-10 17:01:14 UTC (rev 1588)
+++ trunk/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java 2010-03-10 17:06:20 UTC (rev 1589)
@@ -20,6 +20,9 @@
import org.infinispan.marshall.exts.ReplicableCommandExternalizer;
import org.infinispan.remoting.transport.Address;
import org.infinispan.remoting.transport.Transport;
+import org.infinispan.util.ReadOnlyDataContainerBackedKeySet;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
import java.util.HashMap;
import java.util.List;
@@ -58,6 +61,7 @@
List<WriteCommand> txLogCommands;
List<PrepareCommand> pendingPrepares;
CommandsFactory commandsFactory;
+ private static final Log log = LogFactory.getLog(RehashControlCommand.class);
public RehashControlCommand() {
}
@@ -142,15 +146,26 @@
CacheStore cacheStore = distributionManager.getCacheStoreForRehashing();
if (cacheStore != null) {
- for (InternalCacheEntry ice : cacheStore.loadAll()) {
- Object k = ice.getKey();
- if (!state.containsKey(k) && shouldAddToMap(k, oldCH, numCopies, self))
- state.put(k, ice.toInternalCacheValue());
+ for (Object k: cacheStore.loadAllKeys(new ReadOnlyDataContainerBackedKeySet(dataContainer))) {
+ if (!state.containsKey(k) && shouldAddToMap(k, oldCH, numCopies, self)) {
+ InternalCacheValue v = loadValue(cacheStore, k);
+ if (v != null) state.put(k, v);
+ }
}
}
return state;
}
+ private InternalCacheValue loadValue(CacheStore cs, Object k) {
+ try {
+ InternalCacheEntry ice = cs.load(k);
+ return ice == null ? null : ice.toInternalCacheValue();
+ } catch (CacheLoaderException cle) {
+ log.warn("Unable to load " + k + " from cache loader", cle);
+ }
+ return null;
+ }
+
final boolean shouldAddToMap(Object k, ConsistentHash oldCH, int numCopies, Address self) {
// if the current address is the current "owner" of this key (in old_ch), and the requestor is in the owner list
// in new_ch, then add this to the map.
Modified: trunk/core/src/main/java/org/infinispan/distribution/LeaveTask.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/LeaveTask.java 2010-03-10 17:01:14 UTC (rev 1588)
+++ trunk/core/src/main/java/org/infinispan/distribution/LeaveTask.java 2010-03-10 17:06:20 UTC (rev 1589)
@@ -9,9 +9,11 @@
import org.infinispan.container.DataContainer;
import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.container.entries.InternalCacheValue;
+import org.infinispan.loaders.CacheLoader;
import org.infinispan.loaders.CacheStore;
import org.infinispan.remoting.rpc.RpcManager;
import org.infinispan.remoting.transport.Address;
+import org.infinispan.util.ReadOnlyDataContainerBackedKeySet;
import org.infinispan.util.Util;
import org.infinispan.util.concurrent.NotifyingFutureImpl;
import org.infinispan.util.concurrent.NotifyingNotifiableFuture;
@@ -71,7 +73,7 @@
CacheStore cs = dmi.getCacheStoreForRehashing();
if (cs != null) {
if (log.isTraceEnabled()) log.trace("Examining state in cache store");
- for (InternalCacheEntry ice : cs.loadAll()) if (statemap.doesNotContainKey(ice.getKey())) statemap.addState(ice);
+ for (Object key: cs.loadAllKeys(new ReadOnlyDataContainerBackedKeySet(dataContainer))) statemap.addState(key, cs);
}
// push state.
@@ -198,6 +200,7 @@
ConsistentHash oldCH, newCH;
int replCount;
Map<Address, S> state = new HashMap<Address, S>();
+ protected static final Log log = LogFactory.getLog(LeaveTask.class);
StateMap(List<Address> leavers, ConsistentHash oldCH, ConsistentHash newCH, int replCount) {
this.leavers = leavers;
@@ -227,7 +230,24 @@
* @param payload an InternalCacheEntry to add to the state map
*/
void addState(InternalCacheEntry payload) {
- Object key = payload.getKey();
+ addState(payload, null, null);
+ }
+
+ /**
+ * Only add state to state map if old_owner_list for key contains a leaver, and the position of the leaver in the old
+ * owner list, retrieving the value from a cache loader
+ */
+ void addState(Object key, CacheLoader loader) {
+ addState(null, key, loader);
+ }
+
+
+ private void addState(InternalCacheEntry ice, Object k, CacheLoader loader) {
+ Object key = ice == null ? k : ice.getKey();
+ if (keysHandled.contains(key)) return;
+
+ InternalCacheValue icv = null;
+
for (Address leaver : leavers) {
List<Address> owners = oldCH.locate(key, replCount);
int leaverIndex = owners.indexOf(leaver);
@@ -247,7 +267,20 @@
s = new HashMap<Object, InternalCacheValue>();
state.put(no, s);
}
- s.put(key, payload.toInternalCacheValue());
+
+ if (icv == null) {
+ if (ice == null) {
+ try {
+ InternalCacheEntry payload = loader.load(key);
+ if (payload != null) icv = payload.toInternalCacheValue();
+ } catch (Exception e) {
+ log.warn("Unable to load " + key + " from cache loader", e);
+ }
+ } else {
+ icv = ice.toInternalCacheValue();
+ }
+ }
+ s.put(key, icv);
}
}
}
@@ -255,10 +288,6 @@
}
keysHandled.add(key);
}
-
- boolean doesNotContainKey(Object key) {
- return !keysHandled.contains(key);
- }
}
/**
Modified: trunk/core/src/main/java/org/infinispan/loaders/CacheLoader.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/loaders/CacheLoader.java 2010-03-10 17:01:14 UTC (rev 1588)
+++ trunk/core/src/main/java/org/infinispan/loaders/CacheLoader.java 2010-03-10 17:06:20 UTC (rev 1589)
@@ -53,6 +53,15 @@
Set<InternalCacheEntry> load(int numEntries) throws CacheLoaderException;
/**
+ * Loads a set of all keys, excluding a filter set.
+ *
+ * @param keysToExclude a set of keys to exclude. An empty set or null will indicate that all keys should be returned.
+ * @return A set containing keys of entries stored. An empty set is returned if the loader is empty.
+ * @throws CacheLoaderException
+ */
+ Set<Object> loadAllKeys(Set<Object> keysToExclude) throws CacheLoaderException;
+
+ /**
* @param key key to test
* @return true if the key exists, false otherwise
* @throws CacheLoaderException in the event of problems reading from source
Modified: trunk/core/src/main/java/org/infinispan/loaders/LockSupportCacheStore.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/loaders/LockSupportCacheStore.java 2010-03-10 17:01:14 UTC (rev 1588)
+++ trunk/core/src/main/java/org/infinispan/loaders/LockSupportCacheStore.java 2010-03-10 17:06:20 UTC (rev 1589)
@@ -118,6 +118,16 @@
}
}
+ public Set<Object> loadAllKeys(Set<Object> keysToExclude) throws CacheLoaderException {
+ acquireGlobalLock(false);
+ try {
+ return loadAllKeysLockSafe(keysToExclude);
+ } finally {
+ releaseGlobalLock(false);
+ }
+ }
+
+
public final void store(InternalCacheEntry ed) throws CacheLoaderException {
if (trace) log.trace("store(" + ed + ")");
if (ed == null) return;
@@ -193,6 +203,8 @@
protected abstract Set<InternalCacheEntry> loadLockSafe(int maxEntries) throws CacheLoaderException;
+ protected abstract Set<Object> loadAllKeysLockSafe(Set<Object> keysToExclude) throws CacheLoaderException;
+
protected abstract void toStreamLockSafe(ObjectOutput oos) throws CacheLoaderException;
protected abstract void fromStreamLockSafe(ObjectInput ois) throws CacheLoaderException;
Modified: trunk/core/src/main/java/org/infinispan/loaders/bucket/BucketBasedCacheStore.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/loaders/bucket/BucketBasedCacheStore.java 2010-03-10 17:01:14 UTC (rev 1588)
+++ trunk/core/src/main/java/org/infinispan/loaders/bucket/BucketBasedCacheStore.java 2010-03-10 17:06:20 UTC (rev 1589)
@@ -4,6 +4,11 @@
import org.infinispan.loaders.CacheLoaderException;
import org.infinispan.loaders.LockSupportCacheStore;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
/**
* Base class for CacheStore implementations that combine entries into buckets when storing data.
* <p/>
@@ -101,7 +106,80 @@
updateBucket(bucket);
}
+ protected static interface BucketHandler {
+ /**
+ * Handles a bucket that is passed in.
+ * @param bucket bucket to handle. Cannot be null.
+ * @return <tt>true</tt> if <i>no more buckets</i> should be passed in (enough buckets have been handled). <tt>false</tt> otherwise.
+ */
+ boolean handle(Bucket bucket) throws CacheLoaderException;
+ }
+
+ // ah for closures in Java ...
+ protected abstract class CollectionGeneratingBucketHandler<T> implements BucketHandler{
+ Set<T> generated = new HashSet<T>();
+ public abstract boolean consider(Collection<? extends InternalCacheEntry> entries);
+ public Set<T> generate() { return generated; }
+ public boolean handle(Bucket bucket) throws CacheLoaderException {
+ if (bucket != null) {
+ if (bucket.removeExpiredEntries()) updateBucket(bucket);
+ boolean enoughLooping = consider(bucket.getStoredEntries());
+ if (enoughLooping) return true;
+ }
+ return false;
+ }
+ }
+
+ protected Set<InternalCacheEntry> loadAllLockSafe() throws CacheLoaderException {
+ CollectionGeneratingBucketHandler<InternalCacheEntry> g = new CollectionGeneratingBucketHandler<InternalCacheEntry>() {
+ public boolean consider(Collection<? extends InternalCacheEntry> entries) {
+ generated.addAll(entries);
+ return false;
+ }
+ };
+
+ loopOverBuckets(g);
+ return g.generate();
+ }
+
+ protected Set<InternalCacheEntry> loadLockSafe(final int max) throws CacheLoaderException {
+ CollectionGeneratingBucketHandler<InternalCacheEntry> g = new CollectionGeneratingBucketHandler<InternalCacheEntry>() {
+ public boolean consider(Collection<? extends InternalCacheEntry> entries) {
+ for (Iterator<? extends InternalCacheEntry> i = entries.iterator(); i.hasNext() && generated.size() < max;) generated.add(i.next());
+ return generated.size() >= max;
+ }
+ };
+
+ loopOverBuckets(g);
+ return g.generate();
+ }
+
+ @Override
+ protected Set<Object> loadAllKeysLockSafe(final Set<Object> keysToExclude) throws CacheLoaderException {
+ CollectionGeneratingBucketHandler<Object> g = new CollectionGeneratingBucketHandler<Object>() {
+ public boolean consider(Collection<? extends InternalCacheEntry> entries) {
+ for (InternalCacheEntry ice: entries) if (keysToExclude == null || !keysToExclude.contains(ice.getKey())) generated.add(ice.getKey());
+ return false;
+ }
+ };
+
+ loopOverBuckets(g);
+ return g.generate();
+ }
+
/**
+ * A mechanism to loop over all buckets in the cache store. Implementations should, very simply, loop over all
+ * available buckets, and for each deserialized bucket, pass it to the handler.
+ * <p />
+ * The implementation is expected to loop over <i>all</i> available buckets (in any order), until {@link org.infinispan.loaders.bucket.BucketBasedCacheStore.BucketHandler#handle(Bucket)}
+ * returns <tt>true</tt> or there are no more buckets available.
+ * <p />
+ * @param handler
+ * @throws CacheLoaderException
+ */
+ protected abstract void loopOverBuckets(BucketHandler handler) throws CacheLoaderException;
+
+ /**
* Updates a bucket in the store with the Bucket passed in to the method. This method assumes that the bucket
* already exists in the store, however some implementations may choose to simply create a new bucket if the bucket
* does not exist.
Modified: trunk/core/src/main/java/org/infinispan/loaders/cluster/ClusterCacheLoader.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/loaders/cluster/ClusterCacheLoader.java 2010-03-10 17:01:14 UTC (rev 1588)
+++ trunk/core/src/main/java/org/infinispan/loaders/cluster/ClusterCacheLoader.java 2010-03-10 17:06:20 UTC (rev 1589)
@@ -75,6 +75,11 @@
return emptySet();
}
+ @Override
+ public Set<Object> loadAllKeys(Set<Object> keysToExclude) throws CacheLoaderException {
+ return emptySet();
+ }
+
public void start() throws CacheLoaderException {
//nothing to do here
}
Modified: trunk/core/src/main/java/org/infinispan/loaders/decorators/AbstractDelegatingStore.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/loaders/decorators/AbstractDelegatingStore.java 2010-03-10 17:01:14 UTC (rev 1588)
+++ trunk/core/src/main/java/org/infinispan/loaders/decorators/AbstractDelegatingStore.java 2010-03-10 17:06:20 UTC (rev 1589)
@@ -95,6 +95,11 @@
return delegate.load(numEntries);
}
+ @Override
+ public Set<Object> loadAllKeys(Set<Object> keysToExclude) throws CacheLoaderException {
+ return delegate.loadAllKeys(keysToExclude);
+ }
+
public boolean containsKey(Object key) throws CacheLoaderException {
return delegate.containsKey(key);
}
Modified: trunk/core/src/main/java/org/infinispan/loaders/decorators/ChainingCacheStore.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/loaders/decorators/ChainingCacheStore.java 2010-03-10 17:01:14 UTC (rev 1588)
+++ trunk/core/src/main/java/org/infinispan/loaders/decorators/ChainingCacheStore.java 2010-03-10 17:06:20 UTC (rev 1589)
@@ -129,6 +129,13 @@
return set;
}
+ @Override
+ public Set<Object> loadAllKeys(Set<Object> keysToExclude) throws CacheLoaderException {
+ Set<Object> set = new HashSet<Object>(); // union of the keys held by every chained store
+ for (CacheStore s : stores.keySet()) set.addAll(s.loadAllKeys(keysToExclude)); // addAll, not add: merge the keys rather than nesting each store's Set as one element
+ return set;
+ }
+
public boolean containsKey(Object key) throws CacheLoaderException {
for (CacheLoader l : loaders.keySet()) {
if (l.containsKey(key)) return true;
Modified: trunk/core/src/main/java/org/infinispan/loaders/file/FileCacheStore.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/loaders/file/FileCacheStore.java 2010-03-10 17:01:14 UTC (rev 1588)
+++ trunk/core/src/main/java/org/infinispan/loaders/file/FileCacheStore.java 2010-03-10 17:06:20 UTC (rev 1589)
@@ -2,7 +2,6 @@
import org.infinispan.Cache;
import org.infinispan.config.ConfigurationException;
-import org.infinispan.container.entries.InternalCacheEntry;
import org.infinispan.io.ExposedByteArrayOutputStream;
import org.infinispan.loaders.CacheLoaderConfig;
import org.infinispan.loaders.CacheLoaderException;
@@ -14,19 +13,7 @@
import org.infinispan.util.logging.Log;
import org.infinispan.util.logging.LogFactory;
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
+import java.io.*;
/**
* A filesystem-based implementation of a {@link org.infinispan.loaders.bucket.BucketBasedCacheStore}. This file store
@@ -34,7 +21,6 @@
*
* @author Manik Surtani
* @author Mircea.Markus at jboss.com
- *
* @since 4.0
*/
@CacheLoaderMetadata(configurationClass = FileCacheStoreConfig.class)
@@ -59,37 +45,14 @@
super.init(config, cache, m);
this.config = (FileCacheStoreConfig) config;
}
-
- protected Set<InternalCacheEntry> loadAllLockSafe() throws CacheLoaderException {
- Set<InternalCacheEntry> result = new HashSet<InternalCacheEntry>();
+
+ protected void loopOverBuckets(BucketHandler handler) throws CacheLoaderException {
for (File bucketFile : root.listFiles()) {
Bucket bucket = loadBucket(bucketFile);
- if (bucket != null) {
- if (bucket.removeExpiredEntries()) {
- updateBucket(bucket);
- }
- result.addAll(bucket.getStoredEntries());
- }
+ if (handler.handle(bucket)) break;
}
- return result;
}
- protected Set<InternalCacheEntry> loadLockSafe(int max) throws CacheLoaderException {
- Set<InternalCacheEntry> result = new HashSet<InternalCacheEntry>(max);
- for (File bucketFile : root.listFiles()) {
- Bucket bucket = loadBucket(bucketFile);
- if (bucket != null) {
- if (bucket.removeExpiredEntries()) {
- updateBucket(bucket);
- }
- Iterator<? extends InternalCacheEntry> i = bucket.getStoredEntries().iterator();
- while (result.size() < max && i.hasNext()) result.add(i.next());
- }
- if (result.size() >= max) break;
- }
- return result;
- }
-
protected void fromStreamLockSafe(ObjectInput objectInput) throws CacheLoaderException {
try {
int numFiles = objectInput.readInt();
@@ -173,7 +136,7 @@
@Override
protected boolean supportsMultiThreadedPurge() {
return true;
- }
+ }
protected void purgeInternal() throws CacheLoaderException {
if (trace) log.trace("purgeInternal()");
Added: trunk/core/src/main/java/org/infinispan/util/ReadOnlyDataContainerBackedKeySet.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/util/ReadOnlyDataContainerBackedKeySet.java (rev 0)
+++ trunk/core/src/main/java/org/infinispan/util/ReadOnlyDataContainerBackedKeySet.java 2010-03-10 17:06:20 UTC (rev 1589)
@@ -0,0 +1,97 @@
+package org.infinispan.util;
+
+import org.infinispan.container.DataContainer;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Set;
+
+/**
+ * A Set view of keys in a data container, which is read-only and has efficient contains(), unlike some data container
+ * key sets.
+ *
+ * @author Manik Surtani
+ * @since 4.1
+ */
+public class ReadOnlyDataContainerBackedKeySet implements Set<Object> {
+
+ final DataContainer container;
+ Set<Object> keySet;
+
+ public ReadOnlyDataContainerBackedKeySet(DataContainer container) {
+ this.container = container;
+ }
+
+ @Override
+ public int size() {
+ return container.size();
+ }
+
+ @Override
+ public boolean isEmpty() {
+ return container.size() == 0;
+ }
+
+ @Override
+ public boolean contains(Object o) {
+ return container.containsKey(o);
+ }
+
+ @Override
+ public Iterator<Object> iterator() {
+ if (keySet == null) keySet = container.keySet();
+ return keySet.iterator();
+ }
+
+ @Override
+ public Object[] toArray() {
+ if (keySet == null) keySet = container.keySet();
+ return keySet.toArray();
+ }
+
+ @Override
+ public <T> T[] toArray(T[] a) {
+ if (keySet == null) keySet = container.keySet();
+ return keySet.toArray(a);
+ }
+
+ @Override
+ public boolean add(Object o) {
+ throw new UnsupportedOperationException("Immutable");
+ }
+
+ @Override
+ public boolean remove(Object o) {
+ throw new UnsupportedOperationException("Immutable");
+ }
+
+ @Override
+ public boolean containsAll(Collection<?> c) {
+ boolean ca = true;
+ for (Object o: c) {
+ ca = ca && contains(o);
+ if (!ca) return false;
+ }
+ return ca;
+ }
+
+ @Override
+ public boolean addAll(Collection<? extends Object> c) {
+ throw new UnsupportedOperationException("Immutable");
+ }
+
+ @Override
+ public boolean retainAll(Collection<?> c) {
+ throw new UnsupportedOperationException("Immutable");
+ }
+
+ @Override
+ public boolean removeAll(Collection<?> c) {
+ throw new UnsupportedOperationException("Immutable");
+ }
+
+ @Override
+ public void clear() {
+ throw new UnsupportedOperationException("Immutable");
+ }
+}
Property changes on: trunk/core/src/main/java/org/infinispan/util/ReadOnlyDataContainerBackedKeySet.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Modified: trunk/core/src/test/java/org/infinispan/loaders/BaseCacheStoreTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/loaders/BaseCacheStoreTest.java 2010-03-10 17:01:14 UTC (rev 1588)
+++ trunk/core/src/test/java/org/infinispan/loaders/BaseCacheStoreTest.java 2010-03-10 17:06:20 UTC (rev 1589)
@@ -30,12 +30,16 @@
import java.io.ObjectOutput;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Random;
import java.util.Set;
+import static java.util.Collections.emptySet;
+import static java.util.Collections.singleton;
+
/**
* This is a base class containing various unit tests for each and every different CacheStore implementations.
* If you need to add Cache/CacheManager tests that need to be run for each cache store/loader implementation,
@@ -466,6 +470,26 @@
assert !cs.containsKey("k3");
}
+ public void testLoadKeys() throws CacheLoaderException {
+ cs.store(InternalEntryFactory.create("k1", "v"));
+ cs.store(InternalEntryFactory.create("k2", "v"));
+ cs.store(InternalEntryFactory.create("k3", "v"));
+ cs.store(InternalEntryFactory.create("k4", "v"));
+ cs.store(InternalEntryFactory.create("k5", "v"));
+
+ Set<Object> s = cs.loadAllKeys(null);
+ assert s.size() == 5 : "Expected 5 keys, was " + s;
+
+ s = cs.loadAllKeys(emptySet());
+ assert s.size() == 5 : "Expected 5 keys, was " + s;
+
+ Set<Object> excl = Collections.<Object>singleton("k3");
+ s = cs.loadAllKeys(excl);
+ assert s.size() == 4 : "Expected 4 keys but was " + s;
+
+ assert !s.contains("k3");
+ }
+
public void testStreamingAPI() throws IOException, ClassNotFoundException, CacheLoaderException {
cs.store(InternalEntryFactory.create("k1", "v1"));
cs.store(InternalEntryFactory.create("k2", "v2"));
Modified: trunk/core/src/test/java/org/infinispan/loaders/dummy/DummyInMemoryCacheStore.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/loaders/dummy/DummyInMemoryCacheStore.java 2010-03-10 17:01:14 UTC (rev 1588)
+++ trunk/core/src/test/java/org/infinispan/loaders/dummy/DummyInMemoryCacheStore.java 2010-03-10 17:06:20 UTC (rev 1589)
@@ -14,6 +14,7 @@
import java.io.ObjectInput;
import java.io.ObjectOutput;
+import java.util.AbstractSet;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
@@ -124,6 +125,15 @@
return s;
}
+ @Override
+ public Set<Object> loadAllKeys(Set<Object> keysToExclude) throws CacheLoaderException {
+ Set<Object> set = new HashSet<Object>();
+ for (Object key: store.keySet()) {
+ if (keysToExclude == null || !keysToExclude.contains(key)) set.add(key);
+ }
+ return set;
+ }
+
public Class<? extends CacheLoaderConfig> getConfigurationClass() {
return Cfg.class;
}
More information about the infinispan-commits
mailing list