Author: manik.surtani(a)jboss.com
Date: 2009-02-25 10:52:32 -0500 (Wed, 25 Feb 2009)
New Revision: 7785
Added:
core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/StateTransferMonitor.java
core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferException.java
core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManagerImpl.java
core/branches/flat/src/test/java/org/horizon/statetransfer/
core/branches/flat/src/test/java/org/horizon/statetransfer/Address.java
core/branches/flat/src/test/java/org/horizon/statetransfer/Person.java
core/branches/flat/src/test/java/org/horizon/statetransfer/StateTransferFunctionalTest.java
Removed:
core/branches/flat/src/main/java/org/horizon/factories/StateTransferFactory.java
core/branches/flat/src/main/java/org/horizon/statetransfer/DefaultStateTransferManager.java
Modified:
core/branches/flat/src/main/java/org/horizon/CacheDelegate.java
core/branches/flat/src/main/java/org/horizon/config/Configuration.java
core/branches/flat/src/main/java/org/horizon/factories/AbstractComponentRegistry.java
core/branches/flat/src/main/java/org/horizon/factories/ComponentRegistry.java
core/branches/flat/src/main/java/org/horizon/factories/StateTransferManagerFactory.java
core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandler.java
core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandlerImpl.java
core/branches/flat/src/main/java/org/horizon/remoting/RPCManager.java
core/branches/flat/src/main/java/org/horizon/remoting/RPCManagerImpl.java
core/branches/flat/src/main/java/org/horizon/remoting/transport/Transport.java
core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/JGroupsTransport.java
core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManager.java
core/branches/flat/src/main/java/org/horizon/util/Util.java
core/branches/flat/src/test/java/org/horizon/api/CacheClusterJoinTest.java
core/branches/flat/src/test/java/org/horizon/api/tree/NodeReplicatedMoveTest.java
core/branches/flat/src/test/java/org/horizon/config/parsing/XmlFileParsingTest.java
core/branches/flat/src/test/java/org/horizon/loader/decorators/SingletonStoreTest.java
core/branches/flat/src/test/java/org/horizon/manager/CacheManagerComponentRegistryTest.java
core/branches/flat/src/test/java/org/horizon/notifications/cachemanagerlistener/CacheManagerNotifierTest.java
core/branches/flat/src/test/java/org/horizon/test/MultipleCacheManagersTest.java
core/branches/flat/src/test/resources/configs/named-cache-test.xml
core/branches/flat/src/test/resources/log4j.xml
Log:
Initial state transfer impl
Modified: core/branches/flat/src/main/java/org/horizon/CacheDelegate.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/CacheDelegate.java 2009-02-25 12:06:48
UTC (rev 7784)
+++ core/branches/flat/src/main/java/org/horizon/CacheDelegate.java 2009-02-25 15:52:32
UTC (rev 7785)
@@ -54,6 +54,7 @@
import org.horizon.marshall.Marshaller;
import org.horizon.notifications.cachelistener.CacheNotifier;
import org.horizon.remoting.RPCManager;
+import org.horizon.statetransfer.StateTransferManager;
import javax.transaction.Transaction;
import javax.transaction.TransactionManager;
@@ -84,6 +85,8 @@
private DataContainer dataContainer;
private static final Log log = LogFactory.getLog(CacheDelegate.class);
private CacheManager cacheManager;
+ // this is never used here but should be injected - this is a hack to make sure the
StateTransferManager is properly constructed if needed.
+ private StateTransferManager stateTransferManager;
public CacheDelegate(String name) {
this.name = name;
@@ -101,7 +104,7 @@
BatchContainer batchContainer,
RPCManager rpcManager, DataContainer dataContainer,
Marshaller marshaller,
- CacheManager cacheManager) {
+ CacheManager cacheManager, StateTransferManager
stateTransferManager) {
this.invocationContextContainer = invocationContextContainer;
this.commandsFactory = commandsFactory;
this.invoker = interceptorChain;
@@ -115,6 +118,7 @@
this.dataContainer = dataContainer;
this.marshaller = marshaller;
this.cacheManager = cacheManager;
+ this.stateTransferManager = stateTransferManager;
}
@SuppressWarnings("unchecked")
Modified: core/branches/flat/src/main/java/org/horizon/config/Configuration.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/config/Configuration.java 2009-02-25
12:06:48 UTC (rev 7784)
+++ core/branches/flat/src/main/java/org/horizon/config/Configuration.java 2009-02-25
15:52:32 UTC (rev 7785)
@@ -98,6 +98,9 @@
return this == REPL_SYNC || this == INVALIDATION_SYNC || this == LOCAL;
}
+      /**
+       * Tells whether this cache mode involves other members of a cluster.
+       *
+       * @return false for LOCAL, true for every other cache mode
+       */
+      public boolean isClustered() {
+         final boolean localOnly = (this == LOCAL);
+         return !localOnly;
+      }
}
//
------------------------------------------------------------------------------------------------------------
Modified:
core/branches/flat/src/main/java/org/horizon/factories/AbstractComponentRegistry.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/factories/AbstractComponentRegistry.java 2009-02-25
12:06:48 UTC (rev 7784)
+++
core/branches/flat/src/main/java/org/horizon/factories/AbstractComponentRegistry.java 2009-02-25
15:52:32 UTC (rev 7785)
@@ -148,7 +148,6 @@
s.add(TransactionManagerFactory.class);
s.add(ReplicationQueueFactory.class);
s.add(StateTransferManagerFactory.class);
- s.add(StateTransferFactory.class);
s.add(LockManagerFactory.class);
s.add(DataContainerFactory.class);
s.add(EvictionManagerFactory.class);
Modified: core/branches/flat/src/main/java/org/horizon/factories/ComponentRegistry.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/factories/ComponentRegistry.java 2009-02-25
12:06:48 UTC (rev 7784)
+++
core/branches/flat/src/main/java/org/horizon/factories/ComponentRegistry.java 2009-02-25
15:52:32 UTC (rev 7785)
@@ -109,9 +109,13 @@
globalComponents.start();
}
boolean needToNotify = state != ComponentStatus.RUNNING && state !=
ComponentStatus.INITIALIZING;
+
+ // set this up *before* starting the components since some components -
specifically state transfer - needs to be
+ // able to locate this registry via the InboundInvocationHandler
+ globalComponents.registerNamedComponentRegistry(this, cacheName);
+
super.start();
if (needToNotify && state == ComponentStatus.RUNNING) {
- globalComponents.registerNamedComponentRegistry(this, cacheName);
cacheManagerNotifier.notifyCacheStarted(cacheName);
}
}
Deleted: core/branches/flat/src/main/java/org/horizon/factories/StateTransferFactory.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/factories/StateTransferFactory.java 2009-02-25
12:06:48 UTC (rev 7784)
+++
core/branches/flat/src/main/java/org/horizon/factories/StateTransferFactory.java 2009-02-25
15:52:32 UTC (rev 7785)
@@ -1,48 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-package org.horizon.factories;
-
-/**
- * Factory class able to create {@link StateTransferGenerator} and {@link
StateTransferIntegrator} instances.
- * <p/>
- * Updated in 3.0.0 to extend ComponentFactory, etc.
- * <p/>
- *
- * @author <a href="brian.stansberry(a)jboss.com">Brian
Stansberry</a>
- * @author Manik Surtani
- * @since 1.0
- */
-// TODO: Implement me
-//@DefaultFactoryFor(classes = {StateTransferGenerator.class,
StateTransferIntegrator.class})
-public class StateTransferFactory extends AbstractComponentFactory implements
AutoInstantiableFactory {
- public <T> T construct(Class<T> componentType) {
- return null;
-// if (componentType.equals(StateTransferIntegrator.class))
-// {
-// return componentType.cast(new DefaultStateTransferIntegrator());
-// }
-// else
-// {
-// return componentType.cast(new DefaultStateTransferGenerator());
-// }
- }
-}
Modified:
core/branches/flat/src/main/java/org/horizon/factories/StateTransferManagerFactory.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/factories/StateTransferManagerFactory.java 2009-02-25
12:06:48 UTC (rev 7784)
+++
core/branches/flat/src/main/java/org/horizon/factories/StateTransferManagerFactory.java 2009-02-25
15:52:32 UTC (rev 7785)
@@ -22,7 +22,7 @@
package org.horizon.factories;
import org.horizon.factories.annotations.DefaultFactoryFor;
-import org.horizon.statetransfer.DefaultStateTransferManager;
+import org.horizon.statetransfer.StateTransferManagerImpl;
import org.horizon.statetransfer.StateTransferManager;
/**
@@ -32,8 +32,11 @@
* @since 1.0
*/
@DefaultFactoryFor(classes = StateTransferManager.class)
-public class StateTransferManagerFactory extends AbstractComponentFactory implements
AutoInstantiableFactory {
+public class StateTransferManagerFactory extends AbstractNamedCacheComponentFactory
implements AutoInstantiableFactory {
public <T> T construct(Class<T> componentType) {
- return componentType.cast(new DefaultStateTransferManager());
+ if (configuration.getCacheMode().isClustered() &&
configuration.isFetchInMemoryState())
+ return componentType.cast(new StateTransferManagerImpl());
+ else
+ return null;
}
}
Modified:
core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandler.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandler.java 2009-02-25
12:06:48 UTC (rev 7784)
+++
core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandler.java 2009-02-25
15:52:32 UTC (rev 7785)
@@ -3,10 +3,15 @@
import org.horizon.commands.RPCCommand;
import org.horizon.factories.scopes.Scope;
import org.horizon.factories.scopes.Scopes;
+import org.horizon.statetransfer.StateTransferException;
+import java.io.InputStream;
+import java.io.OutputStream;
+
/**
* A globally scoped component, that is able to locate named caches and invoke remotely
originating calls on the
- * appropriate cache.
+ * appropriate cache. The primary goal of this component is to act as a bridge between
the globally scoped {@link org.horizon.remoting.RPCManager}
+ * and named-cache scoped components.
*
* @author Manik Surtani
* @since 1.0
@@ -19,6 +24,27 @@
*
* @param command command to invoke
* @return results, if any, from the invocation
+ * @throws Throwable in the event of problems executing the command
*/
Object handle(RPCCommand command) throws Throwable;
+
+ /**
+ * Applies state onto a named cache. State to be read from the stream.
Implementations should NOT close the stream
+ * after use.
+ *
+ * @param cacheName name of cache to apply state
+ * @param i stream to read from
+ * @throws StateTransferException in the event of problems
+ */
+ void applyState(String cacheName, InputStream i) throws StateTransferException;
+
+ /**
+ * Generates state from a named cache. State to be written to the stream.
Implementations should NOT close the stream
+ * after use.
+ *
+ * @param cacheName name of cache from which to generate state
+ * @param o stream to write state to
+ * @throws StateTransferException in the event of problems
+ */
+ void generateState(String cacheName, OutputStream o) throws StateTransferException;
}
Modified:
core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandlerImpl.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandlerImpl.java 2009-02-25
12:06:48 UTC (rev 7784)
+++
core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandlerImpl.java 2009-02-25
15:52:32 UTC (rev 7785)
@@ -12,7 +12,12 @@
import org.horizon.invocation.InvocationContextContainer;
import org.horizon.logging.Log;
import org.horizon.logging.LogFactory;
+import org.horizon.statetransfer.StateTransferException;
+import org.horizon.statetransfer.StateTransferManager;
+import java.io.InputStream;
+import java.io.OutputStream;
+
/**
* Sets the cache interceptor chain on an RPCCommand before calling it to perform
*
@@ -34,9 +39,15 @@
String cacheName = cmd.getCacheName();
ComponentRegistry cr = gcr.getNamedComponentRegistry(cacheName);
if (cr == null) {
- log.info("Cache named {0} does not exist on this cache manager!",
cacheName);
+ if (log.isInfoEnabled()) log.info("Cache named {0} does not exist on this
cache manager!", cacheName);
return null;
}
+
+ if (!cr.getStatus().allowInvocations()) {
+ if (log.isInfoEnabled()) log.info("Cache named {0} exists but isn't in
a state to handle invocations. Its state is {1}", cacheName, cr.getStatus());
+ return null;
+ }
+
InterceptorChain ic = cr.getComponent(InterceptorChain.class);
InvocationContextContainer icc =
cr.getComponent(InvocationContextContainer.class);
CommandsFactory commandsFactory = cr.getComponent(CommandsFactory.class);
@@ -46,4 +57,23 @@
commandsFactory.initializeReplicableCommand(cmd);
return cmd.perform(icc.get());
}
+
+   /**
+    * Looks up the state transfer manager of the named cache and lets it consume state from the given stream.
+    *
+    * @param cacheName name of cache to apply state to
+    * @param i         stream to read state from
+    * @throws StateTransferException if the lookup or the state application fails
+    */
+   public void applyState(String cacheName, InputStream i) throws StateTransferException {
+      StateTransferManager manager = getStateTransferManager(cacheName);
+      manager.applyState(i);
+   }
+
+   /**
+    * Looks up the state transfer manager of the named cache and lets it write state to the given stream.
+    *
+    * @param cacheName name of cache from which to generate state
+    * @param o         stream to write state to
+    * @throws StateTransferException if the lookup or the state generation fails
+    */
+   public void generateState(String cacheName, OutputStream o) throws StateTransferException {
+      StateTransferManager manager = getStateTransferManager(cacheName);
+      manager.generateState(o);
+   }
+
+   /**
+    * Locates the {@link StateTransferManager} belonging to a named cache.
+    *
+    * @param cacheName name of the cache whose state transfer manager is required
+    * @return the state transfer manager of the named cache, never null
+    * @throws StateTransferException if the cache is not registered with this cache manager, or if no state
+    *                                transfer manager is available for it
+    */
+   private StateTransferManager getStateTransferManager(String cacheName) throws StateTransferException {
+      ComponentRegistry cr = gcr.getNamedComponentRegistry(cacheName);
+      if (cr == null) {
+         String msg = "Cache named " + cacheName + " does not exist on this cache manager!";
+         log.info(msg);
+         throw new StateTransferException(msg);
+      }
+
+      StateTransferManager stm = cr.getComponent(StateTransferManager.class);
+      if (stm == null) {
+         // StateTransferManagerFactory constructs nothing (returns null) for caches that are not clustered or do
+         // not fetch in-memory state; presumably the lookup then yields null. Report this as a state transfer
+         // failure instead of letting the caller run into a NullPointerException.
+         String msg = "Cache named " + cacheName + " does not support state transfer!";
+         log.info(msg);
+         throw new StateTransferException(msg);
+      }
+      return stm;
+   }
}
Modified: core/branches/flat/src/main/java/org/horizon/remoting/RPCManager.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/RPCManager.java 2009-02-25
12:06:48 UTC (rev 7784)
+++ core/branches/flat/src/main/java/org/horizon/remoting/RPCManager.java 2009-02-25
15:52:32 UTC (rev 7785)
@@ -27,6 +27,7 @@
import org.horizon.factories.scopes.Scopes;
import org.horizon.lifecycle.Lifecycle;
import org.horizon.remoting.transport.Address;
+import org.horizon.statetransfer.StateTransferException;
import java.util.List;
@@ -111,4 +112,14 @@
* @return a list of members. Typically, this would be defensively copied.
*/
List<Address> getMembers();
+
+ /**
+ * Initiates a state retrieval process from neighbouring caches. This method will
block until it either times out,
+ * or state is retrieved and applied.
+ *
+ * @param cacheName name of cache requesting state
+ * @param timeout length of time to try to retrieve state on each peer
+ * @throws org.horizon.statetransfer.StateTransferException in the event of problems
+ */
+ void retrieveState(String cacheName, long timeout) throws StateTransferException;
}
\ No newline at end of file
Modified: core/branches/flat/src/main/java/org/horizon/remoting/RPCManagerImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/RPCManagerImpl.java 2009-02-25
12:06:48 UTC (rev 7784)
+++ core/branches/flat/src/main/java/org/horizon/remoting/RPCManagerImpl.java 2009-02-25
15:52:32 UTC (rev 7785)
@@ -10,10 +10,13 @@
import org.horizon.jmx.annotations.MBean;
import org.horizon.jmx.annotations.ManagedAttribute;
import org.horizon.jmx.annotations.ManagedOperation;
+import org.horizon.logging.Log;
+import org.horizon.logging.LogFactory;
import org.horizon.marshall.Marshaller;
import org.horizon.notifications.cachemanagerlistener.CacheManagerNotifier;
import org.horizon.remoting.transport.Address;
import org.horizon.remoting.transport.Transport;
+import org.horizon.statetransfer.StateTransferException;
import java.text.NumberFormat;
import java.util.List;
@@ -34,6 +37,7 @@
private final AtomicLong replicationCount = new AtomicLong(0);
private final AtomicLong replicationFailures = new AtomicLong(0);
boolean statisticsEnabled = false; // by default, don't gather statistics.
+ private static final Log log = LogFactory.getLog(RPCManagerImpl.class);
@Inject
public void injectDependencies(GlobalConfiguration globalConfiguration, Transport t,
InboundInvocationHandler handler,
@@ -44,7 +48,7 @@
this.t.initialize(globalConfiguration,
globalConfiguration.getTransportProperties(), marshaller, e, handler, notifier);
}
- @Start
+ @Start(priority = 10)
public void start() {
t.start();
}
@@ -82,6 +86,47 @@
return t.getMembers();
}
+   /**
+    * Tries to retrieve state for the named cache from any other member of the cluster. Every peer is tried once
+    * per round; if no peer delivers state, the method backs off and starts another round, for up to five rounds.
+    * Returns immediately, without doing anything, if this node is the only member.
+    *
+    * @param cacheName name of cache requesting state
+    * @param timeout   length of time to try to retrieve state on each peer
+    * @throws StateTransferException if no peer provided state in any round, or if the thread was interrupted
+    *                                while backing off
+    */
+   public void retrieveState(String cacheName, long timeout) throws StateTransferException {
+      List<Address> members = getMembers();
+      if (members.size() < 2) {
+         if (log.isDebugEnabled())
+            log.debug("We're the only member in the cluster; no one to retrieve state from. Not doing anything!");
+         return;
+      }
+
+      final Address self = getAddress();
+      final int maxAttempts = 5;
+      boolean success = false;
+      outer:
+      for (int i = 0, wait = 1000; i < maxAttempts; i++) {
+         for (Address member : members) {
+            if (member.equals(self)) continue; // never ask ourselves for state
+
+            try {
+               if (log.isInfoEnabled()) log.info("Trying to fetch state from {0}", member);
+               if (t.retrieveState(cacheName, member, timeout)) {
+                  if (log.isInfoEnabled()) log.info("Successfully retrieved and applied state from {0}", member);
+                  success = true;
+                  break outer;
+               }
+            } catch (StateTransferException e) {
+               if (log.isDebugEnabled()) log.debug("Error while fetching state from member " + member, e);
+            }
+         }
+
+         // Reaching this point means that every peer has been tried in this round without success. Back off once
+         // per round (not once per member, which would inflate and eventually overflow the wait time), and do not
+         // sleep after the final round since there is nothing left to retry.
+         if (i < maxAttempts - 1) {
+            if (log.isWarnEnabled()) log.warn("Could not find available peer for state, backing off and retrying");
+
+            try {
+               Thread.sleep(wait <<= 2);
+            }
+            catch (InterruptedException e) {
+               // preserve the interrupt for the caller and stop retrying
+               Thread.currentThread().interrupt();
+               break outer;
+            }
+         }
+      }
+
+      if (!success) throw new StateTransferException("Unable to fetch state on startup");
+   }
+
// -------------------------------------------- JMX information
-----------------------------------------------
@ManagedOperation
Modified: core/branches/flat/src/main/java/org/horizon/remoting/transport/Transport.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/remoting/transport/Transport.java 2009-02-25
12:06:48 UTC (rev 7784)
+++
core/branches/flat/src/main/java/org/horizon/remoting/transport/Transport.java 2009-02-25
15:52:32 UTC (rev 7785)
@@ -11,6 +11,7 @@
import org.horizon.remoting.InboundInvocationHandler;
import org.horizon.remoting.ResponseFilter;
import org.horizon.remoting.ResponseMode;
+import org.horizon.statetransfer.StateTransferException;
import java.util.List;
import java.util.Properties;
@@ -78,4 +79,16 @@
* @return a list of members. Typically, this would be defensively copied.
*/
List<Address> getMembers();
+
+ /**
+ * Initiates a state retrieval from a specific cache (by typically invoking {@link
org.horizon.remoting.InboundInvocationHandler#generateState(String,
java.io.OutputStream)}),
+ * and applies this state to the current cache via the {@link
InboundInvocationHandler#applyState(String, java.io.InputStream)} callback.
+ *
+ * @param cacheName name of cache for which to retrieve state
+ * @param address address of remote cache from which to retrieve state
+ * @param timeout state retrieval timeout in milliseconds
+ * @throws org.horizon.statetransfer.StateTransferException if state cannot be
retrieved from the specific cache
+ * @return true if state was transferred and applied successfully, false if it timed
out.
+ */
+ boolean retrieveState(String cacheName, Address address, long timeout) throws
StateTransferException;
}
Modified:
core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/JGroupsTransport.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/JGroupsTransport.java 2009-02-25
12:06:48 UTC (rev 7784)
+++
core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/JGroupsTransport.java 2009-02-25
15:52:32 UTC (rev 7785)
@@ -15,24 +15,30 @@
import org.horizon.remoting.ResponseMode;
import org.horizon.remoting.transport.Address;
import org.horizon.remoting.transport.Transport;
+import org.horizon.statetransfer.StateTransferException;
import org.horizon.util.FileLookup;
+import org.horizon.util.Util;
import org.jgroups.Channel;
import org.jgroups.ChannelException;
+import org.jgroups.ExtendedMessageListener;
import org.jgroups.JChannel;
import org.jgroups.MembershipListener;
import org.jgroups.Message;
-import org.jgroups.MessageListener;
import org.jgroups.View;
import org.jgroups.blocks.GroupRequest;
import org.jgroups.blocks.RspFilter;
import org.jgroups.util.Rsp;
import org.jgroups.util.RspList;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.Vector;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
/**
@@ -41,7 +47,7 @@
* @author Manik Surtani
* @since 1.0
*/
-public class JGroupsTransport implements Transport, MembershipListener, MessageListener
{
+public class JGroupsTransport implements Transport, MembershipListener,
ExtendedMessageListener {
public static final String CONFIGURATION_STRING = "configurationString";
public static final String CONFIGURATION_XML = "configurationXml";
public static final String CONFIGURATION_FILE = "configurationFile";
@@ -61,7 +67,15 @@
Marshaller marshaller;
ExecutorService asyncExecutor;
CacheManagerNotifier notifier;
+ final ConcurrentMap<String, StateTransferMonitor> stateTransfersInProgress = new
ConcurrentHashMap<String, StateTransferMonitor>();
+ /**
+ * Reference to an exception that was raised during state installation on this node.
+ */
+ protected volatile Exception setStateException;
+ private final Object stateLock = new Object();
+
+
//
------------------------------------------------------------------------------------------------------------------
// Lifecycle and setup stuff
//
------------------------------------------------------------------------------------------------------------------
@@ -204,6 +218,27 @@
return members;
}
+ /**
+  * Requests the state of the named cache from a single peer and blocks until the monitor registered for the
+  * transfer has been released.
+  *
+  * @param cacheName name of cache for which to retrieve state
+  * @param address   address of the peer to request state from
+  * @param timeout   timeout passed on to the channel's state request
+  * @return true if the wait on the monitor completed normally, false if any exception other than a
+  *         StateTransferException was raised (it is logged at info level)
+  * @throws StateTransferException if a transfer for this cache is already registered, or if one is raised while
+  *                                requesting or waiting for state
+  */
+ public boolean retrieveState(String cacheName, Address address, long timeout) throws
StateTransferException {
+ // only becomes true once our own monitor is registered, so that the finally block never removes a monitor
+ // belonging to another, concurrent transfer
+ boolean cleanup = false;
+ try {
+ StateTransferMonitor mon = new StateTransferMonitor();
+ // at most one transfer per cache name may be in flight
+ if (stateTransfersInProgress.putIfAbsent(cacheName, mon) != null)
+ throw new StateTransferException("There already appears to be a state
transfer in progress for the cache named " + cacheName);
+
+ cleanup = true;
+ // NOTE(review): whatever getState(...) returns is discarded here; if the channel reports a timeout or
+ // failure through its return value rather than an exception, the wait below may never be released - confirm
+ // against the JGroups API.
+ ((JChannel) channel).getState(toJGroupsAddress(address), cacheName, timeout,
false);
+ // the monitor is notified from setState(String, InputStream) in this class
+ mon.waitForState();
+ return true;
+ } catch (StateTransferException ste) {
+ throw ste;
+ } catch (Exception e) {
+ if (log.isInfoEnabled()) log.info("Unable to retrieve state from member
" + address, e);
+ return false;
+ } finally {
+ if (cleanup) stateTransfersInProgress.remove(cacheName);
+ }
+ }
+
public Address getAddress() {
if (address == null) {
address = new JGroupsAddress(channel.getLocalAddress());
@@ -325,8 +360,7 @@
}
public void block() {
- // TODO: Do we need these for state transfer?
- // a no-op for now
+ // a no-op
}
public void receive(Message msg) {
@@ -334,17 +368,55 @@
}
public byte[] getState() {
- // TODO: Do we need these for state transfer?
- // a no-op for now
- return null;
+ throw new UnsupportedOperationException("Retrieving state for the entire cache
system is not supported!");
}
public void setState(byte[] state) {
- // TODO: Do we need these for state transfer?
- // a no-op for now
+ throw new UnsupportedOperationException("Setting state for the entire cache
system is not supported!");
}
+ // Byte-array based retrieval of named state is not supported; this callback always throws
+ // UnsupportedOperationException.
+ public byte[] getState(String state_id) {
+ throw new UnsupportedOperationException("Non-stream-based state retrieval is
not supported! Make sure you use the JGroups STREAMING_STATE_TRANSFER protocol!");
+ }
+ // Byte-array based installation of named state is not supported; this callback always throws
+ // UnsupportedOperationException.
+ public void setState(String state_id, byte[] state) {
+ throw new UnsupportedOperationException("Non-stream-based state retrieval is
not supported! Make sure you use the JGroups STREAMING_STATE_TRANSFER protocol!");
+ }
+
+ // Streaming state without a cache name is not supported; this callback always throws
+ // UnsupportedOperationException.
+ public void getState(OutputStream ostream) {
+ throw new UnsupportedOperationException("Retrieving state for the entire cache
system is not supported!");
+ }
+
+ /**
+  * Writes the state of the named cache to the given stream by delegating to the inbound invocation handler.
+  * A StateTransferException raised while generating state is logged and not rethrown. The stream is always
+  * closed before returning.
+  *
+  * @param cacheName name of the cache whose state is requested
+  * @param ostream   stream to write the state to
+  */
+ public void getState(String cacheName, OutputStream ostream) {
+ if (trace) log.trace("Received request to generate state for cache {0}.
Attempting to generate state.", cacheName);
+ try {
+ inboundInvocationHandler.generateState(cacheName, ostream);
+ } catch (StateTransferException e) {
+ log.error("Caught while responding to state transfer request", e);
+ } finally {
+ // the handler is documented not to close the stream, so it is closed here
+ Util.closeStream(ostream);
+ }
+ }
+
+ // Installing streamed state without a cache name is not supported; this callback always throws
+ // UnsupportedOperationException.
+ public void setState(InputStream istream) {
+ throw new UnsupportedOperationException("Setting state for the entire cache
system is not supported!");
+ }
+
+   /**
+    * Applies state received for the named cache by delegating to the inbound invocation handler, then releases
+    * the thread waiting on the monitor registered for this transfer, if there is one. The stream is always
+    * closed before returning.
+    *
+    * @param cacheName name of the cache the state belongs to
+    * @param istream   stream to read the state from
+    */
+   public void setState(String cacheName, InputStream istream) {
+      if (trace) log.trace("Received state for cache {0}. Attempting to apply state.", cacheName);
+      // May be null if no transfer is registered for this cache (any more); every notification is guarded so
+      // that unsolicited state does not end in a NullPointerException.
+      StateTransferMonitor mon = stateTransfersInProgress.get(cacheName);
+      try {
+         inboundInvocationHandler.applyState(cacheName, istream);
+         if (mon != null) mon.notifyStateReceiptSucceeded();
+      } catch (StateTransferException e) {
+         log.error("Failed setting state", e);
+         if (mon != null) mon.notifyStateReceiptFailed(e);
+      } catch (RuntimeException e) {
+         // An unexpected failure must still release the waiting thread, otherwise it would block forever.
+         // The exception itself is rethrown unchanged.
+         log.error("Failed setting state", e);
+         if (mon != null) mon.notifyStateReceiptFailed(new StateTransferException(e));
+         throw e;
+      } finally {
+         Util.closeStream(istream);
+      }
+   }
+
+
//
------------------------------------------------------------------------------------------------------------------
// Helpers to convert between Address types
//
------------------------------------------------------------------------------------------------------------------
@@ -361,6 +433,10 @@
return retval;
}
+   /**
+    * Unwraps the JGroups address held by the given address, which must be a {@link JGroupsAddress}.
+    */
+   private org.jgroups.Address toJGroupsAddress(Address a) {
+      JGroupsAddress wrapper = (JGroupsAddress) a;
+      return wrapper.address;
+   }
+
private List<Address> fromJGroupsAddressList(List<org.jgroups.Address>
list) {
if (list == null || list.isEmpty()) return Collections.emptyList();
Added:
core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/StateTransferMonitor.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/StateTransferMonitor.java
(rev 0)
+++
core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/StateTransferMonitor.java 2009-02-25
15:52:32 UTC (rev 7785)
@@ -0,0 +1,55 @@
+package org.horizon.remoting.transport.jgroups;
+
+import org.horizon.statetransfer.StateTransferException;
+
+/**
+ * Lets one thread wait for the outcome of a state transfer that is completed on another thread. The waiting
+ * thread calls {@link #waitForState()}; the receiving thread reports the outcome through
+ * {@link #notifyStateReceiptSucceeded()} or {@link #notifyStateReceiptFailed(StateTransferException)}.
+ */
+public class StateTransferMonitor {
+   /**
+    * Reference to an exception that was raised during state installation on this cache.
+    */
+   protected volatile StateTransferException setStateException;
+   private final Object stateLock = new Object();
+   /**
+    * True once state has been received and applied successfully.
+    */
+   private volatile boolean isStateSet = false;
+
+   public boolean isStateSet() {
+      return isStateSet;
+   }
+
+   public void setStateSet(boolean stateSet) {
+      isStateSet = stateSet;
+   }
+
+   public StateTransferException getSetStateException() {
+      return setStateException;
+   }
+
+   /**
+    * Blocks until state receipt has been reported as succeeded or failed.
+    *
+    * @throws StateTransferException if state receipt was reported as failed
+    * @throws InterruptedException   if the waiting thread is interrupted; the interrupt flag is restored first
+    */
+   public void waitForState() throws Exception {
+      synchronized (stateLock) {
+         while (!isStateSet) {
+            if (setStateException != null) {
+               throw setStateException;
+            }
+
+            try {
+               stateLock.wait();
+            }
+            catch (InterruptedException iex) {
+               // do not swallow the interrupt and keep waiting: restore the flag and let the caller decide
+               Thread.currentThread().interrupt();
+               throw iex;
+            }
+         }
+      }
+   }
+
+   /**
+    * Records that state has been applied and wakes up any thread blocked in {@link #waitForState()}.
+    */
+   public void notifyStateReceiptSucceeded() {
+      synchronized (stateLock) {
+         // without this flag the waiter's loop condition never changes and it would go back to waiting forever
+         isStateSet = true;
+         stateLock.notifyAll();
+      }
+   }
+
+   /**
+    * Records the failure and wakes up any thread blocked in {@link #waitForState()}, which then rethrows it.
+    *
+    * @param setStateException the exception that caused state receipt to fail
+    */
+   public void notifyStateReceiptFailed(StateTransferException setStateException) {
+      synchronized (stateLock) {
+         // must not go through notifyStateReceiptSucceeded(): that would mark the state as set and make the
+         // waiter return normally
+         this.setStateException = setStateException;
+         stateLock.notifyAll();
+      }
+   }
+}
Deleted:
core/branches/flat/src/main/java/org/horizon/statetransfer/DefaultStateTransferManager.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/statetransfer/DefaultStateTransferManager.java 2009-02-25
12:06:48 UTC (rev 7784)
+++
core/branches/flat/src/main/java/org/horizon/statetransfer/DefaultStateTransferManager.java 2009-02-25
15:52:32 UTC (rev 7785)
@@ -1,52 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
- */
-package org.horizon.statetransfer;
-
-import org.horizon.factories.annotations.Inject;
-import org.horizon.factories.annotations.Start;
-
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-
-public class DefaultStateTransferManager implements StateTransferManager {
-
- @Inject
- public void injectDependencies() {
- }
-
- @Start(priority = 14)
- public void start() {
- }
-
- public void getState(ObjectOutputStream out, Object o, long timeout, boolean force,
boolean suppressErrors) throws Exception {
- throw new UnsupportedOperationException("Implement me properly!");
- }
-
- public void setState(ObjectInputStream in, Object o) throws Exception {
- throw new UnsupportedOperationException("fix me!");
- }
-
- // TODO: implement me
- protected void setState()
- {
- }
-}
Added:
core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferException.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferException.java
(rev 0)
+++
core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferException.java 2009-02-25
15:52:32 UTC (rev 7785)
@@ -0,0 +1,24 @@
+package org.horizon.statetransfer;
+
+/**
+ * Signals that state could not be transferred between cache instances in a cluster.
+ *
+ * @author Manik Surtani
+ * @since 1.0
+ */
+public class StateTransferException extends Exception {
+   /**
+    * Creates an exception with neither a detail message nor a cause.
+    */
+   public StateTransferException() {
+      super();
+   }
+
+   /**
+    * Creates an exception with a detail message.
+    *
+    * @param message description of the problem
+    */
+   public StateTransferException(String message) {
+      super(message);
+   }
+
+   /**
+    * Creates an exception with a detail message and the underlying cause.
+    *
+    * @param message description of the problem
+    * @param cause   the failure that triggered this exception
+    */
+   public StateTransferException(String message, Throwable cause) {
+      super(message, cause);
+   }
+
+   /**
+    * Creates an exception that wraps the underlying cause.
+    *
+    * @param cause the failure that triggered this exception
+    */
+   public StateTransferException(Throwable cause) {
+      super(cause);
+   }
+}
Modified:
core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManager.java
===================================================================
---
core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManager.java 2009-02-25
12:06:48 UTC (rev 7784)
+++
core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManager.java 2009-02-25
15:52:32 UTC (rev 7785)
@@ -21,12 +21,19 @@
*/
package org.horizon.statetransfer;
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
+import org.horizon.factories.scopes.Scope;
+import org.horizon.factories.scopes.Scopes;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * Handles generation and application of state on the cache
+ */
+@Scope(Scopes.NAMED_CACHE)
public interface StateTransferManager {
- void getState(ObjectOutputStream out, Object o, long timeout, boolean force, boolean
suppressErrors) throws Exception;
+ void generateState(OutputStream out) throws StateTransferException;
- void setState(ObjectInputStream in, Object o) throws Exception;
+ void applyState(InputStream in) throws StateTransferException;
}
Copied:
core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManagerImpl.java
(from rev 7782,
core/branches/flat/src/main/java/org/horizon/statetransfer/DefaultStateTransferManager.java)
===================================================================
---
core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManagerImpl.java
(rev 0)
+++
core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManagerImpl.java 2009-02-25
15:52:32 UTC (rev 7785)
@@ -0,0 +1,145 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site:
http://www.fsf.org.
+ */
+package org.horizon.statetransfer;
+
+import org.horizon.Cache;
+import org.horizon.config.Configuration;
+import org.horizon.factories.annotations.Inject;
+import org.horizon.factories.annotations.Start;
+import org.horizon.logging.Log;
+import org.horizon.logging.LogFactory;
+import org.horizon.remoting.RPCManager;
+import org.horizon.util.Util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+/**
+ * Default implementation of {@link StateTransferManager}.
+ * <p/>
+ * When started, it asks the {@link RPCManager} to retrieve state for this cache. State is written to and read from
+ * the supplied streams using Java serialization; the in-memory and persistent sections are each bracketed by
+ * {@link Delimiter} marker objects so that a reader can detect a malformed or truncated stream.
+ * <p/>
+ * The streams passed to {@link #generateState(OutputStream)} and {@link #applyState(InputStream)} are owned by the
+ * caller and are never closed by this class.
+ */
+public class StateTransferManagerImpl implements StateTransferManager {
+
+   RPCManager rpcManager;
+   Cache cache;
+   Configuration configuration;
+   private static final Log log = LogFactory.getLog(StateTransferManagerImpl.class);
+   private static final Delimiter DELIMITER = new Delimiter();
+
+   /**
+    * Receives the collaborators this component needs.
+    *
+    * @param rpcManager    used to request state when this component starts
+    * @param cache         the cache whose name identifies the state to retrieve
+    * @param configuration supplies the state retrieval timeout
+    */
+   @Inject
+   public void injectDependencies(RPCManager rpcManager, Cache cache, Configuration configuration) {
+      this.rpcManager = rpcManager;
+      this.cache = cache;
+      this.configuration = configuration;
+   }
+
+   /**
+    * Requests state for this cache via the {@link RPCManager}, logging the elapsed time at debug level.
+    *
+    * @throws StateTransferException if state retrieval fails
+    */
+   @Start(priority = 14)
+   // it is imperative that this starts *after* the RPCManager does.
+   public void start() throws StateTransferException {
+      long startTime = 0;
+      if (log.isDebugEnabled()) {
+         log.debug("Initiating state transfer process");
+         startTime = System.currentTimeMillis();
+      }
+
+      rpcManager.retrieveState(cache.getName(), configuration.getStateRetrievalTimeout());
+
+      if (log.isDebugEnabled()) {
+         long duration = System.currentTimeMillis() - startTime;
+         log.debug("State transfer process completed in {0}", Util.prettyPrintTime(duration));
+      }
+   }
+
+   /**
+    * Writes this cache's state to the given stream as: delimiter, in-memory state, delimiter, persistent state,
+    * delimiter.
+    *
+    * @param out stream to write to; it is flushed but left open for the caller
+    * @throws StateTransferException if generating or writing the state fails
+    */
+   public void generateState(OutputStream out) throws StateTransferException {
+      if (log.isDebugEnabled()) log.debug("Generating state");
+
+      try {
+         ObjectOutputStream oos = new ObjectOutputStream(out);
+         delimit(oos);
+         generateInMemoryState(oos);
+         delimit(oos);
+         generatePersistentState(oos);
+         delimit(oos);
+         // Flush only.  ObjectOutputStream.close() would also close the underlying stream, which belongs to
+         // the caller and must NOT be closed here.
+         oos.flush();
+      } catch (StateTransferException ste) {
+         throw ste;
+      } catch (Exception e) {
+         throw new StateTransferException(e);
+      }
+   }
+
+   /**
+    * Reads state from the given stream, expecting the layout produced by {@link #generateState(OutputStream)}.
+    *
+    * @param in stream to read from; it is left open for the caller
+    * @throws StateTransferException if the stream is malformed or applying the state fails
+    */
+   public void applyState(InputStream in) throws StateTransferException {
+      if (log.isDebugEnabled()) log.debug("Applying state");
+
+      try {
+         ObjectInputStream ois = new ObjectInputStream(in);
+         assertDelimited(ois);
+         applyInMemoryState(ois);
+         assertDelimited(ois);
+         applyPersistentState(ois);
+         assertDelimited(ois);
+         // Deliberately not closed: ObjectInputStream.close() would also close the underlying stream, which
+         // belongs to the caller.
+      } catch (StateTransferException ste) {
+         throw ste;
+      } catch (Exception e) {
+         throw new StateTransferException(e);
+      }
+   }
+
+   // Not yet implemented: always fails.
+   private void applyInMemoryState(ObjectInputStream i) throws StateTransferException {
+      throw new StateTransferException("Implement me!");
+   }
+
+   // Not yet implemented: always fails.
+   private void generateInMemoryState(ObjectOutputStream o) throws StateTransferException {
+      throw new StateTransferException("Implement me!");
+   }
+
+   // Not yet implemented: always fails.
+   private void applyPersistentState(ObjectInputStream i) throws StateTransferException {
+      throw new StateTransferException("Implement me!");
+   }
+
+   // Not yet implemented: always fails.
+   private void generatePersistentState(ObjectOutputStream o) throws StateTransferException {
+      throw new StateTransferException("Implement me!");
+   }
+
+   // Writes the section marker to the stream.
+   private void delimit(ObjectOutputStream o) throws IOException {
+      o.writeObject(DELIMITER);
+   }
+
+   // Reads the next object and fails unless it is a section marker.
+   private void assertDelimited(ObjectInputStream i) throws StateTransferException {
+      Object o;
+      try {
+         o = i.readObject();
+      } catch (Exception e) {
+         throw new StateTransferException(e);
+      }
+      // instanceof is already false for null, so no separate null check is needed.
+      if (!(o instanceof Delimiter)) throw new StateTransferException("Expected a delimiter, received " + o);
+   }
+
+   // used as a marker for streams.
+   private static final class Delimiter implements Serializable {
+      // Instances are exchanged between cache nodes, so pin the serialized form explicitly.
+      private static final long serialVersionUID = 1L;
+   }
+}
Property changes on:
core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManagerImpl.java
___________________________________________________________________
Name: svn:keywords
+ Id Revision
Name: svn:eol-style
+ LF
Modified: core/branches/flat/src/main/java/org/horizon/util/Util.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/util/Util.java 2009-02-25 12:06:48 UTC
(rev 7784)
+++ core/branches/flat/src/main/java/org/horizon/util/Util.java 2009-02-25 15:52:32 UTC
(rev 7785)
@@ -21,7 +21,10 @@
*/
package org.horizon.util;
+import java.io.InputStream;
+import java.io.OutputStream;
import java.lang.reflect.Method;
+import java.text.NumberFormat;
import java.util.HashMap;
import java.util.Map;
@@ -126,4 +129,46 @@
return "Added Entries " + addedEntries + " Removeed Entries
" + removedEntries + " Modified Entries " + modifiedEntries;
}
}
+
+ /**
+  * Formats a duration for human-readable display, picking the unit by magnitude.
+  * <p/>
+  * Values under one second are shown as whole milliseconds; under 300 seconds as seconds; under 120 minutes as
+  * minutes; anything larger as hours.  Non-millisecond values are rendered with at most two fraction digits.
+  *
+  * @param millis time in millis
+  * @return the time, represented as millis, seconds, minutes or hours as appropriate, with suffix
+  */
+ public static String prettyPrintTime(long millis) {
+    if (millis < 1000) {
+       return millis + " milliseconds";
+    }
+
+    NumberFormat formatter = NumberFormat.getNumberInstance();
+    formatter.setMaximumFractionDigits(2);
+
+    double seconds = ((double) millis) / 1000;
+    if (seconds < 300) {
+       return formatter.format(seconds) + " seconds";
+    }
+
+    double minutes = seconds / 60;
+    if (minutes < 120) {
+       return formatter.format(minutes) + " minutes";
+    }
+
+    double hours = minutes / 60;
+    return formatter.format(hours) + " hours";
+ }
+
+ /**
+  * Closes the given input stream on a best-effort basis.
+  *
+  * @param i stream to close; may be null, in which case nothing happens
+  */
+ public static void closeStream(InputStream i) {
+    if (i == null) {
+       return;
+    }
+    try {
+       i.close();
+    } catch (Exception ignored) {
+       // best-effort close: any failure is deliberately discarded
+    }
+ }
+
+ /**
+  * Closes the given output stream on a best-effort basis.
+  *
+  * @param o stream to close; may be null, in which case nothing happens
+  */
+ public static void closeStream(OutputStream o) {
+    if (o == null) {
+       return;
+    }
+    try {
+       o.close();
+    } catch (Exception ignored) {
+       // best-effort close: any failure is deliberately discarded
+    }
+ }
}
Modified: core/branches/flat/src/test/java/org/horizon/api/CacheClusterJoinTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/api/CacheClusterJoinTest.java 2009-02-25
12:06:48 UTC (rev 7784)
+++ core/branches/flat/src/test/java/org/horizon/api/CacheClusterJoinTest.java 2009-02-25
15:52:32 UTC (rev 7785)
@@ -22,6 +22,7 @@
cm1 = TestingUtil.createClusteredCacheManager();
cfg = new Configuration();
cfg.setCacheMode(CacheMode.REPL_SYNC);
+ cfg.setFetchInMemoryState(false);
cm1.defineCache("cache", cfg);
}
Modified:
core/branches/flat/src/test/java/org/horizon/api/tree/NodeReplicatedMoveTest.java
===================================================================
---
core/branches/flat/src/test/java/org/horizon/api/tree/NodeReplicatedMoveTest.java 2009-02-25
12:06:48 UTC (rev 7784)
+++
core/branches/flat/src/test/java/org/horizon/api/tree/NodeReplicatedMoveTest.java 2009-02-25
15:52:32 UTC (rev 7785)
@@ -31,12 +31,10 @@
TransactionManager tm1;
protected void createCacheManagers() throws Throwable {
- Configuration c = new Configuration();
+ Configuration c = getDefaultConfig();
c.setInvocationBatchingEnabled(true);
c.setCacheMode(Configuration.CacheMode.REPL_SYNC);
c.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
- c.setSyncCommitPhase(true);
- c.setSyncRollbackPhase(true);
createClusteredCaches(2, "replSync", c);
Modified:
core/branches/flat/src/test/java/org/horizon/config/parsing/XmlFileParsingTest.java
===================================================================
---
core/branches/flat/src/test/java/org/horizon/config/parsing/XmlFileParsingTest.java 2009-02-25
12:06:48 UTC (rev 7784)
+++
core/branches/flat/src/test/java/org/horizon/config/parsing/XmlFileParsingTest.java 2009-02-25
15:52:32 UTC (rev 7785)
@@ -52,8 +52,7 @@
c = namedCaches.get("syncRepl");
assert c.getCacheMode() == Configuration.CacheMode.REPL_SYNC;
- assert c.isFetchInMemoryState();
- assert c.getStateRetrievalTimeout() == 15000;
+ assert !c.isFetchInMemoryState();
assert c.getSyncReplTimeout() == 15000;
c = namedCaches.get("asyncRepl");
@@ -61,23 +60,20 @@
assert c.getCacheMode() == Configuration.CacheMode.REPL_ASYNC;
assert !c.isUseReplQueue();
assert !c.isUseAsyncSerialization();
- assert c.isFetchInMemoryState();
- assert c.getStateRetrievalTimeout() == 15000;
+ assert !c.isFetchInMemoryState();
c = namedCaches.get("asyncReplQueue");
assert c.getCacheMode() == Configuration.CacheMode.REPL_ASYNC;
assert c.isUseReplQueue();
assert c.isUseAsyncSerialization();
- assert c.isFetchInMemoryState();
- assert c.getStateRetrievalTimeout() == 15000;
+ assert !c.isFetchInMemoryState();
c = namedCaches.get("txSyncRepl");
assert
c.getTransactionManagerLookupClass().equals("org.horizon.transaction.GenericTransactionManagerLookup");
assert c.getCacheMode() == Configuration.CacheMode.REPL_SYNC;
- assert c.isFetchInMemoryState();
- assert c.getStateRetrievalTimeout() == 15000;
+ assert !c.isFetchInMemoryState();
assert c.getSyncReplTimeout() == 15000;
c = namedCaches.get("overriding");
@@ -112,8 +108,7 @@
assert c.getTransactionManagerLookupClass() == null;
assert c.getCacheMode() == Configuration.CacheMode.REPL_SYNC;
- assert c.isFetchInMemoryState();
- assert c.getStateRetrievalTimeout() == 15000;
+ assert !c.isFetchInMemoryState();
assert c.getSyncReplTimeout() == 15000;
assert c.getLockAcquisitionTimeout() == 1000;
assert c.getIsolationLevel() == IsolationLevel.READ_COMMITTED;
@@ -126,8 +121,7 @@
assert c.getCacheMode() == Configuration.CacheMode.REPL_ASYNC;
assert !c.isUseReplQueue();
assert !c.isUseAsyncSerialization();
- assert c.isFetchInMemoryState();
- assert c.getStateRetrievalTimeout() == 15000;
+ assert !c.isFetchInMemoryState();
assert c.getLockAcquisitionTimeout() == 1000;
assert c.getIsolationLevel() == IsolationLevel.READ_COMMITTED;
assert c.getConcurrencyLevel() == 100;
@@ -141,8 +135,7 @@
assert c.getReplQueueInterval() == 1234;
assert c.getReplQueueMaxElements() == 100;
assert c.isUseAsyncSerialization();
- assert c.isFetchInMemoryState();
- assert c.getStateRetrievalTimeout() == 15000;
+ assert !c.isFetchInMemoryState();
assert c.getLockAcquisitionTimeout() == 1000;
assert c.getIsolationLevel() == IsolationLevel.READ_COMMITTED;
assert c.getConcurrencyLevel() == 100;
@@ -151,8 +144,7 @@
c.applyOverrides(namedCaches.get("txSyncRepl"));
assert
c.getTransactionManagerLookupClass().equals("org.horizon.transaction.GenericTransactionManagerLookup");
assert c.getCacheMode() == Configuration.CacheMode.REPL_SYNC;
- assert c.isFetchInMemoryState();
- assert c.getStateRetrievalTimeout() == 15000;
+ assert !c.isFetchInMemoryState();
assert c.getSyncReplTimeout() == 15000;
assert c.getLockAcquisitionTimeout() == 1000;
assert c.getIsolationLevel() == IsolationLevel.READ_COMMITTED;
Modified:
core/branches/flat/src/test/java/org/horizon/loader/decorators/SingletonStoreTest.java
===================================================================
---
core/branches/flat/src/test/java/org/horizon/loader/decorators/SingletonStoreTest.java 2009-02-25
12:06:48 UTC (rev 7784)
+++
core/branches/flat/src/test/java/org/horizon/loader/decorators/SingletonStoreTest.java 2009-02-25
15:52:32 UTC (rev 7785)
@@ -45,7 +45,7 @@
cm1 = addClusterEnabledCacheManager();
cm2 = addClusterEnabledCacheManager();
- Configuration conf = new Configuration();
+ Configuration conf = getDefaultConfig();
conf.setCacheMode(Configuration.CacheMode.REPL_SYNC);
DummyInMemoryCacheStore.Cfg cfg = new DummyInMemoryCacheStore.Cfg();
cfg.setStore("Store-" + storeCounter.getAndIncrement());
@@ -64,7 +64,7 @@
((DummyInMemoryCacheStore.Cfg)
conf.getCacheLoaderManagerConfig().getFirstCacheLoaderConfig()).setStore("Store-"
+ storeCounter.getAndIncrement());
cm2.defineCache("pushing", conf);
- conf = new Configuration();
+ conf = getDefaultConfig();
conf.setCacheMode(Configuration.CacheMode.REPL_SYNC);
cfg = new DummyInMemoryCacheStore.Cfg();
cfg.setStore("Store-" + storeCounter.getAndIncrement());
Modified:
core/branches/flat/src/test/java/org/horizon/manager/CacheManagerComponentRegistryTest.java
===================================================================
---
core/branches/flat/src/test/java/org/horizon/manager/CacheManagerComponentRegistryTest.java 2009-02-25
12:06:48 UTC (rev 7784)
+++
core/branches/flat/src/test/java/org/horizon/manager/CacheManagerComponentRegistryTest.java 2009-02-25
15:52:32 UTC (rev 7785)
@@ -33,6 +33,8 @@
public void testForceSharedComponents() throws NamedCacheNotFoundException {
Configuration defaultCfg = new Configuration();
defaultCfg.setCacheMode(Configuration.CacheMode.REPL_SYNC);
+ defaultCfg.setFetchInMemoryState(false);
+ defaultCfg.setFetchInMemoryState(false);
// cache manager with default configuration
cm = new DefaultCacheManager(GlobalConfiguration.getClusteredDefault(),
@@ -61,6 +63,7 @@
ec.setAlgorithmConfig(new FIFOAlgorithmConfig());
Configuration defaultCfg = new Configuration();
+ defaultCfg.setFetchInMemoryState(false);
defaultCfg.setCacheMode(Configuration.CacheMode.REPL_SYNC);
defaultCfg.setEvictionConfig(ec);
// cache manager with default configuration
Modified:
core/branches/flat/src/test/java/org/horizon/notifications/cachemanagerlistener/CacheManagerNotifierTest.java
===================================================================
---
core/branches/flat/src/test/java/org/horizon/notifications/cachemanagerlistener/CacheManagerNotifierTest.java 2009-02-25
12:06:48 UTC (rev 7784)
+++
core/branches/flat/src/test/java/org/horizon/notifications/cachemanagerlistener/CacheManagerNotifierTest.java 2009-02-25
15:52:32 UTC (rev 7785)
@@ -29,6 +29,7 @@
cm2 = TestingUtil.createClusteredCacheManager();
Configuration c = new Configuration();
c.setCacheMode(Configuration.CacheMode.REPL_SYNC);
+ c.setFetchInMemoryState(false);
cm1.defineCache("cache", c);
cm2.defineCache("cache", c);
Added: core/branches/flat/src/test/java/org/horizon/statetransfer/Address.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/statetransfer/Address.java
(rev 0)
+++ core/branches/flat/src/test/java/org/horizon/statetransfer/Address.java 2009-02-25
15:52:32 UTC (rev 7785)
@@ -0,0 +1,64 @@
+package org.horizon.statetransfer;
+
+import java.io.Serializable;
+
+/**
+ * Simple serializable postal address used as a test value object.  Equality and hash code are based on street,
+ * city and zip.
+ */
+public class Address implements Serializable {
+   private static final long serialVersionUID = 5943073369866339615L;
+
+   String street = null;
+   String city = "San Jose";
+   int zip = 0;
+
+   public String getStreet() {
+      return street;
+   }
+
+   public void setStreet(String street) {
+      this.street = street;
+   }
+
+   public String getCity() {
+      return city;
+   }
+
+   public void setCity(String city) {
+      this.city = city;
+   }
+
+   public int getZip() {
+      return zip;
+   }
+
+   public void setZip(int zip) {
+      this.zip = zip;
+   }
+
+   /** Renders the address as {@code street=..., city=..., zip=...}. */
+   public String toString() {
+      StringBuilder text = new StringBuilder();
+      text.append("street=").append(getStreet());
+      text.append(", city=").append(getCity());
+      text.append(", zip=").append(getZip());
+      return text.toString();
+   }
+
+   /** Two addresses are equal when they are of the same class and street, city and zip all match. */
+   public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      final Address other = (Address) o;
+      boolean sameCity = city == null ? other.city == null : city.equals(other.city);
+      boolean sameStreet = street == null ? other.street == null : street.equals(other.street);
+      return zip == other.zip && sameCity && sameStreet;
+   }
+
+   /** Combines street, city and zip with a multiplier of 29; null strings contribute 0. */
+   public int hashCode() {
+      int streetHash = street == null ? 0 : street.hashCode();
+      int cityHash = city == null ? 0 : city.hashCode();
+      return 29 * (29 * streetHash + cityHash) + zip;
+   }
+}
Added: core/branches/flat/src/test/java/org/horizon/statetransfer/Person.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/statetransfer/Person.java
(rev 0)
+++ core/branches/flat/src/test/java/org/horizon/statetransfer/Person.java 2009-02-25
15:52:32 UTC (rev 7785)
@@ -0,0 +1,56 @@
+package org.horizon.statetransfer;
+
+import java.io.Serializable;
+
+/**
+ * Simple serializable person (a name plus an {@link Address}) used as a test value object.  Equality and hash
+ * code are based on both fields.
+ */
+public class Person implements Serializable {
+
+   private static final long serialVersionUID = -885384294556845285L;
+
+   String name = null;
+   Address address;
+
+   public String getName() {
+      return name;
+   }
+
+   public void setName(String name) {
+      this.name = name;
+   }
+
+   /** Overload accepting an untyped value, which is cast to {@code String}. */
+   public void setName(Object obj) {
+      this.name = (String) obj;
+   }
+
+   public Address getAddress() {
+      return address;
+   }
+
+   public void setAddress(Address address) {
+      this.address = address;
+   }
+
+   /** Renders the person as {@code name=... Address= ...}. */
+   public String toString() {
+      return "name=" + getName() + " Address= " + address;
+   }
+
+   /** Two persons are equal when they are of the same class and both name and address match. */
+   public boolean equals(Object o) {
+      if (this == o) return true;
+      if (o == null || getClass() != o.getClass()) return false;
+
+      final Person other = (Person) o;
+      boolean sameAddress = address == null ? other.address == null : address.equals(other.address);
+      boolean sameName = name == null ? other.name == null : name.equals(other.name);
+      return sameAddress && sameName;
+   }
+
+   /** Combines name and address with a multiplier of 29; null fields contribute 0. */
+   public int hashCode() {
+      int nameHash = name == null ? 0 : name.hashCode();
+      int addressHash = address == null ? 0 : address.hashCode();
+      return 29 * nameHash + addressHash;
+   }
+}
Added:
core/branches/flat/src/test/java/org/horizon/statetransfer/StateTransferFunctionalTest.java
===================================================================
---
core/branches/flat/src/test/java/org/horizon/statetransfer/StateTransferFunctionalTest.java
(rev 0)
+++
core/branches/flat/src/test/java/org/horizon/statetransfer/StateTransferFunctionalTest.java 2009-02-25
15:52:32 UTC (rev 7785)
@@ -0,0 +1,287 @@
+package org.horizon.statetransfer;
+
+import org.horizon.Cache;
+import org.horizon.transaction.DummyTransactionManagerLookup;
+import org.horizon.config.Configuration;
+import org.horizon.logging.Log;
+import org.horizon.logging.LogFactory;
+import org.horizon.manager.CacheManager;
+import org.horizon.test.MultipleCacheManagersTest;
+import org.horizon.test.TestingUtil;
+import org.testng.annotations.Test;
+
+import javax.transaction.TransactionManager;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+/**
+ * Functional tests for state transfer between clustered caches: data written to an existing cache is expected to
+ * be visible on caches that join afterwards, including while other threads or caches keep writing.
+ * <p/>
+ * The whole class is currently disabled via {@code enabled = false}.
+ */
+@Test(groups = "functional", testName = "statetransfer.StateTransferFunctionalTest", enabled = false)
+public class StateTransferFunctionalTest extends MultipleCacheManagersTest {
+
+   protected static final String ADDRESS_CLASSNAME = Address.class.getName();
+   protected static final String PERSON_CLASSNAME = Person.class.getName();
+   public static final String A_B_NAME = "a_b_name";
+   public static final String A_C_NAME = "a_c_name";
+   // Previously "a_d_age", which collided with A_D_AGE below.
+   public static final String A_D_NAME = "a_d_name";
+   public static final String A_B_AGE = "a_b_age";
+   public static final String A_C_AGE = "a_c_age";
+   public static final String A_D_AGE = "a_d_age";
+   public static final String JOE = "JOE";
+   public static final String BOB = "BOB";
+   public static final String JANE = "JANE";
+   public static final Integer TWENTY = 20;
+   public static final Integer FORTY = 40;
+
+   Configuration config;
+   private static final String cacheName = "nbst";
+
+   private volatile int testCount = 0;
+
+   private static final Log log = LogFactory.getLog(StateTransferFunctionalTest.class);
+
+   public StateTransferFunctionalTest() {
+      cleanup = CleanupPhase.AFTER_METHOD;
+   }
+
+   /**
+    * Builds the shared configuration (synchronous replication with in-memory state fetching enabled).  Cache
+    * managers themselves are created lazily by each test through {@link #createCacheManager()}.
+    */
+   protected void createCacheManagers() throws Throwable {
+      // This impl only really sets up a configuration for use later.
+      config = new Configuration();
+      config.setCacheMode(Configuration.CacheMode.REPL_SYNC);
+      config.setSyncCommitPhase(true);
+      config.setSyncReplTimeout(30000);
+      config.setFetchInMemoryState(true);
+      config.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
+   }
+
+   // Adds a cluster-enabled cache manager with the test cache defined on it.
+   private CacheManager createCacheManager() {
+      CacheManager cm = addClusterEnabledCacheManager();
+      cm.defineCache(cacheName, config);
+      return cm;
+   }
+
+   /**
+    * A serializable value that slows down every serialization of a given instance after the first one by
+    * sleeping for two seconds.
+    */
+   public static class DelayTransfer implements Serializable {
+      private transient int count;
+
+      private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+         in.defaultReadObject();
+      }
+
+      private void writeObject(ObjectOutputStream out) throws IOException {
+         out.defaultWriteObject();
+
+         // RPC is first serialization, ST is second
+         if (count++ == 0)
+            return;
+
+         try {
+            // This sleep is not required for the test to function,
+            // however it improves the possibility of finding errors
+            // (since it keeps the tx log going)
+            Thread.sleep(2000);
+         }
+         catch (InterruptedException e) {
+            // Restore the interrupt flag so the serializing thread can still observe the interruption.
+            Thread.currentThread().interrupt();
+         }
+      }
+
+   }
+
+   /**
+    * Keeps writing {@code "test" + n -> n} entries to a cache, optionally each in its own transaction, until
+    * {@link #stop()} is called.  {@link #result()} then reports the final counter value.
+    */
+   private static class WritingRunner implements Runnable {
+      private final Cache<Object, Object> cache;
+      private final boolean tx;
+      private volatile boolean stop;
+      private volatile int result;
+      private TransactionManager tm;
+
+      WritingRunner(Cache<Object, Object> cache, boolean tx) {
+         this.cache = cache;
+         this.tx = tx;
+         if (tx) tm = TestingUtil.getTransactionManager(cache);
+      }
+
+      public int result() {
+         return result;
+      }
+
+      public void run() {
+         int c = 0;
+         while (!stop) {
+            try {
+               if (tx) tm.begin();
+               cache.put("test" + c, c++);
+               if (tx) tm.commit();
+            }
+            catch (Exception e) {
+               e.printStackTrace();
+               log.error(e);
+            }
+         }
+         result = c;
+      }
+
+      public void stop() {
+         stop = true;
+      }
+   }
+
+   /** A second cache joining after data was written must see that data. */
+   public void testInitialStateTransfer() throws Exception {
+      testCount++;
+      log.info("testInitialStateTransfer start - " + testCount);
+      Cache<Object, Object> cache1, cache2;
+      cache1 = createCacheManager().getCache(cacheName);
+      writeInitialData(cache1);
+
+      cache2 = createCacheManager().getCache(cacheName);
+
+      // Pause to give caches time to see each other
+      TestingUtil.blockUntilViewsReceived(60000, cache1, cache2);
+
+      verifyInitialData(cache2);
+      log.info("testInitialStateTransfer end - " + testCount);
+   }
+
+   /** Two further caches started concurrently from separate threads must both see the initial data. */
+   public void testConcurrentStateTransfer() throws Exception {
+      testCount++;
+      log.info("testConcurrentStateTransfer start - " + testCount);
+      Cache<Object, Object> cache1 = null, cache2 = null, cache3 = null, cache4 = null;
+      cache1 = createCacheManager().getCache(cacheName);
+      writeInitialData(cache1);
+
+      cache2 = createCacheManager().getCache(cacheName);
+
+      cache1.put("delay", new DelayTransfer());
+
+      // Pause to give caches time to see each other
+      TestingUtil.blockUntilViewsReceived(60000, cache1, cache2);
+      verifyInitialData(cache2);
+
+      final CacheManager cm3 = createCacheManager();
+      final CacheManager cm4 = createCacheManager();
+
+      Thread t1 = new Thread(new Runnable() {
+         public void run() {
+            cm3.getCache(cacheName);
+         }
+      });
+      t1.start();
+
+      Thread t2 = new Thread(new Runnable() {
+         public void run() {
+            cm4.getCache(cacheName);
+         }
+      });
+      t2.start();
+
+      t1.join();
+      t2.join();
+
+      cache3 = cm3.getCache(cacheName);
+      cache4 = cm4.getCache(cacheName);
+
+      TestingUtil.blockUntilViewsReceived(60000, cache1, cache2, cache3, cache4);
+      verifyInitialData(cache3);
+      verifyInitialData(cache4);
+      log.info("testConcurrentStateTransfer end - " + testCount);
+   }
+
+   public void testSTWithThirdWritingNonTxCache() throws Exception {
+      testCount++;
+      log.info("testSTWithThirdWritingNonTxCache start - " + testCount);
+      thirdWritingCacheTest(false, "nbst1");
+      log.info("testSTWithThirdWritingNonTxCache end - " + testCount);
+   }
+
+   public void testSTWithThirdWritingTxCache() throws Exception {
+      testCount++;
+      log.info("testSTWithThirdWritingTxCache start - " + testCount);
+      thirdWritingCacheTest(true, "nbst2");
+      log.info("testSTWithThirdWritingTxCache end - " + testCount);
+   }
+
+   public void testSTWithWritingNonTxThread() throws Exception {
+      testCount++;
+      log.info("testSTWithWritingNonTxThread start - " + testCount);
+      writingThreadTest(false, "nbst3");
+      log.info("testSTWithWritingNonTxThread end - " + testCount);
+   }
+
+   public void testSTWithWritingTxThread() throws Exception {
+      testCount++;
+      log.info("testSTWithWritingTxThread start - " + testCount);
+      writingThreadTest(true, "nbst4");
+      log.info("testSTWithWritingTxThread end - " + testCount);
+   }
+
+   /**
+    * Starts a joiner (cache2) while a separate, already-running cache (cache3) keeps writing, then checks that
+    * the joiner holds both the initial data and every entry the writer produced.
+    *
+    * @param tx   whether the writer wraps each put in a transaction
+    * @param name currently unused
+    */
+   private void thirdWritingCacheTest(boolean tx, String name) throws InterruptedException {
+      Cache<Object, Object> cache1, cache2, cache3;
+      cache1 = createCacheManager().getCache(cacheName);
+      cache3 = createCacheManager().getCache(cacheName);
+
+      writeInitialData(cache1);
+
+      // Delay the transient copy, so that we get a more thorough log test
+      cache1.put("delay", new DelayTransfer());
+
+      WritingRunner writer = new WritingRunner(cache3, tx);
+      Thread writerThread = new Thread(writer);
+      writerThread.start();
+
+      cache2 = createCacheManager().getCache(cacheName);
+
+      // Pause to give caches time to see each other
+      TestingUtil.blockUntilViewsReceived(60000, cache1, cache2, cache3);
+
+      writer.stop();
+      writerThread.join();
+
+      verifyInitialData(cache2);
+
+      int count = writer.result();
+
+      for (int c = 0; c < count; c++)
+         assert cache2.get("test" + c).equals(c);
+   }
+
+   // Asserts that the four entries written by writeInitialData are present with the expected values.
+   private void verifyInitialData(Cache<Object, Object> c) {
+      assert JOE.equals(c.get(A_B_NAME)) : "Incorrect value for key " + A_B_NAME;
+      assert TWENTY.equals(c.get(A_B_AGE)) : "Incorrect value for key " + A_B_AGE;
+      assert BOB.equals(c.get(A_C_NAME)) : "Incorrect value for key " + A_C_NAME;
+      assert FORTY.equals(c.get(A_C_AGE)) : "Incorrect value for key " + A_C_AGE;
+   }
+
+   // Seeds the cache with two name/age pairs.
+   private void writeInitialData(final Cache<Object, Object> c) {
+      c.put(A_B_NAME, JOE);
+      c.put(A_B_AGE, TWENTY);
+      c.put(A_C_NAME, BOB);
+      c.put(A_C_AGE, FORTY);
+   }
+
+   /**
+    * Starts a joiner (cache2) while a thread keeps writing to the existing cache (cache1), then checks that the
+    * joiner holds both the initial data and every entry the writer produced.
+    *
+    * @param tx   whether the writer wraps each put in a transaction
+    * @param name currently unused
+    */
+   private void writingThreadTest(boolean tx, String name) throws InterruptedException {
+      Cache<Object, Object> cache1 = null, cache2 = null;
+      cache1 = createCacheManager().getCache(cacheName);
+
+      writeInitialData(cache1);
+
+      // Delay the transient copy, so that we get a more thorough log test
+      cache1.put("delay", new DelayTransfer());
+
+      WritingRunner writer = new WritingRunner(cache1, tx);
+      Thread writerThread = new Thread(writer);
+      writerThread.start();
+
+      cache2 = createCacheManager().getCache(cacheName);
+
+      // Pause to give caches time to see each other
+      TestingUtil.blockUntilViewsReceived(60000, cache1, cache2);
+
+      writer.stop();
+      writerThread.join();
+
+      verifyInitialData(cache2);
+
+      int count = writer.result();
+
+      for (int c = 0; c < count; c++)
+         assert cache2.get("test" + c).equals(c);
+   }
+}
Modified:
core/branches/flat/src/test/java/org/horizon/test/MultipleCacheManagersTest.java
===================================================================
---
core/branches/flat/src/test/java/org/horizon/test/MultipleCacheManagersTest.java 2009-02-25
12:06:48 UTC (rev 7784)
+++
core/branches/flat/src/test/java/org/horizon/test/MultipleCacheManagersTest.java 2009-02-25
15:52:32 UTC (rev 7785)
@@ -178,6 +178,7 @@
Configuration configuration = new Configuration();
configuration.setSyncCommitPhase(true);
configuration.setSyncRollbackPhase(true);
+ configuration.setFetchInMemoryState(false);
return configuration;
}
Modified: core/branches/flat/src/test/resources/configs/named-cache-test.xml
===================================================================
--- core/branches/flat/src/test/resources/configs/named-cache-test.xml 2009-02-25 12:06:48
UTC (rev 7784)
+++ core/branches/flat/src/test/resources/configs/named-cache-test.xml 2009-02-25 15:52:32
UTC (rev 7785)
@@ -38,21 +38,21 @@
<namedCache name="syncRepl">
<clustering>
- <stateRetrieval fetchInMemoryState="true"
timeout="15000"/>
+ <stateRetrieval fetchInMemoryState="false" />
<sync replTimeout="15000"/>
</clustering>
</namedCache>
<namedCache name="asyncRepl">
<clustering>
- <stateRetrieval fetchInMemoryState="true"
timeout="15000"/>
+ <stateRetrieval fetchInMemoryState="false" />
<async useAsyncSerialization="false"/>
</clustering>
</namedCache>
<namedCache name="asyncReplQueue">
<clustering>
- <stateRetrieval fetchInMemoryState="true"
timeout="15000"/>
+ <stateRetrieval fetchInMemoryState="false" />
<async useReplQueue="true" replQueueInterval="1234"
replQueueMaxElements="100"/>
</clustering>
</namedCache>
@@ -60,7 +60,7 @@
<namedCache name="txSyncRepl">
<transaction/>
<clustering>
- <stateRetrieval fetchInMemoryState="true"
timeout="15000"/>
+ <stateRetrieval fetchInMemoryState="false" />
<sync replTimeout="15000"/>
</clustering>
</namedCache>
Modified: core/branches/flat/src/test/resources/log4j.xml
===================================================================
--- core/branches/flat/src/test/resources/log4j.xml 2009-02-25 12:06:48 UTC (rev 7784)
+++ core/branches/flat/src/test/resources/log4j.xml 2009-02-25 15:52:32 UTC (rev 7785)
@@ -45,13 +45,17 @@
<!-- ================ -->
<category name="org.horizon">
- <priority value="INFO"/>
+ <priority value="WARN"/>
</category>
<category name="org.horizon.profiling">
<priority value="WARN"/>
</category>
+ <category name="org.horizon.jmx">
+ <priority value="WARN"/>
+ </category>
+
<category name="org.horizon.factories">
<priority value="WARN"/>
</category>
@@ -62,8 +66,8 @@
<root>
<priority value="WARN"/>
- <!--<appender-ref ref="CONSOLE"/>-->
- <appender-ref ref="FILE"/>
+ <appender-ref ref="CONSOLE"/>
+ <!--<appender-ref ref="FILE"/>-->
</root>
</log4j:configuration>