[jboss-cvs] JBoss Messaging SVN: r4155 - in trunk: src/main/org/jboss/messaging/core/message and 6 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed May 7 02:39:39 EDT 2008
Author: timfox
Date: 2008-05-07 02:39:39 -0400 (Wed, 07 May 2008)
New Revision: 4155
Modified:
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java
trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerPacketHandler.java
trunk/src/main/org/jboss/messaging/core/message/Message.java
trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/ConsumerDeliverMessageCodec.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/RemotingBuffer.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/BufferWrapper.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ConsumerDeliverMessage.java
trunk/src/main/org/jboss/messaging/core/server/impl/DeliveryImpl.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/wireformat/PacketTypeTest.java
Log:
Moved delivery id onto message
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-05-06 20:02:50 UTC (rev 4154)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2008-05-07 06:39:39 UTC (rev 4155)
@@ -33,7 +33,6 @@
import org.jboss.messaging.core.list.impl.PriorityLinkedListImpl;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.message.Message;
-import org.jboss.messaging.core.remoting.impl.wireformat.ConsumerDeliverMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.ConsumerFlowTokenMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
@@ -75,7 +74,7 @@
private final int tokenBatchSize;
- private final PriorityLinkedList<ConsumerDeliverMessage> buffer = new PriorityLinkedListImpl<ConsumerDeliverMessage>(10);
+ private final PriorityLinkedList<Message> buffer = new PriorityLinkedListImpl<Message>(10);
private volatile Thread receiverThread;
@@ -169,9 +168,9 @@
if (!closed && !buffer.isEmpty())
{
- ConsumerDeliverMessage m = buffer.removeFirst();
+ Message m = buffer.removeFirst();
- boolean expired = m.getMessage().isExpired();
+ boolean expired = m.isExpired();
session.delivered(m.getDeliveryID(), expired);
@@ -189,7 +188,7 @@
}
}
- return m.getMessage();
+ return m;
}
else
{
@@ -284,7 +283,7 @@
return targetID;
}
- public void handleMessage(final ConsumerDeliverMessage message) throws Exception
+ public void handleMessage(final Message message) throws Exception
{
if (closed)
{
@@ -320,7 +319,7 @@
{
//Dispatch it directly on remoting thread
- boolean expired = message.getMessage().isExpired();
+ boolean expired = message.isExpired();
session.delivered(message.getDeliveryID(), expired);
@@ -328,7 +327,7 @@
if (!expired)
{
- handler.onMessage(message.getMessage());
+ handler.onMessage(message);
}
}
else
@@ -337,7 +336,7 @@
synchronized (this)
{
- buffer.addLast(message, message.getMessage().getPriority());
+ buffer.addLast(message, message.getPriority());
}
sessionExecutor.execute(new Runnable() { public void run() { callOnMessage(); } } );
@@ -349,7 +348,7 @@
synchronized (this)
{
- buffer.addLast(message, message.getMessage().getPriority());
+ buffer.addLast(message, message.getPriority());
notify();
}
@@ -443,7 +442,7 @@
//ordering. If we just added a Runnable with the message to the executor immediately as we get it
//we could not do that
- ConsumerDeliverMessage message;
+ Message message;
synchronized (this)
{
@@ -452,7 +451,7 @@
if (message != null)
{
- boolean expired = message.getMessage().isExpired();
+ boolean expired = message.isExpired();
session.delivered(message.getDeliveryID(), expired);
@@ -462,7 +461,7 @@
{
onMessageThread = Thread.currentThread();
- handler.onMessage(message.getMessage());
+ handler.onMessage(message);
}
}
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java 2008-05-06 20:02:50 UTC (rev 4154)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java 2008-05-07 06:39:39 UTC (rev 4155)
@@ -8,7 +8,7 @@
import org.jboss.messaging.core.client.ClientConsumer;
import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.remoting.impl.wireformat.ConsumerDeliverMessage;
+import org.jboss.messaging.core.message.Message;
/**
*
@@ -21,7 +21,7 @@
{
long getClientTargetID();
- void handleMessage(ConsumerDeliverMessage message) throws Exception;
+ void handleMessage(Message message) throws Exception;
void recover(long lastDeliveryID) throws MessagingException;
}
Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerPacketHandler.java 2008-05-06 20:02:50 UTC (rev 4154)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerPacketHandler.java 2008-05-07 06:39:39 UTC (rev 4155)
@@ -45,7 +45,7 @@
{
ConsumerDeliverMessage message = (ConsumerDeliverMessage) packet;
- clientConsumer.handleMessage(message);
+ clientConsumer.handleMessage(message.getMessage());
}
else
{
Modified: trunk/src/main/org/jboss/messaging/core/message/Message.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/message/Message.java 2008-05-06 20:02:50 UTC (rev 4154)
+++ trunk/src/main/org/jboss/messaging/core/message/Message.java 2008-05-07 06:39:39 UTC (rev 4155)
@@ -21,7 +21,6 @@
*/
package org.jboss.messaging.core.message;
-import java.util.List;
import java.util.Map;
import org.jboss.messaging.core.server.Queue;
@@ -107,6 +106,14 @@
* @param priority
*/
void setPriority(byte priority);
+
+ int getDeliveryCount();
+
+ void setDeliveryCount(int deliveryCount);
+
+ long getDeliveryID();
+
+ void setDeliveryID(long deliveryID);
/**
* Binds a header. If the header map previously contained a mapping for this name, the old value
@@ -173,18 +180,6 @@
int getType();
/**
- *
- * @return The delivery count of the message - only available on the client side
- */
- int getDeliveryCount();
-
- /**
- * Set the delivery count of the message
- * @param count
- */
- void setDeliveryCount(int count);
-
- /**
* Get the connection id
* @return the connection id
*/
Modified: trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java 2008-05-06 20:02:50 UTC (rev 4154)
+++ trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java 2008-05-07 06:39:39 UTC (rev 4155)
@@ -80,6 +80,8 @@
private int deliveryCount;
+ private long deliveryID;
+
// Constructors --------------------------------------------------
/*
@@ -278,6 +280,16 @@
{
return this.deliveryCount;
}
+
+ public void setDeliveryID(final long deliveryID)
+ {
+ this.deliveryID = deliveryID;
+ }
+
+ public long getDeliveryID()
+ {
+ return this.deliveryID;
+ }
public boolean isExpired()
{
@@ -371,6 +383,8 @@
out.writeByte(priority);
out.writeInt(deliveryCount);
+
+ out.writeLong(deliveryID);
if (payload != null)
{
@@ -402,6 +416,8 @@
deliveryCount = in.readInt();
+ deliveryID = in.readLong();
+
int length = in.readInt();
if (length == 0)
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/ConsumerDeliverMessageCodec.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/ConsumerDeliverMessageCodec.java 2008-05-06 20:02:50 UTC (rev 4154)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/ConsumerDeliverMessageCodec.java 2008-05-07 06:39:39 UTC (rev 4155)
@@ -43,7 +43,7 @@
{
byte[] encodedMsg = StreamUtils.toBytes(packet.getMessage());
- return SIZE_INT + encodedMsg.length + SIZE_LONG;
+ return SIZE_INT + encodedMsg.length;
}
@Override
@@ -52,7 +52,6 @@
byte[] encodedMsg = StreamUtils.toBytes(message.getMessage());
out.putInt(encodedMsg.length);
out.put(encodedMsg);
- out.putLong(message.getDeliveryID());
encodedMsg = null;
}
@@ -65,9 +64,8 @@
in.get(encodedMsg);
Message message = new MessageImpl();
StreamUtils.fromBytes(message, encodedMsg);
- long deliveryID = in.getLong();
- return new ConsumerDeliverMessage(message, deliveryID);
+ return new ConsumerDeliverMessage(message);
}
// Package protected ---------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/RemotingBuffer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/RemotingBuffer.java 2008-05-06 20:02:50 UTC (rev 4154)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/RemotingBuffer.java 2008-05-07 06:39:39 UTC (rev 4155)
@@ -59,5 +59,4 @@
void rewind();
byte[] array();
-
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/BufferWrapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/BufferWrapper.java 2008-05-06 20:02:50 UTC (rev 4154)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/BufferWrapper.java 2008-05-07 06:39:39 UTC (rev 4155)
@@ -29,13 +29,13 @@
// Attributes ----------------------------------------------------
- protected final IoBuffer buffer;
+ private final IoBuffer buffer;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public BufferWrapper(IoBuffer buffer)
+ public BufferWrapper(final IoBuffer buffer)
{
assert buffer != null;
@@ -86,7 +86,7 @@
return buffer.get();
}
- public void get(byte[] b)
+ public void get(final byte[] b)
{
buffer.get(b);
}
@@ -124,10 +124,6 @@
return (b == TRUE);
}
-
-
-
-
public void putNullableString(final String nullableString)
{
if (nullableString == null)
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ConsumerDeliverMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ConsumerDeliverMessage.java 2008-05-06 20:02:50 UTC (rev 4154)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ConsumerDeliverMessage.java 2008-05-07 06:39:39 UTC (rev 4155)
@@ -25,20 +25,17 @@
private final Message message;
- private final long deliveryID;
-
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public ConsumerDeliverMessage(final Message message, final long deliveryID)
+ public ConsumerDeliverMessage(final Message message)
{
super(CONS_DELIVER);
assert message != null;
this.message = message;
- this.deliveryID = deliveryID;
}
// Public --------------------------------------------------------
@@ -48,17 +45,11 @@
return message;
}
- public long getDeliveryID()
- {
- return deliveryID;
- }
-
@Override
public String toString()
{
StringBuffer buf = new StringBuffer(getParentString());
buf.append(", message=" + message);
- buf.append(", deliveryID=" + deliveryID);
buf.append("]");
return buf.toString();
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/DeliveryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/DeliveryImpl.java 2008-05-06 20:02:50 UTC (rev 4154)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/DeliveryImpl.java 2008-05-07 06:39:39 UTC (rev 4155)
@@ -24,7 +24,6 @@
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.message.Message;
import org.jboss.messaging.core.message.MessageReference;
-import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.remoting.PacketSender;
import org.jboss.messaging.core.remoting.impl.wireformat.ConsumerDeliverMessage;
import org.jboss.messaging.core.server.Delivery;
@@ -84,8 +83,10 @@
copy.setDeliveryCount(reference.getDeliveryCount() + 1);
- ConsumerDeliverMessage message = new ConsumerDeliverMessage(copy, deliveryID);
+ copy.setDeliveryID(deliveryID);
+ ConsumerDeliverMessage message = new ConsumerDeliverMessage(copy);
+
message.setTargetID(consumerID);
message.setExecutorID(sessionID);
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/wireformat/PacketTypeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/wireformat/PacketTypeTest.java 2008-05-06 20:02:50 UTC (rev 4154)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/wireformat/PacketTypeTest.java 2008-05-07 06:39:39 UTC (rev 4155)
@@ -679,19 +679,17 @@
public void testConsumerDeliverMessage() throws Exception
{
Message msg = new MessageImpl();
- ConsumerDeliverMessage message = new ConsumerDeliverMessage(msg,
- randomLong());
+ ConsumerDeliverMessage message = new ConsumerDeliverMessage(msg);
AbstractPacketCodec codec = new ConsumerDeliverMessageCodec();
Packet decodedPacket = encodeAndCheckBytesAndDecode(message, codec,
- StreamUtils.toBytes(msg), message.getDeliveryID());
+ StreamUtils.toBytes(msg));
assertTrue(decodedPacket instanceof ConsumerDeliverMessage);
ConsumerDeliverMessage decodedMessage = (ConsumerDeliverMessage) decodedPacket;
assertEquals(CONS_DELIVER, decodedMessage.getType());
assertEquals(message.getMessage().getMessageID(), decodedMessage
.getMessage().getMessageID());
- assertEquals(message.getDeliveryID(), decodedMessage.getDeliveryID());
}
public void testSessionAcknowledgeMessage() throws Exception
More information about the jboss-cvs-commits
mailing list