[jboss-cvs] JBoss Messaging SVN: r3013 - in trunk/src: main/org/jboss/messaging/core/impl/postoffice and 1 other directory.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Aug 15 08:08:02 EDT 2007
Author: timfox
Date: 2007-08-15 08:08:02 -0400 (Wed, 15 Aug 2007)
New Revision: 3013
Modified:
trunk/src/etc/server/default/deploy/mysql-persistence-service.xml
trunk/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
Log:
Interim commit for group member cleanup
Modified: trunk/src/etc/server/default/deploy/mysql-persistence-service.xml
===================================================================
--- trunk/src/etc/server/default/deploy/mysql-persistence-service.xml 2007-08-15 08:43:42 UTC (rev 3012)
+++ trunk/src/etc/server/default/deploy/mysql-persistence-service.xml 2007-08-15 12:08:02 UTC (rev 3013)
@@ -141,51 +141,101 @@
<attribute name="ChannelPartitionName">${jboss.partition.name:DefaultPartition}-JMS</attribute>
-->
- <!-- JGroups stack configuration for the data channel - used when casting messages across the cluster -->
+ <!-- JGroups stack configuration for the data channel - used for sending data across the cluster -->
<attribute name="DataChannelConfig">
<config>
- <UDP mcast_recv_buf_size="500000" down_thread="false" ip_mcast="true" mcast_send_buf_size="32000"
- mcast_port="45567" ucast_recv_buf_size="500000" use_incoming_packet_handler="false"
- mcast_addr="228.8.8.8" use_outgoing_packet_handler="true" loopback="true" ucast_send_buf_size="32000" ip_ttl="32"/>
- <AUTOCONF down_thread="false" up_thread="false"/>
+ <UDP
+ mcast_addr="228.8.8.8"
+ mcast_port="45567"
+ tos="8"
+ ucast_recv_buf_size="20000000"
+ ucast_send_buf_size="640000"
+ mcast_recv_buf_size="25000000"
+ mcast_send_buf_size="640000"
+ loopback="false"
+ discard_incompatible_packets="true"
+ max_bundle_size="64000"
+ max_bundle_timeout="30"
+ use_incoming_packet_handler="true"
+ use_outgoing_packet_handler="false"
+ ip_ttl="2"
+ down_thread="false" up_thread="false"
+ enable_bundling="false"/>
<PING timeout="2000" down_thread="false" num_initial_members="3" up_thread="false"/>
- <MERGE2 max_interval="10000" down_thread="false" min_interval="5000" up_thread="false"/>
+ <MERGE2 max_interval="100000" down_thread="false" min_interval="20000" up_thread="false"/>
<FD_SOCK down_thread="false" up_thread="false"/>
- <FD timeout="20000" max_tries="3" down_thread="false" up_thread="false" shun="true"/>
+ <FD timeout="10000" max_tries="5" down_thread="false" up_thread="false" shun="true"/>
<VERIFY_SUSPECT timeout="1500" down_thread="false" up_thread="false"/>
- <pbcast.NAKACK max_xmit_size="8192" down_thread="false" use_mcast_xmit="true" gc_lag="50" up_thread="false"
- retransmit_timeout="100,200,600,1200,2400,4800"/>
- <UNICAST timeout="1200,2400,3600" down_thread="false" up_thread="false"/>
- <pbcast.STABLE stability_delay="1000" desired_avg_gossip="20000" down_thread="false" max_bytes="0" up_thread="false"/>
- <FRAG frag_size="8192" down_thread="false" up_thread="false"/>
- <VIEW_SYNC avg_send_interval="60000" down_thread="false" up_thread="false" />
- <pbcast.GMS print_local_addr="true" join_timeout="3000" down_thread="false" join_retry_timeout="2000" up_thread="false" shun="true"/>
+ <pbcast.NAKACK
+ max_xmit_size="60000"
+ use_mcast_xmit="false"
+ gc_lag="0"
+ retransmit_timeout="300,600,1200,2400,4800"
+ down_thread="false" up_thread="false"
+ discard_delivered_msgs="true"/>
+ <UNICAST timeout="300,600,1200,2400,3600"
+ down_thread="false" up_thread="false"/>
+ <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
+ down_thread="false" up_thread="false"
+ max_bytes="400000"/>
+ <pbcast.GMS print_local_addr="true" join_timeout="3000" use_flush="true" flush_timeout="3000"
+ down_thread="false" up_thread="false"
+ join_retry_timeout="2000" shun="false"
+ view_bundling="true"
+ view_ack_collection_timeout="5000"/>
+ <FC max_credits="2000000" down_thread="false" up_thread="false"
+ min_threshold="0.10"/>
+ <FRAG2 frag_size="60000" down_thread="false" up_thread="false"/>
</config>
</attribute>
- <!-- JGroups stack configuration to use for the control channel - used for bind/unbind requests amongst others -->
-
+ <!-- JGroups stack configuration to use for the control channel - used for control messages -->
+
<attribute name="ControlChannelConfig">
<config>
- <UDP mcast_recv_buf_size="500000" down_thread="false" ip_mcast="true" mcast_send_buf_size="32000"
- mcast_port="45568" ucast_recv_buf_size="500000" use_incoming_packet_handler="false"
- mcast_addr="228.8.8.8" use_outgoing_packet_handler="true" loopback="true" ucast_send_buf_size="32000" ip_ttl="32"/>
- <AUTOCONF down_thread="false" up_thread="false"/>
- <PING timeout="2000" down_thread="false" num_initial_members="3" up_thread="false"/>
- <MERGE2 max_interval="10000" down_thread="false" min_interval="5000" up_thread="false"/>
+ <UDP
+ mcast_addr="228.8.8.8"
+ mcast_port="45568"
+ tos="8"
+ ucast_recv_buf_size="20000000"
+ ucast_send_buf_size="640000"
+ mcast_recv_buf_size="25000000"
+ mcast_send_buf_size="640000"
+ loopback="false"
+ discard_incompatible_packets="true"
+ max_bundle_size="64000"
+ max_bundle_timeout="30"
+ use_incoming_packet_handler="true"
+ use_outgoing_packet_handler="false"
+ ip_ttl="2"
+ down_thread="false" up_thread="false"
+ enable_bundling="false"/>
+ <PING timeout="2000"
+ down_thread="false" up_thread="false" num_initial_members="3"/>
+ <MERGE2 max_interval="100000"
+ down_thread="false" up_thread="false" min_interval="20000"/>
<FD_SOCK down_thread="false" up_thread="false"/>
- <FD timeout="20000" max_tries="3" down_thread="false" up_thread="false" shun="true"/>
+ <FD timeout="10000" max_tries="5" down_thread="false" up_thread="false" shun="true"/>
<VERIFY_SUSPECT timeout="1500" down_thread="false" up_thread="false"/>
- <pbcast.NAKACK max_xmit_size="8192" down_thread="false" use_mcast_xmit="true" gc_lag="50" up_thread="false"
- retransmit_timeout="100,200,600,1200,2400,4800"/>
- <UNICAST timeout="1200,2400,3600" down_thread="false" up_thread="false"/>
- <pbcast.STABLE stability_delay="1000" desired_avg_gossip="20000" down_thread="false" max_bytes="0" up_thread="false"/>
- <FRAG frag_size="8192" down_thread="false" up_thread="false"/>
- <VIEW_SYNC avg_send_interval="60000" down_thread="false" up_thread="false" />
- <pbcast.GMS print_local_addr="true" join_timeout="3000" down_thread="false" join_retry_timeout="2000" up_thread="false" shun="true"/>
- <pbcast.STATE_TRANSFER down_thread="false" up_thread="false"/>
- </config>
+ <pbcast.NAKACK max_xmit_size="60000"
+ use_mcast_xmit="false" gc_lag="0"
+ retransmit_timeout="300,600,1200,2400,4800"
+ down_thread="false" up_thread="false"
+ discard_delivered_msgs="true"/>
+ <UNICAST timeout="300,600,1200,2400,3600"
+ down_thread="false" up_thread="false"/>
+ <pbcast.STABLE stability_delay="1000" desired_avg_gossip="50000"
+ down_thread="false" up_thread="false"
+ max_bytes="400000"/>
+ <pbcast.GMS print_local_addr="true" join_timeout="3000" use_flush="true" flush_timeout="3000"
+ down_thread="false" up_thread="false"
+ join_retry_timeout="2000" shun="false"
+ view_bundling="true"/>
+ <FRAG2 frag_size="60000" down_thread="false" up_thread="false"/>
+ <pbcast.STATE_TRANSFER down_thread="false" up_thread="false" use_flush="true" flush_timeout="3000"/>
+ <pbcast.FLUSH down_thread="false" up_thread="false" timeout="20000" auto_flush_conf="false"/>
+ </config>
</attribute>
</mbean>
Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java 2007-08-15 08:43:42 UTC (rev 3012)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/GroupMember.java 2007-08-15 12:08:02 UTC (rev 3013)
@@ -42,11 +42,6 @@
import org.jgroups.util.Rsp;
import org.jgroups.util.RspList;
-import EDU.oswego.cs.dl.util.concurrent.LinkedQueue;
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
-import EDU.oswego.cs.dl.util.concurrent.ReadWriteLock;
-import EDU.oswego.cs.dl.util.concurrent.ReentrantWriterPreferenceReadWriteLock;
-
/**
*
* This class handles the interface with JGroups
@@ -83,15 +78,18 @@
private volatile View currentView;
- private QueuedExecutor viewExecutor;
-
private Object waitLock = new Object();
+
+ private static final int STOPPED = 1;
- //Still needs to be volatile since the ReadWriteLock won't synchronize between threads
- private volatile boolean started;
+ private static final int WAITING_FOR_FIRST_VIEW = 2;
- private ReadWriteLock lock;
+ private static final int WAITING_FOR_STATE = 3;
+ private static final int STARTED = 4;
+
+ private volatile int startedState;
+
public GroupMember(String groupName, long stateTimeout, long castTimeout,
JChannelFactory jChannelFactory, RequestTarget requestTarget,
GroupListener groupListener)
@@ -107,115 +105,99 @@
this.requestTarget = requestTarget;
this.groupListener = groupListener;
-
- this.lock = new ReentrantWriterPreferenceReadWriteLock();
}
public void start() throws Exception
- {
- lock.writeLock().acquire();
+ {
+ this.controlChannel = jChannelFactory.createControlChannel();
- try
- {
- this.viewExecutor = new QueuedExecutor(new LinkedQueue());
-
- this.controlChannel = jChannelFactory.createControlChannel();
-
- this.dataChannel = jChannelFactory.createDataChannel();
-
- // We don't want to receive local messages on any of the channels
- controlChannel.setOpt(Channel.LOCAL, Boolean.FALSE);
-
- dataChannel.setOpt(Channel.LOCAL, Boolean.FALSE);
-
- MessageListener messageListener = new ControlMessageListener();
-
- MembershipListener membershipListener = new ControlMembershipListener();
-
- RequestHandler requestHandler = new ControlRequestHandler();
-
- dispatcher = new MessageDispatcher(controlChannel, messageListener, membershipListener, requestHandler, true);
-
- Receiver dataReceiver = new DataReceiver();
-
- dataChannel.setReceiver(dataReceiver);
-
- controlChannel.connect(groupName);
-
- dataChannel.connect(groupName);
-
- //Note we're not started until getState() is successfullly called
-
- if (!getState())
- {
- if (trace) { log.trace(this + " is the first member of group"); }
- }
- else
- {
- if (trace) { log.trace(this + " is not the first member of group"); }
- }
-
- //Wait for the first view to arrive
+ this.dataChannel = jChannelFactory.createDataChannel();
+
+ this.startedState = STOPPED;
- synchronized (waitLock)
- {
- long timeRemaining = 5000; //Hardcoded for now
-
- long start = System.currentTimeMillis();
-
- while (currentView == null && timeRemaining > 0)
- {
- waitLock.wait(stateTimeout);
-
- if (currentView == null)
- {
- long waited = System.currentTimeMillis() - start;
-
- timeRemaining -= waited;
- }
- }
-
- if (currentView == null)
- {
- throw new IllegalStateException("Timed out waiting for first view to arrive");
- }
- }
-
- //Now we can be considered started
- started = true;
+ // We don't want to receive local messages on any of the channels
+ controlChannel.setOpt(Channel.LOCAL, Boolean.FALSE);
+
+ dataChannel.setOpt(Channel.LOCAL, Boolean.FALSE);
+
+ MessageListener messageListener = new ControlMessageListener();
+
+ MembershipListener membershipListener = new ControlMembershipListener();
+
+ RequestHandler requestHandler = new ControlRequestHandler();
+
+ dispatcher = new MessageDispatcher(controlChannel, messageListener, membershipListener, requestHandler, true);
+
+ Receiver dataReceiver = new DataReceiver();
+
+ dataChannel.setReceiver(dataReceiver);
+
+ this.startedState = WAITING_FOR_FIRST_VIEW;
+
+ controlChannel.connect(groupName);
+
+ //The first thing that happens after connect is a view change arrives
+ //Then the state will arrive (if we are not the first member)
+ //Then the control messages will start arriving.
+ //We can guarantee that messages won't arrive until after the state is set because we use
+ //the FLUSH protocol on the control channel
+
+ //First wait for view
+ waitForStateChange(WAITING_FOR_STATE);
+
+ log.debug("First view arrived");
+
+ //Now wait for state if we are not the first member
+
+ if (controlChannel.getState(null, stateTimeout))
+ {
+ //We are not the first member of the group, so let's wait for state to be got and processed
+
+ waitForStateChange(STARTED);
+
+ log.debug("State arrived");
}
- finally
+ else
{
- lock.writeLock().release();
+ //We are the first member, no need to wait
+
+ startedState = STARTED;
+
+ log.debug("We are the first member of the group so no need to wait for state");
}
+
+ //Now connect the data channel.
+
+ dataChannel.connect(groupName);
}
public void stop() throws Exception
- {
- lock.writeLock().acquire();
-
+ {
try
- {
- viewExecutor.shutdownAfterProcessingCurrentTask();
-
- controlChannel.close();
-
- dataChannel.close();
-
- controlChannel = null;
-
- dataChannel = null;
-
- currentView = null;
-
- viewExecutor = null;
-
- started = false;
+ {
+ dataChannel.close();
}
- finally
+ catch (Exception e)
{
- lock.writeLock().release();
+ log.debug("Failed to close data channel", e);
}
+
+ try
+ {
+ controlChannel.close();
+ }
+ catch (Exception e)
+ {
+ log.debug("Failed to close control channel", e);
+ }
+
+ controlChannel = null;
+
+ dataChannel = null;
+
+ currentView = null;
+
+ startedState = STOPPED;
}
public Address getSyncAddress()
@@ -240,106 +222,53 @@
public void multicastControl(ClusterRequest request, boolean sync) throws Exception
{
- lock.readLock().acquire();
-
- try
- {
- if (started)
- {
- if (trace) { log.trace(this + " multicasting " + request + " to control channel, sync=" + sync); }
-
- Message message = new Message(null, null, writeRequest(request));
+ if (startedState == STARTED)
+ {
+ if (trace) { log.trace(this + " multicasting " + request + " to control channel, sync=" + sync); }
+
+ Message message = new Message(null, null, writeRequest(request));
- RspList rspList =
- dispatcher.castMessage(null, message, sync ? GroupRequest.GET_ALL: GroupRequest.GET_NONE, castTimeout);
+ RspList rspList =
+ dispatcher.castMessage(null, message, sync ? GroupRequest.GET_ALL: GroupRequest.GET_NONE, castTimeout);
+
+ if (sync)
+ {
+ Iterator iter = rspList.values().iterator();
- if (sync)
- {
- Iterator iter = rspList.values().iterator();
-
- while (iter.hasNext())
- {
- Rsp rsp = (Rsp)iter.next();
-
- if (!rsp.wasReceived())
- {
- throw new IllegalStateException(this + " response not received from " + rsp.getSender() + " - there may be others");
- }
- }
- }
+ while (iter.hasNext())
+ {
+ Rsp rsp = (Rsp)iter.next();
+
+ if (!rsp.wasReceived())
+ {
+ throw new IllegalStateException(this + " response not received from " + rsp.getSender() + " - there may be others");
+ }
+ }
}
}
- finally
- {
- lock.readLock().release();
- }
}
- public void unicastControl(ClusterRequest request, Address address, boolean sync) throws Exception
- {
- lock.readLock().acquire();
-
- try
- {
- if (started)
- {
- if (trace) { log.trace(this + " multicasting " + request + " to control channel, sync=" + sync); }
-
- Message message = new Message(address, null, writeRequest(request));
-
- String ret = (String)dispatcher.sendMessage(message, sync ? GroupRequest.GET_ALL: GroupRequest.GET_NONE, castTimeout);
-
- if (sync && !"ok".equals(ret))
- {
- throw new IllegalStateException("Invalid response received " + ret);
- }
- }
- }
- finally
- {
- lock.readLock().release();
- }
- }
-
public void multicastData(ClusterRequest request) throws Exception
{
- lock.readLock().acquire();
-
- try
- {
- if (started)
- {
- if (trace) { log.trace(this + " multicasting " + request + " to data channel"); }
-
- byte[] bytes = writeRequest(request);
-
- dataChannel.send(new Message(null, null, bytes));
- }
+ if (startedState == STARTED)
+ {
+ if (trace) { log.trace(this + " multicasting " + request + " to data channel"); }
+
+ byte[] bytes = writeRequest(request);
+
+ dataChannel.send(new Message(null, null, bytes));
}
- finally
- {
- lock.readLock().release();
- }
}
public void unicastData(ClusterRequest request, Address address) throws Exception
{
- lock.readLock().acquire();
-
- try
- {
- if (started)
- {
- if (trace) { log.trace(this + " unicasting " + request + " to address " + address); }
-
- byte[] bytes = writeRequest(request);
-
- dataChannel.send(new Message(address, null, bytes));
- }
- }
- finally
+ if (startedState == STARTED)
{
- lock.readLock().release();
+ if (trace) { log.trace(this + " unicasting " + request + " to address " + address); }
+
+ byte[] bytes = writeRequest(request);
+
+ dataChannel.send(new Message(address, null, bytes));
}
}
@@ -352,37 +281,46 @@
{
//We are not the first member of the group, so let's wait for state to be got and processed
- synchronized (waitLock)
- {
- long timeRemaining = stateTimeout;
-
- long start = System.currentTimeMillis();
-
- while (!started && timeRemaining > 0)
- {
- waitLock.wait(stateTimeout);
-
- if (!started)
- {
- long waited = System.currentTimeMillis() - start;
-
- timeRemaining -= waited;
- }
- }
-
- if (!started)
- {
- throw new IllegalStateException("Timed out waiting for state to arrive");
- }
- }
+ waitForStateChange(STARTED);
retrievedState = true;
}
+ else
+ {
+ this.startedState = STARTED;
+ }
return retrievedState;
}
+ private void waitForStateChange(int newState) throws Exception
+ {
+ synchronized (waitLock)
+ {
+ long timeRemaining = stateTimeout;
+
+ long start = System.currentTimeMillis();
+
+ while (startedState != newState && timeRemaining > 0)
+ {
+ waitLock.wait(stateTimeout);
+
+ if (startedState != newState)
+ {
+ long waited = System.currentTimeMillis() - start;
+
+ timeRemaining -= waited;
+ }
+ }
+
+ if (startedState != newState)
+ {
+ throw new IllegalStateException("Timed out waiting for state to arrive");
+ }
+ }
+ }
+
private ClusterRequest readRequest(byte[] bytes) throws Exception
{
ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
@@ -418,34 +356,23 @@
public byte[] getState()
{
try
- {
- lock.readLock().acquire();
-
- try
- {
- if (!started)
- {
- //Ignore if received after stopped
-
- return null;
- }
-
- if (trace) { log.trace(this + ".ControlMessageListener got state"); }
-
- byte[] state = groupListener.getState();
-
- return state;
- }
- finally
+ {
+ if (startedState != STARTED)
{
- lock.readLock().release();
+ throw new IllegalStateException("Received control message but group member is not started: " + startedState);
}
+
+ if (trace) { log.trace(this + ".ControlMessageListener got state"); }
+
+ byte[] state = groupListener.getState();
+
+ return state;
}
catch (Exception e)
{
log.error("Failed to get state", e);
- return null;
+ throw new IllegalStateException("Failed to get state");
}
}
@@ -457,6 +384,11 @@
{
synchronized (waitLock)
{
+ if (startedState != WAITING_FOR_STATE)
+ {
+ throw new IllegalStateException("Received state but started state is " + startedState);
+ }
+
try
{
groupListener.setState(bytes);
@@ -466,7 +398,7 @@
log.error("Failed to set state", e);
}
- started = true;
+ startedState = STARTED;
waitLock.notify();
}
@@ -490,19 +422,67 @@
public void viewAccepted(View newView)
{
+ log.debug(this + " got new view " + newView + ", old view is " + currentView);
+
+ if (currentView == null)
+ {
+ //The first view is arriving
+
+ if (startedState != WAITING_FOR_FIRST_VIEW)
+ {
+ throw new IllegalStateException("Got first view but started state is " + startedState);
+ }
+ }
+
+ // JGroups will make sure this method is never called by more than one thread concurrently
+
+ View oldView = currentView;
+
+ currentView = newView;
+
try
{
- // We queue up changes and execute them asynchronously.
- // This is because JGroups will not let us do stuff like send synch messages using the
- // same thread that delivered the view change and this is what we need to do in
- // failover, for example.
+ // Act on membership change, on both cases when an old member left or a new member joined
- viewExecutor.execute(new HandleViewAcceptedRunnable(newView));
+ if (oldView != null)
+ {
+ for (Iterator i = oldView.getMembers().iterator(); i.hasNext(); )
+ {
+ Address address = (Address)i.next();
+ if (!newView.containsMember(address))
+ {
+ // this is where the failover happens, if necessary
+ groupListener.nodeLeft(address);
+ }
+ }
+ }
+
+ for (Iterator i = newView.getMembers().iterator(); i.hasNext(); )
+ {
+ Address address = (Address)i.next();
+ if (oldView == null || !oldView.containsMember(address))
+ {
+ groupListener.nodeJoined(address);
+ }
+ }
}
- catch (InterruptedException e)
+ catch (Throwable e)
{
- log.warn("Caught InterruptedException", e);
+ log.error("Caught Exception in MembershipListener", e);
+ IllegalStateException e2 = new IllegalStateException(e.getMessage());
+ e2.setStackTrace(e.getStackTrace());
+ throw e2;
}
+
+ if (startedState == WAITING_FOR_FIRST_VIEW)
+ {
+ synchronized (waitLock)
+ {
+ startedState = WAITING_FOR_STATE;
+
+ waitLock.notify();
+ }
+ }
}
public byte[] getState()
@@ -544,27 +524,16 @@
try
{
- lock.readLock().acquire();
-
- try
- {
- if (!started)
- {
- //Ignore messages received when not started
-
- return;
- }
-
- byte[] bytes = message.getBuffer();
-
- ClusterRequest request = readRequest(bytes);
-
- request.execute(requestTarget);
- }
- finally
- {
- lock.readLock().release();
- }
+ if (startedState != STARTED)
+ {
+ throw new IllegalStateException("Received data message but member is not started " + startedState);
+ }
+
+ byte[] bytes = message.getBuffer();
+
+ ClusterRequest request = readRequest(bytes);
+
+ request.execute(requestTarget);
}
catch (Throwable e)
{
@@ -593,27 +562,16 @@
try
{
- lock.readLock().acquire();
-
- try
- {
- if (!started)
- {
- //Ignore messages received when stopped
-
- return null;
- }
-
- byte[] bytes = message.getBuffer();
-
- ClusterRequest request = readRequest(bytes);
-
- return request.execute(requestTarget);
- }
- finally
- {
- lock.readLock().release();
- }
+ if (startedState != STARTED)
+ {
+ throw new IllegalStateException("Received control message but member is not started " + startedState);
+ }
+
+ byte[] bytes = message.getBuffer();
+
+ ClusterRequest request = readRequest(bytes);
+
+ return request.execute(requestTarget);
}
catch (Throwable e)
{
@@ -625,59 +583,5 @@
}
}
- private class HandleViewAcceptedRunnable implements Runnable
- {
- private View newView;
- HandleViewAcceptedRunnable(View newView)
- {
- this.newView = newView;
- }
-
- public void run()
- {
- log.debug(this + " got new view " + newView + ", old view is " + currentView);
-
- // JGroups will make sure this method is never called by more than one thread concurrently
-
- View oldView = currentView;
-
- currentView = newView;
-
- try
- {
- // Act on membership change, on both cases when an old member left or a new member joined
-
- if (oldView != null)
- {
- for (Iterator i = oldView.getMembers().iterator(); i.hasNext(); )
- {
- Address address = (Address)i.next();
- if (!newView.containsMember(address))
- {
- // this is where the failover happens, if necessary
- groupListener.nodeLeft(address);
- }
- }
- }
-
- for (Iterator i = newView.getMembers().iterator(); i.hasNext(); )
- {
- Address address = (Address)i.next();
- if (oldView == null || !oldView.containsMember(address))
- {
- groupListener.nodeJoined(address);
- }
- }
- }
- catch (Throwable e)
- {
- log.error("Caught Exception in MembershipListener", e);
- IllegalStateException e2 = new IllegalStateException(e.getMessage());
- e2.setStackTrace(e.getStackTrace());
- throw e2;
- }
- }
- }
-
}
Modified: trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2007-08-15 08:43:42 UTC (rev 3012)
+++ trunk/src/main/org/jboss/messaging/core/impl/postoffice/MessagingPostOffice.java 2007-08-15 12:08:02 UTC (rev 3013)
@@ -613,7 +613,7 @@
PostOfficeAddressInfo info = (PostOfficeAddressInfo)nodeIDAddressMap.get(new Integer(thisNodeID));
- replyAddress = info.getControlChannelAddress();
+ replyAddress = info.getDataChannelAddress();
}
ClusterRequest request = new ReplicateDeliveryMessage(thisNodeID, queueName, sessionID, messageID, deliveryID, replyAddress);
@@ -621,11 +621,11 @@
if (trace) { log.trace(this + " sending replicate delivery message " + queueName + " " + sessionID + " " + messageID); }
//TODO could be optimised too
- Address address = getFailoverNodeForControlChannelAddress();
+ Address address = getFailoverNodeDataChannelAddress();
if (address != null)
{
- groupMember.unicastControl(request, address, false);
+ groupMember.unicastData(request, address);
}
}
@@ -636,11 +636,11 @@
ClusterRequest request = new ReplicateAckMessage(thisNodeID, queueName, messageID);
- Address address = getFailoverNodeForControlChannelAddress();
+ Address address = getFailoverNodeDataChannelAddress();
if (address != null)
{
- groupMember.unicastControl(request, address, false);
+ groupMember.unicastData(request, address);
}
}
@@ -1205,7 +1205,7 @@
{
try
{
- groupMember.unicastControl(request, replyAddress, false);
+ groupMember.unicastData(request, replyAddress);
}
catch (Exception e)
{
@@ -1604,19 +1604,19 @@
PostOfficeAddressInfo info = (PostOfficeAddressInfo)nodeIDAddressMap.get(new Integer(thisNodeID));
- Address replyAddress = info.getControlChannelAddress();
+ Address replyAddress = info.getDataChannelAddress();
ClusterRequest request = new GetReplicatedDeliveriesRequest(queue.getName(), replyAddress);
info = (PostOfficeAddressInfo)nodeIDAddressMap.get(masterNodeID);
- Address address = info.getControlChannelAddress();
+ Address address = info.getDataChannelAddress();
log.info("Sending the message to node " + masterNodeID + " with address " + address);
if (address != null)
{
- groupMember.unicastControl(request, address, false);
+ groupMember.unicastData(request, address);
}
}
}
@@ -1651,7 +1651,7 @@
return nodeID;
}
- private Address getFailoverNodeForControlChannelAddress()
+ private Address getFailoverNodeDataChannelAddress()
{
PostOfficeAddressInfo info = (PostOfficeAddressInfo)nodeIDAddressMap.get(new Integer(failoverNodeID));
@@ -1660,7 +1660,7 @@
return null;
}
- Address address = info.getControlChannelAddress();
+ Address address = info.getDataChannelAddress();
return address;
}
@@ -2763,7 +2763,7 @@
ClusterRequest request = new AckAllReplicatedDeliveriesMessage(thisNodeID);
- groupMember.unicastControl(request, info.getControlChannelAddress(), false);
+ groupMember.unicastData(request, info.getDataChannelAddress());
if (trace) { log.trace("Sent AckAllReplicatedDeliveriesMessage"); }
}
@@ -2813,7 +2813,7 @@
ClusterRequest request = new AddAllReplicatedDeliveriesMessage(thisNodeID, deliveries);
- groupMember.unicastControl(request, info.getControlChannelAddress(), false);
+ groupMember.unicastData(request, info.getDataChannelAddress());
if (trace) { log.trace("Sent AddAllReplicatedDeliveriesMessage"); }
}
@@ -3008,7 +3008,7 @@
{
ClusterRequest req = new AddAllReplicatedDeliveriesMessage(thisNodeID, dels);
- groupMember.unicastControl(req, address, false);
+ groupMember.unicastData(req, address);
}
}
More information about the jboss-cvs-commits mailing list