Author: timfox
Date: 2009-11-20 12:41:40 -0500 (Fri, 20 Nov 2009)
New Revision: 8349
Modified:
branches/20-optimisation/src/main/org/hornetq/core/buffers/HornetQByteBufferBackedChannelBuffer.java
branches/20-optimisation/src/main/org/hornetq/core/client/ClientMessage.java
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
branches/20-optimisation/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java
branches/20-optimisation/src/main/org/hornetq/core/message/Message.java
branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java
branches/20-optimisation/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java
branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java
branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/20-optimisation/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java
branches/20-optimisation/src/main/org/hornetq/core/server/ServerMessage.java
branches/20-optimisation/src/main/org/hornetq/core/server/impl/QueueImpl.java
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/ChannelBufferWrapper.java
branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/EncodeSizeTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/MessageDurabilityTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/opt/SendTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/message/impl/MessageImplTest.java
branches/20-optimisation/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
WIBBLE WOBBLE WABBLE WOOOO
Modified:
branches/20-optimisation/src/main/org/hornetq/core/buffers/HornetQByteBufferBackedChannelBuffer.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/buffers/HornetQByteBufferBackedChannelBuffer.java 2009-11-20
16:52:13 UTC (rev 8348)
+++
branches/20-optimisation/src/main/org/hornetq/core/buffers/HornetQByteBufferBackedChannelBuffer.java 2009-11-20
17:41:40 UTC (rev 8349)
@@ -376,6 +376,9 @@
public HornetQBuffer copy()
{
- return new HornetQByteBufferBackedChannelBuffer(ByteBuffer.wrap(buffer.array().clone()));
+ ByteBuffer newBuffer = ByteBuffer.allocate(buffer.remaining());
+ newBuffer.put(buffer);
+ newBuffer.flip();
+ return new HornetQByteBufferBackedChannelBuffer(newBuffer);
}
}
Modified: branches/20-optimisation/src/main/org/hornetq/core/client/ClientMessage.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/client/ClientMessage.java 2009-11-20
16:52:13 UTC (rev 8348)
+++
branches/20-optimisation/src/main/org/hornetq/core/client/ClientMessage.java 2009-11-20
17:41:40 UTC (rev 8349)
@@ -36,8 +36,6 @@
void acknowledge() throws HornetQException;
- void decode(HornetQBuffer buffer);
-
void resetBuffer();
//FIXME - the following are only used for large messages - they should be put
somewhere else:
@@ -56,8 +54,6 @@
* @throws HornetQException
*/
boolean waitOutputStreamCompletion(long timeMilliseconds) throws HornetQException;
-
- void decodeHeadersAndProperties(HornetQBuffer buffer);
-
+
void setBodyInputStream(InputStream bodyInputStream);
}
Modified:
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2009-11-20
16:52:13 UTC (rev 8348)
+++
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java 2009-11-20
17:41:40 UTC (rev 8349)
@@ -581,7 +581,7 @@
* @parameter discountSlowConsumer When dealing with slowConsumers, we need to
discount one credit that was pre-sent when the first receive was called. For largeMessage
that is only done at the latest packet
* */
public void flowControl(final int messageBytes, final boolean discountSlowConsumer)
throws HornetQException
- {
+ {
if (clientWindowSize >= 0)
{
creditsToSend += messageBytes;
Modified:
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java 2009-11-20
16:52:13 UTC (rev 8348)
+++
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java 2009-11-20
17:41:40 UTC (rev 8349)
@@ -132,30 +132,8 @@
@Override
public void afterSend()
{
- //temp hack
-
-// ChannelBuffer cb = (ChannelBuffer)buffer.getUnderlyingBuffer();
-//
-// ChannelBuffer cbCopy = cb.copy(0, cb.capacity());
-//
-// this.buffer = new ChannelBufferWrapper(cbCopy);
-
- // resetBuffer();
-
-
}
-
- public void resetBuffer()
- {
- //There is a bug in Netty which requires us to initially write a byte
- if (buffer.capacity() == 0)
- {
- buffer.writeByte((byte)0);
- }
-
- buffer.setIndex(0, PacketImpl.PACKET_HEADERS_SIZE + DataConstants.SIZE_INT);
- }
-
+
@Override
public String toString()
{
Modified:
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-11-20
16:52:13 UTC (rev 8348)
+++
branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-11-20
17:41:40 UTC (rev 8349)
@@ -696,7 +696,7 @@
{
log.trace("Setting up flowControlSize to " +
message.getPacketSize() + " on message = " + clMessage);
}
-
+
clMessage.setFlowControlSize(message.getPacketSize());
consumer.handleMessage(message.getClientMessage());
@@ -1372,7 +1372,7 @@
// consumer
if (windowSize != 0)
- {
+ {
channel.send(new SessionConsumerFlowCreditMessage(consumerID, windowSize));
}
Modified:
branches/20-optimisation/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java 2009-11-20
16:52:13 UTC (rev 8348)
+++
branches/20-optimisation/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java 2009-11-20
17:41:40 UTC (rev 8349)
@@ -700,7 +700,7 @@
}
ServerMessage notificationMessage = new
ServerMessageImpl(storageManager.generateUniqueID(),
- HornetQChannelBuffers.EMPTY_BUFFER);
+ HornetQChannelBuffers.dynamicBuffer(1500));
// Notification messages are always durable so the user can choose whether
to add a durable queue to
// consume
Modified: branches/20-optimisation/src/main/org/hornetq/core/message/Message.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/message/Message.java 2009-11-20
16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/src/main/org/hornetq/core/message/Message.java 2009-11-20
17:41:40 UTC (rev 8349)
@@ -192,5 +192,11 @@
void afterSend();
boolean isBufferWritten();
+
+ boolean isEncodedToBuffer();
+
+ void decodeFromWire(HornetQBuffer buffer);
+
+ void decodeHeadersAndProperties(HornetQBuffer buffer);
}
Modified:
branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java 2009-11-20
16:52:13 UTC (rev 8348)
+++
branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java 2009-11-20
17:41:40 UTC (rev 8349)
@@ -29,7 +29,9 @@
import org.hornetq.core.message.BodyEncoder;
import org.hornetq.core.message.Message;
import org.hornetq.core.message.PropertyConversionException;
+import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.utils.DataConstants;
import org.hornetq.utils.SimpleString;
import org.hornetq.utils.TypedProperties;
@@ -91,7 +93,12 @@
protected byte priority;
protected HornetQBuffer buffer;
-
+
+ private int encodeSize;
+
+ //This means does the buffer contain an accurate encoding of the message?
+ protected boolean encodedToBuffer;
+
// Constructors --------------------------------------------------
protected MessageImpl()
@@ -141,7 +148,16 @@
return false;
}
- private int encodeSize;
+ public void resetBuffer()
+ {
+ //There is a bug in Netty which requires us to initially write a byte
+ if (buffer.capacity() == 0)
+ {
+ buffer.writeByte((byte)0);
+ }
+
+ buffer.setIndex(0, PacketImpl.PACKET_HEADERS_SIZE + DataConstants.SIZE_INT);
+ }
public int getEncodeSize()
{
@@ -154,12 +170,9 @@
}
public void encodeHeadersAndProperties(final HornetQBuffer buffer)
- {
- //log.info("starting encode message at " + buffer.writerIndex());
- buffer.writeLong(messageID);
- // log.info("encoded id " + messageID + " at index " + buffer.writerIndex());
- buffer.writeSimpleString(destination);
- //log.info("encoded destination " + destination + " at index " + buffer.writerIndex());
+ {
+ buffer.writeLong(messageID);
+ buffer.writeSimpleString(destination);
buffer.writeByte(type);
buffer.writeBoolean(durable);
buffer.writeLong(expiration);
@@ -169,20 +182,24 @@
encodeSize = buffer.writerIndex();
}
- public void decode(final HornetQBuffer buffer)
+ public void decodeFromWire(final HornetQBuffer buffer)
{
decodeHeadersAndProperties(buffer);
this.buffer = buffer;
+
+ this.encodedToBuffer = true;
}
-
+
+ public boolean isEncodedToBuffer()
+ {
+ return this.encodedToBuffer;
+ }
+
public void decodeHeadersAndProperties(final HornetQBuffer buffer)
- {
- // log.info("starting decode at " + buffer.readerIndex());
- messageID = buffer.readLong();
- // log.info("decoded message id " + messageID + " at index " + buffer.readerIndex());
- destination = buffer.readSimpleString();
- // log.info("decoded destination " + destination + " at index " + buffer.readerIndex());
+ {
+ messageID = buffer.readLong();
+ destination = buffer.readSimpleString();
type = buffer.readByte();
durable = buffer.readBoolean();
expiration = buffer.readLong();
Modified:
branches/20-optimisation/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java 2009-11-20
16:52:13 UTC (rev 8348)
+++
branches/20-optimisation/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java 2009-11-20
17:41:40 UTC (rev 8349)
@@ -74,7 +74,7 @@
{
message = storage.createLargeMessage();
HornetQBuffer buffer =
HornetQChannelBuffers.dynamicBuffer(largeMessageLazyData);
- message.decode(buffer);
+ message.decodeHeadersAndProperties(buffer);
largeMessageLazyData = null;
}
return message;
@@ -107,7 +107,7 @@
message = new ServerMessageImpl();
- message.decode(buffer);
+ message.decodeHeadersAndProperties(buffer);
}
}
Modified:
branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java 2009-11-20
16:52:13 UTC (rev 8348)
+++
branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java 2009-11-20
17:41:40 UTC (rev 8349)
@@ -165,7 +165,7 @@
// }
@Override
- public void decode(final HornetQBuffer buffer)
+ public void decodeHeadersAndProperties(final HornetQBuffer buffer)
{
file = null;
decodeHeadersAndProperties(buffer);
Modified:
branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-20
16:52:13 UTC (rev 8348)
+++
branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-11-20
17:41:40 UTC (rev 8349)
@@ -451,6 +451,8 @@
throw new HornetQException(HornetQException.ILLEGAL_STATE, "MessageId was
not assigned to Message");
}
+ log.info("storing message");
+
// Note that we don't sync, the add reference that comes immediately after will
sync if appropriate
if (message.isLargeMessage())
@@ -1674,7 +1676,7 @@
*/
public void decode(final HornetQBuffer buffer)
{
- message.decode(buffer);
+ message.decodeHeadersAndProperties(buffer);
}
/* (non-Javadoc)
Modified:
branches/20-optimisation/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-20
16:52:13 UTC (rev 8348)
+++
branches/20-optimisation/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-11-20
17:41:40 UTC (rev 8349)
@@ -780,8 +780,7 @@
{
// First send a reset message
- ServerMessage message = new ServerMessageImpl(storageManager.generateUniqueID(), HornetQChannelBuffers.EMPTY_BUFFER);
- // message.setDurable(true);
+ ServerMessage message = new ServerMessageImpl(storageManager.generateUniqueID(), HornetQChannelBuffers.dynamicBuffer(50));
message.setDestination(queueName);
message.putBooleanProperty(HDR_RESET_QUEUE_DATA, true);
@@ -1000,7 +999,7 @@
private ServerMessage createQueueInfoMessage(final NotificationType type, final
SimpleString queueName)
{
- ServerMessage message = new ServerMessageImpl(storageManager.generateUniqueID(), HornetQChannelBuffers.EMPTY_BUFFER);
+ ServerMessage message = new ServerMessageImpl(storageManager.generateUniqueID(), HornetQChannelBuffers.dynamicBuffer(100));
message.setDestination(queueName);
// message.setDurable(true);
Modified:
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java 2009-11-20
16:52:13 UTC (rev 8348)
+++
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java 2009-11-20
17:41:40 UTC (rev 8349)
@@ -329,7 +329,7 @@
// ----------------------------------------------------
public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
- {
+ {
final Packet packet = decoder.decode(buffer);
if (executor == null || packet.getType() == PacketImpl.PING)
Modified:
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java 2009-11-20
16:52:13 UTC (rev 8348)
+++
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java 2009-11-20
17:41:40 UTC (rev 8349)
@@ -83,13 +83,36 @@
{
return deliveryCount;
}
-
+
@Override
public HornetQBuffer encode(final RemotingConnection connection)
{
//We re-use the same packet buffer - but we need to change the extra data
+
HornetQBuffer buffer = serverMessage.getBuffer();
-
+
+ if (serverMessage.isEncodedToBuffer())
+ {
+ //It's already encoded - we just need to change the extra data at the end
+ //so we need to jump to the after body position
+
+ buffer.setIndex(0, serverMessage.getEndMessagePosition());
+ }
+ else
+ {
+ int afterBody = buffer.writerIndex();
+
+ //Message hasn't been encoded yet - probably it's something like a notification message generated on the server
+
+ // We now write the message headers and properties
+ serverMessage.encodeHeadersAndProperties(buffer);
+
+ serverMessage.setEndMessagePosition(buffer.writerIndex());
+
+ //Now we need to fill in the afterBody
+ buffer.setInt(PacketImpl.PACKET_HEADERS_SIZE, afterBody);
+ }
+
buffer.writeLong(consumerID);
buffer.writeInt(deliveryCount);
@@ -106,35 +129,49 @@
buffer.writeLong(channelID);
//And fill in the message id, since this was set on the server side so won't
already be in the buffer
- buffer.setIndex(0, buffer.writerIndex() + DataConstants.SIZE_INT);
+
+ buffer.writerIndex(buffer.readInt(PacketImpl.PACKET_HEADERS_SIZE));
buffer.writeLong(serverMessage.getMessageID());
+ //Set the reader and writer position to be read fully by remoting
buffer.setIndex(0, size);
return buffer;
}
- public void decodeRest(final HornetQBuffer buffer)
+ @Override
+ public void decode(final HornetQBuffer buffer)
{
+ channelID = buffer.readLong();
+
clientMessage = new ClientMessageImpl();
// We read the position of the end of the body - this is where the message headers
and properties are stored
int afterBody = buffer.readInt();
+ //At this point standard headers have been decoded and we are positioned at the
beginning of the body
+ int bodyStart = buffer.readerIndex();
+
// We now read message headers/properties
- buffer.setIndex(afterBody, buffer.writerIndex());
+ buffer.readerIndex(afterBody);
- clientMessage.decode(buffer);
+ clientMessage.decodeFromWire(buffer);
+ // And read the extra data
+
consumerID = buffer.readLong();
-
+
deliveryCount = buffer.readInt();
clientMessage.setDeliveryCount(deliveryCount);
- buffer.resetReaderIndex();
+ size = buffer.readerIndex();
+
+ // Set reader index back to beginning of body
+ buffer.readerIndex(bodyStart);
+
clientMessage.setBuffer(buffer);
}
Modified:
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java 2009-11-20
16:52:13 UTC (rev 8348)
+++
branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java 2009-11-20
17:41:40 UTC (rev 8349)
@@ -114,10 +114,12 @@
*
*
*/
+
HornetQBuffer buffer = sentMessage.getBuffer();
// The body will already be written (if any) at this point, so we take note of the
position of the end of the
// body
+
int afterBody = buffer.writerIndex();
// We now write the message headers and properties
@@ -173,20 +175,19 @@
buffer.setIndex(afterBody, buffer.writerIndex());
- receivedMessage.decode(buffer);
+ receivedMessage.decodeFromWire(buffer);
+
+ //We store the position of the end of the encoded message, where the extra data
starts - this
+ //will be needed if we re-deliver this packet, since we need to reset to there to
rewrite the extra data
+ //for the different packet
+ receivedMessage.setEndMessagePosition(buffer.readerIndex());
// And we read extra data in the packet
requiresResponse = buffer.readBoolean();
-
- // We set reader index back to the beginning of the buffer so it can be easily read
if then delivered
- // to a client, and we set writer index to just after where the headers/properties
were encoded so that it can
- // be fileld in with extra data required when delivering the packet to the client
(e.g. delivery count, consumer
- // id)
-
- buffer.setIndex(0, buffer.writerIndex() - DataConstants.SIZE_BOOLEAN);
}
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: branches/20-optimisation/src/main/org/hornetq/core/server/ServerMessage.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/server/ServerMessage.java 2009-11-20
16:52:13 UTC (rev 8348)
+++
branches/20-optimisation/src/main/org/hornetq/core/server/ServerMessage.java 2009-11-20
17:41:40 UTC (rev 8349)
@@ -60,4 +60,11 @@
boolean page(long transactionID, boolean duplicateDetection) throws Exception;
boolean storeIsPaging();
+
+ void setNeedsEncoding();
+
+
+ void setEndMessagePosition(int pos);
+
+ int getEndMessagePosition();
}
Modified: branches/20-optimisation/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-11-20
16:52:13 UTC (rev 8348)
+++
branches/20-optimisation/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-11-20
17:41:40 UTC (rev 8349)
@@ -605,7 +605,7 @@
}
public synchronized void cancel(final MessageReference reference) throws Exception
- {
+ {
if (checkDLQ(reference))
{
if (!scheduledDeliveryHandler.checkAndSchedule(reference))
@@ -1109,7 +1109,7 @@
* Attempt to deliver all the messages in the queue
*/
private synchronized void deliver()
- {
+ {
if (paused || handlers.isEmpty())
{
return;
@@ -1213,7 +1213,7 @@
}
private synchronized boolean directDeliver(final MessageReference reference)
- {
+ {
if (paused || handlers.isEmpty())
{
return false;
@@ -1308,13 +1308,13 @@
boolean add = false;
if (direct && !paused)
- {
+ {
// Deliver directly
boolean delivered = directDeliver(ref);
if (!delivered)
- {
+ {
add = true;
direct = false;
Modified:
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-11-20
16:52:13 UTC (rev 8348)
+++
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-11-20
17:41:40 UTC (rev 8349)
@@ -184,9 +184,9 @@
}
public HandleStatus handle(final MessageReference ref) throws Exception
- {
+ {
if (availableCredits != null && availableCredits.get() <= 0)
- {
+ {
return HandleStatus.BUSY;
}
@@ -347,7 +347,7 @@
promptDelivery(false);
ServerMessage forcedDeliveryMessage = new
ServerMessageImpl(storageManager.generateUniqueID(),
- HornetQChannelBuffers.EMPTY_BUFFER);
+ HornetQChannelBuffers.dynamicBuffer(100));
forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE,
sequence);
forcedDeliveryMessage.setDestination(messageQueue.getName());
@@ -409,7 +409,7 @@
}
public void receiveCredits(final int credits) throws Exception
- {
+ {
if (credits == -1)
{
// No flow control
@@ -582,7 +582,7 @@
channel.send(packet);
if (availableCredits != null)
- {
+ {
availableCredits.addAndGet(-packet.getPacketSize());
}
Modified:
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2009-11-20
16:52:13 UTC (rev 8348)
+++
branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java 2009-11-20
17:41:40 UTC (rev 8349)
@@ -16,6 +16,7 @@
import java.io.InputStream;
import java.util.concurrent.atomic.AtomicInteger;
+import org.hornetq.core.buffers.HornetQChannelBuffers;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.Message;
import org.hornetq.core.message.impl.MessageImpl;
@@ -25,6 +26,7 @@
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.utils.DataConstants;
import org.hornetq.utils.SimpleString;
import org.hornetq.utils.TypedProperties;
@@ -62,8 +64,10 @@
* Construct a MessageImpl from storage
*/
public ServerMessageImpl(final long messageID)
- {
+ {
super(messageID);
+
+ log.info("creating server message from storage, with id " + messageID);
}
/*
@@ -74,8 +78,11 @@
super(messageID);
this.buffer = buffer;
+
+ //Must align the body after the packet headers
+ resetBuffer();
}
-
+
/*
* Copy constructor
*/
@@ -253,6 +260,8 @@
putLongProperty(HDR_ACTUAL_EXPIRY_TIME, actualExpiryTime);
}
+
+ setNeedsEncoding();
}
public void setPagingStore(final PagingStore pagingStore)
@@ -305,15 +314,8 @@
}
}
- // EncodingSupport implementation
+
- // Used when storing to/from journal
-
- public void encode(HornetQBuffer buffer)
- {
-
- }
-
@Override
public String toString()
{
@@ -332,5 +334,83 @@
{
return null;
}
+
+
+
+ // Encoding stuff
+
+
+ public void setNeedsEncoding()
+ {
+ //This wil force the message to be re-encoded if it gets sent to a client
+ //Typically this is called after properties or headers are changed on the server side
+ this.encodedToBuffer = false;
+ }
+
+ private int endMessagePosition;
+
+ public void setEndMessagePosition(int pos)
+ {
+ this.endMessagePosition = pos;
+ }
+
+ public int getEndMessagePosition()
+ {
+ return this.endMessagePosition;
+ }
+
+ // EncodingSupport implementation
+
+ // Used when storing to/from journal
+
+ public void encode(final HornetQBuffer buffer)
+ {
+ //Encode the message to a buffer for storage in the journal
+
+ if (this.encodedToBuffer)
+ {
+ //The body starts after the standard packet headers
+ int bodyStart = PacketImpl.PACKET_HEADERS_SIZE;
+
+ int end = this.endMessagePosition;
+
+ buffer.writeBytes(this.buffer, bodyStart, end);
+ }
+ else
+ {
+ //encodeToBuffer();
+
+ throw new IllegalStateException("Not encoded to buffer and storing to journal");
+ }
+ }
+
+ public void decode(final HornetQBuffer buffer)
+ {
+ //TODO optimise
+
+ log.info("decoding server message");
+
+ this.buffer = HornetQChannelBuffers.dynamicBuffer(1500);
+
+ //work around Netty bug
+ this.buffer.writeByte((byte)0);
+
+ this.buffer.setIndex(0, PacketImpl.PACKET_HEADERS_SIZE);
+
+ this.buffer.writeBytes(buffer, 0, buffer.readableBytes());
+
+
+ //Position to beginning of encoded message headers/properties
+
+ int msgHeadersPos = this.buffer.readInt(PacketImpl.PACKET_HEADERS_SIZE);
+
+ this.buffer.readerIndex(msgHeadersPos);
+
+ this.decodeHeadersAndProperties(this.buffer);
+
+ log.info("priority is now " + this.getPriority());
+
+
+ }
}
Modified:
branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/ChannelBufferWrapper.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/ChannelBufferWrapper.java 2009-11-20
16:52:13 UTC (rev 8348)
+++
branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/ChannelBufferWrapper.java 2009-11-20
17:41:40 UTC (rev 8349)
@@ -13,6 +13,7 @@
package org.hornetq.integration.transports.netty;
+import org.hornetq.core.logging.Logger;
import org.hornetq.core.remoting.spi.HornetQBuffer;
import org.hornetq.utils.DataConstants;
import org.hornetq.utils.SimpleString;
@@ -30,6 +31,9 @@
*/
public class ChannelBufferWrapper implements HornetQBuffer
{
+ private static final Logger log = Logger.getLogger(ChannelBufferWrapper.class);
+
+
private final ChannelBuffer buffer;
/**
@@ -280,8 +284,8 @@
* @see org.hornetq.core.remoting.spi.HornetQBuffer#readString()
*/
public String readString()
- {
- int len = readInt();
+ {
+ int len = readInt();
char[] chars = new char[len];
for (int i = 0; i < len; i++)
{
@@ -378,12 +382,12 @@
* @see org.hornetq.core.remoting.spi.HornetQBuffer#writeString(java.lang.String)
*/
public void writeString(final String val)
- {
+ {
writeInt(val.length());
for (int i = 0; i < val.length(); i++)
{
writeShort((short)val.charAt(i));
- }
+ }
}
/* (non-Javadoc)
Modified:
branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java 2009-11-20
16:52:13 UTC (rev 8348)
+++
branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java 2009-11-20
17:41:40 UTC (rev 8349)
@@ -47,7 +47,7 @@
public static void addCodecFilter(final ChannelPipeline pipeline, final BufferHandler
handler)
{
assert pipeline != null;
- pipeline.addLast("decoder", new HornetQFrameDecoder(handler));
+ pipeline.addLast("decoder", new HornetQFrameDecoder2());
}
public static void addSSLFilter(final ChannelPipeline pipeline, final SSLContext
context, final boolean client) throws Exception
Modified:
branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java
===================================================================
---
branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java 2009-11-20
16:52:13 UTC (rev 8348)
+++
branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java 2009-11-20
17:41:40 UTC (rev 8349)
@@ -27,8 +27,6 @@
/**
* A Netty decoder used to decode messages.
*
- * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a>
- * @author <a href="ataylor(a)redhat.com">Andy Taylor</a>
* @author <a href="tlee(a)redhat.com">Trustin Lee</a>
*
* @version $Revision: 7839 $, $Date: 2009-08-21 02:26:39 +0900 (2009-08-21, 금) $
@@ -49,7 +47,7 @@
{
if (previousData.readableBytes() + in.readableBytes() < SIZE_INT) {
// XXX Length is unknown. Bet at 512. Tune this value.
- append(in, 512);
+ append(in, 1500);
return;
}
Modified:
branches/20-optimisation/tests/src/org/hornetq/tests/integration/EncodeSizeTest.java
===================================================================
---
branches/20-optimisation/tests/src/org/hornetq/tests/integration/EncodeSizeTest.java 2009-11-20
16:52:13 UTC (rev 8348)
+++
branches/20-optimisation/tests/src/org/hornetq/tests/integration/EncodeSizeTest.java 2009-11-20
17:41:40 UTC (rev 8349)
@@ -74,7 +74,7 @@
ServerMessage serverMessage = new ServerMessageImpl();
- serverMessage.decode(buffer);
+ serverMessage.decodeHeadersAndProperties(buffer);
int serverEncodeSize = serverMessage.getEncodeSize();
Modified:
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java
===================================================================
---
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java 2009-11-20
16:52:13 UTC (rev 8348)
+++
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java 2009-11-20
17:41:40 UTC (rev 8349)
@@ -23,6 +23,7 @@
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.spi.HornetQBuffer;
import org.hornetq.core.server.HornetQ;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.jms.client.HornetQTextMessage;
@@ -78,57 +79,10 @@
ClientProducer producer = session.createProducer(QUEUE);
- final int numMessages = 10000;
+ final int numMessages = 10;
for (int i = 0; i < numMessages; i++)
- {
- /*
- * Like this:
- *
- * ClientMessage message = producer.createMessage(...);
- *
- * message.putStringProperty("foo", "bar");
- *
- * message.encodeToBuffer(); [this sets the destination from the producer, and
encodes]
- *
- * message.getBuffer().writeString("testINVMCoreClient");
- *
- * message.send();
- *
- * OR, another option:
- *
- * Get rid of client producer altogether,
- *
- * Have send direct on the session, and destination must be set explicitly
- *
- * e.g.
- *
- * ClientMessage message = session.createMessage(...)
- *
- * message.putStringProperty("foo", "bar");
- *
- * message.setDestination("foo");
- *
- * message.writeBody();
- *
- * message.getBuffer().writeString("testINVMCoreClient");
- *
- * message.send();
- *
- *
- * ORRR
- *
- * we don't write the headers and properties until *AFTER* the body
- *
- * giving this format:
- * body length
- * body
- * headers + properties
- *
- * this means we don't need an encodeToBuffer() method!!
- *
- */
-
+ {
ClientMessage message = session.createClientMessage(HornetQTextMessage.TYPE,
false,
0,
@@ -143,8 +97,6 @@
message.setDestination(QUEUE);
- message.encodeToBuffer();
-
message.getBuffer().writeString("testINVMCoreClient");
producer.send(message);
@@ -159,10 +111,14 @@
for (int i = 0; i < numMessages; i++)
{
ClientMessage message2 = consumer.receive();
+
+ HornetQBuffer buffer = message2.getBuffer();
+
+ assertEquals("testINVMCoreClient", buffer.readString());
- assertEquals("testINVMCoreClient", message2.getBuffer().readString());
-
message2.acknowledge();
+
+ log.info("got message " + i);
}
session.close();
Modified:
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/MessageDurabilityTest.java
===================================================================
---
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/MessageDurabilityTest.java 2009-11-20
16:52:13 UTC (rev 8348)
+++
branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/MessageDurabilityTest.java 2009-11-20
17:41:40 UTC (rev 8349)
@@ -61,7 +61,7 @@
ClientProducer producer = session.createProducer(address);
producer.send(session.createClientMessage(!durable));
-
+
restart();
session.start();
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/opt/SendTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/opt/SendTest.java 2009-11-20
16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/opt/SendTest.java 2009-11-20
17:41:40 UTC (rev 8349)
@@ -161,7 +161,7 @@
String str = new String(bytes);
- final int warmup = 50000;
+ final int warmup = 500000;
log.info("Warming up");
@@ -181,7 +181,7 @@
log.info("** WARMUP DONE");
- final int numMessages = 1000000;
+ final int numMessages = 2000000;
tm = sess.createTextMessage();
Modified:
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/message/impl/MessageImplTest.java
===================================================================
---
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/message/impl/MessageImplTest.java 2009-11-20
16:52:13 UTC (rev 8348)
+++
branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/message/impl/MessageImplTest.java 2009-11-20
17:41:40 UTC (rev 8349)
@@ -63,7 +63,7 @@
HornetQBuffer buffer = HornetQChannelBuffers.buffer(message.getEncodeSize());
message.encode(buffer);
Message message2 = new ClientMessageImpl(false);
- message2.decode(buffer);
+ message2.decodeHeadersAndProperties(buffer);
assertMessagesEquivalent(message, message2);
}
}
Modified: branches/20-optimisation/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
---
branches/20-optimisation/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2009-11-20
16:52:13 UTC (rev 8348)
+++
branches/20-optimisation/tests/src/org/hornetq/tests/util/ServiceTestBase.java 2009-11-20
17:41:40 UTC (rev 8349)
@@ -353,7 +353,7 @@
public String getTextMessage(ClientMessage m)
{
- m.getBuffer().resetReaderIndex();
+ //m.getBuffer().resetReaderIndex();
return m.getBuffer().readString();
}