[jbosscache-commits] JBoss Cache SVN: r7785 - in core/branches/flat/src: main/java/org/horizon/config and 17 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Wed Feb 25 10:52:33 EST 2009


Author: manik.surtani at jboss.com
Date: 2009-02-25 10:52:32 -0500 (Wed, 25 Feb 2009)
New Revision: 7785

Added:
   core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/StateTransferMonitor.java
   core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferException.java
   core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManagerImpl.java
   core/branches/flat/src/test/java/org/horizon/statetransfer/
   core/branches/flat/src/test/java/org/horizon/statetransfer/Address.java
   core/branches/flat/src/test/java/org/horizon/statetransfer/Person.java
   core/branches/flat/src/test/java/org/horizon/statetransfer/StateTransferFunctionalTest.java
Removed:
   core/branches/flat/src/main/java/org/horizon/factories/StateTransferFactory.java
   core/branches/flat/src/main/java/org/horizon/statetransfer/DefaultStateTransferManager.java
Modified:
   core/branches/flat/src/main/java/org/horizon/CacheDelegate.java
   core/branches/flat/src/main/java/org/horizon/config/Configuration.java
   core/branches/flat/src/main/java/org/horizon/factories/AbstractComponentRegistry.java
   core/branches/flat/src/main/java/org/horizon/factories/ComponentRegistry.java
   core/branches/flat/src/main/java/org/horizon/factories/StateTransferManagerFactory.java
   core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandler.java
   core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandlerImpl.java
   core/branches/flat/src/main/java/org/horizon/remoting/RPCManager.java
   core/branches/flat/src/main/java/org/horizon/remoting/RPCManagerImpl.java
   core/branches/flat/src/main/java/org/horizon/remoting/transport/Transport.java
   core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/JGroupsTransport.java
   core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManager.java
   core/branches/flat/src/main/java/org/horizon/util/Util.java
   core/branches/flat/src/test/java/org/horizon/api/CacheClusterJoinTest.java
   core/branches/flat/src/test/java/org/horizon/api/tree/NodeReplicatedMoveTest.java
   core/branches/flat/src/test/java/org/horizon/config/parsing/XmlFileParsingTest.java
   core/branches/flat/src/test/java/org/horizon/loader/decorators/SingletonStoreTest.java
   core/branches/flat/src/test/java/org/horizon/manager/CacheManagerComponentRegistryTest.java
   core/branches/flat/src/test/java/org/horizon/notifications/cachemanagerlistener/CacheManagerNotifierTest.java
   core/branches/flat/src/test/java/org/horizon/test/MultipleCacheManagersTest.java
   core/branches/flat/src/test/resources/configs/named-cache-test.xml
   core/branches/flat/src/test/resources/log4j.xml
Log:
Initial state transfer impl

Modified: core/branches/flat/src/main/java/org/horizon/CacheDelegate.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/CacheDelegate.java	2009-02-25 12:06:48 UTC (rev 7784)
+++ core/branches/flat/src/main/java/org/horizon/CacheDelegate.java	2009-02-25 15:52:32 UTC (rev 7785)
@@ -54,6 +54,7 @@
 import org.horizon.marshall.Marshaller;
 import org.horizon.notifications.cachelistener.CacheNotifier;
 import org.horizon.remoting.RPCManager;
+import org.horizon.statetransfer.StateTransferManager;
 
 import javax.transaction.Transaction;
 import javax.transaction.TransactionManager;
@@ -84,6 +85,8 @@
    private DataContainer dataContainer;
    private static final Log log = LogFactory.getLog(CacheDelegate.class);
    private CacheManager cacheManager;
+   // this is never used here but should be injected - this is a hack to make sure the StateTransferManager is properly constructed if needed.
+   private StateTransferManager stateTransferManager;
 
    public CacheDelegate(String name) {
       this.name = name;
@@ -101,7 +104,7 @@
                                   BatchContainer batchContainer,
                                   RPCManager rpcManager, DataContainer dataContainer,
                                   Marshaller marshaller,
-                                  CacheManager cacheManager) {
+                                  CacheManager cacheManager, StateTransferManager stateTransferManager) {
       this.invocationContextContainer = invocationContextContainer;
       this.commandsFactory = commandsFactory;
       this.invoker = interceptorChain;
@@ -115,6 +118,7 @@
       this.dataContainer = dataContainer;
       this.marshaller = marshaller;
       this.cacheManager = cacheManager;
+      this.stateTransferManager = stateTransferManager;
    }
 
    @SuppressWarnings("unchecked")

Modified: core/branches/flat/src/main/java/org/horizon/config/Configuration.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/config/Configuration.java	2009-02-25 12:06:48 UTC (rev 7784)
+++ core/branches/flat/src/main/java/org/horizon/config/Configuration.java	2009-02-25 15:52:32 UTC (rev 7785)
@@ -98,6 +98,9 @@
          return this == REPL_SYNC || this == INVALIDATION_SYNC || this == LOCAL;
       }
 
+      public boolean isClustered() {
+         return this != LOCAL;
+      }
    }
 
    // ------------------------------------------------------------------------------------------------------------

Modified: core/branches/flat/src/main/java/org/horizon/factories/AbstractComponentRegistry.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/factories/AbstractComponentRegistry.java	2009-02-25 12:06:48 UTC (rev 7784)
+++ core/branches/flat/src/main/java/org/horizon/factories/AbstractComponentRegistry.java	2009-02-25 15:52:32 UTC (rev 7785)
@@ -148,7 +148,6 @@
       s.add(TransactionManagerFactory.class);
       s.add(ReplicationQueueFactory.class);
       s.add(StateTransferManagerFactory.class);
-      s.add(StateTransferFactory.class);
       s.add(LockManagerFactory.class);
       s.add(DataContainerFactory.class);
       s.add(EvictionManagerFactory.class);

Modified: core/branches/flat/src/main/java/org/horizon/factories/ComponentRegistry.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/factories/ComponentRegistry.java	2009-02-25 12:06:48 UTC (rev 7784)
+++ core/branches/flat/src/main/java/org/horizon/factories/ComponentRegistry.java	2009-02-25 15:52:32 UTC (rev 7785)
@@ -109,9 +109,13 @@
          globalComponents.start();
       }
       boolean needToNotify = state != ComponentStatus.RUNNING && state != ComponentStatus.INITIALIZING;
+
+      // set this up *before* starting the components since some components - specifically state transfer - need to be
+      // able to locate this registry via the InboundInvocationHandler
+      globalComponents.registerNamedComponentRegistry(this, cacheName);
+
       super.start();
       if (needToNotify && state == ComponentStatus.RUNNING) {
-         globalComponents.registerNamedComponentRegistry(this, cacheName);
          cacheManagerNotifier.notifyCacheStarted(cacheName);
       }
    }

Deleted: core/branches/flat/src/main/java/org/horizon/factories/StateTransferFactory.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/factories/StateTransferFactory.java	2009-02-25 12:06:48 UTC (rev 7784)
+++ core/branches/flat/src/main/java/org/horizon/factories/StateTransferFactory.java	2009-02-25 15:52:32 UTC (rev 7785)
@@ -1,48 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.horizon.factories;
-
-/**
- * Factory class able to create {@link StateTransferGenerator} and {@link StateTransferIntegrator} instances.
- * <p/>
- * Updated in 3.0.0 to extend ComponentFactory, etc.
- * <p/>
- *
- * @author <a href="brian.stansberry at jboss.com">Brian Stansberry</a>
- * @author Manik Surtani
- * @since 1.0
- */
-// TODO: Implement me
-//@DefaultFactoryFor(classes = {StateTransferGenerator.class, StateTransferIntegrator.class})
-public class StateTransferFactory extends AbstractComponentFactory implements AutoInstantiableFactory {
-   public <T> T construct(Class<T> componentType) {
-      return null;
-//      if (componentType.equals(StateTransferIntegrator.class))
-//      {
-//         return componentType.cast(new DefaultStateTransferIntegrator());
-//      }
-//      else
-//      {
-//         return componentType.cast(new DefaultStateTransferGenerator());
-//      }
-   }
-}

Modified: core/branches/flat/src/main/java/org/horizon/factories/StateTransferManagerFactory.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/factories/StateTransferManagerFactory.java	2009-02-25 12:06:48 UTC (rev 7784)
+++ core/branches/flat/src/main/java/org/horizon/factories/StateTransferManagerFactory.java	2009-02-25 15:52:32 UTC (rev 7785)
@@ -22,7 +22,7 @@
 package org.horizon.factories;
 
 import org.horizon.factories.annotations.DefaultFactoryFor;
-import org.horizon.statetransfer.DefaultStateTransferManager;
+import org.horizon.statetransfer.StateTransferManagerImpl;
 import org.horizon.statetransfer.StateTransferManager;
 
 /**
@@ -32,8 +32,11 @@
  * @since 1.0
  */
 @DefaultFactoryFor(classes = StateTransferManager.class)
-public class StateTransferManagerFactory extends AbstractComponentFactory implements AutoInstantiableFactory {
+public class StateTransferManagerFactory extends AbstractNamedCacheComponentFactory implements AutoInstantiableFactory {
    public <T> T construct(Class<T> componentType) {
-      return componentType.cast(new DefaultStateTransferManager());
+      if (configuration.getCacheMode().isClustered() && configuration.isFetchInMemoryState())
+         return componentType.cast(new StateTransferManagerImpl());
+      else
+         return null;
    }
 }

Modified: core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandler.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandler.java	2009-02-25 12:06:48 UTC (rev 7784)
+++ core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandler.java	2009-02-25 15:52:32 UTC (rev 7785)
@@ -3,10 +3,15 @@
 import org.horizon.commands.RPCCommand;
 import org.horizon.factories.scopes.Scope;
 import org.horizon.factories.scopes.Scopes;
+import org.horizon.statetransfer.StateTransferException;
 
+import java.io.InputStream;
+import java.io.OutputStream;
+
 /**
  * A globally scoped component, that is able to locate named caches and invoke remotely originating calls on the
- * appropriate cache.
+ * appropriate cache.  The primary goal of this component is to act as a bridge between the globally scoped {@link org.horizon.remoting.RPCManager}
+ * and named-cache scoped components.
  *
  * @author Manik Surtani
  * @since 1.0
@@ -19,6 +24,27 @@
     *
     * @param command command to invoke
     * @return results, if any, from the invocation
+    * @throws Throwable in the event of problems executing the command
     */
    Object handle(RPCCommand command) throws Throwable;
+
+   /**
+    * Applies state onto a named cache.  State is read from the stream.  Implementations should NOT close the stream
+    * after use.
+    *
+    * @param cacheName name of cache to apply state
+    * @param i stream to read from
+    * @throws StateTransferException in the event of problems
+    */
+   void applyState(String cacheName, InputStream i) throws StateTransferException;
+
+   /**
+    * Generates state from a named cache.  State is written to the stream.  Implementations should NOT close the stream
+    * after use.
+    *
+    * @param cacheName name of cache from which to generate state
+    * @param o stream to write state to
+    * @throws StateTransferException in the event of problems
+    */
+   void generateState(String cacheName, OutputStream o) throws StateTransferException;
 }

Modified: core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandlerImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandlerImpl.java	2009-02-25 12:06:48 UTC (rev 7784)
+++ core/branches/flat/src/main/java/org/horizon/remoting/InboundInvocationHandlerImpl.java	2009-02-25 15:52:32 UTC (rev 7785)
@@ -12,7 +12,12 @@
 import org.horizon.invocation.InvocationContextContainer;
 import org.horizon.logging.Log;
 import org.horizon.logging.LogFactory;
+import org.horizon.statetransfer.StateTransferException;
+import org.horizon.statetransfer.StateTransferManager;
 
+import java.io.InputStream;
+import java.io.OutputStream;
+
 /**
  * Sets the cache interceptor chain on an RPCCommand before calling it to perform
  *
@@ -34,9 +39,15 @@
       String cacheName = cmd.getCacheName();
       ComponentRegistry cr = gcr.getNamedComponentRegistry(cacheName);
       if (cr == null) {
-         log.info("Cache named {0} does not exist on this cache manager!", cacheName);
+         if (log.isInfoEnabled()) log.info("Cache named {0} does not exist on this cache manager!", cacheName);
          return null;
       }
+
+      if (!cr.getStatus().allowInvocations()) {
+         if (log.isInfoEnabled()) log.info("Cache named {0} exists but isn't in a state to handle invocations.  Its state is {1}", cacheName, cr.getStatus());
+         return null;
+      }
+
       InterceptorChain ic = cr.getComponent(InterceptorChain.class);
       InvocationContextContainer icc = cr.getComponent(InvocationContextContainer.class);
       CommandsFactory commandsFactory = cr.getComponent(CommandsFactory.class);
@@ -46,4 +57,23 @@
       commandsFactory.initializeReplicableCommand(cmd);
       return cmd.perform(icc.get());
    }
+
+   public void applyState(String cacheName, InputStream i) throws StateTransferException {
+      getStateTransferManager(cacheName).applyState(i);
+   }
+
+   public void generateState(String cacheName, OutputStream o) throws StateTransferException {
+      getStateTransferManager(cacheName).generateState(o);
+   }
+
+   private StateTransferManager getStateTransferManager(String cacheName) throws StateTransferException {
+      ComponentRegistry cr = gcr.getNamedComponentRegistry(cacheName);
+      if (cr == null) {
+         String msg = "Cache named "+cacheName+" does not exist on this cache manager!";
+         log.info(msg);
+         throw new StateTransferException(msg);
+      }
+
+      return cr.getComponent(StateTransferManager.class);
+   }
 }

Modified: core/branches/flat/src/main/java/org/horizon/remoting/RPCManager.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/RPCManager.java	2009-02-25 12:06:48 UTC (rev 7784)
+++ core/branches/flat/src/main/java/org/horizon/remoting/RPCManager.java	2009-02-25 15:52:32 UTC (rev 7785)
@@ -27,6 +27,7 @@
 import org.horizon.factories.scopes.Scopes;
 import org.horizon.lifecycle.Lifecycle;
 import org.horizon.remoting.transport.Address;
+import org.horizon.statetransfer.StateTransferException;
 
 import java.util.List;
 
@@ -111,4 +112,14 @@
     * @return a list of members.  Typically, this would be defensively copied.
     */
    List<Address> getMembers();
+
+   /**
+    * Initiates a state retrieval process from neighbouring caches.  This method will block until it either times out,
+    * or state is retrieved and applied.
+    *
+    * @param cacheName name of cache requesting state
+    * @param timeout length of time, in milliseconds, to try to retrieve state from each peer
+    * @throws org.horizon.statetransfer.StateTransferException in the event of problems
+    */
+   void retrieveState(String cacheName, long timeout) throws StateTransferException;
 }
\ No newline at end of file

Modified: core/branches/flat/src/main/java/org/horizon/remoting/RPCManagerImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/RPCManagerImpl.java	2009-02-25 12:06:48 UTC (rev 7784)
+++ core/branches/flat/src/main/java/org/horizon/remoting/RPCManagerImpl.java	2009-02-25 15:52:32 UTC (rev 7785)
@@ -10,10 +10,13 @@
 import org.horizon.jmx.annotations.MBean;
 import org.horizon.jmx.annotations.ManagedAttribute;
 import org.horizon.jmx.annotations.ManagedOperation;
+import org.horizon.logging.Log;
+import org.horizon.logging.LogFactory;
 import org.horizon.marshall.Marshaller;
 import org.horizon.notifications.cachemanagerlistener.CacheManagerNotifier;
 import org.horizon.remoting.transport.Address;
 import org.horizon.remoting.transport.Transport;
+import org.horizon.statetransfer.StateTransferException;
 
 import java.text.NumberFormat;
 import java.util.List;
@@ -34,6 +37,7 @@
    private final AtomicLong replicationCount = new AtomicLong(0);
    private final AtomicLong replicationFailures = new AtomicLong(0);
    boolean statisticsEnabled = false; // by default, don't gather statistics.
+   private static final Log log = LogFactory.getLog(RPCManagerImpl.class);
 
    @Inject
    public void injectDependencies(GlobalConfiguration globalConfiguration, Transport t, InboundInvocationHandler handler,
@@ -44,7 +48,7 @@
       this.t.initialize(globalConfiguration, globalConfiguration.getTransportProperties(), marshaller, e, handler, notifier);
    }
 
-   @Start
+   @Start(priority = 10)
    public void start() {
       t.start();
    }
@@ -82,6 +86,47 @@
       return t.getMembers();
    }
 
+   public void retrieveState(String cacheName, long timeout) throws StateTransferException {
+      List<Address> members = getMembers();
+      if (members.size() < 2) {
+         if (log.isDebugEnabled())
+            log.debug("We're the only member in the cluster; no one to retrieve state from. Not doing anything!");
+         return;
+      }
+
+      boolean success = false;
+      outer:
+      for (int i = 0, wait = 1000; i < 5; i++) {
+         for (Address member : members) {
+            if (!member.equals(getAddress())) {
+               try {
+                  if (log.isInfoEnabled()) log.info("Trying to fetch state from {0}", member);
+                  if (t.retrieveState(cacheName, member, timeout)) {
+                     if (log.isInfoEnabled()) log.info("Successfully retrieved and applied state from {0}", member);
+                     success = true;
+                     break outer;
+                  }
+               } catch (StateTransferException e) {
+                  if (log.isDebugEnabled()) log.debug("Error while fetching state from member " + member, e);
+               }
+            }
+
+            if (!success) {
+               if (log.isWarnEnabled()) log.warn("Could not find available peer for state, backing off and retrying");
+
+               try {
+                  Thread.sleep(wait <<= 2);
+               }
+               catch (InterruptedException e) {
+                  Thread.currentThread().interrupt();
+               }
+            }
+         }
+      }
+
+      if (!success) throw new StateTransferException("Unable to fetch state on startup");
+   }
+
    // -------------------------------------------- JMX information -----------------------------------------------
 
    @ManagedOperation

Modified: core/branches/flat/src/main/java/org/horizon/remoting/transport/Transport.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/transport/Transport.java	2009-02-25 12:06:48 UTC (rev 7784)
+++ core/branches/flat/src/main/java/org/horizon/remoting/transport/Transport.java	2009-02-25 15:52:32 UTC (rev 7785)
@@ -11,6 +11,7 @@
 import org.horizon.remoting.InboundInvocationHandler;
 import org.horizon.remoting.ResponseFilter;
 import org.horizon.remoting.ResponseMode;
+import org.horizon.statetransfer.StateTransferException;
 
 import java.util.List;
 import java.util.Properties;
@@ -78,4 +79,16 @@
     * @return a list of members.  Typically, this would be defensively copied.
     */
    List<Address> getMembers();
+
+   /**
+    * Initiates a state retrieval from a specific cache (typically by invoking {@link org.horizon.remoting.InboundInvocationHandler#generateState(String, java.io.OutputStream)}),
+    * and applies this state to the current cache via the {@link InboundInvocationHandler#applyState(String, java.io.InputStream)} callback.
+    *
+    * @param cacheName name of cache for which to retrieve state
+    * @param address address of remote cache from which to retrieve state
+    * @param timeout state retrieval timeout in milliseconds 
+    * @throws org.horizon.statetransfer.StateTransferException if state cannot be retrieved from the specific cache
+    * @return true if state was transferred and applied successfully, false if it timed out.
+    */
+   boolean retrieveState(String cacheName, Address address, long timeout) throws StateTransferException;
 }

Modified: core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/JGroupsTransport.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/JGroupsTransport.java	2009-02-25 12:06:48 UTC (rev 7784)
+++ core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/JGroupsTransport.java	2009-02-25 15:52:32 UTC (rev 7785)
@@ -15,24 +15,30 @@
 import org.horizon.remoting.ResponseMode;
 import org.horizon.remoting.transport.Address;
 import org.horizon.remoting.transport.Transport;
+import org.horizon.statetransfer.StateTransferException;
 import org.horizon.util.FileLookup;
+import org.horizon.util.Util;
 import org.jgroups.Channel;
 import org.jgroups.ChannelException;
+import org.jgroups.ExtendedMessageListener;
 import org.jgroups.JChannel;
 import org.jgroups.MembershipListener;
 import org.jgroups.Message;
-import org.jgroups.MessageListener;
 import org.jgroups.View;
 import org.jgroups.blocks.GroupRequest;
 import org.jgroups.blocks.RspFilter;
 import org.jgroups.util.Rsp;
 import org.jgroups.util.RspList;
 
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Properties;
 import java.util.Vector;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
 
 /**
@@ -41,7 +47,7 @@
  * @author Manik Surtani
  * @since 1.0
  */
-public class JGroupsTransport implements Transport, MembershipListener, MessageListener {
+public class JGroupsTransport implements Transport, MembershipListener, ExtendedMessageListener {
    public static final String CONFIGURATION_STRING = "configurationString";
    public static final String CONFIGURATION_XML = "configurationXml";
    public static final String CONFIGURATION_FILE = "configurationFile";
@@ -61,7 +67,15 @@
    Marshaller marshaller;
    ExecutorService asyncExecutor;
    CacheManagerNotifier notifier;
+   final ConcurrentMap<String, StateTransferMonitor> stateTransfersInProgress = new ConcurrentHashMap<String, StateTransferMonitor>();
 
+   /**
+    * Reference to an exception that was raised during state installation on this node.
+    */
+   protected volatile Exception setStateException;
+   private final Object stateLock = new Object();
+
+
    // ------------------------------------------------------------------------------------------------------------------
    // Lifecycle and setup stuff
    // ------------------------------------------------------------------------------------------------------------------
@@ -204,6 +218,27 @@
       return members;
    }
 
+   public boolean retrieveState(String cacheName, Address address, long timeout) throws StateTransferException {
+      boolean cleanup = false;
+      try {
+         StateTransferMonitor mon = new StateTransferMonitor();
+         if (stateTransfersInProgress.putIfAbsent(cacheName, mon) != null)
+            throw new StateTransferException("There already appears to be a state transfer in progress for the cache named " + cacheName);
+
+         cleanup = true;
+         ((JChannel) channel).getState(toJGroupsAddress(address), cacheName, timeout, false);
+         mon.waitForState();
+         return true;
+      } catch (StateTransferException ste) {
+         throw ste;
+      } catch (Exception e) {
+         if (log.isInfoEnabled()) log.info("Unable to retrieve state from member " + address, e);
+         return false;
+      } finally {
+         if (cleanup) stateTransfersInProgress.remove(cacheName);
+      }
+   }
+
    public Address getAddress() {
       if (address == null) {
          address = new JGroupsAddress(channel.getLocalAddress());
@@ -325,8 +360,7 @@
    }
 
    public void block() {
-      // TODO: Do we need these for state transfer?
-      // a no-op for now
+      // a no-op
    }
 
    public void receive(Message msg) {
@@ -334,17 +368,55 @@
    }
 
    public byte[] getState() {
-      // TODO: Do we need these for state transfer?
-      // a no-op for now
-      return null;
+      throw new UnsupportedOperationException("Retrieving state for the entire cache system is not supported!");
    }
 
    public void setState(byte[] state) {
-      // TODO: Do we need these for state transfer?
-      // a no-op for now
+      throw new UnsupportedOperationException("Setting state for the entire cache system is not supported!");
    }
 
+   public byte[] getState(String state_id) {
+      throw new UnsupportedOperationException("Non-stream-based state retrieval is not supported!  Make sure you use the JGroups STREAMING_STATE_TRANSFER protocol!");
+   }
 
+   public void setState(String state_id, byte[] state) {
+      throw new UnsupportedOperationException("Non-stream-based state retrieval is not supported!  Make sure you use the JGroups STREAMING_STATE_TRANSFER protocol!");
+   }
+
+   public void getState(OutputStream ostream) {
+      throw new UnsupportedOperationException("Retrieving state for the entire cache system is not supported!");
+   }
+
+   public void getState(String cacheName, OutputStream ostream) {
+      if (trace) log.trace("Received request to generate state for cache {0}.  Attempting to generate state.", cacheName);
+      try {
+         inboundInvocationHandler.generateState(cacheName, ostream);
+      } catch (StateTransferException e) {
+         log.error("Caught while responding to state transfer request", e);
+      } finally {
+         Util.closeStream(ostream);
+      }
+   }
+
+   public void setState(InputStream istream) {
+      throw new UnsupportedOperationException("Setting state for the entire cache system is not supported!");
+   }
+
+   public void setState(String cacheName, InputStream istream) {
+      if (trace) log.trace("Received state for cache {0}.  Attempting to apply state.", cacheName);
+      StateTransferMonitor mon = stateTransfersInProgress.get(cacheName);
+      try {
+         inboundInvocationHandler.applyState(cacheName, istream);
+         mon.notifyStateReceiptSucceeded();
+      } catch (StateTransferException e) {
+         log.error("Failed setting state", e);
+         mon.notifyStateReceiptFailed(e);
+      } finally {
+         Util.closeStream(istream);
+      }
+   }
+
+
    // ------------------------------------------------------------------------------------------------------------------
    // Helpers to convert between Address types
    // ------------------------------------------------------------------------------------------------------------------
@@ -361,6 +433,10 @@
       return retval;
    }
 
+   private org.jgroups.Address toJGroupsAddress(Address a) {
+      return ((JGroupsAddress) a).address;
+   }
+
    private List<Address> fromJGroupsAddressList(List<org.jgroups.Address> list) {
       if (list == null || list.isEmpty()) return Collections.emptyList();
 

Added: core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/StateTransferMonitor.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/StateTransferMonitor.java	                        (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/remoting/transport/jgroups/StateTransferMonitor.java	2009-02-25 15:52:32 UTC (rev 7785)
@@ -0,0 +1,55 @@
+package org.horizon.remoting.transport.jgroups;
+
+import org.horizon.statetransfer.StateTransferException;
+
+public class StateTransferMonitor {
+   /**
+    * Reference to an exception that was raised during state installation on this cache.
+    */
+   protected volatile StateTransferException setStateException;
+   private final Object stateLock = new Object();
+   /**
+    * True if state was initialized during start-up.
+    */
+   private volatile boolean isStateSet = false;
+
+   public boolean isStateSet() {
+      return isStateSet;
+   }
+
+   public void setStateSet(boolean stateSet) {
+      isStateSet = stateSet;
+   }
+
+   public StateTransferException getSetStateException() {
+      return setStateException;
+   }
+
+   public void waitForState() throws Exception {
+      synchronized (stateLock) {
+         while (!isStateSet) {
+            if (setStateException != null) {
+               throw setStateException;
+            }
+
+            try {
+               stateLock.wait();
+            }
+            catch (InterruptedException iex) {
+            }
+         }
+      }
+   }
+
+   public void notifyStateReceiptSucceeded() {
+      synchronized (stateLock) {
+         // Notify wait that state has been set.
+         stateLock.notifyAll();
+      }
+   }
+
+   public void notifyStateReceiptFailed(StateTransferException setStateException) {
+      this.setStateException = setStateException;
+      notifyStateReceiptSucceeded();
+   }
+}

Deleted: core/branches/flat/src/main/java/org/horizon/statetransfer/DefaultStateTransferManager.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/statetransfer/DefaultStateTransferManager.java	2009-02-25 12:06:48 UTC (rev 7784)
+++ core/branches/flat/src/main/java/org/horizon/statetransfer/DefaultStateTransferManager.java	2009-02-25 15:52:32 UTC (rev 7785)
@@ -1,52 +0,0 @@
-/*
- * JBoss, Home of Professional Open Source.
- * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
- * as indicated by the @author tags. See the copyright.txt file in the
- * distribution for a full listing of individual contributors.
- *
- * This is free software; you can redistribute it and/or modify it
- * under the terms of the GNU Lesser General Public License as
- * published by the Free Software Foundation; either version 2.1 of
- * the License, or (at your option) any later version.
- *
- * This software is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this software; if not, write to the Free
- * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
- * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
-package org.horizon.statetransfer;
-
-import org.horizon.factories.annotations.Inject;
-import org.horizon.factories.annotations.Start;
-
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
-
-public class DefaultStateTransferManager implements StateTransferManager {
-
-   @Inject
-   public void injectDependencies() {
-   }
-
-   @Start(priority = 14)
-   public void start() {
-   }
-
-   public void getState(ObjectOutputStream out, Object o, long timeout, boolean force, boolean suppressErrors) throws Exception {
-      throw new UnsupportedOperationException("Implement me properly!");
-   }
-
-   public void setState(ObjectInputStream in, Object o) throws Exception {
-      throw new UnsupportedOperationException("fix me!");
-   }
-
-   // TODO: implement me
-   protected void setState()
-   {
-   }
-}

Added: core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferException.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferException.java	                        (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferException.java	2009-02-25 15:52:32 UTC (rev 7785)
@@ -0,0 +1,40 @@
+package org.horizon.statetransfer;
+
+/**
+ * An exception to denote problems in transferring state between cache instances in a cluster
+ *
+ * @author Manik Surtani
+ * @since 1.0
+ */
+public class StateTransferException extends Exception {
+   // Pinned explicitly so that the serialized form does not depend on a compiler-computed default.
+   private static final long serialVersionUID = -7129001085630839173L;
+
+   /**
+    * Creates an exception with no detail message and no cause.
+    */
+   public StateTransferException() {
+   }
+
+   /**
+    * @param message detail message describing the problem
+    */
+   public StateTransferException(String message) {
+      super(message);
+   }
+
+   /**
+    * @param message detail message describing the problem
+    * @param cause   the underlying cause of the problem
+    */
+   public StateTransferException(String message, Throwable cause) {
+      super(message, cause);
+   }
+
+   /**
+    * @param cause the underlying cause of the problem
+    */
+   public StateTransferException(Throwable cause) {
+      super(cause);
+   }
+}

Modified: core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManager.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManager.java	2009-02-25 12:06:48 UTC (rev 7784)
+++ core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManager.java	2009-02-25 15:52:32 UTC (rev 7785)
@@ -21,12 +21,19 @@
  */
 package org.horizon.statetransfer;
 
-import java.io.ObjectInputStream;
-import java.io.ObjectOutputStream;
+import org.horizon.factories.scopes.Scope;
+import org.horizon.factories.scopes.Scopes;
 
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * Handles generation and application of state on the cache
+ */
+@Scope(Scopes.NAMED_CACHE)
 public interface StateTransferManager {
 
-   void getState(ObjectOutputStream out, Object o, long timeout, boolean force, boolean suppressErrors) throws Exception;
+   void generateState(OutputStream out) throws StateTransferException;
 
-   void setState(ObjectInputStream in, Object o) throws Exception;
+   void applyState(InputStream in) throws StateTransferException;
 }

Copied: core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManagerImpl.java (from rev 7782, core/branches/flat/src/main/java/org/horizon/statetransfer/DefaultStateTransferManager.java)
===================================================================
--- core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManagerImpl.java	                        (rev 0)
+++ core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManagerImpl.java	2009-02-25 15:52:32 UTC (rev 7785)
@@ -0,0 +1,188 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.horizon.statetransfer;
+
+import org.horizon.Cache;
+import org.horizon.config.Configuration;
+import org.horizon.factories.annotations.Inject;
+import org.horizon.factories.annotations.Start;
+import org.horizon.logging.Log;
+import org.horizon.logging.LogFactory;
+import org.horizon.remoting.RPCManager;
+import org.horizon.util.Util;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.OutputStream;
+import java.io.Serializable;
+
+/**
+ * {@link StateTransferManager} implementation that requests state when it starts and frames the state stream with
+ * delimiter markers: a marker, the in-memory state, a marker, the persistent state, and a closing marker.
+ * <p/>
+ * Generating and applying the in-memory and persistent state themselves is not implemented yet; those steps
+ * currently always throw a {@link StateTransferException}.
+ */
+public class StateTransferManagerImpl implements StateTransferManager {
+
+   RPCManager rpcManager;
+   Cache cache;
+   Configuration configuration;
+   private static final Log log = LogFactory.getLog(StateTransferManagerImpl.class);
+   private static final Delimiter DELIMITER = new Delimiter();
+
+   /**
+    * Stores the components this manager works with.
+    *
+    * @param rpcManager    used to retrieve state when this component starts
+    * @param cache         the cache whose name is passed on when retrieving state
+    * @param configuration supplies the state retrieval timeout
+    */
+   @Inject
+   public void injectDependencies(RPCManager rpcManager, Cache cache, Configuration configuration) {
+      this.rpcManager = rpcManager;
+      this.cache = cache;
+      this.configuration = configuration;
+   }
+
+   /**
+    * Asks the {@link RPCManager} to retrieve state for this cache, using the configured state retrieval timeout, and
+    * logs how long that took when debug logging is enabled.
+    *
+    * @throws StateTransferException if state could not be retrieved
+    */
+   @Start(priority = 14)
+   // it is imperative that this starts *after* the RPCManager does.
+   public void start() throws StateTransferException {
+      long startTime = 0;
+      if (log.isDebugEnabled()) {
+         log.debug("Initiating state transfer process");
+         startTime = System.currentTimeMillis();
+      }
+
+      rpcManager.retrieveState(cache.getName(), configuration.getStateRetrievalTimeout());
+
+      if (log.isDebugEnabled()) {
+         long duration = System.currentTimeMillis() - startTime;
+         log.debug("State transfer process completed in {0}", Util.prettyPrintTime(duration));
+      }
+   }
+
+   /**
+    * Writes this cache's state to the given stream, framed by delimiter markers.
+    *
+    * @param out stream to write state to
+    * @throws StateTransferException if state could not be generated or written; any other exception is wrapped
+    */
+   public void generateState(OutputStream out) throws StateTransferException {
+      if (log.isDebugEnabled()) log.debug("Generating state");
+
+      try {
+         ObjectOutputStream oos = new ObjectOutputStream(out);
+         delimit(oos);
+         generateInMemoryState(oos);
+         delimit(oos);
+         generatePersistentState(oos);
+         delimit(oos);
+         oos.flush();
+         oos.close();
+         // just close the object stream but do NOT close the underlying stream
+         // NOTE(review): ObjectOutputStream.close() also closes the stream it wraps - confirm callers expect that.
+      } catch (StateTransferException ste) {
+         throw ste;
+      } catch (Exception e) {
+         throw new StateTransferException(e);
+      }
+   }
+
+   /**
+    * Reads state from the given stream and applies it, checking that the expected delimiter markers are present.
+    *
+    * @param in stream to read state from
+    * @throws StateTransferException if a marker is missing or state could not be read or applied; any other
+    *                                exception is wrapped
+    */
+   public void applyState(InputStream in) throws StateTransferException {
+      if (log.isDebugEnabled()) log.debug("Applying state");
+
+      try {
+         ObjectInputStream ois = new ObjectInputStream(in);
+         assertDelimited(ois);
+         applyInMemoryState(ois);
+         assertDelimited(ois);
+         applyPersistentState(ois);
+         assertDelimited(ois);
+         ois.close();
+         // just close the object stream but do NOT close the underlying stream
+         // NOTE(review): ObjectInputStream.close() also closes the stream it wraps - confirm callers expect that.
+      } catch (StateTransferException ste) {
+         throw ste;
+      } catch (Exception e) {
+         throw new StateTransferException(e);
+      }
+   }
+
+   // Not implemented yet: always fails.
+   private void applyInMemoryState(ObjectInputStream i) throws StateTransferException {
+      throw new StateTransferException("Implement me!");
+   }
+
+   // Not implemented yet: always fails.
+   private void generateInMemoryState(ObjectOutputStream o) throws StateTransferException {
+      throw new StateTransferException("Implement me!");
+   }
+
+   // Not implemented yet: always fails.
+   private void applyPersistentState(ObjectInputStream i) throws StateTransferException {
+      throw new StateTransferException("Implement me!");
+   }
+
+   // Not implemented yet: always fails.
+   private void generatePersistentState(ObjectOutputStream o) throws StateTransferException {
+      throw new StateTransferException("Implement me!");
+   }
+
+   // Writes a marker separating the sections of the state stream.
+   private void delimit(ObjectOutputStream o) throws IOException {
+      o.writeObject(DELIMITER);
+   }
+
+   // Reads the next object from the stream and fails unless it is a marker.
+   private void assertDelimited(ObjectInputStream i) throws StateTransferException {
+      Object o;
+      try {
+         o = i.readObject();
+      } catch (Exception e) {
+         throw new StateTransferException(e);
+      }
+      // instanceof is false for null, so a null read is rejected here as well
+      if (!(o instanceof Delimiter)) throw new StateTransferException("Expected a delimiter, received " + o);
+   }
+
+   // used as a marker for streams.
+   private static final class Delimiter implements Serializable {
+      // Pinned explicitly so that the serialized form of the marker does not depend on a compiler-computed default.
+      private static final long serialVersionUID = 4011474664630438129L;
+   }
+}


Property changes on: core/branches/flat/src/main/java/org/horizon/statetransfer/StateTransferManagerImpl.java
___________________________________________________________________
Name: svn:keywords
   + Id Revision
Name: svn:eol-style
   + LF

Modified: core/branches/flat/src/main/java/org/horizon/util/Util.java
===================================================================
--- core/branches/flat/src/main/java/org/horizon/util/Util.java	2009-02-25 12:06:48 UTC (rev 7784)
+++ core/branches/flat/src/main/java/org/horizon/util/Util.java	2009-02-25 15:52:32 UTC (rev 7785)
@@ -21,7 +21,10 @@
  */
 package org.horizon.util;
 
+import java.io.InputStream;
+import java.io.OutputStream;
 import java.lang.reflect.Method;
+import java.text.NumberFormat;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -126,4 +129,65 @@
          return "Added Entries " + addedEntries + " Removeed Entries " + removedEntries + " Modified Entries " + modifiedEntries;
       }
    }
+
+   /**
+    * Renders a duration in a human-readable form.
+    * <p/>
+    * Durations below one second are printed in whole milliseconds, below 300 seconds in seconds, below 120 minutes
+    * in minutes, and anything longer in hours.  All but milliseconds are formatted by the default
+    * {@link NumberFormat} with at most two fraction digits.
+    *
+    * @param millis time in millis
+    * @return the time, represented as millis, seconds, minutes or hours as appropriate, with suffix
+    */
+   public static String prettyPrintTime(long millis) {
+      if (millis < 1000) return millis + " milliseconds";
+
+      NumberFormat formatter = NumberFormat.getNumberInstance();
+      formatter.setMaximumFractionDigits(2);
+
+      double seconds = ((double) millis) / 1000;
+      double minutes = seconds / 60;
+      double value;
+      String suffix;
+      if (seconds < 300) {
+         value = seconds;
+         suffix = " seconds";
+      } else if (minutes < 120) {
+         value = minutes;
+         suffix = " minutes";
+      } else {
+         value = minutes / 60;
+         suffix = " hours";
+      }
+      return formatter.format(value) + suffix;
+   }
+
+   /**
+    * Closes the given stream, doing nothing for null and ignoring any exception raised while closing.
+    *
+    * @param i stream to close, may be null
+    */
+   public static void closeStream(InputStream i) {
+      if (i == null) return;
+      try {
+         i.close();
+      } catch (Exception e) {
+         // deliberately ignored
+      }
+   }
+
+   /**
+    * Closes the given stream, doing nothing for null and ignoring any exception raised while closing.
+    *
+    * @param o stream to close, may be null
+    */
+   public static void closeStream(OutputStream o) {
+      if (o == null) return;
+      try {
+         o.close();
+      } catch (Exception e) {
+         // deliberately ignored
+      }
+   }
 }

Modified: core/branches/flat/src/test/java/org/horizon/api/CacheClusterJoinTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/api/CacheClusterJoinTest.java	2009-02-25 12:06:48 UTC (rev 7784)
+++ core/branches/flat/src/test/java/org/horizon/api/CacheClusterJoinTest.java	2009-02-25 15:52:32 UTC (rev 7785)
@@ -22,6 +22,7 @@
       cm1 = TestingUtil.createClusteredCacheManager();
       cfg = new Configuration();
       cfg.setCacheMode(CacheMode.REPL_SYNC);
+      cfg.setFetchInMemoryState(false);
       cm1.defineCache("cache", cfg);
    }
 

Modified: core/branches/flat/src/test/java/org/horizon/api/tree/NodeReplicatedMoveTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/api/tree/NodeReplicatedMoveTest.java	2009-02-25 12:06:48 UTC (rev 7784)
+++ core/branches/flat/src/test/java/org/horizon/api/tree/NodeReplicatedMoveTest.java	2009-02-25 15:52:32 UTC (rev 7785)
@@ -31,12 +31,10 @@
    TransactionManager tm1;
 
    protected void createCacheManagers() throws Throwable {
-      Configuration c = new Configuration();
+      Configuration c = getDefaultConfig();
       c.setInvocationBatchingEnabled(true);
       c.setCacheMode(Configuration.CacheMode.REPL_SYNC);
       c.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
-      c.setSyncCommitPhase(true);
-      c.setSyncRollbackPhase(true);
 
       createClusteredCaches(2, "replSync", c);
 

Modified: core/branches/flat/src/test/java/org/horizon/config/parsing/XmlFileParsingTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/config/parsing/XmlFileParsingTest.java	2009-02-25 12:06:48 UTC (rev 7784)
+++ core/branches/flat/src/test/java/org/horizon/config/parsing/XmlFileParsingTest.java	2009-02-25 15:52:32 UTC (rev 7785)
@@ -52,8 +52,7 @@
       c = namedCaches.get("syncRepl");
 
       assert c.getCacheMode() == Configuration.CacheMode.REPL_SYNC;
-      assert c.isFetchInMemoryState();
-      assert c.getStateRetrievalTimeout() == 15000;
+      assert !c.isFetchInMemoryState();
       assert c.getSyncReplTimeout() == 15000;
 
       c = namedCaches.get("asyncRepl");
@@ -61,23 +60,20 @@
       assert c.getCacheMode() == Configuration.CacheMode.REPL_ASYNC;
       assert !c.isUseReplQueue();
       assert !c.isUseAsyncSerialization();
-      assert c.isFetchInMemoryState();
-      assert c.getStateRetrievalTimeout() == 15000;
+      assert !c.isFetchInMemoryState();
 
       c = namedCaches.get("asyncReplQueue");
 
       assert c.getCacheMode() == Configuration.CacheMode.REPL_ASYNC;
       assert c.isUseReplQueue();
       assert c.isUseAsyncSerialization();
-      assert c.isFetchInMemoryState();
-      assert c.getStateRetrievalTimeout() == 15000;
+      assert !c.isFetchInMemoryState();
 
       c = namedCaches.get("txSyncRepl");
 
       assert c.getTransactionManagerLookupClass().equals("org.horizon.transaction.GenericTransactionManagerLookup");
       assert c.getCacheMode() == Configuration.CacheMode.REPL_SYNC;
-      assert c.isFetchInMemoryState();
-      assert c.getStateRetrievalTimeout() == 15000;
+      assert !c.isFetchInMemoryState();
       assert c.getSyncReplTimeout() == 15000;
 
       c = namedCaches.get("overriding");
@@ -112,8 +108,7 @@
 
       assert c.getTransactionManagerLookupClass() == null;
       assert c.getCacheMode() == Configuration.CacheMode.REPL_SYNC;
-      assert c.isFetchInMemoryState();
-      assert c.getStateRetrievalTimeout() == 15000;
+      assert !c.isFetchInMemoryState();
       assert c.getSyncReplTimeout() == 15000;
       assert c.getLockAcquisitionTimeout() == 1000;
       assert c.getIsolationLevel() == IsolationLevel.READ_COMMITTED;
@@ -126,8 +121,7 @@
       assert c.getCacheMode() == Configuration.CacheMode.REPL_ASYNC;
       assert !c.isUseReplQueue();
       assert !c.isUseAsyncSerialization();
-      assert c.isFetchInMemoryState();
-      assert c.getStateRetrievalTimeout() == 15000;
+      assert !c.isFetchInMemoryState();
       assert c.getLockAcquisitionTimeout() == 1000;
       assert c.getIsolationLevel() == IsolationLevel.READ_COMMITTED;
       assert c.getConcurrencyLevel() == 100;
@@ -141,8 +135,7 @@
       assert c.getReplQueueInterval() == 1234;
       assert c.getReplQueueMaxElements() == 100;
       assert c.isUseAsyncSerialization();
-      assert c.isFetchInMemoryState();
-      assert c.getStateRetrievalTimeout() == 15000;
+      assert !c.isFetchInMemoryState();
       assert c.getLockAcquisitionTimeout() == 1000;
       assert c.getIsolationLevel() == IsolationLevel.READ_COMMITTED;
       assert c.getConcurrencyLevel() == 100;
@@ -151,8 +144,7 @@
       c.applyOverrides(namedCaches.get("txSyncRepl"));
       assert c.getTransactionManagerLookupClass().equals("org.horizon.transaction.GenericTransactionManagerLookup");
       assert c.getCacheMode() == Configuration.CacheMode.REPL_SYNC;
-      assert c.isFetchInMemoryState();
-      assert c.getStateRetrievalTimeout() == 15000;
+      assert !c.isFetchInMemoryState();
       assert c.getSyncReplTimeout() == 15000;
       assert c.getLockAcquisitionTimeout() == 1000;
       assert c.getIsolationLevel() == IsolationLevel.READ_COMMITTED;

Modified: core/branches/flat/src/test/java/org/horizon/loader/decorators/SingletonStoreTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/loader/decorators/SingletonStoreTest.java	2009-02-25 12:06:48 UTC (rev 7784)
+++ core/branches/flat/src/test/java/org/horizon/loader/decorators/SingletonStoreTest.java	2009-02-25 15:52:32 UTC (rev 7785)
@@ -45,7 +45,7 @@
       cm1 = addClusterEnabledCacheManager();
       cm2 = addClusterEnabledCacheManager();
 
-      Configuration conf = new Configuration();
+      Configuration conf = getDefaultConfig();
       conf.setCacheMode(Configuration.CacheMode.REPL_SYNC);
       DummyInMemoryCacheStore.Cfg cfg = new DummyInMemoryCacheStore.Cfg();
       cfg.setStore("Store-" + storeCounter.getAndIncrement());
@@ -64,7 +64,7 @@
       ((DummyInMemoryCacheStore.Cfg) conf.getCacheLoaderManagerConfig().getFirstCacheLoaderConfig()).setStore("Store-" + storeCounter.getAndIncrement());
       cm2.defineCache("pushing", conf);
 
-      conf = new Configuration();
+      conf = getDefaultConfig();
       conf.setCacheMode(Configuration.CacheMode.REPL_SYNC);
       cfg = new DummyInMemoryCacheStore.Cfg();
       cfg.setStore("Store-" + storeCounter.getAndIncrement());

Modified: core/branches/flat/src/test/java/org/horizon/manager/CacheManagerComponentRegistryTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/manager/CacheManagerComponentRegistryTest.java	2009-02-25 12:06:48 UTC (rev 7784)
+++ core/branches/flat/src/test/java/org/horizon/manager/CacheManagerComponentRegistryTest.java	2009-02-25 15:52:32 UTC (rev 7785)
@@ -33,6 +33,8 @@
    public void testForceSharedComponents() throws NamedCacheNotFoundException {
       Configuration defaultCfg = new Configuration();
       defaultCfg.setCacheMode(Configuration.CacheMode.REPL_SYNC);
+      defaultCfg.setFetchInMemoryState(false);
+      defaultCfg.setFetchInMemoryState(false);
 
       // cache manager with default configuration
       cm = new DefaultCacheManager(GlobalConfiguration.getClusteredDefault(),
@@ -61,6 +63,7 @@
       ec.setAlgorithmConfig(new FIFOAlgorithmConfig());
 
       Configuration defaultCfg = new Configuration();
+      defaultCfg.setFetchInMemoryState(false);
       defaultCfg.setCacheMode(Configuration.CacheMode.REPL_SYNC);
       defaultCfg.setEvictionConfig(ec);
       // cache manager with default configuration

Modified: core/branches/flat/src/test/java/org/horizon/notifications/cachemanagerlistener/CacheManagerNotifierTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/notifications/cachemanagerlistener/CacheManagerNotifierTest.java	2009-02-25 12:06:48 UTC (rev 7784)
+++ core/branches/flat/src/test/java/org/horizon/notifications/cachemanagerlistener/CacheManagerNotifierTest.java	2009-02-25 15:52:32 UTC (rev 7785)
@@ -29,6 +29,7 @@
       cm2 = TestingUtil.createClusteredCacheManager();
       Configuration c = new Configuration();
       c.setCacheMode(Configuration.CacheMode.REPL_SYNC);
+      c.setFetchInMemoryState(false);
       cm1.defineCache("cache", c);
       cm2.defineCache("cache", c);
 

Added: core/branches/flat/src/test/java/org/horizon/statetransfer/Address.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/statetransfer/Address.java	                        (rev 0)
+++ core/branches/flat/src/test/java/org/horizon/statetransfer/Address.java	2009-02-25 15:52:32 UTC (rev 7785)
@@ -0,0 +1,74 @@
+package org.horizon.statetransfer;
+
+import java.io.Serializable;
+
+/**
+ * Serializable bean holding a street, a city (defaulting to "San Jose") and a zip code.
+ */
+public class Address implements Serializable {
+   private static final long serialVersionUID = 5943073369866339615L;
+
+   String street = null;
+   String city = "San Jose";
+   int zip = 0;
+
+   public String getStreet() {
+      return street;
+   }
+
+   public void setStreet(String street) {
+      this.street = street;
+   }
+
+   public String getCity() {
+      return city;
+   }
+
+   public void setCity(String city) {
+      this.city = city;
+   }
+
+   public int getZip() {
+      return zip;
+   }
+
+   public void setZip(int zip) {
+      this.zip = zip;
+   }
+
+   /**
+    * @return the three properties, as reported by their getters, in the form street=.., city=.., zip=..
+    */
+   public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("street=").append(getStreet());
+      sb.append(", city=").append(getCity());
+      sb.append(", zip=").append(getZip());
+      return sb.toString();
+   }
+
+   /**
+    * Two addresses are equal when they are of the same class and have the same zip, city and street.
+    */
+   public boolean equals(Object o) {
+      if (o == this) return true;
+      if (o == null || o.getClass() != getClass()) return false;
+
+      Address other = (Address) o;
+      return zip == other.zip && same(city, other.city) && same(street, other.street);
+   }
+
+   /**
+    * Combines the hashes of street, city and zip; a null street or city contributes 0.
+    */
+   public int hashCode() {
+      int streetHash = street == null ? 0 : street.hashCode();
+      int cityHash = city == null ? 0 : city.hashCode();
+      return 29 * (29 * streetHash + cityHash) + zip;
+   }
+
+   // null-safe string comparison
+   private static boolean same(String a, String b) {
+      return a == null ? b == null : a.equals(b);
+   }
+}

Added: core/branches/flat/src/test/java/org/horizon/statetransfer/Person.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/statetransfer/Person.java	                        (rev 0)
+++ core/branches/flat/src/test/java/org/horizon/statetransfer/Person.java	2009-02-25 15:52:32 UTC (rev 7785)
@@ -0,0 +1,69 @@
+package org.horizon.statetransfer;
+
+import java.io.Serializable;
+
+/**
+ * Serializable bean holding a name and an {@link Address}.
+ */
+public class Person implements Serializable {
+
+   private static final long serialVersionUID = -885384294556845285L;
+
+   String name = null;
+   Address address;
+
+   public String getName() {
+      return name;
+   }
+
+   public void setName(String name) {
+      this.name = name;
+   }
+
+   /**
+    * Variant of {@link #setName(String)} for callers holding the name as a plain Object.
+    *
+    * @param obj the name; must be a String or null, otherwise a ClassCastException is thrown
+    */
+   public void setName(Object obj) {
+      String asString = (String) obj;
+      this.name = asString;
+   }
+
+   public Address getAddress() {
+      return address;
+   }
+
+   public void setAddress(Address address) {
+      this.address = address;
+   }
+
+   /**
+    * @return the name, as reported by its getter, followed by the address
+    */
+   public String toString() {
+      return "name=" + getName() + " Address= " + address;
+   }
+
+   /**
+    * Two persons are equal when they are of the same class and have equal addresses and names.
+    */
+   public boolean equals(Object o) {
+      if (o == this) return true;
+      if (o == null || o.getClass() != getClass()) return false;
+
+      Person other = (Person) o;
+      boolean sameAddress = address == null ? other.address == null : address.equals(other.address);
+      if (!sameAddress) return false;
+      return name == null ? other.name == null : name.equals(other.name);
+   }
+
+   /**
+    * Combines the hashes of name and address; a null name or address contributes 0.
+    */
+   public int hashCode() {
+      int nameHash = name == null ? 0 : name.hashCode();
+      int addressHash = address == null ? 0 : address.hashCode();
+      return 29 * nameHash + addressHash;
+   }
+}

Added: core/branches/flat/src/test/java/org/horizon/statetransfer/StateTransferFunctionalTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/statetransfer/StateTransferFunctionalTest.java	                        (rev 0)
+++ core/branches/flat/src/test/java/org/horizon/statetransfer/StateTransferFunctionalTest.java	2009-02-25 15:52:32 UTC (rev 7785)
@@ -0,0 +1,308 @@
+package org.horizon.statetransfer;
+
+import org.horizon.Cache;
+import org.horizon.transaction.DummyTransactionManagerLookup;
+import org.horizon.config.Configuration;
+import org.horizon.logging.Log;
+import org.horizon.logging.LogFactory;
+import org.horizon.manager.CacheManager;
+import org.horizon.test.MultipleCacheManagersTest;
+import org.horizon.test.TestingUtil;
+import org.testng.annotations.Test;
+
+import javax.transaction.TransactionManager;
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
+import java.io.Serializable;
+
+/**
+ * Functional tests that start caches one after another with in-memory state fetching enabled and check that data
+ * written to earlier caches is visible on caches that join later, including while writes are still in progress.
+ * <p/>
+ * The test is currently disabled.
+ */
+@Test(groups = "functional", testName = "statetransfer.StateTransferFunctionalTest", enabled = false)
+public class StateTransferFunctionalTest extends MultipleCacheManagersTest {
+
+   protected static final String ADDRESS_CLASSNAME = Address.class.getName();
+   protected static final String PERSON_CLASSNAME = Person.class.getName();
+   public static final String A_B_NAME = "a_b_name";
+   public static final String A_C_NAME = "a_c_name";
+   // must differ from A_D_AGE so that the name and age keys do not collide
+   public static final String A_D_NAME = "a_d_name";
+   public static final String A_B_AGE = "a_b_age";
+   public static final String A_C_AGE = "a_c_age";
+   public static final String A_D_AGE = "a_d_age";
+   public static final String JOE = "JOE";
+   public static final String BOB = "BOB";
+   public static final String JANE = "JANE";
+   public static final Integer TWENTY = 20;
+   public static final Integer FORTY = 40;
+
+   Configuration config;
+   private static final String cacheName = "nbst";
+
+   private volatile int testCount = 0;
+
+   private static final Log log = LogFactory.getLog(StateTransferFunctionalTest.class);
+
+   public StateTransferFunctionalTest() {
+      cleanup = CleanupPhase.AFTER_METHOD;
+   }
+
+   protected void createCacheManagers() throws Throwable {
+      // This impl only really sets up a configuration for use later.
+      config = new Configuration();
+      config.setCacheMode(Configuration.CacheMode.REPL_SYNC);
+      config.setSyncCommitPhase(true);
+      config.setSyncReplTimeout(30000);
+      config.setFetchInMemoryState(true);
+      config.setTransactionManagerLookupClass(DummyTransactionManagerLookup.class.getName());
+   }
+
+   // Adds a new clustered cache manager with the test cache defined on it.
+   private CacheManager createCacheManager() {
+      CacheManager cm = addClusterEnabledCacheManager();
+      cm.defineCache(cacheName, config);
+      return cm;
+   }
+
+   /**
+    * Value whose serialization sleeps for two seconds every time except the first, to slow down state transfer.
+    */
+   public static class DelayTransfer implements Serializable {
+      private transient int count;
+
+      private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+         in.defaultReadObject();
+      }
+
+      private void writeObject(ObjectOutputStream out) throws IOException {
+         out.defaultWriteObject();
+
+         // RPC is first serialization, ST is second
+         if (count++ == 0)
+            return;
+
+         try {
+            // This sleep is not required for the test to function,
+            // however it improves the possibility of finding errors
+            // (since it keeps the tx log going)
+            Thread.sleep(2000);
+         }
+         catch (InterruptedException e) {
+            // stop sleeping, but leave the interrupt visible to the thread doing the serialization
+            Thread.currentThread().interrupt();
+         }
+      }
+
+   }
+
+   /**
+    * Puts "test" + n -> n into a cache in a loop, optionally inside a transaction, until told to stop.
+    */
+   private static class WritingRunner implements Runnable {
+      private final Cache<Object, Object> cache;
+      private final boolean tx;
+      private volatile boolean stop;
+      private volatile int result;
+      private TransactionManager tm;
+
+      WritingRunner(Cache<Object, Object> cache, boolean tx) {
+         this.cache = cache;
+         this.tx = tx;
+         if (tx) tm = TestingUtil.getTransactionManager(cache);
+      }
+
+      // The value of the loop counter when the loop ended; only meaningful once run() has returned.
+      public int result() {
+         return result;
+      }
+
+      public void run() {
+         int c = 0;
+         while (!stop) {
+            try {
+               if (tx) tm.begin();
+               cache.put("test" + c, c++);
+               if (tx) tm.commit();
+            }
+            catch (Exception e) {
+               e.printStackTrace();
+               log.error(e);
+            }
+         }
+         result = c;
+      }
+
+      public void stop() {
+         stop = true;
+      }
+   }
+
+   public void testInitialStateTransfer() throws Exception {
+      testCount++;
+      log.info("testInitialStateTransfer start - " + testCount);
+      Cache<Object, Object> cache1, cache2;
+      cache1 = createCacheManager().getCache(cacheName);
+      writeInitialData(cache1);
+
+      cache2 = createCacheManager().getCache(cacheName);
+
+      // Pause to give caches time to see each other
+      TestingUtil.blockUntilViewsReceived(60000, cache1, cache2);
+
+      verifyInitialData(cache2);
+      log.info("testInitialStateTransfer end - " + testCount);
+   }
+
+   public void testConcurrentStateTransfer() throws Exception {
+      testCount++;
+      log.info("testConcurrentStateTransfer start - " + testCount);
+      Cache<Object, Object> cache1 = null, cache2 = null, cache3 = null, cache4 = null;
+      cache1 = createCacheManager().getCache(cacheName);
+      writeInitialData(cache1);
+
+      cache2 = createCacheManager().getCache(cacheName);
+
+      cache1.put("delay", new DelayTransfer());
+
+      // Pause to give caches time to see each other
+      TestingUtil.blockUntilViewsReceived(60000, cache1, cache2);
+      verifyInitialData(cache2);
+
+      final CacheManager cm3 = createCacheManager();
+      final CacheManager cm4 = createCacheManager();
+
+      // start the third and fourth caches at the same time, each on its own thread
+      Thread t1 = new Thread(new Runnable() {
+         public void run() {
+            cm3.getCache(cacheName);
+         }
+      });
+      t1.start();
+
+      Thread t2 = new Thread(new Runnable() {
+         public void run() {
+            cm4.getCache(cacheName);
+         }
+      });
+      t2.start();
+
+      t1.join();
+      t2.join();
+
+      cache3 = cm3.getCache(cacheName);
+      cache4 = cm4.getCache(cacheName);
+
+      TestingUtil.blockUntilViewsReceived(60000, cache1, cache2, cache3, cache4);
+      verifyInitialData(cache3);
+      verifyInitialData(cache4);
+      log.info("testConcurrentStateTransfer end - " + testCount);
+   }
+
+   public void testSTWithThirdWritingNonTxCache() throws Exception {
+      testCount++;
+      log.info("testSTWithThirdWritingNonTxCache start - " + testCount);
+      thirdWritingCacheTest(false, "nbst1");
+      log.info("testSTWithThirdWritingNonTxCache end - " + testCount);
+   }
+
+   public void testSTWithThirdWritingTxCache() throws Exception {
+      testCount++;
+      log.info("testSTWithThirdWritingTxCache start - " + testCount);
+      thirdWritingCacheTest(true, "nbst2");
+      log.info("testSTWithThirdWritingTxCache end - " + testCount);
+   }
+
+   public void testSTWithWritingNonTxThread() throws Exception {
+      testCount++;
+      log.info("testSTWithWritingNonTxThread start - " + testCount);
+      writingThreadTest(false, "nbst3");
+      log.info("testSTWithWritingNonTxThread end - " + testCount);
+   }
+
+   public void testSTWithWritingTxThread() throws Exception {
+      testCount++;
+      log.info("testSTWithWritingTxThread start - " + testCount);
+      writingThreadTest(true, "nbst4");
+      log.info("testSTWithWritingTxThread end - " + testCount);
+   }
+
+   // A third cache keeps writing while a second cache joins; the name argument is currently not used.
+   private void thirdWritingCacheTest(boolean tx, String name) throws InterruptedException {
+      Cache<Object, Object> cache1, cache2, cache3;
+      cache1 = createCacheManager().getCache(cacheName);
+      cache3 = createCacheManager().getCache(cacheName);
+
+      writeInitialData(cache1);
+
+      // Delay the transient copy, so that we get a more thorough log test
+      cache1.put("delay", new DelayTransfer());
+
+      WritingRunner writer = new WritingRunner(cache3, tx);
+      Thread writerThread = new Thread(writer);
+      writerThread.start();
+
+      cache2 = createCacheManager().getCache(cacheName);
+
+      // Pause to give caches time to see each other
+      TestingUtil.blockUntilViewsReceived(60000, cache1, cache2, cache3);
+
+      writer.stop();
+      writerThread.join();
+
+      verifyInitialData(cache2);
+
+      int count = writer.result();
+
+      for (int c = 0; c < count; c++)
+         assert cache2.get("test" + c).equals(c);
+   }
+
+   // Checks the four entries written by writeInitialData.
+   private void verifyInitialData(Cache<Object, Object> c) {
+      assert JOE.equals(c.get(A_B_NAME)) : "Incorrect value for key " + A_B_NAME;
+      assert TWENTY.equals(c.get(A_B_AGE)) : "Incorrect value for key " + A_B_AGE;
+      assert BOB.equals(c.get(A_C_NAME)) : "Incorrect value for key " + A_C_NAME;
+      assert FORTY.equals(c.get(A_C_AGE)) : "Incorrect value for key " + A_C_AGE;
+   }
+
+   private void writeInitialData(final Cache<Object, Object> c) {
+      c.put(A_B_NAME, JOE);
+      c.put(A_B_AGE, TWENTY);
+      c.put(A_C_NAME, BOB);
+      c.put(A_C_AGE, FORTY);
+   }
+
+   // The first cache keeps writing on a separate thread while a second cache joins; name is currently not used.
+   private void writingThreadTest(boolean tx, String name) throws InterruptedException {
+      Cache<Object, Object> cache1 = null, cache2 = null;
+      cache1 = createCacheManager().getCache(cacheName);
+
+      writeInitialData(cache1);
+
+      // Delay the transient copy, so that we get a more thorough log test
+      cache1.put("delay", new DelayTransfer());
+
+      WritingRunner writer = new WritingRunner(cache1, tx);
+      Thread writerThread = new Thread(writer);
+      writerThread.start();
+
+      cache2 = createCacheManager().getCache(cacheName);
+
+      // Pause to give caches time to see each other
+      TestingUtil.blockUntilViewsReceived(60000, cache1, cache2);
+
+      writer.stop();
+      writerThread.join();
+
+      verifyInitialData(cache2);
+
+      int count = writer.result();
+
+      for (int c = 0; c < count; c++)
+         assert cache2.get("test" + c).equals(c);
+   }
+}

Modified: core/branches/flat/src/test/java/org/horizon/test/MultipleCacheManagersTest.java
===================================================================
--- core/branches/flat/src/test/java/org/horizon/test/MultipleCacheManagersTest.java	2009-02-25 12:06:48 UTC (rev 7784)
+++ core/branches/flat/src/test/java/org/horizon/test/MultipleCacheManagersTest.java	2009-02-25 15:52:32 UTC (rev 7785)
@@ -178,6 +178,7 @@
       Configuration configuration = new Configuration();
       configuration.setSyncCommitPhase(true);
       configuration.setSyncRollbackPhase(true);
+      configuration.setFetchInMemoryState(false);
       return configuration;
    }
 

Modified: core/branches/flat/src/test/resources/configs/named-cache-test.xml
===================================================================
--- core/branches/flat/src/test/resources/configs/named-cache-test.xml	2009-02-25 12:06:48 UTC (rev 7784)
+++ core/branches/flat/src/test/resources/configs/named-cache-test.xml	2009-02-25 15:52:32 UTC (rev 7785)
@@ -38,21 +38,21 @@
 
    <namedCache name="syncRepl">
       <clustering>
-         <stateRetrieval fetchInMemoryState="true" timeout="15000"/>
+         <stateRetrieval fetchInMemoryState="false" />
          <sync replTimeout="15000"/>
       </clustering>
    </namedCache>
 
    <namedCache name="asyncRepl">
       <clustering>
-         <stateRetrieval fetchInMemoryState="true" timeout="15000"/>
+         <stateRetrieval fetchInMemoryState="false" />
          <async useAsyncSerialization="false"/>
       </clustering>
    </namedCache>
 
    <namedCache name="asyncReplQueue">
       <clustering>
-         <stateRetrieval fetchInMemoryState="true" timeout="15000"/>
+         <stateRetrieval fetchInMemoryState="false" />
          <async useReplQueue="true" replQueueInterval="1234" replQueueMaxElements="100"/>
       </clustering>
    </namedCache>
@@ -60,7 +60,7 @@
    <namedCache name="txSyncRepl">
       <transaction/>
       <clustering>
-         <stateRetrieval fetchInMemoryState="true" timeout="15000"/>
+         <stateRetrieval fetchInMemoryState="false" />
          <sync replTimeout="15000"/>
       </clustering>
    </namedCache>

Modified: core/branches/flat/src/test/resources/log4j.xml
===================================================================
--- core/branches/flat/src/test/resources/log4j.xml	2009-02-25 12:06:48 UTC (rev 7784)
+++ core/branches/flat/src/test/resources/log4j.xml	2009-02-25 15:52:32 UTC (rev 7785)
@@ -45,13 +45,17 @@
    <!-- ================ -->
 
    <category name="org.horizon">
-      <priority value="INFO"/>
+      <priority value="WARN"/>
    </category>
 
    <category name="org.horizon.profiling">
       <priority value="WARN"/>
    </category>
 
+   <category name="org.horizon.jmx">
+      <priority value="WARN"/>
+   </category>
+
    <category name="org.horizon.factories">
       <priority value="WARN"/>
    </category>
@@ -62,8 +66,8 @@
 
    <root>
       <priority value="WARN"/>
-      <!--<appender-ref ref="CONSOLE"/>-->
-      <appender-ref ref="FILE"/>
+      <appender-ref ref="CONSOLE"/>
+      <!--<appender-ref ref="FILE"/>-->
    </root>
 
 </log4j:configuration>




More information about the jbosscache-commits mailing list