[hornetq-commits] JBoss hornetq SVN: r8321 - in branches/20-optimisation: src/main/org/hornetq/core/client/impl and 9 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Thu Nov 19 06:23:15 EST 2009


Author: timfox
Date: 2009-11-19 06:23:14 -0500 (Thu, 19 Nov 2009)
New Revision: 8321

Modified:
   branches/20-optimisation/src/main/org/hornetq/core/client/ClientSession.java
   branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java
   branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
   branches/20-optimisation/src/main/org/hornetq/core/client/impl/DelegatingSession.java
   branches/20-optimisation/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java
   branches/20-optimisation/src/main/org/hornetq/core/journal/impl/JournalImpl.java
   branches/20-optimisation/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
   branches/20-optimisation/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
   branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java
   branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
   branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/AbstractBufferHandler.java
   branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
   branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
   branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
   branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java
   branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java
   branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
   branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder.java
   branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/NettyConnection.java
   branches/20-optimisation/src/main/org/hornetq/jms/client/HornetQTextMessage.java
   branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java
Log:
optimisation

Modified: branches/20-optimisation/src/main/org/hornetq/core/client/ClientSession.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/client/ClientSession.java	2009-11-19 03:01:55 UTC (rev 8320)
+++ branches/20-optimisation/src/main/org/hornetq/core/client/ClientSession.java	2009-11-19 11:23:14 UTC (rev 8321)
@@ -98,20 +98,10 @@
 
    ClientProducer createProducer(SimpleString address, int rate) throws HornetQException;
 
-   ClientProducer createProducer(SimpleString address,
-                                 int maxRate,
-                                 boolean blockOnNonPersistentSend,
-                                 boolean blockOnPersistentSend) throws HornetQException;
-
    ClientProducer createProducer(String address) throws HornetQException;
 
    ClientProducer createProducer(String address, int rate) throws HornetQException;
 
-   ClientProducer createProducer(String address,
-                                 int maxRate,
-                                 boolean blockOnNonPersistentSend,
-                                 boolean blockOnPersistentSend) throws HornetQException;
-
    SessionQueueQueryResponseMessage queueQuery(SimpleString queueName) throws HornetQException;
 
    SessionBindingQueryResponseMessage bindingQuery(SimpleString address) throws HornetQException;

Modified: branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java	2009-11-19 03:01:55 UTC (rev 8320)
+++ branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientProducerCreditsImpl.java	2009-11-19 11:23:14 UTC (rev 8321)
@@ -66,8 +66,6 @@
 
       checkCredits(credits);
 
-      log.info("trying to acquire " + credits);
-      
       semaphore.acquire(credits);
    }
 

Modified: branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2009-11-19 03:01:55 UTC (rev 8320)
+++ branches/20-optimisation/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2009-11-19 11:23:14 UTC (rev 8321)
@@ -438,7 +438,7 @@
 
    public ClientProducer createProducer(final SimpleString address, final int maxRate) throws HornetQException
    {
-      return createProducer(address, maxRate, blockOnNonPersistentSend, blockOnPersistentSend);
+      return internalCreateProducer(address, maxRate);
    }
 
    public ClientProducer createProducer(final String address, final int rate) throws HornetQException
@@ -446,22 +446,6 @@
       return createProducer(toSimpleString(address), rate);
    }
 
-   public ClientProducer createProducer(final SimpleString address,
-                                        final int maxRate,
-                                        final boolean blockOnNonPersistentSend,
-                                        final boolean blockOnPersistentSend) throws HornetQException
-   {
-      return internalCreateProducer(address, maxRate, blockOnNonPersistentSend, blockOnPersistentSend);
-   }
-
-   public ClientProducer createProducer(final String address,
-                                        final int maxRate,
-                                        final boolean blockOnNonPersistentSend,
-                                        final boolean blockOnPersistentSend) throws HornetQException
-   {
-      return createProducer(toSimpleString(address), maxRate, blockOnNonPersistentSend, blockOnPersistentSend);
-   }
-
    public XAResource getXAResource()
    {
       return this;
@@ -1397,9 +1381,7 @@
    }
 
    private ClientProducer internalCreateProducer(final SimpleString address,
-                                                 final int maxRate,
-                                                 final boolean blockOnNonPersistentSend,
-                                                 final boolean blockOnPersistentSend) throws HornetQException
+                                                 final int maxRate) throws HornetQException
    {
       checkClosed();
 

Modified: branches/20-optimisation/src/main/org/hornetq/core/client/impl/DelegatingSession.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/client/impl/DelegatingSession.java	2009-11-19 03:01:55 UTC (rev 8320)
+++ branches/20-optimisation/src/main/org/hornetq/core/client/impl/DelegatingSession.java	2009-11-19 11:23:14 UTC (rev 8321)
@@ -228,14 +228,6 @@
       return session.createProducer();
    }
 
-   public ClientProducer createProducer(SimpleString address,
-                                        int maxRate,
-                                        boolean blockOnNonPersistentSend,
-                                        boolean blockOnPersistentSend) throws HornetQException
-   {
-      return session.createProducer(address, maxRate, blockOnNonPersistentSend, blockOnPersistentSend);
-   }
-
    public ClientProducer createProducer(SimpleString address, int rate) throws HornetQException
    {
       return session.createProducer(address, rate);
@@ -246,14 +238,6 @@
       return session.createProducer(address);
    }
 
-   public ClientProducer createProducer(String address,
-                                        int maxRate,
-                                        boolean blockOnNonPersistentSend,
-                                        boolean blockOnPersistentSend) throws HornetQException
-   {
-      return session.createProducer(address, maxRate, blockOnNonPersistentSend, blockOnPersistentSend);
-   }
-
    public ClientProducer createProducer(String address, int rate) throws HornetQException
    {
       return session.createProducer(address, rate);

Modified: branches/20-optimisation/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java	2009-11-19 03:01:55 UTC (rev 8320)
+++ branches/20-optimisation/src/main/org/hornetq/core/journal/impl/AbstractSequentialFactory.java	2009-11-19 11:23:14 UTC (rev 8321)
@@ -52,7 +52,7 @@
                                     final boolean logRates)
    {
       this.journalDir = journalDir;
-      log.info("** buffered?" + buffered);
+ 
       if (buffered)
       {
          timedBuffer = new TimedBuffer(bufferSize, bufferTimeout, flushOnSync, logRates);

Modified: branches/20-optimisation/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2009-11-19 03:01:55 UTC (rev 8320)
+++ branches/20-optimisation/src/main/org/hornetq/core/journal/impl/JournalImpl.java	2009-11-19 11:23:14 UTC (rev 8321)
@@ -926,8 +926,6 @@
 
          callback = getSyncCallback(sync);
          
-        // log.info("callback is " + callback);
-
          lockAppend.lock();
          try
          {
@@ -957,7 +955,6 @@
       if (callback != null)
       {
          callback.waitCompletion();
-        // log.info("waited completion");
       }
    }
 

Modified: branches/20-optimisation/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java	2009-11-19 03:01:55 UTC (rev 8320)
+++ branches/20-optimisation/src/main/org/hornetq/core/journal/impl/NIOSequentialFile.java	2009-11-19 11:23:14 UTC (rev 8321)
@@ -139,14 +139,14 @@
       try
       {
          int bytesRead = channel.read(bytes);
-         
+
          if (callback != null)
          {
             callback.done();
          }
-         
+
          bytes.flip();
-         
+
          return bytesRead;
       }
       catch (Exception e)
@@ -160,16 +160,12 @@
       }
 
    }
-   
-   public static AtomicInteger numSyncs = new AtomicInteger(0);
 
    public void sync() throws Exception
    {
       if (channel != null)
       {
          channel.force(false);
-         
-         numSyncs.incrementAndGet();
       }
    }
 
@@ -208,7 +204,7 @@
       {
          throw new NullPointerException("callback parameter need to be set");
       }
-      
+
       try
       {
          internalWrite(bytes, sync, callback);
@@ -233,8 +229,6 @@
     */
    private void internalWrite(final ByteBuffer bytes, final boolean sync, final IOCompletion callback) throws Exception
    {
-      //log.info("writing " + bytes.limit() + " bytes");
-      
       position.addAndGet(bytes.limit());
 
       channel.write(bytes);

Modified: branches/20-optimisation/src/main/org/hornetq/core/journal/impl/TimedBuffer.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/journal/impl/TimedBuffer.java	2009-11-19 03:01:55 UTC (rev 8320)
+++ branches/20-optimisation/src/main/org/hornetq/core/journal/impl/TimedBuffer.java	2009-11-19 11:23:14 UTC (rev 8321)
@@ -95,8 +95,7 @@
    // Public --------------------------------------------------------
 
    public TimedBuffer(final int size, final long timeout, final boolean flushOnSync, final boolean logRates)
-   {
-      log.info("creating timed buffer, log rates is " + logRates);
+   {     
       bufferSize = 490 * 1024;
       this.logRates = logRates;
       if (logRates)
@@ -230,8 +229,6 @@
 
    public synchronized void addBytes(final byte[] bytes, final boolean sync, final IOCompletion callback)
    {
-     //  log.info("timedbuffer addbytes, " + bytes.length + " sync " + sync);
-
       if (buffer.writerIndex() == 0)
       {
          // Resume latch
@@ -253,17 +250,12 @@
 
          if (flushOnSync)
          {
-            log.info("flushing on sync record added");
-
             flush();
          }
       }
       
-      //log.info("buffer writer index is now " + buffer.writerIndex());
-
       if (buffer.writerIndex() == bufferLimit)
       {
-         log.info("flushing because reached buffer limit");
          flush();
       }
    }
@@ -272,8 +264,6 @@
    {
       if (buffer.writerIndex() > 0)
       {
-         //log.info("actually flushing");
-         
          latchTimer.up();
 
          int pos = buffer.writerIndex();
@@ -321,7 +311,6 @@
          {
             if (bufferObserver != null)
             {
-                //log.info("flushing on timer");
                 flush();
             }
          }
@@ -352,20 +341,15 @@
             {
                double rate = 1000 * ((double)bytesFlushed) / (now - lastExecution);
                log.info("Write rate = " + rate + " bytes / sec or " + (long)(rate / (1024 * 1024)) + " MiB / sec");
-               double flushRate = 1000 * ((double)flushesDone) / (now - lastExecution);
-               double numSyncs = 1000 * ((double)NIOSequentialFile.numSyncs.get()) / (now - lastExecution);
+               double flushRate = 1000 * ((double)flushesDone) / (now - lastExecution);              
                log.info("Flush rate = " + flushRate + " flushes / sec");
-               log.info("numSyncs " + numSyncs);
             }
 
             lastExecution = now;
 
             bytesFlushed = 0;
 
-            flushesDone = 0;
-
-            NIOSequentialFile.numSyncs.set(0);
-
+            flushesDone = 0;           
          }
       }
 

Modified: branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java	2009-11-19 03:01:55 UTC (rev 8320)
+++ branches/20-optimisation/src/main/org/hornetq/core/message/impl/MessageImpl.java	2009-11-19 11:23:14 UTC (rev 8321)
@@ -145,8 +145,6 @@
    {
 //      return getHeadersAndPropertiesEncodeSize() + SIZE_INT + getBodySize();
       
-      //log.info("getting encode size, writer index is " + buffer.writerIndex(), new Exception());
-      
       return buffer.writerIndex() - PACKET_HEADERS_SIZE;
    }
   
@@ -167,27 +165,17 @@
       properties.encode(buffer);
    }
    
-//   public void encodeBody(final HornetQBuffer buffer)
-//   {
-//      HornetQBuffer localBody = getBuffer();
-//      buffer.writeBytes(localBody.array(), 0, localBody.writerIndex());
-//   }
-
    public void decode(final HornetQBuffer buffer)
    {
       decodeHeadersAndProperties(buffer);
 
       this.buffer = buffer;
    }
-   
- 
-
+    
    public void decodeHeadersAndProperties(final HornetQBuffer buffer)
    {
-      messageID = buffer.readLong();
-      log.info("message id is " + messageID);
-      destination = buffer.readSimpleString();
-      log.info("destination is " + destination);
+      messageID = buffer.readLong();     
+      destination = buffer.readSimpleString();     
       type = buffer.readByte();
       durable = buffer.readBoolean();
       expiration = buffer.readLong();

Modified: branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-11-19 03:01:55 UTC (rev 8320)
+++ branches/20-optimisation/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java	2009-11-19 11:23:14 UTC (rev 8321)
@@ -451,8 +451,6 @@
          throw new HornetQException(HornetQException.ILLEGAL_STATE, "MessageId was not assigned to Message");
       }
       
-     // log.info("calling store msg");
-
       // Note that we don't sync, the add reference that comes immediately after will sync if appropriate
 
       if (message.isLargeMessage())
@@ -469,26 +467,22 @@
    }
 
    public void storeReference(final long queueID, final long messageID, final boolean last) throws Exception
-   {
-      //log.info("calling store reference " + syncNonTransactional);
+   {     
       messageJournal.appendUpdateRecord(messageID, ADD_REF, new RefEncoding(queueID), last && syncNonTransactional);
    }
 
    public void storeAcknowledge(final long queueID, final long messageID) throws Exception
-   {
-      log.info("calling acknowledge");
+   {      
       messageJournal.appendUpdateRecord(messageID, ACKNOWLEDGE_REF, new RefEncoding(queueID), syncNonTransactional);
    }
 
    public void deleteMessage(final long messageID) throws Exception
-   {
-      log.info("calling delete message");
+   {     
       messageJournal.appendDeleteRecord(messageID, syncNonTransactional);
    }
 
    public void updateScheduledDeliveryTime(final MessageReference ref) throws Exception
-   {
-      log.info("calling update sched delivery");
+   {      
       ScheduledDeliveryEncoding encoding = new ScheduledDeliveryEncoding(ref.getScheduledDeliveryTime(), ref.getQueue()
                                                                                                             .getID());
 
@@ -499,16 +493,14 @@
    }
 
    public void storeDuplicateID(final SimpleString address, final byte[] duplID, final long recordID) throws Exception
-   {
-      log.info("calling store dupl id");
+   {      
       DuplicateIDEncoding encoding = new DuplicateIDEncoding(address, duplID);
 
       messageJournal.appendAddRecord(recordID, DUPLICATE_ID, encoding, syncNonTransactional);
    }
 
    public void deleteDuplicateID(long recordID) throws Exception
-   {
-      log.info("calling delete dupl id");
+   {      
       messageJournal.appendDeleteRecord(recordID, syncNonTransactional);
    }
 

Modified: branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/AbstractBufferHandler.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/AbstractBufferHandler.java	2009-11-19 03:01:55 UTC (rev 8320)
+++ branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/AbstractBufferHandler.java	2009-11-19 11:23:14 UTC (rev 8321)
@@ -29,8 +29,6 @@
 
    public int isReadyToHandle(final HornetQBuffer buffer)
    {
-      log.info("calling buffer is ready to handle");
-
       if (buffer.readableBytes() < DataConstants.SIZE_INT)
       {
          return -1;
@@ -38,8 +36,6 @@
 
       int length = buffer.readInt();
 
-      log.info("length is " + length + " readable bytes is " + buffer.readableBytes());
-
       if (buffer.readableBytes() < length)
       {
          return -1;

Modified: branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java	2009-11-19 03:01:55 UTC (rev 8320)
+++ branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java	2009-11-19 11:23:14 UTC (rev 8321)
@@ -146,8 +146,6 @@
    // This must never called by more than one thread concurrently
    public void send(final Packet packet, final boolean flush)
    {      
-      log.info("Sending packet on channel " + packet);
-      
       synchronized (sendLock)
       {
          packet.setChannelID(id);

Modified: branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java	2009-11-19 03:01:55 UTC (rev 8320)
+++ branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java	2009-11-19 11:23:14 UTC (rev 8321)
@@ -157,8 +157,6 @@
    {
       final byte packetType = in.readByte();
       
-      log.info("Packet type is " + packetType);
-
       Packet packet;
 
       switch (packetType)

Modified: branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java	2009-11-19 03:01:55 UTC (rev 8320)
+++ branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/RemotingConnectionImpl.java	2009-11-19 11:23:14 UTC (rev 8321)
@@ -330,8 +330,6 @@
 
    public void bufferReceived(final Object connectionID, final HornetQBuffer buffer)
    {                 
-      log.info("decoding packet " + buffer);
-      
       final Packet packet = decoder.decode(buffer);
       
       if (executor == null || packet.getType() == PacketImpl.PING)

Modified: branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java	2009-11-19 03:01:55 UTC (rev 8320)
+++ branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionReceiveMessage.java	2009-11-19 11:23:14 UTC (rev 8321)
@@ -101,23 +101,13 @@
    @Override
    public HornetQBuffer encode(final RemotingConnection connection)
    {
-      log.info("Encoding session send message, consumer id is " + consumerID + " delivery count is " + deliveryCount);
-      
       HornetQBuffer buffer = serverMessage.getBuffer();
       
-      log.info("** DELIVERING writer index is " + buffer.writerIndex());
-                  
-      
-      log.info("** WRITING CONSUMER ID AT POS " + buffer.writerIndex());
-      
       buffer.writeLong(consumerID);
       buffer.writeInt(deliveryCount);
-
-      // At this point, the rest of the message has already been encoded into the buffer
+      
       size = buffer.writerIndex();
       
-      log.info("size is " + size);
-
       buffer.setIndex(0, 0);
 
       // The standard header fields
@@ -126,6 +116,10 @@
       buffer.writeInt(len);
       buffer.writeByte(type);
       buffer.writeLong(channelID);
+                      
+      //And fill in the message id, since this was set on the server side so won't already be in the buffer
+      buffer.setIndex(0, buffer.writerIndex() + DataConstants.SIZE_INT);
+      buffer.writeLong(serverMessage.getMessageID());
       
       buffer.setIndex(0, size);
 
@@ -137,7 +131,7 @@
       clientMessage = new ClientMessageImpl();
       
       //fast forward past the size byte
-      buffer.readInt();
+      int size = buffer.readInt();
       
       clientMessage.decode(buffer);
       
@@ -147,26 +141,16 @@
       
       //Now we need to fast forward past the body part
       
-      int size = buffer.readInt(PacketImpl.PACKET_HEADERS_SIZE);
+      //int size = buffer.readInt(PacketImpl.PACKET_HEADERS_SIZE);
       
       buffer.setIndex(size, buffer.writerIndex());
                   
-      log.info("decoded receive message");
-      
-      log.info("*** READING CONSUMER ID AT POS " + buffer.readerIndex());
-      
       consumerID = buffer.readLong();
       
-      log.info("consumer id is " + consumerID);
-
       deliveryCount = buffer.readInt();
       
-      log.info("delivery count is " + deliveryCount);
-      
       clientMessage.setDeliveryCount(deliveryCount);
       
-      //clientMessage.getBuffer().resetReaderIndex();
-      
       //Reset buffer to beginning of body
       buffer.setIndex(bodyBeginning, buffer.writerIndex());
       

Modified: branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java	2009-11-19 03:01:55 UTC (rev 8320)
+++ branches/20-optimisation/src/main/org/hornetq/core/remoting/impl/wireformat/SessionSendMessage.java	2009-11-19 11:23:14 UTC (rev 8321)
@@ -80,18 +80,15 @@
    @Override
    public HornetQBuffer encode(final RemotingConnection connection)
    {
-      log.info("Encoding session send message");
-      
       HornetQBuffer buffer = sentMessage.getBuffer();
       
-      log.info("ENCODE ** size is " + buffer.writerIndex());
+      int afterBody = buffer.writerIndex();
       
       buffer.writeBoolean(requiresResponse);
 
       // At this point, the rest of the message has already been encoded into the buffer
       size = buffer.writerIndex();
             
-
       buffer.setIndex(0, 0);
 
       // The standard header fields
@@ -100,8 +97,10 @@
       buffer.writeInt(len);
       buffer.writeByte(type);
       buffer.writeLong(channelID);
-      buffer.writeInt(size);
       
+      //This last byte we write marks the position of the end of the message body where we store extra data for the packet
+      buffer.writeInt(afterBody);
+      
       buffer.setIndex(0, size);
 
       return buffer;
@@ -114,22 +113,17 @@
 
       sentMessage = receivedMessage;
       
-      //fast forward past the size byte
-      buffer.readInt();
+      //Read the position of after the body where extra data is stored
+      int afterBody = buffer.readInt();
 
-      log.info("********** server message ");
-                 
       receivedMessage.decode(buffer);
-            
-      receivedMessage.getBuffer().resetReaderIndex();
-
-      requiresResponse = buffer.readBoolean();
       
-      //reset the writer index back one boolean since when we deliver to the client we will write the extra fields on here
+      buffer.setIndex(afterBody, buffer.writerIndex());
       
-      //buffer.setIndex(0, buffer.writerIndex() - DataConstants.SIZE_BOOLEAN);
-      
-      log.info("SEND MESSAGE DECODE, WRITER INDEX IS " + buffer.writerIndex());
+      requiresResponse = buffer.readBoolean();   
+            
+      receivedMessage.getBuffer().resetReaderIndex();
+             
    }
 
    public int getRequiredBufferSize()

Modified: branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2009-11-19 03:01:55 UTC (rev 8320)
+++ branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java	2009-11-19 11:23:14 UTC (rev 8321)
@@ -210,7 +210,7 @@
          }
 
          final ServerMessage message = ref.getMessage();
-
+         
          if (filter != null && !filter.match(message))
          {
             return HandleStatus.NO_MATCH;
@@ -449,7 +449,7 @@
 
       // Acknowledge acknowledges all refs delivered by the consumer up to and including the one explicitly
       // acknowledged
-
+      
       MessageReference ref;
       do
       {
@@ -584,8 +584,6 @@
          availableCredits.addAndGet(-packet.getRequiredBufferSize());
       }
 
-      log.info("*** delivering message to client");
-      
       channel.send(packet);
    }
 

Modified: branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-11-19 03:01:55 UTC (rev 8320)
+++ branches/20-optimisation/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-11-19 11:23:14 UTC (rev 8321)
@@ -1455,20 +1455,18 @@
 
    public void handleSend(final SessionSendMessage packet)
    {
-      log.info("Got message on server");
-      
       Packet response = null;
 
       ServerMessage message = packet.getServerMessage();
       
-      log.info("server message is " + message);
-
+      //log.info("Got msg on server");
+      
       try
       {
          long id = storageManager.generateUniqueID();
 
          message.setMessageID(id);
-
+         
          if (message.getDestination().equals(managementAddress))
          {
             // It's a management message
@@ -1479,7 +1477,10 @@
          {
             send(message);
          }
-
+         
+         //log.info("requires response "+ packet.isRequiresResponse());
+         
+         
          if (packet.isRequiresResponse())
          {
             response = new NullResponseMessage();
@@ -1544,7 +1545,7 @@
 
             currentLargeMessage = null;
          }
-
+         
          if (packet.isRequiresResponse())
          {
             response = new NullResponseMessage();

Modified: branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder.java	2009-11-19 03:01:55 UTC (rev 8320)
+++ branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/HornetQFrameDecoder.java	2009-11-19 11:23:14 UTC (rev 8321)
@@ -74,8 +74,6 @@
       
       newBuffer.readInt();
       
-     // log.info("Now ri, wi " + newBuffer.readerIndex() + ", " + newBuffer.writerIndex());
-      
       return newBuffer;
    }
 }

Modified: branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/NettyConnection.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/NettyConnection.java	2009-11-19 03:01:55 UTC (rev 8320)
+++ branches/20-optimisation/src/main/org/hornetq/integration/transports/netty/NettyConnection.java	2009-11-19 11:23:14 UTC (rev 8321)
@@ -119,8 +119,6 @@
 
    public void write(final HornetQBuffer buffer, final boolean flush)
    {
-      log.info("writing buffer " + buffer.readerIndex() + " writer " + buffer.writerIndex());
-      
       ChannelFuture future = channel.write(buffer.getUnderlyingBuffer());
 
       if (flush)

Modified: branches/20-optimisation/src/main/org/hornetq/jms/client/HornetQTextMessage.java
===================================================================
--- branches/20-optimisation/src/main/org/hornetq/jms/client/HornetQTextMessage.java	2009-11-19 03:01:55 UTC (rev 8320)
+++ branches/20-optimisation/src/main/org/hornetq/jms/client/HornetQTextMessage.java	2009-11-19 11:23:14 UTC (rev 8321)
@@ -55,14 +55,6 @@
 
    // Constructors --------------------------------------------------
    
-   /*
-    * This constructor is used to construct messages prior to sending
-    */
-//   public HornetQTextMessage()
-//   {
-//      super(HornetQTextMessage.TYPE);
-//   }
-
    public HornetQTextMessage(final ClientSession session)
    {
       super(HornetQTextMessage.TYPE, session);
@@ -113,8 +105,12 @@
 
    // HornetQRAMessage override -----------------------------------------
    
+   private SimpleString dest = new SimpleString("jms.queue.test_queue");
+   
    public void doBeforeSend() throws Exception
-   {
+   {   
+      message.setDestination(dest);
+      
       message.encodeToBuffer();
       
       message.getBuffer().writeNullableSimpleString(text);

Modified: branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java
===================================================================
--- branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java	2009-11-19 03:01:55 UTC (rev 8320)
+++ branches/20-optimisation/tests/src/org/hornetq/tests/integration/client/CoreClientTest.java	2009-11-19 11:23:14 UTC (rev 8321)
@@ -78,10 +78,45 @@
 
       ClientProducer producer = session.createProducer(QUEUE);
 
-      final int numMessages = 1;
+      final int numMessages = 10000;
 
       for (int i = 0; i < numMessages; i++)
       {
+         /*
+          * Like this:
+          * 
+          * ClientMessage message = producer.createMessage(...);
+          * 
+          * message.putStringProperty("foo", "bar");
+          * 
+          * message.encodeToBuffer(); [this sets the destination from the producer, and encodes]
+          * 
+          * message.getBuffer().writeString("testINVMCoreClient");
+          *          
+          * message.send();
+          * 
+          * OR, another option:
+          * 
+          * Get rid of client producer altogether,
+          * 
+          * Have send direct on the session, and destination must be set explicitly
+          * 
+          * e.g.
+          * 
+          * ClientMessage message = session.createMessage(...)
+          * 
+          * message.putStringProperty("foo", "bar");
+          * 
+          * message.setDestination("foo");
+          * 
+          * message.encodeToBuffer();
+          * 
+          * message.getBuffer().writeString("testINVMCoreClient");
+          * 
+          * message.send();
+          * 
+          */
+         
          ClientMessage message = session.createClientMessage(HornetQTextMessage.TYPE,
                                                              false,
                                                              0,
@@ -90,13 +125,16 @@
 
          message.putStringProperty("foo", "bar");
 
+         //One way around the setting destination problem is as follows -
+         //Remove destination as an attribute from client producer.
+         //The destination always has to be set explicity before sending a message
+         
          message.setDestination(QUEUE);
-
+         
          message.encodeToBuffer();
 
          message.getBuffer().writeString("testINVMCoreClient");
 
-         log.info("sending message " + i);
          producer.send(message);
       }
 



More information about the hornetq-commits mailing list