[jbosscache-commits] JBoss Cache SVN: r4946 - in core/trunk/src/main/java/org/jboss/cache: buddyreplication and 4 other directories.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Wed Jan 2 14:32:44 EST 2008


Author: manik.surtani at jboss.com
Date: 2008-01-02 14:32:44 -0500 (Wed, 02 Jan 2008)
New Revision: 4946

Modified:
   core/trunk/src/main/java/org/jboss/cache/InvocationContext.java
   core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java
   core/trunk/src/main/java/org/jboss/cache/interceptors/BaseTransactionalContextInterceptor.java
   core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java
   core/trunk/src/main/java/org/jboss/cache/invocation/CacheInvocationDelegate.java
   core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java
   core/trunk/src/main/java/org/jboss/cache/util/reflect/ClasspathScanner.java
Log:
Updated state transfer and BR codebases

Modified: core/trunk/src/main/java/org/jboss/cache/InvocationContext.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/InvocationContext.java	2008-01-02 19:32:14 UTC (rev 4945)
+++ core/trunk/src/main/java/org/jboss/cache/InvocationContext.java	2008-01-02 19:32:44 UTC (rev 4946)
@@ -88,6 +88,11 @@
       return optionOverrides;
    }
 
+   public boolean isOptionsUninitialised()
+   {
+      return optionOverrides == null;
+   }
+
    /**
     * Sets the option overrides associated with this invocation
     *
@@ -122,13 +127,13 @@
    public String toString()
    {
       return "InvocationContext{" +
-              "methodCall=" + methodCall +
-              "transaction=" + transaction +
-              ", globalTransaction=" + globalTransaction +
-              ", optionOverrides=" + optionOverrides +
-              ", originLocal=" + originLocal +
-              ", txHasMods=" + txHasMods +
-              '}';
+            "methodCall=" + methodCall +
+            "transaction=" + transaction +
+            ", globalTransaction=" + globalTransaction +
+            ", optionOverrides=" + optionOverrides +
+            ", originLocal=" + originLocal +
+            ", txHasMods=" + txHasMods +
+            '}';
    }
 
    public boolean isTxHasMods()
@@ -255,10 +260,11 @@
     * If the acq timeout if overwritten for current call, then return that one.
     * If not overwritten return default value.
     */
-   public long getContextLockAcquisitionTimeout(long defaultFalue) {
+   public long getContextLockAcquisitionTimeout(long defaultFalue)
+   {
       long timeout = defaultFalue;
       if (getOptionOverrides() != null
-              && getOptionOverrides().getLockAcquisitionTimeout() >= 0)
+            && getOptionOverrides().getLockAcquisitionTimeout() >= 0)
       {
          timeout = getOptionOverrides().getLockAcquisitionTimeout();
       }

Modified: core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java	2008-01-02 19:32:14 UTC (rev 4945)
+++ core/trunk/src/main/java/org/jboss/cache/buddyreplication/BuddyManager.java	2008-01-02 19:32:44 UTC (rev 4946)
@@ -11,12 +11,14 @@
 import org.jboss.cache.CacheException;
 import org.jboss.cache.CacheSPI;
 import org.jboss.cache.Fqn;
+import org.jboss.cache.Node;
 import org.jboss.cache.RPCManager;
 import org.jboss.cache.Region;
 import org.jboss.cache.RegionManager;
 import org.jboss.cache.config.BuddyReplicationConfig;
 import org.jboss.cache.config.BuddyReplicationConfig.BuddyLocatorConfig;
 import org.jboss.cache.config.Configuration;
+import org.jboss.cache.config.Option;
 import org.jboss.cache.factories.annotations.Inject;
 import org.jboss.cache.factories.annotations.Stop;
 import org.jboss.cache.lock.TimeoutException;
@@ -251,6 +253,8 @@
 
       broadcastBuddyPoolMembership();
 
+      if (!cache.exists(BUDDY_BACKUP_SUBTREE_FQN)) cache.getRoot().addChildDirect(BUDDY_BACKUP_SUBTREE_FQN);
+
       // allow waiting threads to process.
       initialisationLatch.countDown();
 
@@ -519,43 +523,60 @@
       Fqn integrationBase = new Fqn(BuddyManager.BUDDY_BACKUP_SUBTREE_FQN,
             newGroup.getGroupName());
 
-      for (Map.Entry<Fqn, byte[]> entry : state.entrySet())
+
+      if (state.isEmpty())
       {
-         Fqn fqn = entry.getKey();
-         if (!regionManager.isInactive(fqn))
+         if (configuredToFetchState())
+            log.info("Data owner has no state to set, even though buddy is configured to accept state.  Assuming there is no data on the data owner.");
+         // create the backup region anyway
+         Option o = cache.getInvocationContext().getOptionOverrides();
+         o.setSkipCacheStatusCheck(true);
+         Node root = cache.getRoot();
+         o = cache.getInvocationContext().getOptionOverrides();
+         o.setCacheModeLocal(true);
+         o.setSkipCacheStatusCheck(true);
+         root.addChild(new Fqn<String>(BUDDY_BACKUP_SUBTREE, newGroup.getGroupName()));
+      }
+      else
+      {
+         for (Map.Entry<Fqn, byte[]> entry : state.entrySet())
          {
-            //ClassLoader cl = (marshaller == null) ? null : marshaller.getClassLoader(fqnS);
-            Fqn integrationRoot = new Fqn(integrationBase, fqn);
+            Fqn fqn = entry.getKey();
+            if (!regionManager.isInactive(fqn))
+            {
+               //ClassLoader cl = (marshaller == null) ? null : marshaller.getClassLoader(fqnS);
+               Fqn integrationRoot = new Fqn(integrationBase, fqn);
 
-            byte[] stateBuffer = entry.getValue();
-            MarshalledValueInputStream in = null;
-            try
-            {
-               ByteArrayInputStream bais = new ByteArrayInputStream(stateBuffer);
-               in = new MarshalledValueInputStream(bais);
-               //stateMgr.setState(in, integrationRoot, cl);
-               stateTransferManager.setState(in, integrationRoot);
-            }
-            catch (Throwable t)
-            {
-               if (t instanceof CacheException)
+               byte[] stateBuffer = entry.getValue();
+               MarshalledValueInputStream in = null;
+               try
                {
-                  //excepected/common and can happen due to inactive regions and so on
-                  log.debug(t);
+                  ByteArrayInputStream bais = new ByteArrayInputStream(stateBuffer);
+                  in = new MarshalledValueInputStream(bais);
+                  //stateMgr.setState(in, integrationRoot, cl);
+                  stateTransferManager.setState(in, integrationRoot);
                }
-               else
+               catch (Throwable t)
                {
-                  //something has gone wrong
-                  log.error("State for fqn " + fqn
-                        + " could not be transferred to a buddy at "
-                        + cache.getLocalAddress(), t);
+                  if (t instanceof CacheException)
+                  {
+                     //excepected/common and can happen due to inactive regions and so on
+                     log.debug(t);
+                  }
+                  else
+                  {
+                     //something has gone wrong
+                     log.error("State for fqn " + fqn
+                           + " could not be transferred to a buddy at "
+                           + cache.getLocalAddress(), t);
+                  }
                }
-            }
-            finally
-            {
-               if (in != null)
+               finally
                {
-                  in.close();
+                  if (in != null)
+                  {
+                     in.close();
+                  }
                }
             }
          }
@@ -746,25 +767,36 @@
       // Create the state transfer map
 
       Map<Fqn, byte[]> stateMap = new HashMap<Fqn, byte[]>();
-      byte[] state;
-      if (configuration.isUseRegionBasedMarshalling())
+      if (configuredToFetchState())
       {
-         Collection<Region> regions = regionManager.getAllRegions(Region.Type.MARSHALLING);
-         if (regions.size() > 0)
+         byte[] state;
+         if (configuration.isUseRegionBasedMarshalling())
          {
-            for (Region r : regions)
+            Collection<Region> regions = regionManager.getAllRegions(Region.Type.MARSHALLING);
+            if (regions.size() > 0)
             {
-               Fqn f = r.getFqn();
-               state = acquireState(f);
+               for (Region r : regions)
+               {
+                  Fqn f = r.getFqn();
+                  state = acquireState(f);
+                  if (state != null)
+                  {
+                     stateMap.put(f, state);
+                  }
+               }
+            }
+            else if (!configuration.isInactiveOnStartup())
+            {
+               // No regions defined; try the root
+               state = acquireState(Fqn.ROOT);
                if (state != null)
                {
-                  stateMap.put(f, state);
+                  stateMap.put(Fqn.ROOT, state);
                }
             }
          }
-         else if (!configuration.isInactiveOnStartup())
+         else
          {
-            // No regions defined; try the root
             state = acquireState(Fqn.ROOT);
             if (state != null)
             {
@@ -772,14 +804,6 @@
             }
          }
       }
-      else
-      {
-         state = acquireState(Fqn.ROOT);
-         if (state != null)
-         {
-            stateMap.put(Fqn.ROOT, state);
-         }
-      }
 
       // now broadcast a message to the newly assigned buddies.
       MethodCall membershipCall = MethodCallFactory.create(MethodDeclarations.remoteAssignToBuddyGroupMethod, buddyGroup, stateMap);
@@ -827,6 +851,11 @@
       log.trace("addToGroup notification complete");
    }
 
+   private boolean configuredToFetchState()
+   {
+      return configuration.isFetchInMemoryState() || (cache.getCacheLoaderManager() != null && cache.getCacheLoaderManager().isFetchPersistentState());
+   }
+
    private byte[] acquireState(Fqn fqn) throws CacheException
    {
       // Call _getState with progressively longer timeouts until we

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/BaseTransactionalContextInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/BaseTransactionalContextInterceptor.java	2008-01-02 19:32:14 UTC (rev 4945)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/BaseTransactionalContextInterceptor.java	2008-01-02 19:32:44 UTC (rev 4946)
@@ -37,6 +37,7 @@
       {
          Option txScopeOption = new Option();
          txScopeOption.setCacheModeLocal(ctx.getOptionOverrides() != null && ctx.getOptionOverrides().isCacheModeLocal());
+         txScopeOption.setSkipCacheStatusCheck(ctx.getOptionOverrides() != null && ctx.getOptionOverrides().isSkipCacheStatusCheck());
          entry.setOption(txScopeOption);
       }
    }

Modified: core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java	2008-01-02 19:32:14 UTC (rev 4945)
+++ core/trunk/src/main/java/org/jboss/cache/interceptors/TxInterceptor.java	2008-01-02 19:32:44 UTC (rev 4946)
@@ -1170,8 +1170,6 @@
 
       public void beforeCompletion()
       {
-         if (!cache.getCacheStatus().allowInvocations()) throw new IllegalStateException("Cache not in STARTED state!");
-
          if (log.isTraceEnabled()) log.trace("Running beforeCompletion on gtx " + gtx);
          entry = txTable.get(gtx);
          if (entry == null)
@@ -1183,6 +1181,7 @@
 
          modifications = entry.getModifications();
          ctx = cache.getInvocationContext();
+         if (ctx.isOptionsUninitialised() && entry.getOption() != null) ctx.setOptionOverrides(entry.getOption());
          ctx.setOriginLocal(false);
       }
 
@@ -1190,11 +1189,21 @@
       // it is supposed to be post commit not actually run the commit
       public void afterCompletion(int status)
       {
-         if (!cache.getCacheStatus().allowInvocations()) throw new IllegalStateException("Cache not in STARTED state!");
+         // could happen if a rollback is called and beforeCompletion() doesn't get called.
+         if (ctx == null)
+         {
+            ctx = cache.getInvocationContext();
+         }
+
+         if (ctx.isOptionsUninitialised() && entry.getOption() != null)
+         {
+            // use the options from the transaction entry instead
+            ctx.setOptionOverrides(entry.getOption());
+         }
+
          try
          {
-            // could happen if a rollback is called and beforeCompletion() doesn't get called.
-            if (ctx == null) ctx = cache.getInvocationContext();
+            assertCanContinue();
             setTransactionalContext(tx, gtx, ctx);
 
             try
@@ -1249,6 +1258,12 @@
          }
       }
 
+      private void assertCanContinue()
+      {
+         if (!cache.getCacheStatus().allowInvocations() && (ctx.getOptionOverrides() == null || !ctx.getOptionOverrides().isSkipCacheStatusCheck()))
+            throw new IllegalStateException("Cache not in STARTED state!");
+      }
+
       /**
        * Cleans out (nullifies) member variables held by the sync object for easier gc.  Could be (falsely) seen as a mem
        * leak if the TM implementation hangs on to the synchronizations for an unnecessarily long time even after the tx

Modified: core/trunk/src/main/java/org/jboss/cache/invocation/CacheInvocationDelegate.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/invocation/CacheInvocationDelegate.java	2008-01-02 19:32:14 UTC (rev 4945)
+++ core/trunk/src/main/java/org/jboss/cache/invocation/CacheInvocationDelegate.java	2008-01-02 19:32:44 UTC (rev 4946)
@@ -481,6 +481,7 @@
       if (peek(fqn, false, false) == null)
       {
          getInvocationContext().getOptionOverrides().setFailSilently(true);
+         getInvocationContext().getOptionOverrides().setForceAsynchronous(true);
          //GlobalTransaction tx = cache.getCurrentTransaction();
          MethodCall m = MethodCallFactory.create(MethodDeclarations.putForExternalReadMethodLocal, null, fqn, key, value);
          invoke(m);

Modified: core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java	2008-01-02 19:32:14 UTC (rev 4945)
+++ core/trunk/src/main/java/org/jboss/cache/statetransfer/DefaultStateTransferIntegrator.java	2008-01-02 19:32:44 UTC (rev 4946)
@@ -381,6 +381,7 @@
    private Node getInternalNode(Node parent, Fqn internalFqn)
    {
       Object name = internalFqn.get(parent.getFqn().size());
+      cache.getInvocationContext().getOptionOverrides().setSkipCacheStatusCheck(true);
       Node result = parent.getChild(new Fqn(name));
       if (result != null)
       {

Modified: core/trunk/src/main/java/org/jboss/cache/util/reflect/ClasspathScanner.java
===================================================================
--- core/trunk/src/main/java/org/jboss/cache/util/reflect/ClasspathScanner.java	2008-01-02 19:32:14 UTC (rev 4945)
+++ core/trunk/src/main/java/org/jboss/cache/util/reflect/ClasspathScanner.java	2008-01-02 19:32:44 UTC (rev 4946)
@@ -51,7 +51,7 @@
       try
       {
          // only scan the current ClassPath location that contains this file.  Could be a directory or a JAR file.
-         URL url = getURLPathFromClassLoader("org/jboss/cache/Version.class");
+         URL url = getURLPathFromClassLoader("org/jboss/cache/Cache.class");
          String urlPath = url.getFile();
          if (urlPath.endsWith("/"))
          {




More information about the jbosscache-commits mailing list