[jboss-cvs] JBoss Messaging SVN: r1911 - in trunk: src/main/org/jboss/jms/client/state src/main/org/jboss/jms/server/destination src/main/org/jboss/jms/server/endpoint src/main/org/jboss/messaging/core/plugin/contract src/main/org/jboss/messaging/core/plugin/postoffice tests/src/org/jboss/test/messaging/core/plugin/postoffice
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sat Jan 6 01:35:14 EST 2007
Author: ovidiu.feodorov at jboss.com
Date: 2007-01-06 01:35:08 -0500 (Sat, 06 Jan 2007)
New Revision: 1911
Modified:
trunk/src/main/org/jboss/jms/client/state/SessionState.java
trunk/src/main/org/jboss/jms/server/destination/ManagedQueue.java
trunk/src/main/org/jboss/jms/server/destination/ManagedTopic.java
trunk/src/main/org/jboss/jms/server/destination/TopicService.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
trunk/src/main/org/jboss/messaging/core/plugin/contract/PostOffice.java
trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/DefaultPostOfficeTest.java
Log:
Failed-over queues are preferred when creating consumers/browsers
over a failed-over session.
Modified: trunk/src/main/org/jboss/jms/client/state/SessionState.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/state/SessionState.java 2007-01-06 06:27:32 UTC (rev 1910)
+++ trunk/src/main/org/jboss/jms/client/state/SessionState.java 2007-01-06 06:35:08 UTC (rev 1911)
@@ -136,7 +136,6 @@
callbackHandlers = new HashMap();
}
-
// HierarchicalState implementation -------------------------------------------------------------
public DelegateSupport getDelegate()
Modified: trunk/src/main/org/jboss/jms/server/destination/ManagedQueue.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/destination/ManagedQueue.java 2007-01-06 06:27:32 UTC (rev 1910)
+++ trunk/src/main/org/jboss/jms/server/destination/ManagedQueue.java 2007-01-06 06:35:08 UTC (rev 1911)
@@ -79,7 +79,7 @@
{
JMSCondition queueCond = new JMSCondition(true, name);
- Binding binding = (Binding)postOffice.listBindingsForCondition(queueCond).iterator().next();
+ Binding binding = (Binding)postOffice.getBindingForCondition(queueCond).iterator().next();
if (binding == null)
{
@@ -99,7 +99,7 @@
{
JMSCondition queueCond = new JMSCondition(true, name);
- Binding binding = (Binding)postOffice.listBindingsForCondition(queueCond).iterator().next();
+ Binding binding = (Binding)postOffice.getBindingForCondition(queueCond).iterator().next();
if (binding == null)
{
@@ -124,7 +124,7 @@
JMSCondition queueCond = new JMSCondition(true, name);
- Binding binding = (Binding)postOffice.listBindingsForCondition(queueCond).iterator().next();
+ Binding binding = (Binding)postOffice.getBindingForCondition(queueCond).iterator().next();
if (binding == null)
{
Modified: trunk/src/main/org/jboss/jms/server/destination/ManagedTopic.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/destination/ManagedTopic.java 2007-01-06 06:27:32 UTC (rev 1910)
+++ trunk/src/main/org/jboss/jms/server/destination/ManagedTopic.java 2007-01-06 06:35:08 UTC (rev 1911)
@@ -60,7 +60,7 @@
{
JMSCondition topicCond = new JMSCondition(false, name);
- Collection subs = postOffice.listBindingsForCondition(topicCond);
+ Collection subs = postOffice.getBindingForCondition(topicCond);
//XXX How to lock down all subscriptions?
Iterator iter = subs.iterator();
@@ -76,7 +76,7 @@
{
JMSCondition topicCond = new JMSCondition(false, name);
- Collection subs = postOffice.listBindingsForCondition(topicCond);
+ Collection subs = postOffice.getBindingForCondition(topicCond);
return subs.size();
}
@@ -85,7 +85,7 @@
{
JMSCondition topicCond = new JMSCondition(false, name);
- Collection subs = postOffice.listBindingsForCondition(topicCond);
+ Collection subs = postOffice.getBindingForCondition(topicCond);
Iterator iter = subs.iterator();
@@ -108,7 +108,7 @@
{
JMSCondition topicCond = new JMSCondition(false, name);
- Collection subs = postOffice.listBindingsForCondition(topicCond);
+ Collection subs = postOffice.getBindingForCondition(topicCond);
return getSubscriptionsAsText(subs, true) + getSubscriptionsAsText(subs, false);
}
@@ -117,7 +117,7 @@
{
JMSCondition topicCond = new JMSCondition(false, name);
- Collection subs = postOffice.listBindingsForCondition(topicCond);
+ Collection subs = postOffice.getBindingForCondition(topicCond);
return getSubscriptionsAsText(subs, durable);
}
@@ -127,7 +127,7 @@
{
JMSCondition topicCond = new JMSCondition(false, name);
- Collection subs = postOffice.listBindingsForCondition(topicCond);
+ Collection subs = postOffice.getBindingForCondition(topicCond);
return getMessagesFromDurableSub(subs, subName, clientID, trimSelector(selector));
}
@@ -137,7 +137,7 @@
{
JMSCondition topicCond = new JMSCondition(false, name);
- Collection subs = postOffice.listBindingsForCondition(topicCond);
+ Collection subs = postOffice.getBindingForCondition(topicCond);
return getMessagesFromNonDurableSub(subs, channelID, trimSelector(selector));
}
Modified: trunk/src/main/org/jboss/jms/server/destination/TopicService.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/destination/TopicService.java 2007-01-06 06:27:32 UTC (rev 1910)
+++ trunk/src/main/org/jboss/jms/server/destination/TopicService.java 2007-01-06 06:35:08 UTC (rev 1911)
@@ -73,7 +73,7 @@
JMSCondition topicCond = new JMSCondition(false, destination.getName());
// We deploy any queues corresponding to pre-existing durable subscriptions
- Collection bindings = postOffice.listBindingsForCondition(topicCond);
+ Collection bindings = postOffice.getBindingForCondition(topicCond);
Iterator iter = bindings.iterator();
while (iter.hasNext())
{
@@ -116,7 +116,7 @@
JMSCondition topicCond = new JMSCondition(false, destination.getName());
- Collection bindings = postOffice.listBindingsForCondition(topicCond);
+ Collection bindings = postOffice.getBindingForCondition(topicCond);
Iterator iter = bindings.iterator();
while (iter.hasNext())
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2007-01-06 06:27:32 UTC (rev 1910)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConnectionEndpoint.java 2007-01-06 06:35:08 UTC (rev 1911)
@@ -632,6 +632,24 @@
}
// Protected ------------------------------------------------------------------------------------
+
+ /**
+ * Give children endpoints access to the failed node ID, in case this is a failover connection.
+ * Return null if the connection is regular (not failover).
+ */
+ Integer getFailedNodeID()
+ {
+ return failedNodeID;
+ }
+
+ /**
+ * Tell children endpoints (and anybody from this package, for that matter) whether this
+ * connection is a regular or failover connection.
+ */
+ boolean isFailoverConnection()
+ {
+ return failedNodeID != null;
+ }
// Private --------------------------------------------------------------------------------------
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-01-06 06:27:32 UTC (rev 1910)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-01-06 06:35:08 UTC (rev 1911)
@@ -181,7 +181,7 @@
// SessionDelegate implementation ---------------------------------------------------------------
public ConsumerDelegate createConsumerDelegate(JBossDestination jmsDestination,
- String selectorString,
+ String selector,
boolean noLocal,
String subscriptionName,
boolean isCC,
@@ -189,19 +189,18 @@
{
try
{
- if (failoverChannelID == null)
+ if (!connectionEndpoint.isFailoverConnection())
{
// regular consumer
- return createConsumerDelegateInternal(jmsDestination, selectorString,
+ return createConsumerDelegateInternal(jmsDestination, selector,
noLocal, subscriptionName);
}
- else
- {
- // failover consumer
- return createFailoverConsumerDelegateInternal(jmsDestination, selectorString,
- noLocal, subscriptionName,
- failoverChannelID);
- }
+
+ // we're a child of a failover connection. Favor failover channels when creating new
+ // consumers
+ return createFailoverConsumerDelegateInternal(jmsDestination, selector,
+ noLocal, subscriptionName,
+ failoverChannelID);
}
catch (Throwable t)
{
@@ -210,22 +209,20 @@
}
public BrowserDelegate createBrowserDelegate(JBossDestination jmsDestination,
- String messageSelector,
- Long failoverChannelID) throws JMSException
+ String selector, Long failoverChannelID)
+ throws JMSException
{
try
{
- if (failoverChannelID == null)
+ if (!connectionEndpoint.isFailoverConnection())
{
// regular browser
- return createBrowserDelegateInternal(jmsDestination, messageSelector);
+ return createBrowserDelegateInternal(jmsDestination, selector);
}
- else
- {
- // failover browser
- return createFailoverBrowserDelegateInternal(jmsDestination, messageSelector,
- failoverChannelID);
- }
+
+ // we're a child of a failover connection. Favor failover channels when creating new
+ // browsers
+ return createFailoverBrowserDelegateInternal(jmsDestination, selector, failoverChannelID);
}
catch (Throwable t)
{
@@ -567,7 +564,7 @@
else
{
//Topic
- Collection bindings = postOffice.listBindingsForCondition(new JMSCondition(false, dest.getName()));
+ Collection bindings = postOffice.getBindingForCondition(new JMSCondition(false, dest.getName()));
if (!bindings.isEmpty())
{
@@ -1019,17 +1016,23 @@
}
rec.del.acknowledge(null);
- }
+ }
+ /**
+ * @param oldChannelID - old channel ID. It may be null when creating consumer subsequently to
+ * the actual failover, so it's our responsibility to look up the proper channel.
+ * @return
+ * @throws Exception
+ */
private ConsumerDelegate createFailoverConsumerDelegateInternal(JBossDestination jmsDestination,
String selectorString,
boolean noLocal,
String subscriptionName,
- Long ocid)
+ Long oldChannelID)
throws Exception
{
log.debug(this + " creating FAILOVER consumer for failed channel " +
- ocid + " for " + jmsDestination +
+ oldChannelID + " for " + jmsDestination +
(selectorString == null ? "" : ", selector '" + selectorString + "'") +
(subscriptionName == null ? "" : ", subscription '" + subscriptionName + "'") +
(noLocal ? ", noLocal" : ""));
@@ -1040,19 +1043,39 @@
throw new IllegalStateException("Cannot failover on a non-clustered post office!");
}
- long oldChannelID = ocid.longValue();
- Binding binding = ((ClusteredPostOffice)postOffice).getBindingforChannelId(oldChannelID);
+ Binding binding = null;
+ if (oldChannelID != null)
+ {
+ binding = postOffice.getBindingforChannelId(oldChannelID.longValue());
+ }
+ else
+ {
+ // locate the binding based on destination name and the failed node ID
+ Collection c = postOffice.
+ getBindingForCondition(new JMSCondition(true, jmsDestination.getName()));
+
+ for(Iterator i = c.iterator(); i.hasNext(); )
+ {
+ Binding b = (Binding)i.next();
+ if (connectionEndpoint.getFailedNodeID().equals(b.getFailedNodeID()))
+ {
+ binding = b;
+ break;
+ }
+ }
+ }
+
if (binding == null)
{
- throw new IllegalStateException("Can't find failed over channel " + oldChannelID);
+ throw new IllegalStateException("Can't find failed over " +
+ (oldChannelID != null ?
+ "channel " + oldChannelID : "queue " + jmsDestination.getName()));
}
Queue newQueue = binding.getQueue();
long newChannelID = newQueue.getChannelID();
- if (trace) { log.trace(this + " failing over from channel " + oldChannelID + " to channel " + newChannelID); }
-
int consumerID = connectionEndpoint.getServerPeer().getNextObjectID();
int prefetchSize = connectionEndpoint.getPrefetchSize();
@@ -1369,9 +1392,9 @@
private BrowserDelegate createFailoverBrowserDelegateInternal(JBossDestination jmsDestination,
String selector,
- Long ocid) throws Throwable
+ Long oldChannelID) throws Throwable
{
- log.debug(this + " creating FAILOVER browser for failed channel " + ocid + " for " +
+ log.debug(this + " creating FAILOVER browser for failed channel " + oldChannelID + " for " +
jmsDestination + (selector == null ? "" : ", selector '" + selector + "'"));
if (postOffice.isLocal())
@@ -1379,19 +1402,38 @@
throw new IllegalStateException("Cannot failover on a non-clustered post office!");
}
- long oldChannelID = ocid.longValue();
+ Binding binding = null;
- Binding binding = ((ClusteredPostOffice)postOffice).getBindingforChannelId(oldChannelID);
+ if (oldChannelID != null)
+ {
+ binding = postOffice.getBindingforChannelId(oldChannelID.longValue());
+ }
+ else
+ {
+ // locate the binding based on destination name and the failed node ID
+ Collection c = postOffice.
+ getBindingForCondition(new JMSCondition(true, jmsDestination.getName()));
+ for(Iterator i = c.iterator(); i.hasNext(); )
+ {
+ Binding b = (Binding)i.next();
+ if (connectionEndpoint.getFailedNodeID().equals(b.getFailedNodeID()))
+ {
+ binding = b;
+ break;
+ }
+ }
+ }
+
if (binding == null)
{
- throw new IllegalStateException("Can't find failed over channel " + oldChannelID);
+ throw new IllegalStateException("Can't find failed over " +
+ (oldChannelID != null ?
+ "channel " + oldChannelID : "queue " + jmsDestination.getName()));
}
Channel newChannel = binding.getQueue();
- if (trace) { log.trace(this + " failing over from channel " + oldChannelID + " to channel " + newChannel.getChannelID()); }
-
int browserID = connectionEndpoint.getServerPeer().getNextObjectID();
ServerBrowserEndpoint ep = new ServerBrowserEndpoint(this, browserID, newChannel, selector);
Modified: trunk/src/main/org/jboss/messaging/core/plugin/contract/PostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/contract/PostOffice.java 2007-01-06 06:27:32 UTC (rev 1910)
+++ trunk/src/main/org/jboss/messaging/core/plugin/contract/PostOffice.java 2007-01-06 06:35:08 UTC (rev 1911)
@@ -32,14 +32,13 @@
*
* A PostOffice
*
- * A post office holds bindings of queues to conditions.
+ * A post office holds bindings of queues to conditions. When routing a reference, the post office
+ * routes the reference to any binding whose condition matches the condition specified in the call
+ * to route(...)
*
- * When routing a reference, the post office routes the reference to any binding whose
- * condition matches the condition specified in the call to route(...)
+ * Currently we only support conditions where the condition is an exact text match, and there is a
+ * single binding per queue.
*
- * Currently we only support conditions where the condition is an exact text match, and
- * there is a single binding per queue.
- *
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:clebert.suconic at jboss.com">Clebert Suconic</a>
* @author <a href="mailto:ovidiu at jboss.org">Ovidiu Feodorov</a>
@@ -58,17 +57,12 @@
/**
* List the bindings that match the specified condition
- * @param condition
- * @return
- * @throws Exception
*/
- Collection listBindingsForCondition(Condition condition) throws Exception;
+ Collection getBindingForCondition(Condition condition) throws Exception;
+
/**
- * Get the binding for the specified queue name
- * @param queueName
- * @return
- * @throws Exception
+ * Get the binding for the specified queue name.
*/
Binding getBindingForQueueName(String queueName) throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java 2007-01-06 06:27:32 UTC (rev 1910)
+++ trunk/src/main/org/jboss/messaging/core/plugin/postoffice/DefaultPostOffice.java 2007-01-06 06:35:08 UTC (rev 1911)
@@ -257,7 +257,7 @@
}
}
- public Collection listBindingsForCondition(Condition condition) throws Exception
+ public Collection getBindingForCondition(Condition condition) throws Exception
{
return listBindingsForConditionInternal(condition, true);
}
Modified: trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/DefaultPostOfficeTest.java
===================================================================
--- trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/DefaultPostOfficeTest.java 2007-01-06 06:27:32 UTC (rev 1910)
+++ trunk/tests/src/org/jboss/test/messaging/core/plugin/postoffice/DefaultPostOfficeTest.java 2007-01-06 06:35:08 UTC (rev 1911)
@@ -247,22 +247,22 @@
office.bindQueue(new SimpleCondition("condition2"), queue8);
- Collection bindings = office.listBindingsForCondition(new SimpleCondition("dummy"));
+ Collection bindings = office.getBindingForCondition(new SimpleCondition("dummy"));
assertNotNull(bindings);
assertTrue(bindings.isEmpty());
//We don't match on substrings
- bindings = office.listBindingsForCondition(new SimpleCondition("condition123"));
+ bindings = office.getBindingForCondition(new SimpleCondition("condition123"));
assertNotNull(bindings);
assertTrue(bindings.isEmpty());
//We don't currently support hierarchies
- bindings = office.listBindingsForCondition(new SimpleCondition("condition1.subcondition"));
+ bindings = office.getBindingForCondition(new SimpleCondition("condition1.subcondition"));
assertNotNull(bindings);
assertTrue(bindings.isEmpty());
//We currently just do an exact match
- bindings = office.listBindingsForCondition(new SimpleCondition("condition1"));
+ bindings = office.getBindingForCondition(new SimpleCondition("condition1"));
assertNotNull(bindings);
assertEquals(4, bindings.size());
@@ -272,7 +272,7 @@
assertEquivalent((Binding)iter.next(), binding3);
assertEquivalent((Binding)iter.next(), binding4);
- bindings = office.listBindingsForCondition(new SimpleCondition("condition2"));
+ bindings = office.getBindingForCondition(new SimpleCondition("condition2"));
assertNotNull(bindings);
assertEquals(4, bindings.size());
@@ -1079,9 +1079,16 @@
protected void assertEquivalent(Binding binding1, Binding binding2)
{
assertEquals(binding1.getNodeID(), binding2.getNodeID());
- assertEquals(binding1.getQueue().getName(), binding2.getQueue().getName());
- String selector1 = binding1.getQueue().getFilter() != null ? binding1.getQueue().getFilter().getFilterString() : null;
- String selector2 = binding2.getQueue().getFilter() != null ? binding2.getQueue().getFilter().getFilterString() : null;
+ assertEquals(binding1.getQueue().getName(), binding2.getQueue().getName());
+
+ String selector1 =
+ binding1.getQueue().getFilter() != null ?
+ binding1.getQueue().getFilter().getFilterString() : null;
+
+ String selector2 =
+ binding2.getQueue().getFilter() != null ?
+ binding2.getQueue().getFilter().getFilterString() : null;
+
assertEquals(selector1, selector2);
assertEquals(binding1.getQueue().getChannelID(), binding2.getQueue().getChannelID());
assertEquals(binding1.getQueue().isRecoverable(), binding2.getQueue().isRecoverable());
More information about the jboss-cvs-commits
mailing list