[jbosscache-commits] JBoss Cache SVN: r6446 - in core/trunk/src: main/java/org/jboss/cache/loader and 3 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Wed Jul 30 13:13:20 EDT 2008


Author: manik.surtani at jboss.com
Date: 2008-07-30 13:13:20 -0400 (Wed, 30 Jul 2008)
New Revision: 6446

Modified:
   core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java
   core/trunk/src/main/java/org/jboss/cache/loader/AbstractCacheLoader.java
   core/trunk/src/main/java/org/jboss/cache/loader/CacheLoaderManager.java
   core/trunk/src/main/java/org/jboss/cache/remoting/jgroups/ChannelMessageListener.java
   core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java
   core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java
   core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferManager.java
   core/trunk/src/main/java/org/jboss/cache/statetransfer/LegacyStateTransferIntegrator.java
   core/trunk/src/main/java/org/jboss/cache/statetransfer/LegacyStateTransferManager.java
   core/trunk/src/main/java/org/jboss/cache/statetransfer/StateTransferIntegrator.java
   core/trunk/src/test/java/org/jboss/cache/statetransfer/StateTransfer200Test.java
   core/trunk/src/test/java/org/jboss/cache/statetransfer/StateTransferTestBase.java
Log:
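Fixed funky state transfer brittleness: persistent state is now integrated only when fetchPersistentState is enabled, and state is generated from the requested subtree rather than always from the cache root.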

Modified: core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java	2008-07-30 16:06:38 UTC (rev 6445)
+++ core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java	2008-07-30 17:13:20 UTC (rev 6446)
@@ -51,6 +51,7 @@
 import java.io.ByteArrayInputStream;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -297,7 +298,10 @@
          {
             // need to get the root DIRECTLY.  cache.getRoot() will pass a call up the interceptor chain and we will
             // have a problem with the cache not being started.
-            dataContainer.getRoot().addChildDirect(BUDDY_BACKUP_SUBTREE_FQN);
+
+            cache.getInvocationContext().getOptionOverrides().setSkipCacheStatusCheck(true);
+            cache.getInvocationContext().getOptionOverrides().setCacheModeLocal(true);
+            cache.put(BUDDY_BACKUP_SUBTREE_FQN, (Map) Collections.emptyMap());
          }
 
          // allow waiting threads to process.

Modified: core/trunk/src/main/java/org/jboss/cache/loader/AbstractCacheLoader.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/loader/AbstractCacheLoader.java	2008-07-30 16:06:38 UTC (rev 6445)
+++ core/trunk/src/main/java/org/jboss/cache/loader/AbstractCacheLoader.java	2008-07-30 17:13:20 UTC (rev 6446)
@@ -85,6 +85,7 @@
       if (objectFromStream instanceof NodeDataMarker)
       {
          // no persistent state sent across; return?
+         if (trace) log.trace("Empty persistent stream?");
          return;
       }
       if (objectFromStream instanceof NodeDataExceptionMarker)
@@ -113,7 +114,11 @@
       for (Object aNodeData : nodeData)
       {
          NodeData nd = (NodeData) aNodeData;
-         if (nd.isMarker()) break;
+         if (nd.isMarker())
+         {
+            if (trace) log.trace("Reached delimiter; exiting loop");
+            break;
+         }
          Fqn fqn;
          if (moveToBuddy)
          {
@@ -123,7 +128,7 @@
          {
             fqn = nd.getFqn();
          }
-
+         if (trace) log.trace("Storing state in Fqn " + fqn);
          if (nd.getAttributes() != null)
          {
             this.put(fqn, nd.getAttributes(), true);// creates a node with 0 or more attributes

Modified: core/trunk/src/main/java/org/jboss/cache/loader/CacheLoaderManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/loader/CacheLoaderManager.java	2008-07-30 16:06:38 UTC (rev 6445)
+++ core/trunk/src/main/java/org/jboss/cache/loader/CacheLoaderManager.java	2008-07-30 17:13:20 UTC (rev 6446)
@@ -451,6 +451,7 @@
          {
             throw new CacheException("Unable to start cache loaders", e);
          }
+         fetchPersistentState |= loader.getConfig().isFetchPersistentState();
       }
    }
 

Modified: core/trunk/src/main/java/org/jboss/cache/remoting/jgroups/ChannelMessageListener.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/remoting/jgroups/ChannelMessageListener.java	2008-07-30 16:06:38 UTC (rev 6445)
+++ core/trunk/src/main/java/org/jboss/cache/remoting/jgroups/ChannelMessageListener.java	2008-07-30 17:13:20 UTC (rev 6446)
@@ -35,7 +35,8 @@
     */
    protected volatile Exception setStateException;
    private final Object stateLock = new Object();
-   private Log log = LogFactory.getLog(ChannelMessageListener.class);
+   private static final Log log = LogFactory.getLog(ChannelMessageListener.class);
+   private static final boolean trace = log.isTraceEnabled();
    private StateTransferManager stateTransferManager;
    private Configuration configuration;
    /**
@@ -43,6 +44,7 @@
     */
    private volatile boolean isStateSet = false;
 
+
    @Inject
    private void injectDependencies(StateTransferManager stateTransferManager, Configuration configuration)
    {
@@ -183,6 +185,7 @@
 
    public byte[] getState(String state_id)
    {
+      if (trace) log.trace("Getting state for state id " + state_id);
       MarshalledValueOutputStream out = null;
       String sourceRoot = state_id;
       byte[] result;
@@ -233,6 +236,7 @@
 
    public void getState(String state_id, OutputStream ostream)
    {
+      if (trace) log.trace("Getting state for state id " + state_id);
       String sourceRoot = state_id;
       MarshalledValueOutputStream out = null;
       boolean hasDifferentSourceAndIntegrationRoots = state_id.indexOf(DefaultStateTransferManager.PARTIAL_STATE_DELIMITER) > 0;
@@ -286,6 +290,7 @@
 
    public void setState(String state_id, byte[] state)
    {
+      if (trace) log.trace("Receiving state for " + state_id);
       if (state == null)
       {
          log.debug("partial transferred state is null");
@@ -333,7 +338,7 @@
 
    public void setState(String stateId, InputStream istream)
    {
-      if (log.isTraceEnabled()) log.trace("**** Receiving state for " + stateId);
+      if (trace) log.trace("Receiving state for " + stateId);
       String targetRoot = stateId;
       MarshalledValueInputStream in = null;
       boolean hasDifferentSourceAndIntegrationRoots = stateId.indexOf(DefaultStateTransferManager.PARTIAL_STATE_DELIMITER) > 0;

Modified: core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java	2008-07-30 16:06:38 UTC (rev 6445)
+++ core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferGenerator.java	2008-07-30 17:13:20 UTC (rev 6446)
@@ -43,7 +43,7 @@
       this.cache = cache;
    }
 
-   @Start(priority = 18)
+   @Start(priority = 14)
    private void start()
    {
       this.internalFqns = cache.getInternalFqns();

Modified: core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java	2008-07-30 16:06:38 UTC (rev 6445)
+++ core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java	2008-07-30 17:13:20 UTC (rev 6446)
@@ -39,7 +39,8 @@
 public class DefaultStateTransferIntegrator implements StateTransferIntegrator
 {
 
-   protected Log log = LogFactory.getLog(getClass().getName());
+   private static final Log log = LogFactory.getLog(DefaultStateTransferIntegrator.class);
+   private static final boolean trace = log.isTraceEnabled();
 
    private CacheSPI cache;
 
@@ -47,6 +48,7 @@
 
    private Set<Fqn> internalFqns;
 
+
    @Inject
    public void inject(CacheSPI<?, ?> cache, NodeFactory nodefactory)
    {
@@ -60,14 +62,18 @@
       this.internalFqns = cache.getInternalFqns();
    }
 
-   public void integrateState(ObjectInputStream ois, Object target, Fqn targetRoot) throws Exception
+   public void integrateState(ObjectInputStream ois, Object target, Fqn targetRoot, boolean integratePersistentState) throws Exception
    {
       // pop version from the stream first!
       short version = (Short) cache.getMarshaller().objectFromObjectStream(ois);
       log.info("Using version " + version);
       integrateTransientState(ois, (InternalNode) target);
-      integrateAssociatedState(ois);
-      integratePersistentState(ois, targetRoot);
+      if (trace) log.trace("Reading marker for nonexistent associated state");
+      cache.getMarshaller().objectFromObjectStream(ois);
+      if (integratePersistentState)
+      {
+         integratePersistentState(ois, targetRoot);
+      }
    }
 
    protected void integrateTransientState(ObjectInputStream in, InternalNode target) throws Exception
@@ -75,7 +81,7 @@
       boolean transientSet = false;
       try
       {
-         if (log.isTraceEnabled())
+         if (trace)
          {
             log.trace("integrating transient state for " + target);
          }
@@ -84,7 +90,7 @@
 
          transientSet = true;
 
-         if (log.isTraceEnabled())
+         if (trace)
          {
             log.trace("transient state successfully integrated");
          }
@@ -105,26 +111,13 @@
       }
    }
 
-   /**
-    * Provided for subclasses that deal with associated state.
-    *
-    * @throws Exception
-    */
-   protected void integrateAssociatedState(ObjectInputStream in) throws Exception
-   {
-      // no-op in this base class 
-      // just read marker
-      cache.getMarshaller().objectFromObjectStream(in);
-   }
-
    protected void integratePersistentState(ObjectInputStream in, Fqn targetFqn) throws Exception
    {
-
       CacheLoaderManager loaderManager = cache.getCacheLoaderManager();
       CacheLoader loader = loaderManager == null ? null : loaderManager.getCacheLoader();
       if (loader == null)
       {
-         if (log.isTraceEnabled())
+         if (trace)
          {
             log.trace("cache loader is null, will not attempt to integrate persistent state");
          }
@@ -132,7 +125,7 @@
       else
       {
 
-         if (log.isTraceEnabled())
+         if (trace)
          {
             log.trace("integrating persistent state using " + loader.getClass().getName());
          }
@@ -165,7 +158,7 @@
             }
             else
             {
-               if (log.isTraceEnabled())
+               if (trace)
                {
                   log.trace("persistent state integrated successfully");
                }
@@ -234,7 +227,7 @@
          }
 
          // read marker off stack
-         cache.getMarshaller().objectFromObjectStream(in);
+//         cache.getMarshaller().objectFromObjectStream(in);
       }
    }
 

Modified: core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferManager.java	2008-07-30 16:06:38 UTC (rev 6445)
+++ core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferManager.java	2008-07-30 17:13:20 UTC (rev 6446)
@@ -12,7 +12,6 @@
 import org.jboss.cache.CacheSPI;
 import org.jboss.cache.Fqn;
 import org.jboss.cache.InternalNode;
-import org.jboss.cache.NodeFactory;
 import org.jboss.cache.NodeSPI;
 import org.jboss.cache.RegionEmptyException;
 import org.jboss.cache.RegionManager;
@@ -34,6 +33,7 @@
 public class DefaultStateTransferManager implements StateTransferManager
 {
    protected final static Log log = LogFactory.getLog(DefaultStateTransferManager.class);
+   protected static final boolean trace = log.isTraceEnabled();
 
    public static final NodeData STREAMING_DELIMITER_NODE = new NodeDataMarker();
 
@@ -47,25 +47,23 @@
    boolean fetchTransientState;
    boolean fetchPersistentState;
    protected long stateRetrievalTimeout;
-   private NodeFactory nodeFactory;
    protected StateTransferIntegrator integrator;
    protected StateTransferGenerator generator;
 
 
    @Inject
-   public void injectDependencies(CacheSPI cache, Marshaller marshaller, RegionManager regionManager, Configuration configuration, CacheLoaderManager cacheLoaderManager, NodeFactory nodeFactory, StateTransferIntegrator integrator, StateTransferGenerator generator)
+   public void injectDependencies(CacheSPI cache, Marshaller marshaller, RegionManager regionManager, Configuration configuration, CacheLoaderManager cacheLoaderManager, StateTransferIntegrator integrator, StateTransferGenerator generator)
    {
       this.cache = cache;
       this.regionManager = regionManager;
       this.marshaller = marshaller;
       this.configuration = configuration;
       this.cacheLoaderManager = cacheLoaderManager;
-      this.nodeFactory = nodeFactory;
       this.integrator = integrator;
       this.generator = generator;
    }
 
-   @Start(priority = 19)
+   @Start(priority = 14)
    public void start()
    {
       fetchTransientState = configuration.isFetchInMemoryState();
@@ -77,19 +75,18 @@
    {
       // can't give state for regions currently being activated/inactivated
       boolean canProvideState = (!regionManager.isInactive(fqn) && cache.peek(fqn, false) != null);
-
+      if (trace) log.trace("Can provide state? " + canProvideState);
       if (canProvideState && (fetchPersistentState || fetchTransientState))
       {
          marshaller.objectToObjectStream(true, out);
          long startTime = System.currentTimeMillis();
-         InternalNode rootNode = cache.getRoot().getDelegationTarget();
+         InternalNode subtreeRoot = fqn.isRoot() ? cache.getRoot().getDelegationTarget() : cache.getNode(fqn).getDelegationTarget();
 
          // we don't need READ locks for MVCC based state transfer!
-
          if (log.isDebugEnabled())
-            log.debug("locking the " + fqn + " subtree to return the in-memory (transient) state");
+            log.debug("Generating in-memory (transient) state for subtree " + fqn);
 
-         generator.generateState(out, rootNode, fetchTransientState, fetchPersistentState, suppressErrors);
+         generator.generateState(out, subtreeRoot, fetchTransientState, fetchPersistentState, suppressErrors);
 
          if (log.isDebugEnabled())
             log.debug("Successfully generated state in " + (System.currentTimeMillis() - startTime) + " msec");
@@ -175,8 +172,9 @@
        * to be directly stored into a tree since we bypass interceptor chain.
        *
        */
-      if (log.isDebugEnabled()) log.debug("starting state integration at node " + targetRoot);
-      integrator.integrateState(state, targetRoot.getDelegationTarget(), targetRoot.getFqn());
+      if (log.isDebugEnabled())
+         log.debug("starting state integration at node " + targetRoot + ".  Fetch Persistent State = " + fetchPersistentState);
+      integrator.integrateState(state, targetRoot.getDelegationTarget(), targetRoot.getFqn(), fetchPersistentState);
 
       if (log.isDebugEnabled())
          log.debug("successfully integrated state in " + (System.currentTimeMillis() - startTime) + " msec");

Modified: core/trunk/src/main/java/org/jboss/cache/statetransfer/LegacyStateTransferIntegrator.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/statetransfer/LegacyStateTransferIntegrator.java	2008-07-30 16:06:38 UTC (rev 6445)
+++ core/trunk/src/main/java/org/jboss/cache/statetransfer/LegacyStateTransferIntegrator.java	2008-07-30 17:13:20 UTC (rev 6446)
@@ -63,7 +63,7 @@
       this.internalFqns = cache.getInternalFqns();
    }
 
-   public void integrateState(ObjectInputStream ois, Object target, Fqn targetFqn) throws Exception
+   public void integrateState(ObjectInputStream ois, Object target, Fqn targetFqn, boolean integratePersistentState) throws Exception
    {
       // pop version from the stream first!
       short version = (Short) cache.getMarshaller().objectFromObjectStream(ois);
@@ -72,7 +72,7 @@
       // read another marker for the dummy associated state
       if (trace) log.trace("Reading marker for nonexistent associated state");
       cache.getMarshaller().objectFromObjectStream(ois);
-      integratePersistentState(ois, targetFqn);
+      if (integratePersistentState) integratePersistentState(ois, targetFqn);
    }
 
    protected void integrateTransientState(ObjectInputStream in, NodeSPI target) throws Exception

Modified: core/trunk/src/main/java/org/jboss/cache/statetransfer/LegacyStateTransferManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/statetransfer/LegacyStateTransferManager.java	2008-07-30 16:06:38 UTC (rev 6445)
+++ core/trunk/src/main/java/org/jboss/cache/statetransfer/LegacyStateTransferManager.java	2008-07-30 17:13:20 UTC (rev 6446)
@@ -121,7 +121,7 @@
          {
             log.debug("starting state integration at node " + targetRoot);
          }
-         integrator.integrateState(state, targetRoot, targetRoot.getFqn());
+         integrator.integrateState(state, targetRoot, targetRoot.getFqn(), fetchPersistentState);
          if (log.isDebugEnabled())
          {
             log.debug("successfully integrated state in " + (System.currentTimeMillis() - startTime) + " msec");

Modified: core/trunk/src/main/java/org/jboss/cache/statetransfer/StateTransferIntegrator.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/statetransfer/StateTransferIntegrator.java	2008-07-30 16:06:38 UTC (rev 6445)
+++ core/trunk/src/main/java/org/jboss/cache/statetransfer/StateTransferIntegrator.java	2008-07-30 17:13:20 UTC (rev 6446)
@@ -16,5 +16,5 @@
 public interface StateTransferIntegrator
 {
    // TODO: target is an Object to support both InternalNodes and NodeSPIs.
-   void integrateState(ObjectInputStream ois, Object target, Fqn targetFqn) throws Exception;
+   void integrateState(ObjectInputStream ois, Object target, Fqn targetFqn, boolean integratePersistentState) throws Exception;
 }
\ No newline at end of file

Modified: core/trunk/src/test/java/org/jboss/cache/statetransfer/StateTransfer200Test.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/statetransfer/StateTransfer200Test.java	2008-07-30 16:06:38 UTC (rev 6445)
+++ core/trunk/src/test/java/org/jboss/cache/statetransfer/StateTransfer200Test.java	2008-07-30 17:13:20 UTC (rev 6446)
@@ -11,12 +11,12 @@
 import org.jboss.cache.CacheSPI;
 import org.jboss.cache.Fqn;
 import org.jboss.cache.Region;
-import org.jboss.cache.util.TestingUtil;
 import org.jboss.cache.buddyreplication.BuddyFqnTransformer;
 import org.jboss.cache.buddyreplication.BuddyManager;
 import org.jboss.cache.config.BuddyReplicationConfig;
 import org.jboss.cache.loader.CacheLoader;
 import org.jboss.cache.marshall.MarshalledValue;
+import org.jboss.cache.util.TestingUtil;
 import static org.testng.AssertJUnit.*;
 import org.testng.annotations.Test;
 
@@ -407,7 +407,7 @@
 
    public void testStalePersistentState() throws Exception
    {
-      CacheSPI c1 = createCache("1", true, false, true, false);
+      CacheSPI c1 = createCache("1", true, false, true, true);
       c1.put(A, "K", "V");
 
       assert c1.get(A, "K").equals("V");
@@ -423,10 +423,12 @@
       assert l1.exists(A);
       assert l1.get(A).get("K").equals("V");
 
-      Cache c2 = createCache("2", true, false, true, false);
+      Cache c2 = createCache("2", true, false, true, true);
 
       c2.put(B, "K", "V");
 
+      assert c1.getConfiguration().isFetchInMemoryState();
+      assert c1.getConfiguration().getCacheLoaderConfig().isFetchPersistentState();
       c1.start();
 
       assert c1.get(B, "K").equals("V");

Modified: core/trunk/src/test/java/org/jboss/cache/statetransfer/StateTransferTestBase.java
===================================================================
--- core/trunk/src/test/java/org/jboss/cache/statetransfer/StateTransferTestBase.java	2008-07-30 16:06:38 UTC (rev 6445)
+++ core/trunk/src/test/java/org/jboss/cache/statetransfer/StateTransferTestBase.java	2008-07-30 17:13:20 UTC (rev 6446)
@@ -269,17 +269,12 @@
       cache1.put(A_C, "name", BOB);
       cache1.put(A_C, "age", FORTY);
 
-      CacheSPI<Object, Object> cache2 = createCache("cache2", false, false, cacheLoaderClass2, asyncLoader, false, true);
+      CacheSPI<Object, Object> cache2 = createCache("cache2", false, false, cacheLoaderClass2, asyncLoader, true, true);
 
-      cache2.start();
-
       // Pause to give caches time to see each other
       TestingUtil.blockUntilViewsReceived(new CacheSPI[]{cache1, cache2}, 60000);
 
-      if (asyncLoader)
-      {
-         TestingUtil.sleepThread((long) 100);
-      }
+      if (asyncLoader) TestingUtil.sleepThread(100);
 
       CacheLoader loader = cache2.getCacheLoaderManager().getCacheLoader();
 




More information about the jbosscache-commits mailing list