[jboss-cvs] JBoss Messaging SVN: r1704 - in branches/Branch_Client_Failover_Experiment/src/main/org/jboss: jms/util messaging/core/plugin messaging/core/plugin/postoffice/cluster
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Dec 4 22:10:33 EST 2006
Author: clebert.suconic at jboss.com
Date: 2006-12-04 22:10:30 -0500 (Mon, 04 Dec 2006)
New Revision: 1704
Modified:
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/util/JNDIUtil.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java
branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
Log:
Fixing dependency of stateTransfer & membership + adding listBindings into ClusteredPostOfficeService to help debug membership data
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/util/JNDIUtil.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/util/JNDIUtil.java 2006-12-04 18:13:09 UTC (rev 1703)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/jms/util/JNDIUtil.java 2006-12-05 03:10:30 UTC (rev 1704)
@@ -21,12 +21,12 @@
*/
package org.jboss.jms.util;
+import java.util.StringTokenizer;
+import javax.naming.Binding;
import javax.naming.Context;
import javax.naming.NameNotFoundException;
+import javax.naming.NamingEnumeration;
import javax.naming.NamingException;
-import javax.naming.NamingEnumeration;
-import javax.naming.Binding;
-import java.util.StringTokenizer;
/**
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
@@ -101,7 +101,19 @@
context = createContext(c, jndiName.substring(0, idx));
name = jndiName.substring(idx + 1);
}
- context.bind(name, o);
+ boolean failed=false;
+ try
+ {
+ context.rebind(name,o);
+ }
+ catch (Exception ignored)
+ {
+ failed=true;
+ }
+ if (failed)
+ {
+ context.bind(name, o);
+ }
}
// Attributes ----------------------------------------------------
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java 2006-12-04 18:13:09 UTC (rev 1703)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/ClusteredPostOfficeService.java 2006-12-05 03:10:30 UTC (rev 1704)
@@ -23,7 +23,6 @@
import javax.management.ObjectName;
import javax.transaction.TransactionManager;
-
import org.jboss.jms.selector.SelectorFactory;
import org.jboss.jms.server.QueuedExecutorPool;
import org.jboss.jms.server.ServerPeer;
@@ -201,6 +200,11 @@
{
this.messagePullPolicy = messagePullPolicy;
}
+
+ public String listBindings()
+ {
+ return postOffice.printBindingInformation();
+ }
// ServiceMBeanSupport overrides ---------------------------------
Modified: branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java
===================================================================
--- branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-12-04 18:13:09 UTC (rev 1703)
+++ branches/Branch_Client_Failover_Experiment/src/main/org/jboss/messaging/core/plugin/postoffice/cluster/DefaultClusteredPostOffice.java 2006-12-05 03:10:30 UTC (rev 1704)
@@ -21,6 +21,7 @@
*/
package org.jboss.messaging.core.plugin.postoffice.cluster;
+import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
@@ -39,10 +40,8 @@
import java.util.Map;
import java.util.Properties;
import java.util.Set;
-
import javax.sql.DataSource;
import javax.transaction.TransactionManager;
-
import org.jboss.jms.server.QueuedExecutorPool;
import org.jboss.logging.Logger;
import org.jboss.messaging.core.Delivery;
@@ -75,8 +74,6 @@
import org.jgroups.blocks.RequestHandler;
import org.w3c.dom.Element;
-import EDU.oswego.cs.dl.util.concurrent.QueuedExecutor;
-
/**
*
* A DefaultClusteredPostOffice
@@ -312,7 +309,7 @@
MessageListener cml = new ControlMessageListener();
MembershipListener ml = new ControlMembershipListener();
RequestHandler rh = new PostOfficeRequestHandler();
-
+
this.controlMessageDispatcher = new MessageDispatcher(syncChannel, cml, ml, rh, true);
Receiver r = new DataReceiver();
@@ -323,7 +320,7 @@
asyncChannel.connect(groupName);
super.start();
-
+
Address syncAddress = syncChannel.getLocalAddress();
Address asyncAddress = asyncChannel.getLocalAddress();
@@ -331,7 +328,9 @@
PostOfficeAddressInfo info = new PostOfficeAddressInfo(syncAddress, asyncAddress);
put(ADDRESS_INFO_KEY, info);
-
+
+ verifyMembership(null,this.currentView);
+
statsSender.start();
started = true;
@@ -1314,7 +1313,48 @@
return holdingArea.values();
}
+
/**
+ * Verifies changes on the View deciding if a node joined or lefted the cluster
+ *
+ * */
+ private void verifyMembership(View oldView, View newView)
+ {
+ try
+ {
+ if (oldView != null)
+ {
+ for(Iterator i = oldView.getMembers().iterator(); i.hasNext(); )
+ {
+ Address address = (Address)i.next();
+ if (!newView.containsMember(address))
+ {
+ nodeLeft(address);
+ }
+ }
+ }
+
+ for(Iterator i = newView.getMembers().iterator(); i.hasNext(); )
+ {
+ Address address = (Address)i.next();
+ if (oldView == null || !oldView.containsMember(address))
+ {
+ nodeJoined(address);
+ }
+ }
+ }
+ catch (Throwable e)
+ {
+ log.error("Caught Exception in MembershipListener", e);
+ IllegalStateException e2 = new IllegalStateException(e.getMessage());
+ e2.setStackTrace(e.getStackTrace());
+ throw e2;
+ }
+ }
+
+
+
+ /**
* This method fails over all the queues from node <nodeId> onto this node
* It is triggered when a JGroups view change occurs due to a member leaving and
* it's determined the member didn't leave cleanly
@@ -1567,17 +1607,40 @@
out.println("</table>");
-// out.println("Clustered Information");
-//
-// NodeAddressInfo info[] = getClusterNodes();
-//
-// out.println("<table border=1><tr><td>Node</td><td>AsyncChannel</td><td>SyncChannel</td></tr>");
-// for (int i = 0; i < info.length; i++)
-// {
-// out.println("<tr><td>" + info[i].getNodeId() + "</td><td>" + info[i].getAsyncChannelAddress() + "</td><td>" + info[i].getSyncChannelAddress() + "</td>");
-// }
-// out.println("</table>");
+ out.println("Replicator's Information");
+ out.println("<table border=1><tr><td>Node</td><td>Key</td><td>Value</td></tr>");
+
+ for (Iterator iter = replicatedData.entrySet().iterator(); iter.hasNext();)
+ {
+ Map.Entry entry = (Map.Entry) iter.next();
+ Map subMap = (Map)entry.getValue();
+ boolean firstTime=true;
+ for (Iterator subIterator = subMap.entrySet().iterator(); subIterator.hasNext();)
+ {
+ Map.Entry subValue = (Map.Entry) subIterator.next();
+ out.println("<tr><td>" + entry.getKey() + "</td>");
+ out.println("<td>" + subValue.getKey() + "</td><td>" + subValue.getValue() + "</td></tr>" );
+ }
+
+ }
+
+ out.println("</table>");
+
+
+ out.println("View Information");
+
+ out.println("<table border=1><tr><td>Members</td></tr>");
+
+
+ for (Iterator iterMembers = currentView.getMembers().iterator(); iterMembers.hasNext();)
+ {
+ Address address = (Address) iterMembers.next();
+ out.println("<tr><td>" + address + "</td></tr>");
+ }
+
+ out.println("</table>");
+
return buffer.toString();
}
@@ -1693,7 +1756,7 @@
if (trace) { log.trace(this + " loading bindings"); }
boolean isState = syncChannel.getState(null, stateTimeout);
-
+
if (!isState)
{
//Must be first member in group or non clustered- we load the state ourself from the database
@@ -1707,20 +1770,20 @@
//The state will be set in due course via the MessageListener - we must wait until this happens
if (trace) { log.trace(this.nodeId + " Not first member of group- so waiting for state to arrive...."); }
-
- synchronized (setStateLock)
+
+ synchronized (setStateLock)
+ {
+ //TODO we should implement a timeout on this
+ while (!stateSet)
{
- //TODO we should implement a timeout on this
- while (!stateSet)
- {
- setStateLock.wait();
- }
+ setStateLock.wait();
}
-
+ }
+
if (trace) { log.trace(this.nodeId + " Received state"); }
}
}
-
+
protected Binding createBinding(int nodeId, String condition, String queueName, long channelId, Filter filter, boolean durable, boolean failed)
{
Queue queue;
@@ -2193,6 +2256,8 @@
}
try
{
+ // TODO: Make it trace
+ log.info("getState:" + DefaultClusteredPostOffice.this.getOfficeName());
return getStateAsBytes();
}
catch (Exception e)
@@ -2226,7 +2291,9 @@
}
try
{
- processStateBytes(bytes);
+ // TODO: Make it trace
+ log.info("setState:" + DefaultClusteredPostOffice.this.getOfficeName());
+ processStateBytes(bytes);
}
catch (Exception e)
{
@@ -2266,52 +2333,36 @@
public void viewAccepted(View newView)
{
- if (trace) { log.trace(DefaultClusteredPostOffice.this + " got new view: " + newView); }
+ //if (trace) { log.trace(DefaultClusteredPostOffice.this + " got new view: " + newView
+ // + DefaultClusteredPostOffice.this.getOfficeName()); }
+ //TODO: (by Clebert) Most JBoss Services use info on viewAccepted,
+ //TODO: can't we do the same since this is pretty useful?
+ log.info(DefaultClusteredPostOffice.this + " got new view: " + newView + " postOffice:"
+ + DefaultClusteredPostOffice.this.getOfficeName());
// JGroups will make sure this method is never called by more than one thread concurrently
View oldView = currentView;
currentView = newView;
-
- try
- {
- if (oldView != null)
- {
- for(Iterator i = oldView.getMembers().iterator(); i.hasNext(); )
- {
- Address address = (Address)i.next();
- if (!newView.containsMember(address))
- {
- nodeLeft(address);
- }
- }
- }
-
- for(Iterator i = newView.getMembers().iterator(); i.hasNext(); )
- {
- Address address = (Address)i.next();
- if (oldView == null || !oldView.containsMember(address))
- {
- nodeJoined(address);
- }
- }
- }
- catch (Throwable e)
+
+ // Since now membership is sent over state transfer, we have to wait stateSet to be set.
+ // If this logic is not good enough, we will have to place Address in a separate data structure
+ // as it used to be (NodeInfos)
+ if (stateSet)
{
- log.error("Caught Exception in MembershipListener", e);
- IllegalStateException e2 = new IllegalStateException(e.getMessage());
- e2.setStackTrace(e.getStackTrace());
- throw e2;
+ verifyMembership(oldView, newView);
}
}
-
+
public byte[] getState()
{
//NOOP
return null;
}
}
-
+
+
+
/*
* This class is used to listen for messages on the async channel
*/
More information about the jboss-cvs-commits
mailing list