[hornetq-commits] JBoss hornetq SVN: r8349 - in branches/20-optimisation: src/main/org/hornetq/core/client and 17 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Nov 20 12:41:41 EST 2009


Author: timfox
Date: 2009-11-20 12:41:40 -0500 (Fri, 20 Nov 2009)
New Revision: 8349

Modified:
   branches/20-optimisation/src/main/org/hornetq/core/buffers/HornetQByteBufferBackedChannelBuffer.java
   branches/20-optimisation/src/main/org/hornetq/core/client/ClientMessage.java
   branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
   branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
   branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
   branches/20-optimisation/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java
   branches/20-optimisation/src/main/org/hornetq/core/message/Message.java
   branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java
   branches/20-optimisation/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java
   branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java
   branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/20-optimisation/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
   branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
   branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java
   branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java
   branches/20-optimisation/src/main/org/hornetq/core/server/ServerMessage.java
   branches/20-optimisation/src/main/org/hornetq/core/server/impl/QueueImpl.java
   branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
   branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
   branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/ChannelBufferWrapper.java
   branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
   branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java
   branches/20-optimisation/tests/src/org/hornetq/tests/integration/EncodeSizeTest.java
   branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java
   branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/MessageDurabilityTest.java
   branches/20-optimisation/tests/src/org/hornetq/tests/opt/SendTest.java
   branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/message/impl/MessageImplTest.java
   branches/20-optimisation/tests/src/org/hornetq/tests/util/ServiceTestBase.java
Log:
WIBBLE WOBBLE WABBLE WOOOO

Modified: branches/20-optimisation/src/main/org/hornetq/core/buffers/HornetQByteBufferBackedChannelBuffer.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/buffers/HornetQByteBufferBackedChannelBuffer.java	2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/src/main/org/hornetq/core/buffers/HornetQByteBufferBackedChannelBuffer.java	2009-11-20 17:41:40 UTC (rev 8349)
@@ -376,6 +376,9 @@
    
    public HornetQBuffer copy()
    {
-      return new HornetQByteBufferBackedChannelBuffer(ByteBuffer.wrap(buffer.array().clone()));
+      ByteBuffer newBuffer = ByteBuffer.allocate(buffer.remaining());
+      newBuffer.put(buffer);
+      newBuffer.flip();
+      return new HornetQByteBufferBackedChannelBuffer(newBuffer);
    }
 }

Modified: branches/20-optimisation/src/main/org/hornetq/core/client/ClientMessage.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/client/ClientMessage.java	2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/src/main/org/hornetq/core/client/ClientMessage.java	2009-11-20 17:41:40 UTC (rev 8349)
@@ -36,8 +36,6 @@
    
    void acknowledge() throws HornetQException;   
    
-   void decode(HornetQBuffer buffer);
-   
    void resetBuffer();
         
    //FIXME - the following are only used for large messages - they should be put somewhere else:
@@ -56,8 +54,6 @@
     * @throws HornetQException
     */
    boolean waitOutputStreamCompletion(long timeMilliseconds) throws HornetQException;
-   
-   void decodeHeadersAndProperties(HornetQBuffer buffer);
-      
+            
    void setBodyInputStream(InputStream bodyInputStream);  
 }

Modified: branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java	2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java	2009-11-20 17:41:40 UTC (rev 8349)
@@ -581,7 +581,7 @@
     * @parameter discountSlowConsumer When dealing with slowConsumers, we need to discount one credit that was pre-sent when the first receive was called. For largeMessage that is only done at the latest packet
     * */
    public void flowControl(final int messageBytes, final boolean discountSlowConsumer) throws HornetQException
-   {
+   { 
       if (clientWindowSize >= 0)
       {
          creditsToSend += messageBytes;

Modified: branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java	2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientMessageImpl.java	2009-11-20 17:41:40 UTC (rev 8349)
@@ -132,30 +132,8 @@
    @Override
    public void afterSend()
    {      
-      //temp hack
-      
-//      ChannelBuffer cb = (ChannelBuffer)buffer.getUnderlyingBuffer();
-//      
-//      ChannelBuffer cbCopy = cb.copy(0, cb.capacity());
-//      
-//      this.buffer = new ChannelBufferWrapper(cbCopy); 
-      
-     // resetBuffer();
-      
-      
    }
-
-   public void resetBuffer()
-   {     
-      //There is a bug in Netty which requires us to initially write a byte
-      if (buffer.capacity() == 0)
-      {
-         buffer.writeByte((byte)0);
-      }
-
-      buffer.setIndex(0, PacketImpl.PACKET_HEADERS_SIZE + DataConstants.SIZE_INT);      
-   }
-   
+     
    @Override
    public String toString()
    {

Modified: branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2009-11-20 17:41:40 UTC (rev 8349)
@@ -696,7 +696,7 @@
          {
             log.trace("Setting up flowControlSize to " + message.getPacketSize() + " on message = " + clMessage);
          }
-
+       
          clMessage.setFlowControlSize(message.getPacketSize());
 
          consumer.handleMessage(message.getClientMessage());
@@ -1372,7 +1372,7 @@
       // consumer
 
       if (windowSize != 0)
-      {
+      {         
          channel.send(new SessionConsumerFlowCreditMessage(consumerID, windowSize));
       }
 

Modified: branches/20-optimisation/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java	2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java	2009-11-20 17:41:40 UTC (rev 8349)
@@ -700,7 +700,7 @@
                }
 
                ServerMessage notificationMessage = new ServerMessageImpl(storageManager.generateUniqueID(),
-                                                                         HornetQChannelBuffers.EMPTY_BUFFER);
+                                                                         HornetQChannelBuffers.dynamicBuffer(1500));
 
                // Notification messages are always durable so the user can choose whether to add a durable queue to
                // consume

Modified: branches/20-optimisation/src/main/org/hornetq/core/message/Message.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/message/Message.java	2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/src/main/org/hornetq/core/message/Message.java	2009-11-20 17:41:40 UTC (rev 8349)
@@ -192,5 +192,11 @@
    void afterSend();
    
    boolean isBufferWritten();
+   
+   boolean isEncodedToBuffer();
+   
+   void decodeFromWire(HornetQBuffer buffer);
+   
+   void decodeHeadersAndProperties(HornetQBuffer buffer);
 
 }

Modified: branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java	2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java	2009-11-20 17:41:40 UTC (rev 8349)
@@ -29,7 +29,9 @@
 import org.hornetq.core.message.BodyEncoder;
 import org.hornetq.core.message.Message;
 import org.hornetq.core.message.PropertyConversionException;
+import org.hornetq.core.remoting.impl.wireformat.PacketImpl;
 import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.utils.DataConstants;
 import org.hornetq.utils.SimpleString;
 import org.hornetq.utils.TypedProperties;
 
@@ -91,7 +93,12 @@
    protected byte priority;
 
    protected HornetQBuffer buffer;
-
+   
+   private int encodeSize;
+   
+   //True when the buffer contains an accurate encoding of the message
+   protected boolean encodedToBuffer;
+      
    // Constructors --------------------------------------------------
 
    protected MessageImpl()
@@ -141,7 +148,16 @@
       return false;
    }
    
-   private int encodeSize;
+   public void resetBuffer()
+   {     
+      //There is a bug in Netty which requires us to initially write a byte
+      if (buffer.capacity() == 0)
+      {
+         buffer.writeByte((byte)0);
+      }
+
+      buffer.setIndex(0, PacketImpl.PACKET_HEADERS_SIZE + DataConstants.SIZE_INT);        
+   }
    
    public int getEncodeSize()
    {
@@ -154,12 +170,9 @@
    }
 
    public void encodeHeadersAndProperties(final HornetQBuffer buffer)
-   {
-      //log.info("starting encode message at " + buffer.writerIndex());
-      buffer.writeLong(messageID);      
-     // log.info("encoded id " + messageID + " at index " + buffer.writerIndex());
-      buffer.writeSimpleString(destination);
-      //log.info("encoded destination " + destination + " at index " + buffer.writerIndex());
+   {     
+      buffer.writeLong(messageID);          
+      buffer.writeSimpleString(destination);    
       buffer.writeByte(type);
       buffer.writeBoolean(durable);
       buffer.writeLong(expiration);
@@ -169,20 +182,24 @@
       encodeSize = buffer.writerIndex();
    }
    
-   public void decode(final HornetQBuffer buffer)
+   public void decodeFromWire(final HornetQBuffer buffer)
    {
       decodeHeadersAndProperties(buffer);
 
       this.buffer = buffer;
+      
+      this.encodedToBuffer = true;
    }
-    
+   
+   public boolean isEncodedToBuffer()
+   {
+      return this.encodedToBuffer;
+   }
+       
    public void decodeHeadersAndProperties(final HornetQBuffer buffer)
-   {
-     // log.info("starting decode at " + buffer.readerIndex());
-      messageID = buffer.readLong();
-     // log.info("decoded message id " + messageID + " at index " + buffer.readerIndex());
-      destination = buffer.readSimpleString();    
-     // log.info("decoded destination " + destination + " at index " + buffer.readerIndex());
+   {     
+      messageID = buffer.readLong();    
+      destination = buffer.readSimpleString();         
       type = buffer.readByte();
       durable = buffer.readBoolean();
       expiration = buffer.readLong();

Modified: branches/20-optimisation/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java	2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/src/main/org/hornetq/core/paging/impl/PagedMessageImpl.java	2009-11-20 17:41:40 UTC (rev 8349)
@@ -74,7 +74,7 @@
       {
          message = storage.createLargeMessage();
          HornetQBuffer buffer = HornetQChannelBuffers.dynamicBuffer(largeMessageLazyData); 
-         message.decode(buffer);
+         message.decodeHeadersAndProperties(buffer);
          largeMessageLazyData = null;
       }
       return message;
@@ -107,7 +107,7 @@
          
          message = new ServerMessageImpl();
          
-         message.decode(buffer);
+         message.decodeHeadersAndProperties(buffer);
       }
 
    }

Modified: branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java	2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/FileLargeServerMessage.java	2009-11-20 17:41:40 UTC (rev 8349)
@@ -165,7 +165,7 @@
 //   }
 
    @Override
-   public void decode(final HornetQBuffer buffer)
+   public void decodeHeadersAndProperties(final HornetQBuffer buffer)
    {
       file = null;
       decodeHeadersAndProperties(buffer);

Modified: branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-11-20 17:41:40 UTC (rev 8349)
@@ -451,6 +451,8 @@
          throw new HornetQException(HornetQException.ILLEGAL_STATE, "MessageId was not assigned to Message");
       }
       
+      log.info("storing message");
+      
       // Note that we don't sync, the add reference that comes immediately after will sync if appropriate
 
       if (message.isLargeMessage())
@@ -1674,7 +1676,7 @@
        */
       public void decode(final HornetQBuffer buffer)
       {
-         message.decode(buffer);
+         message.decodeHeadersAndProperties(buffer);
       }
 
       /* (non-Javadoc)

Modified: branches/20-optimisation/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java	2009-11-20 17:41:40 UTC (rev 8349)
@@ -780,8 +780,7 @@
       {
          // First send a reset message
 
-         ServerMessage message = new ServerMessageImpl(storageManager.generateUniqueID(), HornetQChannelBuffers.EMPTY_BUFFER);
-         // message.setDurable(true);
+         ServerMessage message = new ServerMessageImpl(storageManager.generateUniqueID(), HornetQChannelBuffers.dynamicBuffer(50));        
 
          message.setDestination(queueName);
          message.putBooleanProperty(HDR_RESET_QUEUE_DATA, true);
@@ -1000,7 +999,7 @@
 
    private ServerMessage createQueueInfoMessage(final NotificationType type, final SimpleString queueName)
    {
-      ServerMessage message = new ServerMessageImpl(storageManager.generateUniqueID(), HornetQChannelBuffers.EMPTY_BUFFER);
+      ServerMessage message = new ServerMessageImpl(storageManager.generateUniqueID(), HornetQChannelBuffers.dynamicBuffer(100));
 
       message.setDestination(queueName);
       // message.setDurable(true);

Modified: branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java	2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java	2009-11-20 17:41:40 UTC (rev 8349)
@@ -329,7 +329,7 @@
    // ----------------------------------------------------
 
    public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
-   {                 
+   {                             
       final Packet packet = decoder.decode(buffer);
       
       if (executor == null || packet.getType() == PacketImpl.PING)

Modified: branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java	2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java	2009-11-20 17:41:40 UTC (rev 8349)
@@ -83,13 +83,36 @@
    {
       return deliveryCount;
    }
-
+      
    @Override
    public HornetQBuffer encode(final RemotingConnection connection)
    {
       //We re-use the same packet buffer - but we need to change the extra data
+      
       HornetQBuffer buffer = serverMessage.getBuffer();
-      
+           
+      if (serverMessage.isEncodedToBuffer())
+      {
+         //It's already encoded - we just need to change the extra data at the end
+         //so we need to jump to the after body position
+         
+         buffer.setIndex(0, serverMessage.getEndMessagePosition());         
+      }
+      else
+      {
+         int afterBody = buffer.writerIndex();
+         
+         //Message hasn't been encoded yet - probably it's something like a notification message generated on the server
+         
+         // We now write the message headers and properties
+         serverMessage.encodeHeadersAndProperties(buffer);
+         
+         serverMessage.setEndMessagePosition(buffer.writerIndex());
+         
+         //Now we need to fill in the afterBody 
+         buffer.setInt(PacketImpl.PACKET_HEADERS_SIZE, afterBody);
+      }
+                             
       buffer.writeLong(consumerID);
       buffer.writeInt(deliveryCount);
       
@@ -106,35 +129,49 @@
       buffer.writeLong(channelID);
                       
       //And fill in the message id, since this was set on the server side so won't already be in the buffer
-      buffer.setIndex(0, buffer.writerIndex() + DataConstants.SIZE_INT);
+      
+      buffer.writerIndex(buffer.readInt(PacketImpl.PACKET_HEADERS_SIZE));            
       buffer.writeLong(serverMessage.getMessageID());
       
+      //Set the reader and writer position to be read fully by remoting
       buffer.setIndex(0, size);
 
       return buffer;
    }
 
-   public void decodeRest(final HornetQBuffer buffer)
+   @Override
+   public void decode(final HornetQBuffer buffer)
    {
+      channelID = buffer.readLong();
+      
       clientMessage = new ClientMessageImpl();
       
       // We read the position of the end of the body - this is where the message headers and properties are stored
       int afterBody = buffer.readInt();
       
+      //At this point standard headers have been decoded and we are positioned at the beginning of the body
+      int bodyStart = buffer.readerIndex();
+      
       // We now read message headers/properties
 
-      buffer.setIndex(afterBody, buffer.writerIndex());
+      buffer.readerIndex(afterBody);
             
-      clientMessage.decode(buffer);
+      clientMessage.decodeFromWire(buffer);
       
+      // And read the extra data
+            
       consumerID = buffer.readLong();
-      
+        
       deliveryCount = buffer.readInt();
       
       clientMessage.setDeliveryCount(deliveryCount);
       
-      buffer.resetReaderIndex();
+      size = buffer.readerIndex();
+
+      // Set reader index back to beginning of body
       
+      buffer.readerIndex(bodyStart);
+      
       clientMessage.setBuffer(buffer);
    }
 

Modified: branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java	2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java	2009-11-20 17:41:40 UTC (rev 8349)
@@ -114,10 +114,12 @@
        * 
        *  
        */
+
       HornetQBuffer buffer = sentMessage.getBuffer();
 
       // The body will already be written (if any) at this point, so we take note of the position of the end of the
       // body
+      
       int afterBody = buffer.writerIndex();
 
       // We now write the message headers and properties
@@ -173,20 +175,19 @@
 
       buffer.setIndex(afterBody, buffer.writerIndex());
 
-      receivedMessage.decode(buffer);
+      receivedMessage.decodeFromWire(buffer);
+           
+      //We store the position of the end of the encoded message, where the extra data starts - this
+      //will be needed if we re-deliver this packet, since we need to reset to there to rewrite the extra data
+      //for the different packet
+      receivedMessage.setEndMessagePosition(buffer.readerIndex());
 
       // And we read extra data in the packet
 
       requiresResponse = buffer.readBoolean();
-
-      // We set reader index back to the beginning of the buffer so it can be easily read if then delivered
-      // to a client, and we set writer index to just after where the headers/properties were encoded so that it can
-      // be fileld in with extra data required when delivering the packet to the client (e.g. delivery count, consumer
-      // id)
-
-      buffer.setIndex(0, buffer.writerIndex() - DataConstants.SIZE_BOOLEAN);
    }
 
+   
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: branches/20-optimisation/src/main/org/hornetq/core/server/ServerMessage.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/server/ServerMessage.java	2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/src/main/org/hornetq/core/server/ServerMessage.java	2009-11-20 17:41:40 UTC (rev 8349)
@@ -60,4 +60,11 @@
    boolean page(long transactionID, boolean duplicateDetection) throws Exception;
 
    boolean storeIsPaging();
+   
+   void setNeedsEncoding();
+   
+   
+   void setEndMessagePosition(int pos);
+   
+   int getEndMessagePosition();
 }

Modified: branches/20-optimisation/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/server/impl/QueueImpl.java	2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/src/main/org/hornetq/core/server/impl/QueueImpl.java	2009-11-20 17:41:40 UTC (rev 8349)
@@ -605,7 +605,7 @@
    }
 
    public synchronized void cancel(final MessageReference reference) throws Exception
-   {
+   {      
       if (checkDLQ(reference))
       {
          if (!scheduledDeliveryHandler.checkAndSchedule(reference))
@@ -1109,7 +1109,7 @@
     * Attempt to deliver all the messages in the queue
     */
    private synchronized void deliver()
-   {
+   {     
       if (paused || handlers.isEmpty())
       {
          return;
@@ -1213,7 +1213,7 @@
    }
 
    private synchronized boolean directDeliver(final MessageReference reference)
-   {
+   {    
       if (paused || handlers.isEmpty())
       {
          return false;
@@ -1308,13 +1308,13 @@
       boolean add = false;
 
       if (direct && !paused)
-      {
+      {         
          // Deliver directly
 
          boolean delivered = directDeliver(ref);
 
          if (!delivered)
-         {
+         {          
             add = true;
 
             direct = false;

Modified: branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2009-11-20 17:41:40 UTC (rev 8349)
@@ -184,9 +184,9 @@
    }
 
    public HandleStatus handle(final MessageReference ref) throws Exception
-   {
+   { 
       if (availableCredits != null && availableCredits.get() <= 0)
-      {
+      {         
          return HandleStatus.BUSY;
       }
       
@@ -347,7 +347,7 @@
             promptDelivery(false);
 
             ServerMessage forcedDeliveryMessage = new ServerMessageImpl(storageManager.generateUniqueID(),
-                                                                        HornetQChannelBuffers.EMPTY_BUFFER);
+                                                                        HornetQChannelBuffers.dynamicBuffer(100));
 
             forcedDeliveryMessage.putLongProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE, sequence);
             forcedDeliveryMessage.setDestination(messageQueue.getName());
@@ -409,7 +409,7 @@
    }
 
    public void receiveCredits(final int credits) throws Exception
-   {
+   {      
       if (credits == -1)
       {
          // No flow control
@@ -582,7 +582,7 @@
       channel.send(packet);
       
       if (availableCredits != null)
-      {
+      {         
          availableCredits.addAndGet(-packet.getPacketSize());
       }
 

Modified: branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java	2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java	2009-11-20 17:41:40 UTC (rev 8349)
@@ -16,6 +16,7 @@
 import java.io.InputStream;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.hornetq.core.buffers.HornetQChannelBuffers;
 import org.hornetq.core.logging.Logger;
 import org.hornetq.core.message.Message;
 import org.hornetq.core.message.impl.MessageImpl;
@@ -25,6 +26,7 @@
 import org.hornetq.core.server.MessageReference;
 import org.hornetq.core.server.Queue;
 import org.hornetq.core.server.ServerMessage;
+import org.hornetq.utils.DataConstants;
 import org.hornetq.utils.SimpleString;
 import org.hornetq.utils.TypedProperties;
 
@@ -62,8 +64,10 @@
     * Construct a MessageImpl from storage
     */
    public ServerMessageImpl(final long messageID)
-   {
+   {      
       super(messageID);
+      
+      log.info("creating server message from storage, with id " + messageID);
    }
    
    /*
@@ -74,8 +78,11 @@
       super(messageID);
       
       this.buffer = buffer;
+      
+      //Must align the body after the packet headers
+      resetBuffer();
    }
-      
+        
    /*
     * Copy constructor
     */
@@ -253,6 +260,8 @@
 
          putLongProperty(HDR_ACTUAL_EXPIRY_TIME, actualExpiryTime);
       }
+      
+      setNeedsEncoding();
    }
 
    public void setPagingStore(final PagingStore pagingStore)
@@ -305,15 +314,8 @@
       }
    }
    
-   // EncodingSupport implementation
+
    
-   // Used when storing to/from journal
-   
-   public void encode(HornetQBuffer buffer)
-   {
-      
-   }
-   
    @Override
    public String toString()
    {
@@ -332,5 +334,83 @@
    {
       return null;
    }
+   
+   
+   
+   // Encoding stuff
+   
+   
+   public void setNeedsEncoding()
+   {
+      //This will force the message to be re-encoded if it gets sent to a client
+      //Typically this is called after properties or headers are changed on the server side
+      this.encodedToBuffer = false;
+   }
+   
+   private int endMessagePosition;
+   
+   public void setEndMessagePosition(int pos)
+   {
+      this.endMessagePosition = pos;
+   }
+   
+   public int getEndMessagePosition()
+   {
+      return this.endMessagePosition;
+   }
+   
+   // EncodingSupport implementation
+   
+   // Used when storing to/from journal
+   
+   public void encode(final HornetQBuffer buffer)
+   {
+      //Encode the message to a buffer for storage in the journal
+      
+      if (this.encodedToBuffer)
+      {
+         //The body starts after the standard packet headers
+         int bodyStart = PacketImpl.PACKET_HEADERS_SIZE;
+         
+         int end = this.endMessagePosition;
+         
+         buffer.writeBytes(this.buffer, bodyStart, end);
+      }
+      else
+      {
+         //encodeToBuffer();
+         
+         throw new IllegalStateException("Not encoded to buffer and storing to journal");
+      }      
+   }
+   
+   public void decode(final HornetQBuffer buffer)
+   {
+      //TODO optimise
+      
+      log.info("decoding server message");
+      
+      this.buffer = HornetQChannelBuffers.dynamicBuffer(1500);
+      
+      //work around Netty bug
+      this.buffer.writeByte((byte)0);
+      
+      this.buffer.setIndex(0, PacketImpl.PACKET_HEADERS_SIZE);
+      
+      this.buffer.writeBytes(buffer, 0, buffer.readableBytes());
+      
+      
+      //Position to beginning of encoded message headers/properties
+      
+      int msgHeadersPos = this.buffer.readInt(PacketImpl.PACKET_HEADERS_SIZE);
+      
+      this.buffer.readerIndex(msgHeadersPos);
+      
+      this.decodeHeadersAndProperties(this.buffer);
+      
+      log.info("priority is now " + this.getPriority());
+      
+      
+   }
 
 }

Modified: branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/ChannelBufferWrapper.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/ChannelBufferWrapper.java	2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/ChannelBufferWrapper.java	2009-11-20 17:41:40 UTC (rev 8349)
@@ -13,6 +13,7 @@
 
 package org.hornetq.integration.transports.netty;
 
+import org.hornetq.core.logging.Logger;
 import org.hornetq.core.remoting.spi.HornetQBuffer;
 import org.hornetq.utils.DataConstants;
 import org.hornetq.utils.SimpleString;
@@ -30,6 +31,9 @@
  */
 public class ChannelBufferWrapper implements HornetQBuffer
 {
+   private static final Logger log = Logger.getLogger(ChannelBufferWrapper.class);
+
+   
    private final ChannelBuffer buffer;
 
    /**
@@ -280,8 +284,8 @@
     * @see org.hornetq.core.remoting.spi.HornetQBuffer#readString()
     */
    public String readString()
-   {
-      int len = readInt();
+   {      
+      int len = readInt();      
       char[] chars = new char[len];
       for (int i = 0; i < len; i++)
       {
@@ -378,12 +382,12 @@
     * @see org.hornetq.core.remoting.spi.HornetQBuffer#writeString(java.lang.String)
     */
    public void writeString(final String val)
-   {
+   {      
       writeInt(val.length());
       for (int i = 0; i < val.length(); i++)
       {
          writeShort((short)val.charAt(i));
-      }
+      }      
    }
 
    /* (non-Javadoc)

Modified: branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java	2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/ChannelPipelineSupport.java	2009-11-20 17:41:40 UTC (rev 8349)
@@ -47,7 +47,7 @@
    public static void addCodecFilter(final ChannelPipeline pipeline, final BufferHandler handler)
    {
       assert pipeline != null;
-      pipeline.addLast("decoder", new HornetQFrameDecoder(handler));
+      pipeline.addLast("decoder", new HornetQFrameDecoder2());
    }
 
    public static void addSSLFilter(final ChannelPipeline pipeline, final SSLContext context, final boolean client) throws Exception

Modified: branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java	2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder2.java	2009-11-20 17:41:40 UTC (rev 8349)
@@ -27,8 +27,6 @@
 /**
  * A Netty decoder used to decode messages.
  *
- * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
- * @author <a href="ataylor at redhat.com">Andy Taylor</a>
  * @author <a href="tlee at redhat.com">Trustin Lee</a>
  *
  * @version $Revision: 7839 $, $Date: 2009-08-21 02:26:39 +0900 (2009-08-21, 금) $
@@ -49,7 +47,7 @@
       {
          if (previousData.readableBytes() + in.readableBytes() < SIZE_INT) {
             // XXX Length is unknown. Bet at 512. Tune this value.
-            append(in, 512); 
+            append(in, 1500); 
             return;
          }
          

Modified: branches/20-optimisation/tests/src/org/hornetq/tests/integration/EncodeSizeTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/integration/EncodeSizeTest.java	2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/integration/EncodeSizeTest.java	2009-11-20 17:41:40 UTC (rev 8349)
@@ -74,7 +74,7 @@
                        
          ServerMessage serverMessage = new ServerMessageImpl();
          
-         serverMessage.decode(buffer);
+         serverMessage.decodeHeadersAndProperties(buffer);
          
          int serverEncodeSize = serverMessage.getEncodeSize();
 

Modified: branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java	2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java	2009-11-20 17:41:40 UTC (rev 8349)
@@ -23,6 +23,7 @@
 import org.hornetq.core.config.TransportConfiguration;
 import org.hornetq.core.config.impl.ConfigurationImpl;
 import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.spi.HornetQBuffer;
 import org.hornetq.core.server.HornetQ;
 import org.hornetq.core.server.HornetQServer;
 import org.hornetq.jms.client.HornetQTextMessage;
@@ -78,57 +79,10 @@
 
       ClientProducer producer = session.createProducer(QUEUE);
 
-      final int numMessages = 10000;
+      final int numMessages = 10;
 
       for (int i = 0; i < numMessages; i++)
-      {
-         /*
-          * Like this:
-          * 
-          * ClientMessage message = producer.createMessage(...);
-          * 
-          * message.putStringProperty("foo", "bar");
-          * 
-          * message.encodeToBuffer(); [this sets the destination from the producer, and encodes]
-          * 
-          * message.getBuffer().writeString("testINVMCoreClient");
-          *          
-          * message.send();
-          * 
-          * OR, another option:
-          * 
-          * Get rid of client producer altogether,
-          * 
-          * Have send direct on the session, and destination must be set explicitly
-          * 
-          * e.g.
-          * 
-          * ClientMessage message = session.createMessage(...)
-          * 
-          * message.putStringProperty("foo", "bar");
-          * 
-          * message.setDestination("foo");
-          * 
-          * message.writeBody();
-          * 
-          * message.getBuffer().writeString("testINVMCoreClient");
-          * 
-          * message.send();
-          * 
-          * 
-          * ORRR
-          * 
-          * we don't write the headers and properties until *AFTER* the body
-          * 
-          * giving this format:
-          * body length
-          * body
-          * headers + properties
-          * 
-          * this means we don't need an encodeToBuffer() method!!
-          * 
-          */
-         
+      {         
          ClientMessage message = session.createClientMessage(HornetQTextMessage.TYPE,
                                                              false,
                                                              0,
@@ -143,8 +97,6 @@
          
          message.setDestination(QUEUE);
          
-         message.encodeToBuffer();
-
          message.getBuffer().writeString("testINVMCoreClient");
 
          producer.send(message);
@@ -159,10 +111,14 @@
       for (int i = 0; i < numMessages; i++)
       {
          ClientMessage message2 = consumer.receive();
+         
+         HornetQBuffer buffer = message2.getBuffer();
+         
+         assertEquals("testINVMCoreClient", buffer.readString());
 
-         assertEquals("testINVMCoreClient", message2.getBuffer().readString());
-
          message2.acknowledge();
+         
+         log.info("got message " + i);
       }
 
       session.close();

Modified: branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/MessageDurabilityTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/MessageDurabilityTest.java	2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/MessageDurabilityTest.java	2009-11-20 17:41:40 UTC (rev 8349)
@@ -61,7 +61,7 @@
 
       ClientProducer producer = session.createProducer(address);
       producer.send(session.createClientMessage(!durable));
-
+      
       restart();
 
       session.start();

Modified: branches/20-optimisation/tests/src/org/hornetq/tests/opt/SendTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/opt/SendTest.java	2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/opt/SendTest.java	2009-11-20 17:41:40 UTC (rev 8349)
@@ -161,7 +161,7 @@
 
       String str = new String(bytes);
       
-      final int warmup = 50000;
+      final int warmup = 500000;
       
       log.info("Warming up");
       
@@ -181,7 +181,7 @@
       
       log.info("** WARMUP DONE");
       
-      final int numMessages = 1000000;
+      final int numMessages = 2000000;
       
       tm = sess.createTextMessage();
 

Modified: branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/message/impl/MessageImplTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/message/impl/MessageImplTest.java	2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/unit/core/message/impl/MessageImplTest.java	2009-11-20 17:41:40 UTC (rev 8349)
@@ -63,7 +63,7 @@
          HornetQBuffer buffer = HornetQChannelBuffers.buffer(message.getEncodeSize()); 
          message.encode(buffer);      
          Message message2 = new ClientMessageImpl(false);      
-         message2.decode(buffer);      
+         message2.decodeHeadersAndProperties(buffer);      
          assertMessagesEquivalent(message, message2);
       }
    }

Modified: branches/20-optimisation/tests/src/org/hornetq/tests/util/ServiceTestBase.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/util/ServiceTestBase.java	2009-11-20 16:52:13 UTC (rev 8348)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/util/ServiceTestBase.java	2009-11-20 17:41:40 UTC (rev 8349)
@@ -353,7 +353,7 @@
 
    public String getTextMessage(ClientMessage m)
    {
-      m.getBuffer().resetReaderIndex();
+      //m.getBuffer().resetReaderIndex();
       return m.getBuffer().readString();
    }
 



More information about the hornetq-commits mailing list