[jbosscache-commits] JBoss Cache SVN: r6944 - core/branches/flat/src/main/java/org/jboss/starobrno/remoting.

jbosscache-commits at lists.jboss.org jbosscache-commits at lists.jboss.org
Tue Oct 14 13:49:09 EDT 2008


Author: mircea.markus
Date: 2008-10-14 13:49:09 -0400 (Tue, 14 Oct 2008)
New Revision: 6944

Added:
   core/branches/flat/src/main/java/org/jboss/starobrno/remoting/ChannelMessageListener.java
   core/branches/flat/src/main/java/org/jboss/starobrno/remoting/SuspectException.java
Modified:
   core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManagerImpl.java
Log:


Copied: core/branches/flat/src/main/java/org/jboss/starobrno/remoting/ChannelMessageListener.java (from rev 6936, core/branches/flat/src/main/java/org/jboss/cache/remoting/jgroups/ChannelMessageListener.java)
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/remoting/ChannelMessageListener.java	                        (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/remoting/ChannelMessageListener.java	2008-10-14 17:49:09 UTC (rev 6944)
@@ -0,0 +1,407 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.remoting;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.cache.Fqn;
+import org.jboss.starobrno.io.ExposedByteArrayOutputStream;
+import org.jboss.starobrno.statetransfer.DefaultStateTransferManager;
+import org.jboss.starobrno.statetransfer.StateTransferManager;
+import org.jboss.starobrno.CacheException;
+import org.jboss.starobrno.config.Configuration;
+import org.jboss.starobrno.factories.annotations.Inject;
+import org.jboss.starobrno.factories.annotations.NonVolatile;
+import org.jboss.util.stream.MarshalledValueInputStream;
+import org.jboss.util.stream.MarshalledValueOutputStream;
+import org.jgroups.ExtendedMessageListener;
+import org.jgroups.Message;
+import org.jgroups.util.Util;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * JGroups MessageListener
+ *
+ * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
+ * @since 2.1.0
+ */
+ at NonVolatile
+public class ChannelMessageListener implements ExtendedMessageListener
+{
+   /**
+    * Reference to an exception that was raised during
+    * state installation on this node.
+    */
+   protected volatile Exception setStateException;
+   private final Object stateLock = new Object();
+   private static final Log log = LogFactory.getLog(ChannelMessageListener.class);
+   private static final boolean trace = log.isTraceEnabled();
+   private StateTransferManager stateTransferManager;
+   private Configuration configuration;
+   /**
+    * True if state was initialized during start-up.
+    */
+   private volatile boolean isStateSet = false;
+
+
+   @Inject
+   private void injectDependencies(StateTransferManager stateTransferManager, Configuration configuration)
+   {
+      this.stateTransferManager = stateTransferManager;
+      this.configuration = configuration;
+   }
+
+   public boolean isStateSet()
+   {
+      return isStateSet;
+   }
+
+   public void setStateSet(boolean stateSet)
+   {
+      isStateSet = stateSet;
+   }
+
+   public void waitForState() throws Exception
+   {
+      synchronized (stateLock)
+      {
+         while (!isStateSet)
+         {
+            if (setStateException != null)
+            {
+               throw setStateException;
+            }
+
+            try
+            {
+               stateLock.wait();
+            }
+            catch (InterruptedException iex)
+            {
+            }
+         }
+      }
+   }
+
+   protected void stateReceivedSuccess()
+   {
+      isStateSet = true;
+      setStateException = null;
+   }
+
+   protected void stateReceivingFailed(Throwable t)
+   {
+      if (t instanceof CacheException)
+      {
+         log.debug(t);
+      }
+      else
+      {
+         log.error("failed setting state", t);
+      }
+      if (t instanceof Exception)
+      {
+         setStateException = (Exception) t;
+      }
+      else
+      {
+         setStateException = new Exception(t);
+      }
+   }
+
+   protected void stateProducingFailed(Throwable t)
+   {
+      if (t instanceof CacheException)
+      {
+         log.debug(t);
+      }
+      else
+      {
+         log.error("Caught " + t.getClass().getName()
+               + " while responding to state transfer request", t);
+      }
+   }
+
+   /**
+    * Callback, does nothing.
+    */
+   public void receive(Message msg)
+   {
+   }
+
+   public byte[] getState()
+   {
+      MarshalledValueOutputStream out = null;
+      byte[] result;
+      ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream(16 * 1024);
+      try
+      {
+         out = new MarshalledValueOutputStream(baos);
+
+         stateTransferManager.getState(out, Fqn.ROOT, configuration.getStateRetrievalTimeout(), true, true);
+      }
+      catch (Throwable t)
+      {
+         stateProducingFailed(t);
+      }
+      finally
+      {
+         result = baos.getRawBuffer();
+         Util.close(out);
+      }
+      return result;
+   }
+
+   public void setState(byte[] new_state)
+   {
+      if (new_state == null)
+      {
+         log.debug("transferred state is null (may be first member in cluster)");
+         return;
+      }
+      ByteArrayInputStream bais = new ByteArrayInputStream(new_state);
+      MarshalledValueInputStream in = null;
+      try
+      {
+         in = new MarshalledValueInputStream(bais);
+         stateTransferManager.setState(in, Fqn.ROOT);
+         stateReceivedSuccess();
+      }
+      catch (Throwable t)
+      {
+         stateReceivingFailed(t);
+      }
+      finally
+      {
+         Util.close(in);
+         synchronized (stateLock)
+         {
+            // Notify wait that state has been set.
+            stateLock.notifyAll();
+         }
+      }
+   }
+
+   public byte[] getState(String state_id)
+   {
+      if (trace) log.trace("Getting state for state id " + state_id);
+      MarshalledValueOutputStream out = null;
+      String sourceRoot = state_id;
+      byte[] result;
+
+      boolean hasDifferentSourceAndIntegrationRoots = state_id.indexOf(DefaultStateTransferManager.PARTIAL_STATE_DELIMITER) > 0;
+      if (hasDifferentSourceAndIntegrationRoots)
+      {
+         sourceRoot = state_id.split(DefaultStateTransferManager.PARTIAL_STATE_DELIMITER)[0];
+      }
+
+      ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream(16 * 1024);
+      try
+      {
+         out = new MarshalledValueOutputStream(baos);
+
+         stateTransferManager.getState(out, Fqn.fromString(sourceRoot),
+               configuration.getStateRetrievalTimeout(), true, true);
+      }
+      catch (Throwable t)
+      {
+         stateProducingFailed(t);
+      }
+      finally
+      {
+         result = baos.getRawBuffer();
+         Util.close(out);
+      }
+      return result;
+   }
+
+   public void getState(OutputStream ostream)
+   {
+      MarshalledValueOutputStream out = null;
+      try
+      {
+         out = new MarshalledValueOutputStream(ostream);
+         stateTransferManager.getState(out, Fqn.ROOT, configuration.getStateRetrievalTimeout(), true, true);
+      }
+      catch (Throwable t)
+      {
+         stateProducingFailed(t);
+      }
+      finally
+      {
+         Util.close(out);
+      }
+   }
+
+   public void getState(String state_id, OutputStream ostream)
+   {
+      if (trace) log.trace("Getting state for state id " + state_id);
+      String sourceRoot = state_id;
+      MarshalledValueOutputStream out = null;
+      boolean hasDifferentSourceAndIntegrationRoots = state_id.indexOf(DefaultStateTransferManager.PARTIAL_STATE_DELIMITER) > 0;
+      if (hasDifferentSourceAndIntegrationRoots)
+      {
+         sourceRoot = state_id.split(DefaultStateTransferManager.PARTIAL_STATE_DELIMITER)[0];
+      }
+      try
+      {
+         out = new MarshalledValueOutputStream(ostream);
+         stateTransferManager.getState(out, Fqn.fromString(sourceRoot), configuration.getStateRetrievalTimeout(), true, true);
+      }
+      catch (Throwable t)
+      {
+         stateProducingFailed(t);
+      }
+      finally
+      {
+         Util.close(out);
+      }
+   }
+
+   public void setState(InputStream istream)
+   {
+      if (istream == null)
+      {
+         log.debug("stream is null (may be first member in cluster)");
+         return;
+      }
+      MarshalledValueInputStream in = null;
+      try
+      {
+         in = new MarshalledValueInputStream(istream);
+         stateTransferManager.setState(in, Fqn.ROOT);
+         stateReceivedSuccess();
+      }
+      catch (Throwable t)
+      {
+         stateReceivingFailed(t);
+      }
+      finally
+      {
+         Util.close(in);
+         synchronized (stateLock)
+         {
+            // Notify wait that state has been set.
+            stateLock.notifyAll();
+         }
+      }
+   }
+
+   public void setState(String state_id, byte[] state)
+   {
+      if (trace) log.trace("Receiving state for " + state_id);
+      if (state == null)
+      {
+         log.debug("partial transferred state is null");
+         return;
+      }
+
+      MarshalledValueInputStream in = null;
+      String targetRoot = state_id;
+      boolean hasDifferentSourceAndIntegrationRoots = state_id.indexOf(DefaultStateTransferManager.PARTIAL_STATE_DELIMITER) > 0;
+      if (hasDifferentSourceAndIntegrationRoots)
+      {
+         targetRoot = state_id.split(DefaultStateTransferManager.PARTIAL_STATE_DELIMITER)[1];
+      }
+      try
+      {
+         log.debug("Setting received partial state for subroot " + state_id);
+         Fqn subroot = Fqn.fromString(targetRoot);
+//            Region region = regionManager.getRegion(subroot, false);
+//            ClassLoader cl = null;
+//            if (region != null)
+//            {
+//               // If a classloader is registered for the node's region, use it
+//               cl = region.getClassLoader();
+//            }
+         ByteArrayInputStream bais = new ByteArrayInputStream(state);
+         in = new MarshalledValueInputStream(bais);
+         //getStateTransferManager().setState(in, subroot, cl);
+         stateTransferManager.setState(in, subroot);
+         stateReceivedSuccess();
+      }
+      catch (Throwable t)
+      {
+         stateReceivingFailed(t);
+      }
+      finally
+      {
+         Util.close(in);
+         synchronized (stateLock)
+         {
+            // Notify wait that state has been set.
+            stateLock.notifyAll();
+         }
+      }
+   }
+
+   public void setState(String stateId, InputStream istream)
+   {
+      if (trace) log.trace("Receiving state for " + stateId);
+      String targetRoot = stateId;
+      MarshalledValueInputStream in = null;
+      boolean hasDifferentSourceAndIntegrationRoots = stateId.indexOf(DefaultStateTransferManager.PARTIAL_STATE_DELIMITER) > 0;
+      if (hasDifferentSourceAndIntegrationRoots)
+      {
+         targetRoot = stateId.split(DefaultStateTransferManager.PARTIAL_STATE_DELIMITER)[1];
+      }
+      if (istream == null)
+      {
+         log.debug("stream is null (may be first member in cluster). State is not set");
+         return;
+      }
+
+      try
+      {
+         log.debug("Setting received partial state for subroot " + stateId);
+         in = new MarshalledValueInputStream(istream);
+         Fqn subroot = Fqn.fromString(targetRoot);
+//            Region region = regionManager.getRegion(subroot, false);
+//            ClassLoader cl = null;
+//            if (region != null)
+//            {
+//               // If a classloader is registered for the node's region, use it
+//               cl = region.getClassLoader();
+//            }
+         //getStateTransferManager().setState(in, subroot, cl);
+         stateTransferManager.setState(in, subroot);
+         stateReceivedSuccess();
+      }
+      catch (Throwable t)
+      {
+         if (log.isTraceEnabled()) log.trace("Unknown error while integrating state", t);
+         stateReceivingFailed(t);
+      }
+      finally
+      {
+         Util.close(in);
+         synchronized (stateLock)
+         {
+            // Notify wait that state has been set.
+            stateLock.notifyAll();
+         }
+      }
+   }
+}
\ No newline at end of file


Property changes on: core/branches/flat/src/main/java/org/jboss/starobrno/remoting/ChannelMessageListener.java
___________________________________________________________________
Name: svn:mergeinfo
   + 

Modified: core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManagerImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManagerImpl.java	2008-10-14 17:47:27 UTC (rev 6943)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManagerImpl.java	2008-10-14 17:49:09 UTC (rev 6944)
@@ -23,21 +23,10 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.jboss.cache.Fqn;
-import org.jboss.cache.NodeSPI;
-import org.jboss.cache.SuspectException;
-import org.jboss.cache.jmx.annotations.MBean;
-import org.jboss.cache.jmx.annotations.ManagedAttribute;
-import org.jboss.cache.jmx.annotations.ManagedOperation;
-import org.jboss.cache.lock.TimeoutException;
-import org.jboss.cache.marshall.CommandAwareRpcDispatcher;
-import org.jboss.cache.marshall.Marshaller;
-import org.jboss.cache.remoting.jgroups.ChannelMessageListener;
-import org.jboss.cache.statetransfer.DefaultStateTransferManager;
-import org.jboss.cache.util.concurrent.ReclosableLatch;
-import org.jboss.cache.util.reflect.ReflectionUtil;
 import org.jboss.starobrno.CacheException;
 import org.jboss.starobrno.CacheSPI;
+import org.jboss.starobrno.RPCManager;
+import org.jboss.starobrno.context.InvocationContext;
 import org.jboss.starobrno.commands.ReplicableCommand;
 import org.jboss.starobrno.config.Configuration;
 import org.jboss.starobrno.config.RuntimeConfig;
@@ -47,18 +36,19 @@
 import org.jboss.starobrno.factories.annotations.Stop;
 import org.jboss.starobrno.interceptors.InterceptorChain;
 import org.jboss.starobrno.invocation.InvocationContextContainer;
+import org.jboss.starobrno.jmx.annotations.MBean;
+import org.jboss.starobrno.jmx.annotations.ManagedAttribute;
+import org.jboss.starobrno.jmx.annotations.ManagedOperation;
 import org.jboss.starobrno.lock.LockManager;
+import org.jboss.starobrno.lock.TimeoutException;
+import org.jboss.starobrno.marshall.CommandAwareRpcDispatcher;
+import org.jboss.starobrno.marshall.ExtendedMarshaller;
 import org.jboss.starobrno.notifications.Notifier;
-import org.jboss.starobrno.transaction.GlobalTransaction;
+import org.jboss.starobrno.remoting.ChannelMessageListener;
 import org.jboss.starobrno.transaction.TransactionTable;
-import org.jgroups.Address;
-import org.jgroups.Channel;
-import org.jgroups.ChannelException;
-import org.jgroups.ChannelFactory;
-import org.jgroups.ExtendedMembershipListener;
-import org.jgroups.JChannel;
-import org.jgroups.StateTransferException;
-import org.jgroups.View;
+import org.jboss.starobrno.util.ReflectionUtil;
+import org.jboss.cache.util.concurrent.ReclosableLatch;
+import org.jgroups.*;
 import org.jgroups.blocks.GroupRequest;
 import org.jgroups.blocks.RspFilter;
 import org.jgroups.protocols.TP;
@@ -70,10 +60,7 @@
 import java.text.NumberFormat;
 import java.util.ArrayList;
 import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Set;
 import java.util.Vector;
 import java.util.concurrent.TimeUnit;
 
@@ -115,19 +102,18 @@
    private CacheSPI spi;
    private InvocationContextContainer invocationContextContainer;
    private final boolean trace = log.isTraceEnabled();
-   private Marshaller marshaller;
+   private ExtendedMarshaller extendedMarshaller;
    private TransactionManager txManager;
    private TransactionTable txTable;
    private InterceptorChain interceptorChain;
 
-   private boolean isUsingBuddyReplication;
    private boolean isInLocalMode;
    private ComponentRegistry componentRegistry;
    private LockManager lockManager;
 
    @Inject
    public void setupDependencies(ChannelMessageListener messageListener, Configuration configuration, Notifier notifier,
-                                 Marshaller marshaller, TransactionTable txTable,
+                                 ExtendedMarshaller extendedMarshaller, TransactionTable txTable,
                                  TransactionManager txManager, InvocationContextContainer container, InterceptorChain interceptorChain,
                                  ComponentRegistry componentRegistry, LockManager lockManager)
    {
@@ -136,7 +122,7 @@
       this.notifier = notifier;
       // TODO: Inject cacheSPI when we are ready
 //      this.spi = spi;
-      this.marshaller = marshaller;
+      this.extendedMarshaller = extendedMarshaller;
       this.txManager = txManager;
       this.txTable = txTable;
       this.invocationContextContainer = container;
@@ -150,67 +136,27 @@
    @Start(priority = 15)
    public void start()
    {
-      switch (configuration.getCacheMode())
+      if (configuration.getCacheMode().equals(Configuration.CacheMode.LOCAL))
       {
-         case LOCAL:
-            log.debug("cache mode is local, will not create the channel");
-            isInLocalMode = true;
-            isUsingBuddyReplication = false;
-            break;
-         case REPL_SYNC:
-         case REPL_ASYNC:
-         case INVALIDATION_ASYNC:
-         case INVALIDATION_SYNC:
-            isInLocalMode = false;
-            isUsingBuddyReplication = configuration.getBuddyReplicationConfig() != null && configuration.getBuddyReplicationConfig().isEnabled();
-            if (log.isDebugEnabled()) log.debug("Cache mode is " + configuration.getCacheMode());
+         log.debug("cache mode is local, will not create the channel");
+         isInLocalMode = true;
+         return;
+      }
+      isInLocalMode = false;
+      if (log.isDebugEnabled()) log.debug("Cache mode is " + configuration.getCacheMode());
 
-            boolean fetchState = shouldFetchStateOnStartup();
-            initialiseChannelAndRpcDispatcher(fetchState);
+      initialiseChannelAndRpcDispatcher();
 
-            if (fetchState)
-            {
-               try
-               {
-                  long start = System.currentTimeMillis();
-                  // connect and state transfer
-                  channel.connect(configuration.getClusterName(), null, null, configuration.getStateRetrievalTimeout());
-                  //if I am not the only and the first member than wait for a state to arrive
-                  if (getMembers().size() > 1) messageListener.waitForState();
-
-                  if (log.isDebugEnabled())
-                     log.debug("connected, state was retrieved successfully (in " + (System.currentTimeMillis() - start) + " milliseconds)");
-               }
-               catch (StateTransferException ste)
-               {
-                  // make sure we disconnect from the channel before we throw this exception!
-                  // JBCACHE-761
-                  disconnect();
-                  throw new CacheException("Unable to fetch state on startup", ste);
-               }
-               catch (ChannelException e)
-               {
-                  throw new CacheException("Unable to connect to JGroups channel", e);
-               }
-               catch (Exception ex)
-               {
-                  throw new CacheException("Unable to fetch state on startup", ex);
-               }
-            }
-            else
-            {
-               //otherwise just connect
-               try
-               {
-                  channel.connect(configuration.getClusterName());
-               }
-               catch (ChannelException e)
-               {
-                  throw new CacheException("Unable to connect to JGroups channel", e);
-               }
-            }
-            if (log.isInfoEnabled()) log.info("Cache local address is " + getLocalAddress());
+      //otherwise just connect
+      try
+         {
+            channel.connect(configuration.getClusterName());
+         }
+         catch (ChannelException e)
+      {
+         throw new CacheException("Unable to connect to JGroups channel", e);
       }
+      if (log.isInfoEnabled()) log.info("Cache local address is " + getLocalAddress());
    }
 
    public void disconnect()
@@ -250,17 +196,24 @@
       rpcDispatcher = null;
    }
 
-   /**
-    * @return true if we need to fetch state on startup.  I.e., initiate a state transfer.
-    */
-   private boolean shouldFetchStateOnStartup()
+   @SuppressWarnings("deprecation")
+   private void initialiseChannelAndRpcDispatcher() throws CacheException
    {
-      boolean loaderFetch = configuration.getCacheLoaderConfig() != null && configuration.getCacheLoaderConfig().isFetchPersistentState();
-      return !configuration.isInactiveOnStartup() && !isUsingBuddyReplication && (configuration.isFetchInMemoryState() || loaderFetch);
+      buildChannel();
+      // Channel.LOCAL *must* be set to false so we don't see our own messages - otherwise invalidations targeted at
+      // remote instances will be received by self.
+      channel.setOpt(Channel.LOCAL, false);
+      channel.setOpt(Channel.AUTO_RECONNECT, true);
+      channel.setOpt(Channel.AUTO_GETSTATE, false);
+      channel.setOpt(Channel.BLOCK, true);
+      rpcDispatcher = new CommandAwareRpcDispatcher(channel, messageListener, new MembershipListenerAdaptor(),
+               invocationContextContainer, invocationContextContainer, interceptorChain, componentRegistry);
+      checkAppropriateConfig();
+      rpcDispatcher.setRequestMarshaller(extendedMarshaller);
+      rpcDispatcher.setResponseMarshaller(extendedMarshaller);
    }
 
-   @SuppressWarnings("deprecation")
-   private void initialiseChannelAndRpcDispatcher(boolean fetchState) throws CacheException
+   private void buildChannel()
    {
       channel = configuration.getRuntimeConfig().getChannel();
       if (channel == null)
@@ -301,29 +254,6 @@
 
          configuration.getRuntimeConfig().setChannel(channel);
       }
-
-      // Channel.LOCAL *must* be set to false so we don't see our own messages - otherwise invalidations targeted at
-      // remote instances will be received by self.
-      channel.setOpt(Channel.LOCAL, false);
-      channel.setOpt(Channel.AUTO_RECONNECT, true);
-      channel.setOpt(Channel.AUTO_GETSTATE, fetchState);
-      channel.setOpt(Channel.BLOCK, true);
-      // todo fix me
-      /*
-      if (configuration.isUseRegionBasedMarshalling())
-      {
-         rpcDispatcher = new InactiveRegionAwareRpcDispatcher(channel, messageListener, new MembershipListenerAdaptor(),
-               spi, invocationContextContainer, interceptorChain, componentRegistry);
-      }
-      else
-      {
-         rpcDispatcher = new CommandAwareRpcDispatcher(channel, messageListener, new MembershipListenerAdaptor(),
-               invocationContextContainer, invocationContextContainer, interceptorChain, componentRegistry);
-      }
-      */
-      checkAppropriateConfig();
-      rpcDispatcher.setRequestMarshaller(marshaller);
-      rpcDispatcher.setResponseMarshaller(marshaller);
    }
 
    public Channel getChannel()
@@ -357,37 +287,37 @@
 
 
    @Deprecated
-   private void removeLocksForDeadMembers(NodeSPI node, List deadMembers)
+   private void removeLocksForDeadMembers(List deadMembers)
    {
-      Set<GlobalTransaction> deadOwners = new HashSet<GlobalTransaction>();
-      Object owner = lockManager.getOwner(node);
-
-      // todo fix me
-      /*
-      if (isLockOwnerDead(owner, deadMembers)) deadOwners.add((GlobalTransaction) owner);
-
-
-      for (Object readOwner : lockManager.getReadOwners(node))
-      {
-         if (isLockOwnerDead(readOwner, deadMembers)) deadOwners.add((GlobalTransaction) readOwner);
-      }
-      */
-
-      for (GlobalTransaction deadOwner : deadOwners)
-      {
-         boolean localTx = deadOwner.getAddress().equals(getLocalAddress());
-         // TODO: Fix me!!!
+//      Set<GlobalTransaction> deadOwners = new HashSet<GlobalTransaction>();
+//      Object owner = lockManager.getOwner(node);
+//
+//       todo fix me
+//      /*
+//      if (isLockOwnerDead(owner, deadMembers)) deadOwners.add((GlobalTransaction) owner);
+//
+//
+//      for (Object readOwner : lockManager.getReadOwners(node))
+//      {
+//         if (isLockOwnerDead(readOwner, deadMembers)) deadOwners.add((GlobalTransaction) readOwner);
+//      }
+//      */
+//
+//      for (GlobalTransaction deadOwner : deadOwners)
+//      {
+//         boolean localTx = deadOwner.getAddress().equals(getLocalAddress());
+//          TODO: Fix me!!!
 //         boolean broken = LockUtil.breakTransactionLock(node.getFqn(), lockManager, deadOwner, localTx, txTable, txManager);
-         boolean broken = true;
-
-         if (broken && trace) log.trace("Broke lock for node " + node.getFqn() + " held by " + deadOwner);
-      }
-
-      // Recursively unlock children
-      for (Object child : node.getChildrenDirect())
-      {
-         removeLocksForDeadMembers((NodeSPI) child, deadMembers);
-      }
+//         boolean broken = true;
+//
+//         if (broken && trace) log.trace("Broke lock for node " + node.getFqn() + " held by " + deadOwner);
+//      }
+//
+//       Recursively unlock children
+//      for (Object child : node.getChildrenDirect())
+//      {
+//         removeLocksForDeadMembers((NodeSPI) child, deadMembers);
+//      }
    }
 
 
@@ -463,7 +393,7 @@
          }
          useOutOfBandMessage = false;
          // todo fix me!!
-         RspList rsps = null;//rpcDispatcher.invokeRemoteCommands(recipients, command, modeToUse, timeout, isUsingBuddyReplication, useOutOfBandMessage, responseFilter);
+         RspList rsps = rpcDispatcher.invokeRemoteCommands(recipients, command, modeToUse, timeout, useOutOfBandMessage, responseFilter);
          if (mode == GroupRequest.GET_NONE) return Collections.emptyList();// async case
          if (trace)
             log.trace("(" + getLocalAddress() + "): responses for method " + command.getClass().getSimpleName() + ":\n" + rsps);
@@ -512,91 +442,6 @@
       }
    }
 
-   // ------------ START: Partial state transfer methods ------------
-
-   public void fetchPartialState(List<Address> sources, Fqn sourceTarget, Fqn integrationTarget) throws Exception
-   {
-      String encodedStateId = sourceTarget + DefaultStateTransferManager.PARTIAL_STATE_DELIMITER + integrationTarget;
-      fetchPartialState(sources, encodedStateId);
-   }
-
-   public void fetchPartialState(List<Address> sources, Fqn subtree) throws Exception
-   {
-      if (subtree == null)
-      {
-         throw new IllegalArgumentException("Cannot fetch partial state. Null subtree.");
-      }
-      fetchPartialState(sources, subtree.toString());
-   }
-
-   private void fetchPartialState(List<Address> sources, String stateId) throws Exception
-   {
-      if (sources == null || sources.isEmpty() || stateId == null)
-      {
-         // should this really be throwing an exception?  Are there valid use cases where partial state may not be available? - Manik
-         // Yes -- cache is configured LOCAL but app doesn't know it -- Brian
-         //throw new IllegalArgumentException("Cannot fetch partial state, targets are " + sources + " and stateId is " + stateId);
-         if (log.isWarnEnabled())
-            log.warn("Cannot fetch partial state, targets are " + sources + " and stateId is " + stateId);
-         return;
-      }
-
-      List<Address> targets = new LinkedList<Address>(sources);
-
-      //skip *this* node as a target
-      targets.remove(getLocalAddress());
-
-      if (targets.isEmpty())
-      {
-         // Definitely no exception here -- this happens every time the 1st node in the
-         // cluster activates a region!! -- Brian
-         if (log.isDebugEnabled()) log.debug("Cannot fetch partial state. There are no target members specified");
-         return;
-      }
-
-      if (log.isDebugEnabled())
-         log.debug("Node " + getLocalAddress() + " fetching partial state " + stateId + " from members " + targets);
-      boolean successfulTransfer = false;
-      for (Address target : targets)
-      {
-         try
-         {
-            if (log.isDebugEnabled())
-               log.debug("Node " + getLocalAddress() + " fetching partial state " + stateId + " from member " + target);
-            messageListener.setStateSet(false);
-            successfulTransfer = channel.getState(target, stateId, configuration.getStateRetrievalTimeout());
-            if (successfulTransfer)
-            {
-               try
-               {
-                  messageListener.waitForState();
-               }
-               catch (Exception transferFailed)
-               {
-                  if (log.isTraceEnabled()) log.trace("Error while fetching state", transferFailed);
-                  successfulTransfer = false;
-               }
-            }
-            if (log.isDebugEnabled())
-               log.debug("Node " + getLocalAddress() + " fetching partial state " + stateId + " from member " + target + (successfulTransfer ? " successful" : " failed"));
-            if (successfulTransfer) break;
-         }
-         catch (IllegalStateException ise)
-         {
-            // thrown by the JGroups channel if state retrieval fails.
-            if (log.isInfoEnabled())
-               log.info("Channel problems fetching state.  Continuing on to next provider. ", ise);
-         }
-      }
-
-      if (!successfulTransfer)
-      {
-         if (log.isDebugEnabled())
-            log.debug("Node " + getLocalAddress() + " could not fetch partial state " + stateId + " from any member " + targets);
-      }
-
-   }
-
    // ------------ END: Partial state transfer methods ------------
 
    // ------------ START: Informational methods ------------
@@ -671,12 +516,12 @@
                   removed.removeAll(newMembers);
                   spi.getInvocationContext().getOptionOverrides().setSkipCacheStatusCheck(true);
                   // todo fix me
-                  NodeSPI root = null; // spi.getRoot();
-                  if (root != null)
-                  {
-                     // todo fix me
-                     //removeLocksForDeadMembers(root.getDelegationTarget(), removed);
-                  }
+//                  NodeSPI root = null; // spi.getRoot();
+//                  if (root != null)
+//                  {
+                  // todo fix me
+                  //removeLocksForDeadMembers(root.getDelegationTarget(), removed);
+//                  }
                }
 
                members = new ArrayList<Address>(newMembers); // defensive copy.
@@ -690,10 +535,8 @@
             // now notify listeners - *after* updating the coordinator. - JBCACHE-662
             if (needNotification && notifier != null)
             {
-               // TODO: Fix me when we have repl working
-               throw new UnsupportedOperationException("Fix me!");
-//               InvocationContext ctx = spi.getInvocationContext();
-//               notifier.notifyViewChange(newView, ctx);
+               InvocationContext ctx = invocationContextContainer.get();
+               notifier.notifyViewChange(newView, ctx);
             }
 
             // Wake up any threads that are waiting to know about who the coordinator is

Added: core/branches/flat/src/main/java/org/jboss/starobrno/remoting/SuspectException.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/remoting/SuspectException.java	                        (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/remoting/SuspectException.java	2008-10-14 17:49:09 UTC (rev 6944)
@@ -0,0 +1,51 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.remoting;
+
+import org.jboss.starobrno.CacheException;
+
+/**
+ * Thrown when a member is suspected during remote method invocation
+ *
+ * @author Bela Ban
+ * @version $Id: SuspectException.java 6886 2008-10-08 16:29:32Z manik.surtani at jboss.com $
+ */
+public class SuspectException extends CacheException
+{
+
+   private static final long serialVersionUID = -2965599037371850141L;
+
+   public SuspectException()
+   {
+      super();
+   }
+
+   public SuspectException(String msg)
+   {
+      super(msg);
+   }
+
+   public SuspectException(String msg, Throwable cause)
+   {
+      super(msg, cause);
+   }
+}




More information about the jbosscache-commits mailing list