[hornetq-commits] JBoss hornetq SVN: r8403 - in branches/20-optimisation: src/main/org/hornetq/core/message and 8 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Wed Nov 25 15:11:08 EST 2009


Author: timfox
Date: 2009-11-25 15:11:08 -0500 (Wed, 25 Nov 2009)
New Revision: 8403

Modified:
   branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
   branches/20-optimisation/src/main/org/hornetq/core/message/Message.java
   branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java
   branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
   branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
   branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java
   branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java
   branches/20-optimisation/src/main/org/hornetq/core/server/ServerMessage.java
   branches/20-optimisation/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
   branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
   branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
   branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java
   branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java
   branches/20-optimisation/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
Log:
optimisation

Modified: branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2009-11-25 11:11:45 UTC (rev 8402)
+++ branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2009-11-25 20:11:08 UTC (rev 8403)
@@ -697,6 +697,7 @@
             log.trace("Setting up flowControlSize to " + message.getPacketSize() + " on message = " + clMessage);
          }
        
+        // log.info("setting flow control size as " + message.getPacketSize());
          clMessage.setFlowControlSize(message.getPacketSize());
 
          consumer.handleMessage(clMessage);

Modified: branches/20-optimisation/src/main/org/hornetq/core/message/Message.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/message/Message.java	2009-11-25 11:11:45 UTC (rev 8402)
+++ branches/20-optimisation/src/main/org/hornetq/core/message/Message.java	2009-11-25 20:11:08 UTC (rev 8403)
@@ -14,12 +14,10 @@
 package org.hornetq.core.message;
 
 import java.io.InputStream;
-import java.io.OutputStream;
 import java.util.Map;
 import java.util.Set;
 
 import org.hornetq.core.buffers.HornetQBuffer;
-import org.hornetq.core.exception.HornetQException;
 import org.hornetq.utils.SimpleString;
 import org.hornetq.utils.TypedProperties;
 
@@ -74,12 +72,8 @@
    
    void decodeFromBuffer(HornetQBuffer buffer);
    
-   HornetQBuffer encodeToBuffer();
-   
    int getEndOfMessagePosition();
    
-  // void setEndOfBodyPosition();
-   
    int getEndOfBodyPosition();
    
    void checkCopy();
@@ -88,7 +82,7 @@
    
    void resetCopied();
    
-//   void resetEndOfBodyPosition();
+   HornetQBuffer getEncodedBuffer();
    
    // Properties
    // ------------------------------------------------------------------

Modified: branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java	2009-11-25 11:11:45 UTC (rev 8402)
+++ branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java	2009-11-25 20:11:08 UTC (rev 8403)
@@ -96,6 +96,18 @@
 
    protected HornetQBuffer buffer;
 
+   protected ResetLimitWrappedHornetQBuffer bodyBuffer;
+
+   protected boolean bufferValid;
+
+   private int endOfBodyPosition = -1;
+
+   private int endOfMessagePosition;
+
+   private boolean copied = true;
+
+   private boolean bufferUsed;
+
    // Constructors --------------------------------------------------
 
    protected MessageImpl()
@@ -135,27 +147,38 @@
       createBody(initialMessageBufferSize);
    }
 
-   private void createBody(final int initialMessageBufferSize)
+   /*
+    * Copy constructor
+    */
+   protected MessageImpl(final MessageImpl other)
    {
-      buffer = HornetQBuffers.dynamicBuffer(initialMessageBufferSize);
+      messageID = other.getMessageID();
+      destination = other.getDestination();
+      type = other.getType();
+      durable = other.isDurable();
+      expiration = other.getExpiration();
+      timestamp = other.getTimestamp();
+      priority = other.getPriority();
+      properties = new TypedProperties(other.getProperties());
 
-      // There's a bug in netty which means a dynamic buffer won't resize until you write a byte
-      buffer.writeByte((byte)0);
+      bufferValid = other.bufferValid;
+      endOfBodyPosition = other.endOfBodyPosition;
+      endOfMessagePosition = other.endOfMessagePosition;
+      copied = other.copied;
 
-      int limit = PacketImpl.PACKET_HEADERS_SIZE + DataConstants.SIZE_INT;
-
-      buffer.setIndex(limit, limit);
-
-      // endOfBodyPosition = limit;
+      // We need to copy the underlying buffer too, since the different messages thereafter might have different
+      // properties set on them, making their encoding different
+      buffer = other.buffer.copy(0, other.buffer.capacity());
+      buffer.setIndex(other.buffer.readerIndex(), other.buffer.writerIndex());
    }
 
    // Message implementation ----------------------------------------
 
    public int getEncodeSize()
    {
-      int headersPropsSize = this.getHeadersAndPropertiesEncodeSize();
+      int headersPropsSize = getHeadersAndPropertiesEncodeSize();
 
-      int bodyPos = this.endOfBodyPosition == -1 ? buffer.writerIndex() : this.endOfBodyPosition;
+      int bodyPos = endOfBodyPosition == -1 ? buffer.writerIndex() : endOfBodyPosition;
 
       int bodySize = bodyPos - PacketImpl.PACKET_HEADERS_SIZE - DataConstants.SIZE_INT;
 
@@ -174,29 +197,48 @@
    }
 
    public void encodeHeadersAndProperties(final HornetQBuffer buffer)
-   {      
+   {
       buffer.writeLong(messageID);
-      buffer.writeSimpleString(destination);      
+      buffer.writeSimpleString(destination);
       buffer.writeByte(type);
       buffer.writeBoolean(durable);
       buffer.writeLong(expiration);
       buffer.writeLong(timestamp);
-      buffer.writeByte(priority);      
-      properties.encode(buffer);      
+      buffer.writeByte(priority);
+      properties.encode(buffer);
    }
 
    public void decodeHeadersAndProperties(final HornetQBuffer buffer)
    {
-      messageID = buffer.readLong();      
-      destination = buffer.readSimpleString();      
+      messageID = buffer.readLong();
+      destination = buffer.readSimpleString();
       type = buffer.readByte();
       durable = buffer.readBoolean();
       expiration = buffer.readLong();
       timestamp = buffer.readLong();
-      priority = buffer.readByte();            
+      priority = buffer.readByte();
       properties.decode(buffer);
    }
 
+   public HornetQBuffer getBodyBuffer()
+   {
+      if (bodyBuffer == null)
+      {
+         if (buffer instanceof LargeMessageBuffer == false)
+         {
+            bodyBuffer = new ResetLimitWrappedHornetQBuffer(PacketImpl.PACKET_HEADERS_SIZE + DataConstants.SIZE_INT,
+                                                            buffer,
+                                                            this);
+         }
+         else
+         {
+            return buffer;
+         }
+      }
+
+      return bodyBuffer;
+   }
+
    public long getMessageID()
    {
       return messageID;
@@ -295,6 +337,99 @@
       return map;
    }
 
+   public void decodeFromBuffer(final HornetQBuffer buffer)
+   {
+      this.buffer = buffer;
+
+      decode();
+   }
+
+   public void bodyChanged()
+   {
+      // If the body is changed we must copy the buffer otherwise can affect the previously sent message
+      // which might be in the Netty write queue
+      checkCopy();
+
+      bufferValid = false;
+
+      endOfBodyPosition = -1;
+   }
+
+   public void checkCopy()
+   {
+      if (!copied)
+      {
+         forceCopy();
+
+         copied = true;
+      }
+   }
+
+   public void resetCopied()
+   {
+      copied = false;
+   }
+
+   public int getEndOfMessagePosition()
+   {
+      return endOfMessagePosition;
+   }
+
+   public int getEndOfBodyPosition()
+   {
+      return endOfBodyPosition;
+   }
+
+   // Encode to journal or paging
+   public void encode(final HornetQBuffer buff)
+   {
+      encodeToBuffer();
+
+      buff.writeBytes(buffer, PacketImpl.PACKET_HEADERS_SIZE, endOfMessagePosition - PacketImpl.PACKET_HEADERS_SIZE);
+   }
+
+   // Decode from journal or paging
+   public void decode(final HornetQBuffer buff)
+   {
+      int start = buff.readerIndex();
+
+      endOfBodyPosition = buff.readInt();
+
+      endOfMessagePosition = buff.getInt(endOfBodyPosition - PacketImpl.PACKET_HEADERS_SIZE + start);
+
+      int length = endOfMessagePosition - PacketImpl.PACKET_HEADERS_SIZE;
+
+      buffer.setIndex(0, PacketImpl.PACKET_HEADERS_SIZE);
+
+      buffer.writeBytes(buff, start, length);
+
+      decode();
+
+      buff.readerIndex(start + length);
+   }
+
+   public synchronized HornetQBuffer getEncodedBuffer()
+   {
+      HornetQBuffer buff = encodeToBuffer();
+
+      if (bufferUsed)
+      {
+         HornetQBuffer copied = buff.copy(0, buff.capacity());
+
+         copied.setIndex(0, endOfMessagePosition);
+
+         return copied;
+      }
+      else
+      {
+         buffer.setIndex(0, endOfMessagePosition);
+
+         bufferUsed = true;
+
+         return buffer;
+      }
+   }
+
    // Properties
    // ---------------------------------------------------------------------------------------
 
@@ -657,123 +792,68 @@
 
    // Private -------------------------------------------------------
 
-   // Inner classes -------------------------------------------------
-
-   private class DecodingContext implements BodyEncoder
+   // This must be synchronized as it can be called concurrently if the message is being delivered concurrently to
+   // many queues - the first caller in this case will actually encode it
+   private synchronized HornetQBuffer encodeToBuffer()
    {
-      private int lastPos = 0;
-
-      public DecodingContext()
+      if (!bufferValid)
       {
-      }
+         if (bufferUsed)
+         {
+            // Cannot use same buffer - must copy
 
-      public void open()
-      {
-      }
+            forceCopy();
+         }
 
-      public void close()
-      {
-      }
-
-      public int encode(ByteBuffer bufferRead) throws HornetQException
-      {
-         HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bufferRead);
-         return encode(buffer, bufferRead.capacity());
-      }
-
-      public int encode(HornetQBuffer bufferOut, int size)
-      {
-         bufferOut.writeBytes(getWholeBuffer(), lastPos, size);
-         lastPos += size;
-         return size;
-      }
-   }
-
-   protected ResetLimitWrappedHornetQBuffer bodyBuffer;
-
-   public HornetQBuffer getBodyBuffer()
-   {
-      if (bodyBuffer == null)
-      {
-         if (buffer instanceof LargeMessageBuffer == false)
+         if (endOfBodyPosition == -1)
          {
-            bodyBuffer = new ResetLimitWrappedHornetQBuffer(PacketImpl.PACKET_HEADERS_SIZE + DataConstants.SIZE_INT,
-                                                            buffer,
-                                                            this);
+            // Means sending message for first time
+            endOfBodyPosition = buffer.writerIndex();
          }
-         else
-         {
-            return buffer;
-         }
-      }
 
-      return bodyBuffer;
-   }
+         // write it
+         buffer.setInt(PacketImpl.PACKET_HEADERS_SIZE, endOfBodyPosition);
 
-   protected boolean bufferValid;
+         // Position at end of body and skip past the message end position int
+         buffer.setIndex(0, endOfBodyPosition + DataConstants.SIZE_INT);
 
-   private int endOfBodyPosition = -1;
+         encodeHeadersAndProperties(buffer);
 
-   private int endOfMessagePosition;
+         // Write end of message position
 
-   private boolean copied = true;
+         endOfMessagePosition = buffer.writerIndex();
 
-   /*
-    * Copy constructor
-    */
-   protected MessageImpl(final MessageImpl other, final boolean shallow)
-   {     
-      messageID = other.getMessageID();
-      destination = other.getDestination();
-      type = other.getType();
-      durable = other.isDurable();
-      expiration = other.getExpiration();
-      timestamp = other.getTimestamp();
-      priority = other.getPriority();
-      properties = new TypedProperties(other.getProperties());
+         buffer.setInt(endOfBodyPosition, endOfMessagePosition);
 
-      this.bufferValid = other.bufferValid;
-      this.endOfBodyPosition = other.endOfBodyPosition;
-      this.endOfMessagePosition = other.endOfMessagePosition;
-      this.copied = other.copied;
+         bufferValid = true;
+      }
 
-      if (shallow)
-      {
-         this.buffer = other.buffer;
-      }
-      else
-      {
-         // We need to copy the underlying buffer too, since the different messsages thereafter might have different
-         // properties set on them, making their encoding different
-         buffer = other.buffer.copy(0, other.buffer.capacity());
-         buffer.setIndex(other.buffer.readerIndex(), other.buffer.writerIndex());
-      }
+      return buffer;
    }
 
-   public void bodyChanged()
+   private void decode()
    {
-      // If the body is changed we must copy the buffer otherwise can affect the previously sent message
-      // which might be in the Netty write queue
-      checkCopy();
+      endOfBodyPosition = buffer.getInt(PacketImpl.PACKET_HEADERS_SIZE);
 
-      bufferValid = false;
+      buffer.readerIndex(endOfBodyPosition + DataConstants.SIZE_INT);
 
-      this.endOfBodyPosition = -1;
+      decodeHeadersAndProperties(buffer);
+
+      endOfMessagePosition = buffer.readerIndex();
+
+      bufferValid = true;
    }
 
-   public void checkCopy()
+   private void createBody(final int initialMessageBufferSize)
    {
-      if (!copied)
-      {
-         forceCopy();
+      buffer = HornetQBuffers.dynamicBuffer(initialMessageBufferSize);
 
-         copied = true;
-      }
-   }
+      // There's a bug in netty which means a dynamic buffer won't resize until you write a byte
+      buffer.writeByte((byte)0);
 
-   public void resetCopied()
-   {
-      copied = false;
+      int limit = PacketImpl.PACKET_HEADERS_SIZE + DataConstants.SIZE_INT;
+
+      buffer.setIndex(limit, limit);
    }
 
    private void forceCopy()
@@ -782,102 +862,46 @@
 
       buffer = buffer.copy(0, buffer.capacity());
 
-      buffer.setIndex(0, this.endOfBodyPosition);
+      buffer.setIndex(0, endOfBodyPosition);
 
       if (bodyBuffer != null)
       {
          bodyBuffer.setBuffer(buffer);
       }
-   }
 
-   public int getEndOfMessagePosition()
-   {
-      return this.endOfMessagePosition;
+      bufferUsed = false;
    }
 
-   public int getEndOfBodyPosition()
-   {
-      return this.endOfBodyPosition;
-   }
+   // Inner classes -------------------------------------------------
 
-   // Encode to journal or paging
-   public void encode(HornetQBuffer buff)
+   private final class DecodingContext implements BodyEncoder
    {
-      encodeToBuffer();
+      private int lastPos = 0;
 
-      buff.writeBytes(buffer, PacketImpl.PACKET_HEADERS_SIZE, endOfMessagePosition - PacketImpl.PACKET_HEADERS_SIZE);
-   }
+      public DecodingContext()
+      {
+      }
 
-   // Decode from journal or paging
-   public void decode(HornetQBuffer buff)
-   {
-      int start = buff.readerIndex();
+      public void open()
+      {
+      }
 
-      endOfBodyPosition = buff.readInt();
+      public void close()
+      {
+      }
 
-      endOfMessagePosition = buff.getInt(endOfBodyPosition - PacketImpl.PACKET_HEADERS_SIZE + start);
+      public int encode(final ByteBuffer bufferRead) throws HornetQException
+      {
+         HornetQBuffer buffer = HornetQBuffers.wrappedBuffer(bufferRead);
+         return encode(buffer, bufferRead.capacity());
+      }
 
-      int length = endOfMessagePosition - PacketImpl.PACKET_HEADERS_SIZE;
-
-      buffer.setIndex(0, PacketImpl.PACKET_HEADERS_SIZE);
-
-      buffer.writeBytes(buff, start, length);
-
-      decode();
-
-      buff.readerIndex(start + length);
-   }
-
-   // This must be synchronized as it can be called concurrently id the message is being delivered concurently to
-   // many queues - the first caller in this case will actually encode it
-   public synchronized HornetQBuffer encodeToBuffer()
-   {      
-      if (!bufferValid)
+      public int encode(final HornetQBuffer bufferOut, final int size)
       {
-         if (endOfBodyPosition == -1)
-         {
-            // Means sending message for first time
-            endOfBodyPosition = buffer.writerIndex();
-         }
-
-         // write it
-         buffer.setInt(PacketImpl.PACKET_HEADERS_SIZE, endOfBodyPosition);
-
-         // Position at end of body and skip past the message end position int
-         buffer.setIndex(0, endOfBodyPosition + DataConstants.SIZE_INT);
-
-         encodeHeadersAndProperties(buffer);
-
-         // Write end of message position
-
-         this.endOfMessagePosition = buffer.writerIndex();
-
-         buffer.setInt(endOfBodyPosition, endOfMessagePosition);
-
-         this.bufferValid = true;
+         bufferOut.writeBytes(getWholeBuffer(), lastPos, size);
+         lastPos += size;
+         return size;
       }
-
-      return buffer;
    }
 
-   public void decode()
-   {
-      this.endOfBodyPosition = buffer.getInt(PacketImpl.PACKET_HEADERS_SIZE);
-
-      buffer.readerIndex(this.endOfBodyPosition + DataConstants.SIZE_INT);
-
-      this.decodeHeadersAndProperties(buffer);
-
-      this.endOfMessagePosition = buffer.readerIndex();
-
-      this.bufferValid = true;
-   }
-
-   public void decodeFromBuffer(HornetQBuffer buffer)
-   {
-      this.buffer = buffer;
-
-      decode();
-   }
-
 }

Modified: branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java	2009-11-25 11:11:45 UTC (rev 8402)
+++ branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/LargeServerMessageImpl.java	2009-11-25 20:11:08 UTC (rev 8403)
@@ -74,7 +74,7 @@
     */
    private LargeServerMessageImpl(final LargeServerMessageImpl copy, final SequentialFile fileCopy, final long newID)
    {
-      super(copy, true);
+      super(copy);
       this.linkMessage = copy;
       storageManager = copy.storageManager;
       file = fileCopy;

Modified: branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java	2009-11-25 11:11:45 UTC (rev 8402)
+++ branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java	2009-11-25 20:11:08 UTC (rev 8403)
@@ -235,7 +235,7 @@
       size = buffer.readerIndex();
    }
 
-   public final int getPacketSize()
+   public int getPacketSize()
    {
       if (size == -1)
       {

Modified: branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java	2009-11-25 11:11:45 UTC (rev 8402)
+++ branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java	2009-11-25 20:11:08 UTC (rev 8403)
@@ -69,19 +69,19 @@
    @Override
    public HornetQBuffer encode(final RemotingConnection connection)
    {
-      HornetQBuffer orig = message.encodeToBuffer();
+      HornetQBuffer buffer = message.getEncodedBuffer();
       
-      //Now we must copy this buffer, before sending to Netty, as it could be concurrently delivered to many consumers
+      //Sanity check
+      if (buffer.writerIndex() != message.getEndOfMessagePosition())
+      {
+         throw new IllegalStateException("Wrong encode position");
+      }
       
-      HornetQBuffer buffer = orig.copy(0, orig.capacity());
-
-      buffer.setIndex(0, message.getEndOfMessagePosition());
-        
-      buffer.writeLong(consumerID);
+      buffer.writeLong(consumerID);      
       buffer.writeInt(deliveryCount);
-      
+                 
       size = buffer.writerIndex();
-                       
+                
       //Write standard headers
       
       int len = size - DataConstants.SIZE_INT;
@@ -90,7 +90,7 @@
       buffer.setLong(DataConstants.SIZE_INT + DataConstants.SIZE_BYTE, channelID);
       
       //Position reader for reading by Netty
-      buffer.readerIndex(0);
+      buffer.setIndex(0, size);
       
       return buffer;
    }
@@ -107,7 +107,7 @@
       deliveryCount = buffer.readInt();  
       
       size = buffer.readerIndex();
-      
+
       //Need to position buffer for reading
       
       buffer.setIndex(PacketImpl.PACKET_HEADERS_SIZE + DataConstants.SIZE_INT, message.getEndOfBodyPosition());    

Modified: branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java	2009-11-25 11:11:45 UTC (rev 8402)
+++ branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java	2009-11-25 20:11:08 UTC (rev 8403)
@@ -72,19 +72,16 @@
    @Override
    public HornetQBuffer encode(final RemotingConnection connection)
    {
-      //this isn't right when forwarding a message that has been already received - because writerindex will
-      //be pointing at end of message
+      HornetQBuffer buffer = message.getEncodedBuffer();
       
-      HornetQBuffer orig = message.encodeToBuffer();
+      //Sanity check
+      if (buffer.writerIndex() != message.getEndOfMessagePosition())
+      {
+         throw new IllegalStateException("Wrong encode position");
+      }
       
-      //FIXME - for now we are copying due to concurrent sends to many bridges on the server
-      
-      HornetQBuffer buffer = orig.copy(0, orig.capacity());
-      
-      buffer.setIndex(0, message.getEndOfMessagePosition());
-      
       buffer.writeBoolean(requiresResponse);
-      
+  
       size = buffer.writerIndex();
                        
       //Write standard headers
@@ -108,8 +105,12 @@
       //Buffer comes in after having read standard headers and positioned at Beginning of body part
       
       message.decodeFromBuffer(buffer);
-      
-      requiresResponse = buffer.readBoolean();     
+
+      int ri = buffer.readerIndex();
+
+      requiresResponse = buffer.readBoolean(); 
+            
+      buffer.readerIndex(ri);
    }
    
    // Private -------------------------------------------------------

Modified: branches/20-optimisation/src/main/org/hornetq/core/server/ServerMessage.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/server/ServerMessage.java	2009-11-25 11:11:45 UTC (rev 8402)
+++ branches/20-optimisation/src/main/org/hornetq/core/server/ServerMessage.java	2009-11-25 20:11:08 UTC (rev 8403)
@@ -43,8 +43,6 @@
 
    ServerMessage copy();
    
-   ServerMessage shallowCopy();
-
    int getMemoryEstimate();
 
    int getRefCount();

Modified: branches/20-optimisation/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2009-11-25 11:11:45 UTC (rev 8402)
+++ branches/20-optimisation/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java	2009-11-25 20:11:08 UTC (rev 8403)
@@ -411,12 +411,12 @@
 
          if (flowRecord != null)
          {
-            // We make a shallow copy of the message, then we strip out the unwanted routing id headers and leave
+            // We make a copy of the message, then we strip out the unwanted routing id headers and leave
             // only
             // the one pertinent for the destination node - this is important since different queues on different
             // nodes could have same queue ids
             // Note we must copy since same message may get routed to other nodes which require different headers
-            message = message.shallowCopy();
+            message = message.copy();
 
             // TODO - we can optimise this
 

Modified: branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2009-11-25 11:11:45 UTC (rev 8402)
+++ branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2009-11-25 20:11:08 UTC (rev 8403)
@@ -185,9 +185,10 @@
 
    public HandleStatus handle(final MessageReference ref) throws Exception
    {
+      //log.info("handling message");
       if (availableCredits != null && availableCredits.get() <= 0)
       {
-
+        // log.info("busy");
          return HandleStatus.BUSY;
       }
 
@@ -417,7 +418,7 @@
 
    public void receiveCredits(final int credits) throws Exception
    {
-
+  //    log.info("Receiving credits " + credits);
       if (credits == -1)
       {
          // No flow control
@@ -591,8 +592,11 @@
 
       if (availableCredits != null)
       {
+        //log.info("Subtracting credits " + packet.getPacketSize());
          availableCredits.addAndGet(-packet.getPacketSize());
       }
+      
+    //  log.info("delivered message");
 
    }
 

Modified: branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java	2009-11-25 11:11:45 UTC (rev 8402)
+++ branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerMessageImpl.java	2009-11-25 20:11:08 UTC (rev 8403)
@@ -68,9 +68,9 @@
    /*
     * Copy constructor
     */
-   protected ServerMessageImpl(final ServerMessageImpl other, final boolean shallow)
+   protected ServerMessageImpl(final ServerMessageImpl other)
    {
-      super(other, shallow);           
+      super(other);           
    }
 
    public void setMessageID(final long id)
@@ -164,7 +164,7 @@
 
    public ServerMessage copy(final long newID)
    {
-      ServerMessage m = new ServerMessageImpl(this, false);
+      ServerMessage m = new ServerMessageImpl(this);
 
       m.setMessageID(newID);
 
@@ -173,14 +173,9 @@
 
    public ServerMessage copy()
    {
-      return new ServerMessageImpl(this, false);
+      return new ServerMessageImpl(this);
    }
    
-   public ServerMessage shallowCopy()
-   {
-      return new ServerMessageImpl(this, true);
-   }
-
    public ServerMessage makeCopyForExpiryOrDLA(final long newID, final boolean expiry) throws Exception
    {
       /*

Modified: branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-11-25 11:11:45 UTC (rev 8402)
+++ branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-11-25 20:11:08 UTC (rev 8403)
@@ -1574,6 +1574,8 @@
       final CreditManagerHolder holder = this.getCreditManagerHolder(address);
 
       int credits = packet.getCredits();
+      
+      //log.info("requesting credits " + credits);
 
       int gotCredits = holder.manager.acquireCredits(credits, new CreditsAvailableRunnable()
       {
@@ -1594,6 +1596,8 @@
             }
          }
       });
+      
+      //log.info("got credits " + gotCredits);
 
       if (gotCredits > 0)
       {

Modified: branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java	2009-11-25 11:11:45 UTC (rev 8402)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java	2009-11-25 20:11:08 UTC (rev 8403)
@@ -71,7 +71,7 @@
       server.start();
 
       ClientSessionFactory sf = new ClientSessionFactoryImpl(new TransportConfiguration(connectorFactoryClassName));
-      sf.setConsumerWindowSize(0);
+     // sf.setConsumerWindowSize(0);
 
       ClientSession session = sf.createSession(false, true, true);
 
@@ -103,7 +103,7 @@
       }
 
       log.info("sent messages");
-
+      
       ClientConsumer consumer = session.createConsumer(QUEUE);
       
       session.start();
@@ -112,6 +112,8 @@
       {
          ClientMessage message2 = consumer.receive();
          
+        // log.info("got message " + i);
+         
          HornetQBuffer buffer = message2.getBodyBuffer();
          
          assertEquals("testINVMCoreClient", buffer.readString());

Modified: branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java	2009-11-25 11:11:45 UTC (rev 8402)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/ProducerFlowControlTest.java	2009-11-25 20:11:08 UTC (rev 8403)
@@ -67,7 +67,7 @@
 
    public void testFlowControlMultipleConsumers() throws Exception
    {
-      testFlowControl(1000, 500, 10 * 1024, 1024, 1024, 1024, 5, 1, 0, false);
+      testFlowControl(1000, 500, -1, 1024, 1024, 1024, 5, 1, 0, false);
    }
 
    public void testFlowControlZeroConsumerWindowSize() throws Exception
@@ -134,7 +134,7 @@
    {
       testFlowControl(1000, 10000, 100 * 1024, 1024, 1024, 1024, 1, 1, 0, true, 1000, true);
    }
-   
+
    public void testFlowControlLargeMessages7() throws Exception
    {
       testFlowControl(1000, 10000, 100 * 1024, 1024, 1024, 1024, 2, 2, 0, true, 1000, true);
@@ -212,7 +212,7 @@
       {
          session.createQueue(address, new SimpleString(queueName + i), null, false);
       }
-      
+
       final byte[] bytes = RandomUtil.randomBytes(messageSize);
 
       class MyHandler implements MessageHandler
@@ -230,7 +230,7 @@
                byte[] bytesRead = new byte[messageSize];
 
                message.getBodyBuffer().readBytes(bytesRead);
-               
+
                assertEqualsByteArrays(bytes, bytesRead);
 
                message.acknowledge();
@@ -244,13 +244,14 @@
                {
                   Thread.sleep(consumerDelay);
                }
+
             }
             catch (Exception e)
             {
                log.error("Failed to handle message", e);
-                              
+
                this.exception = e;
-               
+
                latch.countDown();
             }
          }
@@ -285,12 +286,10 @@
 
       long start = System.currentTimeMillis();
 
-      
-
       for (int i = 0; i < numMessages; i++)
       {
          ClientMessage message = session.createClientMessage(false);
-         
+
          message.getBodyBuffer().writeBytes(bytes);
 
          for (int j = 0; j < numProducers; j++)
@@ -389,7 +388,7 @@
       byte[] bytes = new byte[0];
 
       ClientMessage message = session.createClientMessage(false);
-      
+
       message.getBodyBuffer().writeBytes(bytes);
 
       producer.send(message);
@@ -421,7 +420,7 @@
       t.start();
 
       ClientMessage message2 = session.createClientMessage(false);
-      
+
       message2.getBodyBuffer().writeBytes(bytes);
 
       producer2.send(message2);
@@ -474,7 +473,7 @@
       byte[] bytes = new byte[0];
 
       ClientMessage message = session.createClientMessage(false);
-      
+
       message.getBodyBuffer().writeBytes(bytes);
 
       producer.send(message);
@@ -504,7 +503,7 @@
       assertEquals(1, waiting);
 
       message = session.createClientMessage(false);
-      
+
       message.getBodyBuffer().writeBytes(bytes);
 
       producer2.send(message);
@@ -552,7 +551,7 @@
       byte[] bytes = new byte[0];
 
       ClientMessage message = session.createClientMessage(false);
-      
+
       message.getBodyBuffer().writeBytes(bytes);
 
       producer.send(message);
@@ -600,7 +599,7 @@
       session.close();
 
       message = session.createClientMessage(false);
-      
+
       message.getBodyBuffer().writeBytes(bytes);
 
       producer3.send(message);
@@ -648,7 +647,7 @@
       byte[] bytes = new byte[2000];
 
       ClientMessage message = session.createClientMessage(false);
-      
+
       message.getBodyBuffer().writeBytes(bytes);
 
       final AtomicBoolean closed = new AtomicBoolean(false);
@@ -721,7 +720,7 @@
       for (int i = 0; i < numMessages; i++)
       {
          ClientMessage message = session.createClientMessage(false);
-         
+
          message.getBodyBuffer().writeBytes(bytes);
 
          producer.send(message);
@@ -738,4 +737,72 @@
       server.stop();
    }
 
+   //Not technically a flow control test, but what the hell
+   public void testMultipleConsumers() throws Exception
+   {
+      HornetQServer server = createServer(false, isNetty());
+
+      server.start();
+
+      ClientSessionFactory sf = createFactory(isNetty());
+
+      final ClientSession session = sf.createSession(false, true, true, true);
+
+      session.createQueue("address", "queue1", null, false);
+      session.createQueue("address", "queue2", null, false);
+      session.createQueue("address", "queue3", null, false);
+      session.createQueue("address", "queue4", null, false);
+      session.createQueue("address", "queue5", null, false);
+
+      ClientConsumer consumer1 = session.createConsumer("queue1");
+      ClientConsumer consumer2 = session.createConsumer("queue2");
+      ClientConsumer consumer3 = session.createConsumer("queue3");
+      ClientConsumer consumer4 = session.createConsumer("queue4");
+      ClientConsumer consumer5 = session.createConsumer("queue5");
+
+      ClientProducer producer = session.createProducer("address");
+
+      byte[] bytes = new byte[2000];
+
+      ClientMessage message = session.createClientMessage(false);
+
+      message.getBodyBuffer().writeBytes(bytes);
+
+      final int numMessages = 1000;
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         producer.send(message);
+      }
+      
+      session.start();
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage msg = consumer1.receive(1000);
+
+         assertNotNull(msg);
+
+         msg = consumer2.receive(5000);
+
+         assertNotNull(msg);
+
+         msg = consumer3.receive(5000);
+
+         assertNotNull(msg);
+
+         msg = consumer4.receive(5000);
+
+         assertNotNull(msg);
+
+         msg = consumer5.receive(5000);
+
+         assertNotNull(msg);
+      }
+      
+      session.close();
+
+      server.stop();
+   }
+
 }

Modified: branches/20-optimisation/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2009-11-25 11:11:45 UTC (rev 8402)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/integration/cluster/distribution/ClusterTestBase.java	2009-11-25 20:11:08 UTC (rev 8403)
@@ -88,14 +88,13 @@
       checkFreePort(PORTS);
 
       clearData();
-      
+
       consumers = new ConsumerHolder[MAX_CONSUMERS];
 
       servers = new HornetQServer[MAX_SERVERS];
 
       sfs = new ClientSessionFactory[MAX_SERVERS];
 
-      
    }
 
    @Override
@@ -207,15 +206,15 @@
                                   final int consumerCount,
                                   final boolean local) throws Exception
    {
-//      System.out.println("waiting for bindings on node " + node +
-//                         " address " +
-//                         address +
-//                         " count " +
-//                         count +
-//                         " consumerCount " +
-//                         consumerCount +
-//                         " local " +
-//                         local);
+      // System.out.println("waiting for bindings on node " + node +
+      // " address " +
+      // address +
+      // " count " +
+      // count +
+      // " consumerCount " +
+      // consumerCount +
+      // " local " +
+      // local);
       HornetQServer server = this.servers[node];
 
       if (server == null)
@@ -442,7 +441,8 @@
 
             producer.send(message);
          }
-      } finally
+      }
+      finally
       {
          session.close();
       }
@@ -676,7 +676,9 @@
             if (j != (Integer)(message.getObjectProperty(COUNT_PROP)))
             {
                outOfOrder = true;
-               System.out.println("Message j=" + j + " was received out of order = " + message.getObjectProperty(COUNT_PROP));
+               System.out.println("Message j=" + j +
+                                  " was received out of order = " +
+                                  message.getObjectProperty(COUNT_PROP));
             }
          }
       }
@@ -823,7 +825,7 @@
                   message.acknowledge();
                }
 
-               log.info("consumer " + consumerIDs[i] +" returns " + count);
+               log.info("consumer " + consumerIDs[i] + " returns " + count);
             }
             else
             {
@@ -1165,8 +1167,6 @@
 
       Map<String, Object> params = generateParams(node, netty);
 
-
-
       if (netty)
       {
          TransportConfiguration nettytc = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params);
@@ -1175,7 +1175,7 @@
       else
       {
          TransportConfiguration invmtc = new TransportConfiguration(INVM_ACCEPTOR_FACTORY, params);
-         configuration.getAcceptorConfigurations().add(invmtc);  
+         configuration.getAcceptorConfigurations().add(invmtc);
       }
 
       HornetQServer server;



More information about the hornetq-commits mailing list