[jboss-cvs] JBoss Messaging SVN: r4155 - in trunk: src/main/org/jboss/messaging/core/message and 6 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed May 7 02:39:39 EDT 2008


Author: timfox
Date: 2008-05-07 02:39:39 -0400 (Wed, 07 May 2008)
New Revision: 4155

Modified:
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerPacketHandler.java
   trunk/src/main/org/jboss/messaging/core/message/Message.java
   trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/ConsumerDeliverMessageCodec.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/RemotingBuffer.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/BufferWrapper.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ConsumerDeliverMessage.java
   trunk/src/main/org/jboss/messaging/core/server/impl/DeliveryImpl.java
   trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/wireformat/PacketTypeTest.java
Log:
Moved delivery ID from ConsumerDeliverMessage onto Message


Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-05-06 20:02:50 UTC (rev 4154)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2008-05-07 06:39:39 UTC (rev 4155)
@@ -33,7 +33,6 @@
 import org.jboss.messaging.core.list.impl.PriorityLinkedListImpl;
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.message.Message;
-import org.jboss.messaging.core.remoting.impl.wireformat.ConsumerDeliverMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.ConsumerFlowTokenMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
 
@@ -75,7 +74,7 @@
    
    private final int tokenBatchSize;
    
-   private final PriorityLinkedList<ConsumerDeliverMessage> buffer = new PriorityLinkedListImpl<ConsumerDeliverMessage>(10);
+   private final PriorityLinkedList<Message> buffer = new PriorityLinkedListImpl<Message>(10);
    
    private volatile Thread receiverThread;
    
@@ -169,9 +168,9 @@
                     
             if (!closed && !buffer.isEmpty())
             {                              
-               ConsumerDeliverMessage m = buffer.removeFirst();
+               Message m = buffer.removeFirst();
                
-               boolean expired = m.getMessage().isExpired();
+               boolean expired = m.isExpired();
                
                session.delivered(m.getDeliveryID(), expired);
                
@@ -189,7 +188,7 @@
                   }
                }
                                  
-               return m.getMessage();
+               return m;
             }
             else
             {
@@ -284,7 +283,7 @@
       return targetID;
    }
 
-   public void handleMessage(final ConsumerDeliverMessage message) throws Exception
+   public void handleMessage(final Message message) throws Exception
    {
       if (closed)
       {
@@ -320,7 +319,7 @@
          {
             //Dispatch it directly on remoting thread
             
-            boolean expired = message.getMessage().isExpired();
+            boolean expired = message.isExpired();
 
             session.delivered(message.getDeliveryID(), expired);
             
@@ -328,7 +327,7 @@
 
             if (!expired)
             {
-               handler.onMessage(message.getMessage());
+               handler.onMessage(message);
             }
          }
          else
@@ -337,7 +336,7 @@
          	
          	synchronized (this)
          	{
-         		buffer.addLast(message, message.getMessage().getPriority());
+         		buffer.addLast(message, message.getPriority());
          	}
          	            	
          	sessionExecutor.execute(new Runnable() { public void run() { callOnMessage(); } } );
@@ -349,7 +348,7 @@
       	
       	synchronized (this)
       	{
-      		buffer.addLast(message, message.getMessage().getPriority());
+      		buffer.addLast(message, message.getPriority());
          	      		
       		notify();
       	}
@@ -443,7 +442,7 @@
    		//ordering. If we just added a Runnable with the message to the executor immediately as we get it
    		//we could not do that
    		
-   		ConsumerDeliverMessage message;
+   		Message message;
    		
    		synchronized (this)
    		{
@@ -452,7 +451,7 @@
    		
    		if (message != null)
    		{      		
-      		boolean expired = message.getMessage().isExpired();
+      		boolean expired = message.isExpired();
    
             session.delivered(message.getDeliveryID(), expired);
             
@@ -462,7 +461,7 @@
             {
          		onMessageThread = Thread.currentThread();
          		
-               handler.onMessage(message.getMessage());
+               handler.onMessage(message);
             }
    		}
 		}

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java	2008-05-06 20:02:50 UTC (rev 4154)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerInternal.java	2008-05-07 06:39:39 UTC (rev 4155)
@@ -8,7 +8,7 @@
 
 import org.jboss.messaging.core.client.ClientConsumer;
 import org.jboss.messaging.core.exception.MessagingException;
-import org.jboss.messaging.core.remoting.impl.wireformat.ConsumerDeliverMessage;
+import org.jboss.messaging.core.message.Message;
 
 /**
  * 
@@ -21,7 +21,7 @@
 {   
    long getClientTargetID();
    
-   void handleMessage(ConsumerDeliverMessage message) throws Exception;
+   void handleMessage(Message message) throws Exception;
    
    void recover(long lastDeliveryID) throws MessagingException;
 }

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerPacketHandler.java	2008-05-06 20:02:50 UTC (rev 4154)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientConsumerPacketHandler.java	2008-05-07 06:39:39 UTC (rev 4155)
@@ -45,7 +45,7 @@
          {
             ConsumerDeliverMessage message = (ConsumerDeliverMessage) packet;
             
-            clientConsumer.handleMessage(message);
+            clientConsumer.handleMessage(message.getMessage());
          }
          else
          {

Modified: trunk/src/main/org/jboss/messaging/core/message/Message.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/message/Message.java	2008-05-06 20:02:50 UTC (rev 4154)
+++ trunk/src/main/org/jboss/messaging/core/message/Message.java	2008-05-07 06:39:39 UTC (rev 4155)
@@ -21,7 +21,6 @@
   */
 package org.jboss.messaging.core.message;
 
-import java.util.List;
 import java.util.Map;
 
 import org.jboss.messaging.core.server.Queue;
@@ -107,6 +106,14 @@
     * @param priority
     */
    void setPriority(byte priority);
+   
+   int getDeliveryCount();
+   
+   void setDeliveryCount(int deliveryCount);
+   
+   long getDeliveryID();
+   
+   void setDeliveryID(long deliveryID);
 
    /**
     * Binds a header. If the header map previously contained a mapping for this name, the old value
@@ -173,18 +180,6 @@
    int getType();   
    
    /**
-    * 
-    * @return The delivery count of the message - only available on the client side
-    */
-   int getDeliveryCount();
-   
-   /**
-    * Set the delivery count of the message
-    * @param count
-    */
-   void setDeliveryCount(int count);
-   
-   /**
     * Get the connection id
     * @return the connection id
     */

Modified: trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java	2008-05-06 20:02:50 UTC (rev 4154)
+++ trunk/src/main/org/jboss/messaging/core/message/impl/MessageImpl.java	2008-05-07 06:39:39 UTC (rev 4155)
@@ -80,6 +80,8 @@
    
    private int deliveryCount;
    
+   private long deliveryID;
+   
    // Constructors --------------------------------------------------
 
    /*
@@ -278,6 +280,16 @@
    {
       return this.deliveryCount;
    }
+   
+   public void setDeliveryID(final long deliveryID)
+   {
+      this.deliveryID = deliveryID;
+   }
+   
+   public long getDeliveryID()
+   {
+      return this.deliveryID;
+   }
 
    public boolean isExpired()
    {
@@ -371,6 +383,8 @@
       out.writeByte(priority);
       
       out.writeInt(deliveryCount);
+      
+      out.writeLong(deliveryID);
 
       if (payload != null)
       {
@@ -402,6 +416,8 @@
 
       deliveryCount = in.readInt();
       
+      deliveryID = in.readLong();
+      
       int length = in.readInt();
 
       if (length == 0)

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/ConsumerDeliverMessageCodec.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/ConsumerDeliverMessageCodec.java	2008-05-06 20:02:50 UTC (rev 4154)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/ConsumerDeliverMessageCodec.java	2008-05-07 06:39:39 UTC (rev 4155)
@@ -43,7 +43,7 @@
    {
       byte[] encodedMsg = StreamUtils.toBytes(packet.getMessage());
    	
-   	return SIZE_INT + encodedMsg.length + SIZE_LONG; 
+   	return SIZE_INT + encodedMsg.length;
    }
    
    @Override
@@ -52,7 +52,6 @@
       byte[] encodedMsg = StreamUtils.toBytes(message.getMessage());
       out.putInt(encodedMsg.length);
       out.put(encodedMsg);
-      out.putLong(message.getDeliveryID());
       encodedMsg = null;
    }
 
@@ -65,9 +64,8 @@
       in.get(encodedMsg);
       Message message = new MessageImpl();
       StreamUtils.fromBytes(message, encodedMsg);
-      long deliveryID = in.getLong();
 
-      return new ConsumerDeliverMessage(message, deliveryID);
+      return new ConsumerDeliverMessage(message);
    }
 
    // Package protected ---------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/RemotingBuffer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/RemotingBuffer.java	2008-05-06 20:02:50 UTC (rev 4154)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/codec/RemotingBuffer.java	2008-05-07 06:39:39 UTC (rev 4155)
@@ -59,5 +59,4 @@
    void rewind();
 
    byte[] array();
-
 }

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/BufferWrapper.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/BufferWrapper.java	2008-05-06 20:02:50 UTC (rev 4154)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/mina/BufferWrapper.java	2008-05-07 06:39:39 UTC (rev 4155)
@@ -29,13 +29,13 @@
 	
    // Attributes ----------------------------------------------------
 
-   protected final IoBuffer buffer;
+   private final IoBuffer buffer;
 
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public BufferWrapper(IoBuffer buffer)
+   public BufferWrapper(final IoBuffer buffer)
    {
       assert buffer != null;
 
@@ -86,7 +86,7 @@
       return buffer.get();
    }
 
-   public void get(byte[] b)
+   public void get(final byte[] b)
    {
       buffer.get(b);
    }
@@ -124,10 +124,6 @@
       return (b == TRUE);
    }
 
-   
-  
-   
-   
    public void putNullableString(final String nullableString)
    {
       if (nullableString == null)

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ConsumerDeliverMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ConsumerDeliverMessage.java	2008-05-06 20:02:50 UTC (rev 4154)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/ConsumerDeliverMessage.java	2008-05-07 06:39:39 UTC (rev 4155)
@@ -25,20 +25,17 @@
 
    private final Message message;
 
-   private final long deliveryID;
-
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public ConsumerDeliverMessage(final Message message, final long deliveryID)
+   public ConsumerDeliverMessage(final Message message)
    {
       super(CONS_DELIVER);
 
       assert message != null;
 
       this.message = message;
-      this.deliveryID = deliveryID;
    }
 
    // Public --------------------------------------------------------
@@ -48,17 +45,11 @@
       return message;
    }
 
-   public long getDeliveryID()
-   {
-      return deliveryID;
-   }
-
    @Override
    public String toString()
    {
       StringBuffer buf = new StringBuffer(getParentString());
       buf.append(", message=" + message);
-      buf.append(", deliveryID=" + deliveryID);   
       buf.append("]");
       return buf.toString();
    }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/DeliveryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/DeliveryImpl.java	2008-05-06 20:02:50 UTC (rev 4154)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/DeliveryImpl.java	2008-05-07 06:39:39 UTC (rev 4155)
@@ -24,7 +24,6 @@
 import org.jboss.messaging.core.logging.Logger;
 import org.jboss.messaging.core.message.Message;
 import org.jboss.messaging.core.message.MessageReference;
-import org.jboss.messaging.core.persistence.StorageManager;
 import org.jboss.messaging.core.remoting.PacketSender;
 import org.jboss.messaging.core.remoting.impl.wireformat.ConsumerDeliverMessage;
 import org.jboss.messaging.core.server.Delivery;
@@ -84,8 +83,10 @@
       
       copy.setDeliveryCount(reference.getDeliveryCount() + 1);
       
-      ConsumerDeliverMessage message = new ConsumerDeliverMessage(copy, deliveryID);
+      copy.setDeliveryID(deliveryID);
       
+      ConsumerDeliverMessage message = new ConsumerDeliverMessage(copy);
+      
       message.setTargetID(consumerID);
       message.setExecutorID(sessionID);
       

Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/wireformat/PacketTypeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/wireformat/PacketTypeTest.java	2008-05-06 20:02:50 UTC (rev 4154)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/remoting/impl/wireformat/PacketTypeTest.java	2008-05-07 06:39:39 UTC (rev 4155)
@@ -679,19 +679,17 @@
    public void testConsumerDeliverMessage() throws Exception
    {
       Message msg = new MessageImpl();
-      ConsumerDeliverMessage message = new ConsumerDeliverMessage(msg,
-            randomLong());
+      ConsumerDeliverMessage message = new ConsumerDeliverMessage(msg);
       AbstractPacketCodec codec = new ConsumerDeliverMessageCodec();
 
       Packet decodedPacket = encodeAndCheckBytesAndDecode(message, codec,
-            StreamUtils.toBytes(msg), message.getDeliveryID());
+            StreamUtils.toBytes(msg));
 
       assertTrue(decodedPacket instanceof ConsumerDeliverMessage);
       ConsumerDeliverMessage decodedMessage = (ConsumerDeliverMessage) decodedPacket;
       assertEquals(CONS_DELIVER, decodedMessage.getType());
       assertEquals(message.getMessage().getMessageID(), decodedMessage
             .getMessage().getMessageID());
-      assertEquals(message.getDeliveryID(), decodedMessage.getDeliveryID());
    }
 
    public void testSessionAcknowledgeMessage() throws Exception




More information about the jboss-cvs-commits mailing list