[hornetq-commits] JBoss hornetq SVN: r8069 - in trunk: src/main/org/hornetq/core/postoffice/impl and 3 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Thu Oct 8 13:09:12 EDT 2009
Author: timfox
Date: 2009-10-08 13:09:12 -0400 (Thu, 08 Oct 2009)
New Revision: 8069
Modified:
trunk/src/main/org/hornetq/core/postoffice/Binding.java
trunk/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
trunk/src/main/org/hornetq/core/postoffice/impl/DivertBinding.java
trunk/src/main/org/hornetq/core/postoffice/impl/LocalQueueBinding.java
trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java
trunk/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
Log:
more routing refactoring
Modified: trunk/src/main/org/hornetq/core/postoffice/Binding.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/Binding.java 2009-10-08 09:46:28 UTC (rev 8068)
+++ trunk/src/main/org/hornetq/core/postoffice/Binding.java 2009-10-08 17:09:12 UTC (rev 8069)
@@ -15,6 +15,7 @@
import org.hornetq.core.filter.Filter;
import org.hornetq.core.server.Bindable;
+import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.utils.SimpleString;
@@ -43,12 +44,11 @@
boolean isHighAcceptPriority(ServerMessage message);
- //TODO find a better way
- void willRoute(ServerMessage message);
-
boolean isExclusive();
long getID();
int getDistance();
+
+ void route(ServerMessage message, RoutingContext context) throws Exception;
}
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java 2009-10-08 09:46:28 UTC (rev 8068)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java 2009-10-08 17:09:12 UTC (rev 8069)
@@ -15,10 +15,8 @@
import java.nio.ByteBuffer;
import java.util.Collection;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -28,7 +26,6 @@
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.Bindings;
-import org.hornetq.core.server.Bindable;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
@@ -199,9 +196,7 @@
if (theBinding != null)
{
- theBinding.willRoute(message);
-
- theBinding.getBindable().route(message, context);
+ theBinding.route(message, context);
}
}
@@ -230,8 +225,6 @@
}
else
{
- Set<Bindable> chosen = new HashSet<Bindable>();
-
for (Map.Entry<SimpleString, List<Binding>> entry : routingNameBindingMap.entrySet())
{
SimpleString routingName = entry.getKey();
@@ -341,18 +334,11 @@
if (theBinding != null)
{
- theBinding.willRoute(message);
-
- chosen.add(theBinding.getBindable());
+ theBinding.route(message, context);
}
routingNamePositions.put(routingName, pos);
}
-
- for (Bindable bindable : chosen)
- {
- bindable.route(message, context);
- }
}
}
}
@@ -371,9 +357,7 @@
if (binding != null)
{
- binding.willRoute(message);
-
- binding.getBindable().route(message, context);
+ binding.route(message, context);
}
}
}
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/DivertBinding.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/DivertBinding.java 2009-10-08 09:46:28 UTC (rev 8068)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/DivertBinding.java 2009-10-08 17:09:12 UTC (rev 8069)
@@ -19,6 +19,7 @@
import org.hornetq.core.postoffice.BindingType;
import org.hornetq.core.server.Bindable;
import org.hornetq.core.server.Divert;
+import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.utils.SimpleString;
@@ -109,8 +110,9 @@
return true;
}
- public void willRoute(final ServerMessage message)
- {
+ public void route(final ServerMessage message, final RoutingContext context) throws Exception
+ {
+ divert.route(message, context);
}
public int getDistance()
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/LocalQueueBinding.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/LocalQueueBinding.java 2009-10-08 09:46:28 UTC (rev 8068)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/LocalQueueBinding.java 2009-10-08 17:09:12 UTC (rev 8069)
@@ -23,6 +23,7 @@
import org.hornetq.core.server.Bindable;
import org.hornetq.core.server.Consumer;
import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.cluster.impl.Redistributor;
import org.hornetq.utils.SimpleString;
@@ -144,10 +145,11 @@
return false;
}
- public void willRoute(final ServerMessage message)
- {
+ public void route(final ServerMessage message, final RoutingContext context) throws Exception
+ {
+ queue.route(message, context);
}
-
+
public boolean isQueueBinding()
{
return true;
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-10-08 09:46:28 UTC (rev 8068)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-10-08 17:09:12 UTC (rev 8069)
@@ -528,16 +528,16 @@
{
SimpleString address = message.getDestination();
- byte[] duplicateIDBytes = null;
-
Object duplicateID = message.getProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID);
DuplicateIDCache cache = null;
+
+ byte[] duplicateIDBytes = null;
if (duplicateID != null)
{
cache = getDuplicateIDCache(message.getDestination());
-
+
if (duplicateID instanceof SimpleString)
{
duplicateIDBytes = ((SimpleString)duplicateID).getData();
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java 2009-10-08 09:46:28 UTC (rev 8068)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java 2009-10-08 17:09:12 UTC (rev 8069)
@@ -26,6 +26,7 @@
import org.hornetq.core.postoffice.BindingType;
import org.hornetq.core.server.Bindable;
import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.cluster.RemoteQueueBinding;
import org.hornetq.utils.SimpleString;
@@ -183,12 +184,8 @@
return false;
}
- public void willRoute(final ServerMessage message)
- {
- //We add a header with the name of the queue, holding a list of the transient ids of the queues to route to
-
- //TODO - this can be optimised
-
+ public void route(final ServerMessage message, final RoutingContext context)
+ {
byte[] ids = (byte[])message.getProperty(idsHeaderName);
if (ids == null)
@@ -208,9 +205,16 @@
buff.putLong(remoteQueueID);
- message.putBytesProperty(idsHeaderName, ids);
+ message.putBytesProperty(idsHeaderName, ids);
+
+ if (!context.getQueues().contains(this.storeAndForwardQueue))
+ {
+ //There can be many remote bindings for the same node, we only want to add the message once to
+ //the s & f queue for that node
+ context.getQueues().add(storeAndForwardQueue);
+ }
}
-
+
public synchronized void addConsumer(final SimpleString filterString) throws Exception
{
if (filterString != null)
Modified: trunk/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java 2009-10-08 09:46:28 UTC (rev 8068)
+++ trunk/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java 2009-10-08 17:09:12 UTC (rev 8069)
@@ -11,7 +11,6 @@
* permissions and limitations under the License.
*/
-
package org.hornetq.core.transaction;
/**
Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2009-10-08 09:46:28 UTC (rev 8068)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2009-10-08 17:09:12 UTC (rev 8069)
@@ -29,6 +29,7 @@
import org.hornetq.core.server.Bindable;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.impl.RoutingContextImpl;
import org.hornetq.core.transaction.Transaction;
@@ -1009,11 +1010,12 @@
}
/* (non-Javadoc)
- * @see org.hornetq.core.postoffice.Binding#willRoute(org.hornetq.core.server.ServerMessage)
+ * @see org.hornetq.core.postoffice.Binding#route(org.hornetq.core.server.ServerMessage, org.hornetq.core.server.RoutingContext)
*/
- public void willRoute(final ServerMessage message)
+ public void route(ServerMessage message, RoutingContext context) throws Exception
{
-
+ // TODO Auto-generated method stub
+
}
}
More information about the hornetq-commits
mailing list