[jbosscache-commits] JBoss Cache SVN: r7791 - in core/branches/flat/src: main/java/org/horizon/loader/file and 3 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Thu Feb 26 09:25:24 EST 2009


Author: mircea.markus
Date: 2009-02-26 09:25:24 -0500 (Thu, 26 Feb 2009)
New Revision: 7791

Added:
   core/branches/flat/src/test/java/org/horizon/loader/jdbc/PooledConnectionFactoryTest.java
Modified:
   core/branches/flat/src/main/java/org/horizon/loader/bucket/Bucket.java
   core/branches/flat/src/main/java/org/horizon/loader/bucket/BucketBasedCacheStore.java
   core/branches/flat/src/main/java/org/horizon/loader/bucket/BucketBasedCacheStoreConfig.java
   core/branches/flat/src/main/java/org/horizon/loader/file/FileCacheStore.java
   core/branches/flat/src/main/java/org/horizon/loader/file/FileCacheStoreConfig.java
   core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcCacheStore.java
   core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcCacheStoreConfig.java
   core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcUtil.java
   core/branches/flat/src/main/java/org/horizon/loader/jdbc/TableManipulation.java
   core/branches/flat/src/main/java/org/horizon/lock/StripedLock.java
   core/branches/flat/src/test/java/org/horizon/loader/jdbc/JdbcCacheStoreTest.java
   core/branches/flat/src/test/java/org/horizon/loader/jdbc/TableManipulationTest.java
Log:
ongoing FileCacheStoreWork

Modified: core/branches/flat/src/main/java/org/horizon/loader/bucket/Bucket.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/bucket/Bucket.java	2009-02-25 18:30:30 UTC (rev 7790)
+++ core/branches/flat/src/main/java/org/horizon/loader/bucket/Bucket.java	2009-02-26 14:25:24 UTC (rev 7791)
@@ -73,6 +73,16 @@
       return entries.values();
    }
 
+   public long timestampOfFirstEntryToExpire() {
+      long result = Long.MAX_VALUE;
+      for (StoredEntry se : entries.values()) {
+         if (se.getExpiryTime() < result) {
+            result = se.getExpiryTime();
+         }
+      }
+      return result;
+   }
+
    @Override
    public String toString() {
       return "Bucket{" +
@@ -80,4 +90,8 @@
             ", bucketName='" + bucketName + '\'' +
             '}';
    }
+
+   public boolean isEmpty() {
+      return entries.isEmpty();
+   }
 }

Modified: core/branches/flat/src/main/java/org/horizon/loader/bucket/BucketBasedCacheStore.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/bucket/BucketBasedCacheStore.java	2009-02-25 18:30:30 UTC (rev 7790)
+++ core/branches/flat/src/main/java/org/horizon/loader/bucket/BucketBasedCacheStore.java	2009-02-26 14:25:24 UTC (rev 7791)
@@ -2,18 +2,25 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.horizon.lock.StripedLock;
+import org.horizon.Cache;
+import org.horizon.util.concurrent.WithinThreadExecutor;
 import org.horizon.loader.AbstractCacheStore;
+import org.horizon.loader.CacheLoaderConfig;
+import org.horizon.loader.CacheLoaderException;
 import org.horizon.loader.StoredEntry;
-import org.horizon.loader.CacheLoaderException;
-import org.horizon.loader.CacheLoaderConfig;
-import org.horizon.Cache;
+import org.horizon.lock.StripedLock;
 import org.horizon.marshall.Marshaller;
 
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.locks.Lock;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.ObjectOutputStream;
 
 /**
  * //TODO comment this
@@ -28,6 +35,7 @@
    private StripedLock bucketLocks;
    private BucketBasedCacheStoreConfig config;
    private long globalLockTimeoutMillis;
+   private ExecutorService purgerService;
 
    /**
     * This global lock guards against direct store access via clear() and the stream APIs.  These three methods would
@@ -43,8 +51,17 @@
    public void start() throws CacheLoaderException {
       bucketLocks = new StripedLock(config.getLockConcurrencyLevel());
       globalLockTimeoutMillis = config.getLockAcquistionTimeout();
+      if (config.isPurgeSynchronously()) {
+         purgerService = new WithinThreadExecutor();
+      } else {
+         purgerService = Executors.newSingleThreadExecutor();
+      }
    }
 
+   public void stop() throws CacheLoaderException {
+      purgerService.shutdownNow();
+   }
+
    public StoredEntry load(Object key) throws CacheLoaderException {
       if (log.isTraceEnabled()) log.trace("Loading entry " + key);
       String keyHashCode = String.valueOf(key.hashCode());
@@ -61,8 +78,6 @@
          } else {
             return se;
          }
-      } catch (Exception e) {
-         throw new CacheLoaderException("Problems loading key " + key, e);
       } finally {
          unlock(keyHashCode);
       }
@@ -90,9 +105,6 @@
             bucket.addEntry(ed);
             insertBucket(bucket);
          }
-      }
-      catch (Exception ex) {
-         throw new CacheLoaderException("Problems storing entry with key " + ed.getKey(), ex);
       } finally {
          unlock(keyHashCode);
       }
@@ -112,14 +124,70 @@
             if (success) saveBucket(bucket);
             return success;
          }
-      } catch (Exception e) {
-         throw new CacheLoaderException("Problems removing key " + key, e);
       } finally {
          unlock(keyHashCodeStr);
       }
    }
 
+   public void fromStream(InputStream inputStream) throws CacheLoaderException {
+      ObjectInputStream ois = null;
+      try {
+         // first clear all local state
+         acquireGlobalLock(true);
+         clear();
+         ois = (inputStream instanceof ObjectInputStream) ? (ObjectInputStream) inputStream :
+               new ObjectInputStream(inputStream);
+         fromStreamInternal(ois);
+      }
+      catch (IOException e) {
+         throw new CacheLoaderException("Cannot convert to ObjectInputSream", e);
+      } finally {
+         releaseGlobalLock(true);
+         // we should close the stream we created!
+         if (inputStream != ois) safeClose(ois);
+      }
+   }
 
+   public void toStream(OutputStream outputStream) throws CacheLoaderException {
+      ObjectOutputStream oos = null;
+      try {
+         acquireGlobalLock(true);
+         try {
+            oos = (outputStream instanceof ObjectOutputStream) ? (ObjectOutputStream) outputStream :
+                  new ObjectOutputStream(outputStream);
+         } catch (IOException e) {
+            throw new CacheLoaderException(e);
+         }
+         toStreamInternal(oos);
+      } finally {
+         releaseGlobalLock(true);
+         // we should close the stream we created!
+         if (oos != outputStream) safeClose(oos);
+      }
+   }
+
+   public void clear() throws CacheLoaderException {
+      log.trace("Clearing store");
+      try {
+         acquireGlobalLock(true);
+         clearInternal();
+      } finally {
+         releaseGlobalLock(true);
+      }
+   }
+
+   public void purgeExpired() {
+      purgerService.execute(new Runnable() {
+         public void run() {
+            try {
+               purgeInternal();
+            } catch (CacheLoaderException e) {
+               log.info("Problems encountered while purging expired", e);
+            }
+         }
+      });
+   }
+
    protected void unlock(String keyHashCode) {
       bucketLocks.releaseLock(keyHashCode);
       globalLock.readLock().unlock();
@@ -135,6 +203,18 @@
       bucketLocks.acquireLock(keyHashCode, true);
    }
 
+   protected boolean immediateLockForWritting(String keyHashCode) throws CacheLoaderException {
+      try {
+         if (!globalLock.readLock().tryLock(0, TimeUnit.MILLISECONDS)) {
+            return false;
+         }
+      } catch (InterruptedException e) {
+         log.warn("Received interrupt signal while waiting for global lock aquisition");
+         throw new CacheLoaderException(e);
+      }
+      return bucketLocks.acquireLock(keyHashCode, true, 0);
+   }
+
    protected void lockForReading(String keyHashCode) throws CacheLoaderException {
       try {
          globalLock.readLock().tryLock(globalLockTimeoutMillis, TimeUnit.MILLISECONDS);
@@ -145,10 +225,14 @@
       bucketLocks.acquireLock(keyHashCode, false);
    }
 
-   protected void acquireGlobalLock(boolean exclusive) throws TimeoutException, InterruptedException {
+   protected void acquireGlobalLock(boolean exclusive) throws CacheLoaderException {
       Lock l = exclusive ? globalLock.writeLock() : globalLock.readLock();
-      if (!l.tryLock(globalLockTimeoutMillis, TimeUnit.MILLISECONDS))
-         throw new TimeoutException("Timed out trying to acquire " + (exclusive ? "exclusive" : "shared") + " global lock after " + globalLockTimeoutMillis + " millis.  Lock is " + l);
+      try {
+         if (!l.tryLock(globalLockTimeoutMillis, TimeUnit.MILLISECONDS))
+            throw new CacheLoaderException("Timed out trying to acquire " + (exclusive ? "exclusive" : "shared") + " global lock after " + globalLockTimeoutMillis + " millis.  Lock is " + l);
+      } catch (InterruptedException e) {
+         throw new CacheLoaderException(e);
+      }
    }
 
    protected void releaseGlobalLock(boolean exclusive) {
@@ -172,4 +256,12 @@
    protected abstract void saveBucket(Bucket bucket) throws CacheLoaderException;
 
    protected abstract Bucket loadBucket(String keyHashCode) throws CacheLoaderException;
+
+   protected abstract void toStreamInternal(ObjectOutputStream oos) throws CacheLoaderException;
+
+   protected abstract void fromStreamInternal(ObjectInputStream ois) throws CacheLoaderException;
+
+   protected abstract void clearInternal() throws CacheLoaderException;
+
+   protected abstract void purgeInternal() throws CacheLoaderException;
 }

Modified: core/branches/flat/src/main/java/org/horizon/loader/bucket/BucketBasedCacheStoreConfig.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/bucket/BucketBasedCacheStoreConfig.java	2009-02-25 18:30:30 UTC (rev 7790)
+++ core/branches/flat/src/main/java/org/horizon/loader/bucket/BucketBasedCacheStoreConfig.java	2009-02-26 14:25:24 UTC (rev 7791)
@@ -11,6 +11,7 @@
 
    private int lockConcurrencyLevel = 2048;
    private long lockAcquistionTimeout = 60000;
+   private boolean purgeSynchronously = false;
    
    public int getLockConcurrencyLevel() {
       return lockConcurrencyLevel;
@@ -29,4 +30,13 @@
       testImmutability("lockAcquistionTimeout");
       this.lockAcquistionTimeout = lockAcquistionTimeout;
    }
+
+   public boolean isPurgeSynchronously() {
+      return purgeSynchronously;
+   }
+
+   public void setPurgeSynchronously(boolean purgeSynchronously) {
+      testImmutability("purgeSynchronously");
+      this.purgeSynchronously = purgeSynchronously;
+   }
 }

Modified: core/branches/flat/src/main/java/org/horizon/loader/file/FileCacheStore.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/file/FileCacheStore.java	2009-02-25 18:30:30 UTC (rev 7790)
+++ core/branches/flat/src/main/java/org/horizon/loader/file/FileCacheStore.java	2009-02-26 14:25:24 UTC (rev 7791)
@@ -10,13 +10,17 @@
 import org.horizon.logging.Log;
 import org.horizon.logging.LogFactory;
 import org.horizon.marshall.Marshaller;
-import org.horizon.util.concurrent.WithinThreadExecutor;
 
-import java.io.*;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.util.HashSet;
 import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 
 /**
  * A filesystem-based implementation of a {@link org.horizon.loader.CacheStore}.  This file store stores stuff in the
@@ -42,8 +46,8 @@
 public class FileCacheStore extends BucketBasedCacheStore {
    private static final Log log = LogFactory.getLog(FileCacheStore.class);
    private int streamBufferSize;
-   ExecutorService purgerService;
 
+
    FileCacheStoreConfig cfg;
    Cache cache;
    Marshaller m;
@@ -77,14 +81,8 @@
       return result;
    }
 
-   public void fromStream(InputStream inputStream) throws CacheLoaderException {
-      ObjectInputStream ois = null;
+   protected void fromStreamInternal(ObjectInputStream ois) throws CacheLoaderException {
       try {
-         // first clear all local state
-         acquireGlobalLock(true);
-         clear();
-         ois = (inputStream instanceof ObjectInputStream) ? (ObjectInputStream) inputStream :
-               new ObjectInputStream(inputStream);
          int numFiles = ois.readInt();
          byte[] buffer = new byte[streamBufferSize];
          int bytesRead, totalBytesRead = 0;
@@ -101,30 +99,20 @@
                bos.write(buffer, 0, bytesRead);
             }
             bos.flush();
-            bos.close();
+            safeClose(bos);
             fos.flush();
-            fos.close();
+            safeClose(fos);
             totalBytesRead = 0;
          }
+      } catch (IOException e) {
+         throw new CacheLoaderException("I/O error", e);
+      } catch (ClassNotFoundException e) {
+         throw new CacheLoaderException("Unexpected expcetion", e);
       }
-      catch (Exception e) {
-         CacheLoaderException cle = (e instanceof CacheLoaderException) ? (CacheLoaderException) e :
-               new CacheLoaderException("Problems reading from stream", e);
-         throw cle;
-      }
-      finally {
-         releaseGlobalLock(true);
-         // we should close the stream we created!
-         if (inputStream != ois) safeClose(ois);
-      }
    }
 
-   public void toStream(OutputStream outputStream) throws CacheLoaderException {
-      ObjectOutputStream oos = null;
+   protected void toStreamInternal(ObjectOutputStream oos) throws CacheLoaderException {
       try {
-         acquireGlobalLock(true);
-         oos = (outputStream instanceof ObjectOutputStream) ? (ObjectOutputStream) outputStream :
-               new ObjectOutputStream(outputStream);
          File[] files = root.listFiles();
          oos.writeInt(files.length);
          byte[] buffer = new byte[streamBufferSize];
@@ -145,39 +133,19 @@
             bis.close();
             fileInStream.close();
          }
-      } catch (Exception ioe) {
-         throw new CacheLoaderException("Problems handling stream", ioe);
-      } finally {
-         releaseGlobalLock(true);
-         // we should close the stream we created!
-         if (oos != outputStream) safeClose(oos);
+      } catch (IOException e) {
+         throw new CacheLoaderException("I/O expcetion while generating stream", e);
       }
    }
 
-   public void clear() throws CacheLoaderException {
-      log.trace("Clearing store");
-      try {
-         acquireGlobalLock(true);
-         for (File f : root.listFiles()) {
-            if (!f.delete()) log.warn("Had problems removing file {0}", f);
-         }
-      } catch (Exception e) {
-         throw new CacheLoaderException("Problems clearing cache store", e);
-      } finally {
-         releaseGlobalLock(true);
+   protected void clearInternal() throws CacheLoaderException {
+      for (File f : root.listFiles()) {
+         if (!f.delete()) log.warn("Had problems removing file {0}", f);
       }
    }
 
-   public void purgeExpired() {
-      purgerService.execute(new Runnable() {
-         public void run() {
-            try {
-               loadAll();
-            } catch (CacheLoaderException e) {
-               log.info("Problems encountered while purging expired", e);
-            }
-         }
-      });
+   protected void purgeInternal() throws CacheLoaderException {
+      loadAll();
    }
 
    protected Bucket loadBucket(String bucketName) throws CacheLoaderException {
@@ -252,18 +220,9 @@
          if (!root.mkdirs())
             throw new ConfigurationException("Directory " + root.getAbsolutePath() + " does not exist and cannot be created!");
       }
-      if (cfg.isPurgeSynchronously()) {
-         purgerService = new WithinThreadExecutor();
-      } else {
-         purgerService = Executors.newSingleThreadExecutor();
-      }
       streamBufferSize = cfg.getStreamBufferSize();
    }
 
-   public void stop() {
-      purgerService.shutdownNow();
-   }
-
    public Bucket loadBucketContainingKey(String key) throws CacheLoaderException {
       return loadBucket(key.hashCode() + "");
    }

Modified: core/branches/flat/src/main/java/org/horizon/loader/file/FileCacheStoreConfig.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/file/FileCacheStoreConfig.java	2009-02-25 18:30:30 UTC (rev 7790)
+++ core/branches/flat/src/main/java/org/horizon/loader/file/FileCacheStoreConfig.java	2009-02-26 14:25:24 UTC (rev 7791)
@@ -22,7 +22,6 @@
  */
 public class FileCacheStoreConfig extends BucketBasedCacheStoreConfig {
    String location = "Horizon-FileCacheStore";
-   private boolean purgeSynchronously = false;
    private int streamBufferSize = 8192;
 
    public FileCacheStoreConfig() {
@@ -38,15 +37,7 @@
       this.location = location;
    }
 
-   public boolean isPurgeSynchronously() {
-      return purgeSynchronously;
-   }
 
-   public void setPurgeSynchronously(boolean purgeSynchronously) {
-      testImmutability("purgeSynchronously");
-      this.purgeSynchronously = purgeSynchronously;
-   }
-
    public int getStreamBufferSize() {
       return streamBufferSize;
    }

Modified: core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcCacheStore.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcCacheStore.java	2009-02-25 18:30:30 UTC (rev 7790)
+++ core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcCacheStore.java	2009-02-26 14:25:24 UTC (rev 7791)
@@ -13,13 +13,16 @@
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.io.OutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.sql.Connection;
 import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
 import java.util.Set;
-import java.util.HashSet;
 
 /**
  * // TODO: Manik: Document this!
@@ -29,6 +32,7 @@
 public class JdbcCacheStore extends BucketBasedCacheStore {
 
    private static final Log log = LogFactory.getLog(JdbcCacheStore.class);
+   public final static String STREAM_DELIMITER = "__jdbcCacheLoader_done__";
 
    private JdbcCacheStoreConfig config;
    private ConnectionFactory connectionFactory;
@@ -87,6 +91,7 @@
          ps.setString(1, bucket.getBucketName());
          ByteBuffer byteBuffer = marshall(bucket);
          ps.setBinaryStream(2, byteBuffer.getStream(), byteBuffer.getLength());
+         ps.setLong(3, bucket.timestampOfFirstEntryToExpire());
          int insertedRows = ps.executeUpdate();
          if (insertedRows != 1) {
             throw new CacheLoaderException("Unexpected insert result: '" + insertedRows + "'. Expected values is 1");
@@ -111,12 +116,12 @@
          ps = conn.prepareStatement(sql);
          ByteBuffer buffer = marshall(bucket);
          ps.setBinaryStream(1, buffer.getStream(), buffer.getLength());
-         ps.setString(2, bucket.getBucketName());
+         ps.setLong(2, bucket.timestampOfFirstEntryToExpire());
+         ps.setString(3, bucket.getBucketName());
          int updatedRows = ps.executeUpdate();
          if (updatedRows != 1) {
             throw new CacheLoaderException("Unexpected  update result: '" + updatedRows + "'. Expected values is 1");
          }
-
       } catch (SQLException e) {
          logAndThrow(e, "sql failure while updating bucket: " + bucket);
       } finally {
@@ -155,7 +160,6 @@
       }
    }
 
-
    public Set<StoredEntry> loadAll() throws CacheLoaderException {
       Connection conn = null;
       PreparedStatement ps = null;
@@ -169,6 +173,12 @@
          ps = conn.prepareStatement(sql);
          rs = ps.executeQuery();
          Set<StoredEntry> result = new HashSet<StoredEntry>();
+         while (rs.next()) {
+            InputStream binaryStream = rs.getBinaryStream(1);
+            Bucket bucket = unmarshall(binaryStream);
+            result.addAll(bucket.getStoredEntries());
+         }
+         return result;
       } catch (SQLException e) {
          String message = "sql failure while loading key: ";
          log.error(message, e);
@@ -178,25 +188,217 @@
          JdbcUtil.safeClose(ps);
          releaseConnection(conn);
       }
-      return null;
    }
 
-   public void fromStream(InputStream inputStream) throws CacheLoaderException {
-      throw new IllegalStateException("TODO - please implement me!!!"); //todo implement!!!
+   protected void fromStreamInternal(ObjectInputStream ois) throws CacheLoaderException {
+      Connection conn = null;
+      PreparedStatement ps = null;
+      try {
+         conn = getConnection();
+         String sql = config.getInsertBucketSql();
+         ps = conn.prepareStatement(sql);
+
+         int readBuckets = 0;
+         int batchSize = 100;
+         String bucketName = (String) ois.readObject();
+         while (!bucketName.equals(STREAM_DELIMITER)) {
+            Bucket bucket = (Bucket) ois.readObject();
+            readBuckets++;
+            ps.setString(1, bucketName);
+            ByteBuffer buffer = marshall(bucket);
+            ps.setBinaryStream(2, buffer.getStream(), buffer.getLength());
+            ps.setLong(3, bucket.timestampOfFirstEntryToExpire());
+            if (readBuckets % batchSize == 0) {
+               ps.executeBatch();
+               if (log.isTraceEnabled())
+                  log.trace("Executing batch " + (readBuckets / batchSize) + ", batch size is " + batchSize);
+            } else {
+               ps.addBatch();
+            }
+            bucketName = (String) ois.readObject();
+         }
+         if (readBuckets % batchSize != 0)
+            ps.executeBatch();//flush the batch
+         if (log.isTraceEnabled())
+            log.trace("Successfully inserted " + readBuckets + " buckets into the database, batch size is " + batchSize);
+      } catch (IOException ex) {
+         logAndThrow(ex, "I/O failure while integrating state into store");
+      } catch (SQLException e) {
+         logAndThrow(e, "SQL failure while integrating state into store");
+      } catch (ClassNotFoundException e) {
+         logAndThrow(e, "Unexpected failure while integrating state into store");
+      } finally {
+         JdbcUtil.safeClose(ps);
+         releaseConnection(conn);
+      }
    }
 
-   public void toStream(OutputStream outputStream) throws CacheLoaderException {
-      throw new IllegalStateException("TODO - please implement me!!!"); //todo implement!!!
+   protected void toStreamInternal(ObjectOutputStream oos) throws CacheLoaderException {
+      Connection conn = null;
+      PreparedStatement ps = null;
+      ResultSet rs = null;
+      try {
+         conn = getConnection();
+         String sql = config.getLoadAllSql();
+         ps = conn.prepareStatement(sql);
+         rs = ps.executeQuery();
+         rs.setFetchSize(100);
+         while (rs.next()) {
+            InputStream inputStream = rs.getBinaryStream(1);
+            Bucket bucket = unmarshall(inputStream);
+            String bucketName = rs.getString(2);
+            oos.writeObject(bucketName);
+            oos.writeObject(bucket);
+         }
+         oos.writeObject(STREAM_DELIMITER);
+      } catch (SQLException ex) {
+         logAndThrow(ex, "SQL failure while writing store's content to stream");
+      }
+      catch (IOException e) {
+         logAndThrow(e, "IO failure while writing store's content to stream");
+      } finally {
+         JdbcUtil.safeClose(rs);
+         JdbcUtil.safeClose(ps);
+         releaseConnection(conn);
+      }
    }
 
-   public void clear() throws CacheLoaderException {
-      throw new IllegalStateException("TODO - please implement me!!!"); //todo implement!!!
+   protected void clearInternal() throws CacheLoaderException {
+      Connection conn = null;
+      PreparedStatement ps = null;
+      try {
+         String sql = config.getClearSql();
+         conn = getConnection();
+         ps = conn.prepareStatement(sql);
+         int result = ps.executeUpdate();
+         if (log.isTraceEnabled())
+            log.trace("Successfully removed " + result + " rows.");
+      } catch (SQLException ex) {
+         logAndThrow(ex, "Failed clearing JdbcCacheStore");
+      } finally {
+         JdbcUtil.safeClose(ps);
+         releaseConnection(conn);
+      }
    }
 
-   public void purgeExpired() throws CacheLoaderException {
-      throw new IllegalStateException("TODO - please implement me!!!"); //todo implement!!!
+   protected void purgeInternal() throws CacheLoaderException {
+      Connection conn = null;
+      PreparedStatement ps = null;
+      ResultSet rs = null;
+      Set<Bucket> expiredBuckets = new HashSet<Bucket>();
+      final int batchSize = 100;
+      try {
+         String sql = config.getSelectExpiredBucketsSql();
+         conn = getConnection();
+         ps = conn.prepareStatement(sql);
+         ps.setLong(1, System.currentTimeMillis());
+         rs = ps.executeQuery();
+         while (rs.next()) {
+            String key = rs.getString(2);
+            if (immediateLockForWritting(key)) {
+               if (log.isTraceEnabled()) log.trace("Adding bucket keyed " + key + " for purging.");
+               InputStream binaryStream = rs.getBinaryStream(1);
+               Bucket bucket = unmarshall(binaryStream);
+               bucket.setBucketName(key);
+               expiredBuckets.add(bucket);
+            } else {
+               if (log.isTraceEnabled())
+                  log.trace("Could not acquire write lock for " + key + ", this won't be purged even though it has expired elements");
+            }
+         }
+      } catch (SQLException ex) {
+         //if something goes wrong, make sure the bucket locks acquired so far are released
+         releaseLocks(expiredBuckets);
+         releaseConnection(conn);
+         logAndThrow(ex, "Failed clearing JdbcCacheStore");
+      } finally {
+         JdbcUtil.safeClose(ps);
+         JdbcUtil.safeClose(rs);
+      }
+
+      if (log.isTraceEnabled())
+         log.trace("Found following buckets: " + expiredBuckets + " which are about to be expired");
+
+      if (expiredBuckets.isEmpty()) return;
+      Set<Bucket> emptyBuckets = new HashSet<Bucket>();
+      //now update all the buckets in batch
+      try {
+         String sql = config.getSaveBucketSql();
+         ps = conn.prepareStatement(sql);
+         int updateCount = 0;
+         Iterator<Bucket> it = expiredBuckets.iterator();
+         while (it.hasNext()) {
+            Bucket bucket = it.next();
+            bucket.removeExpiredEntries();
+            if (!bucket.isEmpty()) {
+               ByteBuffer byteBuffer = marshall(bucket);
+               ps.setBinaryStream(1, byteBuffer.getStream(), byteBuffer.getLength());
+               ps.setLong(2, bucket.timestampOfFirstEntryToExpire());
+               ps.addBatch();
+               updateCount++;
+               if (updateCount % batchSize == 0) {
+                  ps.executeBatch();
+                  if (log.isTraceEnabled()) log.trace("Flushing batch, update count is: " + updateCount);
+               }
+            } else {
+               it.remove();
+               emptyBuckets.add(bucket);
+            }
+         }
+         //flush the batch
+         if (updateCount % batchSize != 0) {
+            ps.executeBatch();
+         }
+         if (log.isTraceEnabled()) log.trace("Updated " + updateCount + " buckets.");
+      } catch (SQLException ex) {
+         //if something goes wrong, make sure the locks of the empty buckets are released
+         releaseLocks(emptyBuckets);
+         releaseConnection(conn);
+         logAndThrow(ex, "Failed clearing JdbcCacheStore");
+      } finally {
+         //release locks for the updated buckets. This won't include empty buckets, as these were moved to emptyBuckets
+         releaseLocks(expiredBuckets);
+         JdbcUtil.safeClose(ps);
+      }
+
+
+      if (log.isTraceEnabled()) log.trace("About to remove empty buckets " + emptyBuckets);
+
+      if (emptyBuckets.isEmpty()) return;
+      //then remove the empty buckets
+      try {
+         String sql = config.getDeleteBucketSql();
+         ps = conn.prepareStatement(sql);
+         int deletionCount = 0;
+         for (Bucket bucket : emptyBuckets) {
+            ps.setString(1, bucket.getBucketName());
+            ps.addBatch();
+            deletionCount++;
+            if (deletionCount % batchSize == 0) {
+               if (log.isTraceEnabled()) log.trace("Flushing deletion batch, total deletion count so far is " + deletionCount);
+               ps.executeBatch();
+            }
+         }
+         if (deletionCount % batchSize != 0) {
+            int[] batchResult = ps.executeBatch();
+            if (log.isTraceEnabled()) log.trace("Flushed the batch and received following results: " + Arrays.toString(batchResult));
+         }
+      } catch (SQLException ex) {
+         //the locks of the empty buckets are released in the finally block below
+         logAndThrow(ex, "Failed clearing JdbcCacheStore");
+      } finally {
+         releaseLocks(emptyBuckets);
+         JdbcUtil.safeClose(ps);
+         releaseConnection(conn);
+      }
    }
 
+   private void releaseLocks(Set<Bucket> expiredBucketKeys) throws CacheLoaderException {
+      for (Bucket bucket : expiredBucketKeys) {
+         unlock(bucket.getBucketName());
+      }
+   }
+
    public Class<? extends CacheLoaderConfig> getConfigurationClass() {
       return JdbcCacheStoreConfig.class;
    }
@@ -206,7 +408,8 @@
    }
 
    private void releaseConnection(Connection conn) {
-      connectionFactory.releaseConnection(conn);
+      if (conn != null)//connection might be null as we only release it in finally blocks
+         connectionFactory.releaseConnection(conn);
    }
 
    private ByteBuffer marshall(Bucket bucket) throws CacheLoaderException {

Modified: core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcCacheStoreConfig.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcCacheStoreConfig.java	2009-02-25 18:30:30 UTC (rev 7790)
+++ core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcCacheStoreConfig.java	2009-02-26 14:25:24 UTC (rev 7791)
@@ -17,7 +17,7 @@
 
    private String connectionFactoryClass;
 
-   /* required by NonManagedConnectionFactory */
+   /* required by SimpleConnectionFactory */
    private String connectionUrl;
    private String userName;
    private String password;
@@ -30,10 +30,17 @@
    private String keyColumnType;
    private String dataColumnName;
    private String dataColumnType;
+   private String timestampColumnName;
+   private String timestampColumnType;
+
+   /* cache for sql commands */
    private String insertBucketSql;
    private String saveBucketSql;
    private String loadBucketSql;
    private String loadAllSql;
+   private String clearSql;
+   private String selectExpiredBucketsSql;
+   private String deleteBucketSql;
 
    public JdbcCacheStoreConfig() {
       className = JdbcCacheStore.class.getName();
@@ -144,23 +151,38 @@
       this.dataColumnType = dataColumnType;
    }
 
+   public String getTimestampColumnName() {
+      return timestampColumnName;
+   }
+
+   public void setTimestampColumnName(String timestampColumnName) {
+      this.timestampColumnName = timestampColumnName;
+   }
+
+   public String getTimestampColumnType() {
+      return timestampColumnType;
+   }
+
+   public void setTimestampColumnType(String timestampColumnType) {
+      this.timestampColumnType = timestampColumnType;
+   }
+
    @Override
    public JdbcCacheStoreConfig clone() {
-      JdbcCacheStoreConfig dolly = (JdbcCacheStoreConfig) super.clone();
       //don't have to assign any variables as all are primitives, and cannot change
-      return dolly;
+      return (JdbcCacheStoreConfig) super.clone();
    }
 
    public String getInsertBucketSql() {
       if (insertBucketSql == null) {
-         insertBucketSql = "INSERT INTO " + tableName + " (" + keyColumnName + ", " + dataColumnName + ")";
+         insertBucketSql = "INSERT INTO " + tableName + " (" + keyColumnName + ", " + dataColumnName + ", " + timestampColumnName + ") VALUES(?,?,?)";
       }
       return insertBucketSql;
    }
 
    public String getSaveBucketSql() {
       if (saveBucketSql == null) {
-         saveBucketSql = "UPDATE " + tableName + " SET " + dataColumnName + " = ? WHERE " + keyColumnName + " = ?";
+         saveBucketSql = "UPDATE " + tableName + " SET " + dataColumnName + " = ? , " + timestampColumnName + "=? WHERE " + keyColumnName + " = ?";
       }
       return saveBucketSql;
    }
@@ -172,10 +194,31 @@
       return loadBucketSql;
    }
 
+   public String getDeleteBucketSql() {
+      if (deleteBucketSql == null) {
+         deleteBucketSql = "DELETE FROM " + tableName + " WHERE " + keyColumnName + " = ?";
+      }
+      return deleteBucketSql;
+   }
+
    public String getLoadAllSql() {
       if (loadAllSql == null) {
-         loadAllSql = "SELECT " + dataColumnName + " FROM " + tableName;
+         loadAllSql = "SELECT " + dataColumnName + "," + keyColumnName + " FROM " + tableName;
       }
       return loadAllSql;
    }
+
+   public String getClearSql() {
+      if (clearSql == null) {
+         clearSql = "DELETE FROM " + tableName;
+      }
+      return clearSql;
+   }
+
+   public String getSelectExpiredBucketsSql() {
+      if (selectExpiredBucketsSql == null) {
+         selectExpiredBucketsSql = getLoadAllSql() + " WHERE " + timestampColumnName + "< ?";
+      }
+      return selectExpiredBucketsSql;
+   }
 }

Modified: core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcUtil.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcUtil.java	2009-02-25 18:30:30 UTC (rev 7790)
+++ core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcUtil.java	2009-02-26 14:25:24 UTC (rev 7791)
@@ -1,8 +1,9 @@
 package org.horizon.loader.jdbc;
 
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
 import java.sql.Statement;
-import java.sql.SQLException;
-import java.sql.ResultSet;
 
 /**
  * // TODO: Mircea: Document this!
@@ -20,6 +21,16 @@
       }
    }
 
+   public static void safeClose(Connection connection) {
+      if (connection != null) {
+         try {
+            connection.close();
+         } catch (SQLException e) {
+            e.printStackTrace();
+         }
+      }
+   }
+
    public static void safeClose(ResultSet rs) {
       if (rs != null) {
          try {

Modified: core/branches/flat/src/main/java/org/horizon/loader/jdbc/TableManipulation.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/jdbc/TableManipulation.java	2009-02-25 18:30:30 UTC (rev 7790)
+++ core/branches/flat/src/main/java/org/horizon/loader/jdbc/TableManipulation.java	2009-02-26 14:25:24 UTC (rev 7791)
@@ -84,11 +84,12 @@
       // removed CONSTRAINT clause as this causes problems with some databases, like Informix.
       assertMandatoryElemenetsPresent();
       String creatTableDdl = "CREATE TABLE " + config.getTableName() + "(" + config.getKeyColumnName() + " " + config.getKeyColumnType()
-            + " NOT NULL, " + config.getDataColumnName() + " " + config.getDataColumnType() +
+            + " NOT NULL, " + config.getDataColumnName() + " " + config.getDataColumnType() + ", "
+            + config.getTimestampColumnName() + " " + config.getTimestampColumnType() +            
             ", PRIMARY KEY (" + config.getKeyColumnName() + "))";
       if (log.isTraceEnabled())
          log.trace("Creating table with following DDL: '" + creatTableDdl + "'.");
-      executeDdlStatement(creatTableDdl);
+      executeUpdateSql(creatTableDdl);
    }
 
    private void assertMandatoryElemenetsPresent() throws CacheLoaderException {
@@ -97,17 +98,20 @@
       assrtNotNull(config.getTableName(), "tableName needed in order to create table");
       assrtNotNull(config.getDataColumnName(), "dataColumnName needed in order to create table");
       assrtNotNull(config.getDataColumnType(), "dataColumnType needed in order to create table");
+      assrtNotNull(config.getDataColumnType(), "dataColumnType needed in order to create table");
+      assrtNotNull(config.getTimestampColumnName(), "timestampColumnName needed in order to create table");
+      assrtNotNull(config.getTimestampColumnType(), "timestampColumnType needed in order to create table");
    }
 
    private void assrtNotNull(String keyColumnType, String message) throws CacheLoaderException {
       if (keyColumnType == null || keyColumnType.trim().length() == 0) throw new CacheLoaderException(message);
    }
 
-   private void executeDdlStatement(String creatTableDdl) throws CacheLoaderException {
+   private void executeUpdateSql(String sql) throws CacheLoaderException {
       Statement statement = null;
       try {
          statement = connection.createStatement();
-         statement.executeUpdate(creatTableDdl);
+         statement.executeUpdate(sql);
       } catch (SQLException e) {
          log.error("Error while creating table",e);
          throw new CacheLoaderException(e);
@@ -118,9 +122,11 @@
 
    public void dropTable() throws CacheLoaderException {
       String dropTableDdl = "DROP TABLE " + config.getTableName();
+      String clearTable = "DELETE FROM " + config.getTableName();
+      executeUpdateSql(clearTable);
       if (log.isTraceEnabled())
          log.trace("Dropping table with following DDL '" + dropTableDdl + "\'");
-      executeDdlStatement(dropTableDdl);
+      executeUpdateSql(dropTableDdl);
    }
 
    private static String toLowerCase(String s) {

Modified: core/branches/flat/src/main/java/org/horizon/lock/StripedLock.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/lock/StripedLock.java	2009-02-25 18:30:30 UTC (rev 7790)
+++ core/branches/flat/src/main/java/org/horizon/lock/StripedLock.java	2009-02-26 14:25:24 UTC (rev 7791)
@@ -22,13 +22,16 @@
 package org.horizon.lock;
 
 import net.jcip.annotations.ThreadSafe;
+import org.horizon.logging.Log;
+import org.horizon.logging.LogFactory;
 
 import java.util.List;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 /**
  * A simple implementation of lock striping, using cache entry keys to lock on, primarily used to help make {@link
- * org.horizon.loader.CacheLoaderOld} implemtations thread safe.
+ * org.horizon.loader.CacheLoader} implemtations thread safe.
  * <p/>
  * Backed by a set of {@link java.util.concurrent.locks.ReentrantReadWriteLock} instances, and using the key hashcodes
  * to determine buckets.
@@ -38,10 +41,14 @@
  * <p/>
  *
  * @author <a href="mailto:manik at jboss.org">Manik Surtani</a>
+ * @author Mircea.Markus at jboss.com
  * @since 1.0
  */
 @ThreadSafe
 public class StripedLock {
+
+   private static Log log = LogFactory.getLog(StripedLock.class);
+
    private static final int DEFAULT_CONCURRENCY = 20;
    private final int lockSegmentMask;
    private final int lockSegmentShift;
@@ -82,7 +89,6 @@
     */
    public void acquireLock(Object key, boolean exclusive) {
       ReentrantReadWriteLock lock = getLock(key);
-
       if (exclusive) {
          lock.writeLock().lock();
       } else {
@@ -90,6 +96,20 @@
       }
    }
 
+   public boolean acquireLock(String key, boolean exclusive, long millis) {
+      ReentrantReadWriteLock lock = getLock(key);
+      try {
+         if (exclusive) {
+            return lock.writeLock().tryLock(millis, TimeUnit.MILLISECONDS);
+         } else {
+            return lock.readLock().tryLock(millis, TimeUnit.MILLISECONDS);
+         }
+      } catch (InterruptedException e) {
+         log.warn("Thread insterrupted while trying to acquire lock", e);
+         return false;
+      }
+   }
+
    /**
     * Releases a lock the caller may be holding. This method is idempotent.
     */
@@ -154,4 +174,6 @@
       }
       return count;
    }
+
+
 }

Modified: core/branches/flat/src/test/java/org/horizon/loader/jdbc/JdbcCacheStoreTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/jdbc/JdbcCacheStoreTest.java	2009-02-25 18:30:30 UTC (rev 7790)
+++ core/branches/flat/src/test/java/org/horizon/loader/jdbc/JdbcCacheStoreTest.java	2009-02-26 14:25:24 UTC (rev 7791)
@@ -1,8 +1,15 @@
 package org.horizon.loader.jdbc;
 
 import org.horizon.loader.CacheStore;
+import org.horizon.marshall.ObjectStreamMarshaller;
+import org.testng.annotations.AfterMethod;
 import org.testng.annotations.Test;
 
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+
 /**
  * // TODO: Mircea: Document this!
  *
@@ -15,19 +22,33 @@
 
    protected CacheStore createCacheStore() throws Exception {
       try {
+         Class.forName("com.mysql.jdbc.Driver").newInstance();
+         Connection connection = DriverManager.getConnection("jdbc:mysql://localhost/horizon", "root", "root");
+         Statement st = connection.createStatement();
+         try {
+            st.executeUpdate("DROP TABLE horizon_jdbc");
+         } catch (SQLException e) {
+            //ignore, might be the table does not exist
+         }
+         JdbcUtil.safeClose(st);
+         JdbcUtil.safeClose(connection);
+
          jdbcCacheStore = new JdbcCacheStore();
          JdbcCacheStoreConfig config = new JdbcCacheStoreConfig();
-         config.setConnectionFactoryClass(NonManagedConnectionFactory.class.getName());
+         config.setConnectionFactoryClass(PooledConnectionFactory.class.getName());
          config.setConnectionUrl("jdbc:mysql://localhost/horizon");
          config.setUserName("root");
          config.setPassword("root");
          config.setDriverClass("com.mysql.jdbc.Driver");
-         config.setTableName("horizon_jdc");
+         config.setTableName("horizon_jdbc");
          config.setKeyColumnName("key_name");
          config.setKeyColumnType("varchar(255)");
          config.setDataColumnName("BUCKET");
-         config.setDataColumnType("BINARY");
-         jdbcCacheStore.init(config, null, null);
+         config.setDataColumnType("BLOB");
+         config.setTimestampColumnName("TIMESTAMP");
+         config.setTimestampColumnType("BIGINT");
+         config.setPurgeSynchronously(true);
+         jdbcCacheStore.init(config, null, new ObjectStreamMarshaller());
          jdbcCacheStore.start();
          return jdbcCacheStore;
       } catch (Throwable e) {
@@ -35,4 +56,11 @@
          throw (Exception) e;
       }
    }
+
+   //todo  move this in upper class
+   @AfterMethod
+   public void assertNoLocksHeldAfterTest() {
+      assert jdbcCacheStore.getBucketLockCount() == 0;
+      assert jdbcCacheStore.getGlobalLockCount() == 0;
+   }
 }

Added: core/branches/flat/src/test/java/org/horizon/loader/jdbc/PooledConnectionFactoryTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/jdbc/PooledConnectionFactoryTest.java	                        (rev 0)
+++ core/branches/flat/src/test/java/org/horizon/loader/jdbc/PooledConnectionFactoryTest.java	2009-02-26 14:25:24 UTC (rev 7791)
@@ -0,0 +1,59 @@
+package org.horizon.loader.jdbc;
+
+import org.horizon.test.UnitTestDatabaseManager;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.sql.Connection;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * // TODO: Mircea: Document this!
+ *
+ * @author
+ */
+@Test(groups = "functional", testName = "loader.jdbc.PooledConnectionFactoryTest")
+public class PooledConnectionFactoryTest {
+
+   private PooledConnectionFactory factory;
+
+   @AfterMethod
+   public void destroyFacotry() {
+      factory.stop();
+   }
+
+   public void testValuesNoOverrides() throws Exception {
+      factory = new PooledConnectionFactory();
+      JdbcCacheStoreConfig config = UnitTestDatabaseManager.getUniqueJdbcCacheStoreConfig();
+      factory.start(config);
+      int hadcodedMaxPoolSize = factory.getPooledDataSource().getMaxPoolSize();
+      Set<Connection> connections = new HashSet<Connection>();
+      for (int i = 0; i < hadcodedMaxPoolSize; i++) {
+         connections.add(factory.getConnection());
+      }
+      assert connections.size() == hadcodedMaxPoolSize;
+      assert factory.getPooledDataSource().getNumBusyConnections() == hadcodedMaxPoolSize;
+      for (Connection conn : connections) {
+         conn.close();
+      }
+      long start = System.currentTimeMillis();
+      while (System.currentTimeMillis() - start < 2000) {
+         if (factory.getPooledDataSource().getNumBusyConnections() == 0) break;
+      }
+      //this must happen eventually 
+      assert  factory.getPooledDataSource().getNumBusyConnections() == 0;
+   }
+
+   public void testWithPorpertyOverrides() throws Exception {
+      String prevVal = System.setProperty("c3p0.maxPoolSize", "3");
+      System.out.println(new File(".").getAbsolutePath());
+      factory = new PooledConnectionFactory();
+      JdbcCacheStoreConfig config = UnitTestDatabaseManager.getUniqueJdbcCacheStoreConfig();
+      factory.start(config);
+      assert factory.getPooledDataSource().getMaxPoolSize() == 3 : "expected 3, received " + factory.getPooledDataSource().getMaxPoolSize();
+      if (prevVal != null) System.setProperty("c3p0.maxPoolSize", prevVal);
+
+   }
+}


Property changes on: core/branches/flat/src/test/java/org/horizon/loader/jdbc/PooledConnectionFactoryTest.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Modified: core/branches/flat/src/test/java/org/horizon/loader/jdbc/TableManipulationTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/jdbc/TableManipulationTest.java	2009-02-25 18:30:30 UTC (rev 7790)
+++ core/branches/flat/src/test/java/org/horizon/loader/jdbc/TableManipulationTest.java	2009-02-26 14:25:24 UTC (rev 7791)
@@ -8,6 +8,7 @@
 
 import java.sql.Connection;
 import java.sql.DriverManager;
+import java.sql.PreparedStatement;
 import java.sql.ResultSet;
 import java.sql.SQLException;
 import java.sql.Statement;
@@ -17,7 +18,7 @@
  *
  * @author
  */
-@Test(groups = "functional", testName ="loader.jdbc.TableManipulationTest", enabled = false)
+@Test(groups = "functional", testName = "loader.jdbc.TableManipulationTest", enabled = false)
 public class TableManipulationTest {
 
    Connection connection;
@@ -30,6 +31,7 @@
       connection = DriverManager.getConnection("jdbc:mysql://localhost/horizon", "root", "root");
       Statement st = connection.createStatement();
       try {
+         st.executeUpdate("DELETE FROM horizon_test");
          st.executeUpdate("DROP TABLE horizon_test");
       } catch (SQLException e) {
          //ignore, might be the table does not exist
@@ -41,6 +43,8 @@
       config.setTableName("horizon_test");
       config.setKeyColumnName("KEY_HASH");
       config.setDataColumnName("BUCKET");
+      config.setTimestampColumnName("TIMESTAMP");
+      config.setTimestampColumnType("BIGINT");
       tableManipulation = new TableManipulation(connection, config);
    }
 
@@ -56,6 +60,8 @@
       config.setTableName("horizon");
       config.setKeyColumnName("dsadsa");
       config.setDataColumnName("dsadsa");
+      config.setTimestampColumnName("timestamp");
+      config.setTimestampColumnType("BIGINT");
       Connection mockConnection = createMock(Connection.class);
       Statement mockStatement = createNiceMock(Statement.class);
       expect(mockConnection.createStatement()).andReturn(mockStatement);
@@ -102,6 +108,24 @@
          config.setDataColumnName("abc");
          assert true : "We do not expect a failure here";
       }
+
+      config.setTimestampColumnName(null);
+      try {
+         other.createTable();
+         assert false : "missing config param, exception expected";
+      } catch (CacheLoaderException e) {
+         config.setDataColumnName("timestamp");
+         assert true : "We do not expect a failure here";
+      }
+
+      config.setTimestampColumnType(null);
+      try {
+         other.createTable();
+         assert false : "missing config param, exception expected";
+      } catch (CacheLoaderException e) {
+         config.setDataColumnName("BIGINT");
+         assert true : "We do not expect a failure here";
+      }
    }
 
    public void testCreateTable() throws Exception {
@@ -117,8 +141,16 @@
    }
 
    @Test(dependsOnMethods = "testExists")
-   public void testDrop() throws CacheLoaderException {
+   public void testDrop() throws Exception {
       assert tableManipulation.tableExists();
+      PreparedStatement ps = null;
+      try {
+         ps = connection.prepareStatement("INSERT INTO horizon_test(KEY_HASH) values(?)");
+         ps.setString(1, System.currentTimeMillis() + "");
+         assert 1 == ps.executeUpdate();
+      } finally {
+         JdbcUtil.safeClose(ps);
+      }
       tableManipulation.dropTable();
       assert !tableManipulation.tableExists();
    }




More information about the jbosscache-commits mailing list