[infinispan-commits] Infinispan SVN: r1589 - in trunk: cachestore/cloud/src/main/java/org/infinispan/loaders/cloud and 15 other directories.

infinispan-commits at lists.jboss.org infinispan-commits at lists.jboss.org
Wed Mar 10 12:06:25 EST 2010


Author: manik.surtani at jboss.com
Date: 2010-03-10 12:06:20 -0500 (Wed, 10 Mar 2010)
New Revision: 1589

Added:
   trunk/core/src/main/java/org/infinispan/util/ReadOnlyDataContainerBackedKeySet.java
Modified:
   trunk/cachestore/bdbje/src/main/java/org/infinispan/loaders/bdbje/BdbjeCacheStore.java
   trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java
   trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/DataManipulationHelper.java
   trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/TableManipulation.java
   trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/binary/JdbcBinaryCacheStore.java
   trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/mixed/JdbcMixedCacheStore.java
   trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/stringbased/JdbcStringBasedCacheStore.java
   trunk/cachestore/jdbm/src/main/java/org/infinispan/loaders/jdbm/JdbmCacheStore.java
   trunk/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java
   trunk/core/src/main/java/org/infinispan/distribution/LeaveTask.java
   trunk/core/src/main/java/org/infinispan/loaders/CacheLoader.java
   trunk/core/src/main/java/org/infinispan/loaders/LockSupportCacheStore.java
   trunk/core/src/main/java/org/infinispan/loaders/bucket/BucketBasedCacheStore.java
   trunk/core/src/main/java/org/infinispan/loaders/cluster/ClusterCacheLoader.java
   trunk/core/src/main/java/org/infinispan/loaders/decorators/AbstractDelegatingStore.java
   trunk/core/src/main/java/org/infinispan/loaders/decorators/ChainingCacheStore.java
   trunk/core/src/main/java/org/infinispan/loaders/file/FileCacheStore.java
   trunk/core/src/test/java/org/infinispan/loaders/BaseCacheStoreTest.java
   trunk/core/src/test/java/org/infinispan/loaders/dummy/DummyInMemoryCacheStore.java
Log:
[ISPN-311] (CacheLoader.loadKeys(), and performance improvements when rehashing from a cache store)

Modified: trunk/cachestore/bdbje/src/main/java/org/infinispan/loaders/bdbje/BdbjeCacheStore.java
===================================================================
--- trunk/cachestore/bdbje/src/main/java/org/infinispan/loaders/bdbje/BdbjeCacheStore.java	2010-03-10 17:01:14 UTC (rev 1588)
+++ trunk/cachestore/bdbje/src/main/java/org/infinispan/loaders/bdbje/BdbjeCacheStore.java	2010-03-10 17:06:20 UTC (rev 1589)
@@ -450,6 +450,17 @@
       }
    }
 
+   @Override
+   public Set<Object> loadAllKeys(Set<Object> keysToExclude) throws CacheLoaderException {
+      try {
+         Set<Object> s = new HashSet<Object>();
+         for (Object o: cacheMap.keySet()) if (keysToExclude == null || !keysToExclude.contains(o)) s.add(o);
+         return s;
+      } catch (RuntimeException caught) {
+         throw convertToCacheLoaderException("error loading all entries", caught);
+      }
+   }
+
    /**
     * {@inheritDoc} This implementation reads the number of entries to load from the stream, then begins a transaction.
     * During that transaction, the cachestore is cleared and replaced with entries from the stream.  If there are any

Modified: trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java
===================================================================
--- trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java	2010-03-10 17:01:14 UTC (rev 1588)
+++ trunk/cachestore/cloud/src/main/java/org/infinispan/loaders/cloud/CloudCacheStore.java	2010-03-10 17:06:20 UTC (rev 1589)
@@ -142,30 +142,12 @@
    }
 
    @Override
-   protected Set<InternalCacheEntry> loadAllLockSafe() throws CacheLoaderException {
-      Set<InternalCacheEntry> result = new HashSet<InternalCacheEntry>();
-
+   protected void loopOverBuckets(BucketHandler handler) throws CacheLoaderException {
       for (Map.Entry<String, Blob> entry : ctx.createBlobMap(containerName).entrySet()) {
          Bucket bucket = readFromBlob(entry.getValue(), entry.getKey());
-         if (bucket.removeExpiredEntries())
-            updateBucket(bucket);
-         result.addAll(bucket.getStoredEntries());
-      }
-      return result;
-   }
-
-   @Override
-   protected Set<InternalCacheEntry> loadLockSafe(int maxEntries) throws CacheLoaderException {
-      Set<InternalCacheEntry> result = new HashSet<InternalCacheEntry>(maxEntries);
-
-      for (Map.Entry<String, Blob> entry : ctx.createBlobMap(containerName).entrySet()) {
-         Bucket bucket = readFromBlob(entry.getValue(), entry.getKey());
          if (bucket.removeExpiredEntries()) updateBucket(bucket);
-         for (Iterator<? extends InternalCacheEntry> i = bucket.getStoredEntries().iterator(); i.hasNext() && result.size() < maxEntries;)
-            result.add(i.next());
-         if (result.size() >= maxEntries) break;
+         if (handler.handle(bucket)) break;
       }
-      return result;
    }
 
    @Override

Modified: trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/DataManipulationHelper.java
===================================================================
--- trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/DataManipulationHelper.java	2010-03-10 17:01:14 UTC (rev 1588)
+++ trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/DataManipulationHelper.java	2010-03-10 17:06:20 UTC (rev 1589)
@@ -160,7 +160,7 @@
          if (filterExpired) ps.setLong(1, System.currentTimeMillis());
          rs = ps.executeQuery();
          rs.setFetchSize(tableManipulation.getFetchSize());
-         Set<InternalCacheEntry> result = new HashSet<InternalCacheEntry>();
+         Set<InternalCacheEntry> result = new HashSet<InternalCacheEntry>(tableManipulation.getFetchSize());
          while (rs.next()) {
             loadAllProcess(rs, result);
          }
@@ -176,6 +176,32 @@
       }
    }
 
+   public Set<Object> loadAllKeysSupport(Set<Object> keysToExclude) throws CacheLoaderException {
+      Connection conn = null;
+      PreparedStatement ps = null;
+      ResultSet rs = null;
+      try {
+
+         String sql = getLoadAllKeysSql();
+         if (log.isTraceEnabled()) log.trace("Running sql '" + sql);
+         conn = connectionFactory.getConnection();
+         ps = conn.prepareStatement(sql);
+         rs = ps.executeQuery();
+         rs.setFetchSize(tableManipulation.getFetchSize());
+         Set<Object> result = new HashSet<Object>(tableManipulation.getFetchSize());
+         while (rs.next()) loadAllKeysProcess(rs, result, keysToExclude);
+         return result;
+      } catch (SQLException e) {
+         String message = "SQL error while fetching all StoredEntries";
+         log.error(message, e);
+         throw new CacheLoaderException(message, e);
+      } finally {
+         JdbcUtil.safeClose(rs);
+         JdbcUtil.safeClose(ps);
+         connectionFactory.releaseConnection(conn);
+      }
+   }
+
    public final Set<InternalCacheEntry> loadSome(int maxEntries) throws CacheLoaderException {
       Connection conn = null;
       PreparedStatement ps = null;
@@ -204,15 +230,22 @@
       }
    }
 
-   public abstract void loadAllProcess(ResultSet rs, Set<InternalCacheEntry> result) throws SQLException, CacheLoaderException;
+   protected boolean includeKey(Object key, Set<Object> keysToExclude) {
+      return keysToExclude == null || !keysToExclude.contains(key);
+   }
 
-   public abstract void toStreamProcess(ResultSet rs, InputStream is, ObjectOutput objectOutput) throws CacheLoaderException, SQLException, IOException;
+   protected abstract String getLoadAllKeysSql();
 
-   public abstract boolean fromStreamProcess(Object objFromStream, PreparedStatement ps, ObjectInput objectInput) throws SQLException, CacheLoaderException, IOException, ClassNotFoundException;
+   protected abstract void loadAllProcess(ResultSet rs, Set<InternalCacheEntry> result) throws SQLException, CacheLoaderException;
 
+   protected abstract void loadAllKeysProcess(ResultSet rs, Set<Object> keys, Set<Object> keysToExclude) throws SQLException, CacheLoaderException;
+
+   protected abstract void toStreamProcess(ResultSet rs, InputStream is, ObjectOutput objectOutput) throws CacheLoaderException, SQLException, IOException;
+
+   protected abstract boolean fromStreamProcess(Object objFromStream, PreparedStatement ps, ObjectInput objectInput) throws SQLException, CacheLoaderException, IOException, ClassNotFoundException;
+
    public static void logAndThrow(Exception e, String message) throws CacheLoaderException {
       log.error(message, e);
       throw new CacheLoaderException(message, e);
    }
-   
 }

Modified: trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/TableManipulation.java
===================================================================
--- trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/TableManipulation.java	2010-03-10 17:01:14 UTC (rev 1588)
+++ trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/TableManipulation.java	2010-03-10 17:06:20 UTC (rev 1589)
@@ -80,6 +80,8 @@
    private String deleteExpiredRowsSql;
    private String loadSomeRowsSql;
    public DatabaseType databaseType;
+   private String loadAllKeysBinarySql;
+   private String loadAllKeysStringSql;
 
    public TableManipulation(String idColumnName, String idColumnType, String tableNamePrefix, String dataColumnName,
                             String dataColumnType, String timestampColumnName, String timestampColumnType) {
@@ -429,23 +431,23 @@
 
          switch (getDatabaseType()) {
             case ORACLE:
-               loadSomeRowsSql = "SELECT " + dataColumnName + "," + idColumnName + " FROM " + getTableName() + " LIMIT ?";
+               loadSomeRowsSql = String.format("SELECT %s, %s FROM (SELECT %s, %s FROM %s) WHERE ROWNUM <= ?", dataColumnName, idColumnName, dataColumnName, idColumnName, getTableName());
                break;
             case DB2:
-               loadSomeRowsSql = "SELECT " + dataColumnName + "," + idColumnName + " FROM " + getTableName() + " LIMIT ?";
+               loadSomeRowsSql = String.format("SELECT %s, %s FROM %s FETCH FIRST ? ROWS ONLY", dataColumnName, idColumnName, getTableName());
                break;
             case INFORMIX:
             case INTERBASE:
             case FIREBIRD:
-               loadSomeRowsSql = "SELECT " + dataColumnName + "," + idColumnName + " FROM " + getTableName() + " LIMIT ?";
+               loadSomeRowsSql = String.format("SELECT FIRST ? %s, %s FROM %s", dataColumnName, idColumnName, getTableName());
                break;
             case SQL_SERVER:
             case ACCESS:
-               loadSomeRowsSql = "SELECT " + dataColumnName + "," + idColumnName + " FROM " + getTableName() + " LIMIT ?";
+               loadSomeRowsSql = String.format("SELECT TOP ? %s, %s FROM %s", dataColumnName, idColumnName, getTableName());
                break;
             default:
                // the MySQL-style LIMIT clause
-               loadSomeRowsSql = "SELECT " + dataColumnName + "," + idColumnName + " FROM " + getTableName() + " LIMIT ?";
+               loadSomeRowsSql = String.format("SELECT %s, %s FROM %s LIMIT ?", dataColumnName, idColumnName, getTableName());
                break;
          }
 
@@ -453,6 +455,16 @@
       return loadSomeRowsSql;
    }
 
+   public String getLoadAllKeysBinarySql() {
+      if (loadAllKeysBinarySql == null) loadAllKeysBinarySql = String.format("SELECT %s FROM %s", dataColumnName, getTableName());
+      return loadAllKeysBinarySql;
+   }
+
+   public String getLoadAllKeysStringSql() {
+      if (loadAllKeysStringSql == null) loadAllKeysStringSql = String.format("SELECT %s FROM %s", idColumnName, getTableName());
+      return loadAllKeysStringSql;
+   }
+
    private DatabaseType getDatabaseType() {
       if (databaseType == null) {
          // need to guess from the database type!

Modified: trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/binary/JdbcBinaryCacheStore.java
===================================================================
--- trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/binary/JdbcBinaryCacheStore.java	2010-03-10 17:01:14 UTC (rev 1588)
+++ trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/binary/JdbcBinaryCacheStore.java	2010-03-10 17:06:20 UTC (rev 1589)
@@ -50,6 +50,8 @@
 import java.util.Iterator;
 import java.util.Set;
 
+import static org.infinispan.loaders.jdbc.DataManipulationHelper.logAndThrow;
+
 /**
  * {@link BucketBasedCacheStore} implementation that will store all the buckets as rows in database, each row
  * corresponding to a bucket. This is in contrast to {@link org.infinispan.loaders.jdbc.stringbased.JdbcStringBasedCacheStore}
@@ -99,15 +101,25 @@
       }
       dmHelper = new DataManipulationHelper(connectionFactory, tableManipulation, marshaller) {
          @Override
+         protected String getLoadAllKeysSql() {
+            return tableManipulation.getLoadAllKeysBinarySql();
+         }
+
+         @Override
          public void loadAllProcess(ResultSet rs, Set<InternalCacheEntry> result) throws SQLException, CacheLoaderException {
             InputStream binaryStream = rs.getBinaryStream(1);
             Bucket bucket = (Bucket) JdbcUtil.unmarshall(getMarshaller(), binaryStream);
-            for (InternalCacheEntry ice: bucket.getStoredEntries()) {
-               if (!ice.isExpired()) result.add(ice);
-            }
+            for (InternalCacheEntry ice: bucket.getStoredEntries()) if (!ice.isExpired()) result.add(ice);
          }
 
          @Override
+         public void loadAllKeysProcess(ResultSet rs, Set<Object> keys, Set<Object> keysToExclude) throws SQLException, CacheLoaderException {
+            InputStream binaryStream = rs.getBinaryStream(1);
+            Bucket bucket = (Bucket) JdbcUtil.unmarshall(getMarshaller(), binaryStream);
+            for (InternalCacheEntry ice: bucket.getStoredEntries()) if (!ice.isExpired() && includeKey(ice.getKey(), keysToExclude)) keys.add(ice.getKey());
+         }
+
+         @Override
          public void toStreamProcess(ResultSet rs, InputStream is, ObjectOutput objectOutput) throws CacheLoaderException, SQLException, IOException {
             Bucket bucket = (Bucket) JdbcUtil.unmarshall(getMarshaller(), is);
             String bucketName = rs.getString(2);
@@ -156,7 +168,7 @@
             throw new CacheLoaderException("Unexpected insert result: '" + insertedRows + "'. Expected values is 1");
          }
       } catch (SQLException ex) {
-         DataManipulationHelper.logAndThrow(ex, "sql failure while inserting bucket: " + bucket);
+         logAndThrow(ex, "sql failure while inserting bucket: " + bucket);
       } finally {
          JdbcUtil.safeClose(ps);
          connectionFactory.releaseConnection(conn);
@@ -182,7 +194,7 @@
             throw new CacheLoaderException("Unexpected  update result: '" + updatedRows + "'. Expected values is 1");
          }
       } catch (SQLException e) {
-         DataManipulationHelper.logAndThrow(e, "sql failure while updating bucket: " + bucket);
+         logAndThrow(e, "sql failure while updating bucket: " + bucket);
       } finally {
          JdbcUtil.safeClose(ps);
          connectionFactory.releaseConnection(conn);
@@ -219,15 +231,27 @@
       }
    }
 
+   @Override
    public Set<InternalCacheEntry> loadAllLockSafe() throws CacheLoaderException {
       return dmHelper.loadAllSupport(false);
    }
 
    @Override
+   public Set<Object> loadAllKeys(Set<Object> keysToExclude) throws CacheLoaderException {
+      return dmHelper.loadAllKeysSupport(keysToExclude);
+   }
+
+   @Override
    protected Set<InternalCacheEntry> loadLockSafe(int maxEntries) throws CacheLoaderException {
       return dmHelper.loadSome(maxEntries);
    }
 
+   @Override
+   protected void loopOverBuckets(BucketHandler handler) throws CacheLoaderException {
+      // this is a no-op.
+      throw new UnsupportedOperationException("Should never be called.");
+   }
+
    protected void fromStreamLockSafe(ObjectInput objectInput) throws CacheLoaderException {
       dmHelper.fromStreamSupport(objectInput);
    }
@@ -271,7 +295,7 @@
          //if something happens make sure buckets locks are being release
          releaseLocks(expiredBuckets);
          connectionFactory.releaseConnection(conn);
-         DataManipulationHelper.logAndThrow(ex, "Failed clearing JdbcBinaryCacheStore");
+         logAndThrow(ex, "Failed clearing JdbcBinaryCacheStore");
       } finally {
          JdbcUtil.safeClose(ps);
          JdbcUtil.safeClose(rs);
@@ -315,7 +339,7 @@
          //if something happens make sure buckets locks are being release
          releaseLocks(emptyBuckets);
          connectionFactory.releaseConnection(conn);
-         DataManipulationHelper.logAndThrow(ex, "Failed clearing JdbcBinaryCacheStore");
+         logAndThrow(ex, "Failed clearing JdbcBinaryCacheStore");
       } finally {
          //release locks for the updated buckets.This won't include empty buckets, as these were migrated to emptyBuckets
          releaseLocks(expiredBuckets);
@@ -348,7 +372,7 @@
          }
       } catch (SQLException ex) {
          //if something happens make sure buckets locks are being release
-         DataManipulationHelper.logAndThrow(ex, "Failed clearing JdbcBinaryCacheStore");
+         logAndThrow(ex, "Failed clearing JdbcBinaryCacheStore");
       } finally {
          releaseLocks(emptyBuckets);
          JdbcUtil.safeClose(ps);

Modified: trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/mixed/JdbcMixedCacheStore.java
===================================================================
--- trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/mixed/JdbcMixedCacheStore.java	2010-03-10 17:01:14 UTC (rev 1588)
+++ trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/mixed/JdbcMixedCacheStore.java	2010-03-10 17:06:20 UTC (rev 1589)
@@ -119,13 +119,21 @@
    @Override
    public Set<InternalCacheEntry> load(int numEntries) throws CacheLoaderException {
       if (numEntries < 0) return loadAll();
-      Set<InternalCacheEntry> fromBuckets = binaryCacheStore.load(numEntries);
+      Set<InternalCacheEntry> set = stringBasedCacheStore.load(numEntries);
 
-      if (fromBuckets.size() < numEntries) {
-         Set<InternalCacheEntry> fromStrings = stringBasedCacheStore.load(numEntries - fromBuckets.size());
-         fromBuckets.addAll(fromStrings);
+      if (set.size() < numEntries) {
+         Set<InternalCacheEntry> otherSet = binaryCacheStore.load(numEntries - set.size());
+         set.addAll(otherSet);
       }
 
+      return set;
+   }
+
+   @Override
+   public Set<Object> loadAllKeys(Set<Object> keysToExclude) throws CacheLoaderException {
+      Set<Object> fromBuckets = binaryCacheStore.loadAllKeys(keysToExclude);
+      Set<Object> fromStrings = stringBasedCacheStore.loadAllKeys(keysToExclude);
+      fromBuckets.addAll(fromStrings);
       return fromBuckets;
    }
 

Modified: trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/stringbased/JdbcStringBasedCacheStore.java
===================================================================
--- trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/stringbased/JdbcStringBasedCacheStore.java	2010-03-10 17:01:14 UTC (rev 1588)
+++ trunk/cachestore/jdbc/src/main/java/org/infinispan/loaders/jdbc/stringbased/JdbcStringBasedCacheStore.java	2010-03-10 17:06:20 UTC (rev 1589)
@@ -103,6 +103,11 @@
       dmHelper = new DataManipulationHelper(connectionFactory, tableManipulation, marshaller) {
 
          @Override
+         protected String getLoadAllKeysSql() {
+            return tableManipulation.getLoadAllKeysStringSql();
+         }
+
+         @Override
          public void loadAllProcess(ResultSet rs, Set<InternalCacheEntry> result) throws SQLException, CacheLoaderException {
             InputStream inputStream = rs.getBinaryStream(1);
             InternalCacheValue icv = (InternalCacheValue) JdbcUtil.unmarshall(getMarshaller(), inputStream);
@@ -111,6 +116,12 @@
          }
 
          @Override
+         public void loadAllKeysProcess(ResultSet rs, Set<Object> keys, Set<Object> keysToExclude) throws SQLException, CacheLoaderException {
+            Object k = rs.getObject(1);
+            if (includeKey(k, keysToExclude)) keys.add(k);
+         }
+
+         @Override
          public void toStreamProcess(ResultSet rs, InputStream is, ObjectOutput objectOutput) throws CacheLoaderException, SQLException, IOException {
             InternalCacheValue icv = (InternalCacheValue) JdbcUtil.unmarshall(getMarshaller(), is);
             Object key = rs.getObject(2);
@@ -225,6 +236,11 @@
    }
 
    @Override
+   protected Set<Object> loadAllKeysLockSafe(Set<Object> keysToExclude) throws CacheLoaderException {
+      return dmHelper.loadAllKeysSupport(keysToExclude);
+   }
+
+   @Override
    public void purgeInternal() throws CacheLoaderException {
       Connection conn = null;
       PreparedStatement ps = null;

Modified: trunk/cachestore/jdbm/src/main/java/org/infinispan/loaders/jdbm/JdbmCacheStore.java
===================================================================
--- trunk/cachestore/jdbm/src/main/java/org/infinispan/loaders/jdbm/JdbmCacheStore.java	2010-03-10 17:01:14 UTC (rev 1588)
+++ trunk/cachestore/jdbm/src/main/java/org/infinispan/loaders/jdbm/JdbmCacheStore.java	2010-03-10 17:06:20 UTC (rev 1589)
@@ -4,7 +4,6 @@
 import jdbm.RecordManagerFactory;
 import jdbm.btree.BTree;
 import jdbm.helper.FastIterator;
-import jdbm.helper.Serializer;
 import jdbm.helper.Tuple;
 import jdbm.helper.TupleBrowser;
 import jdbm.htree.HTree;
@@ -33,6 +32,7 @@
 import java.util.AbstractSet;
 import java.util.ArrayList;
 import java.util.Date;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.NoSuchElementException;
@@ -88,7 +88,7 @@
          locationStr = System.getProperty("java.io.tmpdir");
          config.setLocation(locationStr);
       }
-      
+
       expiryEntryQueue = new LinkedBlockingQueue<ExpiryEntry>(config.getExpiryQueueSize());
 
       // JBCACHE-1448 db name parsing fix courtesy of Ciro Cavani
@@ -152,6 +152,19 @@
       return new BTreeSet(numEntries);
    }
 
+   @Override
+   public Set<Object> loadAllKeys(Set<Object> keysToExclude) throws CacheLoaderException {
+      try {
+         Set<Object> s = new HashSet<Object>();
+         FastIterator fi = tree.keys();
+         Object o;
+         while ((o = fi.next()) != null) if (keysToExclude == null || !keysToExclude.contains(o)) s.add(o);
+         return s;
+      } catch (IOException e) {
+         throw new CacheLoaderException(e);
+      }
+   }
+
    /**
     * Opens all databases and initializes database related information.
     */
@@ -255,7 +268,7 @@
       store0(entry);
       commit();
    }
-   
+
    private byte[] marshall(InternalCacheEntry entry) throws IOException {
       return getMarshaller().objectToByteBuffer(entry.toInternalCacheValue());
    }
@@ -264,7 +277,7 @@
       if (o == null)
          return null;
       byte b[] = (byte[]) o;
-      InternalCacheValue v = (InternalCacheValue)getMarshaller().objectFromByteBuffer(b);
+      InternalCacheValue v = (InternalCacheValue) getMarshaller().objectFromByteBuffer(b);
       return v.toInternalCacheEntry(key);
    }
 
@@ -291,7 +304,7 @@
       Long at = expiry;
       Object key = entry.getKey();
       if (trace) log.trace("at " + new SimpleDateFormat(DATE).format(new Date(at)) + " expire " + key);
-      
+
       try {
          expiryEntryQueue.put(new ExpiryEntry(at, key));
       } catch (InterruptedException e) {
@@ -311,7 +324,7 @@
             getMarshaller().objectToObjectStream(entry, out);
             count++;
          }
-        getMarshaller().objectToObjectStream(null, out);
+         getMarshaller().objectToObjectStream(null, out);
          log.debug("wrote " + count + " entries");
       } catch (IOException e) {
          throw new CacheLoaderException(e);
@@ -356,7 +369,8 @@
    /**
     * Find all times less than current time. Build a list of keys for those times. Then purge those keys, assuming those
     * keys' expiry has not changed.
-    * @throws ClassNotFoundException 
+    *
+    * @throws ClassNotFoundException
     */
    private void purgeInternal0() throws Exception {
       // Drain queue and update expiry tree
@@ -492,7 +506,7 @@
                if (!hasNext())
                   throw new NoSuchElementException();
                try {
-                  entriesReturned ++;
+                  entriesReturned++;
                   return current;
                } finally {
                   current = null;
@@ -516,11 +530,11 @@
          return size;
       }
    }
-   
+
    private static final class ExpiryEntry {
       private final Long expiry;
       private final Object key;
-      
+
       private ExpiryEntry(long expiry, Object key) {
          this.expiry = expiry;
          this.key = key;

Modified: trunk/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java	2010-03-10 17:01:14 UTC (rev 1588)
+++ trunk/core/src/main/java/org/infinispan/commands/control/RehashControlCommand.java	2010-03-10 17:06:20 UTC (rev 1589)
@@ -20,6 +20,9 @@
 import org.infinispan.marshall.exts.ReplicableCommandExternalizer;
 import org.infinispan.remoting.transport.Address;
 import org.infinispan.remoting.transport.Transport;
+import org.infinispan.util.ReadOnlyDataContainerBackedKeySet;
+import org.infinispan.util.logging.Log;
+import org.infinispan.util.logging.LogFactory;
 
 import java.util.HashMap;
 import java.util.List;
@@ -58,6 +61,7 @@
    List<WriteCommand> txLogCommands;
    List<PrepareCommand> pendingPrepares;
    CommandsFactory commandsFactory;
+   private static final Log log = LogFactory.getLog(RehashControlCommand.class);
 
    public RehashControlCommand() {
    }
@@ -142,15 +146,26 @@
 
       CacheStore cacheStore = distributionManager.getCacheStoreForRehashing();
       if (cacheStore != null) {
-         for (InternalCacheEntry ice : cacheStore.loadAll()) {
-            Object k = ice.getKey();
-            if (!state.containsKey(k) && shouldAddToMap(k, oldCH, numCopies, self))
-               state.put(k, ice.toInternalCacheValue());
+         for (Object k: cacheStore.loadAllKeys(new ReadOnlyDataContainerBackedKeySet(dataContainer))) {
+            if (!state.containsKey(k) && shouldAddToMap(k, oldCH, numCopies, self)) {
+               InternalCacheValue v = loadValue(cacheStore, k);
+               if (v != null) state.put(k, v);
+            }
          }
       }
       return state;
    }
 
+   private InternalCacheValue loadValue(CacheStore cs, Object k) {
+      try {
+         InternalCacheEntry ice = cs.load(k);
+         return ice == null ? null : ice.toInternalCacheValue();
+      } catch (CacheLoaderException cle) {
+         log.warn("Unable to load " + k + " from cache loader", cle);
+      }
+      return null;
+   }
+
    final boolean shouldAddToMap(Object k, ConsistentHash oldCH, int numCopies, Address self) {
       // if the current address is the current "owner" of this key (in old_ch), and the requestor is in the owner list
       // in new_ch, then add this to the map.

Modified: trunk/core/src/main/java/org/infinispan/distribution/LeaveTask.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/distribution/LeaveTask.java	2010-03-10 17:01:14 UTC (rev 1588)
+++ trunk/core/src/main/java/org/infinispan/distribution/LeaveTask.java	2010-03-10 17:06:20 UTC (rev 1589)
@@ -9,9 +9,11 @@
 import org.infinispan.container.DataContainer;
 import org.infinispan.container.entries.InternalCacheEntry;
 import org.infinispan.container.entries.InternalCacheValue;
+import org.infinispan.loaders.CacheLoader;
 import org.infinispan.loaders.CacheStore;
 import org.infinispan.remoting.rpc.RpcManager;
 import org.infinispan.remoting.transport.Address;
+import org.infinispan.util.ReadOnlyDataContainerBackedKeySet;
 import org.infinispan.util.Util;
 import org.infinispan.util.concurrent.NotifyingFutureImpl;
 import org.infinispan.util.concurrent.NotifyingNotifiableFuture;
@@ -71,7 +73,7 @@
          CacheStore cs = dmi.getCacheStoreForRehashing();
          if (cs != null) {
             if (log.isTraceEnabled()) log.trace("Examining state in cache store");
-            for (InternalCacheEntry ice : cs.loadAll()) if (statemap.doesNotContainKey(ice.getKey())) statemap.addState(ice);
+            for (Object key: cs.loadAllKeys(new ReadOnlyDataContainerBackedKeySet(dataContainer))) statemap.addState(key, cs);
          }
 
          // push state.
@@ -198,6 +200,7 @@
    ConsistentHash oldCH, newCH;
    int replCount;
    Map<Address, S> state = new HashMap<Address, S>();
+   protected static final Log log = LogFactory.getLog(LeaveTask.class);
 
    StateMap(List<Address> leavers, ConsistentHash oldCH, ConsistentHash newCH, int replCount) {
       this.leavers = leavers;
@@ -227,7 +230,24 @@
     * @param payload an InternalCacheEntry to add to the state map
     */
    void addState(InternalCacheEntry payload) {
-      Object key = payload.getKey();
+      addState(payload, null, null);
+   }
+
+   /**
+    * Only add state to state map if old_owner_list for key contains a leaver, and the position of the leaver in the old
+    * owner list, retrieving the value from a cache loader
+    */
+   void addState(Object key, CacheLoader loader) {
+      addState(null, key, loader);
+   }
+
+
+   private void addState(InternalCacheEntry ice, Object k, CacheLoader loader) {
+      Object key = ice == null ? k : ice.getKey();
+      if (keysHandled.contains(key)) return;
+      
+      InternalCacheValue icv = null;
+      
       for (Address leaver : leavers) {
          List<Address> owners = oldCH.locate(key, replCount);
          int leaverIndex = owners.indexOf(leaver);
@@ -247,7 +267,20 @@
                         s = new HashMap<Object, InternalCacheValue>();
                         state.put(no, s);
                      }
-                     s.put(key, payload.toInternalCacheValue());
+
+                     if (icv == null) {
+                        if (ice == null) {
+                           try {
+                              InternalCacheEntry payload = loader.load(key);
+                              if (payload != null) icv = payload.toInternalCacheValue();
+                           } catch (Exception e) {
+                              log.warn("Unable to load " + key + " from cache loader", e);
+                           }
+                        } else {
+                           icv = ice.toInternalCacheValue();
+                        }
+                     }
+                     s.put(key, icv);
                   }
                }
             }
@@ -255,10 +288,6 @@
       }
       keysHandled.add(key);
    }
-
-   boolean doesNotContainKey(Object key) {
-      return !keysHandled.contains(key);
-   }
 }
 
 /**

Modified: trunk/core/src/main/java/org/infinispan/loaders/CacheLoader.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/loaders/CacheLoader.java	2010-03-10 17:01:14 UTC (rev 1588)
+++ trunk/core/src/main/java/org/infinispan/loaders/CacheLoader.java	2010-03-10 17:06:20 UTC (rev 1589)
@@ -53,6 +53,15 @@
    Set<InternalCacheEntry> load(int numEntries) throws CacheLoaderException;
 
    /**
+    * Loads a set of all keys, excluding a filter set.
+    *
+    * @param keysToExclude a set of keys to exclude.  An empty set or null will indicate that all keys should be returned.
+    * @return A set containing keys of entries stored.  An empty set is returned if the loader is empty.   
+    * @throws CacheLoaderException
+    */
+   Set<Object> loadAllKeys(Set<Object> keysToExclude) throws CacheLoaderException;
+
+   /**
     * @param key key to test
     * @return true if the key exists, false otherwise
     * @throws CacheLoaderException in the event of problems reading from source

Modified: trunk/core/src/main/java/org/infinispan/loaders/LockSupportCacheStore.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/loaders/LockSupportCacheStore.java	2010-03-10 17:01:14 UTC (rev 1588)
+++ trunk/core/src/main/java/org/infinispan/loaders/LockSupportCacheStore.java	2010-03-10 17:06:20 UTC (rev 1589)
@@ -118,6 +118,16 @@
       }
    }
 
+   public Set<Object> loadAllKeys(Set<Object> keysToExclude) throws CacheLoaderException {
+      acquireGlobalLock(false);
+      try {
+         return loadAllKeysLockSafe(keysToExclude);
+      } finally {
+         releaseGlobalLock(false);
+      }
+   }
+
+
    public final void store(InternalCacheEntry ed) throws CacheLoaderException {
       if (trace) log.trace("store(" + ed + ")");
       if (ed == null) return;
@@ -193,6 +203,8 @@
 
    protected abstract Set<InternalCacheEntry> loadLockSafe(int maxEntries) throws CacheLoaderException;
 
+   protected abstract Set<Object> loadAllKeysLockSafe(Set<Object> keysToExclude) throws CacheLoaderException;
+
    protected abstract void toStreamLockSafe(ObjectOutput oos) throws CacheLoaderException;
 
    protected abstract void fromStreamLockSafe(ObjectInput ois) throws CacheLoaderException;

Modified: trunk/core/src/main/java/org/infinispan/loaders/bucket/BucketBasedCacheStore.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/loaders/bucket/BucketBasedCacheStore.java	2010-03-10 17:01:14 UTC (rev 1588)
+++ trunk/core/src/main/java/org/infinispan/loaders/bucket/BucketBasedCacheStore.java	2010-03-10 17:06:20 UTC (rev 1589)
@@ -4,6 +4,11 @@
 import org.infinispan.loaders.CacheLoaderException;
 import org.infinispan.loaders.LockSupportCacheStore;
 
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
 /**
  * Base class for CacheStore implementations that combine entries into buckets when storing data.
  * <p/>
@@ -101,7 +106,80 @@
       updateBucket(bucket);
    }
 
+   protected static interface BucketHandler {
+      /**
+       * Handles a bucket that is passed in.
+       * @param bucket bucket to handle.  Cannot be null.
+       * @return <tt>true</tt> if <i>no more buckets</i> should be passed in (enoiugh buckets have been handled).  <tt>false</tt> otherwise.
+       */
+      boolean handle(Bucket bucket) throws CacheLoaderException;
+   }
+
+   // ah for closures in Java ...
+   protected abstract class CollectionGeneratingBucketHandler<T> implements BucketHandler{
+      Set<T> generated = new HashSet<T>();
+      public abstract boolean consider(Collection<? extends InternalCacheEntry> entries);
+      public Set<T> generate() { return generated; }
+      public boolean handle(Bucket bucket) throws CacheLoaderException {
+         if (bucket != null) {
+            if (bucket.removeExpiredEntries()) updateBucket(bucket);
+            boolean enoughLooping = consider(bucket.getStoredEntries());
+            if (enoughLooping) return true;
+         }
+         return false;
+      }
+   }
+
+   protected Set<InternalCacheEntry> loadAllLockSafe() throws CacheLoaderException {
+      CollectionGeneratingBucketHandler<InternalCacheEntry> g = new CollectionGeneratingBucketHandler<InternalCacheEntry>() {
+         public boolean consider(Collection<? extends InternalCacheEntry> entries) {
+            generated.addAll(entries);
+            return false;
+         }
+      };
+
+      loopOverBuckets(g);
+      return g.generate();
+   }
+
+   protected Set<InternalCacheEntry> loadLockSafe(final int max) throws CacheLoaderException {
+      CollectionGeneratingBucketHandler<InternalCacheEntry> g = new CollectionGeneratingBucketHandler<InternalCacheEntry>() {
+         public boolean consider(Collection<? extends InternalCacheEntry> entries) {
+            for (Iterator<? extends InternalCacheEntry> i = entries.iterator(); i.hasNext() && generated.size() < max;) generated.add(i.next());
+            return generated.size() >= max;
+         }
+      };
+
+      loopOverBuckets(g);
+      return g.generate();
+   }
+
+   @Override
+   protected Set<Object> loadAllKeysLockSafe(final Set<Object> keysToExclude) throws CacheLoaderException {
+      CollectionGeneratingBucketHandler<Object> g = new CollectionGeneratingBucketHandler<Object>() {
+         public boolean consider(Collection<? extends InternalCacheEntry> entries) {
+            for (InternalCacheEntry ice: entries) if (keysToExclude == null || !keysToExclude.contains(ice.getKey())) generated.add(ice.getKey());
+            return false;
+         }
+      };
+
+      loopOverBuckets(g);
+      return g.generate();
+   }
+
    /**
+    * A mechanism to loop over all buckets in the cache store.  Implementations should, very simply, loop over all
+    * available buckets, and for each deserialized bucket, pass it to the handler.
+    * <p />
+    * The implementation is expected to loop over <i>all</i> available buckets (in any order), until {@link org.infinispan.loaders.bucket.BucketBasedCacheStore.BucketHandler#handle(Bucket)}
+    * returns <tt>true</tt> or there are no more buckets available.
+    * <p />
+    * @param handler
+    * @throws CacheLoaderException
+    */
+   protected abstract void loopOverBuckets(BucketHandler handler) throws CacheLoaderException;
+
+   /**
     * Updates a bucket in the store with the Bucket passed in to the method.  This method assumes that the bucket
     * already exists in the store, however some implementations may choose to simply create a new bucket if the bucket
     * does not exist.

Modified: trunk/core/src/main/java/org/infinispan/loaders/cluster/ClusterCacheLoader.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/loaders/cluster/ClusterCacheLoader.java	2010-03-10 17:01:14 UTC (rev 1588)
+++ trunk/core/src/main/java/org/infinispan/loaders/cluster/ClusterCacheLoader.java	2010-03-10 17:06:20 UTC (rev 1589)
@@ -75,6 +75,11 @@
       return emptySet();
    }
 
+   @Override
+   public Set<Object> loadAllKeys(Set<Object> keysToExclude) throws CacheLoaderException {
+      return emptySet();
+   }
+
    public void start() throws CacheLoaderException {
       //nothing to do here
    }

Modified: trunk/core/src/main/java/org/infinispan/loaders/decorators/AbstractDelegatingStore.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/loaders/decorators/AbstractDelegatingStore.java	2010-03-10 17:01:14 UTC (rev 1588)
+++ trunk/core/src/main/java/org/infinispan/loaders/decorators/AbstractDelegatingStore.java	2010-03-10 17:06:20 UTC (rev 1589)
@@ -95,6 +95,11 @@
       return delegate.load(numEntries);
    }
 
+   @Override
+   public Set<Object> loadAllKeys(Set<Object> keysToExclude) throws CacheLoaderException {
+      return delegate.loadAllKeys(keysToExclude);
+   }
+
    public boolean containsKey(Object key) throws CacheLoaderException {
       return delegate.containsKey(key);
    }

Modified: trunk/core/src/main/java/org/infinispan/loaders/decorators/ChainingCacheStore.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/loaders/decorators/ChainingCacheStore.java	2010-03-10 17:01:14 UTC (rev 1588)
+++ trunk/core/src/main/java/org/infinispan/loaders/decorators/ChainingCacheStore.java	2010-03-10 17:06:20 UTC (rev 1589)
@@ -129,6 +129,13 @@
       return set;
    }
 
+   @Override
+   public Set<Object> loadAllKeys(Set<Object> keysToExclude) throws CacheLoaderException {
+      Set<Object> set = new HashSet<Object>();
+      for (CacheStore s : stores.keySet()) set.add(s.loadAllKeys(keysToExclude));
+      return set;
+   }
+
    public boolean containsKey(Object key) throws CacheLoaderException {
       for (CacheLoader l : loaders.keySet()) {
          if (l.containsKey(key)) return true;

Modified: trunk/core/src/main/java/org/infinispan/loaders/file/FileCacheStore.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/loaders/file/FileCacheStore.java	2010-03-10 17:01:14 UTC (rev 1588)
+++ trunk/core/src/main/java/org/infinispan/loaders/file/FileCacheStore.java	2010-03-10 17:06:20 UTC (rev 1589)
@@ -2,7 +2,6 @@
 
 import org.infinispan.Cache;
 import org.infinispan.config.ConfigurationException;
-import org.infinispan.container.entries.InternalCacheEntry;
 import org.infinispan.io.ExposedByteArrayOutputStream;
 import org.infinispan.loaders.CacheLoaderConfig;
 import org.infinispan.loaders.CacheLoaderException;
@@ -14,19 +13,7 @@
 import org.infinispan.util.logging.Log;
 import org.infinispan.util.logging.LogFactory;
 
-import java.io.BufferedInputStream;
-import java.io.BufferedOutputStream;
-import java.io.ByteArrayInputStream;
-import java.io.File;
-import java.io.FileInputStream;
-import java.io.FileOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
+import java.io.*;
 
 /**
  * A filesystem-based implementation of a {@link org.infinispan.loaders.bucket.BucketBasedCacheStore}.  This file store
@@ -34,7 +21,6 @@
  *
  * @author Manik Surtani
  * @author Mircea.Markus at jboss.com
- * 
  * @since 4.0
  */
 @CacheLoaderMetadata(configurationClass = FileCacheStoreConfig.class)
@@ -59,37 +45,14 @@
       super.init(config, cache, m);
       this.config = (FileCacheStoreConfig) config;
    }
-
-   protected Set<InternalCacheEntry> loadAllLockSafe() throws CacheLoaderException {
-      Set<InternalCacheEntry> result = new HashSet<InternalCacheEntry>();
+   
+   protected void loopOverBuckets(BucketHandler handler) throws CacheLoaderException {
       for (File bucketFile : root.listFiles()) {
          Bucket bucket = loadBucket(bucketFile);
-         if (bucket != null) {
-            if (bucket.removeExpiredEntries()) {
-               updateBucket(bucket);
-            }
-            result.addAll(bucket.getStoredEntries());
-         }
+         if (handler.handle(bucket)) break;
       }
-      return result;
    }
 
-   protected Set<InternalCacheEntry> loadLockSafe(int max) throws CacheLoaderException {
-      Set<InternalCacheEntry> result = new HashSet<InternalCacheEntry>(max);
-      for (File bucketFile : root.listFiles()) {
-         Bucket bucket = loadBucket(bucketFile);
-         if (bucket != null) {
-            if (bucket.removeExpiredEntries()) {
-               updateBucket(bucket);
-            }
-            Iterator<? extends InternalCacheEntry> i = bucket.getStoredEntries().iterator();
-            while (result.size() < max && i.hasNext()) result.add(i.next());
-         }
-         if (result.size() >= max) break;
-      }
-      return result;
-   }
-
    protected void fromStreamLockSafe(ObjectInput objectInput) throws CacheLoaderException {
       try {
          int numFiles = objectInput.readInt();
@@ -173,7 +136,7 @@
    @Override
    protected boolean supportsMultiThreadedPurge() {
       return true;
-   }   
+   }
 
    protected void purgeInternal() throws CacheLoaderException {
       if (trace) log.trace("purgeInternal()");

Added: trunk/core/src/main/java/org/infinispan/util/ReadOnlyDataContainerBackedKeySet.java
===================================================================
--- trunk/core/src/main/java/org/infinispan/util/ReadOnlyDataContainerBackedKeySet.java	                        (rev 0)
+++ trunk/core/src/main/java/org/infinispan/util/ReadOnlyDataContainerBackedKeySet.java	2010-03-10 17:06:20 UTC (rev 1589)
@@ -0,0 +1,97 @@
+package org.infinispan.util;
+
+import org.infinispan.container.DataContainer;
+
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Set;
+
+/**
+ * A Set view of keys in a data container, which is read-only and has efficient contains(), unlike some data container
+ * ley sets.
+ *
+ * @author Manik Surtani
+ * @since 4.1
+ */
+public class ReadOnlyDataContainerBackedKeySet implements Set<Object> {
+
+   final DataContainer container;
+   Set<Object> keySet;
+
+   public ReadOnlyDataContainerBackedKeySet(DataContainer container) {
+      this.container = container;
+   }
+
+   @Override
+   public int size() {
+      return container.size();
+   }
+
+   @Override
+   public boolean isEmpty() {
+      return container.size() == 0;
+   }
+
+   @Override
+   public boolean contains(Object o) {
+      return container.containsKey(o);
+   }
+
+   @Override
+   public Iterator<Object> iterator() {
+      if (keySet == null) keySet = container.keySet();
+      return keySet.iterator();
+   }
+
+   @Override
+   public Object[] toArray() {
+      if (keySet == null) keySet = container.keySet();
+      return keySet.toArray();
+   }
+
+   @Override
+   public <T> T[] toArray(T[] a) {
+      if (keySet == null) keySet = container.keySet();
+      return keySet.toArray(a);
+   }
+
+   @Override
+   public boolean add(Object o) {
+      throw new UnsupportedOperationException("Immutable");
+   }
+
+   @Override
+   public boolean remove(Object o) {
+      throw new UnsupportedOperationException("Immutable");
+   }
+
+   @Override
+   public boolean containsAll(Collection<?> c) {
+      boolean ca = true;
+      for (Object o: c) {
+         ca = ca && contains(o);
+         if (!ca) return false;
+      }
+      return ca;
+   }
+
+   @Override
+   public boolean addAll(Collection<? extends Object> c) {
+      throw new UnsupportedOperationException("Immutable");
+   }
+
+   @Override
+   public boolean retainAll(Collection<?> c) {
+      throw new UnsupportedOperationException("Immutable");
+   }
+
+   @Override
+   public boolean removeAll(Collection<?> c) {
+      throw new UnsupportedOperationException("Immutable");
+   }
+
+   @Override
+   public void clear() {
+      throw new UnsupportedOperationException("Immutable");
+   }
+}


Property changes on: trunk/core/src/main/java/org/infinispan/util/ReadOnlyDataContainerBackedKeySet.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Modified: trunk/core/src/test/java/org/infinispan/loaders/BaseCacheStoreTest.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/loaders/BaseCacheStoreTest.java	2010-03-10 17:01:14 UTC (rev 1588)
+++ trunk/core/src/test/java/org/infinispan/loaders/BaseCacheStoreTest.java	2010-03-10 17:06:20 UTC (rev 1589)
@@ -30,12 +30,16 @@
 import java.io.ObjectOutput;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Random;
 import java.util.Set;
 
+import static java.util.Collections.emptySet;
+import static java.util.Collections.singleton;
+
 /**
  * This is a base class containing various unit tests for each and every different CacheStore implementations. 
  * If you need to add Cache/CacheManager tests that need to be run for each cache store/loader implementation,
@@ -466,6 +470,26 @@
       assert !cs.containsKey("k3");
    }
 
+   public void testLoadKeys() throws CacheLoaderException {
+      cs.store(InternalEntryFactory.create("k1", "v"));
+      cs.store(InternalEntryFactory.create("k2", "v"));
+      cs.store(InternalEntryFactory.create("k3", "v"));
+      cs.store(InternalEntryFactory.create("k4", "v"));
+      cs.store(InternalEntryFactory.create("k5", "v"));
+
+      Set<Object> s = cs.loadAllKeys(null);
+      assert s.size() == 5 : "Expected 5 keys, was " + s;
+
+      s = cs.loadAllKeys(emptySet());
+      assert s.size() == 5 : "Expected 5 keys, was " + s;
+
+      Set<Object> excl = Collections.<Object>singleton("k3");
+      s = cs.loadAllKeys(excl);
+      assert s.size() == 4 : "Expected 4 keys but was " + s;
+
+      assert !s.contains("k3");
+   }
+
    public void testStreamingAPI() throws IOException, ClassNotFoundException, CacheLoaderException {
       cs.store(InternalEntryFactory.create("k1", "v1"));
       cs.store(InternalEntryFactory.create("k2", "v2"));

Modified: trunk/core/src/test/java/org/infinispan/loaders/dummy/DummyInMemoryCacheStore.java
===================================================================
--- trunk/core/src/test/java/org/infinispan/loaders/dummy/DummyInMemoryCacheStore.java	2010-03-10 17:01:14 UTC (rev 1588)
+++ trunk/core/src/test/java/org/infinispan/loaders/dummy/DummyInMemoryCacheStore.java	2010-03-10 17:06:20 UTC (rev 1589)
@@ -14,6 +14,7 @@
 
 import java.io.ObjectInput;
 import java.io.ObjectOutput;
+import java.util.AbstractSet;
 import java.util.HashSet;
 import java.util.Iterator;
 import java.util.Map;
@@ -124,6 +125,15 @@
       return s;
    }
 
+   @Override
+   public Set<Object> loadAllKeys(Set<Object> keysToExclude) throws CacheLoaderException {
+      Set<Object> set = new HashSet<Object>();
+      for (Object key: store.keySet()) {
+         if (keysToExclude == null || !keysToExclude.contains(key)) set.add(key);
+      }
+      return set;
+   }
+
    public Class<? extends CacheLoaderConfig> getConfigurationClass() {
       return Cfg.class;
    }



More information about the infinispan-commits mailing list