[jboss-cvs] JBoss Messaging SVN: r3685 - in trunk: src/main/org/jboss/jms/client/impl and 5 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Fri Feb 8 11:25:03 EST 2008
Author: timfox
Date: 2008-02-08 11:25:03 -0500 (Fri, 08 Feb 2008)
New Revision: 3685
Modified:
trunk/src/main/org/jboss/jms/client/JBossMessageProducer.java
trunk/src/main/org/jboss/jms/client/impl/ClientConsumerImpl.java
trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
trunk/src/main/org/jboss/messaging/core/impl/DeliveryImpl.java
trunk/src/main/org/jboss/messaging/core/impl/MessageImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/codec/DeliverMessageCodec.java
trunk/src/main/org/jboss/messaging/core/remoting/wireformat/DeliverMessage.java
trunk/tests/src/org/jboss/messaging/core/remoting/wireformat/test/unit/PacketTypeTest.java
Log:
Fixed test, just one to go now!
Modified: trunk/src/main/org/jboss/jms/client/JBossMessageProducer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossMessageProducer.java 2008-02-08 15:07:26 UTC (rev 3684)
+++ trunk/src/main/org/jboss/jms/client/JBossMessageProducer.java 2008-02-08 16:25:03 UTC (rev 3685)
@@ -332,10 +332,10 @@
if (this.defaultDestination != null && !this.defaultDestination.equals(destination))
{
- throw new UnsupportedOperationException("Where a default destination is specified " +
- "for the sender and a destination is " +
- "specified in the arguments to the send, " +
- "these destinations must be equal");
+ throw new JMSException("Where a default destination is specified " +
+ "for the sender and a destination is " +
+ "specified in the arguments to the send, " +
+ "these destinations must be equal");
}
JBossMessage jbm;
Modified: trunk/src/main/org/jboss/jms/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientConsumerImpl.java 2008-02-08 15:07:26 UTC (rev 3684)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientConsumerImpl.java 2008-02-08 16:25:03 UTC (rev 3685)
@@ -274,13 +274,6 @@
return id;
}
-// public void changeRate(float newRate) throws MessagingException
-// {
-// checkClosed();
-//
-// remotingConnection.send(id, new ConsumerFlowTokenMessage(newRate), true);
-// }
-
public void handleMessage(final DeliverMessage message) throws Exception
{
synchronized (mainLock)
@@ -312,8 +305,6 @@
// Add it to the buffer
Message coreMessage = message.getMessage();
- coreMessage.setDeliveryCount(message.getDeliveryCount());
-
buffer.addLast(message, coreMessage.getPriority());
if (receiverThread != null)
Modified: trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java 2008-02-08 15:07:26 UTC (rev 3684)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java 2008-02-08 16:25:03 UTC (rev 3685)
@@ -498,6 +498,9 @@
{
try
{
+ //Need to flush any acks to server first
+ acknowledgeInternal(false);
+
SessionXAResponseMessage response = (SessionXAResponseMessage)remotingConnection.send(id, new SessionXAForgetMessage(xid));
if (response.isError())
@@ -595,7 +598,6 @@
{
try
{
-
SessionXARollbackMessage packet = new SessionXARollbackMessage(xid);
SessionXAResponseMessage response = (SessionXAResponseMessage)remotingConnection.send(id, packet);
@@ -637,10 +639,16 @@
if (flags == XAResource.TMJOIN)
{
+ //Need to flush any acks to server first
+ acknowledgeInternal(false);
+
packet = new SessionXAJoinMessage(xid);
}
else if (flags == XAResource.TMRESUME)
{
+ //Need to flush any acks to server first
+ acknowledgeInternal(false);
+
packet = new SessionXAResumeMessage(xid);
}
else if (flags == XAResource.TMNOFLAGS)
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2008-02-08 15:07:26 UTC (rev 3684)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java 2008-02-08 16:25:03 UTC (rev 3685)
@@ -111,8 +111,6 @@
this.autoDeleteQueue = autoDeleteQueue;
- log.info("Enable flow control is " + enableFlowControl);
-
this.enableFlowControl = enableFlowControl;
// adding the consumer to the queue
Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2008-02-08 15:07:26 UTC (rev 3684)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2008-02-08 16:25:03 UTC (rev 3685)
@@ -250,40 +250,7 @@
throw new IllegalStateException("Cannot find consumer with id " + consumerId + " to remove");
}
}
-
-// void localClose() throws Exception
-// {
-// Map<String, ServerConsumerEndpoint> consumersClone = new HashMap<String, ServerConsumerEndpoint>(consumers);
-//
-// for (ServerConsumerEndpoint consumer: consumersClone.values())
-// {
-// consumer.close();
-// }
-//
-// consumers.clear();
-//
-// Map<String, ServerBrowserEndpoint> browsersClone = new HashMap<String, ServerBrowserEndpoint>(browsers);
-//
-// for (ServerBrowserEndpoint browser: browsersClone.values())
-// {
-// browser.close();
-// }
-//
-// consumers.clear();
-//
-// browsers.clear();
-//
-// rollback();
-//
-// executor.shutdown();
-//
-// deliveries.clear();
-//
-// sp.removeSession(id);
-//
-// closed = true;
-// }
-
+
synchronized void handleDelivery(MessageReference ref, ServerConsumerEndpoint consumer, PacketSender sender) throws Exception
{
// FIXME - we shouldn't have to pass in the packet Sender - this should be
Modified: trunk/src/main/org/jboss/messaging/core/impl/DeliveryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/DeliveryImpl.java 2008-02-08 15:07:26 UTC (rev 3684)
+++ trunk/src/main/org/jboss/messaging/core/impl/DeliveryImpl.java 2008-02-08 16:25:03 UTC (rev 3685)
@@ -22,6 +22,7 @@
package org.jboss.messaging.core.impl;
import org.jboss.messaging.core.Delivery;
+import org.jboss.messaging.core.Message;
import org.jboss.messaging.core.MessageReference;
import org.jboss.messaging.core.remoting.PacketSender;
import org.jboss.messaging.core.remoting.wireformat.DeliverMessage;
@@ -67,10 +68,19 @@
public void deliver() throws Exception
{
- DeliverMessage message = new DeliverMessage(reference.getMessage(),
- deliveryID,
- reference.getDeliveryCount() + 1);
+ /*
+ Note we copy the message before sending.
+ This is because delivery count may be different for the same message sent to different topic subscribers
+ And invm the same message instance would otherwise be passed to all consumers
+ For the non INVM case this copy is unncessary and can be optimised away TODO - although the overhead of
+ copying is actually quite small
+ */
+ Message copy = reference.getMessage().copy();
+ copy.setDeliveryCount(reference.getDeliveryCount() + 1);
+
+ DeliverMessage message = new DeliverMessage(copy, deliveryID);
+
message.setTargetID(consumerID);
sender.send(message);
Modified: trunk/src/main/org/jboss/messaging/core/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/MessageImpl.java 2008-02-08 15:07:26 UTC (rev 3684)
+++ trunk/src/main/org/jboss/messaging/core/impl/MessageImpl.java 2008-02-08 16:25:03 UTC (rev 3685)
@@ -95,10 +95,6 @@
*/
private Semaphore refsLock = new Semaphore(1);
-
- //FIXME - does scheduledDeliveryTime belong on message? surely on SessionSendMessage
- // private long scheduledDeliveryTime;
-
// Constructors --------------------------------------------------
/*
Modified: trunk/src/main/org/jboss/messaging/core/remoting/codec/DeliverMessageCodec.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/codec/DeliverMessageCodec.java 2008-02-08 15:07:26 UTC (rev 3684)
+++ trunk/src/main/org/jboss/messaging/core/remoting/codec/DeliverMessageCodec.java 2008-02-08 16:25:03 UTC (rev 3685)
@@ -38,16 +38,14 @@
{
byte[] encodedMsg = encodeMessage(message.getMessage());
long deliveryID = message.getDeliveryID();
- int deliveryCount = message.getDeliveryCount();
- int bodyLength = INT_LENGTH + encodedMsg.length
+ int bodyLength = encodedMsg.length
+ LONG_LENGTH + INT_LENGTH;
out.putInt(bodyLength);
out.putInt(encodedMsg.length);
out.put(encodedMsg);
out.putLong(deliveryID);
- out.putInt(deliveryCount);
}
@Override
@@ -65,9 +63,8 @@
in.get(encodedMsg);
Message msg = decodeMessage(encodedMsg);
long deliveryID = in.getLong();
- int deliveryCount = in.getInt();
- return new DeliverMessage(msg, deliveryID, deliveryCount);
+ return new DeliverMessage(msg, deliveryID);
}
// Package protected ---------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/DeliverMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/DeliverMessage.java 2008-02-08 15:07:26 UTC (rev 3684)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/DeliverMessage.java 2008-02-08 16:25:03 UTC (rev 3685)
@@ -23,20 +23,15 @@
// Attributes ----------------------------------------------------
- //FIXME - we do not need all these fields
-
private final Message message;
private final long deliveryID;
- private final int deliveryCount;
-
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public DeliverMessage(Message message, long deliveryID,
- int deliveryCount)
+ public DeliverMessage(Message message, long deliveryID)
{
super(SESS_DELIVER);
@@ -44,7 +39,6 @@
this.message = message;
this.deliveryID = deliveryID;
- this.deliveryCount = deliveryCount;
}
// Public --------------------------------------------------------
@@ -59,18 +53,12 @@
return deliveryID;
}
- public int getDeliveryCount()
- {
- return deliveryCount;
- }
-
@Override
public String toString()
{
StringBuffer buf = new StringBuffer(getParentString());
buf.append(", message=" + message);
- buf.append(", deliveryID=" + deliveryID);
- buf.append(", deliveryCount=" + deliveryCount);
+ buf.append(", deliveryID=" + deliveryID);
buf.append("]");
return buf.toString();
}
Modified: trunk/tests/src/org/jboss/messaging/core/remoting/wireformat/test/unit/PacketTypeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/wireformat/test/unit/PacketTypeTest.java 2008-02-08 15:07:26 UTC (rev 3684)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/wireformat/test/unit/PacketTypeTest.java 2008-02-08 16:25:03 UTC (rev 3685)
@@ -675,13 +675,12 @@
public void testDeliverMessage() throws Exception
{
Message msg = new MessageImpl();
- DeliverMessage message = new DeliverMessage(msg, randomLong(), 23);
+ DeliverMessage message = new DeliverMessage(msg, randomLong());
AbstractPacketCodec codec = new DeliverMessageCodec();
SimpleRemotingBuffer buffer = encode(message, codec);
checkHeader(buffer, message);
- checkBody(buffer, encodeMessage(msg), message.getDeliveryID(), message
- .getDeliveryCount());
+ checkBody(buffer, encodeMessage(msg), message.getDeliveryID());
buffer.rewind();
AbstractPacket decodedPacket = codec.decode(buffer);
@@ -692,8 +691,6 @@
assertEquals(message.getMessage().getMessageID(), decodedMessage
.getMessage().getMessageID());
assertEquals(message.getDeliveryID(), decodedMessage.getDeliveryID());
- assertEquals(message.getDeliveryCount(), decodedMessage
- .getDeliveryCount());
}
public void testSessionAcknowledgeMessage() throws Exception
More information about the jboss-cvs-commits
mailing list