[jboss-cvs] JBoss Messaging SVN: r5554 - in trunk: src/main/org/jboss/messaging/core/postoffice and 3 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sun Dec 21 03:52:10 EST 2008
Author: timfox
Date: 2008-12-21 03:52:08 -0500 (Sun, 21 Dec 2008)
New Revision: 5554
Modified:
trunk/.project
trunk/src/main/org/jboss/messaging/core/postoffice/Bindings.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
Log:
Some tweaks to preack
Modified: trunk/.project
===================================================================
--- trunk/.project 2008-12-20 18:31:30 UTC (rev 5553)
+++ trunk/.project 2008-12-21 08:52:08 UTC (rev 5554)
@@ -1,6 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<projectDescription>
- <name>jboss-messaging</name>
+ <name>trunk</name>
<comment></comment>
<projects>
</projects>
@@ -10,8 +10,14 @@
<arguments>
</arguments>
</buildCommand>
+ <buildCommand>
+ <name>net.sourceforge.metrics.builder</name>
+ <arguments>
+ </arguments>
+ </buildCommand>
</buildSpec>
<natures>
<nature>org.eclipse.jdt.core.javanature</nature>
+ <nature>net.sourceforge.metrics.nature</nature>
</natures>
</projectDescription>
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/Bindings.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/Bindings.java 2008-12-20 18:31:30 UTC (rev 5553)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/Bindings.java 2008-12-21 08:52:08 UTC (rev 5554)
@@ -20,13 +20,13 @@
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
*/
-
package org.jboss.messaging.core.postoffice;
import java.util.List;
import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.transaction.Transaction;
/**
* A Bindings
@@ -41,8 +41,10 @@
{
List<Binding> getBindings();
- List<MessageReference> route(ServerMessage message);
+ List<MessageReference> route(final ServerMessage message);
+ List<MessageReference> route(ServerMessage message, Transaction tx);
+
void addBinding(Binding binding);
void removeBinding(Binding binding);
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java 2008-12-20 18:31:30 UTC (rev 5553)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/BindingsImpl.java 2008-12-21 08:52:08 UTC (rev 5554)
@@ -35,6 +35,7 @@
import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.transaction.Transaction;
/**
* A BindingsImpl
@@ -83,7 +84,68 @@
numberExclusive.decrementAndGet();
}
}
+
+ public List<MessageReference> route(final ServerMessage message)
+ {
+ return route(message, null);
+ }
+
+ public List<MessageReference> route(final ServerMessage message, final Transaction tx)
+ {
+ if (numberExclusive.get() > 0)
+ {
+ // We need to round robin
+
+ List<MessageReference> refs = new ArrayList<MessageReference>(1);
+ Binding binding = getNext(message);
+
+ if (binding != null)
+ {
+ Queue queue = binding.getQueue();
+
+ MessageReference reference = message.createReference(queue);
+
+ refs.add(reference);
+ }
+
+ return refs;
+ }
+ else
+ {
+ // They all get the message
+
+ // TODO - this can be optimised to avoid a copy
+
+ if (!bindings.isEmpty())
+ {
+ List<MessageReference> refs = new ArrayList<MessageReference>();
+
+ for (Binding binding : bindings)
+ {
+ Queue queue = binding.getQueue();
+
+ Filter filter = queue.getFilter();
+
+ //Note we ignore any exclusive - this structure is concurrent so one could have been added
+ //since the initial check on number of exclusive
+ if (!binding.isExclusive() && (filter == null || filter.match(message)))
+ {
+ MessageReference reference = message.createReference(queue);
+
+ refs.add(reference);
+ }
+ }
+
+ return refs;
+ }
+ else
+ {
+ return Collections.<MessageReference>emptyList();
+ }
+ }
+ }
+
private Binding getNext(final ServerMessage message)
{
//It's not an exact round robin under concurrent access but that doesn't matter
@@ -161,60 +223,4 @@
binding = null;
}
-
- public List<MessageReference> route(final ServerMessage message)
- {
- if (numberExclusive.get() > 0)
- {
- // We need to round robin
-
- List<MessageReference> refs = new ArrayList<MessageReference>(1);
-
- Binding binding = getNext(message);
-
- if (binding != null)
- {
- Queue queue = binding.getQueue();
-
- MessageReference reference = message.createReference(queue);
-
- refs.add(reference);
- }
-
- return refs;
- }
- else
- {
- // They all get the message
-
- // TODO - this can be optimised to avoid a copy
-
- if (!bindings.isEmpty())
- {
- List<MessageReference> refs = new ArrayList<MessageReference>();
-
- for (Binding binding : bindings)
- {
- Queue queue = binding.getQueue();
-
- Filter filter = queue.getFilter();
-
- //Note we ignore any exclusive - this structure is concurrent so one could have been added
- //since the initial check on number of exclusive
- if (!binding.isExclusive() && (filter == null || filter.match(message)))
- {
- MessageReference reference = message.createReference(queue);
-
- refs.add(reference);
- }
- }
-
- return refs;
- }
- else
- {
- return Collections.<MessageReference>emptyList();
- }
- }
- }
}
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2008-12-20 18:31:30 UTC (rev 5553)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2008-12-21 08:52:08 UTC (rev 5554)
@@ -333,7 +333,7 @@
if (bindings != null)
{
- references = bindings.route(message);
+ references = bindings.route(message, null);
computePaging(address, message, references);
}
@@ -360,7 +360,7 @@
if (bindings != null)
{
- references = bindings.route(message);
+ references = bindings.route(message, tx);
computePaging(address, message, references);
}
@@ -400,6 +400,12 @@
}
}
}
+
+// TODO - bindings.route does not need to return a MessageReference list; instead, bindings.route should actually create the refs and send them to the queues,
+// then we can get rid of the deliver call too
+//
+// for a transaction bindings.route
+//
if (deliver)
{
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-12-20 18:31:30 UTC (rev 5553)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-12-21 08:52:08 UTC (rev 5554)
@@ -536,9 +536,13 @@
ref.getQueue().referenceHandled();
}
-
-
+ if (preAcknowledge)
+ {
+ //With pre-ack, we ack *before* sending to the client
+ doAck(ref);
+ }
+
// TODO: get rid of the instanceof by something like message.isLargeMessage()
if (message instanceof ServerLargeMessage)
{
@@ -567,12 +571,8 @@
else
{
sendStandardMessage(ref, message);
-
- if (preAcknowledge)
- {
- doAck(ref);
- }
}
+
return HandleStatus.HANDLED;
}
finally
@@ -636,15 +636,12 @@
private final MessageReference ref;
- private boolean sentFirstMessage = false;
+ private volatile boolean sentFirstMessage = false;
- // To avoid ACK to be called twice, as sendLargeMessage could be waiting another call because of flowControl
- private boolean acked = false;
-
/** The current position on the message being processed */
- private long positionPendingLargeMessage;
+ private volatile long positionPendingLargeMessage;
- private SessionReceiveContinuationMessage readAheadChunk;
+ private volatile SessionReceiveContinuationMessage readAheadChunk;
public LargeMessageSender(final ServerLargeMessage message, final MessageReference ref)
{
@@ -673,7 +670,6 @@
if (!sentFirstMessage)
{
-
sentFirstMessage = true;
MessagingBuffer headerBuffer = new ByteBufferWrapper(ByteBuffer.allocate(pendingLargeMessage.getPropertiesEncodeSize()));
@@ -736,21 +732,6 @@
pendingLargeMessage.releaseResources();
- if (preAcknowledge && !acked)
- {
- acked = true;
- try
- {
- doAck(ref);
- }
- catch (Exception e)
- {
- // Is there anything we could do here besides logging?
- // The message was already sent, and this shouldn't happen
- log.error("Error while ACKing reference " + ref, e);
- }
- }
-
largeMessageSender = null;
return true;
Modified: trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2008-12-20 18:31:30 UTC (rev 5553)
+++ trunk/src/main/org/jboss/messaging/core/transaction/impl/TransactionImpl.java 2008-12-21 08:52:08 UTC (rev 5554)
@@ -23,7 +23,6 @@
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
-import org.jboss.messaging.core.message.impl.MessageImpl;
import org.jboss.messaging.core.paging.PageTransactionInfo;
import org.jboss.messaging.core.paging.PagingManager;
import org.jboss.messaging.core.paging.impl.PageTransactionInfoImpl;
@@ -43,6 +42,8 @@
*
* @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
* @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ *
+ * TODO - this should be refactored to use transaction operations for adding, acking paging stuff etc
*/
public class TransactionImpl implements Transaction
{
More information about the jboss-cvs-commits
mailing list