Author: mircea.markus
Date: 2009-02-26 09:25:24 -0500 (Thu, 26 Feb 2009)
New Revision: 7791
Added:
core/branches/flat/src/test/java/org/horizon/loader/jdbc/PooledConnectionFactoryTest.java
Modified:
core/branches/flat/src/main/java/org/horizon/loader/bucket/Bucket.java
core/branches/flat/src/main/java/org/horizon/loader/bucket/BucketBasedCacheStore.java
core/branches/flat/src/main/java/org/horizon/loader/bucket/BucketBasedCacheStoreConfig.java
core/branches/flat/src/main/java/org/horizon/loader/file/FileCacheStore.java
core/branches/flat/src/main/java/org/horizon/loader/file/FileCacheStoreConfig.java
core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcCacheStore.java
core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcCacheStoreConfig.java
core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcUtil.java
core/branches/flat/src/main/java/org/horizon/loader/jdbc/TableManipulation.java
core/branches/flat/src/main/java/org/horizon/lock/StripedLock.java
core/branches/flat/src/test/java/org/horizon/loader/jdbc/JdbcCacheStoreTest.java
core/branches/flat/src/test/java/org/horizon/loader/jdbc/TableManipulationTest.java
Log:
ongoing FileCacheStore work
Modified: core/branches/flat/src/main/java/org/horizon/loader/bucket/Bucket.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/bucket/Bucket.java 2009-02-25
18:30:30 UTC (rev 7790)
+++ core/branches/flat/src/main/java/org/horizon/loader/bucket/Bucket.java 2009-02-26
14:25:24 UTC (rev 7791)
@@ -73,6 +73,16 @@
return entries.values();
}
+ public long timestampOfFirstEntryToExpire() {
+ long result = Long.MAX_VALUE;
+ for (StoredEntry se : entries.values()) {
+ if (se.getExpiryTime() < result) {
+ result = se.getExpiryTime();
+ }
+ }
+ return result;
+ }
+
@Override
public String toString() {
return "Bucket{" +
@@ -80,4 +90,8 @@
", bucketName='" + bucketName + '\'' +
'}';
}
+
+ public boolean isEmpty() {
+ return entries.isEmpty();
+ }
}
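The value computed by timestampOfFirstEntryToExpire() is what the JDBC store further down persists into its new timestamp column, so expired buckets can be selected with a single "timestamp < now" comparison. A minimal stand-alone sketch of the idea (class name and inputs are invented for illustration; it assumes getExpiryTime() returns absolute expiry times in millis):

   public class FirstExpirySketch {
      // same reduction as Bucket.timestampOfFirstEntryToExpire(), over plain longs
      static long firstEntryToExpire(long[] expiryTimes) {
         long result = Long.MAX_VALUE;
         for (long t : expiryTimes) {
            if (t < result) result = t;
         }
         return result;
      }

      public static void main(String[] args) {
         long now = System.currentTimeMillis();
         long first = firstEntryToExpire(new long[]{now + 5000, now - 100});
         // one entry is already expired, so a purge pass would pick this bucket up
         System.out.println(first < now); // prints true
      }
   }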
Modified:
core/branches/flat/src/main/java/org/horizon/loader/bucket/BucketBasedCacheStore.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/loader/bucket/BucketBasedCacheStore.java 2009-02-25
18:30:30 UTC (rev 7790)
+++
core/branches/flat/src/main/java/org/horizon/loader/bucket/BucketBasedCacheStore.java 2009-02-26
14:25:24 UTC (rev 7791)
@@ -2,18 +2,25 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.horizon.lock.StripedLock;
+import org.horizon.Cache;
+import org.horizon.util.concurrent.WithinThreadExecutor;
import org.horizon.loader.AbstractCacheStore;
+import org.horizon.loader.CacheLoaderConfig;
+import org.horizon.loader.CacheLoaderException;
import org.horizon.loader.StoredEntry;
-import org.horizon.loader.CacheLoaderException;
-import org.horizon.loader.CacheLoaderConfig;
-import org.horizon.Cache;
+import org.horizon.lock.StripedLock;
import org.horizon.marshall.Marshaller;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.ObjectOutputStream;
/**
* //TODO comment this
@@ -28,6 +35,7 @@
private StripedLock bucketLocks;
private BucketBasedCacheStoreConfig config;
private long globalLockTimeoutMillis;
+ private ExecutorService purgerService;
/**
* This global lock guards against direct store access via clear() and the stream
APIs. These three methods would
@@ -43,8 +51,17 @@
public void start() throws CacheLoaderException {
bucketLocks = new StripedLock(config.getLockConcurrencyLevel());
globalLockTimeoutMillis = config.getLockAcquistionTimeout();
+ if (config.isPurgeSynchronously()) {
+ purgerService = new WithinThreadExecutor();
+ } else {
+ purgerService = Executors.newSingleThreadExecutor();
+ }
}
+ public void stop() throws CacheLoaderException {
+ purgerService.shutdownNow();
+ }
+
public StoredEntry load(Object key) throws CacheLoaderException {
if (log.isTraceEnabled()) log.trace("Loading entry " + key);
String keyHashCode = String.valueOf(key.hashCode());
@@ -61,8 +78,6 @@
} else {
return se;
}
- } catch (Exception e) {
- throw new CacheLoaderException("Problems loading key " + key, e);
} finally {
unlock(keyHashCode);
}
@@ -90,9 +105,6 @@
bucket.addEntry(ed);
insertBucket(bucket);
}
- }
- catch (Exception ex) {
- throw new CacheLoaderException("Problems storing entry with key " +
ed.getKey(), ex);
} finally {
unlock(keyHashCode);
}
@@ -112,14 +124,70 @@
if (success) saveBucket(bucket);
return success;
}
- } catch (Exception e) {
- throw new CacheLoaderException("Problems removing key " + key, e);
} finally {
unlock(keyHashCodeStr);
}
}
+ public void fromStream(InputStream inputStream) throws CacheLoaderException {
+ ObjectInputStream ois = null;
+ try {
+ // first clear all local state
+ acquireGlobalLock(true);
+ clear();
+ ois = (inputStream instanceof ObjectInputStream) ? (ObjectInputStream)
inputStream :
+ new ObjectInputStream(inputStream);
+ fromStreamInternal(ois);
+ }
+ catch (IOException e) {
+ throw new CacheLoaderException("Cannot convert to ObjectInputSream",
e);
+ } finally {
+ releaseGlobalLock(true);
+ // we should close the stream we created!
+ if (inputStream != ois) safeClose(ois);
+ }
+ }
+ public void toStream(OutputStream outputStream) throws CacheLoaderException {
+ ObjectOutputStream oos = null;
+ try {
+ acquireGlobalLock(true);
+ try {
+ oos = (outputStream instanceof ObjectOutputStream) ? (ObjectOutputStream)
outputStream :
+ new ObjectOutputStream(outputStream);
+ } catch (IOException e) {
+ throw new CacheLoaderException(e);
+ }
+ toStreamInternal(oos);
+ } finally {
+ releaseGlobalLock(true);
+ // we should close the stream we created!
+ if (oos != outputStream) safeClose(oos);
+ }
+ }
+
+ public void clear() throws CacheLoaderException {
+ log.trace("Clearing store");
+ try {
+ acquireGlobalLock(true);
+ clearInternal();
+ } finally {
+ releaseGlobalLock(true);
+ }
+ }
+
+ public void purgeExpired() {
+ purgerService.execute(new Runnable() {
+ public void run() {
+ try {
+ purgeInternal();
+ } catch (CacheLoaderException e) {
+ log.info("Problems encountered while purging expired", e);
+ }
+ }
+ });
+ }
+
protected void unlock(String keyHashCode) {
bucketLocks.releaseLock(keyHashCode);
globalLock.readLock().unlock();
@@ -135,6 +203,18 @@
bucketLocks.acquireLock(keyHashCode, true);
}
+ protected boolean immediateLockForWriting(String keyHashCode) throws CacheLoaderException {
+ try {
+ if (!globalLock.readLock().tryLock(0, TimeUnit.MILLISECONDS)) {
+ return false;
+ }
+ } catch (InterruptedException e) {
+ log.warn("Received interrupt signal while waiting for global lock
aquisition");
+ throw new CacheLoaderException(e);
+ }
+ return bucketLocks.acquireLock(keyHashCode, true, 0);
+ }
+
protected void lockForReading(String keyHashCode) throws CacheLoaderException {
try {
globalLock.readLock().tryLock(globalLockTimeoutMillis, TimeUnit.MILLISECONDS);
@@ -145,10 +225,14 @@
bucketLocks.acquireLock(keyHashCode, false);
}
- protected void acquireGlobalLock(boolean exclusive) throws TimeoutException,
InterruptedException {
+ protected void acquireGlobalLock(boolean exclusive) throws CacheLoaderException {
Lock l = exclusive ? globalLock.writeLock() : globalLock.readLock();
- if (!l.tryLock(globalLockTimeoutMillis, TimeUnit.MILLISECONDS))
- throw new TimeoutException("Timed out trying to acquire " + (exclusive
? "exclusive" : "shared") + " global lock after " +
globalLockTimeoutMillis + " millis. Lock is " + l);
+ try {
+ if (!l.tryLock(globalLockTimeoutMillis, TimeUnit.MILLISECONDS))
+ throw new CacheLoaderException("Timed out trying to acquire " +
(exclusive ? "exclusive" : "shared") + " global lock after "
+ globalLockTimeoutMillis + " millis. Lock is " + l);
+ } catch (InterruptedException e) {
+ throw new CacheLoaderException(e);
+ }
}
protected void releaseGlobalLock(boolean exclusive) {
@@ -172,4 +256,12 @@
protected abstract void saveBucket(Bucket bucket) throws CacheLoaderException;
protected abstract Bucket loadBucket(String keyHashCode) throws CacheLoaderException;
+
+ protected abstract void toStreamInternal(ObjectOutputStream oos) throws
CacheLoaderException;
+
+ protected abstract void fromStreamInternal(ObjectInputStream ois) throws
CacheLoaderException;
+
+ protected abstract void clearInternal() throws CacheLoaderException;
+
+ protected abstract void purgeInternal() throws CacheLoaderException;
}
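With purge scheduling now in this base class, the purgeSynchronously flag picks between running purgeInternal() on the caller's thread (via WithinThreadExecutor) and handing it off to a dedicated single-threaded executor. A minimal sketch of the same pattern using only JDK types; the anonymous Executor stands in for the Horizon-specific WithinThreadExecutor:

   import java.util.concurrent.Executor;
   import java.util.concurrent.ExecutorService;
   import java.util.concurrent.Executors;

   public class PurgerSketch {
      public static void main(String[] args) {
         boolean purgeSynchronously = true;

         // synchronous flavour: run the task on the calling thread
         Executor sameThread = new Executor() {
            public void execute(Runnable command) {
               command.run();
            }
         };

         // asynchronous flavour (the default): a dedicated purger thread
         ExecutorService background = Executors.newSingleThreadExecutor();

         Executor purger = purgeSynchronously ? sameThread : background;
         purger.execute(new Runnable() {
            public void run() {
               System.out.println("purging on " + Thread.currentThread().getName());
            }
         });
         background.shutdown();
      }
   }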
Modified:
core/branches/flat/src/main/java/org/horizon/loader/bucket/BucketBasedCacheStoreConfig.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/loader/bucket/BucketBasedCacheStoreConfig.java 2009-02-25
18:30:30 UTC (rev 7790)
+++
core/branches/flat/src/main/java/org/horizon/loader/bucket/BucketBasedCacheStoreConfig.java 2009-02-26
14:25:24 UTC (rev 7791)
@@ -11,6 +11,7 @@
private int lockConcurrencyLevel = 2048;
private long lockAcquistionTimeout = 60000;
+ private boolean purgeSynchronously = false;
public int getLockConcurrencyLevel() {
return lockConcurrencyLevel;
@@ -29,4 +30,13 @@
testImmutability("lockAcquistionTimeout");
this.lockAcquistionTimeout = lockAcquistionTimeout;
}
+
+ public boolean isPurgeSynchronously() {
+ return purgeSynchronously;
+ }
+
+ public void setPurgeSynchronously(boolean purgeSynchronously) {
+ testImmutability("purgeSynchronously");
+ this.purgeSynchronously = purgeSynchronously;
+ }
}
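A store opts into synchronous purging through the new flag; for instance (a sketch, mirroring the JdbcCacheStoreTest setup further down):

   JdbcCacheStoreConfig config = new JdbcCacheStoreConfig();
   config.setPurgeSynchronously(true);
   // purgeExpired() will now run purgeInternal() on the calling thread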
Modified: core/branches/flat/src/main/java/org/horizon/loader/file/FileCacheStore.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/loader/file/FileCacheStore.java 2009-02-25
18:30:30 UTC (rev 7790)
+++
core/branches/flat/src/main/java/org/horizon/loader/file/FileCacheStore.java 2009-02-26
14:25:24 UTC (rev 7791)
@@ -10,13 +10,17 @@
import org.horizon.logging.Log;
import org.horizon.logging.LogFactory;
import org.horizon.marshall.Marshaller;
-import org.horizon.util.concurrent.WithinThreadExecutor;
-import java.io.*;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.util.HashSet;
import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
/**
* A filesystem-based implementation of a {@link org.horizon.loader.CacheStore}. This
file store stores stuff in the
@@ -42,8 +46,8 @@
public class FileCacheStore extends BucketBasedCacheStore {
private static final Log log = LogFactory.getLog(FileCacheStore.class);
private int streamBufferSize;
- ExecutorService purgerService;
+
FileCacheStoreConfig cfg;
Cache cache;
Marshaller m;
@@ -77,14 +81,8 @@
return result;
}
- public void fromStream(InputStream inputStream) throws CacheLoaderException {
- ObjectInputStream ois = null;
+ protected void fromStreamInternal(ObjectInputStream ois) throws CacheLoaderException
{
try {
- // first clear all local state
- acquireGlobalLock(true);
- clear();
- ois = (inputStream instanceof ObjectInputStream) ? (ObjectInputStream)
inputStream :
- new ObjectInputStream(inputStream);
int numFiles = ois.readInt();
byte[] buffer = new byte[streamBufferSize];
int bytesRead, totalBytesRead = 0;
@@ -101,30 +99,20 @@
bos.write(buffer, 0, bytesRead);
}
bos.flush();
- bos.close();
+ safeClose(bos);
fos.flush();
- fos.close();
+ safeClose(fos);
totalBytesRead = 0;
}
+ } catch (IOException e) {
+ throw new CacheLoaderException("I/O error", e);
+ } catch (ClassNotFoundException e) {
+ throw new CacheLoaderException("Unexpected expcetion", e);
}
- catch (Exception e) {
- CacheLoaderException cle = (e instanceof CacheLoaderException) ?
(CacheLoaderException) e :
- new CacheLoaderException("Problems reading from stream", e);
- throw cle;
- }
- finally {
- releaseGlobalLock(true);
- // we should close the stream we created!
- if (inputStream != ois) safeClose(ois);
- }
}
- public void toStream(OutputStream outputStream) throws CacheLoaderException {
- ObjectOutputStream oos = null;
+ protected void toStreamInternal(ObjectOutputStream oos) throws CacheLoaderException {
try {
- acquireGlobalLock(true);
- oos = (outputStream instanceof ObjectOutputStream) ? (ObjectOutputStream)
outputStream :
- new ObjectOutputStream(outputStream);
File[] files = root.listFiles();
oos.writeInt(files.length);
byte[] buffer = new byte[streamBufferSize];
@@ -145,39 +133,19 @@
bis.close();
fileInStream.close();
}
- } catch (Exception ioe) {
- throw new CacheLoaderException("Problems handling stream", ioe);
- } finally {
- releaseGlobalLock(true);
- // we should close the stream we created!
- if (oos != outputStream) safeClose(oos);
+ } catch (IOException e) {
+ throw new CacheLoaderException("I/O expcetion while generating
stream", e);
}
}
- public void clear() throws CacheLoaderException {
- log.trace("Clearing store");
- try {
- acquireGlobalLock(true);
- for (File f : root.listFiles()) {
- if (!f.delete()) log.warn("Had problems removing file {0}", f);
- }
- } catch (Exception e) {
- throw new CacheLoaderException("Problems clearing cache store", e);
- } finally {
- releaseGlobalLock(true);
+ protected void clearInternal() throws CacheLoaderException {
+ for (File f : root.listFiles()) {
+ if (!f.delete()) log.warn("Had problems removing file {0}", f);
}
}
- public void purgeExpired() {
- purgerService.execute(new Runnable() {
- public void run() {
- try {
- loadAll();
- } catch (CacheLoaderException e) {
- log.info("Problems encountered while purging expired", e);
- }
- }
- });
+ protected void purgeInternal() throws CacheLoaderException {
+ loadAll();
}
protected Bucket loadBucket(String bucketName) throws CacheLoaderException {
@@ -252,18 +220,9 @@
if (!root.mkdirs())
throw new ConfigurationException("Directory " +
root.getAbsolutePath() + " does not exist and cannot be created!");
}
- if (cfg.isPurgeSynchronously()) {
- purgerService = new WithinThreadExecutor();
- } else {
- purgerService = Executors.newSingleThreadExecutor();
- }
streamBufferSize = cfg.getStreamBufferSize();
}
- public void stop() {
- purgerService.shutdownNow();
- }
-
public Bucket loadBucketContainingKey(String key) throws CacheLoaderException {
return loadBucket(key.hashCode() + "");
}
Modified:
core/branches/flat/src/main/java/org/horizon/loader/file/FileCacheStoreConfig.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/loader/file/FileCacheStoreConfig.java 2009-02-25
18:30:30 UTC (rev 7790)
+++
core/branches/flat/src/main/java/org/horizon/loader/file/FileCacheStoreConfig.java 2009-02-26
14:25:24 UTC (rev 7791)
@@ -22,7 +22,6 @@
*/
public class FileCacheStoreConfig extends BucketBasedCacheStoreConfig {
String location = "Horizon-FileCacheStore";
- private boolean purgeSynchronously = false;
private int streamBufferSize = 8192;
public FileCacheStoreConfig() {
@@ -38,15 +37,7 @@
this.location = location;
}
- public boolean isPurgeSynchronously() {
- return purgeSynchronously;
- }
- public void setPurgeSynchronously(boolean purgeSynchronously) {
- testImmutability("purgeSynchronously");
- this.purgeSynchronously = purgeSynchronously;
- }
-
public int getStreamBufferSize() {
return streamBufferSize;
}
Modified: core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcCacheStore.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcCacheStore.java 2009-02-25
18:30:30 UTC (rev 7790)
+++
core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcCacheStore.java 2009-02-26
14:25:24 UTC (rev 7791)
@@ -13,13 +13,16 @@
import java.io.IOException;
import java.io.InputStream;
-import java.io.OutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.Set;
-import java.util.HashSet;
/**
* // TODO: Manik: Document this!
@@ -29,6 +32,7 @@
public class JdbcCacheStore extends BucketBasedCacheStore {
private static final Log log = LogFactory.getLog(JdbcCacheStore.class);
+ public final static String STREAM_DELIMITER = "__jdbcCacheLoader_done__";
private JdbcCacheStoreConfig config;
private ConnectionFactory connectionFactory;
@@ -87,6 +91,7 @@
ps.setString(1, bucket.getBucketName());
ByteBuffer byteBuffer = marshall(bucket);
ps.setBinaryStream(2, byteBuffer.getStream(), byteBuffer.getLength());
+ ps.setLong(3, bucket.timestampOfFirstEntryToExpire());
int insertedRows = ps.executeUpdate();
if (insertedRows != 1) {
throw new CacheLoaderException("Unexpected insert result: '" +
insertedRows + "'. Expected values is 1");
@@ -111,12 +116,12 @@
ps = conn.prepareStatement(sql);
ByteBuffer buffer = marshall(bucket);
ps.setBinaryStream(1, buffer.getStream(), buffer.getLength());
- ps.setString(2, bucket.getBucketName());
+ ps.setLong(2, bucket.timestampOfFirstEntryToExpire());
+ ps.setString(3, bucket.getBucketName());
int updatedRows = ps.executeUpdate();
if (updatedRows != 1) {
throw new CacheLoaderException("Unexpected update result: '" +
updatedRows + "'. Expected values is 1");
}
-
} catch (SQLException e) {
logAndThrow(e, "sql failure while updating bucket: " + bucket);
} finally {
@@ -155,7 +160,6 @@
}
}
-
public Set<StoredEntry> loadAll() throws CacheLoaderException {
Connection conn = null;
PreparedStatement ps = null;
@@ -169,6 +173,12 @@
ps = conn.prepareStatement(sql);
rs = ps.executeQuery();
Set<StoredEntry> result = new HashSet<StoredEntry>();
+ while (rs.next()) {
+ InputStream binaryStream = rs.getBinaryStream(1);
+ Bucket bucket = unmarshall(binaryStream);
+ result.addAll(bucket.getStoredEntries());
+ }
+ return result;
} catch (SQLException e) {
String message = "sql failure while loading key: ";
log.error(message, e);
@@ -178,25 +188,217 @@
JdbcUtil.safeClose(ps);
releaseConnection(conn);
}
- return null;
}
- public void fromStream(InputStream inputStream) throws CacheLoaderException {
- throw new IllegalStateException("TODO - please implement me!!!"); //todo
implement!!!
+ protected void fromStreamInternal(ObjectInputStream ois) throws CacheLoaderException
{
+ Connection conn = null;
+ PreparedStatement ps = null;
+ try {
+ conn = getConnection();
+ String sql = config.getInsertBucketSql();
+ ps = conn.prepareStatement(sql);
+
+ int readBuckets = 0;
+ int batchSize = 100;
+ String bucketName = (String) ois.readObject();
+ while (!bucketName.equals(STREAM_DELIMITER)) {
+ Bucket bucket = (Bucket) ois.readObject();
+ readBuckets++;
+ ps.setString(1, bucketName);
+ ByteBuffer buffer = marshall(bucket);
+ ps.setBinaryStream(2, buffer.getStream(), buffer.getLength());
+ ps.setLong(3, bucket.timestampOfFirstEntryToExpire());
+ ps.addBatch();
+ if (readBuckets % batchSize == 0) {
+ ps.executeBatch();
+ if (log.isTraceEnabled())
+ log.trace("Executing batch " + (readBuckets / batchSize) + ", batch size is " + batchSize);
+ }
+ bucketName = (String) ois.readObject();
+ }
+ if (readBuckets % batchSize != 0)
+ ps.executeBatch(); // flush any outstanding statements in the batch
+ if (log.isTraceEnabled())
+ log.trace("Successfully inserted " + readBuckets + " buckets
into the database, batch size is " + batchSize);
+ } catch (IOException ex) {
+ logAndThrow(ex, "I/O failure while integrating state into store");
+ } catch (SQLException e) {
+ logAndThrow(e, "SQL failure while integrating state into store");
+ } catch (ClassNotFoundException e) {
+ logAndThrow(e, "Unexpected failure while integrating state into
store");
+ } finally {
+ JdbcUtil.safeClose(ps);
+ releaseConnection(conn);
+ }
}
- public void toStream(OutputStream outputStream) throws CacheLoaderException {
- throw new IllegalStateException("TODO - please implement me!!!"); //todo
implement!!!
+ protected void toStreamInternal(ObjectOutputStream oos) throws CacheLoaderException {
+ Connection conn = null;
+ PreparedStatement ps = null;
+ ResultSet rs = null;
+ try {
+ conn = getConnection();
+ String sql = config.getLoadAllSql();
+ ps = conn.prepareStatement(sql);
+ ps.setFetchSize(100);
+ rs = ps.executeQuery();
+ while (rs.next()) {
+ InputStream inputStream = rs.getBinaryStream(1);
+ Bucket bucket = unmarshall(inputStream);
+ String bucketName = rs.getString(2);
+ oos.writeObject(bucketName);
+ oos.writeObject(bucket);
+ }
+ oos.writeObject(STREAM_DELIMITER);
+ } catch (SQLException ex) {
+ logAndThrow(ex, "SQL failure while writing store's content to
stream");
+ }
+ catch (IOException e) {
+ logAndThrow(e, "IO failure while writing store's content to
stream");
+ } finally {
+ JdbcUtil.safeClose(rs);
+ JdbcUtil.safeClose(ps);
+ releaseConnection(conn);
+ }
}
- public void clear() throws CacheLoaderException {
- throw new IllegalStateException("TODO - please implement me!!!"); //todo
implement!!!
+ protected void clearInternal() throws CacheLoaderException {
+ Connection conn = null;
+ PreparedStatement ps = null;
+ try {
+ String sql = config.getClearSql();
+ conn = getConnection();
+ ps = conn.prepareStatement(sql);
+ int result = ps.executeUpdate();
+ if (log.isTraceEnabled())
+ log.trace("Successfully removed " + result + " rows.");
+ } catch (SQLException ex) {
+ logAndThrow(ex, "Failed clearing JdbcCacheStore");
+ } finally {
+ JdbcUtil.safeClose(ps);
+ releaseConnection(conn);
+ }
}
- public void purgeExpired() throws CacheLoaderException {
- throw new IllegalStateException("TODO - please implement me!!!"); //todo
implement!!!
+ protected void purgeInternal() throws CacheLoaderException {
+ Connection conn = null;
+ PreparedStatement ps = null;
+ ResultSet rs = null;
+ Set<Bucket> expiredBuckets = new HashSet<Bucket>();
+ final int batchSize = 100;
+ try {
+ String sql = config.getSelectExpiredBucketsSql();
+ conn = getConnection();
+ ps = conn.prepareStatement(sql);
+ ps.setLong(1, System.currentTimeMillis());
+ rs = ps.executeQuery();
+ while (rs.next()) {
+ String key = rs.getString(2);
+ if (immediateLockForWriting(key)) {
+ if (log.isTraceEnabled()) log.trace("Adding bucket keyed " + key
+ " for purging.");
+ InputStream binaryStream = rs.getBinaryStream(1);
+ Bucket bucket = unmarshall(binaryStream);
+ bucket.setBucketName(key);
+ expiredBuckets.add(bucket);
+ } else {
+ if (log.isTraceEnabled())
+ log.trace("Could not acquire write lock for " + key + ",
this won't be purged even though it has expired elements");
+ }
+ }
+ } catch (SQLException ex) {
+ // if something goes wrong, make sure the bucket locks are released
+ releaseLocks(expiredBuckets);
+ releaseConnection(conn);
+ logAndThrow(ex, "Failed purging JdbcCacheStore");
+ } finally {
+ JdbcUtil.safeClose(ps);
+ JdbcUtil.safeClose(rs);
+ }
+
+ if (log.isTraceEnabled())
+ log.trace("Found following buckets: " + expiredBuckets + " which
are about to be expired");
+
+ if (expiredBuckets.isEmpty()) {
+ releaseConnection(conn);
+ return;
+ }
+ Set<Bucket> emptyBuckets = new HashSet<Bucket>();
+ //now update all the buckets in batch
+ try {
+ String sql = config.getSaveBucketSql();
+ ps = conn.prepareStatement(sql);
+ int updateCount = 0;
+ Iterator<Bucket> it = expiredBuckets.iterator();
+ while (it.hasNext()) {
+ Bucket bucket = it.next();
+ bucket.removeExpiredEntries();
+ if (!bucket.isEmpty()) {
+ ByteBuffer byteBuffer = marshall(bucket);
+ ps.setBinaryStream(1, byteBuffer.getStream(), byteBuffer.getLength());
+ ps.setLong(2, bucket.timestampOfFirstEntryToExpire());
+ ps.addBatch();
+ updateCount++;
+ if (updateCount % batchSize == 0) {
+ ps.executeBatch();
+ if (log.isTraceEnabled()) log.trace("Flushing batch, update count
is: " + updateCount);
+ }
+ } else {
+ it.remove();
+ emptyBuckets.add(bucket);
+ }
+ }
+ //flush the batch
+ if (updateCount % batchSize != 0) {
+ ps.executeBatch();
+ }
+ if (log.isTraceEnabled()) log.trace("Updated " + updateCount + "
buckets.");
+ } catch (SQLException ex) {
+ // if something goes wrong, make sure the bucket locks are released
+ releaseLocks(emptyBuckets);
+ releaseConnection(conn);
+ logAndThrow(ex, "Failed purging JdbcCacheStore");
+ } finally {
+ // release locks for the updated buckets; this won't include empty buckets, as those were moved to emptyBuckets
+ releaseLocks(expiredBuckets);
+ JdbcUtil.safeClose(ps);
+ }
+
+
+ if (log.isTraceEnabled()) log.trace("About to remove empty buckets " +
emptyBuckets);
+
+ if (emptyBuckets.isEmpty()) {
+ releaseConnection(conn);
+ return;
+ }
+ //then remove the empty buckets
+ try {
+ String sql = config.getDeleteBucketSql();
+ ps = conn.prepareStatement(sql);
+ int deletionCount = 0;
+ for (Bucket bucket : emptyBuckets) {
+ ps.setString(1, bucket.getBucketName());
+ ps.addBatch();
+ deletionCount++;
+ if (deletionCount % batchSize == 0) {
+ if (log.isTraceEnabled()) log.trace("Flushing deletion batch, total
deletion count so far is " + deletionCount);
+ ps.executeBatch();
+ }
+ }
+ if (deletionCount % batchSize != 0) {
+ int[] batchResult = ps.executeBatch();
+ if (log.isTraceEnabled()) log.trace("Flushed the batch and received
following results: " + Arrays.toString(batchResult));
+ }
+ } catch (SQLException ex) {
+ // if something goes wrong, make sure the bucket locks are released
+ logAndThrow(ex, "Failed purging JdbcCacheStore");
+ } finally {
+ releaseLocks(emptyBuckets);
+ JdbcUtil.safeClose(ps);
+ releaseConnection(conn);
+ }
}
+ private void releaseLocks(Set<Bucket> buckets) throws CacheLoaderException {
+ for (Bucket bucket : buckets) {
+ unlock(bucket.getBucketName());
+ }
+ }
+
public Class<? extends CacheLoaderConfig> getConfigurationClass() {
return JdbcCacheStoreConfig.class;
}
@@ -206,7 +408,8 @@
}
private void releaseConnection(Connection conn) {
- connectionFactory.releaseConnection(conn);
+ if (conn != null)//connection might be null as we only release it in finally
blocks
+ connectionFactory.releaseConnection(conn);
}
private ByteBuffer marshall(Bucket bucket) throws CacheLoaderException {
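The stream format used by toStreamInternal/fromStreamInternal above is a flat sequence of (bucket name, bucket) object pairs terminated by STREAM_DELIMITER, with the reader re-inserting buckets in batches of 100. A minimal sketch of that framing, with plain strings standing in for marshalled Bucket instances:

   import java.io.ByteArrayInputStream;
   import java.io.ByteArrayOutputStream;
   import java.io.ObjectInputStream;
   import java.io.ObjectOutputStream;

   public class StreamFramingSketch {
      static final String STREAM_DELIMITER = "__jdbcCacheLoader_done__";

      public static void main(String[] args) throws Exception {
         ByteArrayOutputStream bytes = new ByteArrayOutputStream();
         ObjectOutputStream oos = new ObjectOutputStream(bytes);
         // writer side: (name, payload) pairs, then the end marker
         oos.writeObject("bucket-42");
         oos.writeObject("payload for bucket-42"); // a marshalled Bucket in the real store
         oos.writeObject(STREAM_DELIMITER);
         oos.flush();

         // reader side: loop until the delimiter turns up in the name position
         ObjectInputStream ois = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()));
         String name = (String) ois.readObject();
         while (!name.equals(STREAM_DELIMITER)) {
            Object payload = ois.readObject();
            System.out.println(name + " -> " + payload);
            name = (String) ois.readObject();
         }
      }
   }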
Modified:
core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcCacheStoreConfig.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcCacheStoreConfig.java 2009-02-25
18:30:30 UTC (rev 7790)
+++
core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcCacheStoreConfig.java 2009-02-26
14:25:24 UTC (rev 7791)
@@ -17,7 +17,7 @@
private String connectionFactoryClass;
- /* required by NonManagedConnectionFactory */
+ /* required by SimpleConnectionFactory */
private String connectionUrl;
private String userName;
private String password;
@@ -30,10 +30,17 @@
private String keyColumnType;
private String dataColumnName;
private String dataColumnType;
+ private String timestampColumnName;
+ private String timestampColumnType;
+
+ /* cache for sql commands */
private String insertBucketSql;
private String saveBucketSql;
private String loadBucketSql;
private String loadAllSql;
+ private String clearSql;
+ private String selectExpiredBucketsSql;
+ private String deleteBucketSql;
public JdbcCacheStoreConfig() {
className = JdbcCacheStore.class.getName();
@@ -144,23 +151,38 @@
this.dataColumnType = dataColumnType;
}
+ public String getTimestampColumnName() {
+ return timestampColumnName;
+ }
+
+ public void setTimestampColumnName(String timestampColumnName) {
+ this.timestampColumnName = timestampColumnName;
+ }
+
+ public String getTimestampColumnType() {
+ return timestampColumnType;
+ }
+
+ public void setTimestampColumnType(String timestampColumnType) {
+ this.timestampColumnType = timestampColumnType;
+ }
+
@Override
public JdbcCacheStoreConfig clone() {
- JdbcCacheStoreConfig dolly = (JdbcCacheStoreConfig) super.clone();
//don't have to assign any variables as all are primitives, and cannot change
- return dolly;
+ return (JdbcCacheStoreConfig) super.clone();
}
public String getInsertBucketSql() {
if (insertBucketSql == null) {
- insertBucketSql = "INSERT INTO " + tableName + " (" +
keyColumnName + ", " + dataColumnName + ")";
+ insertBucketSql = "INSERT INTO " + tableName + " (" +
keyColumnName + ", " + dataColumnName + ", " + timestampColumnName +
") VALUES(?,?,?)";
}
return insertBucketSql;
}
public String getSaveBucketSql() {
if (saveBucketSql == null) {
- saveBucketSql = "UPDATE " + tableName + " SET " +
dataColumnName + " = ? WHERE " + keyColumnName + " = ?";
+ saveBucketSql = "UPDATE " + tableName + " SET " +
dataColumnName + " = ? , " + timestampColumnName + "=? WHERE " +
keyColumnName + " = ?";
}
return saveBucketSql;
}
@@ -172,10 +194,31 @@
return loadBucketSql;
}
+ public String getDeleteBucketSql() {
+ if (deleteBucketSql == null) {
+ deleteBucketSql = "DELETE FROM " + tableName + " WHERE " +
keyColumnName + " = ?";
+ }
+ return deleteBucketSql;
+ }
+
public String getLoadAllSql() {
if (loadAllSql == null) {
- loadAllSql = "SELECT " + dataColumnName + " FROM " +
tableName;
+ loadAllSql = "SELECT " + dataColumnName + "," +
keyColumnName + " FROM " + tableName;
}
return loadAllSql;
}
+
+ public String getClearSql() {
+ if (clearSql == null) {
+ clearSql = "DELETE FROM " + tableName;
+ }
+ return clearSql;
+ }
+
+ public String getSelectExpiredBucketsSql() {
+ if (selectExpiredBucketsSql == null) {
+ selectExpiredBucketsSql = getLoadAllSql() + " WHERE " + timestampColumnName + " < ?";
+ }
+ return selectExpiredBucketsSql;
+ }
}
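For a concrete picture of the statements this config now caches, the table and column names used by the tests below (horizon_jdbc, key_name, BUCKET, TIMESTAMP) would yield:

   INSERT INTO horizon_jdbc (key_name, BUCKET, TIMESTAMP) VALUES(?,?,?)
   UPDATE horizon_jdbc SET BUCKET = ?, TIMESTAMP = ? WHERE key_name = ?
   SELECT BUCKET,key_name FROM horizon_jdbc
   SELECT BUCKET,key_name FROM horizon_jdbc WHERE TIMESTAMP < ?
   DELETE FROM horizon_jdbc WHERE key_name = ?
   DELETE FROM horizon_jdbc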
Modified: core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcUtil.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcUtil.java 2009-02-25
18:30:30 UTC (rev 7790)
+++ core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcUtil.java 2009-02-26
14:25:24 UTC (rev 7791)
@@ -1,8 +1,9 @@
package org.horizon.loader.jdbc;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
import java.sql.Statement;
-import java.sql.SQLException;
-import java.sql.ResultSet;
/**
* // TODO: Mircea: Document this!
@@ -20,6 +21,16 @@
}
}
+ public static void safeClose(Connection connection) {
+ if (connection != null) {
+ try {
+ connection.close();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
public static void safeClose(ResultSet rs) {
if (rs != null) {
try {
Modified: core/branches/flat/src/main/java/org/horizon/loader/jdbc/TableManipulation.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/loader/jdbc/TableManipulation.java 2009-02-25
18:30:30 UTC (rev 7790)
+++
core/branches/flat/src/main/java/org/horizon/loader/jdbc/TableManipulation.java 2009-02-26
14:25:24 UTC (rev 7791)
@@ -84,11 +84,12 @@
// removed CONSTRAINT clause as this causes problems with some databases, like
Informix.
assertMandatoryElemenetsPresent();
String creatTableDdl = "CREATE TABLE " + config.getTableName() +
"(" + config.getKeyColumnName() + " " + config.getKeyColumnType()
- + " NOT NULL, " + config.getDataColumnName() + " " +
config.getDataColumnType() +
+ + " NOT NULL, " + config.getDataColumnName() + " " +
config.getDataColumnType() + ", "
+ + config.getTimestampColumnName() + " " +
config.getTimestampColumnType() +
", PRIMARY KEY (" + config.getKeyColumnName() + "))";
if (log.isTraceEnabled())
log.trace("Creating table with following DDL: '" + creatTableDdl +
"'.");
- executeDdlStatement(creatTableDdl);
+ executeUpdateSql(creatTableDdl);
}
private void assertMandatoryElemenetsPresent() throws CacheLoaderException {
@@ -97,17 +98,20 @@
assrtNotNull(config.getTableName(), "tableName needed in order to create
table");
assrtNotNull(config.getDataColumnName(), "dataColumnName needed in order to
create table");
assrtNotNull(config.getDataColumnType(), "dataColumnType needed in order to
create table");
+ assrtNotNull(config.getTimestampColumnName(), "timestampColumnName needed in
order to create table");
+ assrtNotNull(config.getTimestampColumnType(), "timestampColumnType needed in
order to create table");
}
private void assrtNotNull(String keyColumnType, String message) throws
CacheLoaderException {
if (keyColumnType == null || keyColumnType.trim().length() == 0) throw new
CacheLoaderException(message);
}
- private void executeDdlStatement(String creatTableDdl) throws CacheLoaderException {
+ private void executeUpdateSql(String sql) throws CacheLoaderException {
Statement statement = null;
try {
statement = connection.createStatement();
- statement.executeUpdate(creatTableDdl);
+ statement.executeUpdate(sql);
} catch (SQLException e) {
log.error("Error while creating table",e);
throw new CacheLoaderException(e);
@@ -118,9 +122,11 @@
public void dropTable() throws CacheLoaderException {
String dropTableDdl = "DROP TABLE " + config.getTableName();
+ String clearTable = "DELETE FROM " + config.getTableName();
+ executeUpdateSql(clearTable);
if (log.isTraceEnabled())
log.trace("Dropping table with following DDL '" + dropTableDdl +
"\'");
- executeDdlStatement(dropTableDdl);
+ executeUpdateSql(dropTableDdl);
}
private static String toLowerCase(String s) {
Modified: core/branches/flat/src/main/java/org/horizon/lock/StripedLock.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/lock/StripedLock.java 2009-02-25 18:30:30
UTC (rev 7790)
+++ core/branches/flat/src/main/java/org/horizon/lock/StripedLock.java 2009-02-26 14:25:24
UTC (rev 7791)
@@ -22,13 +22,16 @@
package org.horizon.lock;
import net.jcip.annotations.ThreadSafe;
+import org.horizon.logging.Log;
+import org.horizon.logging.LogFactory;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* A simple implementation of lock striping, using cache entry keys to lock on, primarily
used to help make {@link
- * org.horizon.loader.CacheLoaderOld} implemtations thread safe.
+ * org.horizon.loader.CacheLoader} implementations thread safe.
* <p/>
* Backed by a set of {@link java.util.concurrent.locks.ReentrantReadWriteLock}
instances, and using the key hashcodes
* to determine buckets.
@@ -38,10 +41,14 @@
* <p/>
*
* @author <a href="mailto:manik@jboss.org">Manik Surtani</a>
+ * @author Mircea.Markus(a)jboss.com
* @since 1.0
*/
@ThreadSafe
public class StripedLock {
+
+ private static Log log = LogFactory.getLog(StripedLock.class);
+
private static final int DEFAULT_CONCURRENCY = 20;
private final int lockSegmentMask;
private final int lockSegmentShift;
@@ -82,7 +89,6 @@
*/
public void acquireLock(Object key, boolean exclusive) {
ReentrantReadWriteLock lock = getLock(key);
-
if (exclusive) {
lock.writeLock().lock();
} else {
@@ -90,6 +96,20 @@
}
}
+ public boolean acquireLock(String key, boolean exclusive, long millis) {
+ ReentrantReadWriteLock lock = getLock(key);
+ try {
+ if (exclusive) {
+ return lock.writeLock().tryLock(millis, TimeUnit.MILLISECONDS);
+ } else {
+ return lock.readLock().tryLock(millis, TimeUnit.MILLISECONDS);
+ }
+ } catch (InterruptedException e) {
+ log.warn("Thread insterrupted while trying to acquire lock", e);
+ return false;
+ }
+ }
+
/**
* Releases a lock the caller may be holding. This method is idempotent.
*/
@@ -154,4 +174,6 @@
}
return count;
}
+
+
}
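The new acquireLock(String, boolean, long) overload is what backs the non-blocking purge path: immediateLockForWriting passes a zero timeout, so a busy bucket is skipped instead of stalling the purger. A minimal stand-alone sketch of the try-lock idiom it wraps:

   import java.util.concurrent.TimeUnit;
   import java.util.concurrent.locks.ReentrantReadWriteLock;

   public class TryLockSketch {
      public static void main(String[] args) throws InterruptedException {
         ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
         // zero timeout: succeed immediately or give up, never block
         if (lock.writeLock().tryLock(0, TimeUnit.MILLISECONDS)) {
            try {
               System.out.println("got the write lock; safe to purge this bucket");
            } finally {
               lock.writeLock().unlock();
            }
         } else {
            System.out.println("bucket busy; skip it this purge cycle");
         }
      }
   }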
Modified:
core/branches/flat/src/test/java/org/horizon/loader/jdbc/JdbcCacheStoreTest.java
===================================================================
---
core/branches/flat/src/test/java/org/horizon/loader/jdbc/JdbcCacheStoreTest.java 2009-02-25
18:30:30 UTC (rev 7790)
+++
core/branches/flat/src/test/java/org/horizon/loader/jdbc/JdbcCacheStoreTest.java 2009-02-26
14:25:24 UTC (rev 7791)
@@ -1,8 +1,15 @@
package org.horizon.loader.jdbc;
import org.horizon.loader.CacheStore;
+import org.horizon.marshall.ObjectStreamMarshaller;
+import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+
/**
* // TODO: Mircea: Document this!
*
@@ -15,19 +22,33 @@
protected CacheStore createCacheStore() throws Exception {
try {
+ Class.forName("com.mysql.jdbc.Driver").newInstance();
+ Connection connection =
DriverManager.getConnection("jdbc:mysql://localhost/horizon", "root",
"root");
+ Statement st = connection.createStatement();
+ try {
+ st.executeUpdate("DROP TABLE horizon_jdbc");
+ } catch (SQLException e) {
+ // ignore; the table might not exist yet
+ }
+ JdbcUtil.safeClose(st);
+ JdbcUtil.safeClose(connection);
+
jdbcCacheStore = new JdbcCacheStore();
JdbcCacheStoreConfig config = new JdbcCacheStoreConfig();
- config.setConnectionFactoryClass(NonManagedConnectionFactory.class.getName());
+ config.setConnectionFactoryClass(PooledConnectionFactory.class.getName());
config.setConnectionUrl("jdbc:mysql://localhost/horizon");
config.setUserName("root");
config.setPassword("root");
config.setDriverClass("com.mysql.jdbc.Driver");
- config.setTableName("horizon_jdc");
+ config.setTableName("horizon_jdbc");
config.setKeyColumnName("key_name");
config.setKeyColumnType("varchar(255)");
config.setDataColumnName("BUCKET");
- config.setDataColumnType("BINARY");
- jdbcCacheStore.init(config, null, null);
+ config.setDataColumnType("BLOB");
+ config.setTimestampColumnName("TIMESTAMP");
+ config.setTimestampColumnType("BIGINT");
+ config.setPurgeSynchronously(true);
+ jdbcCacheStore.init(config, null, new ObjectStreamMarshaller());
jdbcCacheStore.start();
return jdbcCacheStore;
} catch (Throwable e) {
@@ -35,4 +56,11 @@
throw (Exception) e;
}
}
+
+ // TODO: move this into the base test class
+ @AfterMethod
+ public void assertNoLocksHeldAfterTest() {
+ assert jdbcCacheStore.getBucketLockCount() == 0;
+ assert jdbcCacheStore.getGlobalLockCount() == 0;
+ }
}
Added:
core/branches/flat/src/test/java/org/horizon/loader/jdbc/PooledConnectionFactoryTest.java
===================================================================
---
core/branches/flat/src/test/java/org/horizon/loader/jdbc/PooledConnectionFactoryTest.java
(rev 0)
+++
core/branches/flat/src/test/java/org/horizon/loader/jdbc/PooledConnectionFactoryTest.java 2009-02-26
14:25:24 UTC (rev 7791)
@@ -0,0 +1,59 @@
+package org.horizon.loader.jdbc;
+
+import org.horizon.test.UnitTestDatabaseManager;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.Test;
+
+import java.sql.Connection;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * Functional tests for {@link PooledConnectionFactory}.
+ *
+ * @author
+ */
+@Test(groups = "functional", testName =
"loader.jdbc.PooledConnectionFactoryTest")
+public class PooledConnectionFactoryTest {
+
+ private PooledConnectionFactory factory;
+
+ @AfterMethod
+ public void destroyFactory() {
+ factory.stop();
+ }
+
+ public void testValuesNoOverrides() throws Exception {
+ factory = new PooledConnectionFactory();
+ JdbcCacheStoreConfig config =
UnitTestDatabaseManager.getUniqueJdbcCacheStoreConfig();
+ factory.start(config);
+ int hardcodedMaxPoolSize = factory.getPooledDataSource().getMaxPoolSize();
+ Set<Connection> connections = new HashSet<Connection>();
+ for (int i = 0; i < hardcodedMaxPoolSize; i++) {
+ connections.add(factory.getConnection());
+ }
+ assert connections.size() == hardcodedMaxPoolSize;
+ assert factory.getPooledDataSource().getNumBusyConnections() == hardcodedMaxPoolSize;
+ for (Connection conn : connections) {
+ conn.close();
+ }
+ long start = System.currentTimeMillis();
+ while (System.currentTimeMillis() - start < 2000) {
+ if (factory.getPooledDataSource().getNumBusyConnections() == 0) break;
+ }
+ //this must happen eventually
+ assert factory.getPooledDataSource().getNumBusyConnections() == 0;
+ }
+
+ public void testWithPropertyOverrides() throws Exception {
+ String prevVal = System.setProperty("c3p0.maxPoolSize", "3");
+ factory = new PooledConnectionFactory();
+ JdbcCacheStoreConfig config =
UnitTestDatabaseManager.getUniqueJdbcCacheStoreConfig();
+ factory.start(config);
+ assert factory.getPooledDataSource().getMaxPoolSize() == 3 : "expected 3,
received " + factory.getPooledDataSource().getMaxPoolSize();
+ if (prevVal != null) System.setProperty("c3p0.maxPoolSize", prevVal);
+
+ }
+}
Property changes on:
core/branches/flat/src/test/java/org/horizon/loader/jdbc/PooledConnectionFactoryTest.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Modified:
core/branches/flat/src/test/java/org/horizon/loader/jdbc/TableManipulationTest.java
===================================================================
---
core/branches/flat/src/test/java/org/horizon/loader/jdbc/TableManipulationTest.java 2009-02-25
18:30:30 UTC (rev 7790)
+++
core/branches/flat/src/test/java/org/horizon/loader/jdbc/TableManipulationTest.java 2009-02-26
14:25:24 UTC (rev 7791)
@@ -8,6 +8,7 @@
import java.sql.Connection;
import java.sql.DriverManager;
+import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
@@ -17,7 +18,7 @@
*
* @author
*/
-@Test(groups = "functional", testName
="loader.jdbc.TableManipulationTest", enabled = false)
+@Test(groups = "functional", testName =
"loader.jdbc.TableManipulationTest", enabled = false)
public class TableManipulationTest {
Connection connection;
@@ -30,6 +31,7 @@
connection =
DriverManager.getConnection("jdbc:mysql://localhost/horizon", "root",
"root");
Statement st = connection.createStatement();
try {
+ st.executeUpdate("DELETE FROM horizon_test");
st.executeUpdate("DROP TABLE horizon_test");
} catch (SQLException e) {
//ignore, might be the table does not exist
@@ -41,6 +43,8 @@
config.setTableName("horizon_test");
config.setKeyColumnName("KEY_HASH");
config.setDataColumnName("BUCKET");
+ config.setTimestampColumnName("TIMESTAMP");
+ config.setTimestampColumnType("BIGINT");
tableManipulation = new TableManipulation(connection, config);
}
@@ -56,6 +60,8 @@
config.setTableName("horizon");
config.setKeyColumnName("dsadsa");
config.setDataColumnName("dsadsa");
+ config.setTimestampColumnName("timestamp");
+ config.setTimestampColumnType("BIGINT");
Connection mockConnection = createMock(Connection.class);
Statement mockStatement = createNiceMock(Statement.class);
expect(mockConnection.createStatement()).andReturn(mockStatement);
@@ -102,6 +108,24 @@
config.setDataColumnName("abc");
assert true : "We do not expect a failure here";
}
+
+ config.setTimestampColumnName(null);
+ try {
+ other.createTable();
+ assert false : "missing config param, exception expected";
+ } catch (CacheLoaderException e) {
+ config.setDataColumnName("timestamp");
+ assert true : "We do not expect a failure here";
+ }
+
+ config.setTimestampColumnType(null);
+ try {
+ other.createTable();
+ assert false : "missing config param, exception expected";
+ } catch (CacheLoaderException e) {
+ config.setDataColumnName("BIGINT");
+ assert true : "We do not expect a failure here";
+ }
}
public void testCreateTable() throws Exception {
@@ -117,8 +141,16 @@
}
@Test(dependsOnMethods = "testExists")
- public void testDrop() throws CacheLoaderException {
+ public void testDrop() throws Exception {
assert tableManipulation.tableExists();
+ PreparedStatement ps = null;
+ try {
+ ps = connection.prepareStatement("INSERT INTO horizon_test(KEY_HASH)
values(?)");
+ ps.setString(1, System.currentTimeMillis() + "");
+ assert 1 == ps.executeUpdate();
+ } finally {
+ JdbcUtil.safeClose(ps);
+ }
tableManipulation.dropTable();
assert !tableManipulation.tableExists();
}