[jbosscache-commits] JBoss Cache SVN: r6944 - core/branches/flat/src/main/java/org/jboss/starobrno/remoting.
jbosscache-commits at lists.jboss.org
jbosscache-commits at lists.jboss.org
Tue Oct 14 13:49:09 EDT 2008
Author: mircea.markus
Date: 2008-10-14 13:49:09 -0400 (Tue, 14 Oct 2008)
New Revision: 6944
Added:
core/branches/flat/src/main/java/org/jboss/starobrno/remoting/ChannelMessageListener.java
core/branches/flat/src/main/java/org/jboss/starobrno/remoting/SuspectException.java
Modified:
core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManagerImpl.java
Log:
Copied: core/branches/flat/src/main/java/org/jboss/starobrno/remoting/ChannelMessageListener.java (from rev 6936, core/branches/flat/src/main/java/org/jboss/cache/remoting/jgroups/ChannelMessageListener.java)
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/remoting/ChannelMessageListener.java (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/remoting/ChannelMessageListener.java 2008-10-14 17:49:09 UTC (rev 6944)
@@ -0,0 +1,407 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.remoting;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.jboss.cache.Fqn;
+import org.jboss.starobrno.io.ExposedByteArrayOutputStream;
+import org.jboss.starobrno.statetransfer.DefaultStateTransferManager;
+import org.jboss.starobrno.statetransfer.StateTransferManager;
+import org.jboss.starobrno.CacheException;
+import org.jboss.starobrno.config.Configuration;
+import org.jboss.starobrno.factories.annotations.Inject;
+import org.jboss.starobrno.factories.annotations.NonVolatile;
+import org.jboss.util.stream.MarshalledValueInputStream;
+import org.jboss.util.stream.MarshalledValueOutputStream;
+import org.jgroups.ExtendedMessageListener;
+import org.jgroups.Message;
+import org.jgroups.util.Util;
+
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
+import java.io.OutputStream;
+
+/**
+ * JGroups MessageListener
+ *
+ * @author Manik Surtani (<a href="mailto:manik at jboss.org">manik at jboss.org</a>)
+ * @since 2.1.0
+ */
+ at NonVolatile
+public class ChannelMessageListener implements ExtendedMessageListener
+{
+ /**
+ * Reference to an exception that was raised during
+ * state installation on this node.
+ */
+ protected volatile Exception setStateException;
+ private final Object stateLock = new Object();
+ private static final Log log = LogFactory.getLog(ChannelMessageListener.class);
+ private static final boolean trace = log.isTraceEnabled();
+ private StateTransferManager stateTransferManager;
+ private Configuration configuration;
+ /**
+ * True if state was initialized during start-up.
+ */
+ private volatile boolean isStateSet = false;
+
+
+ @Inject
+ private void injectDependencies(StateTransferManager stateTransferManager, Configuration configuration)
+ {
+ this.stateTransferManager = stateTransferManager;
+ this.configuration = configuration;
+ }
+
+ public boolean isStateSet()
+ {
+ return isStateSet;
+ }
+
+ public void setStateSet(boolean stateSet)
+ {
+ isStateSet = stateSet;
+ }
+
+ public void waitForState() throws Exception
+ {
+ synchronized (stateLock)
+ {
+ while (!isStateSet)
+ {
+ if (setStateException != null)
+ {
+ throw setStateException;
+ }
+
+ try
+ {
+ stateLock.wait();
+ }
+ catch (InterruptedException iex)
+ {
+ }
+ }
+ }
+ }
+
+ protected void stateReceivedSuccess()
+ {
+ isStateSet = true;
+ setStateException = null;
+ }
+
+ protected void stateReceivingFailed(Throwable t)
+ {
+ if (t instanceof CacheException)
+ {
+ log.debug(t);
+ }
+ else
+ {
+ log.error("failed setting state", t);
+ }
+ if (t instanceof Exception)
+ {
+ setStateException = (Exception) t;
+ }
+ else
+ {
+ setStateException = new Exception(t);
+ }
+ }
+
+ protected void stateProducingFailed(Throwable t)
+ {
+ if (t instanceof CacheException)
+ {
+ log.debug(t);
+ }
+ else
+ {
+ log.error("Caught " + t.getClass().getName()
+ + " while responding to state transfer request", t);
+ }
+ }
+
+ /**
+ * Callback, does nothing.
+ */
+ public void receive(Message msg)
+ {
+ }
+
+ public byte[] getState()
+ {
+ MarshalledValueOutputStream out = null;
+ byte[] result;
+ ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream(16 * 1024);
+ try
+ {
+ out = new MarshalledValueOutputStream(baos);
+
+ stateTransferManager.getState(out, Fqn.ROOT, configuration.getStateRetrievalTimeout(), true, true);
+ }
+ catch (Throwable t)
+ {
+ stateProducingFailed(t);
+ }
+ finally
+ {
+ result = baos.getRawBuffer();
+ Util.close(out);
+ }
+ return result;
+ }
+
+ public void setState(byte[] new_state)
+ {
+ if (new_state == null)
+ {
+ log.debug("transferred state is null (may be first member in cluster)");
+ return;
+ }
+ ByteArrayInputStream bais = new ByteArrayInputStream(new_state);
+ MarshalledValueInputStream in = null;
+ try
+ {
+ in = new MarshalledValueInputStream(bais);
+ stateTransferManager.setState(in, Fqn.ROOT);
+ stateReceivedSuccess();
+ }
+ catch (Throwable t)
+ {
+ stateReceivingFailed(t);
+ }
+ finally
+ {
+ Util.close(in);
+ synchronized (stateLock)
+ {
+ // Notify wait that state has been set.
+ stateLock.notifyAll();
+ }
+ }
+ }
+
+ public byte[] getState(String state_id)
+ {
+ if (trace) log.trace("Getting state for state id " + state_id);
+ MarshalledValueOutputStream out = null;
+ String sourceRoot = state_id;
+ byte[] result;
+
+ boolean hasDifferentSourceAndIntegrationRoots = state_id.indexOf(DefaultStateTransferManager.PARTIAL_STATE_DELIMITER) > 0;
+ if (hasDifferentSourceAndIntegrationRoots)
+ {
+ sourceRoot = state_id.split(DefaultStateTransferManager.PARTIAL_STATE_DELIMITER)[0];
+ }
+
+ ExposedByteArrayOutputStream baos = new ExposedByteArrayOutputStream(16 * 1024);
+ try
+ {
+ out = new MarshalledValueOutputStream(baos);
+
+ stateTransferManager.getState(out, Fqn.fromString(sourceRoot),
+ configuration.getStateRetrievalTimeout(), true, true);
+ }
+ catch (Throwable t)
+ {
+ stateProducingFailed(t);
+ }
+ finally
+ {
+ result = baos.getRawBuffer();
+ Util.close(out);
+ }
+ return result;
+ }
+
+ public void getState(OutputStream ostream)
+ {
+ MarshalledValueOutputStream out = null;
+ try
+ {
+ out = new MarshalledValueOutputStream(ostream);
+ stateTransferManager.getState(out, Fqn.ROOT, configuration.getStateRetrievalTimeout(), true, true);
+ }
+ catch (Throwable t)
+ {
+ stateProducingFailed(t);
+ }
+ finally
+ {
+ Util.close(out);
+ }
+ }
+
+ public void getState(String state_id, OutputStream ostream)
+ {
+ if (trace) log.trace("Getting state for state id " + state_id);
+ String sourceRoot = state_id;
+ MarshalledValueOutputStream out = null;
+ boolean hasDifferentSourceAndIntegrationRoots = state_id.indexOf(DefaultStateTransferManager.PARTIAL_STATE_DELIMITER) > 0;
+ if (hasDifferentSourceAndIntegrationRoots)
+ {
+ sourceRoot = state_id.split(DefaultStateTransferManager.PARTIAL_STATE_DELIMITER)[0];
+ }
+ try
+ {
+ out = new MarshalledValueOutputStream(ostream);
+ stateTransferManager.getState(out, Fqn.fromString(sourceRoot), configuration.getStateRetrievalTimeout(), true, true);
+ }
+ catch (Throwable t)
+ {
+ stateProducingFailed(t);
+ }
+ finally
+ {
+ Util.close(out);
+ }
+ }
+
+ public void setState(InputStream istream)
+ {
+ if (istream == null)
+ {
+ log.debug("stream is null (may be first member in cluster)");
+ return;
+ }
+ MarshalledValueInputStream in = null;
+ try
+ {
+ in = new MarshalledValueInputStream(istream);
+ stateTransferManager.setState(in, Fqn.ROOT);
+ stateReceivedSuccess();
+ }
+ catch (Throwable t)
+ {
+ stateReceivingFailed(t);
+ }
+ finally
+ {
+ Util.close(in);
+ synchronized (stateLock)
+ {
+ // Notify wait that state has been set.
+ stateLock.notifyAll();
+ }
+ }
+ }
+
+ public void setState(String state_id, byte[] state)
+ {
+ if (trace) log.trace("Receiving state for " + state_id);
+ if (state == null)
+ {
+ log.debug("partial transferred state is null");
+ return;
+ }
+
+ MarshalledValueInputStream in = null;
+ String targetRoot = state_id;
+ boolean hasDifferentSourceAndIntegrationRoots = state_id.indexOf(DefaultStateTransferManager.PARTIAL_STATE_DELIMITER) > 0;
+ if (hasDifferentSourceAndIntegrationRoots)
+ {
+ targetRoot = state_id.split(DefaultStateTransferManager.PARTIAL_STATE_DELIMITER)[1];
+ }
+ try
+ {
+ log.debug("Setting received partial state for subroot " + state_id);
+ Fqn subroot = Fqn.fromString(targetRoot);
+// Region region = regionManager.getRegion(subroot, false);
+// ClassLoader cl = null;
+// if (region != null)
+// {
+// // If a classloader is registered for the node's region, use it
+// cl = region.getClassLoader();
+// }
+ ByteArrayInputStream bais = new ByteArrayInputStream(state);
+ in = new MarshalledValueInputStream(bais);
+ //getStateTransferManager().setState(in, subroot, cl);
+ stateTransferManager.setState(in, subroot);
+ stateReceivedSuccess();
+ }
+ catch (Throwable t)
+ {
+ stateReceivingFailed(t);
+ }
+ finally
+ {
+ Util.close(in);
+ synchronized (stateLock)
+ {
+ // Notify wait that state has been set.
+ stateLock.notifyAll();
+ }
+ }
+ }
+
+ public void setState(String stateId, InputStream istream)
+ {
+ if (trace) log.trace("Receiving state for " + stateId);
+ String targetRoot = stateId;
+ MarshalledValueInputStream in = null;
+ boolean hasDifferentSourceAndIntegrationRoots = stateId.indexOf(DefaultStateTransferManager.PARTIAL_STATE_DELIMITER) > 0;
+ if (hasDifferentSourceAndIntegrationRoots)
+ {
+ targetRoot = stateId.split(DefaultStateTransferManager.PARTIAL_STATE_DELIMITER)[1];
+ }
+ if (istream == null)
+ {
+ log.debug("stream is null (may be first member in cluster). State is not set");
+ return;
+ }
+
+ try
+ {
+ log.debug("Setting received partial state for subroot " + stateId);
+ in = new MarshalledValueInputStream(istream);
+ Fqn subroot = Fqn.fromString(targetRoot);
+// Region region = regionManager.getRegion(subroot, false);
+// ClassLoader cl = null;
+// if (region != null)
+// {
+// // If a classloader is registered for the node's region, use it
+// cl = region.getClassLoader();
+// }
+ //getStateTransferManager().setState(in, subroot, cl);
+ stateTransferManager.setState(in, subroot);
+ stateReceivedSuccess();
+ }
+ catch (Throwable t)
+ {
+ if (log.isTraceEnabled()) log.trace("Unknown error while integrating state", t);
+ stateReceivingFailed(t);
+ }
+ finally
+ {
+ Util.close(in);
+ synchronized (stateLock)
+ {
+ // Notify wait that state has been set.
+ stateLock.notifyAll();
+ }
+ }
+ }
+}
\ No newline at end of file
Property changes on: core/branches/flat/src/main/java/org/jboss/starobrno/remoting/ChannelMessageListener.java
___________________________________________________________________
Name: svn:mergeinfo
+
Modified: core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManagerImpl.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManagerImpl.java 2008-10-14 17:47:27 UTC (rev 6943)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/remoting/RPCManagerImpl.java 2008-10-14 17:49:09 UTC (rev 6944)
@@ -23,21 +23,10 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.jboss.cache.Fqn;
-import org.jboss.cache.NodeSPI;
-import org.jboss.cache.SuspectException;
-import org.jboss.cache.jmx.annotations.MBean;
-import org.jboss.cache.jmx.annotations.ManagedAttribute;
-import org.jboss.cache.jmx.annotations.ManagedOperation;
-import org.jboss.cache.lock.TimeoutException;
-import org.jboss.cache.marshall.CommandAwareRpcDispatcher;
-import org.jboss.cache.marshall.Marshaller;
-import org.jboss.cache.remoting.jgroups.ChannelMessageListener;
-import org.jboss.cache.statetransfer.DefaultStateTransferManager;
-import org.jboss.cache.util.concurrent.ReclosableLatch;
-import org.jboss.cache.util.reflect.ReflectionUtil;
import org.jboss.starobrno.CacheException;
import org.jboss.starobrno.CacheSPI;
+import org.jboss.starobrno.RPCManager;
+import org.jboss.starobrno.context.InvocationContext;
import org.jboss.starobrno.commands.ReplicableCommand;
import org.jboss.starobrno.config.Configuration;
import org.jboss.starobrno.config.RuntimeConfig;
@@ -47,18 +36,19 @@
import org.jboss.starobrno.factories.annotations.Stop;
import org.jboss.starobrno.interceptors.InterceptorChain;
import org.jboss.starobrno.invocation.InvocationContextContainer;
+import org.jboss.starobrno.jmx.annotations.MBean;
+import org.jboss.starobrno.jmx.annotations.ManagedAttribute;
+import org.jboss.starobrno.jmx.annotations.ManagedOperation;
import org.jboss.starobrno.lock.LockManager;
+import org.jboss.starobrno.lock.TimeoutException;
+import org.jboss.starobrno.marshall.CommandAwareRpcDispatcher;
+import org.jboss.starobrno.marshall.ExtendedMarshaller;
import org.jboss.starobrno.notifications.Notifier;
-import org.jboss.starobrno.transaction.GlobalTransaction;
+import org.jboss.starobrno.remoting.ChannelMessageListener;
import org.jboss.starobrno.transaction.TransactionTable;
-import org.jgroups.Address;
-import org.jgroups.Channel;
-import org.jgroups.ChannelException;
-import org.jgroups.ChannelFactory;
-import org.jgroups.ExtendedMembershipListener;
-import org.jgroups.JChannel;
-import org.jgroups.StateTransferException;
-import org.jgroups.View;
+import org.jboss.starobrno.util.ReflectionUtil;
+import org.jboss.cache.util.concurrent.ReclosableLatch;
+import org.jgroups.*;
import org.jgroups.blocks.GroupRequest;
import org.jgroups.blocks.RspFilter;
import org.jgroups.protocols.TP;
@@ -70,10 +60,7 @@
import java.text.NumberFormat;
import java.util.ArrayList;
import java.util.Collections;
-import java.util.HashSet;
-import java.util.LinkedList;
import java.util.List;
-import java.util.Set;
import java.util.Vector;
import java.util.concurrent.TimeUnit;
@@ -115,19 +102,18 @@
private CacheSPI spi;
private InvocationContextContainer invocationContextContainer;
private final boolean trace = log.isTraceEnabled();
- private Marshaller marshaller;
+ private ExtendedMarshaller extendedMarshaller;
private TransactionManager txManager;
private TransactionTable txTable;
private InterceptorChain interceptorChain;
- private boolean isUsingBuddyReplication;
private boolean isInLocalMode;
private ComponentRegistry componentRegistry;
private LockManager lockManager;
@Inject
public void setupDependencies(ChannelMessageListener messageListener, Configuration configuration, Notifier notifier,
- Marshaller marshaller, TransactionTable txTable,
+ ExtendedMarshaller extendedMarshaller, TransactionTable txTable,
TransactionManager txManager, InvocationContextContainer container, InterceptorChain interceptorChain,
ComponentRegistry componentRegistry, LockManager lockManager)
{
@@ -136,7 +122,7 @@
this.notifier = notifier;
// TODO: Inject cacheSPI when we are ready
// this.spi = spi;
- this.marshaller = marshaller;
+ this.extendedMarshaller = extendedMarshaller;
this.txManager = txManager;
this.txTable = txTable;
this.invocationContextContainer = container;
@@ -150,67 +136,27 @@
@Start(priority = 15)
public void start()
{
- switch (configuration.getCacheMode())
+ if (configuration.getCacheMode().equals(Configuration.CacheMode.LOCAL))
{
- case LOCAL:
- log.debug("cache mode is local, will not create the channel");
- isInLocalMode = true;
- isUsingBuddyReplication = false;
- break;
- case REPL_SYNC:
- case REPL_ASYNC:
- case INVALIDATION_ASYNC:
- case INVALIDATION_SYNC:
- isInLocalMode = false;
- isUsingBuddyReplication = configuration.getBuddyReplicationConfig() != null && configuration.getBuddyReplicationConfig().isEnabled();
- if (log.isDebugEnabled()) log.debug("Cache mode is " + configuration.getCacheMode());
+ log.debug("cache mode is local, will not create the channel");
+ isInLocalMode = true;
+ return;
+ }
+ isInLocalMode = false;
+ if (log.isDebugEnabled()) log.debug("Cache mode is " + configuration.getCacheMode());
- boolean fetchState = shouldFetchStateOnStartup();
- initialiseChannelAndRpcDispatcher(fetchState);
+ initialiseChannelAndRpcDispatcher();
- if (fetchState)
- {
- try
- {
- long start = System.currentTimeMillis();
- // connect and state transfer
- channel.connect(configuration.getClusterName(), null, null, configuration.getStateRetrievalTimeout());
- //if I am not the only and the first member than wait for a state to arrive
- if (getMembers().size() > 1) messageListener.waitForState();
-
- if (log.isDebugEnabled())
- log.debug("connected, state was retrieved successfully (in " + (System.currentTimeMillis() - start) + " milliseconds)");
- }
- catch (StateTransferException ste)
- {
- // make sure we disconnect from the channel before we throw this exception!
- // JBCACHE-761
- disconnect();
- throw new CacheException("Unable to fetch state on startup", ste);
- }
- catch (ChannelException e)
- {
- throw new CacheException("Unable to connect to JGroups channel", e);
- }
- catch (Exception ex)
- {
- throw new CacheException("Unable to fetch state on startup", ex);
- }
- }
- else
- {
- //otherwise just connect
- try
- {
- channel.connect(configuration.getClusterName());
- }
- catch (ChannelException e)
- {
- throw new CacheException("Unable to connect to JGroups channel", e);
- }
- }
- if (log.isInfoEnabled()) log.info("Cache local address is " + getLocalAddress());
+ //otherwise just connect
+ try
+ {
+ channel.connect(configuration.getClusterName());
+ }
+ catch (ChannelException e)
+ {
+ throw new CacheException("Unable to connect to JGroups channel", e);
}
+ if (log.isInfoEnabled()) log.info("Cache local address is " + getLocalAddress());
}
public void disconnect()
@@ -250,17 +196,24 @@
rpcDispatcher = null;
}
- /**
- * @return true if we need to fetch state on startup. I.e., initiate a state transfer.
- */
- private boolean shouldFetchStateOnStartup()
+ @SuppressWarnings("deprecation")
+ private void initialiseChannelAndRpcDispatcher() throws CacheException
{
- boolean loaderFetch = configuration.getCacheLoaderConfig() != null && configuration.getCacheLoaderConfig().isFetchPersistentState();
- return !configuration.isInactiveOnStartup() && !isUsingBuddyReplication && (configuration.isFetchInMemoryState() || loaderFetch);
+ buildChannel();
+ // Channel.LOCAL *must* be set to false so we don't see our own messages - otherwise invalidations targeted at
+ // remote instances will be received by self.
+ channel.setOpt(Channel.LOCAL, false);
+ channel.setOpt(Channel.AUTO_RECONNECT, true);
+ channel.setOpt(Channel.AUTO_GETSTATE, false);
+ channel.setOpt(Channel.BLOCK, true);
+ rpcDispatcher = new CommandAwareRpcDispatcher(channel, messageListener, new MembershipListenerAdaptor(),
+ invocationContextContainer, invocationContextContainer, interceptorChain, componentRegistry);
+ checkAppropriateConfig();
+ rpcDispatcher.setRequestMarshaller(extendedMarshaller);
+ rpcDispatcher.setResponseMarshaller(extendedMarshaller);
}
- @SuppressWarnings("deprecation")
- private void initialiseChannelAndRpcDispatcher(boolean fetchState) throws CacheException
+ private void buildChannel()
{
channel = configuration.getRuntimeConfig().getChannel();
if (channel == null)
@@ -301,29 +254,6 @@
configuration.getRuntimeConfig().setChannel(channel);
}
-
- // Channel.LOCAL *must* be set to false so we don't see our own messages - otherwise invalidations targeted at
- // remote instances will be received by self.
- channel.setOpt(Channel.LOCAL, false);
- channel.setOpt(Channel.AUTO_RECONNECT, true);
- channel.setOpt(Channel.AUTO_GETSTATE, fetchState);
- channel.setOpt(Channel.BLOCK, true);
- // todo fix me
- /*
- if (configuration.isUseRegionBasedMarshalling())
- {
- rpcDispatcher = new InactiveRegionAwareRpcDispatcher(channel, messageListener, new MembershipListenerAdaptor(),
- spi, invocationContextContainer, interceptorChain, componentRegistry);
- }
- else
- {
- rpcDispatcher = new CommandAwareRpcDispatcher(channel, messageListener, new MembershipListenerAdaptor(),
- invocationContextContainer, invocationContextContainer, interceptorChain, componentRegistry);
- }
- */
- checkAppropriateConfig();
- rpcDispatcher.setRequestMarshaller(marshaller);
- rpcDispatcher.setResponseMarshaller(marshaller);
}
public Channel getChannel()
@@ -357,37 +287,37 @@
@Deprecated
- private void removeLocksForDeadMembers(NodeSPI node, List deadMembers)
+ private void removeLocksForDeadMembers(List deadMembers)
{
- Set<GlobalTransaction> deadOwners = new HashSet<GlobalTransaction>();
- Object owner = lockManager.getOwner(node);
-
- // todo fix me
- /*
- if (isLockOwnerDead(owner, deadMembers)) deadOwners.add((GlobalTransaction) owner);
-
-
- for (Object readOwner : lockManager.getReadOwners(node))
- {
- if (isLockOwnerDead(readOwner, deadMembers)) deadOwners.add((GlobalTransaction) readOwner);
- }
- */
-
- for (GlobalTransaction deadOwner : deadOwners)
- {
- boolean localTx = deadOwner.getAddress().equals(getLocalAddress());
- // TODO: Fix me!!!
+// Set<GlobalTransaction> deadOwners = new HashSet<GlobalTransaction>();
+// Object owner = lockManager.getOwner(node);
+//
+// todo fix me
+// /*
+// if (isLockOwnerDead(owner, deadMembers)) deadOwners.add((GlobalTransaction) owner);
+//
+//
+// for (Object readOwner : lockManager.getReadOwners(node))
+// {
+// if (isLockOwnerDead(readOwner, deadMembers)) deadOwners.add((GlobalTransaction) readOwner);
+// }
+// */
+//
+// for (GlobalTransaction deadOwner : deadOwners)
+// {
+// boolean localTx = deadOwner.getAddress().equals(getLocalAddress());
+// TODO: Fix me!!!
// boolean broken = LockUtil.breakTransactionLock(node.getFqn(), lockManager, deadOwner, localTx, txTable, txManager);
- boolean broken = true;
-
- if (broken && trace) log.trace("Broke lock for node " + node.getFqn() + " held by " + deadOwner);
- }
-
- // Recursively unlock children
- for (Object child : node.getChildrenDirect())
- {
- removeLocksForDeadMembers((NodeSPI) child, deadMembers);
- }
+// boolean broken = true;
+//
+// if (broken && trace) log.trace("Broke lock for node " + node.getFqn() + " held by " + deadOwner);
+// }
+//
+// Recursively unlock children
+// for (Object child : node.getChildrenDirect())
+// {
+// removeLocksForDeadMembers((NodeSPI) child, deadMembers);
+// }
}
@@ -463,7 +393,7 @@
}
useOutOfBandMessage = false;
// todo fix me!!
- RspList rsps = null;//rpcDispatcher.invokeRemoteCommands(recipients, command, modeToUse, timeout, isUsingBuddyReplication, useOutOfBandMessage, responseFilter);
+ RspList rsps = rpcDispatcher.invokeRemoteCommands(recipients, command, modeToUse, timeout, useOutOfBandMessage, responseFilter);
if (mode == GroupRequest.GET_NONE) return Collections.emptyList();// async case
if (trace)
log.trace("(" + getLocalAddress() + "): responses for method " + command.getClass().getSimpleName() + ":\n" + rsps);
@@ -512,91 +442,6 @@
}
}
- // ------------ START: Partial state transfer methods ------------
-
- public void fetchPartialState(List<Address> sources, Fqn sourceTarget, Fqn integrationTarget) throws Exception
- {
- String encodedStateId = sourceTarget + DefaultStateTransferManager.PARTIAL_STATE_DELIMITER + integrationTarget;
- fetchPartialState(sources, encodedStateId);
- }
-
- public void fetchPartialState(List<Address> sources, Fqn subtree) throws Exception
- {
- if (subtree == null)
- {
- throw new IllegalArgumentException("Cannot fetch partial state. Null subtree.");
- }
- fetchPartialState(sources, subtree.toString());
- }
-
- private void fetchPartialState(List<Address> sources, String stateId) throws Exception
- {
- if (sources == null || sources.isEmpty() || stateId == null)
- {
- // should this really be throwing an exception? Are there valid use cases where partial state may not be available? - Manik
- // Yes -- cache is configured LOCAL but app doesn't know it -- Brian
- //throw new IllegalArgumentException("Cannot fetch partial state, targets are " + sources + " and stateId is " + stateId);
- if (log.isWarnEnabled())
- log.warn("Cannot fetch partial state, targets are " + sources + " and stateId is " + stateId);
- return;
- }
-
- List<Address> targets = new LinkedList<Address>(sources);
-
- //skip *this* node as a target
- targets.remove(getLocalAddress());
-
- if (targets.isEmpty())
- {
- // Definitely no exception here -- this happens every time the 1st node in the
- // cluster activates a region!! -- Brian
- if (log.isDebugEnabled()) log.debug("Cannot fetch partial state. There are no target members specified");
- return;
- }
-
- if (log.isDebugEnabled())
- log.debug("Node " + getLocalAddress() + " fetching partial state " + stateId + " from members " + targets);
- boolean successfulTransfer = false;
- for (Address target : targets)
- {
- try
- {
- if (log.isDebugEnabled())
- log.debug("Node " + getLocalAddress() + " fetching partial state " + stateId + " from member " + target);
- messageListener.setStateSet(false);
- successfulTransfer = channel.getState(target, stateId, configuration.getStateRetrievalTimeout());
- if (successfulTransfer)
- {
- try
- {
- messageListener.waitForState();
- }
- catch (Exception transferFailed)
- {
- if (log.isTraceEnabled()) log.trace("Error while fetching state", transferFailed);
- successfulTransfer = false;
- }
- }
- if (log.isDebugEnabled())
- log.debug("Node " + getLocalAddress() + " fetching partial state " + stateId + " from member " + target + (successfulTransfer ? " successful" : " failed"));
- if (successfulTransfer) break;
- }
- catch (IllegalStateException ise)
- {
- // thrown by the JGroups channel if state retrieval fails.
- if (log.isInfoEnabled())
- log.info("Channel problems fetching state. Continuing on to next provider. ", ise);
- }
- }
-
- if (!successfulTransfer)
- {
- if (log.isDebugEnabled())
- log.debug("Node " + getLocalAddress() + " could not fetch partial state " + stateId + " from any member " + targets);
- }
-
- }
-
// ------------ END: Partial state transfer methods ------------
// ------------ START: Informational methods ------------
@@ -671,12 +516,12 @@
removed.removeAll(newMembers);
spi.getInvocationContext().getOptionOverrides().setSkipCacheStatusCheck(true);
// todo fix me
- NodeSPI root = null; // spi.getRoot();
- if (root != null)
- {
- // todo fix me
- //removeLocksForDeadMembers(root.getDelegationTarget(), removed);
- }
+// NodeSPI root = null; // spi.getRoot();
+// if (root != null)
+// {
+ // todo fix me
+ //removeLocksForDeadMembers(root.getDelegationTarget(), removed);
+// }
}
members = new ArrayList<Address>(newMembers); // defensive copy.
@@ -690,10 +535,8 @@
// now notify listeners - *after* updating the coordinator. - JBCACHE-662
if (needNotification && notifier != null)
{
- // TODO: Fix me when we have repl working
- throw new UnsupportedOperationException("Fix me!");
-// InvocationContext ctx = spi.getInvocationContext();
-// notifier.notifyViewChange(newView, ctx);
+ InvocationContext ctx = invocationContextContainer.get();
+ notifier.notifyViewChange(newView, ctx);
}
// Wake up any threads that are waiting to know about who the coordinator is
Added: core/branches/flat/src/main/java/org/jboss/starobrno/remoting/SuspectException.java
===================================================================
--- core/branches/flat/src/main/java/org/jboss/starobrno/remoting/SuspectException.java (rev 0)
+++ core/branches/flat/src/main/java/org/jboss/starobrno/remoting/SuspectException.java 2008-10-14 17:49:09 UTC (rev 6944)
@@ -0,0 +1,51 @@
+/*
+ * JBoss, Home of Professional Open Source.
+ * Copyright 2000 - 2008, Red Hat Middleware LLC, and individual contributors
+ * as indicated by the @author tags. See the copyright.txt file in the
+ * distribution for a full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+package org.jboss.starobrno.remoting;
+
+import org.jboss.starobrno.CacheException;
+
+/**
+ * Thrown when a member is suspected during remote method invocation
+ *
+ * @author Bela Ban
+ * @version $Id: SuspectException.java 6886 2008-10-08 16:29:32Z manik.surtani at jboss.com $
+ */
+public class SuspectException extends CacheException
+{
+
+ private static final long serialVersionUID = -2965599037371850141L;
+
+ public SuspectException()
+ {
+ super();
+ }
+
+ public SuspectException(String msg)
+ {
+ super(msg);
+ }
+
+ public SuspectException(String msg, Throwable cause)
+ {
+ super(msg, cause);
+ }
+}
More information about the jbosscache-commits
mailing list