JBoss Cache SVN: r7795 - demos/core-demo-gui/tags/1.2.GA.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2009-02-26 09:53:37 -0500 (Thu, 26 Feb 2009)
New Revision: 7795
Modified:
demos/core-demo-gui/tags/1.2.GA/pom.xml
Log:
Modified: demos/core-demo-gui/tags/1.2.GA/pom.xml
===================================================================
--- demos/core-demo-gui/tags/1.2.GA/pom.xml 2009-02-26 14:52:32 UTC (rev 7794)
+++ demos/core-demo-gui/tags/1.2.GA/pom.xml 2009-02-26 14:53:37 UTC (rev 7795)
@@ -10,7 +10,7 @@
</parent>
<groupId>org.jboss.cache</groupId>
<artifactId>jbosscache-demo</artifactId>
- <version>1.2-SNAPSHOT</version>
+ <version>1.2.GA</version>
<name>JBoss Cache - Core Edition GUI Demo</name>
<description>JBoss Cache - Core Edition GUI Demo</description>
<packaging>jar</packaging>
15 years, 10 months
JBoss Cache SVN: r7794 - demos/core-demo-gui/tags.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2009-02-26 09:52:32 -0500 (Thu, 26 Feb 2009)
New Revision: 7794
Added:
demos/core-demo-gui/tags/1.2.GA/
Log:
Copied: demos/core-demo-gui/tags/1.2.GA (from rev 7793, demos/core-demo-gui/trunk)
15 years, 10 months
JBoss Cache SVN: r7793 - demos/core-demo-gui/trunk.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2009-02-26 09:47:44 -0500 (Thu, 26 Feb 2009)
New Revision: 7793
Modified:
demos/core-demo-gui/trunk/pom.xml
Log:
Modified: demos/core-demo-gui/trunk/pom.xml
===================================================================
--- demos/core-demo-gui/trunk/pom.xml 2009-02-26 14:26:18 UTC (rev 7792)
+++ demos/core-demo-gui/trunk/pom.xml 2009-02-26 14:47:44 UTC (rev 7793)
@@ -18,7 +18,7 @@
<dependency>
<groupId>org.jboss.cache</groupId>
<artifactId>jbosscache-core</artifactId>
- <version>3.0.0.CR3</version>
+ <version>3.0.3.GA</version>
</dependency>
<dependency>
<groupId>jgoodies</groupId>
15 years, 10 months
JBoss Cache SVN: r7792 - core/branches/flat/src/main/java/org/horizon/loader/jdbc.
by jbosscache-commits@lists.jboss.org
Author: mircea.markus
Date: 2009-02-26 09:26:18 -0500 (Thu, 26 Feb 2009)
New Revision: 7792
Added:
core/branches/flat/src/main/java/org/horizon/loader/jdbc/PooledConnectionFactory.java
core/branches/flat/src/main/java/org/horizon/loader/jdbc/SimpleConnectionFactory.java
Log:
ongoing FileCacheStoreWork
Added: core/branches/flat/src/main/java/org/horizon/loader/jdbc/PooledConnectionFactory.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/jdbc/PooledConnectionFactory.java (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/loader/jdbc/PooledConnectionFactory.java 2009-02-26 14:26:18 UTC (rev 7792)
@@ -0,0 +1,101 @@
+package org.horizon.loader.jdbc;
+
+import com.mchange.v2.c3p0.ComboPooledDataSource;
+import com.mchange.v2.c3p0.DataSources;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.horizon.loader.CacheLoaderException;
+
+import java.beans.PropertyVetoException;
+import java.net.URL;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Properties;
+
+/**
+ * Pooled connection factory based on C3P0. For a complete configuration reference, look <a
+ * href="http://www.mchange.com/projects/c3p0/index.html#configuration">here</a>. The connection pool can be configured
+ * in various ways, as described <a href="http://www.mchange.com/projects/c3p0/index.html#configuration_files">here</a>.
+ * The simplest way is by having an <tt>c3p0.properties</tt> file in the classpath. If no such file is found, default,
+ * hardcoded valus will be used.
+ *
+ * @author Mircea.Markus(a)jboss.com
+ */
+public class PooledConnectionFactory extends ConnectionFactory {
+
+ private static Log log = LogFactory.getLog(PooledConnectionFactory.class);
+ private ComboPooledDataSource pooledDataSource;
+
+ @Override
+ public void start(JdbcCacheStoreConfig config) throws CacheLoaderException {
+ logFileOverride();
+ pooledDataSource = new ComboPooledDataSource();
+ pooledDataSource.setProperties(new Properties());
+ try {
+ pooledDataSource.setDriverClass(config.getDriverClass()); //loads the jdbc driver
+ } catch (PropertyVetoException e) {
+ String message = "Error while instatianting JDBC driver: '" + config.getDriverClass();
+ log.error(message, e);
+ throw new CacheLoaderException(message, e);
+ }
+ pooledDataSource.setJdbcUrl(config.getConnectionUrl());
+ pooledDataSource.setUser(config.getUserName());
+ pooledDataSource.setPassword(config.getPassword());
+ }
+
+ private void logFileOverride() {
+ URL propsUrl = Thread.currentThread().getContextClassLoader().getResource("c3p0.properties");
+ URL xmlUrl = Thread.currentThread().getContextClassLoader().getResource("c3p0-config.xml");
+ if (log.isInfoEnabled() && propsUrl != null) {
+ log.info("Found 'c3p0.properties' in classpath: " + propsUrl);
+ }
+ if (log.isInfoEnabled() && xmlUrl != null) {
+ log.info("Found 'c3p0-config.xml' in classpath: " + xmlUrl);
+ }
+ }
+
+ @Override
+ public void stop() {
+ try {
+ DataSources.destroy(pooledDataSource);
+ if (log.isTraceEnabled()) {
+ log.debug("Sucessfully stopped PooledConnectionFactory.");
+ }
+ }
+ catch (SQLException sqle) {
+ log.warn("Could not destroy C3P0 connection pool: " + pooledDataSource, sqle);
+ }
+ }
+
+ @Override
+ public Connection getConnection() throws CacheLoaderException {
+ try {
+ if (log.isTraceEnabled()) {
+ log.trace("DataSource before checkout (NumBusyConnectionsAllUsers) : " + pooledDataSource.getNumBusyConnectionsAllUsers());
+ log.trace("DataSource before checkout (NumConnectionsAllUsers) : " + pooledDataSource.getNumConnectionsAllUsers());
+ }
+ Connection connection = pooledDataSource.getConnection();
+ if (log.isTraceEnabled()) {
+ log.trace("DataSource after checkout (NumBusyConnectionsAllUsers) : " + pooledDataSource.getNumBusyConnectionsAllUsers());
+ log.trace("DataSource after checkout (NumConnectionsAllUsers) : " + pooledDataSource.getNumConnectionsAllUsers());
+ log.trace("Connection checked out: " + connection);
+ }
+ return connection;
+ } catch (SQLException e) {
+ throw new CacheLoaderException("Failed obtaining connection from PooledDataSource", e);
+ }
+ }
+
+ @Override
+ public void releaseConnection(Connection conn) {
+ try {
+ conn.close();
+ } catch (SQLException e) {
+ log.warn("Issues closing connection", e);
+ }
+ }
+
+ ComboPooledDataSource getPooledDataSource() {
+ return pooledDataSource;
+ }
+}
Property changes on: core/branches/flat/src/main/java/org/horizon/loader/jdbc/PooledConnectionFactory.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added: core/branches/flat/src/main/java/org/horizon/loader/jdbc/SimpleConnectionFactory.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/jdbc/SimpleConnectionFactory.java (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/loader/jdbc/SimpleConnectionFactory.java 2009-02-26 14:26:18 UTC (rev 7792)
@@ -0,0 +1,79 @@
+package org.horizon.loader.jdbc;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.horizon.loader.CacheLoaderException;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+/**
+ * // TODO: Mircea: Document this!
+ *
+ * @author
+ */
+public class SimpleConnectionFactory extends ConnectionFactory {
+
+ private static Log log = LogFactory.getLog(SimpleConnectionFactory.class);
+
+ private String connectionUrl;
+ private String userName;
+ private String password;
+
+ public void start(JdbcCacheStoreConfig config) throws CacheLoaderException {
+ loadDriver(config.getDriverClass());
+ this.connectionUrl = config.getConnectionUrl();
+ this.userName = config.getUserName();
+ this.password = config.getPassword();
+ }
+
+ public void stop() {
+ //do nothing
+ }
+
+ public Connection getConnection() throws CacheLoaderException {
+ try {
+ Connection connection = DriverManager.getConnection(connectionUrl, userName, password);
+ if (connection == null)
+ throw new CacheLoaderException("Received null connection from the DriverManager!");
+ return connection;
+ } catch (SQLException e) {
+ throw new CacheLoaderException("Could not obtain a new connection", e);
+ }
+ }
+
+ public void releaseConnection(Connection conn) {
+ try {
+ conn.close();
+ } catch (SQLException e) {
+ log.warn("Failure while closing the connection to the database ", e);
+ }
+ }
+
+ private void loadDriver(String driverClass) throws CacheLoaderException {
+ try {
+ if (log.isTraceEnabled()) {
+ log.trace("Attempting to load driver " + driverClass);
+ }
+ Class.forName(driverClass).newInstance();
+ }
+ catch (Throwable th) {
+ String message = "Failed loading driver with class: '" + driverClass + "'";
+ log.error(message, th);
+ throw new CacheLoaderException(message, th);
+ }
+ }
+
+ public String getConnectionUrl() {
+ return connectionUrl;
+ }
+
+ public String getUserName() {
+ return userName;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+}
Property changes on: core/branches/flat/src/main/java/org/horizon/loader/jdbc/SimpleConnectionFactory.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
15 years, 10 months
JBoss Cache SVN: r7791 - in core/branches/flat/src: main/java/org/horizon/loader/file and 3 other directories.
by jbosscache-commits@lists.jboss.org
Author: mircea.markus
Date: 2009-02-26 09:25:24 -0500 (Thu, 26 Feb 2009)
New Revision: 7791
Added:
core/branches/flat/src/test/java/org/horizon/loader/jdbc/PooledConnectionFactoryTest.java
Modified:
core/branches/flat/src/main/java/org/horizon/loader/bucket/Bucket.java
core/branches/flat/src/main/java/org/horizon/loader/bucket/BucketBasedCacheStore.java
core/branches/flat/src/main/java/org/horizon/loader/bucket/BucketBasedCacheStoreConfig.java
core/branches/flat/src/main/java/org/horizon/loader/file/FileCacheStore.java
core/branches/flat/src/main/java/org/horizon/loader/file/FileCacheStoreConfig.java
core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcCacheStore.java
core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcCacheStoreConfig.java
core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcUtil.java
core/branches/flat/src/main/java/org/horizon/loader/jdbc/TableManipulation.java
core/branches/flat/src/main/java/org/horizon/lock/StripedLock.java
core/branches/flat/src/test/java/org/horizon/loader/jdbc/JdbcCacheStoreTest.java
core/branches/flat/src/test/java/org/horizon/loader/jdbc/TableManipulationTest.java
Log:
ongoing FileCacheStoreWork
Modified: core/branches/flat/src/main/java/org/horizon/loader/bucket/Bucket.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/bucket/Bucket.java 2009-02-25 18:30:30 UTC (rev 7790)
+++ core/branches/flat/src/main/java/org/horizon/loader/bucket/Bucket.java 2009-02-26 14:25:24 UTC (rev 7791)
@@ -73,6 +73,16 @@
return entries.values();
}
+ public long timestampOfFirstEntryToExpire() {
+ long result = Long.MAX_VALUE;
+ for (StoredEntry se : entries.values()) {
+ if (se.getExpiryTime() < result) {
+ result = se.getExpiryTime();
+ }
+ }
+ return result;
+ }
+
@Override
public String toString() {
return "Bucket{" +
@@ -80,4 +90,8 @@
", bucketName='" + bucketName + '\'' +
'}';
}
+
+ public boolean isEmpty() {
+ return entries.isEmpty();
+ }
}
Modified: core/branches/flat/src/main/java/org/horizon/loader/bucket/BucketBasedCacheStore.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/bucket/BucketBasedCacheStore.java 2009-02-25 18:30:30 UTC (rev 7790)
+++ core/branches/flat/src/main/java/org/horizon/loader/bucket/BucketBasedCacheStore.java 2009-02-26 14:25:24 UTC (rev 7791)
@@ -2,18 +2,25 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.horizon.lock.StripedLock;
+import org.horizon.Cache;
+import org.horizon.util.concurrent.WithinThreadExecutor;
import org.horizon.loader.AbstractCacheStore;
+import org.horizon.loader.CacheLoaderConfig;
+import org.horizon.loader.CacheLoaderException;
import org.horizon.loader.StoredEntry;
-import org.horizon.loader.CacheLoaderException;
-import org.horizon.loader.CacheLoaderConfig;
-import org.horizon.Cache;
+import org.horizon.lock.StripedLock;
import org.horizon.marshall.Marshaller;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.io.ObjectOutputStream;
/**
* //TODO comment this
@@ -28,6 +35,7 @@
private StripedLock bucketLocks;
private BucketBasedCacheStoreConfig config;
private long globalLockTimeoutMillis;
+ private ExecutorService purgerService;
/**
* This global lock guards against direct store access via clear() and the stream APIs. These three methods would
@@ -43,8 +51,17 @@
public void start() throws CacheLoaderException {
bucketLocks = new StripedLock(config.getLockConcurrencyLevel());
globalLockTimeoutMillis = config.getLockAcquistionTimeout();
+ if (config.isPurgeSynchronously()) {
+ purgerService = new WithinThreadExecutor();
+ } else {
+ purgerService = Executors.newSingleThreadExecutor();
+ }
}
+ public void stop() throws CacheLoaderException {
+ purgerService.shutdownNow();
+ }
+
public StoredEntry load(Object key) throws CacheLoaderException {
if (log.isTraceEnabled()) log.trace("Loading entry " + key);
String keyHashCode = String.valueOf(key.hashCode());
@@ -61,8 +78,6 @@
} else {
return se;
}
- } catch (Exception e) {
- throw new CacheLoaderException("Problems loading key " + key, e);
} finally {
unlock(keyHashCode);
}
@@ -90,9 +105,6 @@
bucket.addEntry(ed);
insertBucket(bucket);
}
- }
- catch (Exception ex) {
- throw new CacheLoaderException("Problems storing entry with key " + ed.getKey(), ex);
} finally {
unlock(keyHashCode);
}
@@ -112,14 +124,70 @@
if (success) saveBucket(bucket);
return success;
}
- } catch (Exception e) {
- throw new CacheLoaderException("Problems removing key " + key, e);
} finally {
unlock(keyHashCodeStr);
}
}
+ public void fromStream(InputStream inputStream) throws CacheLoaderException {
+ ObjectInputStream ois = null;
+ try {
+ // first clear all local state
+ acquireGlobalLock(true);
+ clear();
+ ois = (inputStream instanceof ObjectInputStream) ? (ObjectInputStream) inputStream :
+ new ObjectInputStream(inputStream);
+ fromStreamInternal(ois);
+ }
+ catch (IOException e) {
+ throw new CacheLoaderException("Cannot convert to ObjectInputSream", e);
+ } finally {
+ releaseGlobalLock(true);
+ // we should close the stream we created!
+ if (inputStream != ois) safeClose(ois);
+ }
+ }
+ public void toStream(OutputStream outputStream) throws CacheLoaderException {
+ ObjectOutputStream oos = null;
+ try {
+ acquireGlobalLock(true);
+ try {
+ oos = (outputStream instanceof ObjectOutputStream) ? (ObjectOutputStream) outputStream :
+ new ObjectOutputStream(outputStream);
+ } catch (IOException e) {
+ throw new CacheLoaderException(e);
+ }
+ toStreamInternal(oos);
+ } finally {
+ releaseGlobalLock(true);
+ // we should close the stream we created!
+ if (oos != outputStream) safeClose(oos);
+ }
+ }
+
+ public void clear() throws CacheLoaderException {
+ log.trace("Clearing store");
+ try {
+ acquireGlobalLock(true);
+ clearInternal();
+ } finally {
+ releaseGlobalLock(true);
+ }
+ }
+
+ public void purgeExpired() {
+ purgerService.execute(new Runnable() {
+ public void run() {
+ try {
+ purgeInternal();
+ } catch (CacheLoaderException e) {
+ log.info("Problems encountered while purging expired", e);
+ }
+ }
+ });
+ }
+
protected void unlock(String keyHashCode) {
bucketLocks.releaseLock(keyHashCode);
globalLock.readLock().unlock();
@@ -135,6 +203,18 @@
bucketLocks.acquireLock(keyHashCode, true);
}
+ protected boolean immediateLockForWritting(String keyHashCode) throws CacheLoaderException {
+ try {
+ if (!globalLock.readLock().tryLock(0, TimeUnit.MILLISECONDS)) {
+ return false;
+ }
+ } catch (InterruptedException e) {
+ log.warn("Received interrupt signal while waiting for global lock aquisition");
+ throw new CacheLoaderException(e);
+ }
+ return bucketLocks.acquireLock(keyHashCode, true, 0);
+ }
+
protected void lockForReading(String keyHashCode) throws CacheLoaderException {
try {
globalLock.readLock().tryLock(globalLockTimeoutMillis, TimeUnit.MILLISECONDS);
@@ -145,10 +225,14 @@
bucketLocks.acquireLock(keyHashCode, false);
}
- protected void acquireGlobalLock(boolean exclusive) throws TimeoutException, InterruptedException {
+ protected void acquireGlobalLock(boolean exclusive) throws CacheLoaderException {
Lock l = exclusive ? globalLock.writeLock() : globalLock.readLock();
- if (!l.tryLock(globalLockTimeoutMillis, TimeUnit.MILLISECONDS))
- throw new TimeoutException("Timed out trying to acquire " + (exclusive ? "exclusive" : "shared") + " global lock after " + globalLockTimeoutMillis + " millis. Lock is " + l);
+ try {
+ if (!l.tryLock(globalLockTimeoutMillis, TimeUnit.MILLISECONDS))
+ throw new CacheLoaderException("Timed out trying to acquire " + (exclusive ? "exclusive" : "shared") + " global lock after " + globalLockTimeoutMillis + " millis. Lock is " + l);
+ } catch (InterruptedException e) {
+ throw new CacheLoaderException(e);
+ }
}
protected void releaseGlobalLock(boolean exclusive) {
@@ -172,4 +256,12 @@
protected abstract void saveBucket(Bucket bucket) throws CacheLoaderException;
protected abstract Bucket loadBucket(String keyHashCode) throws CacheLoaderException;
+
+ protected abstract void toStreamInternal(ObjectOutputStream oos) throws CacheLoaderException;
+
+ protected abstract void fromStreamInternal(ObjectInputStream ois) throws CacheLoaderException;
+
+ protected abstract void clearInternal() throws CacheLoaderException;
+
+ protected abstract void purgeInternal() throws CacheLoaderException;
}
Modified: core/branches/flat/src/main/java/org/horizon/loader/bucket/BucketBasedCacheStoreConfig.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/bucket/BucketBasedCacheStoreConfig.java 2009-02-25 18:30:30 UTC (rev 7790)
+++ core/branches/flat/src/main/java/org/horizon/loader/bucket/BucketBasedCacheStoreConfig.java 2009-02-26 14:25:24 UTC (rev 7791)
@@ -11,6 +11,7 @@
private int lockConcurrencyLevel = 2048;
private long lockAcquistionTimeout = 60000;
+ private boolean purgeSynchronously = false;
public int getLockConcurrencyLevel() {
return lockConcurrencyLevel;
@@ -29,4 +30,13 @@
testImmutability("lockAcquistionTimeout");
this.lockAcquistionTimeout = lockAcquistionTimeout;
}
+
+ public boolean isPurgeSynchronously() {
+ return purgeSynchronously;
+ }
+
+ public void setPurgeSynchronously(boolean purgeSynchronously) {
+ testImmutability("purgeSynchronously");
+ this.purgeSynchronously = purgeSynchronously;
+ }
}
Modified: core/branches/flat/src/main/java/org/horizon/loader/file/FileCacheStore.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/file/FileCacheStore.java 2009-02-25 18:30:30 UTC (rev 7790)
+++ core/branches/flat/src/main/java/org/horizon/loader/file/FileCacheStore.java 2009-02-26 14:25:24 UTC (rev 7791)
@@ -10,13 +10,17 @@
import org.horizon.logging.Log;
import org.horizon.logging.LogFactory;
import org.horizon.marshall.Marshaller;
-import org.horizon.util.concurrent.WithinThreadExecutor;
-import java.io.*;
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.util.HashSet;
import java.util.Set;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
/**
* A filesystem-based implementation of a {@link org.horizon.loader.CacheStore}. This file store stores stuff in the
@@ -42,8 +46,8 @@
public class FileCacheStore extends BucketBasedCacheStore {
private static final Log log = LogFactory.getLog(FileCacheStore.class);
private int streamBufferSize;
- ExecutorService purgerService;
+
FileCacheStoreConfig cfg;
Cache cache;
Marshaller m;
@@ -77,14 +81,8 @@
return result;
}
- public void fromStream(InputStream inputStream) throws CacheLoaderException {
- ObjectInputStream ois = null;
+ protected void fromStreamInternal(ObjectInputStream ois) throws CacheLoaderException {
try {
- // first clear all local state
- acquireGlobalLock(true);
- clear();
- ois = (inputStream instanceof ObjectInputStream) ? (ObjectInputStream) inputStream :
- new ObjectInputStream(inputStream);
int numFiles = ois.readInt();
byte[] buffer = new byte[streamBufferSize];
int bytesRead, totalBytesRead = 0;
@@ -101,30 +99,20 @@
bos.write(buffer, 0, bytesRead);
}
bos.flush();
- bos.close();
+ safeClose(bos);
fos.flush();
- fos.close();
+ safeClose(fos);
totalBytesRead = 0;
}
+ } catch (IOException e) {
+ throw new CacheLoaderException("I/O error", e);
+ } catch (ClassNotFoundException e) {
+ throw new CacheLoaderException("Unexpected expcetion", e);
}
- catch (Exception e) {
- CacheLoaderException cle = (e instanceof CacheLoaderException) ? (CacheLoaderException) e :
- new CacheLoaderException("Problems reading from stream", e);
- throw cle;
- }
- finally {
- releaseGlobalLock(true);
- // we should close the stream we created!
- if (inputStream != ois) safeClose(ois);
- }
}
- public void toStream(OutputStream outputStream) throws CacheLoaderException {
- ObjectOutputStream oos = null;
+ protected void toStreamInternal(ObjectOutputStream oos) throws CacheLoaderException {
try {
- acquireGlobalLock(true);
- oos = (outputStream instanceof ObjectOutputStream) ? (ObjectOutputStream) outputStream :
- new ObjectOutputStream(outputStream);
File[] files = root.listFiles();
oos.writeInt(files.length);
byte[] buffer = new byte[streamBufferSize];
@@ -145,39 +133,19 @@
bis.close();
fileInStream.close();
}
- } catch (Exception ioe) {
- throw new CacheLoaderException("Problems handling stream", ioe);
- } finally {
- releaseGlobalLock(true);
- // we should close the stream we created!
- if (oos != outputStream) safeClose(oos);
+ } catch (IOException e) {
+ throw new CacheLoaderException("I/O expcetion while generating stream", e);
}
}
- public void clear() throws CacheLoaderException {
- log.trace("Clearing store");
- try {
- acquireGlobalLock(true);
- for (File f : root.listFiles()) {
- if (!f.delete()) log.warn("Had problems removing file {0}", f);
- }
- } catch (Exception e) {
- throw new CacheLoaderException("Problems clearing cache store", e);
- } finally {
- releaseGlobalLock(true);
+ protected void clearInternal() throws CacheLoaderException {
+ for (File f : root.listFiles()) {
+ if (!f.delete()) log.warn("Had problems removing file {0}", f);
}
}
- public void purgeExpired() {
- purgerService.execute(new Runnable() {
- public void run() {
- try {
- loadAll();
- } catch (CacheLoaderException e) {
- log.info("Problems encountered while purging expired", e);
- }
- }
- });
+ protected void purgeInternal() throws CacheLoaderException {
+ loadAll();
}
protected Bucket loadBucket(String bucketName) throws CacheLoaderException {
@@ -252,18 +220,9 @@
if (!root.mkdirs())
throw new ConfigurationException("Directory " + root.getAbsolutePath() + " does not exist and cannot be created!");
}
- if (cfg.isPurgeSynchronously()) {
- purgerService = new WithinThreadExecutor();
- } else {
- purgerService = Executors.newSingleThreadExecutor();
- }
streamBufferSize = cfg.getStreamBufferSize();
}
- public void stop() {
- purgerService.shutdownNow();
- }
-
public Bucket loadBucketContainingKey(String key) throws CacheLoaderException {
return loadBucket(key.hashCode() + "");
}
Modified: core/branches/flat/src/main/java/org/horizon/loader/file/FileCacheStoreConfig.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/file/FileCacheStoreConfig.java 2009-02-25 18:30:30 UTC (rev 7790)
+++ core/branches/flat/src/main/java/org/horizon/loader/file/FileCacheStoreConfig.java 2009-02-26 14:25:24 UTC (rev 7791)
@@ -22,7 +22,6 @@
*/
public class FileCacheStoreConfig extends BucketBasedCacheStoreConfig {
String location = "Horizon-FileCacheStore";
- private boolean purgeSynchronously = false;
private int streamBufferSize = 8192;
public FileCacheStoreConfig() {
@@ -38,15 +37,7 @@
this.location = location;
}
- public boolean isPurgeSynchronously() {
- return purgeSynchronously;
- }
- public void setPurgeSynchronously(boolean purgeSynchronously) {
- testImmutability("purgeSynchronously");
- this.purgeSynchronously = purgeSynchronously;
- }
-
public int getStreamBufferSize() {
return streamBufferSize;
}
Modified: core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcCacheStore.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcCacheStore.java 2009-02-25 18:30:30 UTC (rev 7790)
+++ core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcCacheStore.java 2009-02-26 14:25:24 UTC (rev 7791)
@@ -13,13 +13,16 @@
import java.io.IOException;
import java.io.InputStream;
-import java.io.OutputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
import java.util.Set;
-import java.util.HashSet;
/**
* // TODO: Manik: Document this!
@@ -29,6 +32,7 @@
public class JdbcCacheStore extends BucketBasedCacheStore {
private static final Log log = LogFactory.getLog(JdbcCacheStore.class);
+ public final static String STREAM_DELIMITER = "__jdbcCacheLoader_done__";
private JdbcCacheStoreConfig config;
private ConnectionFactory connectionFactory;
@@ -87,6 +91,7 @@
ps.setString(1, bucket.getBucketName());
ByteBuffer byteBuffer = marshall(bucket);
ps.setBinaryStream(2, byteBuffer.getStream(), byteBuffer.getLength());
+ ps.setLong(3, bucket.timestampOfFirstEntryToExpire());
int insertedRows = ps.executeUpdate();
if (insertedRows != 1) {
throw new CacheLoaderException("Unexpected insert result: '" + insertedRows + "'. Expected values is 1");
@@ -111,12 +116,12 @@
ps = conn.prepareStatement(sql);
ByteBuffer buffer = marshall(bucket);
ps.setBinaryStream(1, buffer.getStream(), buffer.getLength());
- ps.setString(2, bucket.getBucketName());
+ ps.setLong(2, bucket.timestampOfFirstEntryToExpire());
+ ps.setString(3, bucket.getBucketName());
int updatedRows = ps.executeUpdate();
if (updatedRows != 1) {
throw new CacheLoaderException("Unexpected update result: '" + updatedRows + "'. Expected values is 1");
}
-
} catch (SQLException e) {
logAndThrow(e, "sql failure while updating bucket: " + bucket);
} finally {
@@ -155,7 +160,6 @@
}
}
-
public Set<StoredEntry> loadAll() throws CacheLoaderException {
Connection conn = null;
PreparedStatement ps = null;
@@ -169,6 +173,12 @@
ps = conn.prepareStatement(sql);
rs = ps.executeQuery();
Set<StoredEntry> result = new HashSet<StoredEntry>();
+ while (rs.next()) {
+ InputStream binaryStream = rs.getBinaryStream(1);
+ Bucket bucket = unmarshall(binaryStream);
+ result.addAll(bucket.getStoredEntries());
+ }
+ return result;
} catch (SQLException e) {
String message = "sql failure while loading key: ";
log.error(message, e);
@@ -178,25 +188,217 @@
JdbcUtil.safeClose(ps);
releaseConnection(conn);
}
- return null;
}
- public void fromStream(InputStream inputStream) throws CacheLoaderException {
- throw new IllegalStateException("TODO - please implement me!!!"); //todo implement!!!
+ protected void fromStreamInternal(ObjectInputStream ois) throws CacheLoaderException {
+ Connection conn = null;
+ PreparedStatement ps = null;
+ try {
+ conn = getConnection();
+ String sql = config.getInsertBucketSql();
+ ps = conn.prepareStatement(sql);
+
+ int readBuckets = 0;
+ int batchSize = 100;
+ String bucketName = (String) ois.readObject();
+ while (!bucketName.equals(STREAM_DELIMITER)) {
+ Bucket bucket = (Bucket) ois.readObject();
+ readBuckets++;
+ ps.setString(1, bucketName);
+ ByteBuffer buffer = marshall(bucket);
+ ps.setBinaryStream(2, buffer.getStream(), buffer.getLength());
+ ps.setLong(3, bucket.timestampOfFirstEntryToExpire());
+ if (readBuckets % batchSize == 0) {
+ ps.executeBatch();
+ if (log.isTraceEnabled())
+ log.trace("Executing batch " + (readBuckets / batchSize) + ", batch size is " + batchSize);
+ } else {
+ ps.addBatch();
+ }
+ bucketName = (String) ois.readObject();
+ }
+ if (readBuckets % batchSize != 0)
+ ps.executeBatch();//flush the batch
+ if (log.isTraceEnabled())
+ log.trace("Successfully inserted " + readBuckets + " buckets into the database, batch size is " + batchSize);
+ } catch (IOException ex) {
+ logAndThrow(ex, "I/O failure while integrating state into store");
+ } catch (SQLException e) {
+ logAndThrow(e, "SQL failure while integrating state into store");
+ } catch (ClassNotFoundException e) {
+ logAndThrow(e, "Unexpected failure while integrating state into store");
+ } finally {
+ JdbcUtil.safeClose(ps);
+ releaseConnection(conn);
+ }
}
- public void toStream(OutputStream outputStream) throws CacheLoaderException {
- throw new IllegalStateException("TODO - please implement me!!!"); //todo implement!!!
+ protected void toStreamInternal(ObjectOutputStream oos) throws CacheLoaderException {
+ Connection conn = null;
+ PreparedStatement ps = null;
+ ResultSet rs = null;
+ try {
+ conn = getConnection();
+ String sql = config.getLoadAllSql();
+ ps = conn.prepareStatement(sql);
+ rs = ps.executeQuery();
+ rs.setFetchSize(100);
+ while (rs.next()) {
+ InputStream inputStream = rs.getBinaryStream(1);
+ Bucket bucket = unmarshall(inputStream);
+ String bucketName = rs.getString(2);
+ oos.writeObject(bucketName);
+ oos.writeObject(bucket);
+ }
+ oos.writeObject(STREAM_DELIMITER);
+ } catch (SQLException ex) {
+ logAndThrow(ex, "SQL failure while writing store's content to stream");
+ }
+ catch (IOException e) {
+ logAndThrow(e, "IO failure while writing store's content to stream");
+ } finally {
+ JdbcUtil.safeClose(rs);
+ JdbcUtil.safeClose(ps);
+ releaseConnection(conn);
+ }
}
- public void clear() throws CacheLoaderException {
- throw new IllegalStateException("TODO - please implement me!!!"); //todo implement!!!
+ protected void clearInternal() throws CacheLoaderException {
+ Connection conn = null;
+ PreparedStatement ps = null;
+ try {
+ String sql = config.getClearSql();
+ conn = getConnection();
+ ps = conn.prepareStatement(sql);
+ int result = ps.executeUpdate();
+ if (log.isTraceEnabled())
+ log.trace("Successfully removed " + result + " rows.");
+ } catch (SQLException ex) {
+ logAndThrow(ex, "Failed clearing JdbcCacheStore");
+ } finally {
+ JdbcUtil.safeClose(ps);
+ releaseConnection(conn);
+ }
}
- public void purgeExpired() throws CacheLoaderException {
- throw new IllegalStateException("TODO - please implement me!!!"); //todo implement!!!
+ protected void purgeInternal() throws CacheLoaderException {
+ Connection conn = null;
+ PreparedStatement ps = null;
+ ResultSet rs = null;
+ Set<Bucket> expiredBuckets = new HashSet<Bucket>();
+ final int batchSize = 100;
+ try {
+ String sql = config.getSelectExpiredBucketsSql();
+ conn = getConnection();
+ ps = conn.prepareStatement(sql);
+ ps.setLong(1, System.currentTimeMillis());
+ rs = ps.executeQuery();
+ while (rs.next()) {
+ String key = rs.getString(2);
+ if (immediateLockForWritting(key)) {
+ if (log.isTraceEnabled()) log.trace("Adding bucket keyed " + key + " for purging.");
+ InputStream binaryStream = rs.getBinaryStream(1);
+ Bucket bucket = unmarshall(binaryStream);
+ bucket.setBucketName(key);
+ expiredBuckets.add(bucket);
+ } else {
+ if (log.isTraceEnabled())
+ log.trace("Could not acquire write lock for " + key + ", this won't be purged even though it has expired elements");
+ }
+ }
+ } catch (SQLException ex) {
+ //if something happens make sure buckets locks are being release
+ releaseLocks(expiredBuckets);
+ releaseConnection(conn);
+ logAndThrow(ex, "Failed clearing JdbcCacheStore");
+ } finally {
+ JdbcUtil.safeClose(ps);
+ JdbcUtil.safeClose(rs);
+ }
+
+ if (log.isTraceEnabled())
+ log.trace("Found following buckets: " + expiredBuckets + " which are about to be expired");
+
+ if (expiredBuckets.isEmpty()) return;
+ Set<Bucket> emptyBuckets = new HashSet<Bucket>();
+ //now update all the buckets in batch
+ try {
+ String sql = config.getSaveBucketSql();
+ ps = conn.prepareStatement(sql);
+ int updateCount = 0;
+ Iterator<Bucket> it = expiredBuckets.iterator();
+ while (it.hasNext()) {
+ Bucket bucket = it.next();
+ bucket.removeExpiredEntries();
+ if (!bucket.isEmpty()) {
+ ByteBuffer byteBuffer = marshall(bucket);
+ ps.setBinaryStream(1, byteBuffer.getStream(), byteBuffer.getLength());
+ ps.setLong(2, bucket.timestampOfFirstEntryToExpire());
+ ps.addBatch();
+ updateCount++;
+ if (updateCount % batchSize == 0) {
+ ps.executeBatch();
+ if (log.isTraceEnabled()) log.trace("Flushing batch, update count is: " + updateCount);
+ }
+ } else {
+ it.remove();
+ emptyBuckets.add(bucket);
+ }
+ }
+ //flush the batch
+ if (updateCount % batchSize != 0) {
+ ps.executeBatch();
+ }
+ if (log.isTraceEnabled()) log.trace("Updated " + updateCount + " buckets.");
+ } catch (SQLException ex) {
+ //if something happens make sure buckets locks are being release
+ releaseLocks(emptyBuckets);
+ releaseConnection(conn);
+ logAndThrow(ex, "Failed clearing JdbcCacheStore");
+ } finally {
+ //release locks for the updated buckets.This won't include empty buckets, as these were migrated to emptyBuckets
+ releaseLocks(expiredBuckets);
+ JdbcUtil.safeClose(ps);
+ }
+
+
+ if (log.isTraceEnabled()) log.trace("About to remove empty buckets " + emptyBuckets);
+
+ if (emptyBuckets.isEmpty()) return;
+ //then remove the empty buckets
+ try {
+ String sql = config.getDeleteBucketSql();
+ ps = conn.prepareStatement(sql);
+ int deletionCount = 0;
+ for (Bucket bucket : emptyBuckets) {
+ ps.setString(1, bucket.getBucketName());
+ ps.addBatch();
+ deletionCount++;
+ if (deletionCount % batchSize == 0) {
+ if (log.isTraceEnabled()) log.trace("Flushing deletion batch, total deletion count so far is " + deletionCount);
+ ps.executeBatch();
+ }
+ }
+ if (deletionCount % batchSize != 0) {
+ int[] batchResult = ps.executeBatch();
+ if (log.isTraceEnabled()) log.trace("Flushed the batch and received following results: " + Arrays.toString(batchResult));
+ }
+ } catch (SQLException ex) {
+ //if something happens make sure buckets locks are being release
+ logAndThrow(ex, "Failed clearing JdbcCacheStore");
+ } finally {
+ releaseLocks(emptyBuckets);
+ JdbcUtil.safeClose(ps);
+ releaseConnection(conn);
+ }
}
+ private void releaseLocks(Set<Bucket> expiredBucketKeys) throws CacheLoaderException {
+ for (Bucket bucket : expiredBucketKeys) {
+ unlock(bucket.getBucketName());
+ }
+ }
+
public Class<? extends CacheLoaderConfig> getConfigurationClass() {
return JdbcCacheStoreConfig.class;
}
@@ -206,7 +408,8 @@
}
private void releaseConnection(Connection conn) {
- connectionFactory.releaseConnection(conn);
+ if (conn != null)//connection might be null as we only release it in finally blocks
+ connectionFactory.releaseConnection(conn);
}
private ByteBuffer marshall(Bucket bucket) throws CacheLoaderException {
Modified: core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcCacheStoreConfig.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcCacheStoreConfig.java 2009-02-25 18:30:30 UTC (rev 7790)
+++ core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcCacheStoreConfig.java 2009-02-26 14:25:24 UTC (rev 7791)
@@ -17,7 +17,7 @@
private String connectionFactoryClass;
- /* required by NonManagedConnectionFactory */
+ /* required by SimpleConnectionFactory */
private String connectionUrl;
private String userName;
private String password;
@@ -30,10 +30,17 @@
private String keyColumnType;
private String dataColumnName;
private String dataColumnType;
+ private String timestampColumnName;
+ private String timestampColumnType;
+
+ /* cache for sql commands */
private String insertBucketSql;
private String saveBucketSql;
private String loadBucketSql;
private String loadAllSql;
+ private String clearSql;
+ private String selectExpiredBucketsSql;
+ private String deleteBucketSql;
public JdbcCacheStoreConfig() {
className = JdbcCacheStore.class.getName();
@@ -144,23 +151,38 @@
this.dataColumnType = dataColumnType;
}
+ public String getTimestampColumnName() {
+ return timestampColumnName;
+ }
+
+ public void setTimestampColumnName(String timestampColumnName) {
+ this.timestampColumnName = timestampColumnName;
+ }
+
+ public String getTimestampColumnType() {
+ return timestampColumnType;
+ }
+
+ public void setTimestampColumnType(String timestampColumnType) {
+ this.timestampColumnType = timestampColumnType;
+ }
+
@Override
public JdbcCacheStoreConfig clone() {
- JdbcCacheStoreConfig dolly = (JdbcCacheStoreConfig) super.clone();
//don't have to assign any variables as all are primitives, and cannot change
- return dolly;
+ return (JdbcCacheStoreConfig) super.clone();
}
public String getInsertBucketSql() {
if (insertBucketSql == null) {
- insertBucketSql = "INSERT INTO " + tableName + " (" + keyColumnName + ", " + dataColumnName + ")";
+ insertBucketSql = "INSERT INTO " + tableName + " (" + keyColumnName + ", " + dataColumnName + ", " + timestampColumnName + ") VALUES(?,?,?)";
}
return insertBucketSql;
}
public String getSaveBucketSql() {
if (saveBucketSql == null) {
- saveBucketSql = "UPDATE " + tableName + " SET " + dataColumnName + " = ? WHERE " + keyColumnName + " = ?";
+ saveBucketSql = "UPDATE " + tableName + " SET " + dataColumnName + " = ? , " + timestampColumnName + "=? WHERE " + keyColumnName + " = ?";
}
return saveBucketSql;
}
@@ -172,10 +194,31 @@
return loadBucketSql;
}
+ public String getDeleteBucketSql() {
+ if (deleteBucketSql == null) {
+ deleteBucketSql = "DELETE FROM " + tableName + " WHERE " + keyColumnName + " = ?";
+ }
+ return deleteBucketSql;
+ }
+
public String getLoadAllSql() {
if (loadAllSql == null) {
- loadAllSql = "SELECT " + dataColumnName + " FROM " + tableName;
+ loadAllSql = "SELECT " + dataColumnName + "," + keyColumnName + " FROM " + tableName;
}
return loadAllSql;
}
+
+ public String getClearSql() {
+ if (clearSql == null) {
+ clearSql = "DELETE FROM " + tableName;
+ }
+ return clearSql;
+ }
+
+ public String getSelectExpiredBucketsSql() {
+ if (selectExpiredBucketsSql == null) {
+ selectExpiredBucketsSql = getLoadAllSql() + " WHERE " + timestampColumnName + "< ?";
+ }
+ return selectExpiredBucketsSql;
+ }
}
Modified: core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcUtil.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcUtil.java 2009-02-25 18:30:30 UTC (rev 7790)
+++ core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcUtil.java 2009-02-26 14:25:24 UTC (rev 7791)
@@ -1,8 +1,9 @@
package org.horizon.loader.jdbc;
+import java.sql.Connection;
+import java.sql.ResultSet;
+import java.sql.SQLException;
import java.sql.Statement;
-import java.sql.SQLException;
-import java.sql.ResultSet;
/**
* // TODO: Mircea: Document this!
@@ -20,6 +21,16 @@
}
}
+ public static void safeClose(Connection connection) {
+ if (connection != null) {
+ try {
+ connection.close();
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
public static void safeClose(ResultSet rs) {
if (rs != null) {
try {
Modified: core/branches/flat/src/main/java/org/horizon/loader/jdbc/TableManipulation.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/jdbc/TableManipulation.java 2009-02-25 18:30:30 UTC (rev 7790)
+++ core/branches/flat/src/main/java/org/horizon/loader/jdbc/TableManipulation.java 2009-02-26 14:25:24 UTC (rev 7791)
@@ -84,11 +84,12 @@
// removed CONSTRAINT clause as this causes problems with some databases, like Informix.
assertMandatoryElemenetsPresent();
String creatTableDdl = "CREATE TABLE " + config.getTableName() + "(" + config.getKeyColumnName() + " " + config.getKeyColumnType()
- + " NOT NULL, " + config.getDataColumnName() + " " + config.getDataColumnType() +
+ + " NOT NULL, " + config.getDataColumnName() + " " + config.getDataColumnType() + ", "
+ + config.getTimestampColumnName() + " " + config.getTimestampColumnType() +
", PRIMARY KEY (" + config.getKeyColumnName() + "))";
if (log.isTraceEnabled())
log.trace("Creating table with following DDL: '" + creatTableDdl + "'.");
- executeDdlStatement(creatTableDdl);
+ executeUpdateSql(creatTableDdl);
}
private void assertMandatoryElemenetsPresent() throws CacheLoaderException {
@@ -97,17 +98,20 @@
assrtNotNull(config.getTableName(), "tableName needed in order to create table");
assrtNotNull(config.getDataColumnName(), "dataColumnName needed in order to create table");
assrtNotNull(config.getDataColumnType(), "dataColumnType needed in order to create table");
+ assrtNotNull(config.getDataColumnType(), "dataColumnType needed in order to create table");
+ assrtNotNull(config.getTimestampColumnName(), "timestampColumnName needed in order to create table");
+ assrtNotNull(config.getTimestampColumnType(), "timestampColumnType needed in order to create table");
}
private void assrtNotNull(String keyColumnType, String message) throws CacheLoaderException {
if (keyColumnType == null || keyColumnType.trim().length() == 0) throw new CacheLoaderException(message);
}
- private void executeDdlStatement(String creatTableDdl) throws CacheLoaderException {
+ private void executeUpdateSql(String sql) throws CacheLoaderException {
Statement statement = null;
try {
statement = connection.createStatement();
- statement.executeUpdate(creatTableDdl);
+ statement.executeUpdate(sql);
} catch (SQLException e) {
log.error("Error while creating table",e);
throw new CacheLoaderException(e);
@@ -118,9 +122,11 @@
public void dropTable() throws CacheLoaderException {
String dropTableDdl = "DROP TABLE " + config.getTableName();
+ String clearTable = "DELETE FROM " + config.getTableName();
+ executeUpdateSql(clearTable);
if (log.isTraceEnabled())
log.trace("Dropping table with following DDL '" + dropTableDdl + "\'");
- executeDdlStatement(dropTableDdl);
+ executeUpdateSql(dropTableDdl);
}
private static String toLowerCase(String s) {
Modified: core/branches/flat/src/main/java/org/horizon/lock/StripedLock.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/lock/StripedLock.java 2009-02-25 18:30:30 UTC (rev 7790)
+++ core/branches/flat/src/main/java/org/horizon/lock/StripedLock.java 2009-02-26 14:25:24 UTC (rev 7791)
@@ -22,13 +22,16 @@
package org.horizon.lock;
import net.jcip.annotations.ThreadSafe;
+import org.horizon.logging.Log;
+import org.horizon.logging.LogFactory;
import java.util.List;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* A simple implementation of lock striping, using cache entry keys to lock on, primarily used to help make {@link
- * org.horizon.loader.CacheLoaderOld} implemtations thread safe.
+ * org.horizon.loader.CacheLoader} implemtations thread safe.
* <p/>
* Backed by a set of {@link java.util.concurrent.locks.ReentrantReadWriteLock} instances, and using the key hashcodes
* to determine buckets.
@@ -38,10 +41,14 @@
* <p/>
*
* @author <a href="mailto:manik@jboss.org">Manik Surtani</a>
+ * @author Mircea.Markus(a)jboss.com
* @since 1.0
*/
@ThreadSafe
public class StripedLock {
+
+ private static Log log = LogFactory.getLog(StripedLock.class);
+
private static final int DEFAULT_CONCURRENCY = 20;
private final int lockSegmentMask;
private final int lockSegmentShift;
@@ -82,7 +89,6 @@
*/
public void acquireLock(Object key, boolean exclusive) {
ReentrantReadWriteLock lock = getLock(key);
-
if (exclusive) {
lock.writeLock().lock();
} else {
@@ -90,6 +96,20 @@
}
}
+ public boolean acquireLock(String key, boolean exclusive, long millis) {
+ ReentrantReadWriteLock lock = getLock(key);
+ try {
+ if (exclusive) {
+ return lock.writeLock().tryLock(millis, TimeUnit.MILLISECONDS);
+ } else {
+ return lock.readLock().tryLock(millis, TimeUnit.MILLISECONDS);
+ }
+ } catch (InterruptedException e) {
+ log.warn("Thread insterrupted while trying to acquire lock", e);
+ return false;
+ }
+ }
+
/**
* Releases a lock the caller may be holding. This method is idempotent.
*/
@@ -154,4 +174,6 @@
}
return count;
}
+
+
}
Modified: core/branches/flat/src/test/java/org/horizon/loader/jdbc/JdbcCacheStoreTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/jdbc/JdbcCacheStoreTest.java 2009-02-25 18:30:30 UTC (rev 7790)
+++ core/branches/flat/src/test/java/org/horizon/loader/jdbc/JdbcCacheStoreTest.java 2009-02-26 14:25:24 UTC (rev 7791)
@@ -1,8 +1,15 @@
package org.horizon.loader.jdbc;
import org.horizon.loader.CacheStore;
+import org.horizon.marshall.ObjectStreamMarshaller;
+import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+
/**
* // TODO: Mircea: Document this!
*
@@ -15,19 +22,33 @@
protected CacheStore createCacheStore() throws Exception {
try {
+ Class.forName("com.mysql.jdbc.Driver").newInstance();
+ Connection connection = DriverManager.getConnection("jdbc:mysql://localhost/horizon", "root", "root");
+ Statement st = connection.createStatement();
+ try {
+ st.executeUpdate("DROP TABLE horizon_jdbc");
+ } catch (SQLException e) {
+ //ignore, might be the table does not exist
+ }
+ JdbcUtil.safeClose(st);
+ JdbcUtil.safeClose(connection);
+
jdbcCacheStore = new JdbcCacheStore();
JdbcCacheStoreConfig config = new JdbcCacheStoreConfig();
- config.setConnectionFactoryClass(NonManagedConnectionFactory.class.getName());
+ config.setConnectionFactoryClass(PooledConnectionFactory.class.getName());
config.setConnectionUrl("jdbc:mysql://localhost/horizon");
config.setUserName("root");
config.setPassword("root");
config.setDriverClass("com.mysql.jdbc.Driver");
- config.setTableName("horizon_jdc");
+ config.setTableName("horizon_jdbc");
config.setKeyColumnName("key_name");
config.setKeyColumnType("varchar(255)");
config.setDataColumnName("BUCKET");
- config.setDataColumnType("BINARY");
- jdbcCacheStore.init(config, null, null);
+ config.setDataColumnType("BLOB");
+ config.setTimestampColumnName("TIMESTAMP");
+ config.setTimestampColumnType("BIGINT");
+ config.setPurgeSynchronously(true);
+ jdbcCacheStore.init(config, null, new ObjectStreamMarshaller());
jdbcCacheStore.start();
return jdbcCacheStore;
} catch (Throwable e) {
@@ -35,4 +56,11 @@
throw (Exception) e;
}
}
+
+ //todo move this in upper class
+ @AfterMethod
+ public void assertNoLocksHeldAfterTest() {
+ assert jdbcCacheStore.getBucketLockCount() == 0;
+ assert jdbcCacheStore.getGlobalLockCount() == 0;
+ }
}
Added: core/branches/flat/src/test/java/org/horizon/loader/jdbc/PooledConnectionFactoryTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/jdbc/PooledConnectionFactoryTest.java (rev 0)
+++ core/branches/flat/src/test/java/org/horizon/loader/jdbc/PooledConnectionFactoryTest.java 2009-02-26 14:25:24 UTC (rev 7791)
@@ -0,0 +1,59 @@
+package org.horizon.loader.jdbc;
+
+import org.horizon.test.UnitTestDatabaseManager;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.Test;
+
+import java.io.File;
+import java.sql.Connection;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * // TODO: Mircea: Document this!
+ *
+ * @author
+ */
+@Test(groups = "functional", testName = "loader.jdbc.PooledConnectionFactoryTest")
+public class PooledConnectionFactoryTest {
+
+ private PooledConnectionFactory factory;
+
+ @AfterMethod
+ public void destroyFacotry() {
+ factory.stop();
+ }
+
+ public void testValuesNoOverrides() throws Exception {
+ factory = new PooledConnectionFactory();
+ JdbcCacheStoreConfig config = UnitTestDatabaseManager.getUniqueJdbcCacheStoreConfig();
+ factory.start(config);
+ int hadcodedMaxPoolSize = factory.getPooledDataSource().getMaxPoolSize();
+ Set<Connection> connections = new HashSet<Connection>();
+ for (int i = 0; i < hadcodedMaxPoolSize; i++) {
+ connections.add(factory.getConnection());
+ }
+ assert connections.size() == hadcodedMaxPoolSize;
+ assert factory.getPooledDataSource().getNumBusyConnections() == hadcodedMaxPoolSize;
+ for (Connection conn : connections) {
+ conn.close();
+ }
+ long start = System.currentTimeMillis();
+ while (System.currentTimeMillis() - start < 2000) {
+ if (factory.getPooledDataSource().getNumBusyConnections() == 0) break;
+ }
+ //this must happen eventually
+ assert factory.getPooledDataSource().getNumBusyConnections() == 0;
+ }
+
+ public void testWithPorpertyOverrides() throws Exception {
+ String prevVal = System.setProperty("c3p0.maxPoolSize", "3");
+ System.out.println(new File(".").getAbsolutePath());
+ factory = new PooledConnectionFactory();
+ JdbcCacheStoreConfig config = UnitTestDatabaseManager.getUniqueJdbcCacheStoreConfig();
+ factory.start(config);
+ assert factory.getPooledDataSource().getMaxPoolSize() == 3 : "expected 3, received " + factory.getPooledDataSource().getMaxPoolSize();
+ if (prevVal != null) System.setProperty("c3p0.maxPoolSize", prevVal);
+
+ }
+}
Property changes on: core/branches/flat/src/test/java/org/horizon/loader/jdbc/PooledConnectionFactoryTest.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Modified: core/branches/flat/src/test/java/org/horizon/loader/jdbc/TableManipulationTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/jdbc/TableManipulationTest.java 2009-02-25 18:30:30 UTC (rev 7790)
+++ core/branches/flat/src/test/java/org/horizon/loader/jdbc/TableManipulationTest.java 2009-02-26 14:25:24 UTC (rev 7791)
@@ -8,6 +8,7 @@
import java.sql.Connection;
import java.sql.DriverManager;
+import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
@@ -17,7 +18,7 @@
*
* @author
*/
-@Test(groups = "functional", testName ="loader.jdbc.TableManipulationTest", enabled = false)
+@Test(groups = "functional", testName = "loader.jdbc.TableManipulationTest", enabled = false)
public class TableManipulationTest {
Connection connection;
@@ -30,6 +31,7 @@
connection = DriverManager.getConnection("jdbc:mysql://localhost/horizon", "root", "root");
Statement st = connection.createStatement();
try {
+ st.executeUpdate("DELETE FROM horizon_test");
st.executeUpdate("DROP TABLE horizon_test");
} catch (SQLException e) {
//ignore, might be the table does not exist
@@ -41,6 +43,8 @@
config.setTableName("horizon_test");
config.setKeyColumnName("KEY_HASH");
config.setDataColumnName("BUCKET");
+ config.setTimestampColumnName("TIMESTAMP");
+ config.setTimestampColumnType("BIGINT");
tableManipulation = new TableManipulation(connection, config);
}
@@ -56,6 +60,8 @@
config.setTableName("horizon");
config.setKeyColumnName("dsadsa");
config.setDataColumnName("dsadsa");
+ config.setTimestampColumnName("timestamp");
+ config.setTimestampColumnType("BIGINT");
Connection mockConnection = createMock(Connection.class);
Statement mockStatement = createNiceMock(Statement.class);
expect(mockConnection.createStatement()).andReturn(mockStatement);
@@ -102,6 +108,24 @@
config.setDataColumnName("abc");
assert true : "We do not expect a failure here";
}
+
+ config.setTimestampColumnName(null);
+ try {
+ other.createTable();
+ assert false : "missing config param, exception expected";
+ } catch (CacheLoaderException e) {
+ config.setDataColumnName("timestamp");
+ assert true : "We do not expect a failure here";
+ }
+
+ config.setTimestampColumnType(null);
+ try {
+ other.createTable();
+ assert false : "missing config param, exception expected";
+ } catch (CacheLoaderException e) {
+ config.setDataColumnName("BIGINT");
+ assert true : "We do not expect a failure here";
+ }
}
public void testCreateTable() throws Exception {
@@ -117,8 +141,16 @@
}
@Test(dependsOnMethods = "testExists")
- public void testDrop() throws CacheLoaderException {
+ public void testDrop() throws Exception {
assert tableManipulation.tableExists();
+ PreparedStatement ps = null;
+ try {
+ ps = connection.prepareStatement("INSERT INTO horizon_test(KEY_HASH) values(?)");
+ ps.setString(1, System.currentTimeMillis() + "");
+ assert 1 == ps.executeUpdate();
+ } finally {
+ JdbcUtil.safeClose(ps);
+ }
tableManipulation.dropTable();
assert !tableManipulation.tableExists();
}
15 years, 10 months
JBoss Cache SVN: r7790 - in core/branches/flat/src: main/java/org/horizon/container and 7 other directories.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2009-02-25 13:30:30 -0500 (Wed, 25 Feb 2009)
New Revision: 7790
Added:
core/branches/flat/src/main/java/org/horizon/transaction/TransactionLog.java
Modified:
core/branches/flat/src/main/java/org/horizon/commands/tx/PrepareCommand.java
core/branches/flat/src/main/java/org/horizon/container/DataContainer.java
core/branches/flat/src/main/java/org/horizon/container/UnsortedDataContainer.java
core/branches/flat/src/main/java/org/horizon/factories/EmptyConstructorNamedCacheFactory.java
core/branches/flat/src/main/java/org/horizon/interceptors/InvalidationInterceptor.java
core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderManager.java
core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderManagerImpl.java
core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/StateTransferMonitor.java
core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManagerImpl.java
core/branches/flat/src/test/java/org/horizon/test/ReplListener.java
Log:
More WIP on porting NBST
Modified: core/branches/flat/src/main/java/org/horizon/commands/tx/PrepareCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/commands/tx/PrepareCommand.java 2009-02-25 17:39:20 UTC (rev 7789)
+++ core/branches/flat/src/main/java/org/horizon/commands/tx/PrepareCommand.java 2009-02-25 18:30:30 UTC (rev 7790)
@@ -21,10 +21,10 @@
*/
package org.horizon.commands.tx;
-import org.horizon.commands.DataCommand;
import org.horizon.commands.ReplicableCommand;
import org.horizon.commands.VisitableCommand;
import org.horizon.commands.Visitor;
+import org.horizon.commands.write.WriteCommand;
import org.horizon.context.InvocationContext;
import org.horizon.remoting.transport.Address;
import org.horizon.transaction.GlobalTransaction;
@@ -42,11 +42,11 @@
public class PrepareCommand extends AbstractTransactionBoundaryCommand {
public static final byte METHOD_ID = 10;
- protected List<DataCommand> modifications;
+ protected List<WriteCommand> modifications;
protected Address localAddress;
protected boolean onePhaseCommit;
- public PrepareCommand(GlobalTransaction gtx, List<DataCommand> modifications, Address localAddress, boolean onePhaseCommit) {
+ public PrepareCommand(GlobalTransaction gtx, List<WriteCommand> modifications, Address localAddress, boolean onePhaseCommit) {
this.gtx = gtx;
this.modifications = modifications;
this.localAddress = localAddress;
@@ -64,7 +64,7 @@
return visitor.visitPrepareCommand(ctx, this);
}
- public List<DataCommand> getModifications() {
+ public List<WriteCommand> getModifications() {
return modifications;
}
@@ -97,7 +97,7 @@
@SuppressWarnings("unchecked")
public void setParameters(int commandId, Object[] args) {
gtx = (GlobalTransaction) args[0];
- modifications = (List<DataCommand>) args[1];
+ modifications = (List<WriteCommand>) args[1];
localAddress = (Address) args[2];
onePhaseCommit = (Boolean) args[3];
}
@@ -130,7 +130,7 @@
PrepareCommand copy = new PrepareCommand();
copy.gtx = gtx;
copy.localAddress = localAddress;
- copy.modifications = modifications == null ? null : new ArrayList<DataCommand>(modifications);
+ copy.modifications = modifications == null ? null : new ArrayList<WriteCommand>(modifications);
copy.onePhaseCommit = onePhaseCommit;
return copy;
}
@@ -146,7 +146,7 @@
}
public boolean containsModificationType(Class<? extends ReplicableCommand> replicableCommandClass) {
- for (DataCommand mod : getModifications()) {
+ for (WriteCommand mod : getModifications()) {
if (mod.getClass().equals(replicableCommandClass)) {
return true;
}
Modified: core/branches/flat/src/main/java/org/horizon/container/DataContainer.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/container/DataContainer.java 2009-02-25 17:39:20 UTC (rev 7789)
+++ core/branches/flat/src/main/java/org/horizon/container/DataContainer.java 2009-02-25 18:30:30 UTC (rev 7790)
@@ -59,4 +59,6 @@
Set<Object> purgeExpiredEntries();
StoredEntry createEntryForStorage(Object key);
+
+ Set<StoredEntry> getAllEntriesForStorage();
}
Modified: core/branches/flat/src/main/java/org/horizon/container/UnsortedDataContainer.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/container/UnsortedDataContainer.java 2009-02-25 17:39:20 UTC (rev 7789)
+++ core/branches/flat/src/main/java/org/horizon/container/UnsortedDataContainer.java 2009-02-25 18:30:30 UTC (rev 7790)
@@ -205,6 +205,22 @@
return new StoredEntry(key, ecv.getValue(), ecv.getCreatedTime(), ecv.getExpiryTime());
}
+ public Set<StoredEntry> getAllEntriesForStorage() {
+ Set<StoredEntry> set = new HashSet<StoredEntry>(immortalData.size() + expirableData.size());
+ for (Map.Entry<Object, CachedValue> entry: immortalData.entrySet())
+ set.add(new StoredEntry(entry.getKey(), entry.getValue().getValue()));
+
+ for (Iterator<Map.Entry<Object, ExpirableCachedValue>> it = expirableData.entrySet().iterator(); it.hasNext();) {
+ Map.Entry<Object, ExpirableCachedValue> entry = it.next();
+ ExpirableCachedValue ecv = entry.getValue();
+ if (ecv.isExpired())
+ it.remove();
+ else
+ set.add(new StoredEntry(entry.getKey(), ecv.getValue(), ecv.getCreatedTime(), ecv.getExpiryTime()));
+ }
+ return set;
+ }
+
private class KeySet extends AbstractSet<Object> {
Set<Object> immortalKeys;
Set<Object> expirableKeys;
Modified: core/branches/flat/src/main/java/org/horizon/factories/EmptyConstructorNamedCacheFactory.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/factories/EmptyConstructorNamedCacheFactory.java 2009-02-25 17:39:20 UTC (rev 7789)
+++ core/branches/flat/src/main/java/org/horizon/factories/EmptyConstructorNamedCacheFactory.java 2009-02-25 18:30:30 UTC (rev 7790)
@@ -33,6 +33,7 @@
import org.horizon.marshall.VersionAwareMarshaller;
import org.horizon.notifications.cachelistener.CacheNotifier;
import org.horizon.transaction.TransactionTable;
+import org.horizon.transaction.TransactionLog;
/**
* Simple factory that just uses reflection and an empty constructor of the component type.
@@ -42,7 +43,8 @@
*/
@DefaultFactoryFor(classes = {CacheNotifier.class, EntryFactory.class, CommandsFactory.class,
CacheLoaderManager.class, InvocationContextContainer.class,
- TransactionTable.class, BatchContainer.class, ContextFactory.class})
+ TransactionTable.class, BatchContainer.class, ContextFactory.class,
+ TransactionLog.class})
public class EmptyConstructorNamedCacheFactory extends AbstractNamedCacheComponentFactory implements AutoInstantiableFactory {
@Override
public <T> T construct(Class<T> componentType) {
Modified: core/branches/flat/src/main/java/org/horizon/interceptors/InvalidationInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/interceptors/InvalidationInterceptor.java 2009-02-25 17:39:20 UTC (rev 7789)
+++ core/branches/flat/src/main/java/org/horizon/interceptors/InvalidationInterceptor.java 2009-02-25 18:30:30 UTC (rev 7790)
@@ -23,7 +23,6 @@
import org.horizon.commands.AbstractVisitor;
import org.horizon.commands.CommandsFactory;
-import org.horizon.commands.DataCommand;
import org.horizon.commands.VisitableCommand;
import org.horizon.commands.tx.PrepareCommand;
import org.horizon.commands.write.ClearCommand;
@@ -124,9 +123,9 @@
throw new IllegalStateException("cannot find transaction transactionContext for " + gtx);
if (transactionContext.hasModifications()) {
- List<DataCommand> mods;
+ List<WriteCommand> mods;
if (transactionContext.hasLocalModifications()) {
- mods = new ArrayList<DataCommand>(command.getModifications());
+ mods = new ArrayList<WriteCommand>(command.getModifications());
mods.removeAll(transactionContext.getLocalModifications());
} else {
mods = command.getModifications();
@@ -162,7 +161,7 @@
return retval;
}
- private void broadcastInvalidate(List<DataCommand> modifications, Transaction tx, InvocationContext ctx) throws Throwable {
+ private void broadcastInvalidate(List<WriteCommand> modifications, Transaction tx, InvocationContext ctx) throws Throwable {
if (ctx.getTransaction() != null && !isLocalModeForced(ctx)) {
if (modifications == null || modifications.isEmpty()) return;
InvalidationFilterVisitor filterVisitor = new InvalidationFilterVisitor(modifications.size());
Modified: core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderManager.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderManager.java 2009-02-25 17:39:20 UTC (rev 7789)
+++ core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderManager.java 2009-02-25 18:30:30 UTC (rev 7790)
@@ -23,6 +23,8 @@
boolean isFetchPersistentState();
void preload();
+
+ boolean isEnabled();
}
Modified: core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderManagerImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderManagerImpl.java 2009-02-25 17:39:20 UTC (rev 7789)
+++ core/branches/flat/src/main/java/org/horizon/loader/CacheLoaderManagerImpl.java 2009-02-25 18:30:30 UTC (rev 7790)
@@ -75,15 +75,15 @@
}
public boolean isUsingPassivation() {
- return clmConfig.isPassivation();
+ return isEnabled() ? clmConfig.isPassivation() : false;
}
public boolean isShared() {
- return clmConfig.isShared();
+ return isEnabled() ? clmConfig.isShared() : false;
}
public boolean isFetchPersistentState() {
- return clmConfig.isFetchPersistentState();
+ return isEnabled() ? clmConfig.isFetchPersistentState() : false;
}
@Start(priority = 10)
@@ -100,6 +100,10 @@
}
}
+ public boolean isEnabled() {
+ return clmConfig != null;
+ }
+
/**
* Performs a preload on the cache based on the cache loader preload configs used when configuring the cache.
*/
Modified: core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/StateTransferMonitor.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/StateTransferMonitor.java 2009-02-25 17:39:20 UTC (rev 7789)
+++ core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/StateTransferMonitor.java 2009-02-25 18:30:30 UTC (rev 7790)
@@ -13,14 +13,6 @@
*/
private volatile boolean isStateSet = false;
- public boolean isStateSet() {
- return isStateSet;
- }
-
- public void setStateSet(boolean stateSet) {
- isStateSet = stateSet;
- }
-
public StateTransferException getSetStateException() {
return setStateException;
}
@@ -43,6 +35,7 @@
public void notifyStateReceiptSucceeded() {
synchronized (stateLock) {
+ isStateSet = true;
// Notify wait that state has been set.
stateLock.notifyAll();
}
@@ -50,6 +43,7 @@
public void notifyStateReceiptFailed(StateTransferException setStateException) {
this.setStateException = setStateException;
+ isStateSet = false;
notifyStateReceiptSucceeded();
}
}
Modified: core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManagerImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManagerImpl.java 2009-02-25 17:39:20 UTC (rev 7789)
+++ core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManagerImpl.java 2009-02-25 18:30:30 UTC (rev 7790)
@@ -21,12 +21,20 @@
*/
package org.horizon.statetransfer;
-import org.horizon.Cache;
+import org.horizon.AdvancedCache;
+import org.horizon.transaction.TransactionLog;
import org.horizon.config.Configuration;
+import org.horizon.container.DataContainer;
import org.horizon.factories.annotations.Inject;
import org.horizon.factories.annotations.Start;
+import org.horizon.invocation.Options;
+import org.horizon.loader.CacheLoaderException;
+import org.horizon.loader.CacheLoaderManager;
+import org.horizon.loader.CacheStore;
+import org.horizon.loader.StoredEntry;
import org.horizon.logging.Log;
import org.horizon.logging.LogFactory;
+import org.horizon.marshall.Marshaller;
import org.horizon.remoting.RPCManager;
import org.horizon.util.Util;
@@ -36,25 +44,40 @@
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
public class StateTransferManagerImpl implements StateTransferManager {
RPCManager rpcManager;
- Cache cache;
+ AdvancedCache cache;
Configuration configuration;
+ DataContainer dataContainer;
+ CacheLoaderManager clm;
+ CacheStore cs;
+ Marshaller marshaller;
+ TransactionLog transactionLog;
private static final Log log = LogFactory.getLog(StateTransferManagerImpl.class);
private static final Delimiter DELIMITER = new Delimiter();
@Inject
- public void injectDependencies(RPCManager rpcManager, Cache cache, Configuration configuration) {
+ public void injectDependencies(RPCManager rpcManager, AdvancedCache cache, Configuration configuration,
+ DataContainer dataContainer, CacheLoaderManager clm, Marshaller marshaller,
+ TransactionLog transactionLog) {
this.rpcManager = rpcManager;
this.cache = cache;
this.configuration = configuration;
+ this.dataContainer = dataContainer;
+ this.clm = clm;
+ this.marshaller = marshaller;
+ this.transactionLog = transactionLog;
}
@Start(priority = 14)
// it is imperative that this starts *after* the RPCManager does.
public void start() throws StateTransferException {
+ cs = clm == null || !clm.isEnabled() || !clm.isFetchPersistentState() ? null : clm.getCacheStore();
+
long startTime = 0;
if (log.isDebugEnabled()) {
log.debug("Initiating state transfer process");
@@ -81,6 +104,7 @@
delimit(oos);
oos.flush();
oos.close();
+ if (log.isDebugEnabled()) log.debug("State generated, closing object stream");
// just close the object stream but do NOT close the underlying stream
} catch (StateTransferException ste) {
throw ste;
@@ -99,6 +123,7 @@
assertDelimited(ois);
applyPersistentState(ois);
assertDelimited(ois);
+ if (log.isDebugEnabled()) log.debug("State applied, closing object stream");
ois.close();
// just close the object stream but do NOT close the underlying stream
} catch (StateTransferException ste) {
@@ -109,19 +134,49 @@
}
private void applyInMemoryState(ObjectInputStream i) throws StateTransferException {
- throw new StateTransferException("Implement me!");
+ dataContainer.clear();
+ try {
+ Set<StoredEntry> set = (Set<StoredEntry>) marshaller.objectFromObjectStream(i);
+ for (StoredEntry se: set) cache.put(se.getKey(), se.getValue(), se.getLifespan(), TimeUnit.MILLISECONDS, Options.CACHE_MODE_LOCAL);
+ } catch (Exception e) {
+ dataContainer.clear();
+ throw new StateTransferException(e);
+ }
}
private void generateInMemoryState(ObjectOutputStream o) throws StateTransferException {
- throw new StateTransferException("Implement me!");
+ // write all StoredEntries to the stream using the marshaller.
+ // TODO is it safe enough to get these from the data container directly?
+ try {
+ Set<StoredEntry> s = dataContainer.getAllEntriesForStorage();
+ marshaller.objectToObjectStream(s, o);
+ } catch (Exception e) {
+ throw new StateTransferException(e);
+ }
}
private void applyPersistentState(ObjectInputStream i) throws StateTransferException {
- throw new StateTransferException("Implement me!");
+ if (cs == null) {
+ if (log.isDebugEnabled()) log.debug("Not configured to fetch persistent state, or no cache store configured. Skipping applying persistent state.");
+ } else {
+ try {
+ cs.fromStream(i);
+ } catch (CacheLoaderException cle) {
+ throw new StateTransferException(cle);
+ }
+ }
}
-
+
private void generatePersistentState(ObjectOutputStream o) throws StateTransferException {
- throw new StateTransferException("Implement me!");
+ if (cs == null) {
+ if (log.isDebugEnabled()) log.debug("Not configured to fetch persistent state, or no cache store configured. Skipping generating persistent state.");
+ } else {
+ try {
+ cs.toStream(o);
+ } catch (CacheLoaderException cle) {
+ throw new StateTransferException(cle);
+ }
+ }
}
private void delimit(ObjectOutputStream o) throws IOException {
@@ -135,7 +190,8 @@
} catch (Exception e) {
throw new StateTransferException(e);
}
- if ((o == null) || !(o instanceof Delimiter)) throw new StateTransferException("Expected a delimiter, recieved " + o);
+ if ((o == null) || !(o instanceof Delimiter))
+ throw new StateTransferException("Expected a delimiter, recieved " + o);
}
// used as a marker for streams.
Added: core/branches/flat/src/main/java/org/horizon/transaction/TransactionLog.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/transaction/TransactionLog.java (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/transaction/TransactionLog.java 2009-02-25 18:30:30 UTC (rev 7790)
@@ -0,0 +1,178 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2005, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.horizon.transaction;
+
+import org.horizon.commands.tx.PrepareCommand;
+import org.horizon.commands.write.WriteCommand;
+import org.horizon.logging.Log;
+import org.horizon.logging.LogFactory;
+import org.horizon.marshall.Marshaller;
+
+import java.io.ObjectOutputStream;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Logs transactions and writes for Non-Blocking State Transfer
+ *
+ * @author Jason T. Greene
+ */
+public class TransactionLog
+{
+ private final Map<GlobalTransaction, PrepareCommand> pendingPrepares = new ConcurrentHashMap<GlobalTransaction, PrepareCommand>();
+ private final BlockingQueue<LogEntry> entries = new LinkedBlockingQueue<LogEntry>();
+ private AtomicBoolean active = new AtomicBoolean();
+
+ public static class LogEntry
+ {
+ private final GlobalTransaction transaction;
+ private final List<WriteCommand> modifications;
+
+ public LogEntry(GlobalTransaction transaction, List<WriteCommand> modifications)
+ {
+ this.transaction = transaction;
+ this.modifications = modifications;
+ }
+
+ public GlobalTransaction getTransaction()
+ {
+ return transaction;
+ }
+
+ public List<WriteCommand> getModifications()
+ {
+ return modifications;
+ }
+ }
+
+ private static Log log = LogFactory.getLog(TransactionLog.class);
+
+ public void logPrepare(PrepareCommand command)
+ {
+ pendingPrepares.put(command.getGlobalTransaction(), command);
+ }
+
+ public void logCommit(GlobalTransaction gtx)
+ {
+ PrepareCommand command = pendingPrepares.remove(gtx);
+ // it is perfectly normal for a prepare not to be logged for this gtx, for example if a transaction did not
+ // modify anything, then beforeCompletion() is not invoked and logPrepare() will not be called to register the
+ // prepare.
+ if (command != null) addEntry(new LogEntry(gtx, command.getModifications()));
+ }
+
+ private void addEntry(LogEntry entry)
+ {
+ if (! isActive())
+ return;
+
+ for (;;)
+ {
+ try
+ {
+ if (log.isTraceEnabled())
+ log.trace("Added commit entry to tx log" + entry);
+
+ entries.put(entry);
+ break;
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ public void logOnePhaseCommit(GlobalTransaction gtx, List<WriteCommand> modifications)
+ {
+ // Just in case...
+ if (gtx != null) pendingPrepares.remove(gtx);
+ if (!modifications.isEmpty()) addEntry(new LogEntry(gtx, modifications));
+ }
+
+ public void logNoTxWrite(WriteCommand write)
+ {
+ if (! isActive())
+ return;
+
+ ArrayList<WriteCommand> list = new ArrayList<WriteCommand>();
+ list.add(write);
+ addEntry(new LogEntry(null, list));
+ }
+
+ public void rollback(GlobalTransaction gtx)
+ {
+ pendingPrepares.remove(gtx);
+ }
+
+ public boolean isActive()
+ {
+ return active.get();
+ }
+
+ public boolean activate()
+ {
+ return active.compareAndSet(false, true);
+ }
+
+ public void deactivate()
+ {
+ active.set(false);
+ if (entries.size() > 0)
+ log.error("Unprocessed Transaction Log Entries! = " + entries.size());
+ entries.clear();
+ }
+
+ public int size()
+ {
+ return entries.size();
+ }
+
+ public void writeCommitLog(Marshaller marshaller, ObjectOutputStream out) throws Exception
+ {
+ List<LogEntry> buffer = new ArrayList<LogEntry>(10);
+
+ while (entries.drainTo(buffer, 10) > 0)
+ {
+ for (LogEntry entry : buffer)
+ marshaller.objectToObjectStream(entry, out);
+
+ buffer.clear();
+ }
+ }
+
+ public void writePendingPrepares(Marshaller marshaller, ObjectOutputStream out) throws Exception
+ {
+ for (PrepareCommand entry : pendingPrepares.values())
+ marshaller.objectToObjectStream(entry, out);
+ }
+
+ public boolean hasPendingPrepare(PrepareCommand command)
+ {
+ return pendingPrepares.containsKey(command.getGlobalTransaction());
+ }
+}
Modified: core/branches/flat/src/test/java/org/horizon/test/ReplListener.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/test/ReplListener.java 2009-02-25 17:39:20 UTC (rev 7789)
+++ core/branches/flat/src/test/java/org/horizon/test/ReplListener.java 2009-02-25 18:30:30 UTC (rev 7790)
@@ -1,16 +1,16 @@
package org.horizon.test;
import org.horizon.Cache;
+import org.horizon.commands.VisitableCommand;
+import org.horizon.commands.tx.CommitCommand;
+import org.horizon.commands.tx.PrepareCommand;
+import org.horizon.commands.write.WriteCommand;
import org.horizon.context.InvocationContext;
import org.horizon.interceptors.base.CommandInterceptor;
-import org.horizon.commands.VisitableCommand;
-import org.horizon.commands.DataCommand;
-import org.horizon.commands.tx.PrepareCommand;
-import org.horizon.commands.tx.CommitCommand;
+import java.util.Arrays;
+import java.util.HashSet;
import java.util.Set;
-import java.util.HashSet;
-import java.util.Arrays;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -114,7 +114,7 @@
Object o = invokeNextInterceptor(ctx, cmd);
if (!ctx.isOriginLocal()) {
markAsVisited(cmd);
- for (DataCommand mod : cmd.getModifications()) markAsVisited(mod);
+ for (WriteCommand mod : cmd.getModifications()) markAsVisited(mod);
}
return o;
}
15 years, 10 months
JBoss Cache SVN: r7788 - in core/branches/flat/src: test/java/org/horizon/loader and 1 other directory.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2009-02-25 12:29:45 -0500 (Wed, 25 Feb 2009)
New Revision: 7788
Modified:
core/branches/flat/src/main/java/org/horizon/loader/CacheStore.java
core/branches/flat/src/test/java/org/horizon/loader/BaseCacheStoreTest.java
Log:
More detailed fromStream()/toStream() contract and appropriate test
Modified: core/branches/flat/src/main/java/org/horizon/loader/CacheStore.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/CacheStore.java 2009-02-25 16:42:15 UTC (rev 7787)
+++ core/branches/flat/src/main/java/org/horizon/loader/CacheStore.java 2009-02-25 17:29:45 UTC (rev 7788)
@@ -29,7 +29,14 @@
* implementation-specific format, typically generated using {@link #toStream(java.io.OutputStream)}. While not a
* requirement, it is recommended that implementations make use of the {@link org.horizon.marshall.Marshaller} when
* dealing with the stream to make use of efficient marshalling.
- *
+ * <p />
+ * It is imperative that implementations <b><i>do not</i></b> close the stream after finishing with it.
+ * <p />
+ * It is also <b><i>recommended</b></i> that implementations use their own start and end markers on the stream
+ * since other processes may write additional data to the stream after the cache store has written to it. As such,
+ * either markers or some other mechanism to prevent the store from reading too much information should be employed
+ * when writing to the stream in {@link #fromStream(java.io.InputStream)} to prevent data corruption.
+ * <p />
* @param inputStream stream to read from
* @throws CacheLoaderException in the event of problems writing to the store
*/
@@ -41,7 +48,14 @@
* <p/>
* While not a requirement, it is recommended that implementations make use of the {@link
* org.horizon.marshall.Marshaller} when dealing with the stream to make use of efficient marshalling.
- *
+ * <p />
+ * It is imperative that implementations <b><i>do not</i></b> close the stream after finishing with it.
+ * <p />
+ * It is also <b><i>recommended</b></i> that implementations use their own start and end markers on the stream
+ * since other processes may write additional data to the stream after the cache store has written to it. As such,
+ * either markers or some other mechanism to prevent the store from reading too much information in {@link #fromStream(java.io.InputStream)}
+ * should be employed, to prevent data corruption.
+ * <p />
* @param outputStream stream to write to
* @throws CacheLoaderException in the event of problems reading from the store
*/
Modified: core/branches/flat/src/test/java/org/horizon/loader/BaseCacheStoreTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/BaseCacheStoreTest.java 2009-02-25 16:42:15 UTC (rev 7787)
+++ core/branches/flat/src/test/java/org/horizon/loader/BaseCacheStoreTest.java 2009-02-25 17:29:45 UTC (rev 7788)
@@ -248,7 +248,6 @@
ByteArrayOutputStream out = new ByteArrayOutputStream();
cs.toStream(out);
- out.flush();
out.close();
cs.clear();
cs.fromStream(new ByteArrayInputStream(out.toByteArray()));
@@ -264,6 +263,42 @@
assert expected.isEmpty();
}
+ public void testStreamingAPIReusingStreams() throws IOException, ClassNotFoundException, CacheLoaderException {
+ cs.store(new StoredEntry("k1", "v1", -1, -1));
+ cs.store(new StoredEntry("k2", "v2", -1, -1));
+ cs.store(new StoredEntry("k3", "v3", -1, -1));
+
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ byte[] dummyStartBytes = {1,2,3,4,5,6,7,8};
+ byte[] dummyEndBytes = {8,7,6,5,4,3,2,1};
+ out.write(dummyStartBytes);
+ cs.toStream(out);
+ out.write(dummyEndBytes);
+ out.close();
+ cs.clear();
+
+ // first pop the start bytes
+ byte[] dummy = new byte[8];
+ ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray());
+ int bytesRead = in.read(dummy, 0, 8);
+ assert bytesRead == 8;
+ for (int i=1; i<9; i++) assert dummy[i - 1] == i : "Start byte stream corrupted!";
+ cs.fromStream(in);
+ bytesRead = in.read(dummy, 0, 8);
+ assert bytesRead == 8;
+ for (int i=8; i>0; i--) assert dummy[8 - i] == i : "Start byte stream corrupted!";
+
+ Set<StoredEntry> set = cs.loadAll();
+
+ assert set.size() == 3;
+ Set expected = new HashSet();
+ expected.add("k1");
+ expected.add("k2");
+ expected.add("k3");
+ for (StoredEntry se : set) assert expected.remove(se.getKey());
+ assert expected.isEmpty();
+ }
+
public void testConfigFile() throws Exception {
Class<? extends CacheLoaderConfig> cfgClass = cs.getConfigurationClass();
CacheLoaderConfig clc = Util.getInstance(cfgClass);
15 years, 10 months
JBoss Cache SVN: r7787 - core/trunk/src/main/java/org/jboss/cache/transaction.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2009-02-25 11:42:15 -0500 (Wed, 25 Feb 2009)
New Revision: 7787
Modified:
core/trunk/src/main/java/org/jboss/cache/transaction/TransactionLog.java
Log:
Receiving a commit for which there is no prepare is allowed
Modified: core/trunk/src/main/java/org/jboss/cache/transaction/TransactionLog.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/transaction/TransactionLog.java 2009-02-25 16:30:09 UTC (rev 7786)
+++ core/trunk/src/main/java/org/jboss/cache/transaction/TransactionLog.java 2009-02-25 16:42:15 UTC (rev 7787)
@@ -21,6 +21,12 @@
*/
package org.jboss.cache.transaction;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.cache.commands.WriteCommand;
+import org.jboss.cache.commands.tx.PrepareCommand;
+import org.jboss.cache.marshall.Marshaller;
+
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.List;
@@ -30,12 +36,6 @@
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.jboss.cache.commands.WriteCommand;
-import org.jboss.cache.commands.tx.PrepareCommand;
-import org.jboss.cache.marshall.Marshaller;
-
/**
* Logs transactions and writes for Non-Blocking State Transfer
*
@@ -79,13 +79,10 @@
public void logCommit(GlobalTransaction gtx)
{
PrepareCommand command = pendingPrepares.remove(gtx);
- if (command == null)
- {
- log.error("Could not find matching prepare for commit: " + gtx);
- return;
- }
-
- addEntry(new LogEntry(gtx, command.getModifications()));
+ // it is perfectly normal for a prepare not to be logged for this gtx, for example if a transaction did not
+ // modify anything, then beforeCompletion() is not invoked and logPrepare() will not be called to register the
+ // prepare.
+ if (command != null) addEntry(new LogEntry(gtx, command.getModifications()));
}
private void addEntry(LogEntry entry)
@@ -114,7 +111,7 @@
{
// Just in case...
if (gtx != null) pendingPrepares.remove(gtx);
- addEntry(new LogEntry(gtx, modifications));
+ if (!modifications.isEmpty()) addEntry(new LogEntry(gtx, modifications));
}
public void logNoTxWrite(WriteCommand write)
15 years, 10 months
JBoss Cache SVN: r7786 - core/trunk/src/main/java/org/jboss/cache/commands/write.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2009-02-25 11:30:09 -0500 (Wed, 25 Feb 2009)
New Revision: 7786
Modified:
core/trunk/src/main/java/org/jboss/cache/commands/write/ClearDataCommand.java
Log:
Reduced log level on clearing a nonexistent node
Modified: core/trunk/src/main/java/org/jboss/cache/commands/write/ClearDataCommand.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/commands/write/ClearDataCommand.java 2009-02-25 15:52:32 UTC (rev 7785)
+++ core/trunk/src/main/java/org/jboss/cache/commands/write/ClearDataCommand.java 2009-02-25 16:30:09 UTC (rev 7786)
@@ -70,7 +70,7 @@
NodeSPI targetNode = peekVersioned(ctx);
if (targetNode == null)
{
- if (log.isInfoEnabled()) log.info("node " + fqn + " not found");
+ if (log.isDebugEnabled()) log.debug("node " + fqn + " not found");
return null;
}
15 years, 10 months
JBoss Cache SVN: r7785 - in core/branches/flat/src: main/java/org/horizon/config and 17 other directories.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2009-02-25 10:52:32 -0500 (Wed, 25 Feb 2009)
New Revision: 7785
Added:
core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/StateTransferMonitor.java
core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferException.java
core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManagerImpl.java
core/branches/flat/src/test/java/org/horizon/statetransfer/
core/branches/flat/src/test/java/org/horizon/statetransfer/Address.java
core/branches/flat/src/test/java/org/horizon/statetransfer/Person.java
core/branches/flat/src/test/java/org/horizon/statetransfer/StateTransferFunctionalTest.java
Removed:
core/branches/flat/src/main/java/org/horizon/factories/StateTransferFactory.java
core/branches/flat/src/main/java/org/horizon/statetransfer/DefaultStateTransferManager.java
Modified:
core/branches/flat/src/main/java/org/horizon/CacheDelegate.java
core/branches/flat/src/main/java/org/horizon/config/Configuration.java
core/branches/flat/src/main/java/org/horizon/factories/AbstractComponentRegistry.java
core/branches/flat/src/main/java/org/horizon/factories/ComponentRegistry.java
core/branches/flat/src/main/java/org/horizon/factories/StateTransferManagerFactory.java
core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandler.java
core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandlerImpl.java
core/branches/flat/src/main/java/org/horizon/remoting/RPCManager.java
core/branches/flat/src/main/java/org/horizon/remoting/RPCManagerImpl.java
core/branches/flat/src/main/java/org/horizon/remoting/transport/Transport.java
core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/JGroupsTransport.java
core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManager.java
core/branches/flat/src/main/java/org/horizon/util/Util.java
core/branches/flat/src/test/java/org/horizon/api/CacheClusterJoinTest.java
core/branches/flat/src/test/java/org/horizon/api/tree/NodeReplicatedMoveTest.java
core/branches/flat/src/test/java/org/horizon/config/parsing/XmlFileParsingTest.java
core/branches/flat/src/test/java/org/horizon/loader/decorators/SingletonStoreTest.java
core/branches/flat/src/test/java/org/horizon/manager/CacheManagerComponentRegistryTest.java
core/branches/flat/src/test/java/org/horizon/notifications/cachemanagerlistener/CacheManagerNotifierTest.java
core/branches/flat/src/test/java/org/horizon/test/MultipleCacheManagersTest.java
core/branches/flat/src/test/resources/configs/named-cache-test.xml
core/branches/flat/src/test/resources/log4j.xml
Log:
Initial state transfer impl
Modified: core/branches/flat/src/main/java/org/horizon/CacheDelegate.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/CacheDelegate.java 2009-02-25 12:06:48 UTC (rev 7784)
+++ core/branches/flat/src/main/java/org/horizon/CacheDelegate.java 2009-02-25 15:52:32 UTC (rev 7785)
@@ -54,6 +54,7 @@
import org.horizon.marshall.Marshaller;
import org.horizon.notifications.cachelistener.CacheNotifier;
import org.horizon.remoting.RPCManager;
+import org.horizon.statetransfer.StateTransferManager;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
@@ -84,6 +85,8 @@
private DataContainer dataContainer;
private static final Log log = LogFactory.getLog(CacheDelegate.class);
private CacheManager cacheManager;
+ // this is never used here but should be injected - this is a hack to make sure the StateTransferManager is properly constructed if needed.
+ private StateTransferManager stateTransferManager;
public CacheDelegate(String name) {
this.name = name;
@@ -101,7 +104,7 @@
BatchContainer batchContainer,
RPCManager rpcManager, DataContainer dataContainer,
Marshaller marshaller,
- CacheManager cacheManager) {
+ CacheManager cacheManager, StateTransferManager stateTransferManager) {
this.invocationContextContainer = invocationContextContainer;
this.commandsFactory = commandsFactory;
this.invoker = interceptorChain;
@@ -115,6 +118,7 @@
this.dataContainer = dataContainer;
this.marshaller = marshaller;
this.cacheManager = cacheManager;
+ this.stateTransferManager = stateTransferManager;
}
@SuppressWarnings("unchecked")
Modified: core/branches/flat/src/main/java/org/horizon/config/Configuration.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/config/Configuration.java 2009-02-25 12:06:48 UTC (rev 7784)
+++ core/branches/flat/src/main/java/org/horizon/config/Configuration.java 2009-02-25 15:52:32 UTC (rev 7785)
@@ -98,6 +98,9 @@
return this == REPL_SYNC || this == INVALIDATION_SYNC || this == LOCAL;
}
+ public boolean isClustered() {
+ return this != LOCAL;
+ }
}
// ------------------------------------------------------------------------------------------------------------
Modified: core/branches/flat/src/main/java/org/horizon/factories/AbstractComponentRegistry.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/factories/AbstractComponentRegistry.java 2009-02-25 12:06:48 UTC (rev 7784)
+++ core/branches/flat/src/main/java/org/horizon/factories/AbstractComponentRegistry.java 2009-02-25 15:52:32 UTC (rev 7785)
@@ -148,7 +148,6 @@
s.add(TransactionManagerFactory.class);
s.add(ReplicationQueueFactory.class);
s.add(StateTransferManagerFactory.class);
- s.add(StateTransferFactory.class);
s.add(LockManagerFactory.class);
s.add(DataContainerFactory.class);
s.add(EvictionManagerFactory.class);
Modified: core/branches/flat/src/main/java/org/horizon/factories/ComponentRegistry.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/factories/ComponentRegistry.java 2009-02-25 12:06:48 UTC (rev 7784)
+++ core/branches/flat/src/main/java/org/horizon/factories/ComponentRegistry.java 2009-02-25 15:52:32 UTC (rev 7785)
@@ -109,9 +109,13 @@
globalComponents.start();
}
boolean needToNotify = state != ComponentStatus.RUNNING && state != ComponentStatus.INITIALIZING;
+
+ // set this up *before* starting the components since some components - specifically state transfer - needs to be
+ // able to locate this registry via the InboundInvocationHandler
+ globalComponents.registerNamedComponentRegistry(this, cacheName);
+
super.start();
if (needToNotify && state == ComponentStatus.RUNNING) {
- globalComponents.registerNamedComponentRegistry(this, cacheName);
cacheManagerNotifier.notifyCacheStarted(cacheName);
}
}
Deleted: core/branches/flat/src/main/java/org/horizon/factories/StateTransferFactory.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/factories/StateTransferFactory.java 2009-02-25 12:06:48 UTC (rev 7784)
+++ core/branches/flat/src/main/java/org/horizon/factories/StateTransferFactory.java 2009-02-25 15:52:32 UTC (rev 7785)
@@ -1,48 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.horizon.factories;
-
-/**
- * Factory class able to create {@link StateTransferGenerator} and {@link StateTransferIntegrator} instances.
- * <p/>
- * Updated in 3.0.0 to extend ComponentFactory, etc.
- * <p/>
- *
- * @author <a href="brian.stansberry(a)jboss.com">Brian Stansberry</a>
- * @author Manik Surtani
- * @since 1.0
- */
-// TODO: Implement me
-//@DefaultFactoryFor(classes = {StateTransferGenerator.class, StateTransferIntegrator.class})
-public class StateTransferFactory extends AbstractComponentFactory implements AutoInstantiableFactory {
- public <T> T construct(Class<T> componentType) {
- return null;
-// if (componentType.equals(StateTransferIntegrator.class))
-// {
-// return componentType.cast(new DefaultStateTransferIntegrator());
-// }
-// else
-// {
-// return componentType.cast(new DefaultStateTransferGenerator());
-// }
- }
-}
Modified: core/branches/flat/src/main/java/org/horizon/factories/StateTransferManagerFactory.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/factories/StateTransferManagerFactory.java 2009-02-25 12:06:48 UTC (rev 7784)
+++ core/branches/flat/src/main/java/org/horizon/factories/StateTransferManagerFactory.java 2009-02-25 15:52:32 UTC (rev 7785)
@@ -22,7 +22,7 @@
package org.horizon.factories;
import org.horizon.factories.annotations.DefaultFactoryFor;
-import org.horizon.statetransfer.DefaultStateTransferManager;
+import org.horizon.statetransfer.StateTransferManagerImpl;
import org.horizon.statetransfer.StateTransferManager;
/**
@@ -32,8 +32,11 @@
* @since 1.0
*/
@DefaultFactoryFor(classes = StateTransferManager.class)
-public class StateTransferManagerFactory extends AbstractComponentFactory implements AutoInstantiableFactory {
+public class StateTransferManagerFactory extends AbstractNamedCacheComponentFactory implements AutoInstantiableFactory {
public <T> T construct(Class<T> componentType) {
- return componentType.cast(new DefaultStateTransferManager());
+ if (configuration.getCacheMode().isClustered() && configuration.isFetchInMemoryState())
+ return componentType.cast(new StateTransferManagerImpl());
+ else
+ return null;
}
}
Modified: core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandler.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandler.java 2009-02-25 12:06:48 UTC (rev 7784)
+++ core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandler.java 2009-02-25 15:52:32 UTC (rev 7785)
@@ -3,10 +3,15 @@
import org.horizon.commands.RPCCommand;
import org.horizon.factories.scopes.Scope;
import org.horizon.factories.scopes.Scopes;
+import org.horizon.statetransfer.StateTransferException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
/**
* A globally scoped component, that is able to locate named caches and invoke remotely originating calls on the
- * appropriate cache.
+ * appropriate cache. The primary goal of this component is to act as a bridge between the globally scoped {@link org.horizon.remoting.RPCManager}
+ * and named-cache scoped components.
*
* @author Manik Surtani
* @since 1.0
@@ -19,6 +24,27 @@
*
* @param command command to invoke
* @return results, if any, from the invocation
+ * @throws Throwable in the event of problems executing the command
*/
Object handle(RPCCommand command) throws Throwable;
+
+ /**
+ * Applies state onto a named cache. State to be read from the stream. Implementations should NOT close the stream
+ * after use.
+ *
+ * @param cacheName name of cache to apply state
+ * @param i stream to read from
+ * @throws StateTransferException in the event of problems
+ */
+ void applyState(String cacheName, InputStream i) throws StateTransferException;
+
+ /**
+ * Generates state from a named cache. State to be written to the stream. Implementations should NOT close the stream
+ * after use.
+ *
+ * @param cacheName name of cache from which to generate state
+ * @param o stream to write state to
+ * @throws StateTransferException in the event of problems
+ */
+ void generateState(String cacheName, OutputStream o) throws StateTransferException;
}
Modified: core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandlerImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandlerImpl.java 2009-02-25 12:06:48 UTC (rev 7784)
+++ core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandlerImpl.java 2009-02-25 15:52:32 UTC (rev 7785)
@@ -12,7 +12,12 @@
import org.horizon.invocation.InvocationContextContainer;
import org.horizon.logging.Log;
import org.horizon.logging.LogFactory;
+import org.horizon.statetransfer.StateTransferException;
+import org.horizon.statetransfer.StateTransferManager;
+import java.io.InputStream;
+import java.io.OutputStream;
+
/**
* Sets the cache interceptor chain on an RPCCommand before calling it to perform
*
@@ -34,9 +39,15 @@
String cacheName = cmd.getCacheName();
ComponentRegistry cr = gcr.getNamedComponentRegistry(cacheName);
if (cr == null) {
- log.info("Cache named {0} does not exist on this cache manager!", cacheName);
+ if (log.isInfoEnabled()) log.info("Cache named {0} does not exist on this cache manager!", cacheName);
return null;
}
+
+ if (!cr.getStatus().allowInvocations()) {
+ if (log.isInfoEnabled()) log.info("Cache named {0} exists but isn't in a state to handle invocations. Its state is {1}", cacheName, cr.getStatus());
+ return null;
+ }
+
InterceptorChain ic = cr.getComponent(InterceptorChain.class);
InvocationContextContainer icc = cr.getComponent(InvocationContextContainer.class);
CommandsFactory commandsFactory = cr.getComponent(CommandsFactory.class);
@@ -46,4 +57,23 @@
commandsFactory.initializeReplicableCommand(cmd);
return cmd.perform(icc.get());
}
+
+ public void applyState(String cacheName, InputStream i) throws StateTransferException {
+ getStateTransferManager(cacheName).applyState(i);
+ }
+
+ public void generateState(String cacheName, OutputStream o) throws StateTransferException {
+ getStateTransferManager(cacheName).generateState(o);
+ }
+
+ private StateTransferManager getStateTransferManager(String cacheName) throws StateTransferException {
+ ComponentRegistry cr = gcr.getNamedComponentRegistry(cacheName);
+ if (cr == null) {
+ String msg = "Cache named "+cacheName+" does not exist on this cache manager!";
+ log.info(msg);
+ throw new StateTransferException(msg);
+ }
+
+ return cr.getComponent(StateTransferManager.class);
+ }
}
Modified: core/branches/flat/src/main/java/org/horizon/remoting/RPCManager.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/RPCManager.java 2009-02-25 12:06:48 UTC (rev 7784)
+++ core/branches/flat/src/main/java/org/horizon/remoting/RPCManager.java 2009-02-25 15:52:32 UTC (rev 7785)
@@ -27,6 +27,7 @@
import org.horizon.factories.scopes.Scopes;
import org.horizon.lifecycle.Lifecycle;
import org.horizon.remoting.transport.Address;
+import org.horizon.statetransfer.StateTransferException;
import java.util.List;
@@ -111,4 +112,14 @@
* @return a list of members. Typically, this would be defensively copied.
*/
List<Address> getMembers();
+
+ /**
+ * Initiates a state retrieval process from neighbouring caches. This method will block until it either times out,
+ * or state is retrieved and applied.
+ *
+ * @param cacheName name of cache requesting state
+ * @param timeout length of time to try to retrieve state on each peer
+ * @throws org.horizon.statetransfer.StateTransferException in the event of problems
+ */
+ void retrieveState(String cacheName, long timeout) throws StateTransferException;
}
\ No newline at end of file
Modified: core/branches/flat/src/main/java/org/horizon/remoting/RPCManagerImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/RPCManagerImpl.java 2009-02-25 12:06:48 UTC (rev 7784)
+++ core/branches/flat/src/main/java/org/horizon/remoting/RPCManagerImpl.java 2009-02-25 15:52:32 UTC (rev 7785)
@@ -10,10 +10,13 @@
import org.horizon.jmx.annotations.MBean;
import org.horizon.jmx.annotations.ManagedAttribute;
import org.horizon.jmx.annotations.ManagedOperation;
+import org.horizon.logging.Log;
+import org.horizon.logging.LogFactory;
import org.horizon.marshall.Marshaller;
import org.horizon.notifications.cachemanagerlistener.CacheManagerNotifier;
import org.horizon.remoting.transport.Address;
import org.horizon.remoting.transport.Transport;
+import org.horizon.statetransfer.StateTransferException;
import java.text.NumberFormat;
import java.util.List;
@@ -34,6 +37,7 @@
private final AtomicLong replicationCount = new AtomicLong(0);
private final AtomicLong replicationFailures = new AtomicLong(0);
boolean statisticsEnabled = false; // by default, don't gather statistics.
+ private static final Log log = LogFactory.getLog(RPCManagerImpl.class);
@Inject
public void injectDependencies(GlobalConfiguration globalConfiguration, Transport t, InboundInvocationHandler handler,
@@ -44,7 +48,7 @@
this.t.initialize(globalConfiguration, globalConfiguration.getTransportProperties(), marshaller, e, handler, notifier);
}
- @Start
+ @Start(priority = 10)
public void start() {
t.start();
}
@@ -82,6 +86,47 @@
return t.getMembers();
}
+ public void retrieveState(String cacheName, long timeout) throws StateTransferException {
+ List<Address> members = getMembers();
+ if (members.size() < 2) {
+ if (log.isDebugEnabled())
+ log.debug("We're the only member in the cluster; no one to retrieve state from. Not doing anything!");
+ return;
+ }
+
+ boolean success = false;
+ outer:
+ for (int i = 0, wait = 1000; i < 5; i++) {
+ for (Address member : members) {
+ if (!member.equals(getAddress())) {
+ try {
+ if (log.isInfoEnabled()) log.info("Trying to fetch state from {0}", member);
+ if (t.retrieveState(cacheName, member, timeout)) {
+ if (log.isInfoEnabled()) log.info("Successfully retrieved and applied state from {0}", member);
+ success = true;
+ break outer;
+ }
+ } catch (StateTransferException e) {
+ if (log.isDebugEnabled()) log.debug("Error while fetching state from member " + member, e);
+ }
+ }
+
+ if (!success) {
+ if (log.isWarnEnabled()) log.warn("Could not find available peer for state, backing off and retrying");
+
+ try {
+ Thread.sleep(wait <<= 2);
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+ }
+
+ if (!success) throw new StateTransferException("Unable to fetch state on startup");
+ }
+
// -------------------------------------------- JMX information -----------------------------------------------
@ManagedOperation
Modified: core/branches/flat/src/main/java/org/horizon/remoting/transport/Transport.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/transport/Transport.java 2009-02-25 12:06:48 UTC (rev 7784)
+++ core/branches/flat/src/main/java/org/horizon/remoting/transport/Transport.java 2009-02-25 15:52:32 UTC (rev 7785)
@@ -11,6 +11,7 @@
import org.horizon.remoting.InboundInvocationHandler;
import org.horizon.remoting.ResponseFilter;
import org.horizon.remoting.ResponseMode;
+import org.horizon.statetransfer.StateTransferException;
import java.util.List;
import java.util.Properties;
@@ -78,4 +79,16 @@
* @return a list of members. Typically, this would be defensively copied.
*/
List<Address> getMembers();
+
+ /**
+ * Initiates a state retrieval from a specific cache (by typically invoking {@link org.horizon.remoting.InboundInvocationHandler#generateState(String, java.io.OutputStream)}),
+ * and applies this state to the current cache via the {@link InboundInvocationHandler#applyState(String, java.io.InputStream)} callback.
+ *
+ * @param cacheName name of cache for which to retrieve state
+ * @param address address of remote cache from which to retrieve state
+ * @param timeout state retrieval timeout in milliseconds
+ * @throws org.horizon.statetransfer.StateTransferException if state cannot be retrieved from the specific cache
+ * @return true if state was transferred and applied successfully, false if it timed out.
+ */
+ boolean retrieveState(String cacheName, Address address, long timeout) throws StateTransferException;
}
Modified: core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/JGroupsTransport.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/JGroupsTransport.java 2009-02-25 12:06:48 UTC (rev 7784)
+++ core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/JGroupsTransport.java 2009-02-25 15:52:32 UTC (rev 7785)
@@ -15,24 +15,30 @@
import org.horizon.remoting.ResponseMode;
import org.horizon.remoting.transport.Address;
import org.horizon.remoting.transport.Transport;
+import org.horizon.statetransfer.StateTransferException;
import org.horizon.util.FileLookup;
+import org.horizon.util.Util;
import org.jgroups.Channel;
import org.jgroups.ChannelException;
+import org.jgroups.ExtendedMessageListener;
import org.jgroups.JChannel;
import org.jgroups.MembershipListener;
import org.jgroups.Message;
-import org.jgroups.MessageListener;
import org.jgroups.View;
import org.jgroups.blocks.GroupRequest;
import org.jgroups.blocks.RspFilter;
import org.jgroups.util.Rsp;
import org.jgroups.util.RspList;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.Vector;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
/**
@@ -41,7 +47,7 @@
* @author Manik Surtani
* @since 1.0
*/
-public class JGroupsTransport implements Transport, MembershipListener, MessageListener {
+public class JGroupsTransport implements Transport, MembershipListener, ExtendedMessageListener {
public static final String CONFIGURATION_STRING = "configurationString";
public static final String CONFIGURATION_XML = "configurationXml";
public static final String CONFIGURATION_FILE = "configurationFile";
@@ -61,7 +67,15 @@
Marshaller marshaller;
ExecutorService asyncExecutor;
CacheManagerNotifier notifier;
+ final ConcurrentMap<String, StateTransferMonitor> stateTransfersInProgress = new ConcurrentHashMap<String, StateTransferMonitor>();
+ /**
+ * Reference to an exception that was raised during state installation on this node.
+ */
+ protected volatile Exception setStateException;
+ private final Object stateLock = new Object();
+
+
// ------------------------------------------------------------------------------------------------------------------
// Lifecycle and setup stuff
// ------------------------------------------------------------------------------------------------------------------
@@ -204,6 +218,27 @@
return members;
}
+ public boolean retrieveState(String cacheName, Address address, long timeout) throws StateTransferException {
+ boolean cleanup = false;
+ try {
+ StateTransferMonitor mon = new StateTransferMonitor();
+ if (stateTransfersInProgress.putIfAbsent(cacheName, mon) != null)
+ throw new StateTransferException("There already appears to be a state transfer in progress for the cache named " + cacheName);
+
+ cleanup = true;
+ ((JChannel) channel).getState(toJGroupsAddress(address), cacheName, timeout, false);
+ mon.waitForState();
+ return true;
+ } catch (StateTransferException ste) {
+ throw ste;
+ } catch (Exception e) {
+ if (log.isInfoEnabled()) log.info("Unable to retrieve state from member " + address, e);
+ return false;
+ } finally {
+ if (cleanup) stateTransfersInProgress.remove(cacheName);
+ }
+ }
+
public Address getAddress() {
if (address == null) {
address = new JGroupsAddress(channel.getLocalAddress());
@@ -325,8 +360,7 @@
}
public void block() {
- // TODO: Do we need these for state transfer?
- // a no-op for now
+ // a no-op
}
public void receive(Message msg) {
@@ -334,17 +368,55 @@
}
public byte[] getState() {
- // TODO: Do we need these for state transfer?
- // a no-op for now
- return null;
+ throw new UnsupportedOperationException("Retrieving state for the entire cache system is not supported!");
}
public void setState(byte[] state) {
- // TODO: Do we need these for state transfer?
- // a no-op for now
+ throw new UnsupportedOperationException("Setting state for the entire cache system is not supported!");
}
+ public byte[] getState(String state_id) {
+ throw new UnsupportedOperationException("Non-stream-based state retrieval is not supported! Make sure you use the JGroups STREAMING_STATE_TRANSFER protocol!");
+ }
+ public void setState(String state_id, byte[] state) {
+ throw new UnsupportedOperationException("Non-stream-based state retrieval is not supported! Make sure you use the JGroups STREAMING_STATE_TRANSFER protocol!");
+ }
+
+ public void getState(OutputStream ostream) {
+ throw new UnsupportedOperationException("Retrieving state for the entire cache system is not supported!");
+ }
+
+ public void getState(String cacheName, OutputStream ostream) {
+ if (trace) log.trace("Received request to generate state for cache {0}. Attempting to generate state.", cacheName);
+ try {
+ inboundInvocationHandler.generateState(cacheName, ostream);
+ } catch (StateTransferException e) {
+ log.error("Caught while responding to state transfer request", e);
+ } finally {
+ Util.closeStream(ostream);
+ }
+ }
+
+ public void setState(InputStream istream) {
+ throw new UnsupportedOperationException("Setting state for the entire cache system is not supported!");
+ }
+
+ public void setState(String cacheName, InputStream istream) {
+ if (trace) log.trace("Received state for cache {0}. Attempting to apply state.", cacheName);
+ StateTransferMonitor mon = stateTransfersInProgress.get(cacheName);
+ try {
+ inboundInvocationHandler.applyState(cacheName, istream);
+ mon.notifyStateReceiptSucceeded();
+ } catch (StateTransferException e) {
+ log.error("Failed setting state", e);
+ mon.notifyStateReceiptFailed(e);
+ } finally {
+ Util.closeStream(istream);
+ }
+ }
+
+
// ------------------------------------------------------------------------------------------------------------------
// Helpers to convert between Address types
// ------------------------------------------------------------------------------------------------------------------
@@ -361,6 +433,10 @@
return retval;
}
+ private org.jgroups.Address toJGroupsAddress(Address a) {
+ return ((JGroupsAddress) a).address;
+ }
+
private List<Address> fromJGroupsAddressList(List<org.jgroups.Address> list) {
if (list == null || list.isEmpty()) return Collections.emptyList();
Added: core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/StateTransferMonitor.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/StateTransferMonitor.java (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/StateTransferMonitor.java 2009-02-25 15:52:32 UTC (rev 7785)
@@ -0,0 +1,55 @@
+package org.horizon.remoting.transport.jgroups;
+
+import org.horizon.statetransfer.StateTransferException;
+
+public class StateTransferMonitor {
+ /**
+ * Reference to an exception that was raised during state installation on this cache.
+ */
+ protected volatile StateTransferException setStateException;
+ private final Object stateLock = new Object();
+ /**
+ * True if state was initialized during start-up.
+ */
+ private volatile boolean isStateSet = false;
+
+ public boolean isStateSet() {
+ return isStateSet;
+ }
+
+ public void setStateSet(boolean stateSet) {
+ isStateSet = stateSet;
+ }
+
+ public StateTransferException getSetStateException() {
+ return setStateException;
+ }
+
+ public void waitForState() throws Exception {
+ synchronized (stateLock) {
+ while (!isStateSet) {
+ if (setStateException != null) {
+ throw setStateException;
+ }
+
+ try {
+ stateLock.wait();
+ }
+ catch (InterruptedException iex) {
+ }
+ }
+ }
+ }
+
+ public void notifyStateReceiptSucceeded() {
+ synchronized (stateLock) {
+ // Notify wait that state has been set.
+ stateLock.notifyAll();
+ }
+ }
+
+ public void notifyStateReceiptFailed(StateTransferException setStateException) {
+ this.setStateException = setStateException;
+ notifyStateReceiptSucceeded();
+ }
+}
Deleted: core/branches/flat/src/main/java/org/horizon/statetransfer/DefaultStateTransferManager.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/statetransfer/DefaultStateTransferManager.java 2009-02-25 12:06:48 UTC (rev 7784)
+++ core/branches/flat/src/main/java/org/horizon/statetransfer/DefaultStateTransferManager.java 2009-02-25 15:52:32 UTC (rev 7785)
@@ -1,52 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.horizon.statetransfer;
-
-import org.horizon.factories.annotations.Inject;
-import org.horizon.factories.annotations.Start;
-
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-
-public class DefaultStateTransferManager implements StateTransferManager {
-
- @Inject
- public void injectDependencies() {
- }
-
- @Start(priority = 14)
- public void start() {
- }
-
- public void getState(ObjectOutputStream out, Object o, long timeout, boolean force, boolean suppressErrors) throws Exception {
- throw new UnsupportedOperationException("Implement me properly!");
- }
-
- public void setState(ObjectInputStream in, Object o) throws Exception {
- throw new UnsupportedOperationException("fix me!");
- }
-
- // TODO: implement me
- protected void setState()
- {
- }
-}
Added: core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferException.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferException.java (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferException.java 2009-02-25 15:52:32 UTC (rev 7785)
@@ -0,0 +1,24 @@
+package org.horizon.statetransfer;
+
+/**
+ * An exception to denote problems in transferring state between cache instances in a cluster
+ *
+ * @author Manik Surtani
+ * @since 1.0
+ */
+public class StateTransferException extends Exception {
+ public StateTransferException() {
+ }
+
+ public StateTransferException(String message) {
+ super(message);
+ }
+
+ public StateTransferException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public StateTransferException(Throwable cause) {
+ super(cause);
+ }
+}
Modified: core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManager.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManager.java 2009-02-25 12:06:48 UTC (rev 7784)
+++ core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManager.java 2009-02-25 15:52:32 UTC (rev 7785)
@@ -21,12 +21,19 @@
*/
package org.horizon.statetransfer;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
+import org.horizon.factories.scopes.Scope;
+import org.horizon.factories.scopes.Scopes;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * Handles generation and application of state on the cache
+ */
+(a)Scope(Scopes.NAMED_CACHE)
public interface StateTransferManager {
- void getState(ObjectOutputStream out, Object o, long timeout, boolean force, boolean suppressErrors) throws Exception;
+ void generateState(OutputStream out) throws StateTransferException;
- void setState(ObjectInputStream in, Object o) throws Exception;
+ void applyState(InputStream in) throws StateTransferException;
}
Copied: core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManagerImpl.java (from rev 7782, core/branches/flat/src/main/java/org/horizon/statetransfer/DefaultStateTransferManager.java)
===================================================================
--- core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManagerImpl.java (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManagerImpl.java 2009-02-25 15:52:32 UTC (rev 7785)
@@ -0,0 +1,145 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.horizon.statetransfer;
+
+import org.horizon.Cache;
+import org.horizon.config.Configuration;
+import org.horizon.factories.annotations.Inject;
+import org.horizon.factories.annotations.Start;
+import org.horizon.logging.Log;
+import org.horizon.logging.LogFactory;
+import org.horizon.remoting.RPCManager;
+import org.horizon.util.Util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+public class StateTransferManagerImpl implements StateTransferManager {
+
+ RPCManager rpcManager;
+ Cache cache;
+ Configuration configuration;
+ private static final Log log = LogFactory.getLog(StateTransferManagerImpl.class);
+ private static final Delimiter DELIMITER = new Delimiter();
+
+ @Inject
+ public void injectDependencies(RPCManager rpcManager, Cache cache, Configuration configuration) {
+ this.rpcManager = rpcManager;
+ this.cache = cache;
+ this.configuration = configuration;
+ }
+
+ @Start(priority = 14)
+ // it is imperative that this starts *after* the RPCManager does.
+ public void start() throws StateTransferException {
+ long startTime = 0;
+ if (log.isDebugEnabled()) {
+ log.debug("Initiating state transfer process");
+ startTime = System.currentTimeMillis();
+ }
+
+ rpcManager.retrieveState(cache.getName(), configuration.getStateRetrievalTimeout());
+
+ if (log.isDebugEnabled()) {
+ long duration = System.currentTimeMillis() - startTime;
+ log.debug("State transfer process completed in {0}", Util.prettyPrintTime(duration));
+ }
+ }
+
+ public void generateState(OutputStream out) throws StateTransferException {
+ if (log.isDebugEnabled()) log.debug("Generating state");
+
+ try {
+ ObjectOutputStream oos = new ObjectOutputStream(out);
+ delimit(oos);
+ generateInMemoryState(oos);
+ delimit(oos);
+ generatePersistentState(oos);
+ delimit(oos);
+ oos.flush();
+ oos.close();
+ // just close the object stream but do NOT close the underlying stream
+ } catch (StateTransferException ste) {
+ throw ste;
+ } catch (Exception e) {
+ throw new StateTransferException(e);
+ }
+ }
+
+ public void applyState(InputStream in) throws StateTransferException {
+ if (log.isDebugEnabled()) log.debug("Applying state");
+
+ try {
+ ObjectInputStream ois = new ObjectInputStream(in);
+ assertDelimited(ois);
+ applyInMemoryState(ois);
+ assertDelimited(ois);
+ applyPersistentState(ois);
+ assertDelimited(ois);
+ ois.close();
+ // just close the object stream but do NOT close the underlying stream
+ } catch (StateTransferException ste) {
+ throw ste;
+ } catch (Exception e) {
+ throw new StateTransferException(e);
+ }
+ }
+
+ private void applyInMemoryState(ObjectInputStream i) throws StateTransferException {
+ throw new StateTransferException("Implement me!");
+ }
+
+ private void generateInMemoryState(ObjectOutputStream o) throws StateTransferException {
+ throw new StateTransferException("Implement me!");
+ }
+
+ private void applyPersistentState(ObjectInputStream i) throws StateTransferException {
+ throw new StateTransferException("Implement me!");
+ }
+
+ private void generatePersistentState(ObjectOutputStream o) throws StateTransferException {
+ throw new StateTransferException("Implement me!");
+ }
+
+ private void delimit(ObjectOutputStream o) throws IOException {
+ o.writeObject(DELIMITER);
+ }
+
+ private void assertDelimited(ObjectInputStream i) throws StateTransferException {
+ Object o;
+ try {
+ o = i.readObject();
+ } catch (Exception e) {
+ throw new StateTransferException(e);
+ }
+ if ((o == null) || !(o instanceof Delimiter)) throw new StateTransferException("Expected a delimiter, recieved " + o);
+ }
+
+ // used as a marker for streams.
+ private static final class Delimiter implements Serializable {
+
+ }
+}
Property changes on: core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManagerImpl.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Modified: core/branches/flat/src/main/java/org/horizon/util/Util.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/util/Util.java 2009-02-25 12:06:48 UTC (rev 7784)
+++ core/branches/flat/src/main/java/org/horizon/util/Util.java 2009-02-25 15:52:32 UTC (rev 7785)
@@ -21,7 +21,10 @@
*/
package org.horizon.util;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.lang.reflect.Method;
+import java.text.NumberFormat;
import java.util.HashMap;
import java.util.Map;
@@ -126,4 +129,46 @@
return "Added Entries " + addedEntries + " Removeed Entries " + removedEntries + " Modified Entries " + modifiedEntries;
}
}
+
+ /**
+ * Prints a time for display
+ *
+ * @param millis time in millis
+ * @return the time, represented as millis, seconds, minutes or hours as appropriate, with suffix
+ */
+ public static String prettyPrintTime(long millis) {
+ if (millis < 1000) return millis + " milliseconds";
+ NumberFormat nf = NumberFormat.getNumberInstance();
+ nf.setMaximumFractionDigits(2);
+ double toPrint = ((double) millis) / 1000;
+ if (toPrint < 300) {
+ return nf.format(toPrint) + " seconds";
+ }
+
+ toPrint = toPrint / 60;
+
+ if (toPrint < 120) {
+ return nf.format(toPrint) + " minutes";
+ }
+
+ toPrint = toPrint / 60;
+
+ return nf.format(toPrint) + " hours";
+ }
+
+ public static void closeStream(InputStream i) {
+ try {
+ if (i != null) i.close();
+ } catch (Exception e) {
+
+ }
+ }
+
+ public static void closeStream(OutputStream o) {
+ try {
+ if (o != null) o.close();
+ } catch (Exception e) {
+
+ }
+ }
}
Modified: core/branches/flat/src/test/java/org/horizon/api/CacheClusterJoinTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/api/CacheClusterJoinTest.java 2009-02-25 12:06:48 UTC (rev 7784)
+++ core/branches/flat/src/test/java/org/horizon/api/CacheClusterJoinTest.java 2009-02-25 15:52:32 UTC (rev 7785)
@@ -22,6 +22,7 @@
cm1 = TestingUtil.createClusteredCacheManager();
cfg = new Configuration();
cfg.setCacheMode(CacheMode.REPL_SYNC);
+ cfg.setFetchInMemoryState(false);
cm1.defineCache("cache", cfg);
}
Modified: core/branches/flat/src/test/java/org/horizon/api/tree/NodeReplicatedMoveTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/api/tree/NodeReplicatedMoveTest.java 2009-02-25 12:06:48 UTC (rev 7784)
+++ core/branches/flat/src/test/java/org/horizon/api/tree/NodeReplicatedMoveTest.java 2009-02-25 15:52:32 UTC (rev 7785)
@@ -31,12 +31,10 @@
TransactionManager tm1;
protected void createCacheManagers() throws Throwable {
- Configuration c = new Configuration();
+ Configuration c = getDefaultConfig();
c.setInvocationBatchingEnabled(true);
c.setCacheMode(Configuration.CacheMode.REPL_SYNC);
c.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
- c.setSyncCommitPhase(true);
- c.setSyncRollbackPhase(true);
createClusteredCaches(2, "replSync", c);
Modified: core/branches/flat/src/test/java/org/horizon/config/parsing/XmlFileParsingTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/config/parsing/XmlFileParsingTest.java 2009-02-25 12:06:48 UTC (rev 7784)
+++ core/branches/flat/src/test/java/org/horizon/config/parsing/XmlFileParsingTest.java 2009-02-25 15:52:32 UTC (rev 7785)
@@ -52,8 +52,7 @@
c = namedCaches.get("syncRepl");
assert c.getCacheMode() == Configuration.CacheMode.REPL_SYNC;
- assert c.isFetchInMemoryState();
- assert c.getStateRetrievalTimeout() == 15000;
+ assert !c.isFetchInMemoryState();
assert c.getSyncReplTimeout() == 15000;
c = namedCaches.get("asyncRepl");
@@ -61,23 +60,20 @@
assert c.getCacheMode() == Configuration.CacheMode.REPL_ASYNC;
assert !c.isUseReplQueue();
assert !c.isUseAsyncSerialization();
- assert c.isFetchInMemoryState();
- assert c.getStateRetrievalTimeout() == 15000;
+ assert !c.isFetchInMemoryState();
c = namedCaches.get("asyncReplQueue");
assert c.getCacheMode() == Configuration.CacheMode.REPL_ASYNC;
assert c.isUseReplQueue();
assert c.isUseAsyncSerialization();
- assert c.isFetchInMemoryState();
- assert c.getStateRetrievalTimeout() == 15000;
+ assert !c.isFetchInMemoryState();
c = namedCaches.get("txSyncRepl");
assert c.getTransactionManagerLookupClass().equals("org.horizon.transaction.GenericTransactionManagerLookup");
assert c.getCacheMode() == Configuration.CacheMode.REPL_SYNC;
- assert c.isFetchInMemoryState();
- assert c.getStateRetrievalTimeout() == 15000;
+ assert !c.isFetchInMemoryState();
assert c.getSyncReplTimeout() == 15000;
c = namedCaches.get("overriding");
@@ -112,8 +108,7 @@
assert c.getTransactionManagerLookupClass() == null;
assert c.getCacheMode() == Configuration.CacheMode.REPL_SYNC;
- assert c.isFetchInMemoryState();
- assert c.getStateRetrievalTimeout() == 15000;
+ assert !c.isFetchInMemoryState();
assert c.getSyncReplTimeout() == 15000;
assert c.getLockAcquisitionTimeout() == 1000;
assert c.getIsolationLevel() == IsolationLevel.READ_COMMITTED;
@@ -126,8 +121,7 @@
assert c.getCacheMode() == Configuration.CacheMode.REPL_ASYNC;
assert !c.isUseReplQueue();
assert !c.isUseAsyncSerialization();
- assert c.isFetchInMemoryState();
- assert c.getStateRetrievalTimeout() == 15000;
+ assert !c.isFetchInMemoryState();
assert c.getLockAcquisitionTimeout() == 1000;
assert c.getIsolationLevel() == IsolationLevel.READ_COMMITTED;
assert c.getConcurrencyLevel() == 100;
@@ -141,8 +135,7 @@
assert c.getReplQueueInterval() == 1234;
assert c.getReplQueueMaxElements() == 100;
assert c.isUseAsyncSerialization();
- assert c.isFetchInMemoryState();
- assert c.getStateRetrievalTimeout() == 15000;
+ assert !c.isFetchInMemoryState();
assert c.getLockAcquisitionTimeout() == 1000;
assert c.getIsolationLevel() == IsolationLevel.READ_COMMITTED;
assert c.getConcurrencyLevel() == 100;
@@ -151,8 +144,7 @@
c.applyOverrides(namedCaches.get("txSyncRepl"));
assert c.getTransactionManagerLookupClass().equals("org.horizon.transaction.GenericTransactionManagerLookup");
assert c.getCacheMode() == Configuration.CacheMode.REPL_SYNC;
- assert c.isFetchInMemoryState();
- assert c.getStateRetrievalTimeout() == 15000;
+ assert !c.isFetchInMemoryState();
assert c.getSyncReplTimeout() == 15000;
assert c.getLockAcquisitionTimeout() == 1000;
assert c.getIsolationLevel() == IsolationLevel.READ_COMMITTED;
Modified: core/branches/flat/src/test/java/org/horizon/loader/decorators/SingletonStoreTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/decorators/SingletonStoreTest.java 2009-02-25 12:06:48 UTC (rev 7784)
+++ core/branches/flat/src/test/java/org/horizon/loader/decorators/SingletonStoreTest.java 2009-02-25 15:52:32 UTC (rev 7785)
@@ -45,7 +45,7 @@
cm1 = addClusterEnabledCacheManager();
cm2 = addClusterEnabledCacheManager();
- Configuration conf = new Configuration();
+ Configuration conf = getDefaultConfig();
conf.setCacheMode(Configuration.CacheMode.REPL_SYNC);
DummyInMemoryCacheStore.Cfg cfg = new DummyInMemoryCacheStore.Cfg();
cfg.setStore("Store-" + storeCounter.getAndIncrement());
@@ -64,7 +64,7 @@
((DummyInMemoryCacheStore.Cfg) conf.getCacheLoaderManagerConfig().getFirstCacheLoaderConfig()).setStore("Store-" + storeCounter.getAndIncrement());
cm2.defineCache("pushing", conf);
- conf = new Configuration();
+ conf = getDefaultConfig();
conf.setCacheMode(Configuration.CacheMode.REPL_SYNC);
cfg = new DummyInMemoryCacheStore.Cfg();
cfg.setStore("Store-" + storeCounter.getAndIncrement());
Modified: core/branches/flat/src/test/java/org/horizon/manager/CacheManagerComponentRegistryTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/manager/CacheManagerComponentRegistryTest.java 2009-02-25 12:06:48 UTC (rev 7784)
+++ core/branches/flat/src/test/java/org/horizon/manager/CacheManagerComponentRegistryTest.java 2009-02-25 15:52:32 UTC (rev 7785)
@@ -33,6 +33,8 @@
public void testForceSharedComponents() throws NamedCacheNotFoundException {
Configuration defaultCfg = new Configuration();
defaultCfg.setCacheMode(Configuration.CacheMode.REPL_SYNC);
+ defaultCfg.setFetchInMemoryState(false);
+ defaultCfg.setFetchInMemoryState(false);
// cache manager with default configuration
cm = new DefaultCacheManager(GlobalConfiguration.getClusteredDefault(),
@@ -61,6 +63,7 @@
ec.setAlgorithmConfig(new FIFOAlgorithmConfig());
Configuration defaultCfg = new Configuration();
+ defaultCfg.setFetchInMemoryState(false);
defaultCfg.setCacheMode(Configuration.CacheMode.REPL_SYNC);
defaultCfg.setEvictionConfig(ec);
// cache manager with default configuration
Modified: core/branches/flat/src/test/java/org/horizon/notifications/cachemanagerlistener/CacheManagerNotifierTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/notifications/cachemanagerlistener/CacheManagerNotifierTest.java 2009-02-25 12:06:48 UTC (rev 7784)
+++ core/branches/flat/src/test/java/org/horizon/notifications/cachemanagerlistener/CacheManagerNotifierTest.java 2009-02-25 15:52:32 UTC (rev 7785)
@@ -29,6 +29,7 @@
cm2 = TestingUtil.createClusteredCacheManager();
Configuration c = new Configuration();
c.setCacheMode(Configuration.CacheMode.REPL_SYNC);
+ c.setFetchInMemoryState(false);
cm1.defineCache("cache", c);
cm2.defineCache("cache", c);
Added: core/branches/flat/src/test/java/org/horizon/statetransfer/Address.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/statetransfer/Address.java (rev 0)
+++ core/branches/flat/src/test/java/org/horizon/statetransfer/Address.java 2009-02-25 15:52:32 UTC (rev 7785)
@@ -0,0 +1,64 @@
+package org.horizon.statetransfer;
+
+import java.io.Serializable;
+
+public class Address implements Serializable {
+ private static final long serialVersionUID = 5943073369866339615L;
+
+ String street = null;
+ String city = "San Jose";
+ int zip = 0;
+
+ public String getStreet() {
+ return street;
+ }
+
+ public void setStreet(String street) {
+ this.street = street;
+ }
+
+ public String getCity() {
+ return city;
+ }
+
+ public void setCity(String city) {
+ this.city = city;
+ }
+
+ public int getZip() {
+ return zip;
+ }
+
+ public void setZip(int zip) {
+ this.zip = zip;
+ }
+
+ public String toString() {
+ return "street=" + getStreet() + ", city=" + getCity() + ", zip=" + getZip();
+ }
+
+// public Object writeReplace() {
+// return this;
+// }
+
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ final Address address = (Address) o;
+
+ if (zip != address.zip) return false;
+ if (city != null ? !city.equals(address.city) : address.city != null) return false;
+ if (street != null ? !street.equals(address.street) : address.street != null) return false;
+
+ return true;
+ }
+
+ public int hashCode() {
+ int result;
+ result = (street != null ? street.hashCode() : 0);
+ result = 29 * result + (city != null ? city.hashCode() : 0);
+ result = 29 * result + zip;
+ return result;
+ }
+}
Added: core/branches/flat/src/test/java/org/horizon/statetransfer/Person.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/statetransfer/Person.java (rev 0)
+++ core/branches/flat/src/test/java/org/horizon/statetransfer/Person.java 2009-02-25 15:52:32 UTC (rev 7785)
@@ -0,0 +1,56 @@
+package org.horizon.statetransfer;
+
+import java.io.Serializable;
+
+public class Person implements Serializable {
+
+ private static final long serialVersionUID = -885384294556845285L;
+
+ String name = null;
+ Address address;
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public void setName(Object obj) {
+ this.name = (String) obj;
+ }
+
+ public Address getAddress() {
+ return address;
+ }
+
+ public void setAddress(Address address) {
+ this.address = address;
+ }
+
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("name=").append(getName()).append(" Address= ").append(address);
+ return sb.toString();
+ }
+
+ public boolean equals(Object o) {
+ if (this == o) return true;
+ if (o == null || getClass() != o.getClass()) return false;
+
+ final Person person = (Person) o;
+
+ if (address != null ? !address.equals(person.address) : person.address != null) return false;
+ if (name != null ? !name.equals(person.name) : person.name != null) return false;
+
+ return true;
+ }
+
+ public int hashCode() {
+ int result;
+ result = (name != null ? name.hashCode() : 0);
+ result = 29 * result + (address != null ? address.hashCode() : 0);
+ return result;
+ }
+}
Added: core/branches/flat/src/test/java/org/horizon/statetransfer/StateTransferFunctionalTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/statetransfer/StateTransferFunctionalTest.java (rev 0)
+++ core/branches/flat/src/test/java/org/horizon/statetransfer/StateTransferFunctionalTest.java 2009-02-25 15:52:32 UTC (rev 7785)
@@ -0,0 +1,287 @@
+package org.horizon.statetransfer;
+
+import org.horizon.Cache;
+import org.horizon.transaction.DummyTransactionManagerLookup;
+import org.horizon.config.Configuration;
+import org.horizon.logging.Log;
+import org.horizon.logging.LogFactory;
+import org.horizon.manager.CacheManager;
+import org.horizon.test.MultipleCacheManagersTest;
+import org.horizon.test.TestingUtil;
+import org.testng.annotations.Test;
+
+import javax.transaction.TransactionManager;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+@Test(groups = "functional", testName = "statetransfer.StateTransferFunctionalTest", enabled = false)
+public class StateTransferFunctionalTest extends MultipleCacheManagersTest {
+
+ protected static final String ADDRESS_CLASSNAME = Address.class.getName();
+ protected static final String PERSON_CLASSNAME = Person.class.getName();
+ public static final String A_B_NAME = "a_b_name";
+ public static final String A_C_NAME = "a_c_name";
+ public static final String A_D_NAME = "a_d_age";
+ public static final String A_B_AGE = "a_b_age";
+ public static final String A_C_AGE = "a_c_age";
+ public static final String A_D_AGE = "a_d_age";
+ public static final String JOE = "JOE";
+ public static final String BOB = "BOB";
+ public static final String JANE = "JANE";
+ public static final Integer TWENTY = 20;
+ public static final Integer FORTY = 40;
+
+ Configuration config;
+ private static final String cacheName = "nbst";
+
+ private volatile int testCount = 0;
+
+ private static final Log log = LogFactory.getLog(StateTransferFunctionalTest.class);
+
+ public StateTransferFunctionalTest() {
+ cleanup = CleanupPhase.AFTER_METHOD;
+ }
+
+ protected void createCacheManagers() throws Throwable {
+ // This impl only really sets up a configuration for use later.
+ config = new Configuration();
+ config.setCacheMode(Configuration.CacheMode.REPL_SYNC);
+ config.setSyncCommitPhase(true);
+ config.setSyncReplTimeout(30000);
+ config.setFetchInMemoryState(true);
+ config.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
+ }
+
+ private CacheManager createCacheManager() {
+ CacheManager cm = addClusterEnabledCacheManager();
+ cm.defineCache(cacheName, config);
+ return cm;
+ }
+
+ public static class DelayTransfer implements Serializable {
+ private transient int count;
+
+ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+ in.defaultReadObject();
+ }
+
+ private void writeObject(ObjectOutputStream out) throws IOException {
+ out.defaultWriteObject();
+
+ // RPC is first serialization, ST is second
+ if (count++ == 0)
+ return;
+
+ try {
+ // This sleep is not required for the test to function,
+ // however it improves the possibility of finding errors
+ // (since it keeps the tx log going)
+ Thread.sleep(2000);
+ }
+ catch (InterruptedException e) {
+ }
+ }
+
+ }
+
+ private static class WritingRunner implements Runnable {
+ private final Cache<Object, Object> cache;
+ private final boolean tx;
+ private volatile boolean stop;
+ private volatile int result;
+ private TransactionManager tm;
+
+ WritingRunner(Cache<Object, Object> cache, boolean tx) {
+ this.cache = cache;
+ this.tx = tx;
+ if (tx) tm = TestingUtil.getTransactionManager(cache);
+ }
+
+ public int result() {
+ return result;
+ }
+
+ public void run() {
+ int c = 0;
+ while (!stop) {
+ try {
+ if (tx) tm.begin();
+ cache.put("test" + c, c++);
+ if (tx) tm.commit();
+ }
+ catch (Exception e) {
+ e.printStackTrace();
+ log.error(e);
+ }
+ }
+ result = c;
+ }
+
+ public void stop() {
+ stop = true;
+ }
+ }
+
+ public void testInitialStateTransfer() throws Exception {
+ testCount++;
+ log.info("testInitialStateTransfer start - " + testCount);
+ Cache<Object, Object> cache1, cache2;
+ cache1 = createCacheManager().getCache(cacheName);
+ writeInitialData(cache1);
+
+ cache2 = createCacheManager().getCache(cacheName);
+
+ // Pause to give caches time to see each other
+ TestingUtil.blockUntilViewsReceived(60000, cache1, cache2);
+
+ verifyInitialData(cache2);
+ log.info("testInitialStateTransfer end - " + testCount);
+ }
+
+ public void testConcurrentStateTransfer() throws Exception {
+ testCount++;
+ log.info("testConcurrentStateTransfer start - " + testCount);
+ Cache<Object, Object> cache1 = null, cache2 = null, cache3 = null, cache4 = null;
+ cache1 = createCacheManager().getCache(cacheName);
+ writeInitialData(cache1);
+
+ cache2 = createCacheManager().getCache(cacheName);
+
+ cache1.put("delay", new DelayTransfer());
+
+ // Pause to give caches time to see each other
+ TestingUtil.blockUntilViewsReceived(60000, cache1, cache2);
+ verifyInitialData(cache2);
+
+ final CacheManager cm3 = createCacheManager();
+ final CacheManager cm4 = createCacheManager();
+
+ Thread t1 = new Thread(new Runnable() {
+ public void run() {
+ cm3.getCache(cacheName);
+ }
+ });
+ t1.start();
+
+ Thread t2 = new Thread(new Runnable() {
+ public void run() {
+ cm4.getCache(cacheName);
+ }
+ });
+ t2.start();
+
+ t1.join();
+ t2.join();
+
+ cache3 = cm3.getCache(cacheName);
+ cache4 = cm4.getCache(cacheName);
+
+ TestingUtil.blockUntilViewsReceived(60000, cache1, cache2, cache3, cache4);
+ verifyInitialData(cache3);
+ verifyInitialData(cache4);
+ log.info("testConcurrentStateTransfer end - " + testCount);
+ }
+
+ public void testSTWithThirdWritingNonTxCache() throws Exception {
+ testCount++;
+ log.info("testSTWithThirdWritingNonTxCache start - " + testCount);
+ thirdWritingCacheTest(false, "nbst1");
+ log.info("testSTWithThirdWritingNonTxCache end - " + testCount);
+ }
+
+ public void testSTWithThirdWritingTxCache() throws Exception {
+ testCount++;
+ log.info("testSTWithThirdWritingTxCache start - " + testCount);
+ thirdWritingCacheTest(true, "nbst2");
+ log.info("testSTWithThirdWritingTxCache end - " + testCount);
+ }
+
+ public void testSTWithWritingNonTxThread() throws Exception {
+ testCount++;
+ log.info("testSTWithWritingNonTxThread start - " + testCount);
+ writingThreadTest(false, "nbst3");
+ log.info("testSTWithWritingNonTxThread end - " + testCount);
+ }
+
+ public void testSTWithWritingTxThread() throws Exception {
+ testCount++;
+ log.info("testSTWithWritingTxThread start - " + testCount);
+ writingThreadTest(true, "nbst4");
+ log.info("testSTWithWritingTxThread end - " + testCount);
+ }
+
+ private void thirdWritingCacheTest(boolean tx, String name) throws InterruptedException {
+ Cache<Object, Object> cache1, cache2, cache3;
+ cache1 = createCacheManager().getCache(cacheName);
+ cache3 = createCacheManager().getCache(cacheName);
+
+ writeInitialData(cache1);
+
+ // Delay the transient copy, so that we get a more thorough log test
+ cache1.put("delay", new DelayTransfer());
+
+ WritingRunner writer = new WritingRunner(cache3, tx);
+ Thread writerThread = new Thread(writer);
+ writerThread.start();
+
+ cache2 = createCacheManager().getCache(cacheName);
+
+ // Pause to give caches time to see each other
+ TestingUtil.blockUntilViewsReceived(60000, cache1, cache2, cache3);
+
+ writer.stop();
+ writerThread.join();
+
+ verifyInitialData(cache2);
+
+ int count = writer.result();
+
+ for (int c = 0; c < count; c++)
+ assert cache2.get("test" + c).equals(c);
+ }
+
+ private void verifyInitialData(Cache<Object, Object> c) {
+ assert JOE.equals(c.get(A_B_NAME)) : "Incorrect value for key " + A_B_NAME;
+ assert TWENTY.equals(c.get(A_B_AGE)) : "Incorrect value for key " + A_B_AGE;
+ assert BOB.equals(c.get(A_C_NAME)) : "Incorrect value for key " + A_C_NAME;
+ assert FORTY.equals(c.get(A_C_AGE)) : "Incorrect value for key " + A_C_AGE;
+ }
+
+ private void writeInitialData(final Cache<Object, Object> c) {
+ c.put(A_B_NAME, JOE);
+ c.put(A_B_AGE, TWENTY);
+ c.put(A_C_NAME, BOB);
+ c.put(A_C_AGE, FORTY);
+ }
+
+ private void writingThreadTest(boolean tx, String name) throws InterruptedException {
+ Cache<Object, Object> cache1 = null, cache2 = null;
+ cache1 = createCacheManager().getCache(cacheName);
+
+ writeInitialData(cache1);
+
+ // Delay the transient copy, so that we get a more thorough log test
+ cache1.put("delay", new DelayTransfer());
+
+ WritingRunner writer = new WritingRunner(cache1, tx);
+ Thread writerThread = new Thread(writer);
+ writerThread.start();
+
+ cache2 = createCacheManager().getCache(cacheName);
+
+ // Pause to give caches time to see each other
+ TestingUtil.blockUntilViewsReceived(60000, cache1, cache2);
+
+ writer.stop();
+ writerThread.join();
+
+ verifyInitialData(cache2);
+
+ int count = writer.result();
+
+ for (int c = 0; c < count; c++)
+ assert cache2.get("test" + c).equals(c);
+ }
+}
Modified: core/branches/flat/src/test/java/org/horizon/test/MultipleCacheManagersTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/test/MultipleCacheManagersTest.java 2009-02-25 12:06:48 UTC (rev 7784)
+++ core/branches/flat/src/test/java/org/horizon/test/MultipleCacheManagersTest.java 2009-02-25 15:52:32 UTC (rev 7785)
@@ -178,6 +178,7 @@
Configuration configuration = new Configuration();
configuration.setSyncCommitPhase(true);
configuration.setSyncRollbackPhase(true);
+ configuration.setFetchInMemoryState(false);
return configuration;
}
Modified: core/branches/flat/src/test/resources/configs/named-cache-test.xml
===================================================================
--- core/branches/flat/src/test/resources/configs/named-cache-test.xml 2009-02-25 12:06:48 UTC (rev 7784)
+++ core/branches/flat/src/test/resources/configs/named-cache-test.xml 2009-02-25 15:52:32 UTC (rev 7785)
@@ -38,21 +38,21 @@
<namedCache name="syncRepl">
<clustering>
- <stateRetrieval fetchInMemoryState="true" timeout="15000"/>
+ <stateRetrieval fetchInMemoryState="false" />
<sync replTimeout="15000"/>
</clustering>
</namedCache>
<namedCache name="asyncRepl">
<clustering>
- <stateRetrieval fetchInMemoryState="true" timeout="15000"/>
+ <stateRetrieval fetchInMemoryState="false" />
<async useAsyncSerialization="false"/>
</clustering>
</namedCache>
<namedCache name="asyncReplQueue">
<clustering>
- <stateRetrieval fetchInMemoryState="true" timeout="15000"/>
+ <stateRetrieval fetchInMemoryState="false" />
<async useReplQueue="true" replQueueInterval="1234" replQueueMaxElements="100"/>
</clustering>
</namedCache>
@@ -60,7 +60,7 @@
<namedCache name="txSyncRepl">
<transaction/>
<clustering>
- <stateRetrieval fetchInMemoryState="true" timeout="15000"/>
+ <stateRetrieval fetchInMemoryState="false" />
<sync replTimeout="15000"/>
</clustering>
</namedCache>
Modified: core/branches/flat/src/test/resources/log4j.xml
===================================================================
--- core/branches/flat/src/test/resources/log4j.xml 2009-02-25 12:06:48 UTC (rev 7784)
+++ core/branches/flat/src/test/resources/log4j.xml 2009-02-25 15:52:32 UTC (rev 7785)
@@ -45,13 +45,17 @@
<!-- ================ -->
<category name="org.horizon">
- <priority value="INFO"/>
+ <priority value="WARN"/>
</category>
<category name="org.horizon.profiling">
<priority value="WARN"/>
</category>
+ <category name="org.horizon.jmx">
+ <priority value="WARN"/>
+ </category>
+
<category name="org.horizon.factories">
<priority value="WARN"/>
</category>
@@ -62,8 +66,8 @@
<root>
<priority value="WARN"/>
- <!--<appender-ref ref="CONSOLE"/>-->
- <appender-ref ref="FILE"/>
+ <appender-ref ref="CONSOLE"/>
+ <!--<appender-ref ref="FILE"/>-->
</root>
</log4j:configuration>
15 years, 10 months