JBoss Cache SVN: r7836 - core/branches/flat/src/test/java/org/horizon/statetransfer.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2009-03-04 04:16:37 -0500 (Wed, 04 Mar 2009)
New Revision: 7836
Modified:
core/branches/flat/src/test/java/org/horizon/statetransfer/StateTransferFunctionalTest.java
Log:
Disabled NBST test
Modified: core/branches/flat/src/test/java/org/horizon/statetransfer/StateTransferFunctionalTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/statetransfer/StateTransferFunctionalTest.java 2009-03-04 08:55:28 UTC (rev 7835)
+++ core/branches/flat/src/test/java/org/horizon/statetransfer/StateTransferFunctionalTest.java 2009-03-04 09:16:37 UTC (rev 7836)
@@ -16,7 +16,7 @@
import java.io.ObjectOutputStream;
import java.io.Serializable;
-@Test(groups = "functional", testName = "statetransfer.StateTransferFunctionalTest")
+@Test(groups = "functional", testName = "statetransfer.StateTransferFunctionalTest", enabled = false)
public class StateTransferFunctionalTest extends MultipleCacheManagersTest {
protected static final String ADDRESS_CLASSNAME = Address.class.getName();
15 years, 10 months
JBoss Cache SVN: r7835 - core/trunk/src/main/java/org/jboss/cache.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2009-03-04 03:55:28 -0500 (Wed, 04 Mar 2009)
New Revision: 7835
Modified:
core/trunk/src/main/java/org/jboss/cache/DataContainerImpl.java
Log:
JBCACHE-1487
Modified: core/trunk/src/main/java/org/jboss/cache/DataContainerImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/DataContainerImpl.java 2009-03-04 08:53:25 UTC (rev 7834)
+++ core/trunk/src/main/java/org/jboss/cache/DataContainerImpl.java 2009-03-04 08:55:28 UTC (rev 7835)
@@ -97,6 +97,12 @@
}
@Start(priority = 12)
+ public void start()
+ {
+ createRootNode();
+ started = true;
+ }
+
public void createRootNode()
{
usingMvcc = config != null && config.getNodeLockingScheme() == NodeLockingScheme.MVCC;
@@ -116,7 +122,6 @@
}
if (usingMvcc && rootInternal == null) setRoot(root); // sets the "internal root"
- started = true;
}
@Stop(priority = 100)
15 years, 10 months
JBoss Cache SVN: r7834 - core/branches/flat/src/test/java/org/horizon/loader.
by jbosscache-commits@lists.jboss.org
Author: adriancole
Date: 2009-03-04 03:53:25 -0500 (Wed, 04 Mar 2009)
New Revision: 7834
Modified:
core/branches/flat/src/test/java/org/horizon/loader/BaseCacheStoreTest.java
Log:
added test to ensure stop/start doesn't nuke values
Modified: core/branches/flat/src/test/java/org/horizon/loader/BaseCacheStoreTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/BaseCacheStoreTest.java 2009-03-04 00:18:45 UTC (rev 7833)
+++ core/branches/flat/src/test/java/org/horizon/loader/BaseCacheStoreTest.java 2009-03-04 08:53:25 UTC (rev 7834)
@@ -113,6 +113,30 @@
assert !cs.containsKey("k");
}
+ public void testStopStartDoesntNukeValues() throws InterruptedException, CacheLoaderException {
+ assert !cs.containsKey("k1");
+ assert !cs.containsKey("k2");
+
+ long now = System.currentTimeMillis();
+ long lifespan = 1;
+ StoredEntry se1 = new StoredEntry("k1", "v1", now, now + lifespan);
+ StoredEntry se2 = new StoredEntry("k2", "v2");
+
+ cs.store(se1);
+ cs.store(se2);
+ Thread.sleep(100);
+ cs.stop();
+ cs.start();
+ assert se1.isExpired();
+ assert cs.load("k1") == null;
+ assert !cs.containsKey("k1");
+ assert cs.load("k2") != null;
+ assert cs.containsKey("k2");
+ assert cs.load("k2").getValue().equals("v2");
+
+ }
+
+
public void testOnePhaseCommit() throws CacheLoaderException {
List<Modification> mods = new ArrayList<Modification>();
mods.add(new Store(new StoredEntry("k1", "v1", -1, -1)));
15 years, 10 months
JBoss Cache SVN: r7833 - core/trunk/src/main/java/org/jboss/cache.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2009-03-03 19:18:45 -0500 (Tue, 03 Mar 2009)
New Revision: 7833
Modified:
core/trunk/src/main/java/org/jboss/cache/DataContainerImpl.java
Log:
JBCACHE-1487
Modified: core/trunk/src/main/java/org/jboss/cache/DataContainerImpl.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/DataContainerImpl.java 2009-03-03 17:07:33 UTC (rev 7832)
+++ core/trunk/src/main/java/org/jboss/cache/DataContainerImpl.java 2009-03-04 00:18:45 UTC (rev 7833)
@@ -72,11 +72,13 @@
private BuddyFqnTransformer buddyFqnTransformer;
private Configuration config;
private boolean usingMvcc;
+ private volatile boolean started = false;
private static final InternalNode[] NULL_ARRAY = {null, null};
@Inject
public void injectDependencies(NodeFactory nodeFactory, LockManager lockManager, BuddyFqnTransformer transformer, Configuration configuration)
{
+ started = false;
setDependencies(nodeFactory, lockManager);
// We need to create a root node even at this stage since certain components rely on this being available before
@@ -114,11 +116,13 @@
}
if (usingMvcc && rootInternal == null) setRoot(root); // sets the "internal root"
+ started = true;
}
@Stop(priority = 100)
public void stop()
{
+ started = false;
// empty in-memory state
if (root != null)
{
@@ -435,8 +439,15 @@
@ManagedOperation(description = "Returns the number of nodes in the data container")
public int getNumberOfNodes()
{
- if (!usingMvcc) return numNodes(root) - 1;
- return numNodesMvcc(rootInternal) - 1;
+ if (started)
+ {
+ if (!usingMvcc) return numNodes(root) - 1;
+ return numNodesMvcc(rootInternal) - 1;
+ }
+ else
+ {
+ return 0;
+ }
}
private int numNodesMvcc(InternalNode node)
@@ -447,7 +458,7 @@
Set<InternalNode> children = node.getChildren();
for (InternalNode child : children)
{
- count += numNodesMvcc((InternalNode) child);
+ count += numNodesMvcc(child);
}
}
return count;
15 years, 10 months
JBoss Cache SVN: r7832 - core/branches/flat/src/main/java/org/horizon/remoting.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2009-03-03 12:07:33 -0500 (Tue, 03 Mar 2009)
New Revision: 7832
Modified:
core/branches/flat/src/main/java/org/horizon/remoting/RPCManagerImpl.java
Log:
Better waits between attempts
Modified: core/branches/flat/src/main/java/org/horizon/remoting/RPCManagerImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/RPCManagerImpl.java 2009-03-03 17:02:14 UTC (rev 7831)
+++ core/branches/flat/src/main/java/org/horizon/remoting/RPCManagerImpl.java 2009-03-03 17:07:33 UTC (rev 7832)
@@ -73,6 +73,10 @@
}
public void retrieveState(String cacheName, long timeout) throws StateTransferException {
+ // TODO make these configurable
+ int initialWaitTime = 1000; // millis
+ int waitTimeIncreaseFactor = 2;
+ int numRetries = 3;
List<Address> members = t.getMembers();
if (members.size() < 2) {
if (log.isDebugEnabled())
@@ -85,7 +89,7 @@
try {
outer:
- for (int i = 0, wait = 1000; i < 5; i++) {
+ for (int i = 0, wait = initialWaitTime; i < numRetries; i++) {
for (Address member : members) {
if (!member.equals(t.getAddress())) {
try {
@@ -108,7 +112,7 @@
log.warn("Could not find available peer for state, backing off and retrying");
try {
- Thread.sleep(wait <<= 2);
+ Thread.sleep(wait *= waitTimeIncreaseFactor);
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
15 years, 10 months
JBoss Cache SVN: r7831 - core/branches/flat/src/test/resources.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2009-03-03 12:02:14 -0500 (Tue, 03 Mar 2009)
New Revision: 7831
Modified:
core/branches/flat/src/test/resources/log4j.xml
Log:
Modified: core/branches/flat/src/test/resources/log4j.xml
===================================================================
--- core/branches/flat/src/test/resources/log4j.xml 2009-03-03 17:01:45 UTC (rev 7830)
+++ core/branches/flat/src/test/resources/log4j.xml 2009-03-03 17:02:14 UTC (rev 7831)
@@ -66,8 +66,8 @@
<root>
<priority value="WARN"/>
- <appender-ref ref="CONSOLE"/>
- <!--<appender-ref ref="FILE"/>-->
+ <!--<appender-ref ref="CONSOLE"/>-->
+ <appender-ref ref="FILE"/>
</root>
</log4j:configuration>
15 years, 10 months
JBoss Cache SVN: r7830 - core/branches/flat/src/test/java/org/horizon/statetransfer.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2009-03-03 12:01:45 -0500 (Tue, 03 Mar 2009)
New Revision: 7830
Modified:
core/branches/flat/src/test/java/org/horizon/statetransfer/StateTransferCacheLoaderFunctionalTest.java
core/branches/flat/src/test/java/org/horizon/statetransfer/StateTransferFunctionalTest.java
Log:
Modified: core/branches/flat/src/test/java/org/horizon/statetransfer/StateTransferCacheLoaderFunctionalTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/statetransfer/StateTransferCacheLoaderFunctionalTest.java 2009-03-03 14:56:48 UTC (rev 7829)
+++ core/branches/flat/src/test/java/org/horizon/statetransfer/StateTransferCacheLoaderFunctionalTest.java 2009-03-03 17:01:45 UTC (rev 7830)
@@ -10,7 +10,7 @@
import org.horizon.test.TestingUtil;
import org.testng.annotations.Test;
-@Test(groups = "functional", testName = "statetransfer.StateTransferCacheLoaderFunctionalTest", enabled = false)
+@Test(groups = "functional", testName = "statetransfer.StateTransferCacheLoaderFunctionalTest")
public class StateTransferCacheLoaderFunctionalTest extends StateTransferFunctionalTest {
int id;
ThreadLocal<Boolean> sharedCacheLoader = new ThreadLocal<Boolean>() {
Modified: core/branches/flat/src/test/java/org/horizon/statetransfer/StateTransferFunctionalTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/statetransfer/StateTransferFunctionalTest.java 2009-03-03 14:56:48 UTC (rev 7829)
+++ core/branches/flat/src/test/java/org/horizon/statetransfer/StateTransferFunctionalTest.java 2009-03-03 17:01:45 UTC (rev 7830)
@@ -16,7 +16,7 @@
import java.io.ObjectOutputStream;
import java.io.Serializable;
-@Test(groups = "functional", testName = "statetransfer.StateTransferFunctionalTest", enabled = false)
+@Test(groups = "functional", testName = "statetransfer.StateTransferFunctionalTest")
public class StateTransferFunctionalTest extends MultipleCacheManagersTest {
protected static final String ADDRESS_CLASSNAME = Address.class.getName();
@@ -216,7 +216,6 @@
log.info("testSTWithWritingNonTxThread end - " + testCount);
}
- @Test(invocationCount = 10)
public void testSTWithWritingTxThread() throws Exception {
testCount++;
log.info("testSTWithWritingTxThread start - " + testCount);
15 years, 10 months
JBoss Cache SVN: r7829 - in core/branches/flat/src: main/java/org/horizon/config and 17 other directories.
by jbosscache-commits@lists.jboss.org
Author: manik.surtani(a)jboss.com
Date: 2009-03-03 09:56:48 -0500 (Tue, 03 Mar 2009)
New Revision: 7829
Added:
core/branches/flat/src/main/java/org/horizon/remoting/transport/DistributedSync.java
core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/ExtendedResponse.java
core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/FlushBasedDistributedSync.java
core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/RequestIgnoredResponse.java
core/branches/flat/src/test/java/org/horizon/statetransfer/StateTransferCacheLoaderFunctionalTest.java
Modified:
core/branches/flat/src/main/java/org/horizon/commands/tx/PrepareCommand.java
core/branches/flat/src/main/java/org/horizon/config/Configuration.java
core/branches/flat/src/main/java/org/horizon/config/GlobalConfiguration.java
core/branches/flat/src/main/java/org/horizon/config/parsing/XmlConfigurationParserImpl.java
core/branches/flat/src/main/java/org/horizon/context/TransactionContext.java
core/branches/flat/src/main/java/org/horizon/context/TransactionContextImpl.java
core/branches/flat/src/main/java/org/horizon/interceptors/CacheStoreInterceptor.java
core/branches/flat/src/main/java/org/horizon/interceptors/CallInterceptor.java
core/branches/flat/src/main/java/org/horizon/interceptors/InvalidationInterceptor.java
core/branches/flat/src/main/java/org/horizon/interceptors/InvocationContextInterceptor.java
core/branches/flat/src/main/java/org/horizon/interceptors/ReplicationInterceptor.java
core/branches/flat/src/main/java/org/horizon/interceptors/TxInterceptor.java
core/branches/flat/src/main/java/org/horizon/manager/DefaultCacheManager.java
core/branches/flat/src/main/java/org/horizon/marshall/HorizonMarshaller.java
core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandlerImpl.java
core/branches/flat/src/main/java/org/horizon/remoting/RPCManager.java
core/branches/flat/src/main/java/org/horizon/remoting/RPCManagerImpl.java
core/branches/flat/src/main/java/org/horizon/remoting/ReplicationQueue.java
core/branches/flat/src/main/java/org/horizon/remoting/transport/Transport.java
core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/CommandAwareRpcDispatcher.java
core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/JGroupsTransport.java
core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManagerImpl.java
core/branches/flat/src/main/java/org/horizon/transaction/TransactionLog.java
core/branches/flat/src/main/java/org/horizon/transaction/TransactionTable.java
core/branches/flat/src/main/java/org/horizon/util/Util.java
core/branches/flat/src/main/resources/config-samples/all.xml
core/branches/flat/src/main/resources/schema/horizon-config-1.0.xsd
core/branches/flat/src/test/java/org/horizon/api/mvcc/PutForExternalReadTest.java
core/branches/flat/src/test/java/org/horizon/invalidation/BaseInvalidationTest.java
core/branches/flat/src/test/java/org/horizon/replication/SyncReplTest.java
core/branches/flat/src/test/java/org/horizon/statetransfer/StateTransferFunctionalTest.java
Log:
More NBST fixes
Modified: core/branches/flat/src/main/java/org/horizon/commands/tx/PrepareCommand.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/commands/tx/PrepareCommand.java 2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/commands/tx/PrepareCommand.java 2009-03-03 14:56:48 UTC (rev 7829)
@@ -22,7 +22,6 @@
package org.horizon.commands.tx;
import org.horizon.commands.ReplicableCommand;
-import org.horizon.commands.VisitableCommand;
import org.horizon.commands.Visitor;
import org.horizon.commands.write.WriteCommand;
import org.horizon.context.InvocationContext;
@@ -53,7 +52,7 @@
this.onePhaseCommit = onePhaseCommit;
}
- public void removeModifications(Collection<VisitableCommand> modificationsToRemove) {
+ public void removeModifications(Collection<WriteCommand> modificationsToRemove) {
if (modifications != null) modifications.removeAll(modificationsToRemove);
}
Modified: core/branches/flat/src/main/java/org/horizon/config/Configuration.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/config/Configuration.java 2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/config/Configuration.java 2009-03-03 14:56:48 UTC (rev 7829)
@@ -58,6 +58,10 @@
return useAsyncSerialization;
}
+ public boolean isStateTransferEnabled() {
+ return fetchInMemoryState || (cacheLoaderManagerConfig != null && cacheLoaderManagerConfig.isFetchPersistentState());
+ }
+
/**
* Cache replication mode.
*/
Modified: core/branches/flat/src/main/java/org/horizon/config/GlobalConfiguration.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/config/GlobalConfiguration.java 2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/config/GlobalConfiguration.java 2009-03-03 14:56:48 UTC (rev 7829)
@@ -50,6 +50,7 @@
private short marshallVersion = DEFAULT_MARSHALL_VERSION;
private GlobalComponentRegistry gcr;
+ private long distributedSyncTimeout = 60000; // default
/**
* Behavior of the JVM shutdown hook registered by the cache
@@ -276,6 +277,15 @@
this.marshallVersion = Version.getVersionShort(marshallVersion);
}
+ public long getDistributedSyncTimeout() {
+ return distributedSyncTimeout;
+ }
+
+ public void setDistributedSyncTimeout(long distributedSyncTimeout) {
+ testImmutability("distributedSyncTimeout");
+ this.distributedSyncTimeout = distributedSyncTimeout;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
@@ -312,6 +322,7 @@
return false;
if (transportProperties != null ? !transportProperties.equals(that.transportProperties) : that.transportProperties != null)
return false;
+ if (distributedSyncTimeout != that.distributedSyncTimeout) return false;
return true;
}
@@ -335,6 +346,7 @@
result = 31 * result + (clusterName != null ? clusterName.hashCode() : 0);
result = 31 * result + (shutdownHookBehavior != null ? shutdownHookBehavior.hashCode() : 0);
result = 31 * result + (int) marshallVersion;
+ result = (int) (31 * result + distributedSyncTimeout);
return result;
}
Modified: core/branches/flat/src/main/java/org/horizon/config/parsing/XmlConfigurationParserImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/config/parsing/XmlConfigurationParserImpl.java 2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/config/parsing/XmlConfigurationParserImpl.java 2009-03-03 14:56:48 UTC (rev 7829)
@@ -276,7 +276,7 @@
throw new ConfigurationException("Unable to configure eviction", e);
}
- if (p != null && !p.isEmpty()) XmlConfigHelper. setValues(cfg, p, false, true);
+ if (p != null && !p.isEmpty()) XmlConfigHelper.setValues(cfg, p, false, true);
evictionConfig.setAlgorithmConfig(cfg);
config.setEvictionConfig(evictionConfig);
@@ -369,6 +369,9 @@
tmp = getAttributeValue(e, "clusterName");
if (existsAttribute(tmp)) gc.setClusterName(tmp);
+ tmp = getAttributeValue(e, "distributedSyncTimeout");
+ if (existsAttribute(tmp)) gc.setDistributedSyncTimeout(getLong(tmp));
+
Properties p = XmlConfigHelper.extractProperties(e);
if (p != null) gc.setTransportProperties(p);
}
Modified: core/branches/flat/src/main/java/org/horizon/context/TransactionContext.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/context/TransactionContext.java 2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/context/TransactionContext.java 2009-03-03 14:56:48 UTC (rev 7829)
@@ -21,7 +21,7 @@
*/
package org.horizon.context;
-import org.horizon.commands.VisitableCommand;
+import org.horizon.commands.write.WriteCommand;
import org.horizon.transaction.GlobalTransaction;
import javax.transaction.Transaction;
@@ -41,7 +41,7 @@
*
* @param command modification
*/
- void addModification(VisitableCommand command);
+ void addModification(WriteCommand command);
/**
* Returns all modifications. If there are no modifications in this transaction this method will return an empty
@@ -49,7 +49,7 @@
*
* @return list of modifications.
*/
- List<VisitableCommand> getModifications();
+ List<WriteCommand> getModifications();
/**
* Adds a modification to the local modification list.
@@ -57,7 +57,7 @@
* @param command command to add to list. Should not be null.
* @throws NullPointerException if the command to be added is null.
*/
- void addLocalModification(VisitableCommand command);
+ void addLocalModification(WriteCommand command);
/**
* Returns all modifications that have been invoked with the LOCAL cache mode option. These will also be in the
@@ -65,7 +65,7 @@
*
* @return list of LOCAL modifications, or an empty list.
*/
- List<VisitableCommand> getLocalModifications();
+ List<WriteCommand> getLocalModifications();
/**
* Adds the key that has been removed in the scope of the current transaction.
Modified: core/branches/flat/src/main/java/org/horizon/context/TransactionContextImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/context/TransactionContextImpl.java 2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/context/TransactionContextImpl.java 2009-03-03 14:56:48 UTC (rev 7829)
@@ -21,7 +21,7 @@
*/
package org.horizon.context;
-import org.horizon.commands.VisitableCommand;
+import org.horizon.commands.write.WriteCommand;
import org.horizon.container.MVCCEntry;
import org.horizon.transaction.GlobalTransaction;
import org.horizon.util.FastCopyHashMap;
@@ -50,12 +50,12 @@
/**
* List<VisitableCommand> of modifications. They will be replicated on TX commit
*/
- private List<VisitableCommand> modificationList;
+ private List<WriteCommand> modificationList;
/**
* A list of modifications that have been encountered with a LOCAL mode option. These will be removed from the
* modification list during replication.
*/
- private List<VisitableCommand> localModifications;
+ private List<WriteCommand> localModifications;
/**
* A list of dummy uninitialised entries created by the cache loader interceptor to load data for a given entry in
@@ -97,24 +97,24 @@
lookedUpEntries.putAll(entries);
}
- public void addModification(VisitableCommand command) {
+ public void addModification(WriteCommand command) {
if (command == null) return;
- if (modificationList == null) modificationList = new LinkedList<VisitableCommand>();
+ if (modificationList == null) modificationList = new LinkedList<WriteCommand>();
modificationList.add(command);
}
- public List<VisitableCommand> getModifications() {
+ public List<WriteCommand> getModifications() {
if (modificationList == null) return Collections.emptyList();
return modificationList;
}
- public void addLocalModification(VisitableCommand command) {
+ public void addLocalModification(WriteCommand command) {
if (command == null) throw new NullPointerException("Command is null!");
- if (localModifications == null) localModifications = new LinkedList<VisitableCommand>();
+ if (localModifications == null) localModifications = new LinkedList<WriteCommand>();
localModifications.add(command);
}
- public List<VisitableCommand> getLocalModifications() {
+ public List<WriteCommand> getLocalModifications() {
if (localModifications == null) return Collections.emptyList();
return localModifications;
}
Modified: core/branches/flat/src/main/java/org/horizon/interceptors/CacheStoreInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/interceptors/CacheStoreInterceptor.java 2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/interceptors/CacheStoreInterceptor.java 2009-03-03 14:56:48 UTC (rev 7829)
@@ -31,6 +31,7 @@
import org.horizon.commands.write.PutMapCommand;
import org.horizon.commands.write.RemoveCommand;
import org.horizon.commands.write.ReplaceCommand;
+import org.horizon.commands.write.WriteCommand;
import org.horizon.config.CacheLoaderManagerConfig;
import org.horizon.container.MVCCEntry;
import org.horizon.context.InvocationContext;
@@ -239,14 +240,14 @@
if (transactionContext == null) {
throw new Exception("transactionContext for transaction " + gtx + " not found in transaction table");
}
- List<VisitableCommand> modifications = transactionContext.getModifications();
+ List<WriteCommand> modifications = transactionContext.getModifications();
if (modifications.size() == 0) {
log.trace("Transaction has not logged any modifications!");
return;
}
log.trace("Cache loader modification list: {0}", modifications);
StoreModificationsBuilder modsBuilder = new StoreModificationsBuilder(getStatisticsEnabled());
- for (VisitableCommand cacheCommand : modifications) cacheCommand.acceptVisitor(ctx, modsBuilder);
+ for (WriteCommand cacheCommand : modifications) cacheCommand.acceptVisitor(ctx, modsBuilder);
int numMods = modsBuilder.modifications.size();
log.trace("Converted method calls to cache loader modifications. List size: {0}", numMods);
Modified: core/branches/flat/src/main/java/org/horizon/interceptors/CallInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/interceptors/CallInterceptor.java 2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/interceptors/CallInterceptor.java 2009-03-03 14:56:48 UTC (rev 7829)
@@ -30,6 +30,7 @@
import org.horizon.commands.write.ClearCommand;
import org.horizon.commands.write.PutKeyValueCommand;
import org.horizon.commands.write.RemoveCommand;
+import org.horizon.commands.write.WriteCommand;
import org.horizon.context.InvocationContext;
import org.horizon.interceptors.base.CommandInterceptor;
import org.horizon.transaction.GlobalTransaction;
@@ -99,7 +100,7 @@
return handleAlterCacheMethod(ctx, command);
}
- private Object handleAlterCacheMethod(InvocationContext ctx, VisitableCommand command)
+ private Object handleAlterCacheMethod(InvocationContext ctx, WriteCommand command)
throws Throwable {
Object result = invokeCommand(ctx, command);
if (ctx.isValidTransaction()) {
Modified: core/branches/flat/src/main/java/org/horizon/interceptors/InvalidationInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/interceptors/InvalidationInterceptor.java 2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/interceptors/InvalidationInterceptor.java 2009-03-03 14:56:48 UTC (rev 7829)
@@ -217,7 +217,8 @@
// increment invalidations counter if statistics maintained
incrementInvalidations();
InvalidateCommand command = commandsFactory.buildInvalidateCommand(keys);
- if (log.isDebugEnabled()) log.debug("Cache [" + rpcManager.getAddress() + "] replicating " + command);
+ if (log.isDebugEnabled())
+ log.debug("Cache [" + rpcManager.getTransport().getAddress() + "] replicating " + command);
// voila, invalidated!
replicateCall(ctx, command, synchronous);
}
Modified: core/branches/flat/src/main/java/org/horizon/interceptors/InvocationContextInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/interceptors/InvocationContextInterceptor.java 2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/interceptors/InvocationContextInterceptor.java 2009-03-03 14:56:48 UTC (rev 7829)
@@ -195,7 +195,7 @@
* @return true if the gtx is remote, false if it originated locally.
*/
private boolean isRemoteGlobalTx(GlobalTransaction gtx) {
- return gtx != null && (gtx.getAddress() != null) && (!gtx.getAddress().equals(rpcManager.getAddress()));
+ return gtx != null && (gtx.getAddress() != null) && (!gtx.getAddress().equals(rpcManager.getTransport().getAddress()));
}
private void copyInvocationScopeOptionsToTxScope(InvocationContext ctx) {
Modified: core/branches/flat/src/main/java/org/horizon/interceptors/ReplicationInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/interceptors/ReplicationInterceptor.java 2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/interceptors/ReplicationInterceptor.java 2009-03-03 14:56:48 UTC (rev 7829)
@@ -112,7 +112,7 @@
if (command.isSuccessful()) {
if (ctx.getTransaction() == null && ctx.isOriginLocal()) {
if (trace) {
- log.trace("invoking method " + command.getClass().getSimpleName() + ", members=" + rpcManager.getMembers() + ", mode=" +
+ log.trace("invoking method " + command.getClass().getSimpleName() + ", members=" + rpcManager.getTransport().getMembers() + ", mode=" +
configuration.getCacheMode() + ", exclude_self=" + true + ", timeout=" +
configuration.getSyncReplTimeout());
}
@@ -138,7 +138,7 @@
protected void runPreparePhase(PrepareCommand prepareMethod, GlobalTransaction gtx, InvocationContext ctx) throws Throwable {
boolean async = configuration.getCacheMode() == Configuration.CacheMode.REPL_ASYNC;
if (trace) {
- log.trace("(" + rpcManager.getAddress() + "): running remote prepare for global tx " + gtx + " with async mode=" + async);
+ log.trace("(" + rpcManager.getTransport().getAddress() + "): running remote prepare for global tx " + gtx + " with async mode=" + async);
}
// this method will return immediately if we're the only member (because exclude_self=true)
Modified: core/branches/flat/src/main/java/org/horizon/interceptors/TxInterceptor.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/interceptors/TxInterceptor.java 2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/interceptors/TxInterceptor.java 2009-03-03 14:56:48 UTC (rev 7829)
@@ -28,6 +28,7 @@
import org.horizon.commands.tx.CommitCommand;
import org.horizon.commands.tx.PrepareCommand;
import org.horizon.commands.tx.RollbackCommand;
+import org.horizon.commands.write.WriteCommand;
import org.horizon.context.InvocationContext;
import org.horizon.context.TransactionContext;
import org.horizon.factories.ComponentRegistry;
@@ -42,6 +43,7 @@
import org.horizon.notifications.cachelistener.CacheNotifier;
import org.horizon.remoting.ReplicationException;
import org.horizon.transaction.GlobalTransaction;
+import org.horizon.transaction.TransactionLog;
import org.horizon.transaction.TransactionTable;
import org.horizon.util.concurrent.ConcurrentHashSet;
@@ -74,6 +76,7 @@
private ComponentRegistry componentRegistry;
private ContextFactory contextFactory;
private CacheManager cacheManager;
+ private TransactionLog transactionLog;
/**
* List <Transaction>that we have registered for
@@ -89,7 +92,8 @@
@Inject
public void intialize(CacheManager cacheManager, ContextFactory contextFactory,
CacheNotifier notifier, InvocationContextContainer icc,
- CommandsFactory factory, ComponentRegistry componentRegistry, LockManager lockManager) {
+ CommandsFactory factory, ComponentRegistry componentRegistry, LockManager lockManager,
+ TransactionLog transactionLog) {
this.contextFactory = contextFactory;
this.commandsFactory = factory;
this.cacheManager = cacheManager;
@@ -97,6 +101,7 @@
this.invocationContextContainer = icc;
this.componentRegistry = componentRegistry;
this.lockManager = lockManager;
+ this.transactionLog = transactionLog;
setStatisticsEnabled(configuration.isExposeManagementStatistics());
}
@@ -226,7 +231,11 @@
@Override
public Object handleDefault(InvocationContext ctx, VisitableCommand command) throws Throwable {
try {
- return attachGtxAndPassUpChain(ctx, command);
+ Object retval = attachGtxAndPassUpChain(ctx, command);
+ // log non-transactional modification
+ if (command instanceof WriteCommand && ctx.getTransaction() == null)
+ transactionLog.logNoTxWrite((WriteCommand) command);
+ return retval;
}
catch (Throwable throwable) {
throwIfNeeded(ctx, throwable);
@@ -326,7 +335,9 @@
if (trace)
log.trace("Using one-phase prepare. Not propagating the prepare call up the stack until called to do so by the sync handler.");
} else {
- // now pass up the prepare method itself.
+ // first log the transaction...
+ transactionLog.logPrepare(command);
+ // then pass up the prepare method itself.
invokeNextInterceptor(ctx, command);
}
// JBCACHE-361 Confirm that the transaction is ACTIVE
@@ -439,20 +450,24 @@
// Transaction phase runners
// --------------------------------------------------------------
- protected PrepareCommand buildPrepareCommand(GlobalTransaction gtx, List modifications, boolean onePhaseCommit) {
+ protected PrepareCommand buildPrepareCommand(GlobalTransaction gtx, List<WriteCommand> modifications, boolean onePhaseCommit) {
return commandsFactory.buildPrepareCommand(gtx, modifications, cacheManager.getAddress(), onePhaseCommit);
}
/**
* creates a commit()
*/
- protected void runCommitPhase(InvocationContext ctx, GlobalTransaction gtx, List modifications, boolean onePhaseCommit) {
+ protected void runCommitPhase(InvocationContext ctx, GlobalTransaction gtx, List<WriteCommand> modifications, boolean onePhaseCommit) {
try {
VisitableCommand commitCommand = onePhaseCommit ? buildPrepareCommand(gtx, modifications, true) : commandsFactory.buildCommitCommand(gtx);
if (trace) log.trace("Running commit for " + gtx);
handleCommitRollback(ctx, commitCommand);
+ if (onePhaseCommit)
+ transactionLog.logOnePhaseCommit(gtx, modifications);
+ else
+ transactionLog.logCommit(gtx);
}
catch (Throwable e) {
log.warn("Commit failed. Clearing stale locks.");
@@ -487,6 +502,7 @@
// JBCACHE-457
VisitableCommand rollbackCommand = commandsFactory.buildRollbackCommand(gtx);
if (trace) log.trace(" running rollback for {0}", gtx);
+ transactionLog.rollback(gtx);
//JBCACHE-359 Store a lookup for the globalTransaction so a listener
// callback can find it
@@ -515,10 +531,12 @@
* Handles a local prepare - invoked by the sync handler. Tests if the current tx matches the gtx passed in to the
* method call and passes the prepare() call up the chain.
*/
- public Object runPreparePhase(InvocationContext ctx, GlobalTransaction gtx, List<VisitableCommand> modifications) throws Throwable {
+ public Object runPreparePhase(InvocationContext ctx, GlobalTransaction gtx, List<WriteCommand> modifications) throws Throwable {
// running a 2-phase commit.
- VisitableCommand prepareCommand = buildPrepareCommand(gtx, modifications, false);
+ PrepareCommand prepareCommand = buildPrepareCommand(gtx, modifications, false);
+ transactionLog.logPrepare(prepareCommand);
+
Object result;
// Is there a local transaction associated with GTX ?
@@ -624,7 +642,7 @@
private class RemoteSynchronizationHandler implements Synchronization {
Transaction tx = null;
GlobalTransaction gtx = null;
- List<VisitableCommand> modifications = null;
+ List<WriteCommand> modifications = null;
TransactionContext transactionContext = null;
protected InvocationContext ctx; // the context for this call.
Modified: core/branches/flat/src/main/java/org/horizon/manager/DefaultCacheManager.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/manager/DefaultCacheManager.java 2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/manager/DefaultCacheManager.java 2009-03-03 14:56:48 UTC (rev 7829)
@@ -307,17 +307,17 @@
public List<Address> getMembers() {
RPCManager rpcManager = globalComponentRegistry.getComponent(RPCManager.class);
- return rpcManager == null ? null : rpcManager.getMembers();
+ return rpcManager == null ? null : rpcManager.getTransport().getMembers();
}
public Address getAddress() {
RPCManager rpcManager = globalComponentRegistry.getComponent(RPCManager.class);
- return rpcManager == null ? null : rpcManager.getAddress();
+ return rpcManager == null ? null : rpcManager.getTransport().getAddress();
}
public boolean isCoordinator() {
RPCManager rpcManager = globalComponentRegistry.getComponent(RPCManager.class);
- return rpcManager != null && rpcManager.isCoordinator();
+ return rpcManager != null && rpcManager.getTransport().isCoordinator();
}
private Cache createCache(String cacheName) {
Modified: core/branches/flat/src/main/java/org/horizon/marshall/HorizonMarshaller.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/marshall/HorizonMarshaller.java 2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/marshall/HorizonMarshaller.java 2009-03-03 14:56:48 UTC (rev 7829)
@@ -25,6 +25,7 @@
import org.horizon.atomic.DeltaAware;
import org.horizon.commands.RemoteCommandFactory;
import org.horizon.commands.ReplicableCommand;
+import org.horizon.commands.write.WriteCommand;
import org.horizon.io.ByteBuffer;
import org.horizon.io.ExposedByteArrayOutputStream;
import org.horizon.logging.Log;
@@ -32,6 +33,7 @@
import org.horizon.remoting.transport.Address;
import org.horizon.remoting.transport.jgroups.JGroupsAddress;
import org.horizon.transaction.GlobalTransaction;
+import org.horizon.transaction.TransactionLog;
import org.horizon.util.FastCopyHashMap;
import org.horizon.util.Immutables;
import org.jboss.util.NotImplementedException;
@@ -79,6 +81,7 @@
protected static final int MAGICNUMBER_OBJECT = 22;
protected static final int MAGICNUMBER_SINGLETON_LIST = 23;
protected static final int MAGICNUMBER_COMMAND = 24;
+ protected static final int MAGICNUMBER_TRANSACTION_LOG = 25;
protected static final int MAGICNUMBER_NULL = 99;
protected static final int MAGICNUMBER_SERIALIZABLE = 100;
protected static final int MAGICNUMBER_REF = 101;
@@ -189,6 +192,11 @@
out.writeByte(MAGICNUMBER_STRING);
if (useRefs) writeReference(out, createReference(o, refMap));
marshallString((String) o, out);
+ } else if (o instanceof TransactionLog.LogEntry) {
+ out.writeByte(MAGICNUMBER_TRANSACTION_LOG);
+ TransactionLog.LogEntry le = (TransactionLog.LogEntry) o;
+ marshallObject(le.getTransaction(), out, refMap);
+ marshallObject(le.getModifications(), out, refMap);
} else if (o instanceof Serializable) {
if (trace) {
log.trace("Warning: using object serialization for " + o.getClass());
@@ -305,6 +313,9 @@
case MAGICNUMBER_JG_ADDRESS:
retVal = unmarshallJGroupsAddress(in);
return retVal;
+ case MAGICNUMBER_TRANSACTION_LOG:
+ retVal = new TransactionLog.LogEntry((GlobalTransaction) unmarshallObject(in, refMap), (List<WriteCommand>) unmarshallObject(in, refMap));
+ return retVal;
case MAGICNUMBER_ARRAY:
return unmarshallArray(in, refMap);
case MAGICNUMBER_ARRAY_LIST:
Modified: core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandlerImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandlerImpl.java 2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandlerImpl.java 2009-03-03 14:56:48 UTC (rev 7829)
@@ -44,8 +44,7 @@
}
if (!cr.getStatus().allowInvocations()) {
- if (log.isInfoEnabled()) log.info("Cache named {0} exists but isn't in a state to handle invocations. Its state is {1}", cacheName, cr.getStatus());
- return null;
+ throw new IllegalStateException("Cache named " + cacheName + " exists but isn't in a state to handle invocations. Its state is " + cr.getStatus());
}
InterceptorChain ic = cr.getComponent(InterceptorChain.class);
@@ -69,7 +68,7 @@
private StateTransferManager getStateTransferManager(String cacheName) throws StateTransferException {
ComponentRegistry cr = gcr.getNamedComponentRegistry(cacheName);
if (cr == null) {
- String msg = "Cache named "+cacheName+" does not exist on this cache manager!";
+ String msg = "Cache named " + cacheName + " does not exist on this cache manager!";
log.info(msg);
throw new StateTransferException(msg);
}
Modified: core/branches/flat/src/main/java/org/horizon/remoting/RPCManager.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/RPCManager.java 2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/remoting/RPCManager.java 2009-03-03 14:56:48 UTC (rev 7829)
@@ -27,6 +27,7 @@
import org.horizon.factories.scopes.Scopes;
import org.horizon.lifecycle.Lifecycle;
import org.horizon.remoting.transport.Address;
+import org.horizon.remoting.transport.Transport;
import org.horizon.statetransfer.StateTransferException;
import java.util.List;
@@ -48,78 +49,76 @@
/**
* Invokes an RPC call on other caches in the cluster.
*
- * @param recipients a list of Addresses to invoke the call on. If this is null, the call is broadcast to the
- * entire cluster.
- * @param rpcCommand the cache command to invoke
- * @param mode the response mode to use
- * @param timeout a timeout after which to throw a replication exception.
- * @param usePriorityQueue if true, a priority queue is used to deliver messages. May not be supported by all
- * implementations.
- * @param responseFilter a response filter with which to filter out failed/unwanted/invalid responses.
+ * @param recipients a list of Addresses to invoke the call on. If this is null, the call is broadcast to
+ * the entire cluster.
+ * @param rpcCommand the cache command to invoke
+ * @param mode the response mode to use
+ * @param timeout a timeout after which to throw a replication exception.
+ * @param usePriorityQueue if true, a priority queue is used to deliver messages. May not be supported by all
+ * implementations.
+ * @param responseFilter a response filter with which to filter out failed/unwanted/invalid responses.
+ * @param stateTransferEnabled if true, additional replaying is considered if messages need to be re-sent during the
+ * course of a state transfer
* @return a list of responses from each member contacted.
* @throws Exception in the event of problems.
*/
- List<Object> invokeRemotely(List<Address> recipients, RPCCommand rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, ResponseFilter responseFilter) throws Exception;
+ List<Object> invokeRemotely(List<Address> recipients, RPCCommand rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, ResponseFilter responseFilter, boolean stateTransferEnabled) throws Exception;
/**
* Invokes an RPC call on other caches in the cluster.
*
- * @param recipients a list of Addresses to invoke the call on. If this is null, the call is broadcast to the
- * entire cluster.
- * @param rpcCommand the cache command to invoke
- * @param mode the response mode to use
- * @param timeout a timeout after which to throw a replication exception.
- * @param usePriorityQueue if true, a priority queue is used to deliver messages. May not be supported by all
- * implementations.
+ * @param recipients a list of Addresses to invoke the call on. If this is null, the call is broadcast to
+ * the entire cluster.
+ * @param rpcCommand the cache command to invoke
+ * @param mode the response mode to use
+ * @param timeout a timeout after which to throw a replication exception.
+ * @param usePriorityQueue if true, a priority queue is used to deliver messages. May not be supported by all
+ * implementations.
+ * @param stateTransferEnabled if true, additional replaying is considered if messages need to be re-sent during the
+ * course of a state transfer
* @return a list of responses from each member contacted.
* @throws Exception in the event of problems.
*/
- List<Object> invokeRemotely(List<Address> recipients, RPCCommand rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue) throws Exception;
+ List<Object> invokeRemotely(List<Address> recipients, RPCCommand rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, boolean stateTransferEnabled) throws Exception;
/**
* Invokes an RPC call on other caches in the cluster.
*
- * @param recipients a list of Addresses to invoke the call on. If this is null, the call is broadcast to the entire
- * cluster.
- * @param rpcCommand the cache command to invoke
- * @param mode the response mode to use
- * @param timeout a timeout after which to throw a replication exception.
+ * @param recipients a list of Addresses to invoke the call on. If this is null, the call is broadcast to
+ * the entire cluster.
+ * @param rpcCommand the cache command to invoke
+ * @param mode the response mode to use
+ * @param timeout a timeout after which to throw a replication exception.
+ * @param stateTransferEnabled if true, additional replaying is considered if messages need to be re-sent during the
+ * course of a state transfer
* @return a list of responses from each member contacted.
* @throws Exception in the event of problems.
*/
- List<Object> invokeRemotely(List<Address> recipients, RPCCommand rpcCommand, ResponseMode mode, long timeout) throws Exception;
+ List<Object> invokeRemotely(List<Address> recipients, RPCCommand rpcCommand, ResponseMode mode, long timeout, boolean stateTransferEnabled) throws Exception;
/**
- * @return true if the current Channel is the coordinator of the cluster.
- */
- boolean isCoordinator();
-
- /**
- * @return the Address of the current coordinator.
- */
- Address getCoordinator();
-
- /**
- * Retrieves the current cache instance's network address
+ * Initiates a state retrieval process from neighbouring caches. This method will block until it either times out,
+ * or state is retrieved and applied.
*
- * @return an Address
+ * @param cacheName name of cache requesting state
+ * @param timeout length of time to try to retrieve state on each peer
+ * @throws org.horizon.statetransfer.StateTransferException
+ * in the event of problems
*/
- Address getAddress();
+ void retrieveState(String cacheName, long timeout) throws StateTransferException;
/**
- * Returns a list of members in the current cluster view.
- *
- * @return a list of members. Typically, this would be defensively copied.
+ * @return a reference to the underlying transport.
*/
- List<Address> getMembers();
+ Transport getTransport();
/**
- * Initiates a state retrieval process from neighbouring caches. This method will block until it either times out,
- * or state is retrieved and applied.
+ * If {@link #retrieveState(String, long)} has been invoked and hasn't yet returned (i.e., a state transfer is in
+ * progress), this method will return the current Address from which a state transfer is being attempted. Otherwise,
+ * this method returns a null.
*
- * @param cacheName name of cache requesting state
- * @param timeout length of time to try to retrieve state on each peer
- * @throws org.horizon.statetransfer.StateTransferException in the event of problems
+ * @return the current Address from which a state transfer is being attempted, if a state transfer is in progress, or
+ * a null otherwise.
*/
- void retrieveState(String cacheName, long timeout) throws StateTransferException;
+ Address getCurrentStateTransferSource();
}
\ No newline at end of file
Modified: core/branches/flat/src/main/java/org/horizon/remoting/RPCManagerImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/RPCManagerImpl.java 2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/remoting/RPCManagerImpl.java 2009-03-03 14:56:48 UTC (rev 7829)
@@ -38,6 +38,7 @@
private final AtomicLong replicationFailures = new AtomicLong(0);
boolean statisticsEnabled = false; // by default, don't gather statistics.
private static final Log log = LogFactory.getLog(RPCManagerImpl.class);
+ private volatile Address currentStateTransferSource;
@Inject
public void injectDependencies(GlobalConfiguration globalConfiguration, Transport t, InboundInvocationHandler handler,
@@ -45,7 +46,8 @@
@ComponentName(KnownComponentNames.ASYNC_SERIALIZATION_EXECUTOR) ExecutorService e,
CacheManagerNotifier notifier) {
this.t = t;
- this.t.initialize(globalConfiguration, globalConfiguration.getTransportProperties(), marshaller, e, handler, notifier);
+ this.t.initialize(globalConfiguration, globalConfiguration.getTransportProperties(), marshaller, e, handler,
+ notifier, globalConfiguration.getDistributedSyncTimeout());
}
@Start(priority = 10)
@@ -58,36 +60,20 @@
t.stop();
}
- public List<Object> invokeRemotely(List<Address> recipients, RPCCommand rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, ResponseFilter responseFilter) throws Exception {
- return t.invokeRemotely(recipients, rpcCommand, mode, timeout, usePriorityQueue, responseFilter);
+ public List<Object> invokeRemotely(List<Address> recipients, RPCCommand rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, ResponseFilter responseFilter, boolean stateTransferEnabled) throws Exception {
+ return t.invokeRemotely(recipients, rpcCommand, mode, timeout, usePriorityQueue, responseFilter, stateTransferEnabled);
}
- public List<Object> invokeRemotely(List<Address> recipients, RPCCommand rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue) throws Exception {
- return t.invokeRemotely(recipients, rpcCommand, mode, timeout, usePriorityQueue, null);
+ public List<Object> invokeRemotely(List<Address> recipients, RPCCommand rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, boolean stateTransferEnabled) throws Exception {
+ return t.invokeRemotely(recipients, rpcCommand, mode, timeout, usePriorityQueue, null, stateTransferEnabled);
}
- public List<Object> invokeRemotely(List<Address> recipients, RPCCommand rpcCommand, ResponseMode mode, long timeout) throws Exception {
- return t.invokeRemotely(recipients, rpcCommand, mode, timeout, false, null);
+ public List<Object> invokeRemotely(List<Address> recipients, RPCCommand rpcCommand, ResponseMode mode, long timeout, boolean stateTransferEnabled) throws Exception {
+ return t.invokeRemotely(recipients, rpcCommand, mode, timeout, false, null, stateTransferEnabled);
}
- public boolean isCoordinator() {
- return t.isCoordinator();
- }
-
- public Address getCoordinator() {
- return t.getCoordinator();
- }
-
- public Address getAddress() {
- return t.getAddress();
- }
-
- public List<Address> getMembers() {
- return t.getMembers();
- }
-
public void retrieveState(String cacheName, long timeout) throws StateTransferException {
- List<Address> members = getMembers();
+ List<Address> members = t.getMembers();
if (members.size() < 2) {
if (log.isDebugEnabled())
log.debug("We're the only member in the cluster; no one to retrieve state from. Not doing anything!");
@@ -95,38 +81,56 @@
}
boolean success = false;
- outer:
- for (int i = 0, wait = 1000; i < 5; i++) {
- for (Address member : members) {
- if (!member.equals(getAddress())) {
- try {
- if (log.isInfoEnabled()) log.info("Trying to fetch state from {0}", member);
- if (t.retrieveState(cacheName, member, timeout)) {
- if (log.isInfoEnabled()) log.info("Successfully retrieved and applied state from {0}", member);
- success = true;
- break outer;
+
+ try {
+
+ outer:
+ for (int i = 0, wait = 1000; i < 5; i++) {
+ for (Address member : members) {
+ if (!member.equals(t.getAddress())) {
+ try {
+ if (log.isInfoEnabled()) log.info("Trying to fetch state from {0}", member);
+ currentStateTransferSource = member;
+ if (t.retrieveState(cacheName, member, timeout)) {
+ if (log.isInfoEnabled()) log.info("Successfully retrieved and applied state from {0}", member);
+ success = true;
+ break outer;
+ }
+ } catch (StateTransferException e) {
+ if (log.isDebugEnabled()) log.debug("Error while fetching state from member " + member, e);
+ } finally {
+ currentStateTransferSource = null;
}
- } catch (StateTransferException e) {
- if (log.isDebugEnabled()) log.debug("Error while fetching state from member " + member, e);
}
- }
- if (!success) {
- if (log.isWarnEnabled()) log.warn("Could not find available peer for state, backing off and retrying");
+ if (!success) {
+ if (log.isWarnEnabled())
+ log.warn("Could not find available peer for state, backing off and retrying");
- try {
- Thread.sleep(wait <<= 2);
+ try {
+ Thread.sleep(wait <<= 2);
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
}
- catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
}
}
+ } finally {
+ currentStateTransferSource = null;
}
if (!success) throw new StateTransferException("Unable to fetch state on startup");
}
+ public Transport getTransport() {
+ return t;
+ }
+
+ public Address getCurrentStateTransferSource() {
+ return currentStateTransferSource;
+ }
+
// -------------------------------------------- JMX information -----------------------------------------------
@ManagedOperation
Modified: core/branches/flat/src/main/java/org/horizon/remoting/ReplicationQueue.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/ReplicationQueue.java 2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/remoting/ReplicationQueue.java 2009-03-03 14:56:48 UTC (rev 7829)
@@ -67,6 +67,7 @@
private Configuration configuration;
private boolean enabled;
private CommandsFactory commandsFactory;
+ private boolean stateTransferEnabled;
public boolean isEnabled() {
return enabled;
@@ -86,6 +87,7 @@
*/
@Start
public synchronized void start() {
+ stateTransferEnabled = configuration.isStateTransferEnabled();
long interval = configuration.getReplQueueInterval();
log.trace("Starting replication queue, with interval {0} and maxElements {1}", interval, maxElements);
this.maxElements = configuration.getReplQueueMaxElements();
@@ -142,7 +144,7 @@
log.trace("Flushing {0} elements", toReplicateSize);
ReplicateCommand replicateCommand = commandsFactory.buildReplicateCommand(toReplicate);
// send to all live caches in the cluster
- rpcManager.invokeRemotely(null, replicateCommand, ResponseMode.ASYNCHRONOUS, configuration.getSyncReplTimeout());
+ rpcManager.invokeRemotely(null, replicateCommand, ResponseMode.ASYNCHRONOUS, configuration.getSyncReplTimeout(), stateTransferEnabled);
}
catch (Throwable t) {
log.error("failed replicating " + toReplicate.size() + " elements in replication queue", t);
Added: core/branches/flat/src/main/java/org/horizon/remoting/transport/DistributedSync.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/transport/DistributedSync.java (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/remoting/transport/DistributedSync.java 2009-03-03 14:56:48 UTC (rev 7829)
@@ -0,0 +1,68 @@
+package org.horizon.remoting.transport;
+
+import net.jcip.annotations.ThreadSafe;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+/**
+ * This is an abstraction of a cluster-wide synchronization. Its purpose is to maintain a set of locks that are aware
+ * of block and unblock commands issued across a cluster. In addition to these block and unblock phases, sub-phases
+ * such as a start processing
+ *
+ * @author Manik Surtani
+ * @since 1.0
+ */
+@ThreadSafe
+public interface DistributedSync {
+
+ /**
+ * @return the number of syncs that have occured
+ */
+ int getSyncCount();
+
+ /**
+ * Blocks until the start of a sync - either by the current RPCManager instance or a remote one - is received. This
+ * should return immediately if sync is already in progress.
+ *
+ * @param timeout timeout after which to give up
+ * @param timeUnit time unit
+ * @throws TimeoutException if waiting for the sync times out.
+ */
+ void blockUntilAcquired(long timeout, TimeUnit timeUnit) throws TimeoutException;
+
+ /**
+ * Blocks until an ongoing sync ends. This is returns immediately if there is no ongoing sync.
+ *
+ * @param timeout timeout after which to give up
+ * @param timeUnit time unit
+ * @throws TimeoutException if waiting for an ongoing sync to end times out.
+ */
+ void blockUntilReleased(long timeout, TimeUnit timeUnit) throws TimeoutException;
+
+ /**
+ * Acquires the sync. This could be from a local or remote source.
+ */
+ void acquireSync();
+
+ /**
+ * Releases the sync. This could be from a local or remote source.
+ */
+ void releaseSync();
+
+ /**
+ * Acquires a processing lock. This is typically acquired after the sync is acquired, and is meant for local (not
+ * remote) use.
+ *
+ * @param exclusive whether the lock is exclusive
+ * @param timeout timeout after which to give up
+ * @param timeUnit time unit
+ * @throws TimeoutException if waiting for the lock times out
+ */
+ void acquireProcessingLock(boolean exclusive, long timeout, TimeUnit timeUnit) throws TimeoutException;
+
+ /**
+ * Releases any processing locks that may be held by the current thread.
+ */
+ void releaseProcessingLock();
+}
Modified: core/branches/flat/src/main/java/org/horizon/remoting/transport/Transport.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/transport/Transport.java 2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/remoting/transport/Transport.java 2009-03-03 14:56:48 UTC (rev 7829)
@@ -31,14 +31,16 @@
/**
* Initializes the transport with global cache configuration and transport-specific properties.
*
- * @param c global cache-wide configuration
- * @param p properties to set
- * @param marshaller marshaller to use for marshalling and unmarshalling
- * @param asyncExecutor executor to use for asynchronous calls
- * @param handler handler for invoking remotely originating calls on the local cache
+ * @param c global cache-wide configuration
+ * @param p properties to set
+ * @param marshaller marshaller to use for marshalling and unmarshalling
+ * @param asyncExecutor executor to use for asynchronous calls
+ * @param handler handler for invoking remotely originating calls on the local cache
+ * @param notifier notifier to use
+ * @param distributedSyncTimeout timeout to wait for distributed syncs
*/
void initialize(GlobalConfiguration c, Properties p, Marshaller marshaller, ExecutorService asyncExecutor,
- InboundInvocationHandler handler, CacheManagerNotifier notifier);
+ InboundInvocationHandler handler, CacheManagerNotifier notifier, long distributedSyncTimeout);
/**
* Invokes an RPC call on other caches in the cluster.
@@ -51,10 +53,11 @@
* @param usePriorityQueue if true, a priority queue is used to deliver messages. May not be supported by all
* implementations.
* @param responseFilter a response filter with which to filter out failed/unwanted/invalid responses.
+ * @param supportReplay whether replays of missed messages is supported
* @return a list of responses from each member contacted.
* @throws Exception in the event of problems.
*/
- List<Object> invokeRemotely(List<Address> recipients, RPCCommand rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, ResponseFilter responseFilter) throws Exception;
+ List<Object> invokeRemotely(List<Address> recipients, RPCCommand rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, ResponseFilter responseFilter, boolean supportReplay) throws Exception;
/**
* @return true if the current Channel is the coordinator of the cluster.
@@ -81,14 +84,34 @@
List<Address> getMembers();
/**
- * Initiates a state retrieval from a specific cache (by typically invoking {@link org.horizon.remoting.InboundInvocationHandler#generateState(String, java.io.OutputStream)}),
- * and applies this state to the current cache via the {@link InboundInvocationHandler#applyState(String, java.io.InputStream)} callback.
+ * Initiates a state retrieval from a specific cache (by typically invoking {@link
+ * org.horizon.remoting.InboundInvocationHandler#generateState(String, java.io.OutputStream)}), and applies this
+ * state to the current cache via the {@link InboundInvocationHandler#applyState(String, java.io.InputStream)}
+ * callback.
*
* @param cacheName name of cache for which to retrieve state
- * @param address address of remote cache from which to retrieve state
- * @param timeout state retrieval timeout in milliseconds
- * @throws org.horizon.statetransfer.StateTransferException if state cannot be retrieved from the specific cache
+ * @param address address of remote cache from which to retrieve state
+ * @param timeout state retrieval timeout in milliseconds
* @return true if state was transferred and applied successfully, false if it timed out.
+ * @throws org.horizon.statetransfer.StateTransferException
+ * if state cannot be retrieved from the specific cache
*/
boolean retrieveState(String cacheName, Address address, long timeout) throws StateTransferException;
+
+ /**
+ * @return an instance of a DistributedSync that can be used to wait for synchronization events across a cluster.
+ */
+ DistributedSync getDistributedSync();
+
+ /**
+ * Blocks all RPC calls to and between a set of Addresses. If a null is passed in, the entire cluster is blocked.
+ *
+ * @param addresses addresses to block
+ */
+ void blockRPC(Address... addresses);
+
+ /**
+ * Releases a block performed by calling {@link #blockRPC(Address[])}
+ */
+ void unblockRPC();
}
Modified: core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/CommandAwareRpcDispatcher.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/CommandAwareRpcDispatcher.java 2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/CommandAwareRpcDispatcher.java 2009-03-03 14:56:48 UTC (rev 7829)
@@ -27,6 +27,7 @@
import org.horizon.logging.Log;
import org.horizon.logging.LogFactory;
import org.horizon.remoting.InboundInvocationHandler;
+import org.horizon.remoting.transport.DistributedSync;
import org.jgroups.Address;
import org.jgroups.Channel;
import org.jgroups.Message;
@@ -38,10 +39,12 @@
import org.jgroups.util.RspList;
import java.io.NotSerializableException;
+import java.util.Map;
import java.util.Vector;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
/**
* A JGroups RPC dispatcher that knows how to deal with {@link ReplicableCommand}s.
@@ -53,7 +56,10 @@
protected boolean trace;
ExecutorService asyncExecutor;
InboundInvocationHandler inboundInvocationHandler;
+ DistributedSync distributedSync;
+ long distributedSyncTimeout;
private Log log = LogFactory.getLog(CommandAwareRpcDispatcher.class);
+ private static final RequestIgnoredResponse REQUEST_IGNORED_RESPONSE = new RequestIgnoredResponse();
public CommandAwareRpcDispatcher() {
}
@@ -61,12 +67,14 @@
public CommandAwareRpcDispatcher(Channel channel,
JGroupsTransport transport,
ExecutorService asyncExecutor,
- InboundInvocationHandler inboundInvocationHandler) {
+ InboundInvocationHandler inboundInvocationHandler,
+ DistributedSync distributedSync, long distributedSyncTimeout) {
super(channel, transport, transport, transport);
this.asyncExecutor = asyncExecutor;
this.inboundInvocationHandler = inboundInvocationHandler;
-
+ this.distributedSync = distributedSync;
trace = log.isTraceEnabled();
+ this.distributedSyncTimeout = distributedSyncTimeout;
}
protected boolean isValid(Message req) {
@@ -83,9 +91,9 @@
* org.jgroups.blocks.RspFilter)} except that this version is aware of {@link ReplicableCommand} objects.
*/
public RspList invokeRemoteCommands(Vector<Address> dests, ReplicableCommand command, int mode, long timeout,
- boolean anycasting, boolean oob, RspFilter filter)
+ boolean anycasting, boolean oob, RspFilter filter, boolean supportReplay)
throws NotSerializableException, ExecutionException, InterruptedException {
- ReplicationTask task = new ReplicationTask(command, oob, dests, mode, timeout, anycasting, filter);
+ ReplicationTask task = new ReplicationTask(command, oob, dests, mode, timeout, anycasting, filter, supportReplay);
if (mode == GroupRequest.GET_NONE) {
asyncExecutor.submit(task);
@@ -131,8 +139,40 @@
protected Object executeCommand(RPCCommand cmd, Message req) throws Throwable {
if (cmd == null) throw new NullPointerException("Unable to execute a null command! Message was " + req);
- if (trace) log.trace("Executing command: {0} [sender={1}]", cmd, req.getSrc());
- return inboundInvocationHandler.handle(cmd);
+ if (trace) log.trace("Attempting to execute command: {0} [sender={1}]", cmd, req.getSrc());
+
+ boolean unlock = false;
+ try {
+
+ int flushCount = distributedSync.getSyncCount();
+ distributedSync.acquireProcessingLock(false, distributedSyncTimeout, MILLISECONDS);
+ unlock = true;
+
+ distributedSync.blockUntilReleased(distributedSyncTimeout, MILLISECONDS);
+
+ // If this thread blocked during a NBST flush, then inform the sender
+ // it needs to replay ignored messages
+ boolean replayIgnored = distributedSync.getSyncCount() != flushCount;
+
+ Object retval;
+ try {
+ retval = inboundInvocationHandler.handle(cmd);
+ } catch (IllegalStateException ise) {
+ if (trace) log.trace("Unable to execute command, cache not in a receptive state");
+ // cache not in a started state, request replay
+ return REQUEST_IGNORED_RESPONSE;
+ }
+
+ if (replayIgnored) {
+ ExtendedResponse extended = new ExtendedResponse(retval);
+ extended.setReplayIgnoredRequests(true);
+ retval = extended;
+ }
+ return retval;
+
+ } finally {
+ if (unlock) distributedSync.releaseProcessingLock();
+ }
}
@Override
@@ -149,10 +189,11 @@
private long timeout;
private boolean anycasting;
private RspFilter filter;
+ boolean supportReplay = false;
private ReplicationTask(ReplicableCommand command, boolean oob, Vector<Address> dests,
int mode, long timeout,
- boolean anycasting, RspFilter filter) {
+ boolean anycasting, RspFilter filter, boolean supportReplay) {
this.command = command;
this.oob = oob;
this.dests = dests;
@@ -160,6 +201,7 @@
this.timeout = timeout;
this.anycasting = anycasting;
this.filter = filter;
+ this.supportReplay = supportReplay;
}
public RspList call() throws Exception {
@@ -174,6 +216,8 @@
Message msg = new Message();
msg.setBuffer(buf);
if (oob) msg.setFlag(Message.OOB);
+ // Replay capability requires responses from all members!
+ int mode = supportReplay ? GroupRequest.GET_ALL : this.mode;
RspList retval = castMessage(dests, msg, mode, timeout, anycasting, filter);
if (trace) log.trace("responses: {0}", retval);
@@ -183,6 +227,30 @@
if (retval == null)
throw new NotSerializableException("RpcDispatcher returned a null. This is most often caused by args for "
+ command.getClass().getSimpleName() + " not being serializable.");
+
+ if (supportReplay) {
+ boolean replay = false;
+ Vector<Address> ignorers = new Vector<Address>();
+ for (Map.Entry<Address, Rsp> entry : retval.entrySet()) {
+ Object value = entry.getValue().getValue();
+ if (value instanceof RequestIgnoredResponse) {
+ ignorers.add(entry.getKey());
+ } else if (value instanceof ExtendedResponse) {
+ ExtendedResponse extended = (ExtendedResponse) value;
+ replay |= extended.isReplayIgnoredRequests();
+ entry.getValue().setValue(extended.getResponse());
+ }
+ }
+
+ if (replay && ignorers.size() > 0) {
+ if (trace)
+ log.trace("Replaying message to ignoring senders: " + ignorers);
+ RspList responses = castMessage(ignorers, msg, GroupRequest.GET_ALL, timeout, anycasting, filter);
+ if (responses != null)
+ retval.putAll(responses);
+ }
+ }
+
return retval;
}
}
Added: core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/ExtendedResponse.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/ExtendedResponse.java (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/ExtendedResponse.java 2009-03-03 14:56:48 UTC (rev 7829)
@@ -0,0 +1,50 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2005, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.horizon.remoting.transport.jgroups;
+
+import java.io.Serializable;
+
+/**
+ * A response with extended information
+ *
+ * @author Jason T. Greene
+ */
+public class ExtendedResponse implements Serializable {
+ private boolean replayIgnoredRequests;
+ private final Object response;
+
+ public ExtendedResponse(Object response) {
+ this.response = response;
+ }
+
+ public boolean isReplayIgnoredRequests() {
+ return replayIgnoredRequests;
+ }
+
+ public void setReplayIgnoredRequests(boolean replayIgnoredRequests) {
+ this.replayIgnoredRequests = replayIgnoredRequests;
+ }
+
+ public Object getResponse() {
+ return response;
+ }
+}
Added: core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/FlushBasedDistributedSync.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/FlushBasedDistributedSync.java (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/FlushBasedDistributedSync.java 2009-03-03 14:56:48 UTC (rev 7829)
@@ -0,0 +1,98 @@
+package org.horizon.remoting.transport.jgroups;
+
+import net.jcip.annotations.ThreadSafe;
+import org.horizon.logging.Log;
+import org.horizon.logging.LogFactory;
+import org.horizon.remoting.transport.DistributedSync;
+import org.horizon.util.Util;
+import org.horizon.util.concurrent.ReclosableLatch;
+
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+/**
+ * A DistributedSync based on JGroups' FLUSH protocol
+ *
+ * @author Manik Surtani
+ * @since 1.0
+ */
+@ThreadSafe
+public class FlushBasedDistributedSync implements DistributedSync {
+
+ private final ReentrantReadWriteLock processingLock = new ReentrantReadWriteLock();
+ private final ReclosableLatch flushBlockGate = new ReclosableLatch();
+ private final AtomicInteger flushCompletionCount = new AtomicInteger();
+ private final ReclosableLatch flushWaitGate = new ReclosableLatch(false);
+ private static final Log log = LogFactory.getLog(FlushBasedDistributedSync.class);
+
+ public int getSyncCount() {
+ return flushCompletionCount.get();
+ }
+
+ public void blockUntilAcquired(long timeout, TimeUnit timeUnit) throws TimeoutException {
+ while (true) {
+ try {
+ if (!flushWaitGate.await(timeout, timeUnit))
+ throw new TimeoutException("Timed out waiting for a cluster-wide sync to be acquired. (timeout = " + Util.prettyPrintTime(timeout) + ")");
+ return;
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ public void blockUntilReleased(long timeout, TimeUnit timeUnit) throws TimeoutException {
+ while (true) {
+ try {
+ if (!flushBlockGate.await(timeout, timeUnit))
+ throw new TimeoutException("Timed out waiting for a cluster-wide sync to be released. (timeout = " + Util.prettyPrintTime(timeout) + ")");
+ return;
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ public void acquireSync() {
+ flushBlockGate.close();
+ flushWaitGate.open();
+ }
+
+ public void releaseSync() {
+ flushWaitGate.close();
+ flushCompletionCount.incrementAndGet();
+ flushBlockGate.open();
+ }
+
+ public void acquireProcessingLock(boolean exclusive, long timeout, TimeUnit timeUnit) throws TimeoutException {
+ Lock lock = exclusive ? processingLock.writeLock() : processingLock.readLock();
+ while (true) {
+ try {
+ if (!lock.tryLock(timeout, timeUnit))
+ throw new TimeoutException("Could not obtain " + (exclusive ? "exclusive" : "shared") + " processing lock");
+
+ return;
+ }
+ catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ public void releaseProcessingLock() {
+ try {
+ if (processingLock.isWriteLockedByCurrentThread()) {
+ processingLock.writeLock().unlock();
+ } else {
+ processingLock.readLock().unlock();
+ }
+ } catch (IllegalMonitorStateException imse) {
+ if (log.isTraceEnabled()) log.trace("Did not own lock!");
+ }
+ }
+}
Modified: core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/JGroupsTransport.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/JGroupsTransport.java 2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/JGroupsTransport.java 2009-03-03 14:56:48 UTC (rev 7829)
@@ -14,15 +14,16 @@
import org.horizon.remoting.ResponseFilter;
import org.horizon.remoting.ResponseMode;
import org.horizon.remoting.transport.Address;
+import org.horizon.remoting.transport.DistributedSync;
import org.horizon.remoting.transport.Transport;
import org.horizon.statetransfer.StateTransferException;
import org.horizon.util.FileLookup;
import org.horizon.util.Util;
import org.jgroups.Channel;
import org.jgroups.ChannelException;
+import org.jgroups.ExtendedMembershipListener;
import org.jgroups.ExtendedMessageListener;
import org.jgroups.JChannel;
-import org.jgroups.MembershipListener;
import org.jgroups.Message;
import org.jgroups.View;
import org.jgroups.blocks.GroupRequest;
@@ -40,6 +41,8 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+import java.util.concurrent.atomic.AtomicBoolean;
/**
* An encapsulation of a JGroups transport
@@ -47,7 +50,7 @@
* @author Manik Surtani
* @since 1.0
*/
-public class JGroupsTransport implements Transport, MembershipListener, ExtendedMessageListener {
+public class JGroupsTransport implements Transport, ExtendedMembershipListener, ExtendedMessageListener {
public static final String CONFIGURATION_STRING = "configurationString";
public static final String CONFIGURATION_XML = "configurationXml";
public static final String CONFIGURATION_FILE = "configurationFile";
@@ -68,26 +71,24 @@
ExecutorService asyncExecutor;
CacheManagerNotifier notifier;
final ConcurrentMap<String, StateTransferMonitor> stateTransfersInProgress = new ConcurrentHashMap<String, StateTransferMonitor>();
+ private final FlushBasedDistributedSync flushTracker = new FlushBasedDistributedSync();
+ volatile List<org.jgroups.Address> membersBlocked;
+ AtomicBoolean flushInProgress = new AtomicBoolean(false);
+ long distributedSyncTimeout;
- /**
- * Reference to an exception that was raised during state installation on this node.
- */
- protected volatile Exception setStateException;
- private final Object stateLock = new Object();
-
-
// ------------------------------------------------------------------------------------------------------------------
// Lifecycle and setup stuff
// ------------------------------------------------------------------------------------------------------------------
public void initialize(GlobalConfiguration c, Properties p, Marshaller marshaller, ExecutorService asyncExecutor,
- InboundInvocationHandler inboundInvocationHandler, CacheManagerNotifier notifier) {
+ InboundInvocationHandler inboundInvocationHandler, CacheManagerNotifier notifier, long distributedSyncTimeout) {
this.c = c;
this.p = p;
this.marshaller = marshaller;
this.asyncExecutor = asyncExecutor;
this.inboundInvocationHandler = inboundInvocationHandler;
this.notifier = notifier;
+ this.distributedSyncTimeout = distributedSyncTimeout;
}
public void start() {
@@ -137,7 +138,7 @@
channel.setOpt(Channel.AUTO_GETSTATE, false);
channel.setOpt(Channel.BLOCK, true);
dispatcher = new CommandAwareRpcDispatcher(channel, this,
- asyncExecutor, inboundInvocationHandler);
+ asyncExecutor, inboundInvocationHandler, flushTracker, distributedSyncTimeout);
MarshallerAdapter adapter = new MarshallerAdapter(marshaller);
dispatcher.setRequestMarshaller(adapter);
dispatcher.setResponseMarshaller(adapter);
@@ -228,7 +229,7 @@
cleanup = true;
((JChannel) channel).getState(toJGroupsAddress(address), cacheName, timeout, false);
mon.waitForState();
- return true;
+ return mon.getSetStateException() == null;
} catch (StateTransferException ste) {
throw ste;
} catch (Exception e) {
@@ -239,6 +240,72 @@
}
}
+ public DistributedSync getDistributedSync() {
+ return flushTracker;
+ }
+
+ public void blockRPC(Address... addresses) {
+ if (flushInProgress.compareAndSet(false, true)) {
+ // TODO make these configurable!!
+ int retries = 5;
+ int sleepBetweenRetries = 250;
+ int sleepIncreaseFactor = 2;
+ if (trace) log.trace("Attempting a partial flush on members {0} with up to {1} retries.", members, retries);
+
+ boolean success = false;
+ int i;
+ for (i = 1; i <= retries; i++) {
+ if (trace) log.trace("Attempt number " + i);
+ try {
+
+ if (addresses == null) {
+ success = channel.startFlush(false);
+ } else {
+ membersBlocked = toJGroupsAddressList(addresses);
+ success = channel.startFlush(membersBlocked, false);
+ }
+
+ if (success) break;
+ if (trace) log.trace("Channel.startFlush() returned false!");
+ } catch (Exception e) {
+ if (trace) log.trace("Caught exception attempting a partial flush", e);
+ }
+ try {
+ if (trace)
+ log.trace("Partial state transfer failed. Backing off for " + sleepBetweenRetries + " millis and retrying");
+ Thread.sleep(sleepBetweenRetries);
+ sleepBetweenRetries *= sleepIncreaseFactor;
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ if (success) {
+ if (log.isDebugEnabled()) log.debug("Partial flush between {0} succeeded!", membersBlocked);
+ } else {
+ flushInProgress.set(false);
+ throw new CacheException("Could initiate partial flush between " + membersBlocked + "!");
+ }
+ } else {
+ throw new CacheException("Cannot block RPC; a block is already in progress!");
+ }
+ }
+
+ public void unblockRPC() {
+ if (flushInProgress.get()) {
+ try {
+ if (membersBlocked == null) {
+ channel.stopFlush();
+ } else {
+ channel.stopFlush(membersBlocked);
+ membersBlocked = null;
+ }
+ } finally {
+ flushInProgress.set(false);
+ }
+ }
+ }
+
public Address getAddress() {
if (address == null) {
address = new JGroupsAddress(channel.getLocalAddress());
@@ -251,7 +318,8 @@
// outbound RPC
// ------------------------------------------------------------------------------------------------------------------
- public List<Object> invokeRemotely(List<Address> recipients, RPCCommand rpcCommand, ResponseMode mode, long timeout, boolean usePriorityQueue, ResponseFilter responseFilter)
+ public List<Object> invokeRemotely(List<Address> recipients, RPCCommand rpcCommand, ResponseMode mode, long timeout,
+ boolean usePriorityQueue, ResponseFilter responseFilter, boolean supportReplay)
throws Exception {
if (recipients != null && recipients.isEmpty()) {
@@ -262,39 +330,51 @@
log.trace("dests={0}, command={1}, mode={2}, timeout={3}", recipients, rpcCommand, mode, timeout);
- RspList rsps = dispatcher.invokeRemoteCommands(toJGroupsAddressVector(recipients), rpcCommand, toJGroupsMode(mode),
- timeout, false, usePriorityQueue,
- toJGroupsFilter(responseFilter));
+ // Acquire a "processing" lock so that any other code is made aware of a network call in progress
+ // make sure this is non-exclusive since concurrent network calls are valid for most situations.
+ flushTracker.acquireProcessingLock(false, distributedSyncTimeout, MILLISECONDS);
+ boolean unlock = true;
+ // if there is a FLUSH in progress, block till it completes
+ flushTracker.blockUntilReleased(distributedSyncTimeout, MILLISECONDS);
- if (mode == ResponseMode.ASYNCHRONOUS) return Collections.emptyList();// async case
+ try {
+ RspList rsps = dispatcher.invokeRemoteCommands(toJGroupsAddressVector(recipients), rpcCommand, toJGroupsMode(mode),
+ timeout, false, usePriorityQueue,
+ toJGroupsFilter(responseFilter), supportReplay);
- if (trace)
- log.trace("Cache [{0}]: responses for command {1}:\n{2}", getAddress(), rpcCommand.getClass().getSimpleName(), rsps);
+ if (mode == ResponseMode.ASYNCHRONOUS) return Collections.emptyList();// async case
- // short-circuit no-return-value calls.
- if (rsps == null) return Collections.emptyList();
- List<Object> retval = new ArrayList<Object>(rsps.size());
+ if (trace)
+ log.trace("Cache [{0}]: responses for command {1}:\n{2}", getAddress(), rpcCommand.getClass().getSimpleName(), rsps);
- for (Rsp rsp : rsps.values()) {
- if (rsp.wasSuspected() || !rsp.wasReceived()) {
- CacheException ex;
- if (rsp.wasSuspected()) {
- ex = new SuspectException("Suspected member: " + rsp.getSender());
+ // short-circuit no-return-value calls.
+ if (rsps == null) return Collections.emptyList();
+ List<Object> retval = new ArrayList<Object>(rsps.size());
+
+ for (Rsp rsp : rsps.values()) {
+ if (rsp.wasSuspected() || !rsp.wasReceived()) {
+ CacheException ex;
+ if (rsp.wasSuspected()) {
+ ex = new SuspectException("Suspected member: " + rsp.getSender());
+ } else {
+ ex = new TimeoutException("Replication timeout for " + rsp.getSender());
+ }
+ retval.add(new ReplicationException("rsp=" + rsp, ex));
} else {
- ex = new TimeoutException("Replication timeout for " + rsp.getSender());
+ Object value = rsp.getValue();
+ if (value instanceof Exception && !(value instanceof ReplicationException)) {
+ // if we have any application-level exceptions make sure we throw them!!
+ if (trace) log.trace("Recieved exception'" + value + "' from " + rsp.getSender());
+ throw (Exception) value;
+ }
+ retval.add(value);
}
- retval.add(new ReplicationException("rsp=" + rsp, ex));
- } else {
- Object value = rsp.getValue();
- if (value instanceof Exception && !(value instanceof ReplicationException)) {
- // if we have any application-level exceptions make sure we throw them!!
- if (trace) log.trace("Recieved exception'" + value + "' from " + rsp.getSender());
- throw (Exception) value;
- }
- retval.add(value);
}
+ return retval;
+ } finally {
+ // release the "processing" lock so that other threads are aware of the network call having completed
+ if (unlock) flushTracker.releaseProcessingLock();
}
- return retval;
}
private int toJGroupsMode(ResponseMode mode) {
@@ -360,9 +440,13 @@
}
public void block() {
- // a no-op
+ flushTracker.acquireSync();
}
+ public void unblock() {
+ flushTracker.releaseSync();
+ }
+
public void receive(Message msg) {
// no-op
}
@@ -388,13 +472,14 @@
}
public void getState(String cacheName, OutputStream ostream) {
- if (trace) log.trace("Received request to generate state for cache {0}. Attempting to generate state.", cacheName);
+ if (trace)
+ log.trace("Received request to generate state for cache named '{0}'. Attempting to generate state.", cacheName);
try {
inboundInvocationHandler.generateState(cacheName, ostream);
} catch (StateTransferException e) {
log.error("Caught while responding to state transfer request", e);
} finally {
- Util.closeStream(ostream);
+ Util.flushAndCloseStream(ostream);
}
}
@@ -403,14 +488,15 @@
}
public void setState(String cacheName, InputStream istream) {
- if (trace) log.trace("Received state for cache {0}. Attempting to apply state.", cacheName);
- StateTransferMonitor mon = stateTransfersInProgress.get(cacheName);
+ StateTransferMonitor mon = null;
try {
+ if (trace) log.trace("Received state for cache named '{0}'. Attempting to apply state.", cacheName);
+ mon = stateTransfersInProgress.get(cacheName);
inboundInvocationHandler.applyState(cacheName, istream);
mon.notifyStateReceiptSucceeded();
- } catch (StateTransferException e) {
+ } catch (Exception e) {
log.error("Failed setting state", e);
- mon.notifyStateReceiptFailed(e);
+ mon.notifyStateReceiptFailed(e instanceof StateTransferException ? (StateTransferException) e : new StateTransferException(e));
} finally {
Util.closeStream(istream);
}
@@ -433,6 +519,18 @@
return retval;
}
+ private List<org.jgroups.Address> toJGroupsAddressList(Address... addresses) {
+ if (addresses == null) return null;
+ if (addresses.length == 0) return Collections.emptyList();
+
+ List<org.jgroups.Address> retval = new ArrayList<org.jgroups.Address>(addresses.length);
+ for (Address a : addresses) {
+ JGroupsAddress ja = (JGroupsAddress) a;
+ retval.add(ja.address);
+ }
+ return retval;
+ }
+
private org.jgroups.Address toJGroupsAddress(Address a) {
return ((JGroupsAddress) a).address;
}
Added: core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/RequestIgnoredResponse.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/RequestIgnoredResponse.java (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/RequestIgnoredResponse.java 2009-03-03 14:56:48 UTC (rev 7829)
@@ -0,0 +1,36 @@
+/*
+* JBoss, Home of Professional Open Source
+* Copyright 2005, JBoss Inc., and individual contributors as indicated
+* by the @authors tag. See the copyright.txt in the distribution for a
+* full listing of individual contributors.
+*
+* This is free software; you can redistribute it and/or modify it
+* under the terms of the GNU Lesser General Public License as
+* published by the Free Software Foundation; either version 2.1 of
+* the License, or (at your option) any later version.
+*
+* This software is distributed in the hope that it will be useful,
+* but WITHOUT ANY WARRANTY; without even the implied warranty of
+* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+* Lesser General Public License for more details.
+*
+* You should have received a copy of the GNU Lesser General Public
+* License along with this software; if not, write to the Free
+* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+*/
+package org.horizon.remoting.transport.jgroups;
+
+import java.io.Serializable;
+
+/**
+ * Indicates that the request was ignored,
+ *
+ * @author Jason T. Greene
+ */
+public class RequestIgnoredResponse implements Serializable {
+ @Override
+ public String toString() {
+ return "RequestIgnoredResponse";
+ }
+}
Modified: core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManagerImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManagerImpl.java 2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManagerImpl.java 2009-03-03 14:56:48 UTC (rev 7829)
@@ -22,12 +22,18 @@
package org.horizon.statetransfer;
import org.horizon.AdvancedCache;
-import org.horizon.transaction.TransactionLog;
+import org.horizon.commands.tx.PrepareCommand;
+import org.horizon.commands.write.WriteCommand;
import org.horizon.config.Configuration;
import org.horizon.container.DataContainer;
+import org.horizon.context.InvocationContext;
import org.horizon.factories.annotations.Inject;
import org.horizon.factories.annotations.Start;
+import org.horizon.interceptors.InterceptorChain;
+import org.horizon.invocation.InvocationContextContainer;
import org.horizon.invocation.Options;
+import org.horizon.io.UnclosableObjectInputStream;
+import org.horizon.io.UnclosableObjectOutputStream;
import org.horizon.loader.CacheLoaderException;
import org.horizon.loader.CacheLoaderManager;
import org.horizon.loader.CacheStore;
@@ -36,6 +42,9 @@
import org.horizon.logging.LogFactory;
import org.horizon.marshall.Marshaller;
import org.horizon.remoting.RPCManager;
+import org.horizon.remoting.transport.DistributedSync;
+import org.horizon.remoting.transport.Transport;
+import org.horizon.transaction.TransactionLog;
import org.horizon.util.Util;
import java.io.IOException;
@@ -44,8 +53,9 @@
import java.io.ObjectOutputStream;
import java.io.OutputStream;
import java.io.Serializable;
+import java.util.List;
import java.util.Set;
-import java.util.concurrent.TimeUnit;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
public class StateTransferManagerImpl implements StateTransferManager {
@@ -57,13 +67,18 @@
CacheStore cs;
Marshaller marshaller;
TransactionLog transactionLog;
+ InvocationContextContainer invocationContextContainer;
+ InterceptorChain interceptorChain;
private static final Log log = LogFactory.getLog(StateTransferManagerImpl.class);
+ private static final boolean trace = log.isTraceEnabled();
private static final Delimiter DELIMITER = new Delimiter();
+ boolean transientState, persistentState;
+
@Inject
public void injectDependencies(RPCManager rpcManager, AdvancedCache cache, Configuration configuration,
DataContainer dataContainer, CacheLoaderManager clm, Marshaller marshaller,
- TransactionLog transactionLog) {
+ TransactionLog transactionLog, InterceptorChain interceptorChain, InvocationContextContainer invocationContextContainer) {
this.rpcManager = rpcManager;
this.cache = cache;
this.configuration = configuration;
@@ -71,12 +86,17 @@
this.clm = clm;
this.marshaller = marshaller;
this.transactionLog = transactionLog;
+ this.invocationContextContainer = invocationContextContainer;
+ this.interceptorChain = interceptorChain;
}
@Start(priority = 14)
// it is imperative that this starts *after* the RPCManager does.
public void start() throws StateTransferException {
- cs = clm == null || !clm.isEnabled() || !clm.isFetchPersistentState() ? null : clm.getCacheStore();
+ log.trace("Data container is {0}", System.identityHashCode(dataContainer));
+ cs = clm == null ? null : clm.getCacheStore();
+ transientState = configuration.isFetchInMemoryState();
+ persistentState = cs != null && clm.isEnabled() && clm.isFetchPersistentState() && !clm.isShared();
long startTime = 0;
if (log.isDebugEnabled()) {
@@ -93,43 +113,162 @@
}
public void generateState(OutputStream out) throws StateTransferException {
- if (log.isDebugEnabled()) log.debug("Generating state");
+ ObjectOutputStream oos = null;
+ boolean txLogActivated = false;
+ try {
+ boolean canProvideState = (transientState || persistentState)
+ && (txLogActivated = transactionLog.activate());
+ if (log.isDebugEnabled()) log.debug("Generating state. Can provide? {0}", canProvideState);
+ oos = new ObjectOutputStream(out);
+ marshaller.objectToObjectStream(canProvideState, oos);
+ if (canProvideState) {
+ delimit(oos);
+ if (transientState) generateInMemoryState(oos);
+ delimit(oos);
+ if (persistentState) generatePersistentState(oos);
+ delimit(oos);
+ generateTransactionLog(oos);
+
+ if (log.isDebugEnabled()) log.debug("State generated, closing object stream");
+ } else {
+ if (log.isDebugEnabled()) log.debug("Not providing state!");
+ }
+
+ } catch (StateTransferException ste) {
+ throw ste;
+ } catch (Exception e) {
+ throw new StateTransferException(e);
+ } finally {
+ Util.flushAndCloseStream(oos);
+ if (txLogActivated) transactionLog.deactivate();
+ }
+ }
+
+ private void generateTransactionLog(ObjectOutputStream oos) throws Exception {
+ // todo this should be configurable
+ int maxNonProgressingLogWrites = 100;
+ int flushTimeout = 60000;
+
+ DistributedSync distributedSync = rpcManager.getTransport().getDistributedSync();
+
try {
- ObjectOutputStream oos = new ObjectOutputStream(out);
+ if (trace) log.trace("Transaction log size is {0}", transactionLog.size());
+ for (int nonProgress = 0, size = transactionLog.size(); size > 0;) {
+ if (trace) log.trace("Tx Log remaining entries = " + size);
+ transactionLog.writeCommitLog(marshaller, oos);
+ int newSize = transactionLog.size();
+
+ // If size did not decrease then we did not make progress, and could be wasting
+ // our time. Limit this to the specified max.
+ if (newSize >= size && ++nonProgress >= maxNonProgressingLogWrites)
+ break;
+
+ size = newSize;
+ }
+
+ // Wait on incoming and outgoing threads to line-up in front of
+ // the distributed sync.
+ distributedSync.acquireProcessingLock(true, configuration.getStateRetrievalTimeout(), MILLISECONDS);
+
+ // Signal to sender that we need a flush to get a consistent view
+ // of the remaining transactions.
delimit(oos);
- generateInMemoryState(oos);
+ oos.flush();
+ if (trace) log.trace("Waiting for a distributed sync block");
+ distributedSync.blockUntilAcquired(flushTimeout, MILLISECONDS);
+ if (trace) log.trace("Distributed sync block received, proceeding with writing commit log");
+ // Write remaining transactions
+ transactionLog.writeCommitLog(marshaller, oos);
delimit(oos);
- generatePersistentState(oos);
+
+ // Write all non-completed prepares
+ transactionLog.writePendingPrepares(marshaller, oos);
delimit(oos);
oos.flush();
- oos.close();
- if (log.isDebugEnabled()) log.debug("State generated, closing object stream");
- // just close the object stream but do NOT close the underlying stream
- } catch (StateTransferException ste) {
- throw ste;
- } catch (Exception e) {
- throw new StateTransferException(e);
}
+ finally {
+ distributedSync.releaseProcessingLock();
+ }
}
+ private void processCommitLog(ObjectInputStream ois) throws Exception {
+ Object object = marshaller.objectFromObjectStream(ois);
+ while (object instanceof TransactionLog.LogEntry) {
+ List<WriteCommand> mods = ((TransactionLog.LogEntry) object).getModifications();
+ if (trace) log.trace("Mods = {0}", mods);
+ for (WriteCommand mod : mods) {
+ InvocationContext ctx = invocationContextContainer.get();
+ ctx.setOriginLocal(false);
+ ctx.setOptions(Options.CACHE_MODE_LOCAL, Options.SKIP_CACHE_STATUS_CHECK);
+ interceptorChain.invoke(ctx, mod);
+ }
+
+ object = marshaller.objectFromObjectStream(ois);
+ }
+
+ assertDelimited(object);
+ }
+
+ private void applyTransactionLog(ObjectInputStream ois) throws Exception {
+ if (trace) log.trace("Integrating transaction log");
+
+ processCommitLog(ois);
+ Transport t = rpcManager.getTransport();
+ t.blockRPC(rpcManager.getTransport().getAddress(), rpcManager.getCurrentStateTransferSource());
+
+ try {
+ if (trace)
+ log.trace("Retrieving/Applying post-flush commits");
+ processCommitLog(ois);
+
+ if (trace)
+ log.trace("Retrieving/Applying pending prepares");
+ Object object = marshaller.objectFromObjectStream(ois);
+ while (object instanceof PrepareCommand) {
+ PrepareCommand command = (PrepareCommand) object;
+ if (!transactionLog.hasPendingPrepare(command)) {
+ InvocationContext ctx = invocationContextContainer.get();
+ ctx.setOriginLocal(false);
+ ctx.setOptions(Options.CACHE_MODE_LOCAL, Options.SKIP_CACHE_STATUS_CHECK);
+ interceptorChain.invoke(ctx, command);
+ }
+ object = marshaller.objectFromObjectStream(ois);
+ }
+ assertDelimited(object);
+ }
+ finally {
+ if (trace) log.trace("Stopping partial flush");
+ t.unblockRPC();
+ }
+ }
+
public void applyState(InputStream in) throws StateTransferException {
if (log.isDebugEnabled()) log.debug("Applying state");
-
+ ObjectInputStream ois = null;
try {
- ObjectInputStream ois = new ObjectInputStream(in);
- assertDelimited(ois);
- applyInMemoryState(ois);
- assertDelimited(ois);
- applyPersistentState(ois);
- assertDelimited(ois);
- if (log.isDebugEnabled()) log.debug("State applied, closing object stream");
- ois.close();
- // just close the object stream but do NOT close the underlying stream
+ ois = new ObjectInputStream(in);
+ boolean canProvideState = (Boolean) marshaller.objectFromObjectStream(ois);
+ if (canProvideState) {
+ assertDelimited(ois);
+ if (transientState) applyInMemoryState(ois);
+ assertDelimited(ois);
+ if (persistentState) applyPersistentState(ois);
+ assertDelimited(ois);
+ applyTransactionLog(ois);
+ if (log.isDebugEnabled()) log.debug("State applied, closing object stream");
+ } else {
+ String msg = "Provider cannot provide state!";
+ if (log.isDebugEnabled()) log.debug(msg);
+ throw new StateTransferException(msg);
+ }
} catch (StateTransferException ste) {
throw ste;
} catch (Exception e) {
throw new StateTransferException(e);
+ } finally {
+ // just close the object stream but do NOT close the underlying stream
+ Util.closeStream(ois);
}
}
@@ -137,7 +276,8 @@
dataContainer.clear();
try {
Set<StoredEntry> set = (Set<StoredEntry>) marshaller.objectFromObjectStream(i);
- for (StoredEntry se: set) cache.put(se.getKey(), se.getValue(), se.getLifespan(), TimeUnit.MILLISECONDS, Options.CACHE_MODE_LOCAL);
+ for (StoredEntry se : set)
+ cache.put(se.getKey(), se.getValue(), se.getLifespan(), MILLISECONDS, Options.CACHE_MODE_LOCAL);
} catch (Exception e) {
dataContainer.clear();
throw new StateTransferException(e);
@@ -149,6 +289,7 @@
// TODO is it safe enough to get these from the data container directly?
try {
Set<StoredEntry> s = dataContainer.getAllEntriesForStorage();
+ if (log.isDebugEnabled()) log.debug("Writing {0} StoredEntries to stream", s.size());
marshaller.objectToObjectStream(s, o);
} catch (Exception e) {
throw new StateTransferException(e);
@@ -156,40 +297,39 @@
}
private void applyPersistentState(ObjectInputStream i) throws StateTransferException {
- if (cs == null) {
- if (log.isDebugEnabled()) log.debug("Not configured to fetch persistent state, or no cache store configured. Skipping applying persistent state.");
- } else {
- try {
- cs.fromStream(i);
- } catch (CacheLoaderException cle) {
- throw new StateTransferException(cle);
- }
+ try {
+ // always use the unclosable stream delegate to ensure the impl doesn't close the stream
+ cs.fromStream(new UnclosableObjectInputStream(i));
+ } catch (CacheLoaderException cle) {
+ throw new StateTransferException(cle);
}
}
private void generatePersistentState(ObjectOutputStream o) throws StateTransferException {
- if (cs == null) {
- if (log.isDebugEnabled()) log.debug("Not configured to fetch persistent state, or no cache store configured. Skipping generating persistent state.");
- } else {
- try {
- cs.toStream(o);
- } catch (CacheLoaderException cle) {
- throw new StateTransferException(cle);
- }
+ try {
+ // always use the unclosable stream delegate to ensure the impl doesn't close the stream
+ cs.toStream(new UnclosableObjectOutputStream(o));
+ } catch (CacheLoaderException cle) {
+ throw new StateTransferException(cle);
}
}
private void delimit(ObjectOutputStream o) throws IOException {
- o.writeObject(DELIMITER);
+ marshaller.objectToObjectStream(DELIMITER, o);
}
private void assertDelimited(ObjectInputStream i) throws StateTransferException {
Object o;
try {
- o = i.readObject();
+ o = marshaller.objectFromObjectStream(i);
} catch (Exception e) {
throw new StateTransferException(e);
}
+ assertDelimited(o);
+ }
+
+ private void assertDelimited(Object o) throws StateTransferException {
+ if (o instanceof Exception) throw new StateTransferException((Exception) o);
if ((o == null) || !(o instanceof Delimiter))
throw new StateTransferException("Expected a delimiter, recieved " + o);
}
Modified: core/branches/flat/src/main/java/org/horizon/transaction/TransactionLog.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/transaction/TransactionLog.java 2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/transaction/TransactionLog.java 2009-03-03 14:56:48 UTC (rev 7829)
@@ -41,43 +41,36 @@
*
* @author Jason T. Greene
*/
-public class TransactionLog
-{
+public class TransactionLog {
private final Map<GlobalTransaction, PrepareCommand> pendingPrepares = new ConcurrentHashMap<GlobalTransaction, PrepareCommand>();
private final BlockingQueue<LogEntry> entries = new LinkedBlockingQueue<LogEntry>();
private AtomicBoolean active = new AtomicBoolean();
- public static class LogEntry
- {
+ public static class LogEntry {
private final GlobalTransaction transaction;
private final List<WriteCommand> modifications;
- public LogEntry(GlobalTransaction transaction, List<WriteCommand> modifications)
- {
+ public LogEntry(GlobalTransaction transaction, List<WriteCommand> modifications) {
this.transaction = transaction;
this.modifications = modifications;
}
- public GlobalTransaction getTransaction()
- {
+ public GlobalTransaction getTransaction() {
return transaction;
}
- public List<WriteCommand> getModifications()
- {
+ public List<WriteCommand> getModifications() {
return modifications;
}
}
private static Log log = LogFactory.getLog(TransactionLog.class);
- public void logPrepare(PrepareCommand command)
- {
+ public void logPrepare(PrepareCommand command) {
pendingPrepares.put(command.getGlobalTransaction(), command);
}
- public void logCommit(GlobalTransaction gtx)
- {
+ public void logCommit(GlobalTransaction gtx) {
PrepareCommand command = pendingPrepares.remove(gtx);
// it is perfectly normal for a prepare not to be logged for this gtx, for example if a transaction did not
// modify anything, then beforeCompletion() is not invoked and logPrepare() will not be called to register the
@@ -85,38 +78,32 @@
if (command != null) addEntry(new LogEntry(gtx, command.getModifications()));
}
- private void addEntry(LogEntry entry)
- {
- if (! isActive())
+ private void addEntry(LogEntry entry) {
+ if (!isActive())
return;
- for (;;)
- {
- try
- {
+ for (; ;) {
+ try {
if (log.isTraceEnabled())
log.trace("Added commit entry to tx log" + entry);
entries.put(entry);
break;
}
- catch (InterruptedException e)
- {
+ catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
- public void logOnePhaseCommit(GlobalTransaction gtx, List<WriteCommand> modifications)
- {
+ public final void logOnePhaseCommit(GlobalTransaction gtx, List<WriteCommand> modifications) {
// Just in case...
if (gtx != null) pendingPrepares.remove(gtx);
if (!modifications.isEmpty()) addEntry(new LogEntry(gtx, modifications));
}
- public void logNoTxWrite(WriteCommand write)
- {
- if (! isActive())
+ public final void logNoTxWrite(WriteCommand write) {
+ if (!isActive())
return;
ArrayList<WriteCommand> list = new ArrayList<WriteCommand>();
@@ -124,55 +111,46 @@
addEntry(new LogEntry(null, list));
}
- public void rollback(GlobalTransaction gtx)
- {
+ public void rollback(GlobalTransaction gtx) {
pendingPrepares.remove(gtx);
}
- public boolean isActive()
- {
+ public final boolean isActive() {
return active.get();
}
- public boolean activate()
- {
+ public final boolean activate() {
return active.compareAndSet(false, true);
}
- public void deactivate()
- {
+ public final void deactivate() {
active.set(false);
if (entries.size() > 0)
- log.error("Unprocessed Transaction Log Entries! = " + entries.size());
+ log.error("Unprocessed Transaction Log Entries! = {0}", entries.size());
entries.clear();
}
- public int size()
- {
+ public int size() {
return entries.size();
}
- public void writeCommitLog(Marshaller marshaller, ObjectOutputStream out) throws Exception
- {
- List<LogEntry> buffer = new ArrayList<LogEntry>(10);
+ public void writeCommitLog(Marshaller marshaller, ObjectOutputStream out) throws Exception {
+ List<LogEntry> buffer = new ArrayList<LogEntry>(10);
- while (entries.drainTo(buffer, 10) > 0)
- {
- for (LogEntry entry : buffer)
- marshaller.objectToObjectStream(entry, out);
+ while (entries.drainTo(buffer, 10) > 0) {
+ for (LogEntry entry : buffer)
+ marshaller.objectToObjectStream(entry, out);
- buffer.clear();
- }
+ buffer.clear();
+ }
}
- public void writePendingPrepares(Marshaller marshaller, ObjectOutputStream out) throws Exception
- {
+ public void writePendingPrepares(Marshaller marshaller, ObjectOutputStream out) throws Exception {
for (PrepareCommand entry : pendingPrepares.values())
marshaller.objectToObjectStream(entry, out);
}
- public boolean hasPendingPrepare(PrepareCommand command)
- {
+ public boolean hasPendingPrepare(PrepareCommand command) {
return pendingPrepares.containsKey(command.getGlobalTransaction());
}
}
Modified: core/branches/flat/src/main/java/org/horizon/transaction/TransactionTable.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/transaction/TransactionTable.java 2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/transaction/TransactionTable.java 2009-03-03 14:56:48 UTC (rev 7829)
@@ -26,11 +26,13 @@
import org.horizon.context.TransactionContext;
import org.horizon.factories.annotations.Inject;
import org.horizon.factories.annotations.NonVolatile;
+import org.horizon.factories.annotations.Start;
import org.horizon.factories.context.ContextFactory;
import org.horizon.logging.Log;
import org.horizon.logging.LogFactory;
import org.horizon.remoting.RPCManager;
import org.horizon.remoting.transport.Address;
+import org.horizon.remoting.transport.Transport;
import javax.transaction.Status;
import javax.transaction.SystemException;
@@ -64,6 +66,7 @@
private TransactionManager transactionManager = null;
private RPCManager rpcManager;
+ private Transport transport;
private ContextFactory contextFactory;
@@ -74,6 +77,13 @@
this.contextFactory = contextFactory;
}
+ @Start(priority = 12)
+ // needs to happen after RPCManager
+ public void start() {
+ transport = rpcManager == null ? null : rpcManager.getTransport();
+ }
+
+
/**
* Returns the number of local transactions.
*/
@@ -312,7 +322,7 @@
}
private Address getAddress() {
- return rpcManager == null ? null : rpcManager.getAddress();
+ return transport == null ? null : transport.getAddress();
}
public TransactionContext getTransactionContext(GlobalTransaction gtx) {
Modified: core/branches/flat/src/main/java/org/horizon/util/Util.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/util/Util.java 2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/java/org/horizon/util/Util.java 2009-03-03 14:56:48 UTC (rev 7829)
@@ -157,18 +157,26 @@
}
public static void closeStream(InputStream i) {
+ if (i == null) return;
try {
- if (i != null) i.close();
+ i.close();
} catch (Exception e) {
}
}
- public static void closeStream(OutputStream o) {
- try {
- if (o != null) o.close();
+ public static void flushAndCloseStream(OutputStream o) {
+ if (o == null) return;
+ try {
+ o.flush();
} catch (Exception e) {
}
+
+ try {
+ o.close();
+ } catch (Exception e) {
+
+ }
}
}
Modified: core/branches/flat/src/main/resources/config-samples/all.xml
===================================================================
--- core/branches/flat/src/main/resources/config-samples/all.xml 2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/resources/config-samples/all.xml 2009-03-03 14:56:48 UTC (rev 7829)
@@ -33,7 +33,8 @@
There is no added cost to defining a transport but not creating a cache that uses one, since the transport
is created and initialized lazily.
-->
- <transport transportClass="org.horizon.remoting.transport.jgroups.JGroupsTransport" clusterName="horizon-cluster">
+ <transport transportClass="org.horizon.remoting.transport.jgroups.JGroupsTransport" clusterName="horizon-cluster"
+ distributedSyncTimeout="50000">
<!-- Note that the JGroups transport uses sensible defaults if no configuration property is defined. -->
<property name="configurationFile" value="udp.xml"/>
<!-- See the JGroupsTransport javadocs for more options -->
Modified: core/branches/flat/src/main/resources/schema/horizon-config-1.0.xsd
===================================================================
--- core/branches/flat/src/main/resources/schema/horizon-config-1.0.xsd 2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/main/resources/schema/horizon-config-1.0.xsd 2009-03-03 14:56:48 UTC (rev 7829)
@@ -139,6 +139,7 @@
</xs:sequence>
<xs:attribute name="transportClass" type="xs:string"/>
<xs:attribute name="clusterName" type="xs:string"/>
+ <xs:attribute name="distributedSyncTimeout" type="xs:long"/>
</xs:complexType>
<xs:complexType name="syncType">
Modified: core/branches/flat/src/test/java/org/horizon/api/mvcc/PutForExternalReadTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/api/mvcc/PutForExternalReadTest.java 2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/test/java/org/horizon/api/mvcc/PutForExternalReadTest.java 2009-03-03 14:56:48 UTC (rev 7829)
@@ -106,7 +106,7 @@
// specify what we expectWithTx called on the mock Rpc Manager. For params we don't care about, just use ANYTHING.
// setting the mock object to expectWithTx the "sync" param to be false.
expect(mockTransport.invokeRemotely((List<Address>) anyObject(), (RPCCommand) anyObject(),
- eq(ResponseMode.ASYNCHRONOUS), anyLong(), anyBoolean(), (ResponseFilter) isNull())).andReturn(null);
+ eq(ResponseMode.ASYNCHRONOUS), anyLong(), anyBoolean(), (ResponseFilter) isNull(), anyBoolean())).andReturn(null);
replay(mockAddress1, mockAddress2, mockTransport);
@@ -160,7 +160,7 @@
expect(mockTransport.invokeRemotely(anyAddresses(), (RPCCommand) anyObject(), anyResponseMode(),
- anyLong(), anyBoolean(), (ResponseFilter) anyObject()))
+ anyLong(), anyBoolean(), (ResponseFilter) anyObject(), anyBoolean()))
.andThrow(new RuntimeException("Barf!")).anyTimes();
replay(mockTransport);
Modified: core/branches/flat/src/test/java/org/horizon/invalidation/BaseInvalidationTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/invalidation/BaseInvalidationTest.java 2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/test/java/org/horizon/invalidation/BaseInvalidationTest.java 2009-03-03 14:56:48 UTC (rev 7829)
@@ -182,7 +182,7 @@
expect(mockTransport.getAddress()).andReturn(addressOne).anyTimes();
expect(mockTransport.invokeRemotely((List<Address>) anyObject(), (RPCCommand) anyObject(),
eq(isSync ? ResponseMode.SYNCHRONOUS : ResponseMode.ASYNCHRONOUS),
- anyLong(), anyBoolean(), (ResponseFilter) anyObject())).andReturn(null).anyTimes();
+ anyLong(), anyBoolean(), (ResponseFilter) anyObject(), anyBoolean())).andReturn(null).anyTimes();
replay(mockTransport);
cache1.put("k", "v");
Modified: core/branches/flat/src/test/java/org/horizon/replication/SyncReplTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/replication/SyncReplTest.java 2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/test/java/org/horizon/replication/SyncReplTest.java 2009-03-03 14:56:48 UTC (rev 7829)
@@ -145,7 +145,7 @@
rpcManager.setTransport(mockTransport);
expect(mockTransport.invokeRemotely((List<Address>) anyObject(), (RPCCommand) anyObject(), eq(ResponseMode.SYNCHRONOUS),
- anyLong(), anyBoolean(), (ResponseFilter) anyObject()))
+ anyLong(), anyBoolean(), (ResponseFilter) anyObject(), anyBoolean()))
.andReturn(Collections.emptyList()).once();
replay(mockTransport);
@@ -157,7 +157,7 @@
expect(mockTransport.getAddress()).andReturn(mockAddressOne).anyTimes();
expect(mockTransport.getMembers()).andReturn(addresses).anyTimes();
expect(mockTransport.invokeRemotely((List<Address>) anyObject(), (RPCCommand) anyObject(), eq(ResponseMode.ASYNCHRONOUS),
- anyLong(), anyBoolean(), (ResponseFilter) anyObject()))
+ anyLong(), anyBoolean(), (ResponseFilter) anyObject(), anyBoolean()))
.andReturn(Collections.emptyList()).once();
replay(mockTransport);
Added: core/branches/flat/src/test/java/org/horizon/statetransfer/StateTransferCacheLoaderFunctionalTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/statetransfer/StateTransferCacheLoaderFunctionalTest.java (rev 0)
+++ core/branches/flat/src/test/java/org/horizon/statetransfer/StateTransferCacheLoaderFunctionalTest.java 2009-03-03 14:56:48 UTC (rev 7829)
@@ -0,0 +1,88 @@
+package org.horizon.statetransfer;
+
+import org.horizon.Cache;
+import org.horizon.config.CacheLoaderManagerConfig;
+import org.horizon.loader.CacheLoader;
+import org.horizon.loader.CacheLoaderConfig;
+import org.horizon.loader.CacheLoaderManager;
+import org.horizon.loader.dummy.DummyInMemoryCacheStore;
+import org.horizon.manager.CacheManager;
+import org.horizon.test.TestingUtil;
+import org.testng.annotations.Test;
+
+@Test(groups = "functional", testName = "statetransfer.StateTransferCacheLoaderFunctionalTest", enabled = false)
+public class StateTransferCacheLoaderFunctionalTest extends StateTransferFunctionalTest {
+ int id;
+ ThreadLocal<Boolean> sharedCacheLoader = new ThreadLocal<Boolean>() {
+ protected Boolean initialValue() {
+ return false;
+ }
+ };
+
+ @Override
+ protected CacheManager createCacheManager() {
+ // increment the DIMCS store id
+ CacheLoaderManagerConfig clmc = new CacheLoaderManagerConfig();
+ CacheLoaderConfig clc = new DummyInMemoryCacheStore.Cfg("store number " + id++);
+ clmc.addCacheLoaderConfig(clc);
+ clc.setFetchPersistentState(true);
+ clmc.setShared(sharedCacheLoader.get());
+ config.setCacheLoaderManagerConfig(clmc);
+ return super.createCacheManager();
+ }
+
+ @Override
+ protected void writeInitialData(final Cache<Object, Object> c) {
+ super.writeInitialData(c);
+ c.evict(A_B_NAME);
+ c.evict(A_B_AGE);
+ c.evict(A_C_NAME);
+ c.evict(A_C_AGE);
+ c.evict(A_D_NAME);
+ c.evict(A_D_AGE);
+ }
+
+ protected void verifyInitialDataOnLoader(Cache<Object, Object> c) throws Exception {
+ CacheLoader l = TestingUtil.extractComponent(c, CacheLoaderManager.class).getCacheLoader();
+ assert l.containsKey(A_B_AGE);
+ assert l.containsKey(A_B_NAME);
+ assert l.containsKey(A_C_AGE);
+ assert l.containsKey(A_C_NAME);
+ assert l.load(A_B_AGE).getValue().equals(TWENTY);
+ assert l.load(A_B_NAME).getValue().equals(JOE);
+ assert l.load(A_C_AGE).getValue().equals(FORTY);
+ assert l.load(A_C_NAME).getValue().equals(BOB);
+ }
+
+ protected void verifyNoData(Cache<Object, Object> c) {
+ assert c.isEmpty() : "Cache should be empty!";
+ }
+
+ protected void verifyNoDataOnLoader(Cache<Object, Object> c) throws Exception {
+ CacheLoader l = TestingUtil.extractComponent(c, CacheLoaderManager.class).getCacheLoader();
+ assert !l.containsKey(A_B_AGE);
+ assert !l.containsKey(A_B_NAME);
+ assert !l.containsKey(A_C_AGE);
+ assert !l.containsKey(A_C_NAME);
+ assert !l.containsKey(A_D_AGE);
+ assert !l.containsKey(A_D_NAME);
+ }
+
+
+ public void testSharedLoader() throws Exception {
+ sharedCacheLoader.set(true);
+ Cache<Object, Object> c1 = createCacheManager().getCache(cacheName);
+ writeInitialData(c1);
+
+ // starting the second cache would initialize an in-memory state transfer but not a persistent one since the loader is shared
+ Cache<Object, Object> c2 = createCacheManager().getCache(cacheName);
+
+ TestingUtil.blockUntilViewsReceived(60000, c1, c2);
+
+ verifyInitialDataOnLoader(c1);
+ verifyInitialData(c1);
+
+ verifyNoDataOnLoader(c2);
+ verifyNoData(c2);
+ }
+}
Modified: core/branches/flat/src/test/java/org/horizon/statetransfer/StateTransferFunctionalTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/statetransfer/StateTransferFunctionalTest.java 2009-03-03 13:01:12 UTC (rev 7828)
+++ core/branches/flat/src/test/java/org/horizon/statetransfer/StateTransferFunctionalTest.java 2009-03-03 14:56:48 UTC (rev 7829)
@@ -1,13 +1,13 @@
package org.horizon.statetransfer;
import org.horizon.Cache;
-import org.horizon.transaction.DummyTransactionManagerLookup;
import org.horizon.config.Configuration;
import org.horizon.logging.Log;
import org.horizon.logging.LogFactory;
import org.horizon.manager.CacheManager;
import org.horizon.test.MultipleCacheManagersTest;
import org.horizon.test.TestingUtil;
+import org.horizon.transaction.DummyTransactionManagerLookup;
import org.testng.annotations.Test;
import javax.transaction.TransactionManager;
@@ -34,7 +34,7 @@
public static final Integer FORTY = 40;
Configuration config;
- private static final String cacheName = "nbst";
+ protected static final String cacheName = "nbst";
private volatile int testCount = 0;
@@ -54,9 +54,9 @@
config.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
}
- private CacheManager createCacheManager() {
+ protected CacheManager createCacheManager() {
CacheManager cm = addClusterEnabledCacheManager();
- cm.defineCache(cacheName, config);
+ cm.defineCache(cacheName, config.clone());
return cm;
}
@@ -86,17 +86,18 @@
}
- private static class WritingRunner implements Runnable {
+ private static class WritingThread extends Thread {
private final Cache<Object, Object> cache;
private final boolean tx;
private volatile boolean stop;
private volatile int result;
private TransactionManager tm;
- WritingRunner(Cache<Object, Object> cache, boolean tx) {
+ WritingThread(Cache<Object, Object> cache, boolean tx) {
this.cache = cache;
this.tx = tx;
if (tx) tm = TestingUtil.getTransactionManager(cache);
+ setDaemon(true);
}
public int result() {
@@ -107,19 +108,27 @@
int c = 0;
while (!stop) {
try {
- if (tx) tm.begin();
- cache.put("test" + c, c++);
- if (tx) tm.commit();
+ if (c % 1000 == 0) {
+ if (tx) tm.begin();
+ for (int i = 0; i < 1000; i++) cache.remove("test" + c);
+ if (tx) tm.commit();
+ c = 0;
+ } else {
+ if (tx) tm.begin();
+ cache.put("test" + c, c++);
+ if (tx) tm.commit();
+ }
}
catch (Exception e) {
- e.printStackTrace();
- log.error(e);
+// e.printStackTrace();
+// log.error(e);
+ stopThread();
}
}
result = c;
}
- public void stop() {
+ public void stopThread() {
stop = true;
}
}
@@ -163,6 +172,7 @@
cm3.getCache(cacheName);
}
});
+ t1.setName("CacheStarter-Cache3");
t1.start();
Thread t2 = new Thread(new Runnable() {
@@ -170,6 +180,7 @@
cm4.getCache(cacheName);
}
});
+ t2.setName("CacheStarter-Cache4");
t2.start();
t1.join();
@@ -178,7 +189,7 @@
cache3 = cm3.getCache(cacheName);
cache4 = cm4.getCache(cacheName);
- TestingUtil.blockUntilViewsReceived(60000, cache1, cache2, cache3, cache4);
+ TestingUtil.blockUntilViewsReceived(120000, cache1, cache2, cache3, cache4);
verifyInitialData(cache3);
verifyInitialData(cache4);
log.info("testConcurrentStateTransfer end - " + testCount);
@@ -205,6 +216,7 @@
log.info("testSTWithWritingNonTxThread end - " + testCount);
}
+ @Test(invocationCount = 10)
public void testSTWithWritingTxThread() throws Exception {
testCount++;
log.info("testSTWithWritingTxThread start - " + testCount);
@@ -222,8 +234,7 @@
// Delay the transient copy, so that we get a more thorough log test
cache1.put("delay", new DelayTransfer());
- WritingRunner writer = new WritingRunner(cache3, tx);
- Thread writerThread = new Thread(writer);
+ WritingThread writerThread = new WritingThread(cache3, tx);
writerThread.start();
cache2 = createCacheManager().getCache(cacheName);
@@ -231,25 +242,25 @@
// Pause to give caches time to see each other
TestingUtil.blockUntilViewsReceived(60000, cache1, cache2, cache3);
- writer.stop();
+ writerThread.stopThread();
writerThread.join();
verifyInitialData(cache2);
- int count = writer.result();
+ int count = writerThread.result();
for (int c = 0; c < count; c++)
assert cache2.get("test" + c).equals(c);
}
- private void verifyInitialData(Cache<Object, Object> c) {
+ protected void verifyInitialData(Cache<Object, Object> c) {
assert JOE.equals(c.get(A_B_NAME)) : "Incorrect value for key " + A_B_NAME;
assert TWENTY.equals(c.get(A_B_AGE)) : "Incorrect value for key " + A_B_AGE;
assert BOB.equals(c.get(A_C_NAME)) : "Incorrect value for key " + A_C_NAME;
assert FORTY.equals(c.get(A_C_AGE)) : "Incorrect value for key " + A_C_AGE;
}
- private void writeInitialData(final Cache<Object, Object> c) {
+ protected void writeInitialData(final Cache<Object, Object> c) {
c.put(A_B_NAME, JOE);
c.put(A_B_AGE, TWENTY);
c.put(A_C_NAME, BOB);
@@ -261,25 +272,24 @@
cache1 = createCacheManager().getCache(cacheName);
writeInitialData(cache1);
-
// Delay the transient copy, so that we get a more thorough log test
cache1.put("delay", new DelayTransfer());
- WritingRunner writer = new WritingRunner(cache1, tx);
- Thread writerThread = new Thread(writer);
+ WritingThread writerThread = new WritingThread(cache1, tx);
writerThread.start();
-
+ verifyInitialData(cache1);
cache2 = createCacheManager().getCache(cacheName);
// Pause to give caches time to see each other
TestingUtil.blockUntilViewsReceived(60000, cache1, cache2);
- writer.stop();
+ writerThread.stopThread();
writerThread.join();
+ verifyInitialData(cache1);
verifyInitialData(cache2);
- int count = writer.result();
+ int count = writerThread.result();
for (int c = 0; c < count; c++)
assert cache2.get("test" + c).equals(c);
15 years, 10 months
JBoss Cache SVN: r7828 - in core/branches/flat/src: main/java/org/horizon/loader/bucket and 14 other directories.
by jbosscache-commits@lists.jboss.org
Author: mircea.markus
Date: 2009-03-03 08:01:12 -0500 (Tue, 03 Mar 2009)
New Revision: 7828
Added:
core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheStoreConfig.java
core/branches/flat/src/main/java/org/horizon/loader/LockSupportCacheStore.java
core/branches/flat/src/main/java/org/horizon/loader/LockSupportCacheStoreConfig.java
core/branches/flat/src/main/java/org/horizon/loader/jdbc/binary/
core/branches/flat/src/main/java/org/horizon/loader/jdbc/binary/JdbcBinaryCacheStore.java
core/branches/flat/src/main/java/org/horizon/loader/jdbc/binary/JdbcBinaryCacheStoreConfig.java
core/branches/flat/src/main/java/org/horizon/loader/jdbc/connectionfactory/
core/branches/flat/src/main/java/org/horizon/loader/jdbc/connectionfactory/ConnectionFactory.java
core/branches/flat/src/main/java/org/horizon/loader/jdbc/connectionfactory/ConnectionFactoryConfig.java
core/branches/flat/src/main/java/org/horizon/loader/jdbc/connectionfactory/PooledConnectionFactory.java
core/branches/flat/src/main/java/org/horizon/loader/jdbc/connectionfactory/SimpleConnectionFactory.java
core/branches/flat/src/main/java/org/horizon/loader/jdbc/stringbased/
core/branches/flat/src/main/java/org/horizon/loader/jdbc/stringbased/DefaultKey2StringMapper.java
core/branches/flat/src/main/java/org/horizon/loader/jdbc/stringbased/JdbcStringBasedCacheStore.java
core/branches/flat/src/main/java/org/horizon/loader/jdbc/stringbased/JdbcStringBasedCacheStoreConfig.java
core/branches/flat/src/main/java/org/horizon/loader/jdbc/stringbased/Key2StringMapper.java
core/branches/flat/src/main/java/org/horizon/loader/jdbc/stringbased/UnsupportedKeyTypeException.java
core/branches/flat/src/test/java/org/horizon/loader/jdbc/DefaultKey2StringMapperTest.java
core/branches/flat/src/test/java/org/horizon/loader/jdbc/JdbcBinaryCacheStoreTest.java
core/branches/flat/src/test/java/org/horizon/loader/jdbc/JdbcStringBasedCacheStoreTest.java
core/branches/flat/src/test/java/org/horizon/loader/jdbc/UnitTestDatabaseManager.java
core/branches/flat/src/test/java/org/horizon/lock/StripedLockTest.java
Removed:
core/branches/flat/src/main/java/org/horizon/loader/bucket/BucketBasedCacheStoreConfig.java
core/branches/flat/src/main/java/org/horizon/loader/jdbc/ConnectionFactory.java
core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcCacheStore.java
core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcCacheStoreConfig.java
core/branches/flat/src/main/java/org/horizon/loader/jdbc/NonManagedConnectionFactory.java
core/branches/flat/src/main/java/org/horizon/loader/jdbc/PooledConnectionFactory.java
core/branches/flat/src/main/java/org/horizon/loader/jdbc/SimpleConnectionFactory.java
core/branches/flat/src/test/java/org/horizon/loader/jdbc/JdbcCacheStoreTest.java
core/branches/flat/src/test/java/org/horizon/test/UnitTestDatabaseManager.java
Modified:
core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheStore.java
core/branches/flat/src/main/java/org/horizon/loader/bucket/BucketBasedCacheStore.java
core/branches/flat/src/main/java/org/horizon/loader/decorators/AbstractDelegatingStore.java
core/branches/flat/src/main/java/org/horizon/loader/file/FileCacheStore.java
core/branches/flat/src/main/java/org/horizon/loader/file/FileCacheStoreConfig.java
core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcUtil.java
core/branches/flat/src/main/java/org/horizon/loader/jdbc/TableManipulation.java
core/branches/flat/src/main/java/org/horizon/lock/StripedLock.java
core/branches/flat/src/test/java/org/horizon/config/parsing/ConfigurationParserTest.java
core/branches/flat/src/test/java/org/horizon/loader/BaseCacheStoreTest.java
core/branches/flat/src/test/java/org/horizon/loader/dummy/DummyInMemoryCacheStore.java
core/branches/flat/src/test/java/org/horizon/loader/dummy/DummyInMemoryCacheStoreTest.java
core/branches/flat/src/test/java/org/horizon/loader/file/FileCacheStoreTest.java
core/branches/flat/src/test/java/org/horizon/loader/jdbc/PooledConnectionFactoryTest.java
core/branches/flat/src/test/java/org/horizon/loader/jdbc/TableManipulationTest.java
Log:
ongoing jdbc cache store work
Modified: core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheStore.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheStore.java 2009-03-02 19:19:45 UTC (rev 7827)
+++ core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheStore.java 2009-03-03 13:01:12 UTC (rev 7828)
@@ -1,8 +1,13 @@
package org.horizon.loader;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.horizon.Cache;
import org.horizon.loader.modifications.Modification;
import org.horizon.loader.modifications.Remove;
import org.horizon.loader.modifications.Store;
+import org.horizon.marshall.Marshaller;
+import org.horizon.util.concurrent.WithinThreadExecutor;
import javax.transaction.Transaction;
import java.io.InputStream;
@@ -11,6 +16,8 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
/**
* An abstract {@link org.horizon.loader.CacheStore} that holds common implementations for some methods
@@ -22,6 +29,46 @@
private final Map<Transaction, List<? extends Modification>> transactions = new ConcurrentHashMap<Transaction, List<? extends Modification>>();
+ private static Log log = LogFactory.getLog(AbstractCacheStore.class);
+
+ private AbstractCacheStoreConfig config;
+
+ private ExecutorService purgerService;
+
+ public void init(CacheLoaderConfig config, Cache cache, Marshaller m) {
+ this.config = (AbstractCacheStoreConfig) config;
+ if (config == null) throw new IllegalStateException("Null config!!!");
+ }
+
+ public void start() throws CacheLoaderException {
+ if (config == null) throw new IllegalStateException("Make sure you call super.init() from CacheStore extension");
+ if (config.isPurgeSynchronously()) {
+ purgerService = new WithinThreadExecutor();
+ } else {
+ purgerService = Executors.newSingleThreadExecutor();
+ }
+ }
+
+ public void stop() throws CacheLoaderException {
+ purgerService.shutdownNow();
+ }
+
+ public void purgeExpired() throws CacheLoaderException {
+ if (purgerService == null)
+ throw new IllegalStateException("PurgeService is null (did you call super.start() from cache loader implementation ?");
+ purgerService.execute(new Runnable() {
+ public void run() {
+ try {
+ purgeInternal();
+ } catch (CacheLoaderException e) {
+ log.info("Problems encountered while purging expired", e);
+ }
+ }
+ });
+ }
+
+ protected void purgeInternal() throws CacheLoaderException {}
+
protected void applyModifications(List<? extends Modification> mods) throws CacheLoaderException {
for (Modification m : mods) {
switch (m.getType()) {
Added: core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheStoreConfig.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheStoreConfig.java (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheStoreConfig.java 2009-03-03 13:01:12 UTC (rev 7828)
@@ -0,0 +1,20 @@
+package org.horizon.loader;
+
+/**
+ * Configuration for {@link AbstractCacheStore}.
+ *
+ * @author Mircea.Markus(a)jboss.com
+ */
+public class AbstractCacheStoreConfig extends AbstractCacheLoaderConfig {
+
+ private boolean purgeSynchronously = false;
+
+ public boolean isPurgeSynchronously() {
+ return purgeSynchronously;
+ }
+
+ public void setPurgeSynchronously(boolean purgeSynchronously) {
+ testImmutability("purgeSynchronously");
+ this.purgeSynchronously = purgeSynchronously;
+ }
+}
Property changes on: core/branches/flat/src/main/java/org/horizon/loader/AbstractCacheStoreConfig.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added: core/branches/flat/src/main/java/org/horizon/loader/LockSupportCacheStore.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/LockSupportCacheStore.java (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/loader/LockSupportCacheStore.java 2009-03-03 13:01:12 UTC (rev 7828)
@@ -0,0 +1,193 @@
+package org.horizon.loader;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.horizon.Cache;
+import org.horizon.lock.StripedLock;
+import org.horizon.marshall.Marshaller;
+
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Set;
+
+/**
+ * This class extends {@link org.horizon.loader.AbstractCacheStore} adding lock support for consistently acceessing
+ * stored data.
+ * <p/>
+ * In-memory locking is needed by aggregation operations(e.g. loadAll, toStream, fromStream) to make sure that
+ * manipulated data won't be corrupted by concurrent access to Store. It also assurce atomic data access for each stored
+ * entry.
+ * <p/>
+ * Locking is based on a {@link org.horizon.lock.StripedLock}. You can tune the concurrency level of the striped lock
+ * (see the Javadocs of StripedLock for details on what this is) by using the {@link
+ * org.horizon.loader.LockSupportCacheStore#setLockConcurrencyLevel(int)} setter.
+ * <p/>
+ *
+ * @author Mircea.Markus(a)jboss.com
+ */
+public abstract class LockSupportCacheStore extends AbstractCacheStore {
+
+ private static Log log = LogFactory.getLog(LockSupportCacheStore.class);
+
+ private StripedLock locks;
+ private long globalLockTimeoutMillis;
+ private LockSupportCacheStoreConfig config;
+
+ public void init(CacheLoaderConfig config, Cache cache, Marshaller m) {
+ super.init(config, cache, m);
+ this.config = (LockSupportCacheStoreConfig) config;
+ }
+
+ public void start() throws CacheLoaderException {
+ super.start();
+ if (config == null)
+ throw new CacheLoaderException("Null config. Possible reason is not calling super.init(...)");
+ locks = new StripedLock(config.getLockConcurrencyLevel());
+ globalLockTimeoutMillis = config.getLockAcquistionTimeout();
+ }
+
+ /**
+ * Release the locks (either read or write).
+ */
+ protected void unlock(String key) {
+ locks.releaseLock(key);
+ }
+
+ /**
+ * Acquires write lock on the given key.
+ */
+ protected void lockForWritting(String key) throws CacheLoaderException {
+ locks.acquireLock(key, true);
+ }
+
+ /**
+ * Acquires read lock on the given key.
+ */
+ protected void lockForReading(String key) throws CacheLoaderException {
+ locks.acquireLock(key, false);
+ }
+
+ /**
+ * Same as {@link #lockForWritting(String)}, but with 0 timeout.
+ */
+ protected boolean immediateLockForWritting(String key) throws CacheLoaderException {
+ return locks.acquireLock(key, true, 0);
+ }
+
+ /**
+ * Based on the supplied param, acquires a global read(false) or write (true) lock.
+ */
+ protected void acquireGlobalLock(boolean exclusive) throws CacheLoaderException {
+ locks.aquireGlobalLock(exclusive, globalLockTimeoutMillis);
+ }
+
+ /**
+ * Based on the supplied param, releases a global read(false) or write (true) lock.
+ */
+ protected void releaseGlobalLock(boolean exclusive) {
+ locks.releaseGlobalLock(exclusive);
+ }
+
+ public final StoredEntry load(Object key) throws CacheLoaderException {
+ if (log.isTraceEnabled()) log.trace("load (" + key + ")");
+ String lockingKey = getLockFromKey(key);
+ lockForReading(lockingKey);
+ try {
+ return loadLockSafe(key, lockingKey);
+ } finally {
+ unlock(lockingKey);
+ if (log.isTraceEnabled()) log.trace("Exit load (" + key + ")");
+ }
+ }
+
+ public final Set<StoredEntry> loadAll() throws CacheLoaderException {
+ if (log.isTraceEnabled()) log.trace("loadAll()");
+ acquireGlobalLock(false);
+ try {
+ return loadAllLockSafe();
+ } finally {
+ releaseGlobalLock(false);
+ if (log.isTraceEnabled()) log.trace("Exit loadAll()");
+ }
+ }
+
+ public final void store(StoredEntry ed) throws CacheLoaderException {
+ if (log.isTraceEnabled()) log.trace("store(" + ed + ")");
+ if (ed == null) return;
+ if (ed.isExpired()) {
+ log.trace("Entry " + ed + " is expired! Not doing anything.");
+ return;
+ }
+
+ String keyHashCode = getLockFromKey(ed.getKey());
+ lockForWritting(keyHashCode);
+ try {
+ storeLockSafe(ed, keyHashCode);
+ } finally {
+ unlock(keyHashCode);
+ }
+ if (log.isTraceEnabled()) log.trace("exit store(" + ed + ")");
+ }
+
+ public final boolean remove(Object key) throws CacheLoaderException {
+ if (log.isTraceEnabled()) log.trace("remove(" + key + ")");
+ String keyHashCodeStr = getLockFromKey(key);
+ try {
+ lockForWritting(keyHashCodeStr);
+ return removeLockSafe(key, keyHashCodeStr);
+ } finally {
+ unlock(keyHashCodeStr);
+ if (log.isTraceEnabled()) log.trace("Exit remove(" + key + ")");
+ }
+ }
+
+ public final void fromStream(ObjectInput objectInput) throws CacheLoaderException {
+ try {
+ acquireGlobalLock(true);
+ // first clear all local state
+ clear();
+ fromStreamLockSafe(objectInput);
+ } finally {
+ releaseGlobalLock(true);
+ }
+ }
+
+ public void toStream(ObjectOutput objectOutput) throws CacheLoaderException {
+ try {
+ acquireGlobalLock(false);
+ toStreamLockSafe(objectOutput);
+ } finally {
+ releaseGlobalLock(false);
+ }
+ }
+
+ public final void clear() throws CacheLoaderException {
+ log.trace("Clearing store");
+ try {
+ acquireGlobalLock(true);
+ clearLockSafe();
+ } finally {
+ releaseGlobalLock(true);
+ }
+ }
+
+ public int getTotalLockCount() {
+ return locks.getTotalLockCount();
+ }
+
+ protected abstract void clearLockSafe() throws CacheLoaderException;
+
+ protected abstract Set<StoredEntry> loadAllLockSafe() throws CacheLoaderException;
+
+ protected abstract void toStreamLockSafe(ObjectOutput oos) throws CacheLoaderException;
+
+ protected abstract void fromStreamLockSafe(ObjectInput ois) throws CacheLoaderException;
+
+ protected abstract boolean removeLockSafe(Object key, String lockingKey) throws CacheLoaderException;
+
+ protected abstract void storeLockSafe(StoredEntry ed, String lockingKey) throws CacheLoaderException;
+
+ protected abstract StoredEntry loadLockSafe(Object key, String lockingKey) throws CacheLoaderException;
+
+ protected abstract String getLockFromKey(Object key) throws CacheLoaderException;
+}
Property changes on: core/branches/flat/src/main/java/org/horizon/loader/LockSupportCacheStore.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added: core/branches/flat/src/main/java/org/horizon/loader/LockSupportCacheStoreConfig.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/LockSupportCacheStoreConfig.java (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/loader/LockSupportCacheStoreConfig.java 2009-03-03 13:01:12 UTC (rev 7828)
@@ -0,0 +1,35 @@
+package org.horizon.loader;
+
+/**
+ * Adds configuration support for {@link LockSupportCacheStore}.
+ *
+ * @author Mircea.Markus(a)jboss.com
+ */
+public class LockSupportCacheStoreConfig extends AbstractCacheStoreConfig {
+ private int lockConcurrencyLevel = 2048;
+ private long lockAcquistionTimeout = 60000;
+
+ /**
+ * Returns number of threads expected to use this class concurrently.
+ */
+ public int getLockConcurrencyLevel() {
+ return lockConcurrencyLevel;
+ }
+
+ /**
+ * Sets number of threads expected to use this class concurrently.
+ */
+ public void setLockConcurrencyLevel(int lockConcurrencyLevel) {
+ testImmutability("lockConcurrencyLevel");
+ this.lockConcurrencyLevel = lockConcurrencyLevel;
+ }
+
+ public long getLockAcquistionTimeout() {
+ return lockAcquistionTimeout;
+ }
+
+ public void setLockAcquistionTimeout(long lockAcquistionTimeout) {
+ testImmutability("lockAcquistionTimeout");
+ this.lockAcquistionTimeout = lockAcquistionTimeout;
+ }
+}
Property changes on: core/branches/flat/src/main/java/org/horizon/loader/LockSupportCacheStoreConfig.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Modified: core/branches/flat/src/main/java/org/horizon/loader/bucket/BucketBasedCacheStore.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/bucket/BucketBasedCacheStore.java 2009-03-02 19:19:45 UTC (rev 7827)
+++ core/branches/flat/src/main/java/org/horizon/loader/bucket/BucketBasedCacheStore.java 2009-03-03 13:01:12 UTC (rev 7828)
@@ -1,247 +1,100 @@
package org.horizon.loader.bucket;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.horizon.Cache;
-import org.horizon.loader.AbstractCacheStore;
-import org.horizon.loader.CacheLoaderConfig;
import org.horizon.loader.CacheLoaderException;
+import org.horizon.loader.LockSupportCacheStore;
import org.horizon.loader.StoredEntry;
-import org.horizon.lock.StripedLock;
-import org.horizon.marshall.Marshaller;
-import org.horizon.util.concurrent.WithinThreadExecutor;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
/**
- * //TODO comment this
+ * Base class for cache store that want to use the 'buckets approach' for storing data.
+ * <p/>
+ * A hashing algorithm is used to map keys to buckets, and a bucket consists of a collection of key/value pairs.
+ * <p/>
+ * This approach, while adding an overhead of having to search buckets for keys, means that we can use any serializable
+ * object we like as keys and not just Strings or objects that translate to something meaningful for a store(e.g. file
+ * system).
+ * <p/>
*
* @author Mircea.Markus(a)jboss.com
* @since 1.0
*/
-public abstract class BucketBasedCacheStore extends AbstractCacheStore {
+public abstract class BucketBasedCacheStore extends LockSupportCacheStore {
- private static Log log = LogFactory.getLog(BucketBasedCacheStore.class);
-
- private StripedLock bucketLocks;
- private BucketBasedCacheStoreConfig config;
- private long globalLockTimeoutMillis;
- private ExecutorService purgerService;
-
/**
- * This global lock guards against direct store access via clear() and the stream APIs. These three methods would
- * need exclusive (write) access to this lock while all others can use shared (read) access to this lock since other
- * methods will use finer grained bucket locks.
+ * Loads the bucket coresponding to the given key, and lookups the key within it. if the bucket is found and the key
+ * is expired, then it won't be returned.
+ *
+ * @param key the passed in key, from {@link super#load(Object)}
+ * @param lockingKey the hash of the key, as returned by {@link super#getLockFromKey(Object)}. This is present here
+ * in order to avoid hash recomputation.
*/
- private final ReentrantReadWriteLock globalLock = new ReentrantReadWriteLock();
-
- public void init(CacheLoaderConfig config, Cache cache, Marshaller m) {
- this.config = (BucketBasedCacheStoreConfig) config;
- }
-
- public void start() throws CacheLoaderException {
- bucketLocks = new StripedLock(config.getLockConcurrencyLevel());
- globalLockTimeoutMillis = config.getLockAcquistionTimeout();
- if (config.isPurgeSynchronously()) {
- purgerService = new WithinThreadExecutor();
+ protected StoredEntry loadLockSafe(Object key, String lockingKey) throws CacheLoaderException {
+ Bucket bucket = loadBucket(lockingKey);
+ if (bucket == null) return null;
+ StoredEntry se = bucket.getEntry(key);
+ if (se != null && se.isExpired()) {
+ return null;
} else {
- purgerService = Executors.newSingleThreadExecutor();
+ return se;
}
}
- public void stop() throws CacheLoaderException {
- purgerService.shutdownNow();
- }
-
- public StoredEntry load(Object key) throws CacheLoaderException {
- if (log.isTraceEnabled()) log.trace("Loading entry " + key);
- String keyHashCode = String.valueOf(key.hashCode());
- lockForReading(keyHashCode);
- Bucket bucket;
- try {
- bucket = loadBucket(keyHashCode);
- if (bucket == null) return null;
- StoredEntry se = bucket.getEntry(key);
- if (se != null && se.isExpired()) {
- bucket.removeEntry(key);
- saveBucket(bucket);
- return null;
- } else {
- return se;
- }
- } finally {
- unlock(keyHashCode);
+ /**
+ * Tries to find a bucket corresponding to storedEntry's key, and updates it with the storedEntry. If no bucket is
+ * found, a new one is created.
+ *
+ * @param lockingKey the hash of the key, as returned by {@link super#getLockFromKey(Object)}. This is present here
+ * in order to avoid hash recomputation.
+ */
+ protected void storeLockSafe(StoredEntry ed, String lockingKey) throws CacheLoaderException {
+ Bucket bucket = loadBucket(lockingKey);
+ if (bucket != null) {
+ bucket.addEntry(ed);
+ saveBucket(bucket);
+ } else {
+ bucket = new Bucket();
+ bucket.setBucketName(lockingKey);
+ bucket.addEntry(ed);
+ insertBucket(bucket);
}
}
- public void store(StoredEntry ed) throws CacheLoaderException {
- if (ed == null) return;
- if (ed.isExpired()) {
- log.trace("Entry " + ed + " is expired! Not doing anything.");
- return;
+ /**
+ * Lookups a bucket where the given key is stored. Then removes the StoredEntry having with gven key from there (if
+ * such a bucket exists).
+ *
+ * @param lockingKey the hash of the key, as returned by {@link super#getLockFromKey(Object)}. This is present here
+ * in order to avoid hash recomputation.
+ */
+ protected boolean removeLockSafe(Object key, String keyHashCodeStr) throws CacheLoaderException {
+ Bucket bucket = loadBucket(keyHashCodeStr);
+ if (bucket == null) {
+ return false;
+ } else {
+ boolean success = bucket.removeEntry(key);
+ if (success) saveBucket(bucket);
+ return success;
}
- if (log.isTraceEnabled()) log.trace("Storing entry " + ed);
- String keyHashCode = String.valueOf(ed.getKey().hashCode());
-
- lockForWritting(keyHashCode);
-
- try {
- Bucket bucket = loadBucket(keyHashCode);
- if (bucket != null) {
- bucket.addEntry(ed);
- saveBucket(bucket);
- } else {
- bucket = new Bucket();
- bucket.setBucketName(keyHashCode);
- bucket.addEntry(ed);
- insertBucket(bucket);
- }
- } finally {
- unlock(keyHashCode);
- }
}
- public boolean remove(Object key) throws CacheLoaderException {
- if (log.isTraceEnabled()) log.trace("Removing key " + key);
- String keyHashCodeStr = String.valueOf(key.hashCode());
- Bucket bucket;
- try {
- lockForWritting(keyHashCodeStr);
- bucket = loadBucket(keyHashCodeStr);
- if (bucket == null) {
- return false;
- } else {
- boolean success = bucket.removeEntry(key);
- if (success) saveBucket(bucket);
- return success;
- }
- } finally {
- unlock(keyHashCodeStr);
- }
+ /**
+ * For {@link org.horizon.loader.bucket.BucketBasedCacheStore}s the lock should be acquired at bucket level. So we're
+ * locking based on the hashCode of the key, as all keys having same hascode will be mapped to same bucket.
+ */
+ protected String getLockFromKey(Object key) {
+ return String.valueOf(key.hashCode());
}
- public void fromStream(ObjectInput inputStream) throws CacheLoaderException {
- try {
- // first clear all local state
- acquireGlobalLock(true);
- clear();
- fromStreamInternal(inputStream);
- } finally {
- releaseGlobalLock(true);
- }
- }
-
- public void toStream(ObjectOutput outputStream) throws CacheLoaderException {
- try {
- acquireGlobalLock(true);
- toStreamInternal(outputStream);
- } finally {
- releaseGlobalLock(true);
- }
- }
-
- public void clear() throws CacheLoaderException {
- log.trace("Clearing store");
- try {
- acquireGlobalLock(true);
- clearInternal();
- } finally {
- releaseGlobalLock(true);
- }
- }
-
- public void purgeExpired() {
- purgerService.execute(new Runnable() {
- public void run() {
- try {
- purgeInternal();
- } catch (CacheLoaderException e) {
- log.info("Problems encountered while purging expired", e);
- }
- }
- });
- }
-
- protected void unlock(String keyHashCode) {
- bucketLocks.releaseLock(keyHashCode);
- globalLock.readLock().unlock();
- }
-
- protected void lockForWritting(String keyHashCode) throws CacheLoaderException {
- try {
- globalLock.readLock().tryLock(globalLockTimeoutMillis, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- log.warn("Received interrupt signal while waiting for global lock aquisition");
- throw new CacheLoaderException(e);
- }
- bucketLocks.acquireLock(keyHashCode, true);
- }
-
- protected boolean immediateLockForWritting(String keyHashCode) throws CacheLoaderException {
- try {
- if (!globalLock.readLock().tryLock(0, TimeUnit.MILLISECONDS)) {
- return false;
- }
- } catch (InterruptedException e) {
- log.warn("Received interrupt signal while waiting for global lock aquisition");
- throw new CacheLoaderException(e);
- }
- return bucketLocks.acquireLock(keyHashCode, true, 0);
- }
-
- protected void lockForReading(String keyHashCode) throws CacheLoaderException {
- try {
- globalLock.readLock().tryLock(globalLockTimeoutMillis, TimeUnit.MILLISECONDS);
- } catch (InterruptedException e) {
- log.warn("Received interrupt signal while waiting for global lock aquisition");
- throw new CacheLoaderException(e);
- }
- bucketLocks.acquireLock(keyHashCode, false);
- }
-
- protected void acquireGlobalLock(boolean exclusive) throws CacheLoaderException {
- Lock l = exclusive ? globalLock.writeLock() : globalLock.readLock();
- try {
- if (!l.tryLock(globalLockTimeoutMillis, TimeUnit.MILLISECONDS))
- throw new CacheLoaderException("Timed out trying to acquire " + (exclusive ? "exclusive" : "shared") + " global lock after " + globalLockTimeoutMillis + " millis. Lock is " + l);
- } catch (InterruptedException e) {
- throw new CacheLoaderException(e);
- }
- }
-
- protected void releaseGlobalLock(boolean exclusive) {
- Lock lock = exclusive ? globalLock.writeLock() : globalLock.readLock();
- lock.unlock();
- }
-
- public int getGlobalLockCount() {
- return globalLock.getReadLockCount() + (globalLock.isWriteLocked() ? +1 : 0);
- }
-
- public int getBucketLockCount() {
- return bucketLocks.getTotalLockCount();
- }
-
protected abstract void insertBucket(Bucket bucket) throws CacheLoaderException;
/**
* This method assumes that the bucket is already persisted in the database.
+ *
+ * @throws CacheLoaderException if the bucket is not already present, or something happens while persisting.
*/
protected abstract void saveBucket(Bucket bucket) throws CacheLoaderException;
+ /**
+ * Loads the bucket from the store, base on the hashcode.
+ */
protected abstract Bucket loadBucket(String keyHashCode) throws CacheLoaderException;
-
- protected abstract void toStreamInternal(ObjectOutput oos) throws CacheLoaderException;
-
- protected abstract void fromStreamInternal(ObjectInput ois) throws CacheLoaderException;
-
- protected abstract void clearInternal() throws CacheLoaderException;
-
- protected abstract void purgeInternal() throws CacheLoaderException;
}
Deleted: core/branches/flat/src/main/java/org/horizon/loader/bucket/BucketBasedCacheStoreConfig.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/bucket/BucketBasedCacheStoreConfig.java 2009-03-02 19:19:45 UTC (rev 7827)
+++ core/branches/flat/src/main/java/org/horizon/loader/bucket/BucketBasedCacheStoreConfig.java 2009-03-03 13:01:12 UTC (rev 7828)
@@ -1,42 +0,0 @@
-package org.horizon.loader.bucket;
-
-import org.horizon.loader.AbstractCacheLoaderConfig;
-
-/**
- * // TODO: Mircea: Document this!
- *
- * @author
- */
-public class BucketBasedCacheStoreConfig extends AbstractCacheLoaderConfig {
-
- private int lockConcurrencyLevel = 2048;
- private long lockAcquistionTimeout = 60000;
- private boolean purgeSynchronously = false;
-
- public int getLockConcurrencyLevel() {
- return lockConcurrencyLevel;
- }
-
- public void setLockConcurrencyLevel(int lockConcurrencyLevel) {
- testImmutability("lockConcurrencyLevel");
- this.lockConcurrencyLevel = lockConcurrencyLevel;
- }
-
- public long getLockAcquistionTimeout() {
- return lockAcquistionTimeout;
- }
-
- public void setLockAcquistionTimeout(long lockAcquistionTimeout) {
- testImmutability("lockAcquistionTimeout");
- this.lockAcquistionTimeout = lockAcquistionTimeout;
- }
-
- public boolean isPurgeSynchronously() {
- return purgeSynchronously;
- }
-
- public void setPurgeSynchronously(boolean purgeSynchronously) {
- testImmutability("purgeSynchronously");
- this.purgeSynchronously = purgeSynchronously;
- }
-}
Modified: core/branches/flat/src/main/java/org/horizon/loader/decorators/AbstractDelegatingStore.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/decorators/AbstractDelegatingStore.java 2009-03-02 19:19:45 UTC (rev 7827)
+++ core/branches/flat/src/main/java/org/horizon/loader/decorators/AbstractDelegatingStore.java 2009-03-03 13:01:12 UTC (rev 7828)
@@ -1,7 +1,6 @@
package org.horizon.loader.decorators;
import org.horizon.Cache;
-import org.horizon.loader.AbstractCacheStore;
import org.horizon.loader.CacheLoaderConfig;
import org.horizon.loader.CacheLoaderException;
import org.horizon.loader.CacheStore;
@@ -22,7 +21,7 @@
* @author Manik Surtani
* @since 1.0
*/
-public class AbstractDelegatingStore extends AbstractCacheStore {
+public class AbstractDelegatingStore implements CacheStore {
CacheStore delegate;
@@ -38,6 +37,10 @@
return delegate;
}
+ public void removeAll(Set<Object> keys) throws CacheLoaderException {
+ delegate.removeAll(keys);
+ }
+
public void store(StoredEntry ed) throws CacheLoaderException {
delegate.store(ed);
}
@@ -62,17 +65,14 @@
delegate.purgeExpired();
}
- @Override
public void commit(Transaction tx) throws CacheLoaderException {
delegate.commit(tx);
}
- @Override
public void rollback(Transaction tx) {
delegate.rollback(tx);
}
- @Override
public void prepare(List<? extends Modification> list, Transaction tx, boolean isOnePhase) throws CacheLoaderException {
delegate.prepare(list, tx, isOnePhase);
}
@@ -89,7 +89,6 @@
return delegate.loadAll();
}
- @Override
public boolean containsKey(Object key) throws CacheLoaderException {
return delegate.containsKey(key);
}
Modified: core/branches/flat/src/main/java/org/horizon/loader/file/FileCacheStore.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/file/FileCacheStore.java 2009-03-02 19:19:45 UTC (rev 7827)
+++ core/branches/flat/src/main/java/org/horizon/loader/file/FileCacheStore.java 2009-03-03 13:01:12 UTC (rev 7828)
@@ -16,77 +16,58 @@
import java.util.Set;
/**
- * A filesystem-based implementation of a {@link org.horizon.loader.CacheStore}. This file store stores stuff in the
+ * A filesystem-based implementation of a {@link org.horizon.loader.bucket.BucketBasedCacheStore}. This file store stores stuff in the
* following format: <tt>/{location}/cache name/bucket_number.bucket</tt>
- * <p/>
- * A hashing algorithm is used to map keys to buckets, and a bucket consists of a collection of key/value pairs.
- * <p/>
- * This approach, while adding an overhead of having to search buckets for keys, means that we can use any serializable
- * object we like as keys and not just Strings or objects that translate to something meaningful on a file system. Also,
- * the implementation uses up to {@link Integer#MAX_VALUE} bucket files, which makes it very unlikely that cached
- * entries would have to share buckets provided the {@link Object#hashCode()} implementations of keys used is well
- * spread.
- * <p/>
- * Locking is based on a {@link org.horizon.lock.StripedLock}, and granularity is per-bucket to prevent file system
- * corruption with concurrent writes. You can tune the concurrency level of the striped lock (see the Javadocs of
- * StripedLock for details on what this is) by using the {@link org.horizon.loader.file.FileCacheStoreConfig#setLockConcurrencyLevel(int)}
- * setter.
- * <p/>
*
* @author Manik Surtani
+ * @author Mircea.Markus(a)jboss.com
* @since 1.0
*/
public class FileCacheStore extends BucketBasedCacheStore {
+
private static final Log log = LogFactory.getLog(FileCacheStore.class);
private int streamBufferSize;
-
- FileCacheStoreConfig cfg;
+ FileCacheStoreConfig config;
Cache cache;
- Marshaller m;
+ Marshaller marshaller;
File root;
public void init(CacheLoaderConfig config, Cache cache, Marshaller m) {
super.init(config, cache, m);
- this.cfg = (FileCacheStoreConfig) config;
+ this.config = (FileCacheStoreConfig) config;
this.cache = cache;
- this.m = m;
+ this.marshaller = m;
}
- public Set<StoredEntry> loadAll() throws CacheLoaderException {
- log.trace("Loading all entries");
+ protected Set<StoredEntry> loadAllLockSafe() throws CacheLoaderException {
Set<StoredEntry> result = new HashSet<StoredEntry>();
for (File bucketFile : root.listFiles()) {
String bucketName = bucketFile.getName();
- try {
- lockForReading(bucketName);
- Bucket bucket = loadBucket(bucketFile);
- if (bucket != null) {
- if (bucket.removeExpiredEntries()) {
- saveBucket(bucket);
- }
- result.addAll(bucket.getStoredEntries());
+ Bucket bucket = loadBucket(bucketFile);
+ if (bucket != null) {
+ if (bucket.removeExpiredEntries()) {
+ saveBucket(bucket);
}
- } finally {
- unlock(bucketName);
+ result.addAll(bucket.getStoredEntries());
}
}
return result;
}
- protected void fromStreamInternal(ObjectInput ois) throws CacheLoaderException {
+ protected void fromStreamLockSafe(ObjectInput objectInput) throws CacheLoaderException {
try {
- int numFiles = ois.readInt();
+ int numFiles = objectInput.readInt();
byte[] buffer = new byte[streamBufferSize];
int bytesRead, totalBytesRead = 0;
for (int i = 0; i < numFiles; i++) {
- String fName = (String) ois.readObject();
- int numBytes = ois.readInt();
+ String fName = (String) objectInput.readObject();
+ int numBytes = objectInput.readInt();
FileOutputStream fos = new FileOutputStream(root.getAbsolutePath() + File.separator + fName);
BufferedOutputStream bos = new BufferedOutputStream(fos, streamBufferSize);
while (numBytes > totalBytesRead) {
- bytesRead = ois.read(buffer, 0, streamBufferSize);
+ bytesRead = objectInput.read(buffer, 0, streamBufferSize);
if (bytesRead == -1) break;
totalBytesRead += bytesRead;
bos.write(buffer, 0, bytesRead);
@@ -104,24 +85,24 @@
}
}
- protected void toStreamInternal(ObjectOutput oos) throws CacheLoaderException {
+ protected void toStreamLockSafe(ObjectOutput objectOutput) throws CacheLoaderException {
try {
File[] files = root.listFiles();
- oos.writeInt(files.length);
+ objectOutput.writeInt(files.length);
byte[] buffer = new byte[streamBufferSize];
for (File file : files) {
int bytesRead, totalBytesRead = 0;
FileInputStream fileInStream = new FileInputStream(file);
int sz = fileInStream.available();
BufferedInputStream bis = new BufferedInputStream(fileInStream);
- oos.writeObject(file.getName());
- oos.writeInt(sz);
+ objectOutput.writeObject(file.getName());
+ objectOutput.writeInt(sz);
while (sz > totalBytesRead) {
bytesRead = bis.read(buffer, 0, streamBufferSize);
if (bytesRead == -1) break;
totalBytesRead += bytesRead;
- oos.write(buffer, 0, bytesRead);
+ objectOutput.write(buffer, 0, bytesRead);
}
bis.close();
fileInStream.close();
@@ -131,7 +112,7 @@
}
}
- protected void clearInternal() throws CacheLoaderException {
+ protected void clearLockSafe() throws CacheLoaderException {
for (File f : root.listFiles()) {
if (!f.delete()) log.warn("Had problems removing file {0}", f);
}
@@ -148,6 +129,7 @@
protected Bucket loadBucket(File bucketFile) throws CacheLoaderException {
Bucket bucket = null;
if (bucketFile.exists()) {
+ if (log.isTraceEnabled()) log.trace("Found bucket file: '" + bucketFile + "'");
FileInputStream is = null;
ObjectInputStream ois = null;
try {
@@ -156,11 +138,11 @@
bucket = (Bucket) ois.readObject();
} catch (Exception e) {
String message = "Error while reading from file: " + bucketFile.getAbsoluteFile();
- log.error(message);
+ log.error(message, e);
throw new CacheLoaderException(message, e);
} finally {
- safeClose(is);
safeClose(ois);
+ safeClose(is);
}
}
if (bucket != null) {
@@ -177,6 +159,8 @@
File f = new File(root, b.getBucketName());
if (f.exists()) {
if (!f.delete()) log.warn("Had problems removing file {0}", f);
+ } else if (log.isTraceEnabled()) {
+ log.trace("Successfully deleted file: '" + f.getName() + "'");
}
if (!b.getEntries().isEmpty()) {
@@ -205,7 +189,7 @@
public void start() throws CacheLoaderException {
super.start();
- String location = cfg.getLocation();
+ String location = config.getLocation();
if (location == null || location.trim().length() == 0) location = "Horizon-FileCacheStore"; // use relative path!
location += File.separator + cache.getName();
root = new File(location);
@@ -213,7 +197,7 @@
if (!root.mkdirs())
throw new ConfigurationException("Directory " + root.getAbsolutePath() + " does not exist and cannot be created!");
}
- streamBufferSize = cfg.getStreamBufferSize();
+ streamBufferSize = config.getStreamBufferSize();
}
public Bucket loadBucketContainingKey(String key) throws CacheLoaderException {
Modified: core/branches/flat/src/main/java/org/horizon/loader/file/FileCacheStoreConfig.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/file/FileCacheStoreConfig.java 2009-03-02 19:19:45 UTC (rev 7827)
+++ core/branches/flat/src/main/java/org/horizon/loader/file/FileCacheStoreConfig.java 2009-03-03 13:01:12 UTC (rev 7828)
@@ -1,6 +1,6 @@
package org.horizon.loader.file;
-import org.horizon.loader.bucket.BucketBasedCacheStoreConfig;
+import org.horizon.loader.LockSupportCacheStoreConfig;
/**
* Configures {@link org.horizon.loader.file.FileCacheStore}. This allows you to tune a number of characteristics of
@@ -20,7 +20,7 @@
* @author Manik Surtani
* @since 1.0
*/
-public class FileCacheStoreConfig extends BucketBasedCacheStoreConfig {
+public class FileCacheStoreConfig extends LockSupportCacheStoreConfig {
String location = "Horizon-FileCacheStore";
private int streamBufferSize = 8192;
Deleted: core/branches/flat/src/main/java/org/horizon/loader/jdbc/ConnectionFactory.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/jdbc/ConnectionFactory.java 2009-03-02 19:19:45 UTC (rev 7827)
+++ core/branches/flat/src/main/java/org/horizon/loader/jdbc/ConnectionFactory.java 2009-03-03 13:01:12 UTC (rev 7828)
@@ -1,30 +0,0 @@
-package org.horizon.loader.jdbc;
-
-import org.horizon.loader.CacheLoaderException;
-import org.horizon.util.Util;
-
-import java.sql.Connection;
-
-/**
- * // TODO: Mircea: Document this!
- *
- * @author
- */
-public abstract class ConnectionFactory {
-
- public static ConnectionFactory getConnectionFactory(JdbcCacheStoreConfig config) throws CacheLoaderException {
- try {
- return (ConnectionFactory) Util.getInstance(config.getConnectionFactoryClass());
- } catch (Exception e) {
- throw new CacheLoaderException(e);
- }
- }
-
- public abstract void start(JdbcCacheStoreConfig config) throws CacheLoaderException;
-
- public abstract void stop();
-
- public abstract Connection getConnection() throws CacheLoaderException;
-
- public abstract void releaseConnection(Connection conn);
-}
Deleted: core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcCacheStore.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcCacheStore.java 2009-03-02 19:19:45 UTC (rev 7827)
+++ core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcCacheStore.java 2009-03-03 13:01:12 UTC (rev 7828)
@@ -1,445 +0,0 @@
-package org.horizon.loader.jdbc;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.horizon.Cache;
-import org.horizon.io.ByteBuffer;
-import org.horizon.loader.CacheLoaderConfig;
-import org.horizon.loader.CacheLoaderException;
-import org.horizon.loader.StoredEntry;
-import org.horizon.loader.bucket.Bucket;
-import org.horizon.loader.bucket.BucketBasedCacheStore;
-import org.horizon.marshall.Marshaller;
-
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.ObjectInput;
-import java.io.ObjectOutput;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.Set;
-
-/**
- * // TODO: Manik: Document this!
- *
- * @author Mircea.Markus(a)jboss.com
- */
-public class JdbcCacheStore extends BucketBasedCacheStore {
-
- private static final Log log = LogFactory.getLog(JdbcCacheStore.class);
- public final static String STREAM_DELIMITER = "__jdbcCacheLoader_done__";
-
- private JdbcCacheStoreConfig config;
- private ConnectionFactory connectionFactory;
- private Marshaller marshaller;
-
- public void init(CacheLoaderConfig config, Cache cache, Marshaller m) {
- if (log.isTraceEnabled())
- log.trace("Initializing JdbcCacheStore " + config);
- super.init(config, cache, m);
- this.config = (JdbcCacheStoreConfig) config;
- this.marshaller = m;
- }
-
- public void start() throws CacheLoaderException {
- super.start();
- this.connectionFactory = ConnectionFactory.getConnectionFactory(config);
- connectionFactory.start(config);
-
- //create table if needed
- if (config.isCreateTableOnStart()) {
- Connection conn = getConnection();
- try {
- TableManipulation tm = new TableManipulation(conn, config);
- if (!tm.tableExists()) {
- tm.createTable();
- }
- } finally {
- releaseConnection(conn);
- }
- }
- }
-
- public void stop() throws CacheLoaderException {
- if (config.isDropTableOnExit()) {
- Connection connection = getConnection();
- try {
- TableManipulation tm = new TableManipulation(connection, config);
- tm.dropTable();
- } finally {
- releaseConnection(connection);
- }
- }
- connectionFactory.stop();
- }
-
- protected void insertBucket(Bucket bucket) throws CacheLoaderException {
- Connection conn = null;
- PreparedStatement ps = null;
- try {
- String sql = config.getInsertBucketSql();
- if (log.isTraceEnabled()) {
- log.trace("Running insertBucket. Sql: '" + sql + "', on bucket: " + bucket);
- }
- conn = getConnection();
- ps = conn.prepareStatement(sql);
- ps.setString(1, bucket.getBucketName());
- ByteBuffer byteBuffer = marshall(bucket);
- ps.setBinaryStream(2, byteBuffer.getStream(), byteBuffer.getLength());
- ps.setLong(3, bucket.timestampOfFirstEntryToExpire());
- int insertedRows = ps.executeUpdate();
- if (insertedRows != 1) {
- throw new CacheLoaderException("Unexpected insert result: '" + insertedRows + "'. Expected values is 1");
- }
- } catch (SQLException ex) {
- logAndThrow(ex, "sql failure while inserting bucket: " + bucket);
- } finally {
- JdbcUtil.safeClose(ps);
- releaseConnection(conn);
- }
- }
-
- protected void saveBucket(Bucket bucket) throws CacheLoaderException {
- Connection conn = null;
- PreparedStatement ps = null;
- try {
- String sql = config.getSaveBucketSql();
- if (log.isTraceEnabled()) {
- log.trace("Running saveBucket. Sql: '" + sql + "', on bucket: " + bucket);
- }
- conn = getConnection();
- ps = conn.prepareStatement(sql);
- ByteBuffer buffer = marshall(bucket);
- ps.setBinaryStream(1, buffer.getStream(), buffer.getLength());
- ps.setLong(2, bucket.timestampOfFirstEntryToExpire());
- ps.setString(3, bucket.getBucketName());
- int updatedRows = ps.executeUpdate();
- if (updatedRows != 1) {
- throw new CacheLoaderException("Unexpected update result: '" + updatedRows + "'. Expected values is 1");
- }
- } catch (SQLException e) {
- logAndThrow(e, "sql failure while updating bucket: " + bucket);
- } finally {
- JdbcUtil.safeClose(ps);
- releaseConnection(conn);
- }
- }
-
- protected Bucket loadBucket(String keyHashCode) throws CacheLoaderException {
- Connection conn = null;
- PreparedStatement ps = null;
- ResultSet rs = null;
- try {
- String sql = config.getLoadBucketSql();
- if (log.isTraceEnabled()) {
- log.trace("Running loadBucket. Sql: '" + sql + "', on key: " + keyHashCode);
- }
- conn = getConnection();
- ps = conn.prepareStatement(sql);
- ps.setString(1, keyHashCode);
- rs = ps.executeQuery();
- if (!rs.next()) return null;
- String bucketName = rs.getString(1);
- InputStream inputStream = rs.getBinaryStream(2);
- Bucket bucket = unmarshall(inputStream);
- bucket.setBucketName(bucketName);//bucket name is volatile, so not persisted.
- return bucket;
- } catch (SQLException e) {
- String message = "sql failure while loading key: " + keyHashCode;
- log.error(message, e);
- throw new CacheLoaderException(message, e);
- } finally {
- JdbcUtil.safeClose(rs);
- JdbcUtil.safeClose(ps);
- releaseConnection(conn);
- }
- }
-
- public Set<StoredEntry> loadAll() throws CacheLoaderException {
- Connection conn = null;
- PreparedStatement ps = null;
- ResultSet rs = null;
- try {
- String sql = config.getLoadAllSql();
- if (log.isTraceEnabled()) {
- log.trace("Running loadAll. Sql: '" + sql + "'");
- }
- conn = getConnection();
- ps = conn.prepareStatement(sql);
- rs = ps.executeQuery();
- Set<StoredEntry> result = new HashSet<StoredEntry>();
- while (rs.next()) {
- InputStream binaryStream = rs.getBinaryStream(1);
- Bucket bucket = unmarshall(binaryStream);
- result.addAll(bucket.getStoredEntries());
- }
- return result;
- } catch (SQLException e) {
- String message = "sql failure while loading key: ";
- log.error(message, e);
- throw new CacheLoaderException(message, e);
- } finally {
- JdbcUtil.safeClose(rs);
- JdbcUtil.safeClose(ps);
- releaseConnection(conn);
- }
- }
-
- protected void fromStreamInternal(ObjectInput ois) throws CacheLoaderException {
- Connection conn = null;
- PreparedStatement ps = null;
- try {
- conn = getConnection();
- String sql = config.getInsertBucketSql();
- ps = conn.prepareStatement(sql);
-
- int readBuckets = 0;
- int batchSize = 100;
- String bucketName = (String) ois.readObject();
- while (!bucketName.equals(STREAM_DELIMITER)) {
- Bucket bucket = (Bucket) ois.readObject();
- readBuckets++;
- ps.setString(1, bucketName);
- ByteBuffer buffer = marshall(bucket);
- ps.setBinaryStream(2, buffer.getStream(), buffer.getLength());
- ps.setLong(3, bucket.timestampOfFirstEntryToExpire());
- if (readBuckets % batchSize == 0) {
- ps.executeBatch();
- if (log.isTraceEnabled())
- log.trace("Executing batch " + (readBuckets / batchSize) + ", batch size is " + batchSize);
- } else {
- ps.addBatch();
- }
- bucketName = (String) ois.readObject();
- }
- if (readBuckets % batchSize != 0)
- ps.executeBatch();//flush the batch
- if (log.isTraceEnabled())
- log.trace("Successfully inserted " + readBuckets + " buckets into the database, batch size is " + batchSize);
- } catch (IOException ex) {
- logAndThrow(ex, "I/O failure while integrating state into store");
- } catch (SQLException e) {
- logAndThrow(e, "SQL failure while integrating state into store");
- } catch (ClassNotFoundException e) {
- logAndThrow(e, "Unexpected failure while integrating state into store");
- } finally {
- JdbcUtil.safeClose(ps);
- releaseConnection(conn);
- }
- }
-
- protected void toStreamInternal(ObjectOutput oos) throws CacheLoaderException {
- Connection conn = null;
- PreparedStatement ps = null;
- ResultSet rs = null;
- try {
- conn = getConnection();
- String sql = config.getLoadAllSql();
- ps = conn.prepareStatement(sql);
- rs = ps.executeQuery();
- rs.setFetchSize(100);
- while (rs.next()) {
- InputStream inputStream = rs.getBinaryStream(1);
- Bucket bucket = unmarshall(inputStream);
- String bucketName = rs.getString(2);
- oos.writeObject(bucketName);
- oos.writeObject(bucket);
- }
- oos.writeObject(STREAM_DELIMITER);
- } catch (SQLException ex) {
- logAndThrow(ex, "SQL failure while writing store's content to stream");
- }
- catch (IOException e) {
- logAndThrow(e, "IO failure while writing store's content to stream");
- } finally {
- JdbcUtil.safeClose(rs);
- JdbcUtil.safeClose(ps);
- releaseConnection(conn);
- }
- }
-
- protected void clearInternal() throws CacheLoaderException {
- Connection conn = null;
- PreparedStatement ps = null;
- try {
- String sql = config.getClearSql();
- conn = getConnection();
- ps = conn.prepareStatement(sql);
- int result = ps.executeUpdate();
- if (log.isTraceEnabled())
- log.trace("Successfully removed " + result + " rows.");
- } catch (SQLException ex) {
- logAndThrow(ex, "Failed clearing JdbcCacheStore");
- } finally {
- JdbcUtil.safeClose(ps);
- releaseConnection(conn);
- }
- }
-
- protected void purgeInternal() throws CacheLoaderException {
- Connection conn = null;
- PreparedStatement ps = null;
- ResultSet rs = null;
- Set<Bucket> expiredBuckets = new HashSet<Bucket>();
- final int batchSize = 100;
- try {
- String sql = config.getSelectExpiredBucketsSql();
- conn = getConnection();
- ps = conn.prepareStatement(sql);
- ps.setLong(1, System.currentTimeMillis());
- rs = ps.executeQuery();
- while (rs.next()) {
- String key = rs.getString(2);
- if (immediateLockForWritting(key)) {
- if (log.isTraceEnabled()) log.trace("Adding bucket keyed " + key + " for purging.");
- InputStream binaryStream = rs.getBinaryStream(1);
- Bucket bucket = unmarshall(binaryStream);
- bucket.setBucketName(key);
- expiredBuckets.add(bucket);
- } else {
- if (log.isTraceEnabled())
- log.trace("Could not acquire write lock for " + key + ", this won't be purged even though it has expired elements");
- }
- }
- } catch (SQLException ex) {
- //if something happens make sure buckets locks are being release
- releaseLocks(expiredBuckets);
- releaseConnection(conn);
- logAndThrow(ex, "Failed clearing JdbcCacheStore");
- } finally {
- JdbcUtil.safeClose(ps);
- JdbcUtil.safeClose(rs);
- }
-
- if (log.isTraceEnabled())
- log.trace("Found following buckets: " + expiredBuckets + " which are about to be expired");
-
- if (expiredBuckets.isEmpty()) return;
- Set<Bucket> emptyBuckets = new HashSet<Bucket>();
- //now update all the buckets in batch
- try {
- String sql = config.getSaveBucketSql();
- ps = conn.prepareStatement(sql);
- int updateCount = 0;
- Iterator<Bucket> it = expiredBuckets.iterator();
- while (it.hasNext()) {
- Bucket bucket = it.next();
- bucket.removeExpiredEntries();
- if (!bucket.isEmpty()) {
- ByteBuffer byteBuffer = marshall(bucket);
- ps.setBinaryStream(1, byteBuffer.getStream(), byteBuffer.getLength());
- ps.setLong(2, bucket.timestampOfFirstEntryToExpire());
- ps.addBatch();
- updateCount++;
- if (updateCount % batchSize == 0) {
- ps.executeBatch();
- if (log.isTraceEnabled()) log.trace("Flushing batch, update count is: " + updateCount);
- }
- } else {
- it.remove();
- emptyBuckets.add(bucket);
- }
- }
- //flush the batch
- if (updateCount % batchSize != 0) {
- ps.executeBatch();
- }
- if (log.isTraceEnabled()) log.trace("Updated " + updateCount + " buckets.");
- } catch (SQLException ex) {
- //if something happens make sure buckets locks are being release
- releaseLocks(emptyBuckets);
- releaseConnection(conn);
- logAndThrow(ex, "Failed clearing JdbcCacheStore");
- } finally {
- //release locks for the updated buckets.This won't include empty buckets, as these were migrated to emptyBuckets
- releaseLocks(expiredBuckets);
- JdbcUtil.safeClose(ps);
- }
-
-
- if (log.isTraceEnabled()) log.trace("About to remove empty buckets " + emptyBuckets);
-
- if (emptyBuckets.isEmpty()) return;
- //then remove the empty buckets
- try {
- String sql = config.getDeleteBucketSql();
- ps = conn.prepareStatement(sql);
- int deletionCount = 0;
- for (Bucket bucket : emptyBuckets) {
- ps.setString(1, bucket.getBucketName());
- ps.addBatch();
- deletionCount++;
- if (deletionCount % batchSize == 0) {
- if (log.isTraceEnabled())
- log.trace("Flushing deletion batch, total deletion count so far is " + deletionCount);
- ps.executeBatch();
- }
- }
- if (deletionCount % batchSize != 0) {
- int[] batchResult = ps.executeBatch();
- if (log.isTraceEnabled())
- log.trace("Flushed the batch and received following results: " + Arrays.toString(batchResult));
- }
- } catch (SQLException ex) {
- //if something happens make sure buckets locks are being release
- logAndThrow(ex, "Failed clearing JdbcCacheStore");
- } finally {
- releaseLocks(emptyBuckets);
- JdbcUtil.safeClose(ps);
- releaseConnection(conn);
- }
- }
-
- private void releaseLocks(Set<Bucket> expiredBucketKeys) throws CacheLoaderException {
- for (Bucket bucket : expiredBucketKeys) {
- unlock(bucket.getBucketName());
- }
- }
-
- public Class<? extends CacheLoaderConfig> getConfigurationClass() {
- return JdbcCacheStoreConfig.class;
- }
-
- private Connection getConnection() throws CacheLoaderException {
- return connectionFactory.getConnection();
- }
-
- private void releaseConnection(Connection conn) {
- if (conn != null)//connection might be null as we only release it in finally blocks
- connectionFactory.releaseConnection(conn);
- }
-
- private ByteBuffer marshall(Bucket bucket) throws CacheLoaderException {
- try {
- return marshaller.objectToBuffer(bucket);
- } catch (IOException e) {
- String message = "I/O failure while marshalling " + bucket;
- log.error(message, e);
- throw new CacheLoaderException(message, e);
- }
- }
-
- private Bucket unmarshall(InputStream inputStream) throws CacheLoaderException {
- try {
- return (Bucket) marshaller.objectFromStream(inputStream);
- } catch (IOException e) {
- String message = "I/O error while unmarshalling from stram";
- log.error(message, e);
- throw new CacheLoaderException(message, e);
- } catch (ClassNotFoundException e) {
- String message = "*UNEXPECTED* ClassNotFoundException. This should not happen as Bucket class exists";
- log.error(message, e);
- throw new CacheLoaderException(message, e);
- }
- }
-
- private void logAndThrow(Exception e, String message) throws CacheLoaderException {
- log.error(message, e);
- throw new CacheLoaderException(message, e);
- }
-}
Deleted: core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcCacheStoreConfig.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcCacheStoreConfig.java 2009-03-02 19:19:45 UTC (rev 7827)
+++ core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcCacheStoreConfig.java 2009-03-03 13:01:12 UTC (rev 7828)
@@ -1,224 +0,0 @@
-package org.horizon.loader.jdbc;
-
-import org.horizon.loader.bucket.BucketBasedCacheStoreConfig;
-
-/**
- * // TODO: Manik: Document this!
- *
- * @author Manik Surtani
- */
-public class JdbcCacheStoreConfig extends BucketBasedCacheStoreConfig {
-
- /*
- * following two params manage creation and destruction during start up/shutdown.
- */
- boolean createTableOnStart = true;
- boolean dropTableOnExit = false;
-
- private String connectionFactoryClass;
-
- /* required by SimpleConnectionFactory */
- private String connectionUrl;
- private String userName;
- private String password;
- private String driverClass;
-
- /* attributes defining the table where data will be persisted */
- private String tableName;
- private String primaryKey;
- private String keyColumnName;
- private String keyColumnType;
- private String dataColumnName;
- private String dataColumnType;
- private String timestampColumnName;
- private String timestampColumnType;
-
- /* cache for sql commands */
- private String insertBucketSql;
- private String saveBucketSql;
- private String loadBucketSql;
- private String loadAllSql;
- private String clearSql;
- private String selectExpiredBucketsSql;
- private String deleteBucketSql;
-
- public JdbcCacheStoreConfig() {
- className = JdbcCacheStore.class.getName();
- }
-
- public void setCreateTableOnStart(boolean createTableOnStart) {
- this.createTableOnStart = createTableOnStart;
- }
-
- public boolean isCreateTableOnStart() {
- return createTableOnStart;
- }
-
- public boolean isDropTableOnExit() {
- return dropTableOnExit;
- }
-
- public void setDropTableOnExit(boolean dropTableOnExit) {
- this.dropTableOnExit = dropTableOnExit;
- }
-
-
- public String getConnectionUrl() {
- return connectionUrl;
- }
-
- public void setConnectionUrl(String connectionUrl) {
- this.connectionUrl = connectionUrl;
- }
-
- public String getUserName() {
- return userName;
- }
-
- public void setUserName(String userName) {
- this.userName = userName;
- }
-
- public String getPassword() {
- return password;
- }
-
- public void setPassword(String password) {
- this.password = password;
- }
-
- public String getDriverClass() {
- return driverClass;
- }
-
- public void setDriverClass(String driverClass) {
- this.driverClass = driverClass;
- }
-
- public String getConnectionFactoryClass() {
- return connectionFactoryClass;
- }
-
- public void setConnectionFactoryClass(String connectionFactoryClass) {
- this.connectionFactoryClass = connectionFactoryClass;
- }
-
- public void setTableName(String tableName) {
- this.tableName = tableName;
- }
-
- public String getTableName() {
- return tableName;
- }
-
- public String getPrimaryKey() {
- return primaryKey;
- }
-
- public void setPrimaryKey(String primaryKey) {
- this.primaryKey = primaryKey;
- }
-
- public String getKeyColumnName() {
- return keyColumnName;
- }
-
- public void setKeyColumnName(String keyColumnName) {
- this.keyColumnName = keyColumnName;
- }
-
- public String getKeyColumnType() {
- return keyColumnType;
- }
-
- public void setKeyColumnType(String keyColumnType) {
- this.keyColumnType = keyColumnType;
- }
-
- public String getDataColumnName() {
- return dataColumnName;
- }
-
- public void setDataColumnName(String dataColumnName) {
- this.dataColumnName = dataColumnName;
- }
-
- public String getDataColumnType() {
- return dataColumnType;
- }
-
- public void setDataColumnType(String dataColumnType) {
- this.dataColumnType = dataColumnType;
- }
-
- public String getTimestampColumnName() {
- return timestampColumnName;
- }
-
- public void setTimestampColumnName(String timestampColumnName) {
- this.timestampColumnName = timestampColumnName;
- }
-
- public String getTimestampColumnType() {
- return timestampColumnType;
- }
-
- public void setTimestampColumnType(String timestampColumnType) {
- this.timestampColumnType = timestampColumnType;
- }
-
- @Override
- public JdbcCacheStoreConfig clone() {
- //don't have to assign any variables as all are primitives, and cannot change
- return (JdbcCacheStoreConfig) super.clone();
- }
-
- public String getInsertBucketSql() {
- if (insertBucketSql == null) {
- insertBucketSql = "INSERT INTO " + tableName + " (" + keyColumnName + ", " + dataColumnName + ", " + timestampColumnName + ") VALUES(?,?,?)";
- }
- return insertBucketSql;
- }
-
- public String getSaveBucketSql() {
- if (saveBucketSql == null) {
- saveBucketSql = "UPDATE " + tableName + " SET " + dataColumnName + " = ? , " + timestampColumnName + "=? WHERE " + keyColumnName + " = ?";
- }
- return saveBucketSql;
- }
-
- public String getLoadBucketSql() {
- if (loadBucketSql == null) {
- loadBucketSql = "SELECT " + keyColumnName + ", " + dataColumnName + " FROM " + tableName + " WHERE " + keyColumnName + " = ?";
- }
- return loadBucketSql;
- }
-
- public String getDeleteBucketSql() {
- if (deleteBucketSql == null) {
- deleteBucketSql = "DELETE FROM " + tableName + " WHERE " + keyColumnName + " = ?";
- }
- return deleteBucketSql;
- }
-
- public String getLoadAllSql() {
- if (loadAllSql == null) {
- loadAllSql = "SELECT " + dataColumnName + "," + keyColumnName + " FROM " + tableName;
- }
- return loadAllSql;
- }
-
- public String getClearSql() {
- if (clearSql == null) {
- clearSql = "DELETE FROM " + tableName;
- }
- return clearSql;
- }
-
- public String getSelectExpiredBucketsSql() {
- if (selectExpiredBucketsSql == null) {
- selectExpiredBucketsSql = getLoadAllSql() + " WHERE " + timestampColumnName + "< ?";
- }
- return selectExpiredBucketsSql;
- }
-}
Modified: core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcUtil.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcUtil.java 2009-03-02 19:19:45 UTC (rev 7827)
+++ core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcUtil.java 2009-03-03 13:01:12 UTC (rev 7828)
@@ -1,16 +1,26 @@
package org.horizon.loader.jdbc;
+import org.horizon.io.ByteBuffer;
+import org.horizon.loader.CacheLoaderException;
+import org.horizon.logging.Log;
+import org.horizon.logging.LogFactory;
+import org.horizon.marshall.Marshaller;
+
+import java.io.IOException;
+import java.io.InputStream;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
/**
- * // TODO: Mircea: Document this!
+ * Contains common methods used by jdbc CacheStores.
*
- * @author
+ * @author Mircea.Markus(a)jboss.com
*/
public class JdbcUtil {
+
+ private static Log log = LogFactory.getLog(JdbcUtil.class);
public static void safeClose(Statement ps) {
if (ps != null) {
try {
@@ -40,4 +50,28 @@
}
}
}
+
+ public static ByteBuffer marshall(Marshaller marshaller, Object bucket) throws CacheLoaderException {
+ try {
+ return marshaller.objectToBuffer(bucket);
+ } catch (IOException e) {
+ String message = "I/O failure while marshalling " + bucket;
+ log.error(message, e);
+ throw new CacheLoaderException(message, e);
+ }
+ }
+
+ public static Object unmarshall(Marshaller marshaller, InputStream inputStream) throws CacheLoaderException {
+ try {
+ return marshaller.objectFromStream(inputStream);
+ } catch (IOException e) {
+ String message = "I/O error while unmarshalling from stram";
+ log.error(message, e);
+ throw new CacheLoaderException(message, e);
+ } catch (ClassNotFoundException e) {
+ String message = "*UNEXPECTED* ClassNotFoundException. This should not happen as Bucket class exists";
+ log.error(message, e);
+ throw new CacheLoaderException(message, e);
+ }
+ }
}
Deleted: core/branches/flat/src/main/java/org/horizon/loader/jdbc/NonManagedConnectionFactory.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/jdbc/NonManagedConnectionFactory.java 2009-03-02 19:19:45 UTC (rev 7827)
+++ core/branches/flat/src/main/java/org/horizon/loader/jdbc/NonManagedConnectionFactory.java 2009-03-03 13:01:12 UTC (rev 7828)
@@ -1,64 +0,0 @@
-package org.horizon.loader.jdbc;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.horizon.loader.CacheLoaderException;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-
-/**
- * // TODO: Mircea: Document this!
- *
- * @author
- */
-public class NonManagedConnectionFactory extends ConnectionFactory {
-
- private static Log log = LogFactory.getLog(NonManagedConnectionFactory.class);
-
- private String connectionUrl;
- private String userName;
- private String password;
-
- public void start(JdbcCacheStoreConfig config) throws CacheLoaderException {
- loadDriver(config.getDriverClass());
- this.connectionUrl = config.getConnectionUrl();
- this.userName = config.getUserName();
- this.password = config.getPassword();
- }
-
- public void stop() {
- //do nothing
- }
-
- public Connection getConnection() throws CacheLoaderException {
- try {
- return DriverManager.getConnection(connectionUrl, userName, password);
- } catch (SQLException e) {
- throw new CacheLoaderException("Could not obtain a new connection", e);
- }
- }
-
- public void releaseConnection(Connection conn) {
- try {
- conn.close();
- } catch (SQLException e) {
- log.warn("Failure while closing the connection to the database ", e);
- }
- }
-
- private void loadDriver(String driverClass) throws CacheLoaderException {
- try {
- if (log.isTraceEnabled()) {
- log.trace("Attempting to load driver " + driverClass);
- }
- Class.forName(driverClass).newInstance();
- }
- catch (Throwable th) {
- String message = "Failed loading driver with class: '" + driverClass + "'";
- log.error(message, th);
- throw new CacheLoaderException(message, th);
- }
- }
-}
Deleted: core/branches/flat/src/main/java/org/horizon/loader/jdbc/PooledConnectionFactory.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/jdbc/PooledConnectionFactory.java 2009-03-02 19:19:45 UTC (rev 7827)
+++ core/branches/flat/src/main/java/org/horizon/loader/jdbc/PooledConnectionFactory.java 2009-03-03 13:01:12 UTC (rev 7828)
@@ -1,101 +0,0 @@
-package org.horizon.loader.jdbc;
-
-import com.mchange.v2.c3p0.ComboPooledDataSource;
-import com.mchange.v2.c3p0.DataSources;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.horizon.loader.CacheLoaderException;
-
-import java.beans.PropertyVetoException;
-import java.net.URL;
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.util.Properties;
-
-/**
- * Pooled connection factory based on C3P0. For a complete configuration reference, look <a
- * href="http://www.mchange.com/projects/c3p0/index.html#configuration">here</a>. The connection pool can be configured
- * in various ways, as described <a href="http://www.mchange.com/projects/c3p0/index.html#configuration_files">here</a>.
- * The simplest way is by having an <tt>c3p0.properties</tt> file in the classpath. If no such file is found, default,
- * hardcoded valus will be used.
- *
- * @author Mircea.Markus(a)jboss.com
- */
-public class PooledConnectionFactory extends ConnectionFactory {
-
- private static Log log = LogFactory.getLog(PooledConnectionFactory.class);
- private ComboPooledDataSource pooledDataSource;
-
- @Override
- public void start(JdbcCacheStoreConfig config) throws CacheLoaderException {
- logFileOverride();
- pooledDataSource = new ComboPooledDataSource();
- pooledDataSource.setProperties(new Properties());
- try {
- pooledDataSource.setDriverClass(config.getDriverClass()); //loads the jdbc driver
- } catch (PropertyVetoException e) {
- String message = "Error while instatianting JDBC driver: '" + config.getDriverClass();
- log.error(message, e);
- throw new CacheLoaderException(message, e);
- }
- pooledDataSource.setJdbcUrl(config.getConnectionUrl());
- pooledDataSource.setUser(config.getUserName());
- pooledDataSource.setPassword(config.getPassword());
- }
-
- private void logFileOverride() {
- URL propsUrl = Thread.currentThread().getContextClassLoader().getResource("c3p0.properties");
- URL xmlUrl = Thread.currentThread().getContextClassLoader().getResource("c3p0-config.xml");
- if (log.isInfoEnabled() && propsUrl != null) {
- log.info("Found 'c3p0.properties' in classpath: " + propsUrl);
- }
- if (log.isInfoEnabled() && xmlUrl != null) {
- log.info("Found 'c3p0-config.xml' in classpath: " + xmlUrl);
- }
- }
-
- @Override
- public void stop() {
- try {
- DataSources.destroy(pooledDataSource);
- if (log.isTraceEnabled()) {
- log.debug("Sucessfully stopped PooledConnectionFactory.");
- }
- }
- catch (SQLException sqle) {
- log.warn("Could not destroy C3P0 connection pool: " + pooledDataSource, sqle);
- }
- }
-
- @Override
- public Connection getConnection() throws CacheLoaderException {
- try {
- if (log.isTraceEnabled()) {
- log.trace("DataSource before checkout (NumBusyConnectionsAllUsers) : " + pooledDataSource.getNumBusyConnectionsAllUsers());
- log.trace("DataSource before checkout (NumConnectionsAllUsers) : " + pooledDataSource.getNumConnectionsAllUsers());
- }
- Connection connection = pooledDataSource.getConnection();
- if (log.isTraceEnabled()) {
- log.trace("DataSource after checkout (NumBusyConnectionsAllUsers) : " + pooledDataSource.getNumBusyConnectionsAllUsers());
- log.trace("DataSource after checkout (NumConnectionsAllUsers) : " + pooledDataSource.getNumConnectionsAllUsers());
- log.trace("Connection checked out: " + connection);
- }
- return connection;
- } catch (SQLException e) {
- throw new CacheLoaderException("Failed obtaining connection from PooledDataSource", e);
- }
- }
-
- @Override
- public void releaseConnection(Connection conn) {
- try {
- conn.close();
- } catch (SQLException e) {
- log.warn("Issues closing connection", e);
- }
- }
-
- ComboPooledDataSource getPooledDataSource() {
- return pooledDataSource;
- }
-}
Deleted: core/branches/flat/src/main/java/org/horizon/loader/jdbc/SimpleConnectionFactory.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/jdbc/SimpleConnectionFactory.java 2009-03-02 19:19:45 UTC (rev 7827)
+++ core/branches/flat/src/main/java/org/horizon/loader/jdbc/SimpleConnectionFactory.java 2009-03-03 13:01:12 UTC (rev 7828)
@@ -1,79 +0,0 @@
-package org.horizon.loader.jdbc;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.horizon.loader.CacheLoaderException;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-
-/**
- * // TODO: Mircea: Document this!
- *
- * @author
- */
-public class SimpleConnectionFactory extends ConnectionFactory {
-
- private static Log log = LogFactory.getLog(SimpleConnectionFactory.class);
-
- private String connectionUrl;
- private String userName;
- private String password;
-
- public void start(JdbcCacheStoreConfig config) throws CacheLoaderException {
- loadDriver(config.getDriverClass());
- this.connectionUrl = config.getConnectionUrl();
- this.userName = config.getUserName();
- this.password = config.getPassword();
- }
-
- public void stop() {
- //do nothing
- }
-
- public Connection getConnection() throws CacheLoaderException {
- try {
- Connection connection = DriverManager.getConnection(connectionUrl, userName, password);
- if (connection == null)
- throw new CacheLoaderException("Received null connection from the DriverManager!");
- return connection;
- } catch (SQLException e) {
- throw new CacheLoaderException("Could not obtain a new connection", e);
- }
- }
-
- public void releaseConnection(Connection conn) {
- try {
- conn.close();
- } catch (SQLException e) {
- log.warn("Failure while closing the connection to the database ", e);
- }
- }
-
- private void loadDriver(String driverClass) throws CacheLoaderException {
- try {
- if (log.isTraceEnabled()) {
- log.trace("Attempting to load driver " + driverClass);
- }
- Class.forName(driverClass).newInstance();
- }
- catch (Throwable th) {
- String message = "Failed loading driver with class: '" + driverClass + "'";
- log.error(message, th);
- throw new CacheLoaderException(message, th);
- }
- }
-
- public String getConnectionUrl() {
- return connectionUrl;
- }
-
- public String getUserName() {
- return userName;
- }
-
- public String getPassword() {
- return password;
- }
-}
Modified: core/branches/flat/src/main/java/org/horizon/loader/jdbc/TableManipulation.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/jdbc/TableManipulation.java 2009-03-02 19:19:45 UTC (rev 7827)
+++ core/branches/flat/src/main/java/org/horizon/loader/jdbc/TableManipulation.java 2009-03-03 13:01:12 UTC (rev 7828)
@@ -3,6 +3,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.horizon.loader.CacheLoaderException;
+import org.horizon.loader.jdbc.connectionfactory.ConnectionFactory;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
@@ -12,27 +13,56 @@
import java.util.Locale;
/**
- * // TODO: Mircea: Document this!
+ * Contains all the logic of manipulating the table, including creating it if needed and access operations like
+ * inserting, selecting etc. Used by JDBC based cache loaders.
*
* @author Mircea.Markus(a)jboss.com
*/
-public class TableManipulation {
+public class TableManipulation implements Cloneable {
- private Connection connection;
- private JdbcCacheStoreConfig config;
private static Log log = LogFactory.getLog(TableManipulation.class);
- public TableManipulation(Connection connection, JdbcCacheStoreConfig config) {
- this.connection = connection;
- this.config = config;
+ private String idColumnName;
+ private String idColumnType;
+ private String tableName;
+ private String dataColumnName;
+ private String dataColumnType;
+ private String timestampColumnName;
+ private String timestampColumnType;
+
+ /*
+ * following two params manage creation and destruction during start up/shutdown.
+ */
+ boolean createTableOnStart = true;
+ boolean dropTableOnExit = false;
+ private ConnectionFactory connectionFactory;
+
+ /* Cache the sql for managing data */
+ private String insertRowSql;
+ private String updateRowSql;
+ private String selectRowSql;
+ private String deleteRowSql;
+ private String loadAllRowsSql;
+ private String deleteAllRows;
+ private String selectExpiredBucketsSql;
+ private String deleteExpiredBucketsSql;
+
+ public TableManipulation(String idColumnName, String idColumnType, String tableName, String dataColumnName,
+ String dataColumnType, String timestampColumnName, String timestampColumnType) {
+ this.idColumnName = idColumnName;
+ this.idColumnType = idColumnType;
+ this.tableName = tableName;
+ this.dataColumnName = dataColumnName;
+ this.dataColumnType = dataColumnType;
+ this.timestampColumnName = timestampColumnName;
+ this.timestampColumnType = timestampColumnType;
}
- public boolean tableExists() throws CacheLoaderException {
- return tableExists(config.getTableName());
+ public TableManipulation() {
}
- public boolean tableExists(String tableName) throws CacheLoaderException {
- assrtNotNull(config.getTableName(), "table name is mandatory");
+ public boolean tableExists(Connection connection, String tableName) throws CacheLoaderException {
+ assrtNotNull(tableName, "table name is mandatory");
ResultSet rs = null;
try {
// (a j2ee spec compatible jdbc driver has to fully
@@ -80,53 +110,52 @@
}
}
- public void createTable() throws CacheLoaderException {
+ public void createTable(Connection conn) throws CacheLoaderException {
// removed CONSTRAINT clause as this causes problems with some databases, like Informix.
assertMandatoryElemenetsPresent();
- String creatTableDdl = "CREATE TABLE " + config.getTableName() + "(" + config.getKeyColumnName() + " " + config.getKeyColumnType()
- + " NOT NULL, " + config.getDataColumnName() + " " + config.getDataColumnType() + ", "
- + config.getTimestampColumnName() + " " + config.getTimestampColumnType() +
- ", PRIMARY KEY (" + config.getKeyColumnName() + "))";
+ String creatTableDdl = "CREATE TABLE " + tableName + "(" + idColumnName + " " + idColumnType
+ + " NOT NULL, " + dataColumnName + " " + dataColumnType + ", "
+ + timestampColumnName + " " + timestampColumnType +
+ ", PRIMARY KEY (" + idColumnName + "))";
if (log.isTraceEnabled())
log.trace("Creating table with following DDL: '" + creatTableDdl + "'.");
- executeUpdateSql(creatTableDdl);
+ executeUpdateSql(conn, creatTableDdl);
}
private void assertMandatoryElemenetsPresent() throws CacheLoaderException {
- assrtNotNull(config.getKeyColumnType(), "keyColumnType needed in order to create table");
- assrtNotNull(config.getKeyColumnName(), "keyColumnName needed in order to create table");
- assrtNotNull(config.getTableName(), "tableName needed in order to create table");
- assrtNotNull(config.getDataColumnName(), "dataColumnName needed in order to create table");
- assrtNotNull(config.getDataColumnType(), "dataColumnType needed in order to create table");
- assrtNotNull(config.getDataColumnType(), "dataColumnType needed in order to create table");
- assrtNotNull(config.getTimestampColumnName(), "timestampColumnName needed in order to create table");
- assrtNotNull(config.getTimestampColumnType(), "timestampColumnType needed in order to create table");
+ assrtNotNull(idColumnType, "idColumnType needed in order to create table");
+ assrtNotNull(idColumnName, "idColumnName needed in order to create table");
+ assrtNotNull(tableName, "tableName needed in order to create table");
+ assrtNotNull(dataColumnName, "dataColumnName needed in order to create table");
+ assrtNotNull(dataColumnType, "dataColumnType needed in order to create table");
+ assrtNotNull(timestampColumnName, "timestampColumnName needed in order to create table");
+ assrtNotNull(timestampColumnType, "timestampColumnType needed in order to create table");
}
private void assrtNotNull(String keyColumnType, String message) throws CacheLoaderException {
if (keyColumnType == null || keyColumnType.trim().length() == 0) throw new CacheLoaderException(message);
}
- private void executeUpdateSql(String sql) throws CacheLoaderException {
+ private void executeUpdateSql(Connection conn, String sql) throws CacheLoaderException {
Statement statement = null;
try {
- statement = connection.createStatement();
+ statement = conn.createStatement();
statement.executeUpdate(sql);
} catch (SQLException e) {
- log.error("Error while creating table",e);
+ log.error("Error while creating table", e);
throw new CacheLoaderException(e);
} finally {
JdbcUtil.safeClose(statement);
}
}
- public void dropTable() throws CacheLoaderException {
- String dropTableDdl = "DROP TABLE " + config.getTableName();
- String clearTable = "DELETE FROM " + config.getTableName();
- executeUpdateSql(clearTable);
+ public void dropTable(Connection conn) throws CacheLoaderException {
+ String dropTableDdl = "DROP TABLE " + tableName;
+ String clearTable = "DELETE FROM " + tableName;
+ executeUpdateSql(conn, clearTable);
if (log.isTraceEnabled())
log.trace("Dropping table with following DDL '" + dropTableDdl + "\'");
- executeUpdateSql(dropTableDdl);
+ executeUpdateSql(conn, dropTableDdl);
}
private static String toLowerCase(String s) {
@@ -136,4 +165,146 @@
private static String toUpperCase(String s) {
return s.toUpperCase(Locale.ENGLISH);
}
+
+ public void setIdColumnName(String idColumnName) {
+ this.idColumnName = idColumnName;
+ }
+
+ public void setIdColumnType(String idColumnType) {
+ this.idColumnType = idColumnType;
+ }
+
+ public void setTableName(String tableName) {
+ this.tableName = tableName;
+ }
+
+ public void setDataColumnName(String dataColumnName) {
+ this.dataColumnName = dataColumnName;
+ }
+
+ public void setDataColumnType(String dataColumnType) {
+ this.dataColumnType = dataColumnType;
+ }
+
+ public void setTimestampColumnName(String timestampColumnName) {
+ this.timestampColumnName = timestampColumnName;
+ }
+
+ public void setTimestampColumnType(String timestampColumnType) {
+ this.timestampColumnType = timestampColumnType;
+ }
+
+ public boolean isCreateTableOnStart() {
+ return createTableOnStart;
+ }
+
+ public void setCreateTableOnStart(boolean createTableOnStart) {
+ this.createTableOnStart = createTableOnStart;
+ }
+
+ public boolean isDropTableOnExit() {
+ return dropTableOnExit;
+ }
+
+ public void setDropTableOnExit(boolean dropTableOnExit) {
+ this.dropTableOnExit = dropTableOnExit;
+ }
+
+ public void start(ConnectionFactory connectionFactory) throws CacheLoaderException {
+ this.connectionFactory = connectionFactory;
+ if (isCreateTableOnStart()) {
+ Connection conn = this.connectionFactory.getConnection();
+ try {
+ if (!tableExists(conn, tableName)) {
+ createTable(conn);
+ }
+ } finally {
+ this.connectionFactory.releaseConnection(conn);
+ }
+ }
+ }
+
+ public void stop() throws CacheLoaderException {
+ if (isDropTableOnExit()) {
+ Connection conn = connectionFactory.getConnection();
+ try {
+ dropTable(conn);
+ } finally {
+ connectionFactory.releaseConnection(conn);
+ }
+ }
+ }
+
+ public String getInsertRowSql() {
+ if (insertRowSql == null) {
+ insertRowSql = "INSERT INTO " + tableName + " (" + dataColumnName + ", " + timestampColumnName + ", " + idColumnName + ") VALUES(?,?,?)";
+ }
+ return insertRowSql;
+ }
+
+ public String getUpdateRowSql() {
+ if (updateRowSql == null) {
+ updateRowSql = "UPDATE " + tableName + " SET " + dataColumnName + " = ? , " + timestampColumnName + "=? WHERE " + idColumnName + " = ?";
+ }
+ return updateRowSql;
+ }
+
+ public String getSelectRowSql() {
+ if (selectRowSql == null) {
+ selectRowSql = "SELECT " + idColumnName + ", " + dataColumnName + " FROM " + tableName + " WHERE " + idColumnName + " = ?";
+ }
+ return selectRowSql;
+ }
+
+ public String getDeleteRowSql() {
+ if (deleteRowSql == null) {
+ deleteRowSql = "DELETE FROM " + tableName + " WHERE " + idColumnName + " = ?";
+ }
+ return deleteRowSql;
+ }
+
+ public String getLoadAllRowsSql() {
+ if (loadAllRowsSql == null) {
+ loadAllRowsSql = "SELECT " + dataColumnName + "," + idColumnName + " FROM " + tableName;
+ }
+ return loadAllRowsSql;
+ }
+
+ public String getDeleteAllRowsSql() {
+ if (deleteAllRows == null) {
+ deleteAllRows = "DELETE FROM " + tableName;
+ }
+ return deleteAllRows;
+ }
+
+ public String getSelectExpiredRowsSql() {
+ if (selectExpiredBucketsSql == null) {
+ selectExpiredBucketsSql = getLoadAllRowsSql() + " WHERE " + timestampColumnName + "< ?";
+ }
+ return selectExpiredBucketsSql;
+ }
+
+ public String getDeleteExpiredRowsSql() {
+ if (deleteExpiredBucketsSql == null) {
+ deleteExpiredBucketsSql = "DELETE FROM" + tableName + " WHERE " + timestampColumnName + "< ?";
+ }
+ return deleteExpiredBucketsSql;
+ }
+
+ @Override
+ public TableManipulation clone() {
+ try {
+ return (TableManipulation) super.clone();
+ } catch (CloneNotSupportedException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+
+ public String getTableName() {
+ return tableName;
+ }
+
+ public boolean tableExists(Connection connection) throws CacheLoaderException {
+ return tableExists(connection, tableName);
+ }
}
Copied: core/branches/flat/src/main/java/org/horizon/loader/jdbc/binary/JdbcBinaryCacheStore.java (from rev 7791, core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcCacheStore.java)
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/jdbc/binary/JdbcBinaryCacheStore.java (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/loader/jdbc/binary/JdbcBinaryCacheStore.java 2009-03-03 13:01:12 UTC (rev 7828)
@@ -0,0 +1,407 @@
+package org.horizon.loader.jdbc.binary;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.horizon.Cache;
+import org.horizon.io.ByteBuffer;
+import org.horizon.loader.CacheLoaderConfig;
+import org.horizon.loader.CacheLoaderException;
+import org.horizon.loader.StoredEntry;
+import org.horizon.loader.bucket.Bucket;
+import org.horizon.loader.bucket.BucketBasedCacheStore;
+import org.horizon.loader.jdbc.JdbcUtil;
+import org.horizon.loader.jdbc.TableManipulation;
+import org.horizon.loader.jdbc.connectionfactory.ConnectionFactory;
+import org.horizon.marshall.Marshaller;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+/**
+ * {@link BucketBasedCacheStore} implementation that will store all the buckets as rows in database, each row
+ * coresponding to a bucket. This is in contrast to {@link org.horizon.loader.jdbc.stringbased.JdbcStringBasedCacheStore}
+ * which stores each StoredEntry as an row in the database.
+ * <p/>
+ * This class has the benefit of being able to store StoredEntries that do not have String keys, at the cost of coarser
+ * grained access granularity, and inherently performance.
+ * <p/>
+ * All the DB releated configurations are described in {@link org.horizon.loader.jdbc.binary.JdbcBinaryCacheStoreConfig}.
+ *
+ * @author Mircea.Markus(a)jboss.com
+ * @see JdbcBinaryCacheStoreConfig
+ * @see org.horizon.loader.jdbc.stringbased.JdbcStringBasedCacheStore
+ */
+public class JdbcBinaryCacheStore extends BucketBasedCacheStore {
+
+ private static final Log log = LogFactory.getLog(JdbcBinaryCacheStore.class);
+ private final static String BINARY_STREAM_DELIMITER = "__JdbcBinaryCacheStore_done__";
+
+ private JdbcBinaryCacheStoreConfig config;
+ private ConnectionFactory connectionFactory;
+ private Marshaller marshaller;
+ private TableManipulation tableManipulation;
+
+ public void init(CacheLoaderConfig config, Cache cache, Marshaller m) {
+ if (log.isTraceEnabled())
+ log.trace("Initializing JdbcBinaryCacheStore " + config);
+ super.init(config, cache, m);
+ this.config = (JdbcBinaryCacheStoreConfig) config;
+ this.marshaller = m;
+ }
+
+ public void start() throws CacheLoaderException {
+ super.start();
+ String connectionFactoryClass = config.getConnectionFactoryConfig().getConnectionFactoryClass();
+ this.connectionFactory = ConnectionFactory.getConnectionFactory(connectionFactoryClass);
+ connectionFactory.start(config.getConnectionFactoryConfig());
+ tableManipulation = config.getTableManipulation();
+ tableManipulation.start(connectionFactory);
+ }
+
+ public void stop() throws CacheLoaderException {
+ tableManipulation.stop();
+ connectionFactory.stop();
+ }
+
+ protected void insertBucket(Bucket bucket) throws CacheLoaderException {
+ Connection conn = null;
+ PreparedStatement ps = null;
+ try {
+ String sql = tableManipulation.getInsertRowSql();
+ if (log.isTraceEnabled()) {
+ log.trace("Running insertBucket. Sql: '" + sql + "', on bucket: " + bucket);
+ }
+ conn = connectionFactory.getConnection();
+ ps = conn.prepareStatement(sql);
+ ByteBuffer byteBuffer = JdbcUtil.marshall(marshaller, bucket);
+ ps.setBinaryStream(1, byteBuffer.getStream(), byteBuffer.getLength());
+ ps.setLong(2, bucket.timestampOfFirstEntryToExpire());
+ ps.setString(3, bucket.getBucketName());
+ int insertedRows = ps.executeUpdate();
+ if (insertedRows != 1) {
+ throw new CacheLoaderException("Unexpected insert result: '" + insertedRows + "'. Expected values is 1");
+ }
+ } catch (SQLException ex) {
+ logAndThrow(ex, "sql failure while inserting bucket: " + bucket);
+ } finally {
+ JdbcUtil.safeClose(ps);
+ connectionFactory.releaseConnection(conn);
+ }
+ }
+
+ protected void saveBucket(Bucket bucket) throws CacheLoaderException {
+ Connection conn = null;
+ PreparedStatement ps = null;
+ try {
+ String sql = tableManipulation.getUpdateRowSql();
+ if (log.isTraceEnabled()) {
+ log.trace("Running saveBucket. Sql: '" + sql + "', on bucket: " + bucket);
+ }
+ conn = connectionFactory.getConnection();
+ ps = conn.prepareStatement(sql);
+ ByteBuffer buffer = JdbcUtil.marshall(marshaller, bucket);
+ ps.setBinaryStream(1, buffer.getStream(), buffer.getLength());
+ ps.setLong(2, bucket.timestampOfFirstEntryToExpire());
+ ps.setString(3, bucket.getBucketName());
+ int updatedRows = ps.executeUpdate();
+ if (updatedRows != 1) {
+ throw new CacheLoaderException("Unexpected update result: '" + updatedRows + "'. Expected values is 1");
+ }
+ } catch (SQLException e) {
+ logAndThrow(e, "sql failure while updating bucket: " + bucket);
+ } finally {
+ JdbcUtil.safeClose(ps);
+ connectionFactory.releaseConnection(conn);
+ }
+ }
+
+ protected Bucket loadBucket(String keyHashCode) throws CacheLoaderException {
+ Connection conn = null;
+ PreparedStatement ps = null;
+ ResultSet rs = null;
+ try {
+ String sql = tableManipulation.getSelectRowSql();
+ if (log.isTraceEnabled()) {
+ log.trace("Running loadBucket. Sql: '" + sql + "', on key: " + keyHashCode);
+ }
+ conn = connectionFactory.getConnection();
+ ps = conn.prepareStatement(sql);
+ ps.setString(1, keyHashCode);
+ rs = ps.executeQuery();
+ if (!rs.next()) return null;
+ String bucketName = rs.getString(1);
+ InputStream inputStream = rs.getBinaryStream(2);
+ Bucket bucket = (Bucket) JdbcUtil.unmarshall(marshaller, inputStream);
+ bucket.setBucketName(bucketName);//bucket name is volatile, so not persisted.
+ return bucket;
+ } catch (SQLException e) {
+ String message = "sql failure while loading key: " + keyHashCode;
+ log.error(message, e);
+ throw new CacheLoaderException(message, e);
+ } finally {
+ JdbcUtil.safeClose(rs);
+ JdbcUtil.safeClose(ps);
+ connectionFactory.releaseConnection(conn);
+ }
+ }
+
+ public Set<StoredEntry> loadAllLockSafe() throws CacheLoaderException {
+ Connection conn = null;
+ PreparedStatement ps = null;
+ ResultSet rs = null;
+ try {
+ String sql = tableManipulation.getLoadAllRowsSql();
+ if (log.isTraceEnabled()) {
+ log.trace("Running loadAll. Sql: '" + sql + "'");
+ }
+ conn = connectionFactory.getConnection();
+ ps = conn.prepareStatement(sql);
+ rs = ps.executeQuery();
+ Set<StoredEntry> result = new HashSet<StoredEntry>();
+ while (rs.next()) {
+ InputStream binaryStream = rs.getBinaryStream(1);
+ Bucket bucket = (Bucket) JdbcUtil.unmarshall(marshaller, binaryStream);
+ result.addAll(bucket.getStoredEntries());
+ }
+ return result;
+ } catch (SQLException e) {
+ String message = "sql failure while loading key: ";
+ log.error(message, e);
+ throw new CacheLoaderException(message, e);
+ } finally {
+ JdbcUtil.safeClose(rs);
+ JdbcUtil.safeClose(ps);
+ connectionFactory.releaseConnection(conn);
+ }
+ }
+
+ protected void fromStreamLockSafe(ObjectInput objectInput) throws CacheLoaderException {
+ Connection conn = null;
+ PreparedStatement ps = null;
+ try {
+ conn = connectionFactory.getConnection();
+ String sql = tableManipulation.getInsertRowSql();
+ ps = conn.prepareStatement(sql);
+
+ int readBuckets = 0;
+ int batchSize = 100;
+ String bucketName = (String) objectInput.readObject();
+ while (!bucketName.equals(BINARY_STREAM_DELIMITER)) {
+ Bucket bucket = (Bucket) objectInput.readObject();
+ readBuckets++;
+ ByteBuffer buffer = JdbcUtil.marshall(marshaller, bucket);
+ ps.setBinaryStream(1, buffer.getStream(), buffer.getLength());
+ ps.setLong(2, bucket.timestampOfFirstEntryToExpire());
+ ps.setString(3, bucketName);
+ if (readBuckets % batchSize == 0) {
+ ps.executeBatch();
+ if (log.isTraceEnabled())
+ log.trace("Executing batch " + (readBuckets / batchSize) + ", batch size is " + batchSize);
+ } else {
+ ps.addBatch();
+ }
+ bucketName = (String) objectInput.readObject();
+ }
+ if (readBuckets % batchSize != 0)
+ ps.executeBatch();//flush the batch
+ if (log.isTraceEnabled())
+ log.trace("Successfully inserted " + readBuckets + " buckets into the database, batch size is " + batchSize);
+ } catch (IOException ex) {
+ logAndThrow(ex, "I/O failure while integrating state into store");
+ } catch (SQLException e) {
+ logAndThrow(e, "SQL failure while integrating state into store");
+ } catch (ClassNotFoundException e) {
+ logAndThrow(e, "Unexpected failure while integrating state into store");
+ } finally {
+ JdbcUtil.safeClose(ps);
+ connectionFactory.releaseConnection(conn);
+ }
+ }
+
+ protected void toStreamLockSafe(ObjectOutput objectOutput) throws CacheLoaderException {
+ Connection conn = null;
+ PreparedStatement ps = null;
+ ResultSet rs = null;
+ try {
+ conn = connectionFactory.getConnection();
+ String sql = tableManipulation.getLoadAllRowsSql();
+ ps = conn.prepareStatement(sql);
+ rs = ps.executeQuery();
+ rs.setFetchSize(100);
+ while (rs.next()) {
+ InputStream inputStream = rs.getBinaryStream(1);
+ Bucket bucket = (Bucket) JdbcUtil.unmarshall(marshaller, inputStream);
+ String bucketName = rs.getString(2);
+ objectOutput.writeObject(bucketName);
+ objectOutput.writeObject(bucket);
+ }
+ objectOutput.writeObject(BINARY_STREAM_DELIMITER);
+ } catch (SQLException ex) {
+ logAndThrow(ex, "SQL failure while writing store's content to stream");
+ }
+ catch (IOException e) {
+ logAndThrow(e, "IO failure while writing store's content to stream");
+ } finally {
+ JdbcUtil.safeClose(rs);
+ JdbcUtil.safeClose(ps);
+ connectionFactory.releaseConnection(conn);
+ }
+ }
+
+ protected void clearLockSafe() throws CacheLoaderException {
+ Connection conn = null;
+ PreparedStatement ps = null;
+ try {
+ String sql = tableManipulation.getDeleteAllRowsSql();
+ conn = connectionFactory.getConnection();
+ ps = conn.prepareStatement(sql);
+ int result = ps.executeUpdate();
+ if (log.isTraceEnabled())
+ log.trace("Successfully removed " + result + " rows.");
+ } catch (SQLException ex) {
+ logAndThrow(ex, "Failed clearing JdbcBinaryCacheStore");
+ } finally {
+ JdbcUtil.safeClose(ps);
+ connectionFactory.releaseConnection(conn);
+ }
+ }
+
+ protected void purgeInternal() throws CacheLoaderException {
+ Connection conn = null;
+ PreparedStatement ps = null;
+ ResultSet rs = null;
+ Set<Bucket> expiredBuckets = new HashSet<Bucket>();
+ final int batchSize = 100;
+ try {
+ String sql = tableManipulation.getSelectExpiredRowsSql();
+ conn = connectionFactory.getConnection();
+ ps = conn.prepareStatement(sql);
+ ps.setLong(1, System.currentTimeMillis());
+ rs = ps.executeQuery();
+ while (rs.next()) {
+ String key = rs.getString(2);
+ if (immediateLockForWritting(key)) {
+ if (log.isTraceEnabled()) log.trace("Adding bucket keyed " + key + " for purging.");
+ InputStream binaryStream = rs.getBinaryStream(1);
+ Bucket bucket = (Bucket) JdbcUtil.unmarshall(marshaller, binaryStream);
+ bucket.setBucketName(key);
+ expiredBuckets.add(bucket);
+ } else {
+ if (log.isTraceEnabled())
+ log.trace("Could not acquire write lock for " + key + ", this won't be purged even though it has expired elements");
+ }
+ }
+ } catch (SQLException ex) {
+ //if something happens make sure buckets locks are being release
+ releaseLocks(expiredBuckets);
+ connectionFactory.releaseConnection(conn);
+ logAndThrow(ex, "Failed clearing JdbcBinaryCacheStore");
+ } finally {
+ JdbcUtil.safeClose(ps);
+ JdbcUtil.safeClose(rs);
+ }
+
+ if (log.isTraceEnabled())
+ log.trace("Found following buckets: " + expiredBuckets + " which are about to be expired");
+
+ if (expiredBuckets.isEmpty()) return;
+ Set<Bucket> emptyBuckets = new HashSet<Bucket>();
+ //now update all the buckets in batch
+ try {
+ String sql = tableManipulation.getUpdateRowSql();
+ ps = conn.prepareStatement(sql);
+ int updateCount = 0;
+ Iterator<Bucket> it = expiredBuckets.iterator();
+ while (it.hasNext()) {
+ Bucket bucket = it.next();
+ bucket.removeExpiredEntries();
+ if (!bucket.isEmpty()) {
+ ByteBuffer byteBuffer = JdbcUtil.marshall(marshaller, bucket);
+ ps.setBinaryStream(1, byteBuffer.getStream(), byteBuffer.getLength());
+ ps.setLong(2, bucket.timestampOfFirstEntryToExpire());
+ ps.addBatch();
+ updateCount++;
+ if (updateCount % batchSize == 0) {
+ ps.executeBatch();
+ if (log.isTraceEnabled()) log.trace("Flushing batch, update count is: " + updateCount);
+ }
+ } else {
+ it.remove();
+ emptyBuckets.add(bucket);
+ }
+ }
+ //flush the batch
+ if (updateCount % batchSize != 0) {
+ ps.executeBatch();
+ }
+ if (log.isTraceEnabled()) log.trace("Updated " + updateCount + " buckets.");
+ } catch (SQLException ex) {
+ //if something happens make sure buckets locks are being release
+ releaseLocks(emptyBuckets);
+ connectionFactory.releaseConnection(conn);
+ logAndThrow(ex, "Failed clearing JdbcBinaryCacheStore");
+ } finally {
+ //release locks for the updated buckets.This won't include empty buckets, as these were migrated to emptyBuckets
+ releaseLocks(expiredBuckets);
+ JdbcUtil.safeClose(ps);
+ }
+
+
+ if (log.isTraceEnabled()) log.trace("About to remove empty buckets " + emptyBuckets);
+
+ if (emptyBuckets.isEmpty()) return;
+ //then remove the empty buckets
+ try {
+ String sql = tableManipulation.getDeleteRowSql();
+ ps = conn.prepareStatement(sql);
+ int deletionCount = 0;
+ for (Bucket bucket : emptyBuckets) {
+ ps.setString(1, bucket.getBucketName());
+ ps.addBatch();
+ deletionCount++;
+ if (deletionCount % batchSize == 0) {
+ if (log.isTraceEnabled())
+ log.trace("Flushing deletion batch, total deletion count so far is " + deletionCount);
+ ps.executeBatch();
+ }
+ }
+ if (deletionCount % batchSize != 0) {
+ int[] batchResult = ps.executeBatch();
+ if (log.isTraceEnabled())
+ log.trace("Flushed the batch and received following results: " + Arrays.toString(batchResult));
+ }
+ } catch (SQLException ex) {
+ //if something happens make sure buckets locks are being release
+ logAndThrow(ex, "Failed clearing JdbcBinaryCacheStore");
+ } finally {
+ releaseLocks(emptyBuckets);
+ JdbcUtil.safeClose(ps);
+ connectionFactory.releaseConnection(conn);
+ }
+ }
+
+ private void releaseLocks(Set<Bucket> expiredBucketKeys) throws CacheLoaderException {
+ for (Bucket bucket : expiredBucketKeys) {
+ this.unlock(bucket.getBucketName());
+ }
+ }
+
+ public Class<? extends CacheLoaderConfig> getConfigurationClass() {
+ return JdbcBinaryCacheStoreConfig.class;
+ }
+
+ protected void logAndThrow(Exception e, String message) throws CacheLoaderException {
+ log.error(message, e);
+ throw new CacheLoaderException(message, e);
+ }
+}
Property changes on: core/branches/flat/src/main/java/org/horizon/loader/jdbc/binary/JdbcBinaryCacheStore.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Copied: core/branches/flat/src/main/java/org/horizon/loader/jdbc/binary/JdbcBinaryCacheStoreConfig.java (from rev 7791, core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcCacheStoreConfig.java)
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/jdbc/binary/JdbcBinaryCacheStoreConfig.java (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/loader/jdbc/binary/JdbcBinaryCacheStoreConfig.java 2009-03-03 13:01:12 UTC (rev 7828)
@@ -0,0 +1,134 @@
+package org.horizon.loader.jdbc.binary;
+
+import org.horizon.loader.LockSupportCacheStoreConfig;
+import org.horizon.loader.jdbc.TableManipulation;
+import org.horizon.loader.jdbc.connectionfactory.ConnectionFactoryConfig;
+
+/**
+ * Defines available configuration elements for {@link org.horizon.loader.jdbc.binary.JdbcBinaryCacheStore}.
+ *
+ * @author Mircea.Markus(a)jboss.com
+ */
+public class JdbcBinaryCacheStoreConfig extends LockSupportCacheStoreConfig {
+
+ private ConnectionFactoryConfig connectionFactoryConfig = new ConnectionFactoryConfig();
+ private TableManipulation tableManipulation = new TableManipulation();
+
+ public JdbcBinaryCacheStoreConfig(ConnectionFactoryConfig connectionFactoryConfig, TableManipulation tm) {
+ this.connectionFactoryConfig = connectionFactoryConfig;
+ this.tableManipulation = tm;
+ }
+
+ public JdbcBinaryCacheStoreConfig() {
+ className = JdbcBinaryCacheStore.class.getName();
+ }
+
+ /**
+ * If true, and the table is missing it will be created when starting the cache store. Default to <tt>true</tt>.
+ */
+ public void setCreateTableOnStart(boolean createTableOnStart) {
+ testImmutability("tableManipulation");
+ tableManipulation.setCreateTableOnStart(createTableOnStart);
+ }
+
+ /**
+ * If true, the table will be created when cache store is stopped. Default to <tt>false</tt>.
+ */
+ public void setDropTableOnExit(boolean dropTableOnExit) {
+ testImmutability("tableManipulation");
+ this.tableManipulation.setDropTableOnExit(dropTableOnExit);
+ }
+
+ public void setBucketTableName(String bucketTableName) {
+ testImmutability("tableManipulation");
+ this.tableManipulation.setTableName(bucketTableName);
+ }
+
+ public void setIdColumnName(String idColumnName) {
+ testImmutability("tableManipulation");
+ this.tableManipulation.setIdColumnName(idColumnName);
+ }
+
+ public void setIdColumnType(String idColumnType) {
+ testImmutability("tableManipulation");
+ this.tableManipulation.setIdColumnType(idColumnType);
+ }
+
+ public void setDataColumnName(String dataColumnName) {
+ testImmutability("tableManipulation");
+ this.tableManipulation.setDataColumnName(dataColumnName);
+ }
+
+ public void setDataColumnType(String dataColumnType) {
+ testImmutability("tableManipulation");
+ this.tableManipulation.setDataColumnType(dataColumnType);
+ }
+
+ public void setTimestampColumnName(String timestampColumnName) {
+ testImmutability("tableManipulation");
+ this.tableManipulation.setTimestampColumnName(timestampColumnName);
+ }
+
+ public void setTimestampColumnType(String timestampColumnType) {
+ testImmutability("tableManipulation");
+ this.tableManipulation.setTimestampColumnType(timestampColumnType);
+ }
+
+ /**
+ * Url connection to the database.
+ */
+ public void setConnectionUrl(String connectionUrl) {
+ testImmutability("connectionFactoryConfig");
+ this.connectionFactoryConfig.setConnectionUrl(connectionUrl);
+ }
+
+ /**
+ * Databse user name.
+ */
+ public void setUserName(String userName) {
+ testImmutability("connectionFactoryConfig");
+ this.connectionFactoryConfig.setUserName(userName);
+ }
+
+ /**
+ * Database username's password.
+ */
+ public void setPassword(String password) {
+ testImmutability("connectionFactoryConfig");
+ this.connectionFactoryConfig.setPassword(password);
+ }
+
+ /**
+ * Driver class, will be loaded before initializing the {@link org.horizon.loader.jdbc.connectionfactory.ConnectionFactory}
+ */
+ public void setDriverClass(String driverClass) {
+ testImmutability("connectionFactoryConfig");
+ this.connectionFactoryConfig.setDriverClass(driverClass);
+ }
+
+ /**
+ * Name of the connection factory class.
+ * @see org.horizon.loader.jdbc.connectionfactory.ConnectionFactory
+ */
+ public void setConnectionFactoryClass(String connectionFactoryClass) {
+ testImmutability("connectionFactoryConfig");
+ this.connectionFactoryConfig.setConnectionFactoryClass(connectionFactoryClass);
+ }
+
+
+ @Override
+ public JdbcBinaryCacheStoreConfig clone() {
+ JdbcBinaryCacheStoreConfig result = (JdbcBinaryCacheStoreConfig) super.clone();
+ result.connectionFactoryConfig = connectionFactoryConfig.clone();
+ result.tableManipulation = tableManipulation.clone();
+ return result;
+ }
+
+ public ConnectionFactoryConfig getConnectionFactoryConfig() {
+ return connectionFactoryConfig;
+ }
+
+ public TableManipulation getTableManipulation() {
+ return tableManipulation;
+ }
+}
Property changes on: core/branches/flat/src/main/java/org/horizon/loader/jdbc/binary/JdbcBinaryCacheStoreConfig.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Copied: core/branches/flat/src/main/java/org/horizon/loader/jdbc/connectionfactory/ConnectionFactory.java (from rev 7790, core/branches/flat/src/main/java/org/horizon/loader/jdbc/ConnectionFactory.java)
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/jdbc/connectionfactory/ConnectionFactory.java (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/loader/jdbc/connectionfactory/ConnectionFactory.java 2009-03-03 13:01:12 UTC (rev 7828)
@@ -0,0 +1,45 @@
+package org.horizon.loader.jdbc.connectionfactory;
+
+import org.horizon.loader.CacheLoaderException;
+import org.horizon.util.Util;
+
+import java.sql.Connection;
+
+/**
+ * Defines the functionality a connection factory should implement.
+ *
+ * @author Mircea.Markus(a)jboss.com
+ */
+public abstract class ConnectionFactory {
+
+ /**
+ * Constructs a {@link org.horizon.loader.jdbc.connectionfactory.ConnectionFactory} based on the supplied class name.
+ */
+ public static ConnectionFactory getConnectionFactory(String connectionFactoryClass) throws CacheLoaderException {
+ try {
+ return (ConnectionFactory) Util.getInstance(connectionFactoryClass);
+ } catch (Exception e) {
+ throw new CacheLoaderException(e);
+ }
+ }
+
+ /**
+ * Starts the connection factory. A pooled factory might be create connections here.
+ */
+ public abstract void start(ConnectionFactoryConfig config) throws CacheLoaderException;
+
+ /**
+ * Closes the connection factory, including all alocated connections etc.
+ */
+ public abstract void stop();
+
+ /**
+ * Fetches a connection from the factory.
+ */
+ public abstract Connection getConnection() throws CacheLoaderException;
+
+ /**
+ * Destroys a connection. Important: null might be passed in, as an valid argument.
+ */
+ public abstract void releaseConnection(Connection conn);
+}
Property changes on: core/branches/flat/src/main/java/org/horizon/loader/jdbc/connectionfactory/ConnectionFactory.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added: core/branches/flat/src/main/java/org/horizon/loader/jdbc/connectionfactory/ConnectionFactoryConfig.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/jdbc/connectionfactory/ConnectionFactoryConfig.java (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/loader/jdbc/connectionfactory/ConnectionFactoryConfig.java 2009-03-03 13:01:12 UTC (rev 7828)
@@ -0,0 +1,76 @@
+package org.horizon.loader.jdbc.connectionfactory;
+
+/**
+ * Contains configuration elements for a {@link org.horizon.loader.jdbc.connectionfactory.ConnectionFactory}.
+ *
+ * @author Mircea.Markus(a)jboss.com
+ */
+public class ConnectionFactoryConfig implements Cloneable {
+
+ private String connectionFactoryClass;
+ private String driverClass;
+ private String connectionUrl;
+ private String userName;
+ private String password;
+
+ public ConnectionFactoryConfig(String connectionFactoryClass, String driverClass, String connectionUrl,
+ String userName, String password) {
+ this.connectionFactoryClass = connectionFactoryClass;
+ this.driverClass = driverClass;
+ this.connectionUrl = connectionUrl;
+ this.userName = userName;
+ this.password = password;
+ }
+
+ public ConnectionFactoryConfig() {
+ }
+
+ public String getDriverClass() {
+ return driverClass;
+ }
+
+ public String getConnectionUrl() {
+ return connectionUrl;
+ }
+
+ public String getUserName() {
+ return userName;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public void setDriverClass(String driverClass) {
+ this.driverClass = driverClass;
+ }
+
+ public void setConnectionUrl(String connectionUrl) {
+ this.connectionUrl = connectionUrl;
+ }
+
+ public void setUserName(String userName) {
+ this.userName = userName;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ public void setConnectionFactoryClass(String connectionFactoryClass) {
+ this.connectionFactoryClass = connectionFactoryClass;
+ }
+
+ public String getConnectionFactoryClass() {
+ return connectionFactoryClass;
+ }
+
+ @Override
+ public ConnectionFactoryConfig clone() {
+ try {
+ return (ConnectionFactoryConfig) super.clone();
+ } catch (CloneNotSupportedException e) {
+ throw new IllegalStateException(e);
+ }
+ }
+}
Property changes on: core/branches/flat/src/main/java/org/horizon/loader/jdbc/connectionfactory/ConnectionFactoryConfig.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Copied: core/branches/flat/src/main/java/org/horizon/loader/jdbc/connectionfactory/PooledConnectionFactory.java (from rev 7792, core/branches/flat/src/main/java/org/horizon/loader/jdbc/PooledConnectionFactory.java)
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/jdbc/connectionfactory/PooledConnectionFactory.java (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/loader/jdbc/connectionfactory/PooledConnectionFactory.java 2009-03-03 13:01:12 UTC (rev 7828)
@@ -0,0 +1,98 @@
+package org.horizon.loader.jdbc.connectionfactory;
+
+import com.mchange.v2.c3p0.ComboPooledDataSource;
+import com.mchange.v2.c3p0.DataSources;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.horizon.loader.CacheLoaderException;
+import org.horizon.loader.jdbc.JdbcUtil;
+
+import java.beans.PropertyVetoException;
+import java.net.URL;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.util.Properties;
+
+/**
+ * Pooled connection factory based on C3P0. For a complete configuration reference, look <a
+ * href="http://www.mchange.com/projects/c3p0/index.html#configuration">here</a>. The connection pool can be configured
+ * in various ways, as described <a href="http://www.mchange.com/projects/c3p0/index.html#configuration_files">here</a>.
+ * The simplest way is by having an <tt>c3p0.properties</tt> file in the classpath. If no such file is found, default,
+ * hardcoded valus will be used.
+ *
+ * @author Mircea.Markus(a)jboss.com
+ */
+public class PooledConnectionFactory extends ConnectionFactory {
+
+ private static Log log = LogFactory.getLog(PooledConnectionFactory.class);
+ private ComboPooledDataSource pooledDataSource;
+
+ @Override
+ public void start(ConnectionFactoryConfig config) throws CacheLoaderException {
+ logFileOverride();
+ pooledDataSource = new ComboPooledDataSource();
+ pooledDataSource.setProperties(new Properties());
+ try {
+ pooledDataSource.setDriverClass(config.getDriverClass()); //loads the jdbc driver
+ } catch (PropertyVetoException e) {
+ String message = "Error while instatianting JDBC driver: '" + config.getDriverClass();
+ log.error(message, e);
+ throw new CacheLoaderException(message, e);
+ }
+ pooledDataSource.setJdbcUrl(config.getConnectionUrl());
+ pooledDataSource.setUser(config.getUserName());
+ pooledDataSource.setPassword(config.getPassword());
+ }
+
+ private void logFileOverride() {
+ URL propsUrl = Thread.currentThread().getContextClassLoader().getResource("c3p0.properties");
+ URL xmlUrl = Thread.currentThread().getContextClassLoader().getResource("c3p0-config.xml");
+ if (log.isInfoEnabled() && propsUrl != null) {
+ log.info("Found 'c3p0.properties' in classpath: " + propsUrl);
+ }
+ if (log.isInfoEnabled() && xmlUrl != null) {
+ log.info("Found 'c3p0-config.xml' in classpath: " + xmlUrl);
+ }
+ }
+
+ @Override
+ public void stop() {
+ try {
+ DataSources.destroy(pooledDataSource);
+ if (log.isTraceEnabled()) {
+ log.debug("Sucessfully stopped PooledConnectionFactory.");
+ }
+ }
+ catch (SQLException sqle) {
+ log.warn("Could not destroy C3P0 connection pool: " + pooledDataSource, sqle);
+ }
+ }
+
+ @Override
+ public Connection getConnection() throws CacheLoaderException {
+ try {
+ if (log.isTraceEnabled()) {
+ log.trace("DataSource before checkout (NumBusyConnectionsAllUsers) : " + pooledDataSource.getNumBusyConnectionsAllUsers());
+ log.trace("DataSource before checkout (NumConnectionsAllUsers) : " + pooledDataSource.getNumConnectionsAllUsers());
+ }
+ Connection connection = pooledDataSource.getConnection();
+ if (log.isTraceEnabled()) {
+ log.trace("DataSource after checkout (NumBusyConnectionsAllUsers) : " + pooledDataSource.getNumBusyConnectionsAllUsers());
+ log.trace("DataSource after checkout (NumConnectionsAllUsers) : " + pooledDataSource.getNumConnectionsAllUsers());
+ log.trace("Connection checked out: " + connection);
+ }
+ return connection;
+ } catch (SQLException e) {
+ throw new CacheLoaderException("Failed obtaining connection from PooledDataSource", e);
+ }
+ }
+
+ @Override
+ public void releaseConnection(Connection conn) {
+ JdbcUtil.safeClose(conn);
+ }
+
+ public ComboPooledDataSource getPooledDataSource() {
+ return pooledDataSource;
+ }
+}
Property changes on: core/branches/flat/src/main/java/org/horizon/loader/jdbc/connectionfactory/PooledConnectionFactory.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Copied: core/branches/flat/src/main/java/org/horizon/loader/jdbc/connectionfactory/SimpleConnectionFactory.java (from rev 7792, core/branches/flat/src/main/java/org/horizon/loader/jdbc/SimpleConnectionFactory.java)
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/jdbc/connectionfactory/SimpleConnectionFactory.java (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/loader/jdbc/connectionfactory/SimpleConnectionFactory.java 2009-03-03 13:01:12 UTC (rev 7828)
@@ -0,0 +1,81 @@
+package org.horizon.loader.jdbc.connectionfactory;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.horizon.loader.CacheLoaderException;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+/**
+ * Connection factory implementation that will create database connection on a per invocation basis.
+ * Not recommanded in production, {@link org.horizon.loader.jdbc.connectionfactory.PooledConnectionFactory} should rather
+ * be used.
+ *
+ * @author Mircea.Markus(a)jboss.com
+ */
+public class SimpleConnectionFactory extends ConnectionFactory {
+
+ private static Log log = LogFactory.getLog(SimpleConnectionFactory.class);
+
+ private String connectionUrl;
+ private String userName;
+ private String password;
+
+ public void start(ConnectionFactoryConfig config) throws CacheLoaderException {
+ loadDriver(config.getDriverClass());
+ this.connectionUrl = config.getConnectionUrl();
+ this.userName = config.getUserName();
+ this.password = config.getPassword();
+ }
+
+ public void stop() {
+ //do nothing
+ }
+
+ public Connection getConnection() throws CacheLoaderException {
+ try {
+ Connection connection = DriverManager.getConnection(connectionUrl, userName, password);
+ if (connection == null)
+ throw new CacheLoaderException("Received null connection from the DriverManager!");
+ return connection;
+ } catch (SQLException e) {
+ throw new CacheLoaderException("Could not obtain a new connection", e);
+ }
+ }
+
+ public void releaseConnection(Connection conn) {
+ try {
+ conn.close();
+ } catch (SQLException e) {
+ log.warn("Failure while closing the connection to the database ", e);
+ }
+ }
+
+ private void loadDriver(String driverClass) throws CacheLoaderException {
+ try {
+ if (log.isTraceEnabled()) {
+ log.trace("Attempting to load driver " + driverClass);
+ }
+ Class.forName(driverClass).newInstance();
+ }
+ catch (Throwable th) {
+ String message = "Failed loading driver with class: '" + driverClass + "'";
+ log.error(message, th);
+ throw new CacheLoaderException(message, th);
+ }
+ }
+
+ public String getConnectionUrl() {
+ return connectionUrl;
+ }
+
+ public String getUserName() {
+ return userName;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+}
Property changes on: core/branches/flat/src/main/java/org/horizon/loader/jdbc/connectionfactory/SimpleConnectionFactory.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added: core/branches/flat/src/main/java/org/horizon/loader/jdbc/stringbased/DefaultKey2StringMapper.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/jdbc/stringbased/DefaultKey2StringMapper.java (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/loader/jdbc/stringbased/DefaultKey2StringMapper.java 2009-03-03 13:01:12 UTC (rev 7828)
@@ -0,0 +1,35 @@
+package org.horizon.loader.jdbc.stringbased;
+
+/**
+ * Default implementation for {@link org.horizon.loader.jdbc.stringbased.Key2StringMapper}. It supports all the
+ * primitive wrappers(e.g. Integer, Long etc).
+ *
+ * @author Mircea.Markus(a)jboss.com
+ */
+public class DefaultKey2StringMapper implements Key2StringMapper {
+
+ /**
+ * Returns true if this is an primitive wrapper, false otherwise.
+ */
+ public boolean isSupportedType(Object key) {
+ Class clazz = key.getClass();
+ return key == String.class ||
+ key == Short.class ||
+ key == Byte.class ||
+ key == Long.class ||
+ key == Integer.class ||
+ key == Double.class ||
+ key == Float.class ||
+ key == Boolean.class;
+ }
+
+ /**
+ * Returns key.toString. As key being a primitive wrapper, this will ensure that it is unique.
+ */
+ public String getStringMapping(Object key) {
+ if (key == null) {
+ throw new NullPointerException("Not supporting null keys");
+ }
+ return key.toString();
+ }
+}
Property changes on: core/branches/flat/src/main/java/org/horizon/loader/jdbc/stringbased/DefaultKey2StringMapper.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Copied: core/branches/flat/src/main/java/org/horizon/loader/jdbc/stringbased/JdbcStringBasedCacheStore.java (from rev 7791, core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcCacheStore.java)
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/jdbc/stringbased/JdbcStringBasedCacheStore.java (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/loader/jdbc/stringbased/JdbcStringBasedCacheStore.java 2009-03-03 13:01:12 UTC (rev 7828)
@@ -0,0 +1,310 @@
+package org.horizon.loader.jdbc.stringbased;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.horizon.Cache;
+import org.horizon.io.ByteBuffer;
+import org.horizon.loader.CacheLoaderConfig;
+import org.horizon.loader.CacheLoaderException;
+import org.horizon.loader.LockSupportCacheStore;
+import org.horizon.loader.StoredEntry;
+import org.horizon.loader.jdbc.JdbcUtil;
+import org.horizon.loader.jdbc.TableManipulation;
+import org.horizon.loader.jdbc.connectionfactory.ConnectionFactory;
+import org.horizon.marshall.Marshaller;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.sql.Connection;
+import java.sql.PreparedStatement;
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * {@link org.horizon.loader.CacheStore} implementation that stores the entries in a database. In contrast to the {@link
+ * org.horizon.loader.jdbc.binary.JdbcBinaryCacheStore}, this cache store will store each entry within a row in the
+ * table (rather than grouping multiple entries into an row). This assures a finer graned granularity for all operation,
+ * and better performance. In order to be able to store non-string keys, it relies on an {@link Key2StringMapper}.
+ * <p/>
+ * The actual storage table is defined through configuration {@link JdbcStringBasedCacheStore}. The table can be
+ * created/dropped on-the-fly, at deployment time. For more details consult javadoc for {@link
+ * JdbcStringBasedCacheStore}.
+ *
+ * @author Mircea.Markus(a)jboss.com
+ * @see Key2StringMapper
+ * @see DefaultKey2StringMapper
+ */
+public class JdbcStringBasedCacheStore extends LockSupportCacheStore {
+
+ private static Log log = LogFactory.getLog(JdbcStringBasedCacheStore.class);
+
+ /** delimits the stram for stream trasfer operations */
+ private static final String STRING_STREAM_DELIMITER = "__JdbcCacheStore_done__";
+
+ private JdbcStringBasedCacheStoreConfig config;
+ private Key2StringMapper key2StringMapper;
+ private ConnectionFactory connectionFactory;
+ private TableManipulation tableManipulation;
+ private Marshaller marshaller;
+
+ public void init(CacheLoaderConfig config, Cache cache, Marshaller m) {
+ super.init(config, cache, m);
+ this.config = (JdbcStringBasedCacheStoreConfig) config;
+ this.marshaller = m;
+ }
+
+ @Override
+ public void start() throws CacheLoaderException {
+ super.start();
+ String connectionFactoryClass = config.getConnectionFactoryConfig().getConnectionFactoryClass();
+ connectionFactory = ConnectionFactory.getConnectionFactory(connectionFactoryClass);
+ connectionFactory.start(config.getConnectionFactoryConfig());
+ this.tableManipulation = config.getTableManipulation();
+ tableManipulation.start(connectionFactory);
+ this.key2StringMapper = config.getKey2StringMapper();
+ }
+
+ public void stop() throws CacheLoaderException {
+ tableManipulation.stop();
+ connectionFactory.stop();
+ }
+
+ protected String getLockFromKey(Object key) throws CacheLoaderException {
+ if (!key2StringMapper.isSupportedType(key.getClass())) {
+ throw new UnsupportedKeyTypeException(key);
+ }
+ return key2StringMapper.getStringMapping(key);
+ }
+
+ public void storeLockSafe(StoredEntry ed, String lockingKey) throws CacheLoaderException {
+ StoredEntry existingOne = loadLockSafe(ed, lockingKey);
+ String sql;
+ if (existingOne == null) {
+ sql = tableManipulation.getInsertRowSql();
+ } else {
+ sql = tableManipulation.getUpdateRowSql();
+ }
+ if (log.isTraceEnabled())
+ log.trace("Running sql '" + sql + "' on " + ed + ". Key string is '" + lockingKey + "'");
+ Connection connection = null;
+ PreparedStatement ps = null;
+ try {
+ connection = connectionFactory.getConnection();
+ ps = connection.prepareStatement(sql);
+ ByteBuffer byteBuffer = JdbcUtil.marshall(marshaller, ed);
+ ps.setBinaryStream(1, byteBuffer.getStream(), byteBuffer.getLength());
+ ps.setLong(2, ed.getExpiryTime());
+ ps.setString(3, lockingKey);
+ ps.executeUpdate();
+ } catch (SQLException ex) {
+ logAndThrow(ex, "Error while storing string keys to database");
+ } finally {
+ JdbcUtil.safeClose(ps);
+ connectionFactory.releaseConnection(connection);
+ }
+ }
+
+ public boolean removeLockSafe(Object key, String keyStr) throws CacheLoaderException {
+ Connection connection = null;
+ PreparedStatement ps = null;
+ try {
+ String sql = tableManipulation.getDeleteRowSql();
+ if (log.isTraceEnabled()) log.trace("Running sql '" + sql + "' on " + keyStr);
+ connection = connectionFactory.getConnection();
+ ps = connection.prepareStatement(sql);
+ ps.setString(1, keyStr);
+ return ps.executeUpdate() == 1;
+ } catch (SQLException ex) {
+ String message = "Error while storing string keys to database";
+ log.error(message, ex);
+ throw new CacheLoaderException(message, ex);
+ } finally {
+ JdbcUtil.safeClose(ps);
+ connectionFactory.releaseConnection(connection);
+ }
+ }
+
+ public void fromStreamLockSafe(ObjectInput objectInput) throws CacheLoaderException {
+ Connection conn = null;
+ PreparedStatement ps = null;
+ try {
+ conn = connectionFactory.getConnection();
+ String sql = tableManipulation.getInsertRowSql();
+ ps = conn.prepareStatement(sql);
+
+ int readStoredEntries = 0;
+ int batchSize = 100;
+ Object objFromStream = objectInput.readObject();
+ while (!objFromStream.equals(STRING_STREAM_DELIMITER)) {
+ StoredEntry se = (StoredEntry) objFromStream;
+ readStoredEntries++;
+ String key = key2StringMapper.getStringMapping(se.getKey());
+ ByteBuffer buffer = JdbcUtil.marshall(marshaller, se);
+ ps.setBinaryStream(1, buffer.getStream(), buffer.getLength());
+ ps.setLong(2, se.getExpiryTime());
+ ps.setString(3, key);
+ ps.addBatch();
+ if (readStoredEntries % batchSize == 0) {
+ ps.executeBatch();
+ if (log.isTraceEnabled())
+ log.trace("Executing batch " + (readStoredEntries / batchSize) + ", batch size is " + batchSize);
+ }
+ objFromStream = objectInput.readObject();
+ }
+ if (readStoredEntries % batchSize != 0)
+ ps.executeBatch();//flush the batch
+ if (log.isTraceEnabled())
+ log.trace("Successfully inserted " + readStoredEntries + " buckets into the database, batch size is " + batchSize);
+ } catch (IOException ex) {
+ logAndThrow(ex, "I/O failure while integrating state into store");
+ } catch (SQLException e) {
+ logAndThrow(e, "SQL failure while integrating state into store");
+ } catch (ClassNotFoundException e) {
+ logAndThrow(e, "Unexpected failure while integrating state into store");
+ } finally {
+ JdbcUtil.safeClose(ps);
+ connectionFactory.releaseConnection(conn);
+ }
+ }
+
+ protected void toStreamLockSafe(ObjectOutput objectOutput) throws CacheLoaderException {
+ //now write our data
+ Connection connection = null;
+ PreparedStatement ps = null;
+ ResultSet rs = null;
+ try {
+ String sql = tableManipulation.getLoadAllRowsSql();
+ if (log.isTraceEnabled()) log.trace("Running sql '" + sql);
+ connection = connectionFactory.getConnection();
+ ps = connection.prepareStatement(sql);
+ rs = ps.executeQuery();
+ while (rs.next()) {
+ InputStream is = rs.getBinaryStream(1);
+ StoredEntry se = (StoredEntry) JdbcUtil.unmarshall(marshaller, is);
+ objectOutput.writeObject(se);
+ }
+ objectOutput.writeObject(STRING_STREAM_DELIMITER);
+ } catch (SQLException e) {
+ logAndThrow(e, "SQL Error while storing string keys to database");
+ } catch (IOException e) {
+ logAndThrow(e, "I/O Error while storing string keys to database");
+ }
+ finally {
+ JdbcUtil.safeClose(rs);
+ JdbcUtil.safeClose(ps);
+ connectionFactory.releaseConnection(connection);
+ }
+ }
+
+ @Override
+ protected void clearLockSafe() throws CacheLoaderException {
+ Connection conn = null;
+ PreparedStatement ps = null;
+ try {
+ String sql = tableManipulation.getDeleteAllRowsSql();
+ conn = connectionFactory.getConnection();
+ ps = conn.prepareStatement(sql);
+ int result = ps.executeUpdate();
+ if (log.isTraceEnabled())
+ log.trace("Successfully removed " + result + " rows.");
+ } catch (SQLException ex) {
+ logAndThrow(ex, "Failed clearing JdbcBinaryCacheStore");
+ } finally {
+ JdbcUtil.safeClose(ps);
+ connectionFactory.releaseConnection(conn);
+ }
+ }
+
+ @Override
+ protected void purgeInternal() throws CacheLoaderException {
+ Connection conn = null;
+ PreparedStatement ps = null;
+ try {
+ String sql = tableManipulation.getDeleteExpiredRowsSql();
+ conn = connectionFactory.getConnection();
+ ps = conn.prepareStatement(sql);
+ int result = ps.executeUpdate();
+ if (log.isTraceEnabled())
+ log.trace("Successfully purged " + result + " rows.");
+ } catch (SQLException ex) {
+ logAndThrow(ex, "Failed purging JdbcBinaryCacheStore");
+ } finally {
+ JdbcUtil.safeClose(ps);
+ connectionFactory.releaseConnection(conn);
+ }
+ }
+
+ protected Set<StoredEntry> loadAllLockSafe() throws CacheLoaderException {
+ Connection conn = null;
+ PreparedStatement ps = null;
+ ResultSet rs = null;
+ try {
+ String sql = tableManipulation.getLoadAllRowsSql();
+ conn = connectionFactory.getConnection();
+ ps = conn.prepareStatement(sql);
+ rs = ps.executeQuery();
+ rs.setFetchSize(100);
+ Set<StoredEntry> result = new HashSet<StoredEntry>();
+ while (rs.next()) {
+ InputStream inputStream = rs.getBinaryStream(1);
+ StoredEntry se = (StoredEntry) JdbcUtil.unmarshall(marshaller, inputStream);
+ result.add(se);
+ }
+ return result;
+ } catch (SQLException e) {
+ String message = "SQL error while fetching all StoredEntries";
+ log.error(message, e);
+ throw new CacheLoaderException(message, e);
+ } finally {
+ JdbcUtil.safeClose(rs);
+ JdbcUtil.safeClose(ps);
+ connectionFactory.releaseConnection(conn);
+ }
+ }
+
+ protected StoredEntry loadLockSafe(Object key, String lockingKey) throws CacheLoaderException {
+ Connection conn = null;
+ PreparedStatement ps = null;
+ ResultSet rs = null;
+ try {
+ String sql = tableManipulation.getSelectRowSql();
+ conn = connectionFactory.getConnection();
+ ps = conn.prepareStatement(sql);
+ ps.setString(1, lockingKey);
+ rs = ps.executeQuery();
+ if (rs.next()) {
+ InputStream inputStream = rs.getBinaryStream(2);
+ StoredEntry storedEntry = (StoredEntry) JdbcUtil.unmarshall(marshaller, inputStream);
+ if (storedEntry.isExpired()) {
+ if (log.isTraceEnabled()) {
+ log.trace("Not returning '" + storedEntry + "' as it is expired. It will be removed from DB by purging thread!");
+ }
+ return null;
+ }
+ return storedEntry;
+ }
+ return null;
+ } catch (SQLException e) {
+ String message = "SQL error while fetching strored entry with key:" + key + " lockingKey: " + lockingKey;
+ log.error(message, e);
+ throw new CacheLoaderException(message, e);
+ } finally {
+ JdbcUtil.safeClose(rs);
+ JdbcUtil.safeClose(ps);
+ connectionFactory.releaseConnection(conn);
+ }
+ }
+
+ public Class<? extends CacheLoaderConfig> getConfigurationClass() {
+ return JdbcStringBasedCacheStoreConfig.class;
+ }
+
+ protected void logAndThrow(Exception e, String message) throws CacheLoaderException {
+ log.error(message, e);
+ throw new CacheLoaderException(message, e);
+ }
+}
Property changes on: core/branches/flat/src/main/java/org/horizon/loader/jdbc/stringbased/JdbcStringBasedCacheStore.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Copied: core/branches/flat/src/main/java/org/horizon/loader/jdbc/stringbased/JdbcStringBasedCacheStoreConfig.java (from rev 7791, core/branches/flat/src/main/java/org/horizon/loader/jdbc/JdbcCacheStoreConfig.java)
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/jdbc/stringbased/JdbcStringBasedCacheStoreConfig.java (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/loader/jdbc/stringbased/JdbcStringBasedCacheStoreConfig.java 2009-03-03 13:01:12 UTC (rev 7828)
@@ -0,0 +1,140 @@
+package org.horizon.loader.jdbc.stringbased;
+
+import org.horizon.loader.LockSupportCacheStoreConfig;
+import org.horizon.loader.jdbc.TableManipulation;
+import org.horizon.loader.jdbc.connectionfactory.ConnectionFactoryConfig;
+import org.horizon.util.Util;
+
+/**
+ * Configuration for {@link org.horizon.loader.jdbc.stringbased.JdbcStringBasedCacheStore} cache store.
+ *
+ * @author Mircea.Markus(a)jboss.com
+ * @see org.horizon.loader.jdbc.stringbased.Key2StringMapper
+ */
+public class JdbcStringBasedCacheStoreConfig extends LockSupportCacheStoreConfig {
+
+ private Key2StringMapper key2StringMapper;
+
+ private ConnectionFactoryConfig connectionFactoryConfig = new ConnectionFactoryConfig();
+ private TableManipulation tableManipulation = new TableManipulation();
+
+ public JdbcStringBasedCacheStoreConfig(ConnectionFactoryConfig connectionFactoryConfig, TableManipulation tableManipulation) {
+ this.connectionFactoryConfig = connectionFactoryConfig;
+ this.tableManipulation = tableManipulation;
+ }
+
+ public JdbcStringBasedCacheStoreConfig() {
+ className = JdbcStringBasedCacheStore.class.getName();
+ }
+
+ public Key2StringMapper getKey2StringMapper() {
+ if (key2StringMapper == null) {
+ try {
+ key2StringMapper = DefaultKey2StringMapper.class.newInstance();
+ } catch (Exception e) {
+ throw new IllegalStateException("This should never happen", e);
+ }
+ }
+ return key2StringMapper;
+ }
+
+ /**
+ * Name of the class implementing Key2StringMapper. The default value is {@link org.horizon.loader.jdbc.stringbased.DefaultKey2StringMapper}
+ *
+ * @see org.horizon.loader.jdbc.stringbased.Key2StringMapper
+ */
+ public void setKey2StringMapperClass(String className) {
+ try {
+ key2StringMapper = (Key2StringMapper) Util.getInstance(className);
+ } catch (Exception e) {
+ throw new IllegalArgumentException("Could not load Key2StringMapper :'" + className + "'", e);
+ }
+ }
+
+ /**
+ * Sets the name of the table where data will be stored.
+ */
+ public void setStringsTableName(String stringsTableName) {
+ this.tableManipulation.setTableName(stringsTableName);
+ }
+
+ /**
+ * Sets the name of the column where the id will be stored. The id is obtained through:
+ * <pre>
+ * key2StringMapper.getStringMapping(storedEntry.getKey());
+ * </pre>
+ * Mandatory.
+ */
+ public void setIdColumnName(String idColumnName) {
+ this.tableManipulation.setIdColumnName(idColumnName);
+ }
+
+ /**
+ * Sets the name of the column where the StoredEntry will be binary stored. Mandatory.
+ */
+ public void setDataColumnName(String dataColumnName) {
+ this.tableManipulation.setDataColumnName(dataColumnName);
+ }
+
+ /**
+ * Sets the name of the column where the timestamp (Long in java) will be stored. Mandatory.
+ */
+ public void setTimestampColumnName(String timestampColumnName) {
+ this.tableManipulation.setTimestampColumnName(timestampColumnName);
+ }
+
+ public void setConnectionFactoryClass(String connectionFactoryClass) {
+ this.connectionFactoryConfig.setConnectionFactoryClass(connectionFactoryClass);
+ }
+
+ public ConnectionFactoryConfig getConnectionFactoryConfig() {
+ return connectionFactoryConfig;
+ }
+
+ public TableManipulation getTableManipulation() {
+ return tableManipulation;
+ }
+
+ /**
+ * Jdbc connection string for connecting to the database. Mandatory.
+ */
+ public void setConnectionUrl(String connectionUrl) {
+ this.connectionFactoryConfig.setConnectionUrl(connectionUrl);
+ }
+
+ /**
+ * Database username.
+ */
+ public void setUserName(String userName) {
+ this.connectionFactoryConfig.setUserName(userName);
+ }
+
+ /**
+ * Database username's password.
+ */
+ public void setPassword(String password) {
+ this.connectionFactoryConfig.setPassword(password);
+ }
+
+ /**
+ * The name of the driver used for connecting to the database. Mandatory, will be loaded before initiating the
+ * first connection.
+ */
+ public void setDriverClass(String driverClassName) {
+ this.connectionFactoryConfig.setDriverClass(driverClassName);
+ }
+
+ /**
+ * sql equivalent for java's String. Mandatory.
+ */
+ public void setIdColumnType(String idColumnType) {
+ this.tableManipulation.setIdColumnType(idColumnType);
+ }
+
+ /**
+ * Sets the type of the column where data will be binary stored. BLOB-like type, DBMS dependent. Mandatory.
+ */
+ public void setDataColumnType(String dataColumnType) {
+ this.tableManipulation.setDataColumnType(dataColumnType);
+ }
+}
Property changes on: core/branches/flat/src/main/java/org/horizon/loader/jdbc/stringbased/JdbcStringBasedCacheStoreConfig.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added: core/branches/flat/src/main/java/org/horizon/loader/jdbc/stringbased/Key2StringMapper.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/jdbc/stringbased/Key2StringMapper.java (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/loader/jdbc/stringbased/Key2StringMapper.java 2009-03-03 13:01:12 UTC (rev 7828)
@@ -0,0 +1,22 @@
+package org.horizon.loader.jdbc.stringbased;
+
+/**
+ * Defines the logic of mapping an key object to a String. This is required {@link
+ * org.horizon.loader.jdbc.stringbased.JdbcStringBasedCacheStore}, in order to map each {@link
+ * org.horizon.loader.StoredEntry} as an single row within a database. It bassically should generate an unique String PK
+ * based on the supplied key.
+ *
+ * @author Mircea.Markus(a)jboss.com
+ */
+public interface Key2StringMapper {
+
+ /**
+ * Do we support this key type?
+ */
+ boolean isSupportedType(Object key);
+
+ /**
+ * Must return an unique String for the supplied key.
+ */
+ String getStringMapping(Object key);
+}
Property changes on: core/branches/flat/src/main/java/org/horizon/loader/jdbc/stringbased/Key2StringMapper.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added: core/branches/flat/src/main/java/org/horizon/loader/jdbc/stringbased/UnsupportedKeyTypeException.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/jdbc/stringbased/UnsupportedKeyTypeException.java (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/loader/jdbc/stringbased/UnsupportedKeyTypeException.java 2009-03-03 13:01:12 UTC (rev 7828)
@@ -0,0 +1,29 @@
+package org.horizon.loader.jdbc.stringbased;
+
+import org.horizon.loader.CacheLoaderException;
+
+/**
+ * Exception thrown by {@link org.horizon.loader.jdbc.stringbased.JdbcStringBasedCacheStore} when one tries to persist
+ * a StoredEntry with an unsupported key type.
+ *
+ * @see org.horizon.loader.jdbc.stringbased.JdbcStringBasedCacheStore
+ * @author Mircea.Markus(a)jboss.com
+ */
+public class UnsupportedKeyTypeException extends CacheLoaderException {
+
+ public UnsupportedKeyTypeException(Object key) {
+ this("Unsupported key type: '" + key.getClass().getName() + "' on key: " + key );
+ }
+
+ public UnsupportedKeyTypeException(String message) {
+ super(message);
+ }
+
+ public UnsupportedKeyTypeException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public UnsupportedKeyTypeException(Throwable cause) {
+ super(cause);
+ }
+}
Property changes on: core/branches/flat/src/main/java/org/horizon/loader/jdbc/stringbased/UnsupportedKeyTypeException.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Modified: core/branches/flat/src/main/java/org/horizon/lock/StripedLock.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/lock/StripedLock.java 2009-03-02 19:19:45 UTC (rev 7827)
+++ core/branches/flat/src/main/java/org/horizon/lock/StripedLock.java 2009-03-03 13:01:12 UTC (rev 7828)
@@ -27,6 +27,7 @@
import java.util.List;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
@@ -91,8 +92,10 @@
ReentrantReadWriteLock lock = getLock(key);
if (exclusive) {
lock.writeLock().lock();
+ if (log.isTraceEnabled()) log.trace("WL acquired for '" + key + "'");
} else {
lock.readLock().lock();
+ if (log.isTraceEnabled()) log.trace("RL acquired for '" + key + "'");
}
}
@@ -117,8 +120,10 @@
ReentrantReadWriteLock lock = getLock(key);
if (lock.isWriteLockedByCurrentThread()) {
lock.writeLock().unlock();
+ if (log.isTraceEnabled()) log.trace("WL released for '" + key + "'");
} else {
lock.readLock().unlock();
+ if (log.isTraceEnabled()) log.trace("RL released for '" + key + "'");
}
}
@@ -175,5 +180,59 @@
return count;
}
+ /**
+ * Acquires RL on all locks agregated by this StripedLock, in the given timeout.
+ */
+ public boolean aquireGlobalLock(boolean exclusive, long timeout) {
+ boolean success = true;
+ for (int i = 0; i < sharedLocks.length; i++) {
+ Lock toAcquire = exclusive ? sharedLocks[i].writeLock() : sharedLocks[i].readLock();
+ try {
+ success = toAcquire.tryLock(timeout, TimeUnit.MILLISECONDS);
+ if (!success) {
+ if (log.isTraceEnabled())
+ log.trace("Could not aquire lock on " + toAcquire + ". Exclusive?" + exclusive);
+ break;
+ }
+ } catch (InterruptedException e) {
+ if (log.isTraceEnabled()) log.trace("Cought InterruptedException while trying to aquire global lock", e);
+ success = false;
+ } finally {
+ if (!success) {
+ for (int j = 0; j < i; j++) {
+ Lock toRelease = exclusive ? sharedLocks[j].writeLock() : sharedLocks[j].readLock();
+ toRelease.unlock();
+ }
+ }
+ }
+ }
+ return success;
+ }
+ public void releaseGlobalLock(boolean exclusive) {
+ for (ReentrantReadWriteLock lock : sharedLocks) {
+ Lock toRelease = exclusive ? lock.writeLock() : lock.readLock();
+ toRelease.unlock();
+ }
+ }
+
+ public int getTotalReadLockCount() {
+ int count = 0;
+ for (ReentrantReadWriteLock lock : sharedLocks) {
+ count += lock.getReadLockCount();
+ }
+ return count;
+ }
+
+ public int getSharedLockCount() {
+ return sharedLocks.length;
+ }
+
+ public int getTotalWriteLockCount() {
+ int count = 0;
+ for (ReentrantReadWriteLock lock : sharedLocks) {
+ count += lock.isWriteLocked() ? 1 : 0;
+ }
+ return count;
+ }
}
Modified: core/branches/flat/src/test/java/org/horizon/config/parsing/ConfigurationParserTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/config/parsing/ConfigurationParserTest.java 2009-03-02 19:19:45 UTC (rev 7827)
+++ core/branches/flat/src/test/java/org/horizon/config/parsing/ConfigurationParserTest.java 2009-03-03 13:01:12 UTC (rev 7828)
@@ -8,7 +8,7 @@
import org.horizon.eviction.algorithms.fifo.FIFOAlgorithmConfig;
import org.horizon.loader.CacheLoaderConfig;
import org.horizon.loader.decorators.SingletonStoreConfig;
-import org.horizon.loader.jdbc.JdbcCacheStoreConfig;
+import org.horizon.loader.jdbc.binary.JdbcBinaryCacheStoreConfig;
import org.horizon.lock.IsolationLevel;
import org.horizon.transaction.GenericTransactionManagerLookup;
import org.testng.annotations.Test;
@@ -156,7 +156,7 @@
public void testCacheLoaders() throws Exception {
XmlConfigurationParserImpl parser = new XmlConfigurationParserImpl();
String xml = "<loaders passivation=\"true\" shared=\"true\" preload=\"true\">\n" +
- " <loader class=\"org.horizon.loader.jdbc.JdbcCacheStore\" fetchPersistentState=\"true\"\n" +
+ " <loader class=\"org.horizon.loader.jdbc.JdbcBucketCacheStore\" fetchPersistentState=\"true\"\n" +
" ignoreModifications=\"false\" purgeOnStartup=\"false\">\n" +
" <properties>\n" +
" dataSource=HorizonDS\n" +
@@ -181,7 +181,7 @@
assert clc.isPreload();
CacheLoaderConfig iclc = clc.getFirstCacheLoaderConfig();
- assert iclc.getClassName().equals("org.horizon.loader.jdbc.JdbcCacheStore");
+ assert iclc.getClassName().equals("org.horizon.loader.jdbc.JdbcBucketCacheStore");
assert iclc.getAsyncStoreConfig().isEnabled();
assert iclc.getAsyncStoreConfig().getBatchSize() == 15;
assert iclc.getAsyncStoreConfig().getPollWait() == 100;
@@ -191,12 +191,12 @@
assert !iclc.isIgnoreModifications();
assert !iclc.isPurgeOnStartup();
- JdbcCacheStoreConfig jdbcclc = (JdbcCacheStoreConfig) iclc;
+ JdbcBinaryCacheStoreConfig jdbcclc = (JdbcBinaryCacheStoreConfig) iclc;
// assert jdbcclc.getDataSource().equals("HorizonDS");
// assert jdbcclc.getTableNamePrefix().equals("horizon");
assert false : "todo update test according to config";
- assert jdbcclc.isCreateTableOnStart();
- assert !jdbcclc.isDropTableOnExit();
+// assert jdbcclc.isCreateTableOnStart();
+// assert !jdbcclc.isDropTableOnExit();
SingletonStoreConfig ssc = iclc.getSingletonStoreConfig();
assert ssc.isSingletonStoreEnabled();
@@ -207,7 +207,7 @@
public void testCacheLoadersDefaults() throws Exception {
XmlConfigurationParserImpl parser = new XmlConfigurationParserImpl();
String xml = "<loaders>\n" +
- " <loader class=\"org.horizon.loader.jdbc.JdbcCacheStore\">\n" +
+ " <loader class=\"org.horizon.loader.jdbc.binary.JdbcBinaryCacheStore\">\n" +
" <properties />\n" +
" </loader>\n" +
" </loaders>";
@@ -224,7 +224,7 @@
assert !clc.isPreload();
CacheLoaderConfig iclc = clc.getFirstCacheLoaderConfig();
- assert iclc.getClassName().equals("org.horizon.loader.jdbc.JdbcCacheStore");
+ assert iclc.getClassName().equals("org.horizon.loader.jdbc.binary.JdbcBinaryCacheStore");
assert !iclc.getAsyncStoreConfig().isEnabled();
assert !iclc.isFetchPersistentState();
assert !iclc.isIgnoreModifications();
Modified: core/branches/flat/src/test/java/org/horizon/loader/BaseCacheStoreTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/BaseCacheStoreTest.java 2009-03-02 19:19:45 UTC (rev 7827)
+++ core/branches/flat/src/test/java/org/horizon/loader/BaseCacheStoreTest.java 2009-03-03 13:01:12 UTC (rev 7828)
@@ -39,7 +39,13 @@
@BeforeMethod
public void setUp() throws Exception {
- cs = createCacheStore();
+ try {
+ cs = createCacheStore();
+ } catch (Exception e) {
+ //in IDEs this won't be printed which makes debugging harder
+ e.printStackTrace();
+ throw e;
+ }
}
@AfterMethod
@@ -51,6 +57,14 @@
cs = null;
}
+ @AfterMethod
+ public void assertNoLocksHeld() {
+ //doesn't really make sense to add a subclass for this check only
+ if (cs instanceof LockSupportCacheStore) {
+ assert ((LockSupportCacheStore)cs).getTotalLockCount() == 0;
+ }
+ }
+
/**
* @return a mock cache for use with the cache store impls
*/
Modified: core/branches/flat/src/test/java/org/horizon/loader/dummy/DummyInMemoryCacheStore.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/dummy/DummyInMemoryCacheStore.java 2009-03-02 19:19:45 UTC (rev 7827)
+++ core/branches/flat/src/test/java/org/horizon/loader/dummy/DummyInMemoryCacheStore.java 2009-03-03 13:01:12 UTC (rev 7828)
@@ -1,8 +1,8 @@
package org.horizon.loader.dummy;
import org.horizon.Cache;
-import org.horizon.loader.AbstractCacheLoaderConfig;
import org.horizon.loader.AbstractCacheStore;
+import org.horizon.loader.AbstractCacheStoreConfig;
import org.horizon.loader.CacheLoaderConfig;
import org.horizon.loader.CacheLoaderException;
import org.horizon.loader.StoredEntry;
@@ -64,7 +64,7 @@
return store.remove(key) != null;
}
- public void purgeExpired() {
+ protected void purgeInternal() throws CacheLoaderException {
for (Iterator<StoredEntry> i = store.values().iterator(); i.hasNext();) {
StoredEntry se = i.next();
if (se.isExpired()) i.remove();
@@ -72,6 +72,7 @@
}
public void init(CacheLoaderConfig config, Cache cache, Marshaller m) {
+ super.init(config, cache, m);
this.config = (Cfg) config;
this.cache = cache;
this.marshaller = m;
@@ -109,7 +110,8 @@
}
@SuppressWarnings("unchecked")
- public void start() {
+ public void start() throws CacheLoaderException {
+ super.start();
storeName = config.getStore();
if (cache != null) storeName += "_" + cache.getName();
Map m = new ConcurrentHashMap();
@@ -120,7 +122,7 @@
public void stop() {
}
- public static class Cfg extends AbstractCacheLoaderConfig {
+ public static class Cfg extends AbstractCacheStoreConfig {
boolean debug;
String store = "__DEFAULT_STORE__";
Modified: core/branches/flat/src/test/java/org/horizon/loader/dummy/DummyInMemoryCacheStoreTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/dummy/DummyInMemoryCacheStoreTest.java 2009-03-02 19:19:45 UTC (rev 7827)
+++ core/branches/flat/src/test/java/org/horizon/loader/dummy/DummyInMemoryCacheStoreTest.java 2009-03-03 13:01:12 UTC (rev 7828)
@@ -2,12 +2,13 @@
import org.horizon.loader.BaseCacheStoreTest;
import org.horizon.loader.CacheStore;
+import org.horizon.loader.CacheLoaderException;
import org.testng.annotations.Test;
@Test(groups = "unit", testName = "loader.dummy.DummyInMemoryCacheStoreTest")
public class DummyInMemoryCacheStoreTest extends BaseCacheStoreTest {
- protected CacheStore createCacheStore() {
+ protected CacheStore createCacheStore() throws CacheLoaderException {
DummyInMemoryCacheStore cl = new DummyInMemoryCacheStore();
DummyInMemoryCacheStore.Cfg cfg = new DummyInMemoryCacheStore.Cfg();
cfg.setStore(DummyInMemoryCacheStoreTest.class.getName());
Modified: core/branches/flat/src/test/java/org/horizon/loader/file/FileCacheStoreTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/file/FileCacheStoreTest.java 2009-03-02 19:19:45 UTC (rev 7827)
+++ core/branches/flat/src/test/java/org/horizon/loader/file/FileCacheStoreTest.java 2009-03-03 13:01:12 UTC (rev 7828)
@@ -7,7 +7,6 @@
import org.horizon.loader.StoredEntry;
import org.horizon.loader.bucket.Bucket;
import org.horizon.test.TestingUtil;
-import org.testng.annotations.AfterMethod;
import org.testng.annotations.AfterTest;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
@@ -34,12 +33,6 @@
return fcs;
}
- @AfterMethod
- public void assertNoLocksHeldAfterTest() {
- assert fcs.getBucketLockCount() == 0;
- assert fcs.getGlobalLockCount() == 0;
- }
-
@AfterTest
@BeforeTest
public void removeTempDirectory() {
Added: core/branches/flat/src/test/java/org/horizon/loader/jdbc/DefaultKey2StringMapperTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/jdbc/DefaultKey2StringMapperTest.java (rev 0)
+++ core/branches/flat/src/test/java/org/horizon/loader/jdbc/DefaultKey2StringMapperTest.java 2009-03-03 13:01:12 UTC (rev 7828)
@@ -0,0 +1,34 @@
+package org.horizon.loader.jdbc;
+
+import org.testng.annotations.Test;
+import org.horizon.loader.jdbc.stringbased.DefaultKey2StringMapper;
+
+/**
+ * Tester for {@link org.horizon.loader.jdbc.stringbased.Key2StringMapper}.
+ *
+ * @author Mircea.Markus(a)jboss.com
+ */
+@Test(groups = "unit", testName = "loader.jdbc.DefaultKey2StringMapperTest")
+public class DefaultKey2StringMapperTest {
+
+ DefaultKey2StringMapper mapper = new DefaultKey2StringMapper();
+
+ public void testPrimitivesAreSupported() {
+ assert mapper.isSupportedType(Integer.class);
+ assert mapper.isSupportedType(Byte.class);
+ assert mapper.isSupportedType(Short.class);
+ assert mapper.isSupportedType(Long.class);
+ assert mapper.isSupportedType(Double.class);
+ assert mapper.isSupportedType(Float.class);
+ assert mapper.isSupportedType(Boolean.class);
+ assert mapper.isSupportedType(String.class);
+ }
+
+ @SuppressWarnings(value = "all")
+ public void testGetStingMapping() {
+ Object[] toTest = {new Integer(0), new Byte("1"), new Short("2"), new Long(3), new Double("3.4"), new Float("3.5"), Boolean.FALSE, "some string"};
+ for (Object o : toTest) {
+ assert mapper.getStringMapping(o).equals(o.toString());
+ }
+ }
+}
Property changes on: core/branches/flat/src/test/java/org/horizon/loader/jdbc/DefaultKey2StringMapperTest.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Copied: core/branches/flat/src/test/java/org/horizon/loader/jdbc/JdbcBinaryCacheStoreTest.java (from rev 7791, core/branches/flat/src/test/java/org/horizon/loader/jdbc/JdbcCacheStoreTest.java)
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/jdbc/JdbcBinaryCacheStoreTest.java (rev 0)
+++ core/branches/flat/src/test/java/org/horizon/loader/jdbc/JdbcBinaryCacheStoreTest.java 2009-03-03 13:01:12 UTC (rev 7828)
@@ -0,0 +1,29 @@
+package org.horizon.loader.jdbc;
+
+import org.horizon.loader.BaseCacheStoreTest;
+import org.horizon.loader.CacheStore;
+import org.horizon.loader.jdbc.binary.JdbcBinaryCacheStore;
+import org.horizon.loader.jdbc.binary.JdbcBinaryCacheStoreConfig;
+import org.horizon.loader.jdbc.connectionfactory.ConnectionFactoryConfig;
+import org.horizon.marshall.ObjectStreamMarshaller;
+import org.horizon.loader.jdbc.UnitTestDatabaseManager;
+import org.testng.annotations.Test;
+
+/**
+ * Tester class for {@link JdbcBinaryCacheStore}
+ *
+ * @author Mircea.Markus(a)jboss.com
+ */
+@Test(groups = "functional", testName = "loader.jdbc.JdbcBinaryCacheStoreTest")
+public class JdbcBinaryCacheStoreTest extends BaseCacheStoreTest {
+
+ protected CacheStore createCacheStore() throws Exception {
+ ConnectionFactoryConfig connectionFactoryConfig = UnitTestDatabaseManager.getUniqueConnectionFactoryConfig();
+ TableManipulation tm = UnitTestDatabaseManager.buildDefaultTableManipulation();
+ JdbcBinaryCacheStoreConfig config = new JdbcBinaryCacheStoreConfig(connectionFactoryConfig, tm);
+ JdbcBinaryCacheStore jdbcBucketCacheStore = new JdbcBinaryCacheStore();
+ jdbcBucketCacheStore.init(config, null, new ObjectStreamMarshaller());
+ jdbcBucketCacheStore.start();
+ return jdbcBucketCacheStore;
+ }
+}
Property changes on: core/branches/flat/src/test/java/org/horizon/loader/jdbc/JdbcBinaryCacheStoreTest.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Deleted: core/branches/flat/src/test/java/org/horizon/loader/jdbc/JdbcCacheStoreTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/jdbc/JdbcCacheStoreTest.java 2009-03-02 19:19:45 UTC (rev 7827)
+++ core/branches/flat/src/test/java/org/horizon/loader/jdbc/JdbcCacheStoreTest.java 2009-03-03 13:01:12 UTC (rev 7828)
@@ -1,66 +0,0 @@
-package org.horizon.loader.jdbc;
-
-import org.horizon.loader.CacheStore;
-import org.horizon.marshall.ObjectStreamMarshaller;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.Test;
-
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.sql.Statement;
-
-/**
- * // TODO: Mircea: Document this!
- *
- * @author
- */
-@Test(groups = "functional", testName = "loader.jdbc.JdbcCacheStoreTest", enabled = false)
-public class JdbcCacheStoreTest /*extends BaseCacheStoreTest*/ {
-
- private JdbcCacheStore jdbcCacheStore;
-
- protected CacheStore createCacheStore() throws Exception {
- try {
- Class.forName("com.mysql.jdbc.Driver").newInstance();
- Connection connection = DriverManager.getConnection("jdbc:mysql://localhost/horizon", "root", "root");
- Statement st = connection.createStatement();
- try {
- st.executeUpdate("DROP TABLE horizon_jdbc");
- } catch (SQLException e) {
- //ignore, might be the table does not exist
- }
- JdbcUtil.safeClose(st);
- JdbcUtil.safeClose(connection);
-
- jdbcCacheStore = new JdbcCacheStore();
- JdbcCacheStoreConfig config = new JdbcCacheStoreConfig();
- config.setConnectionFactoryClass(PooledConnectionFactory.class.getName());
- config.setConnectionUrl("jdbc:mysql://localhost/horizon");
- config.setUserName("root");
- config.setPassword("root");
- config.setDriverClass("com.mysql.jdbc.Driver");
- config.setTableName("horizon_jdbc");
- config.setKeyColumnName("key_name");
- config.setKeyColumnType("varchar(255)");
- config.setDataColumnName("BUCKET");
- config.setDataColumnType("BLOB");
- config.setTimestampColumnName("TIMESTAMP");
- config.setTimestampColumnType("BIGINT");
- config.setPurgeSynchronously(true);
- jdbcCacheStore.init(config, null, new ObjectStreamMarshaller());
- jdbcCacheStore.start();
- return jdbcCacheStore;
- } catch (Throwable e) {
- e.printStackTrace(); // TODO: Mircea: Customise this generated block
- throw (Exception) e;
- }
- }
-
- //todo move this in upper class
- @AfterMethod
- public void assertNoLocksHeldAfterTest() {
- assert jdbcCacheStore.getBucketLockCount() == 0;
- assert jdbcCacheStore.getGlobalLockCount() == 0;
- }
-}
Copied: core/branches/flat/src/test/java/org/horizon/loader/jdbc/JdbcStringBasedCacheStoreTest.java (from rev 7791, core/branches/flat/src/test/java/org/horizon/loader/jdbc/JdbcCacheStoreTest.java)
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/jdbc/JdbcStringBasedCacheStoreTest.java (rev 0)
+++ core/branches/flat/src/test/java/org/horizon/loader/jdbc/JdbcStringBasedCacheStoreTest.java 2009-03-03 13:01:12 UTC (rev 7828)
@@ -0,0 +1,29 @@
+package org.horizon.loader.jdbc;
+
+import org.horizon.loader.BaseCacheStoreTest;
+import org.horizon.loader.CacheStore;
+import org.horizon.loader.jdbc.connectionfactory.ConnectionFactoryConfig;
+import org.horizon.loader.jdbc.stringbased.JdbcStringBasedCacheStore;
+import org.horizon.loader.jdbc.stringbased.JdbcStringBasedCacheStoreConfig;
+import org.horizon.marshall.ObjectStreamMarshaller;
+import org.horizon.loader.jdbc.UnitTestDatabaseManager;
+import org.testng.annotations.Test;
+
+/**
+ * Tester class for {@link org.horizon.loader.jdbc.stringbased.JdbcStringBasedCacheStore}.
+ *
+ * @author Mircea.Markus(a)jboss.com
+ */
+@Test(groups = "functional", testName = "loader.jdbc.JdbcStringBasedCacheStoreTest")
+public class JdbcStringBasedCacheStoreTest extends BaseCacheStoreTest {
+
+ protected CacheStore createCacheStore() throws Exception {
+ ConnectionFactoryConfig connectionFactoryConfig = UnitTestDatabaseManager.getUniqueConnectionFactoryConfig();
+ TableManipulation tm = UnitTestDatabaseManager.buildDefaultTableManipulation();
+ JdbcStringBasedCacheStoreConfig config = new JdbcStringBasedCacheStoreConfig(connectionFactoryConfig, tm);
+ JdbcStringBasedCacheStore jdbcBucketCacheStore = new JdbcStringBasedCacheStore();
+ jdbcBucketCacheStore.init(config, null, new ObjectStreamMarshaller());
+ jdbcBucketCacheStore.start();
+ return jdbcBucketCacheStore;
+ }
+}
Property changes on: core/branches/flat/src/test/java/org/horizon/loader/jdbc/JdbcStringBasedCacheStoreTest.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Modified: core/branches/flat/src/test/java/org/horizon/loader/jdbc/PooledConnectionFactoryTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/jdbc/PooledConnectionFactoryTest.java 2009-03-02 19:19:45 UTC (rev 7827)
+++ core/branches/flat/src/test/java/org/horizon/loader/jdbc/PooledConnectionFactoryTest.java 2009-03-03 13:01:12 UTC (rev 7828)
@@ -1,16 +1,17 @@
package org.horizon.loader.jdbc;
-import org.horizon.test.UnitTestDatabaseManager;
+import org.horizon.loader.jdbc.connectionfactory.ConnectionFactoryConfig;
+import org.horizon.loader.jdbc.connectionfactory.PooledConnectionFactory;
+import org.horizon.loader.jdbc.UnitTestDatabaseManager;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.Test;
-import java.io.File;
import java.sql.Connection;
import java.util.HashSet;
import java.util.Set;
/**
- * // TODO: Mircea: Document this!
+ * Tester class for {@link org.horizon.loader.jdbc.connectionfactory.PooledConnectionFactory}.
*
* @author
*/
@@ -26,7 +27,7 @@
public void testValuesNoOverrides() throws Exception {
factory = new PooledConnectionFactory();
- JdbcCacheStoreConfig config = UnitTestDatabaseManager.getUniqueJdbcCacheStoreConfig();
+ ConnectionFactoryConfig config = UnitTestDatabaseManager.getUniqueConnectionFactoryConfig();
factory.start(config);
int hadcodedMaxPoolSize = factory.getPooledDataSource().getMaxPoolSize();
Set<Connection> connections = new HashSet<Connection>();
@@ -43,17 +44,6 @@
if (factory.getPooledDataSource().getNumBusyConnections() == 0) break;
}
//this must happen eventually
- assert factory.getPooledDataSource().getNumBusyConnections() == 0;
+ assert factory.getPooledDataSource().getNumBusyConnections() == 0;
}
-
- public void testWithPorpertyOverrides() throws Exception {
- String prevVal = System.setProperty("c3p0.maxPoolSize", "3");
- System.out.println(new File(".").getAbsolutePath());
- factory = new PooledConnectionFactory();
- JdbcCacheStoreConfig config = UnitTestDatabaseManager.getUniqueJdbcCacheStoreConfig();
- factory.start(config);
- assert factory.getPooledDataSource().getMaxPoolSize() == 3 : "expected 3, received " + factory.getPooledDataSource().getMaxPoolSize();
- if (prevVal != null) System.setProperty("c3p0.maxPoolSize", prevVal);
-
- }
}
Modified: core/branches/flat/src/test/java/org/horizon/loader/jdbc/TableManipulationTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/jdbc/TableManipulationTest.java 2009-03-02 19:19:45 UTC (rev 7827)
+++ core/branches/flat/src/test/java/org/horizon/loader/jdbc/TableManipulationTest.java 2009-03-03 13:01:12 UTC (rev 7828)
@@ -2,6 +2,8 @@
import static org.easymock.EasyMock.*;
import org.horizon.loader.CacheLoaderException;
+import org.horizon.loader.jdbc.connectionfactory.ConnectionFactoryConfig;
+import org.horizon.loader.jdbc.UnitTestDatabaseManager;
import org.testng.annotations.AfterTest;
import org.testng.annotations.BeforeTest;
import org.testng.annotations.Test;
@@ -14,148 +16,134 @@
import java.sql.Statement;
/**
- * // TODO: Mircea: Document this!
+ * Tester class for {@link org.horizon.loader.jdbc.TableManipulation}.
*
- * @author
+ * @author Mircea.Markus(a)jboss.com
*/
-@Test(groups = "functional", testName = "loader.jdbc.TableManipulationTest", enabled = false)
+@Test(groups = "functional", testName = "loader.jdbc.TableManipulationTest")
public class TableManipulationTest {
Connection connection;
TableManipulation tableManipulation;
- JdbcCacheStoreConfig config;
+ private ConnectionFactoryConfig cfg;
@BeforeTest
public void createConnection() throws Exception {
- Class.forName("com.mysql.jdbc.Driver").newInstance();
- connection = DriverManager.getConnection("jdbc:mysql://localhost/horizon", "root", "root");
- Statement st = connection.createStatement();
- try {
- st.executeUpdate("DELETE FROM horizon_test");
- st.executeUpdate("DROP TABLE horizon_test");
- } catch (SQLException e) {
- //ignore, might be the table does not exist
- }
- JdbcUtil.safeClose(st);
- config = new JdbcCacheStoreConfig();
- config.setKeyColumnType("VARCHAR(255)");
- config.setDataColumnType("BLOB");
- config.setTableName("horizon_test");
- config.setKeyColumnName("KEY_HASH");
- config.setDataColumnName("BUCKET");
- config.setTimestampColumnName("TIMESTAMP");
- config.setTimestampColumnType("BIGINT");
- tableManipulation = new TableManipulation(connection, config);
+ cfg = UnitTestDatabaseManager.getUniqueConnectionFactoryConfig();
+ connection = DriverManager.getConnection(cfg.getConnectionUrl(), cfg.getUserName(), cfg.getPassword());
+ tableManipulation = UnitTestDatabaseManager.buildDefaultTableManipulation();
}
@AfterTest
public void closeConnection() throws SQLException {
connection.close();
+ UnitTestDatabaseManager.shutdownInMemoryDatabase(cfg);
}
public void testInsufficientConfigParams() throws Exception {
- JdbcCacheStoreConfig config = new JdbcCacheStoreConfig();
- config.setKeyColumnType("VARCHAR(255)");
- config.setDataColumnType("BLOB");
- config.setTableName("horizon");
- config.setKeyColumnName("dsadsa");
- config.setDataColumnName("dsadsa");
- config.setTimestampColumnName("timestamp");
- config.setTimestampColumnType("BIGINT");
- Connection mockConnection = createMock(Connection.class);
+ Connection mockConnection = createNiceMock(Connection.class);
Statement mockStatement = createNiceMock(Statement.class);
expect(mockConnection.createStatement()).andReturn(mockStatement);
+ expectLastCall().anyTimes();
replay(mockConnection, mockStatement);
- TableManipulation other = new TableManipulation(mockConnection, config);
+ TableManipulation other = tableManipulation.clone();
try {
- other.createTable();
+ other.createTable(mockConnection);
} catch (CacheLoaderException e) {
assert false : "We do not expect a failure here";
}
- config.setKeyColumnType(null);
+ other.setDataColumnType(null);
try {
- other.createTable();
+ other.createTable(mockConnection);
assert false : "missing config param, exception expected";
} catch (CacheLoaderException e) {
- config.setKeyColumnType("VARCHAR(255)");
+ other.setDataColumnType("VARCHAR(255)");
assert true : "We do not expect a failure here";
}
- config.setKeyColumnName("");
+ other.createTable(mockConnection);
+
+ other.setIdColumnName("");
try {
- other.createTable();
+ other.createTable(mockConnection);
assert false : "missing config param, exception expected";
} catch (CacheLoaderException e) {
- config.setKeyColumnName("abc");
+ other.setIdColumnName("abc");
assert true : "We do not expect a failure here";
}
- config.setTableName(null);
+ other.createTable(mockConnection);
+
+ other.setDataColumnName(null);
try {
- other.createTable();
+ other.createTable(mockConnection);
assert false : "missing config param, exception expected";
} catch (CacheLoaderException e) {
- config.setTableName("abc");
+ other.setDataColumnName("abc");
assert true : "We do not expect a failure here";
}
- config.setDataColumnName(null);
+ other.createTable(mockConnection);
+
+ other.setDataColumnName(null);
try {
- other.createTable();
+ other.createTable(mockConnection);
assert false : "missing config param, exception expected";
} catch (CacheLoaderException e) {
- config.setDataColumnName("abc");
+ other.setDataColumnName("abc");
assert true : "We do not expect a failure here";
}
- config.setTimestampColumnName(null);
+ other.createTable(mockConnection);
+
+ other.setTimestampColumnName(null);
try {
- other.createTable();
+ other.createTable(mockConnection);
assert false : "missing config param, exception expected";
} catch (CacheLoaderException e) {
- config.setDataColumnName("timestamp");
+ other.setDataColumnName("timestamp");
assert true : "We do not expect a failure here";
}
- config.setTimestampColumnType(null);
+ other.setTimestampColumnType(null);
try {
- other.createTable();
+ other.createTable(mockConnection);
assert false : "missing config param, exception expected";
} catch (CacheLoaderException e) {
- config.setDataColumnName("BIGINT");
+ other.setIdColumnType("BIGINT");
assert true : "We do not expect a failure here";
}
}
public void testCreateTable() throws Exception {
- assert !existsTable(config.getTableName());
- tableManipulation.createTable();
- assert existsTable(config.getTableName());
+ assert !existsTable(connection, tableManipulation.getTableName());
+ tableManipulation.createTable(connection);
+ assert existsTable(connection, tableManipulation.getTableName());
}
@Test(dependsOnMethods = "testCreateTable")
public void testExists() throws CacheLoaderException {
- assert tableManipulation.tableExists();
- assert !tableManipulation.tableExists("does_not_exist");
+ assert tableManipulation.tableExists(connection);
+ assert !tableManipulation.tableExists(connection, "does_not_exist");
}
@Test(dependsOnMethods = "testExists")
public void testDrop() throws Exception {
- assert tableManipulation.tableExists();
+ assert tableManipulation.tableExists(connection);
PreparedStatement ps = null;
try {
- ps = connection.prepareStatement("INSERT INTO horizon_test(KEY_HASH) values(?)");
+ ps = connection.prepareStatement("INSERT INTO horizon_jdbc(ID_COLUMN) values(?)");
ps.setString(1, System.currentTimeMillis() + "");
assert 1 == ps.executeUpdate();
} finally {
JdbcUtil.safeClose(ps);
}
- tableManipulation.dropTable();
- assert !tableManipulation.tableExists();
+ tableManipulation.dropTable(connection);
+ assert !tableManipulation.tableExists(connection);
}
- private boolean existsTable(String tableName) throws Exception {
+ static boolean existsTable(Connection connection, String tableName) throws Exception {
Statement st = connection.createStatement();
ResultSet rs = null;
try {
Copied: core/branches/flat/src/test/java/org/horizon/loader/jdbc/UnitTestDatabaseManager.java (from rev 7827, core/branches/flat/src/test/java/org/horizon/test/UnitTestDatabaseManager.java)
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/jdbc/UnitTestDatabaseManager.java (rev 0)
+++ core/branches/flat/src/test/java/org/horizon/loader/jdbc/UnitTestDatabaseManager.java 2009-03-03 13:01:12 UTC (rev 7828)
@@ -0,0 +1,121 @@
+package org.horizon.loader.jdbc;
+
+import org.horizon.loader.jdbc.connectionfactory.ConnectionFactoryConfig;
+import org.horizon.loader.jdbc.connectionfactory.PooledConnectionFactory;
+import org.horizon.test.TestingUtil;
+
+import java.io.File;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.Properties;
+import java.util.StringTokenizer;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * Class that assures concurrent access to the in memory database.
+ *
+ * @author Mircea.Markus(a)jboss.com
+ */
+public class UnitTestDatabaseManager {
+ private static final ConnectionFactoryConfig realConfig = new ConnectionFactoryConfig();
+
+ private static AtomicInteger userIndex = new AtomicInteger(0);
+
+ static {
+ try {
+ Class.forName("org.hsqldb.jdbcDriver");
+ } catch (ClassNotFoundException e) {
+ throw new RuntimeException(e);
+ }
+ realConfig.setDriverClass("org.hsqldb.jdbcDriver");
+ realConfig.setConnectionUrl("jdbc:hsqldb:mem:horizon");
+ realConfig.setConnectionFactoryClass(PooledConnectionFactory.class.getName());
+ realConfig.setUserName("sa");
+ }
+
+ public static ConnectionFactoryConfig getUniqueConnectionFactoryConfig() {
+ synchronized (realConfig) {
+ return returnBasedOnDifferentInstance();
+ }
+ }
+
+ public static void shutdownInMemoryDatabase(ConnectionFactoryConfig config) {
+ Connection conn = null;
+ Statement st = null;
+ try {
+ String shutDownConnection = getShutdownUrl(config);
+ String url = config.getConnectionUrl();
+ assert url != null;
+ conn = DriverManager.getConnection(shutDownConnection);
+ st = conn.createStatement();
+ st.execute("SHUTDOWN");
+ }
+ catch (Throwable e) {
+ throw new IllegalStateException(e);
+ }
+ finally {
+ try {
+ conn.close();
+ st.close();
+ }
+ catch (SQLException e) {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ public static void clearDatabaseFiles(Properties props) {
+ //now delete the disk folder
+ String dbName = getDatabaseName(props);
+ String toDel = TestingUtil.TEST_FILES + File.separator + dbName;
+ TestingUtil.recursiveFileRemove(toDel);
+ }
+
+ private static String getDatabaseName(Properties prop) {
+ StringTokenizer tokenizer = new StringTokenizer(prop.getProperty("cache.jdbc.url"), ":");
+ tokenizer.nextToken();
+ tokenizer.nextToken();
+ tokenizer.nextToken();
+ return tokenizer.nextToken();
+ }
+
+ private static String getShutdownUrl(ConnectionFactoryConfig props) {
+ String url = props.getConnectionUrl();
+ assert url != null;
+ StringTokenizer tokenizer = new StringTokenizer(url, ";");
+ String result = tokenizer.nextToken() + ";" + "shutdown=true";
+ return result;
+ }
+
+ private static ConnectionFactoryConfig returnBasedOnDifferentInstance() {
+ ConnectionFactoryConfig result = realConfig.clone();
+ String jdbcUrl = result.getConnectionUrl();
+ Pattern pattern = Pattern.compile("horizon");
+ Matcher matcher = pattern.matcher(jdbcUrl);
+ boolean found = matcher.find();
+ assert found;
+ String newJdbcUrl = matcher.replaceFirst(extractTestName() + userIndex.incrementAndGet());
+ result.setConnectionUrl(newJdbcUrl);
+ return result;
+ }
+
+ private static String extractTestName() {
+ StackTraceElement[] stack = Thread.currentThread().getStackTrace();
+ if (stack.length == 0) return null;
+ for (int i = stack.length - 1; i > 0; i--) {
+ StackTraceElement e = stack[i];
+ String className = e.getClassName();
+ if (className.indexOf("org.horizon") != -1) return className.replace('.', '_') + "_" + e.getMethodName();
+ }
+ return null;
+ }
+
+ public static TableManipulation buildDefaultTableManipulation() {
+ return new TableManipulation("ID_COLUMN", "VARCHAR(255)", "HORIZON_JDBC", "DATA_COLUMN",
+ "BINARY", "TIMESTAMP_COLUMN", "BIGINT");
+ }
+}
Property changes on: core/branches/flat/src/test/java/org/horizon/loader/jdbc/UnitTestDatabaseManager.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Added: core/branches/flat/src/test/java/org/horizon/lock/StripedLockTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/lock/StripedLockTest.java (rev 0)
+++ core/branches/flat/src/test/java/org/horizon/lock/StripedLockTest.java 2009-03-03 13:01:12 UTC (rev 7828)
@@ -0,0 +1,153 @@
+package org.horizon.lock;
+
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * Tester class for {@link org.horizon.lock.StripedLock}.
+ *
+ * @author Mircea.Markus(a)jboss.com
+ */
+@Test (groups = "unit", testName = "lock.StripedLockTest")
+public class StripedLockTest {
+
+ StripedLock stripedLock;
+
+ public static final int CAN_ACQUIRE_WL = 1;
+ public static final int CAN_ACQUIRE_RL = 2;
+ public static final int ACQUIRE_WL = 3;
+ public static final int ACQUIRE_RL = 4;
+ /* this value will make sure that the index of the underlying shared lock is not 0*/
+ private static final String KEY = "21321321321321321";
+
+
+ @BeforeMethod
+ public void cretateStripedLock() {
+ stripedLock = new StripedLock(5);
+ }
+
+ public void testGlobalReadLockSimple() throws Exception {
+ assert canAquireWL();
+ assert canAquireRL();
+ assert stripedLock.aquireGlobalLock(false, 0);
+ assert stripedLock.getTotalReadLockCount() == stripedLock.getSharedLockCount();
+ assert !canAquireWL();
+ assert canAquireRL();
+ }
+
+ public void testGlobalReadLockIsAtomic() throws Exception {
+ assert aquireWL();
+ assert 1 == stripedLock.getTotalWriteLockCount();
+ assert !stripedLock.aquireGlobalLock(false, 0);
+ assert stripedLock.getTotalReadLockCount() == 0 : "No read locks should be held if the operation failed";
+ }
+
+ public void testGlobalReadLockOverExistingReadLocks() throws Exception {
+ assert aquireRL();
+ assert aquireRL();
+ assert stripedLock.getTotalReadLockCount() == 2;
+ assert stripedLock.aquireGlobalLock(false, 0);
+ assert stripedLock.getTotalReadLockCount() == stripedLock.getSharedLockCount() + 2;
+ }
+
+ public void testAquireGlobalAndRelease() {
+ assert stripedLock.aquireGlobalLock(false, 0);
+ assert stripedLock.getTotalReadLockCount() == stripedLock.getSharedLockCount();
+ assert stripedLock.getTotalWriteLockCount() == 0;
+ try {
+ stripedLock.releaseGlobalLock(true); //this should not fail
+ assert false : "this should fail as we do not have a monitor over the locks";
+ } catch (Exception e) {
+ //expected
+ }
+ stripedLock.releaseGlobalLock(false);
+ assert stripedLock.getTotalReadLockCount() == 0;
+ assert stripedLock.getTotalWriteLockCount() == 0;
+
+ assert stripedLock.aquireGlobalLock(true, 0);
+ assert stripedLock.getTotalReadLockCount() == 0;
+ assert stripedLock.getTotalWriteLockCount() == stripedLock.getSharedLockCount();
+
+ try {
+ stripedLock.releaseGlobalLock(false); //this should not fail
+ assert false : "this should fail as we do not have a monitor over the locks";
+ } catch (Exception e) {
+ //expected
+ }
+ stripedLock.releaseGlobalLock(true);
+ assert stripedLock.getTotalReadLockCount() == 0;
+ assert stripedLock.getTotalWriteLockCount() == 0;
+
+
+
+ }
+
+ private boolean aquireWL() throws Exception {
+ OtherThread otherThread = new OtherThread();
+ otherThread.start();
+ otherThread.operationQueue.put(ACQUIRE_WL);
+ return otherThread.responseQueue.take();
+ }
+
+ private boolean aquireRL() throws Exception {
+ OtherThread otherThread = new OtherThread();
+ otherThread.start();
+ otherThread.operationQueue.put(ACQUIRE_RL);
+ return otherThread.responseQueue.take();
+ }
+
+ private boolean canAquireRL() throws Exception {
+ OtherThread otherThread = new OtherThread();
+ otherThread.start();
+ otherThread.operationQueue.put(CAN_ACQUIRE_RL);
+ return otherThread.responseQueue.take();
+ }
+
+ private boolean canAquireWL() throws Exception {
+ OtherThread otherThread = new OtherThread();
+ otherThread.start();
+ otherThread.operationQueue.put(CAN_ACQUIRE_WL);
+ return otherThread.responseQueue.take();
+ }
+
+ public class OtherThread extends Thread {
+ volatile BlockingQueue<Integer> operationQueue = new ArrayBlockingQueue<Integer>(1);
+ volatile BlockingQueue<Boolean> responseQueue = new ArrayBlockingQueue<Boolean>(1);
+
+ public void run() {
+ try {
+ int operation = operationQueue.take();
+ Boolean response;
+ switch (operation) {
+ case CAN_ACQUIRE_RL: {
+ response = stripedLock.acquireLock(KEY, false, 0);
+ if (response) stripedLock.releaseLock(KEY);
+ break;
+ }
+ case CAN_ACQUIRE_WL: {
+ response = stripedLock.acquireLock(KEY, true, 0);
+ if (response) stripedLock.releaseLock(KEY);
+ break;
+ }
+ case ACQUIRE_RL: {
+ response = stripedLock.acquireLock(KEY, false, 0);
+ break;
+ }
+ case ACQUIRE_WL: {
+ response = stripedLock.acquireLock(KEY, true, 0);
+ break;
+ }
+ default: {
+ throw new IllegalStateException("Unknown operation: " + operation);
+ }
+ }
+ responseQueue.put(response);
+ } catch (Throwable e) {
+ e.printStackTrace();
+ }
+ }
+ }
+}
Property changes on: core/branches/flat/src/test/java/org/horizon/lock/StripedLockTest.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Deleted: core/branches/flat/src/test/java/org/horizon/test/UnitTestDatabaseManager.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/test/UnitTestDatabaseManager.java 2009-03-02 19:19:45 UTC (rev 7827)
+++ core/branches/flat/src/test/java/org/horizon/test/UnitTestDatabaseManager.java 2009-03-03 13:01:12 UTC (rev 7828)
@@ -1,113 +0,0 @@
-package org.horizon.test;
-
-import org.horizon.loader.jdbc.JdbcCacheStoreConfig;
-
-import java.io.File;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.util.Properties;
-import java.util.StringTokenizer;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-
-/**
- * @author Mircea.Markus(a)jboss.com
- */
-public class UnitTestDatabaseManager {
- private static final JdbcCacheStoreConfig realConfig = new JdbcCacheStoreConfig();
-
- private static AtomicInteger userIndex = new AtomicInteger(0);
-
- static {
- realConfig.setTableName("horizon");
- realConfig.setCreateTableOnStart(true);
- realConfig.setPrimaryKey("horizon_pk");
- realConfig.setKeyColumnName("key");
- realConfig.setKeyColumnType("varchar(255)");
- realConfig.setDataColumnName("bucket");
- realConfig.setDataColumnType("BINARY");
- realConfig.setDriverClass("org.hsqldb.jdbcDriver");
- realConfig.setConnectionUrl("jdbc:hsqldb:mem:horizon");
- realConfig.setUserName("sa");
- }
-
- public static JdbcCacheStoreConfig getUniqueJdbcCacheStoreConfig() {
- synchronized (realConfig) {
- return returnBasedOnDifferentInstance();
- }
- }
-
- public static void shutdownInMemoryDatabase(JdbcCacheStoreConfig config) {
- Connection conn = null;
- Statement st = null;
- try {
- String shutDownConnection = getShutdownUrl(config);
- String url = config.getConnectionUrl();
- assert url != null;
- conn = DriverManager.getConnection(shutDownConnection);
- st = conn.createStatement();
- st.execute("SHUTDOWN");
- }
- catch (Throwable e) {
- throw new IllegalStateException(e);
- }
- finally {
- try {
- conn.close();
- st.close();
- }
- catch (SQLException e) {
- e.printStackTrace();
- }
- }
- }
-
- public static void clearDatabaseFiles(Properties props) {
- //now delete the disk folder
- String dbName = getDatabaseName(props);
- String toDel = TestingUtil.TEST_FILES + File.separator + dbName;
- TestingUtil.recursiveFileRemove(toDel);
- }
-
- public static String getDatabaseName(Properties prop) {
- StringTokenizer tokenizer = new StringTokenizer(prop.getProperty("cache.jdbc.url"), ":");
- tokenizer.nextToken();
- tokenizer.nextToken();
- tokenizer.nextToken();
- return tokenizer.nextToken();
- }
-
- private static String getShutdownUrl(JdbcCacheStoreConfig props) {
- String url = props.getConnectionUrl();
- assert url != null;
- StringTokenizer tokenizer = new StringTokenizer(url, ";");
- String result = tokenizer.nextToken() + ";" + "shutdown=true";
- return result;
- }
-
- private static JdbcCacheStoreConfig returnBasedOnDifferentInstance() {
- JdbcCacheStoreConfig result = realConfig.clone();
- String jdbcUrl = result.getConnectionUrl();
- Pattern pattern = Pattern.compile("horizon");
- Matcher matcher = pattern.matcher(jdbcUrl);
- boolean found = matcher.find();
- assert found;
- String newJdbcUrl = matcher.replaceFirst(extractTestName() + userIndex.incrementAndGet());
- result.setConnectionUrl(newJdbcUrl);
- return result;
- }
-
- private static String extractTestName() {
- StackTraceElement[] stack = Thread.currentThread().getStackTrace();
- if (stack.length == 0) return null;
- for (int i = stack.length - 1; i > 0; i--) {
- StackTraceElement e = stack[i];
- String className = e.getClassName();
- if (className.indexOf("org.horizon") != -1) return className.replace('.', '_') + "_" + e.getMethodName();
- }
- return null;
- }
-}
15 years, 10 months
JBoss Cache SVN: r7827 - core/branches/flat/src/main/java/org/horizon/loader/bdbje.
by jbosscache-commits@lists.jboss.org
Author: adriancole
Date: 2009-03-02 14:19:45 -0500 (Mon, 02 Mar 2009)
New Revision: 7827
Modified:
core/branches/flat/src/main/java/org/horizon/loader/bdbje/BdbjeCacheStore.java
Log:
clarified location to store environment overrides
Modified: core/branches/flat/src/main/java/org/horizon/loader/bdbje/BdbjeCacheStore.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/loader/bdbje/BdbjeCacheStore.java 2009-03-02 19:04:28 UTC (rev 7826)
+++ core/branches/flat/src/main/java/org/horizon/loader/bdbje/BdbjeCacheStore.java 2009-03-02 19:19:45 UTC (rev 7827)
@@ -37,9 +37,9 @@
* name}</tt></li> {@link StoredEntry stored entries} are stored here, keyed on {@link
* org.horizon.loader.StoredEntry#getKey()} <li>class catalog database: <tt>/{location}/CacheInstance-{@link
* org.horizon.Cache#getName() name}_class_catalog</tt></li> class descriptions are stored here for efficiency reasons.
- * </ol>
+ * </ol> <p/> <p><tt>/{location}/je.properties</tt> is optional and will override any parameters to the internal
+ * SleepyCat JE {@link EnvironmentConfig}.</p>
* <p/>
- * <p/>
* All data access is transactional. Any attempted reads to locked records will block. The maximum duration of this is
* set in nanoseconds via the parameter {@link org.horizon.loader.bdbje.BdbjeCacheStoreConfig#getLockAcquistionTimeout()}.
* Calls to {@link BdbjeCacheStore#prepare(java.util.List, javax.transaction.Transaction, boolean) prepare} will attempt
15 years, 10 months