[jboss-cvs] JBoss Messaging SVN: r3685 - in trunk: src/main/org/jboss/jms/client/impl and 5 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Feb 8 11:25:03 EST 2008


Author: timfox
Date: 2008-02-08 11:25:03 -0500 (Fri, 08 Feb 2008)
New Revision: 3685

Modified:
   trunk/src/main/org/jboss/jms/client/JBossMessageProducer.java
   trunk/src/main/org/jboss/jms/client/impl/ClientConsumerImpl.java
   trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
   trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
   trunk/src/main/org/jboss/messaging/core/impl/DeliveryImpl.java
   trunk/src/main/org/jboss/messaging/core/impl/MessageImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/codec/DeliverMessageCodec.java
   trunk/src/main/org/jboss/messaging/core/remoting/wireformat/DeliverMessage.java
   trunk/tests/src/org/jboss/messaging/core/remoting/wireformat/test/unit/PacketTypeTest.java
Log:
Fixed test, just one to go now!


Modified: trunk/src/main/org/jboss/jms/client/JBossMessageProducer.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/JBossMessageProducer.java	2008-02-08 15:07:26 UTC (rev 3684)
+++ trunk/src/main/org/jboss/jms/client/JBossMessageProducer.java	2008-02-08 16:25:03 UTC (rev 3685)
@@ -332,10 +332,10 @@
 
       if (this.defaultDestination != null && !this.defaultDestination.equals(destination))
       {
-         throw new UnsupportedOperationException("Where a default destination is specified " +
-                                                 "for the sender and a destination is " +
-                                                 "specified in the arguments to the send, " +
-                                                 "these destinations must be equal");
+         throw new JMSException("Where a default destination is specified " +
+                                "for the sender and a destination is " +
+                                "specified in the arguments to the send, " +
+                                "these destinations must be equal");
       }
       
       JBossMessage jbm;

Modified: trunk/src/main/org/jboss/jms/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientConsumerImpl.java	2008-02-08 15:07:26 UTC (rev 3684)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientConsumerImpl.java	2008-02-08 16:25:03 UTC (rev 3685)
@@ -274,13 +274,6 @@
       return id;
    }
 
-//   public void changeRate(float newRate) throws MessagingException
-//   {
-//      checkClosed();
-//
-//      remotingConnection.send(id, new ConsumerFlowTokenMessage(newRate), true);
-//   }
-
    public void handleMessage(final DeliverMessage message) throws Exception
    {
       synchronized (mainLock)
@@ -312,8 +305,6 @@
          // Add it to the buffer
          Message coreMessage = message.getMessage();
 
-         coreMessage.setDeliveryCount(message.getDeliveryCount());
-
          buffer.addLast(message, coreMessage.getPriority());
 
          if (receiverThread != null)

Modified: trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java	2008-02-08 15:07:26 UTC (rev 3684)
+++ trunk/src/main/org/jboss/jms/client/impl/ClientSessionImpl.java	2008-02-08 16:25:03 UTC (rev 3685)
@@ -498,6 +498,9 @@
    {
       try
       {                              
+         //Need to flush any acks to server first
+         acknowledgeInternal(false);
+                  
          SessionXAResponseMessage response = (SessionXAResponseMessage)remotingConnection.send(id, new SessionXAForgetMessage(xid));
          
          if (response.isError())
@@ -595,7 +598,6 @@
    {
       try
       {
-
          SessionXARollbackMessage packet = new SessionXARollbackMessage(xid);
          
          SessionXAResponseMessage response = (SessionXAResponseMessage)remotingConnection.send(id, packet);
@@ -637,10 +639,16 @@
          
          if (flags == XAResource.TMJOIN)
          {
+            //Need to flush any acks to server first
+            acknowledgeInternal(false);
+                        
             packet = new SessionXAJoinMessage(xid);                  
          }
          else if (flags == XAResource.TMRESUME)
          {
+            //Need to flush any acks to server first
+            acknowledgeInternal(false);
+                        
             packet = new SessionXAResumeMessage(xid);
          }
          else if (flags == XAResource.TMNOFLAGS)

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2008-02-08 15:07:26 UTC (rev 3684)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerConsumerEndpoint.java	2008-02-08 16:25:03 UTC (rev 3685)
@@ -111,8 +111,6 @@
       
       this.autoDeleteQueue = autoDeleteQueue;
       
-      log.info("Enable flow control is " + enableFlowControl);
-      
       this.enableFlowControl = enableFlowControl;
       
       // adding the consumer to the queue

Modified: trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2008-02-08 15:07:26 UTC (rev 3684)
+++ trunk/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java	2008-02-08 16:25:03 UTC (rev 3685)
@@ -250,40 +250,7 @@
          throw new IllegalStateException("Cannot find consumer with id " + consumerId + " to remove");
       }      
    }
-
-//   void localClose() throws Exception
-//   {
-//      Map<String, ServerConsumerEndpoint> consumersClone = new HashMap<String, ServerConsumerEndpoint>(consumers);
-//      
-//      for (ServerConsumerEndpoint consumer: consumersClone.values())
-//      {
-//         consumer.close();
-//      }
-//
-//      consumers.clear();
-//
-//      Map<String, ServerBrowserEndpoint> browsersClone = new HashMap<String, ServerBrowserEndpoint>(browsers);
-//      
-//      for (ServerBrowserEndpoint browser: browsersClone.values())
-//      {
-//         browser.close();
-//      }
-//
-//      consumers.clear();
-//
-//      browsers.clear();
-//
-//      rollback();
-//
-//      executor.shutdown();
-//
-//      deliveries.clear();
-//
-//      sp.removeSession(id);
-//
-//      closed = true;
-//   }
-
+   
    synchronized void handleDelivery(MessageReference ref, ServerConsumerEndpoint consumer, PacketSender sender) throws Exception
    {
       // FIXME - we shouldn't have to pass in the packet Sender - this should be

Modified: trunk/src/main/org/jboss/messaging/core/impl/DeliveryImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/DeliveryImpl.java	2008-02-08 15:07:26 UTC (rev 3684)
+++ trunk/src/main/org/jboss/messaging/core/impl/DeliveryImpl.java	2008-02-08 16:25:03 UTC (rev 3685)
@@ -22,6 +22,7 @@
 package org.jboss.messaging.core.impl;
 
 import org.jboss.messaging.core.Delivery;
+import org.jboss.messaging.core.Message;
 import org.jboss.messaging.core.MessageReference;
 import org.jboss.messaging.core.remoting.PacketSender;
 import org.jboss.messaging.core.remoting.wireformat.DeliverMessage;
@@ -67,10 +68,19 @@
    
    public void deliver() throws Exception
    {
-      DeliverMessage message = new DeliverMessage(reference.getMessage(),
-                                                  deliveryID,
-                                                  reference.getDeliveryCount() + 1);
+      /*
+      Note we copy the message before sending.
+      This is because delivery count may be different for the same message sent to different topic subscribers
+      And invm the same message instance would otherwise be passed to all consumers
+      For the non INVM case this copy is unnecessary and can be optimised away TODO - although the overhead of
+      copying is actually quite small
+      */
+      Message copy = reference.getMessage().copy();
       
+      copy.setDeliveryCount(reference.getDeliveryCount() + 1);
+      
+      DeliverMessage message = new DeliverMessage(copy, deliveryID);
+      
       message.setTargetID(consumerID);
       
       sender.send(message);

Modified: trunk/src/main/org/jboss/messaging/core/impl/MessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/impl/MessageImpl.java	2008-02-08 15:07:26 UTC (rev 3684)
+++ trunk/src/main/org/jboss/messaging/core/impl/MessageImpl.java	2008-02-08 16:25:03 UTC (rev 3685)
@@ -95,10 +95,6 @@
     */
    private Semaphore refsLock = new Semaphore(1);
    
-      
-   //FIXME - does scheduledDeliveryTime belong on message? surely on SessionSendMessage
-  // private long scheduledDeliveryTime;
-         
    // Constructors --------------------------------------------------
 
    /*

Modified: trunk/src/main/org/jboss/messaging/core/remoting/codec/DeliverMessageCodec.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/codec/DeliverMessageCodec.java	2008-02-08 15:07:26 UTC (rev 3684)
+++ trunk/src/main/org/jboss/messaging/core/remoting/codec/DeliverMessageCodec.java	2008-02-08 16:25:03 UTC (rev 3685)
@@ -38,16 +38,14 @@
    {
       byte[] encodedMsg = encodeMessage(message.getMessage());
       long deliveryID = message.getDeliveryID();
-      int deliveryCount = message.getDeliveryCount();
 
-      int bodyLength = INT_LENGTH + encodedMsg.length
+      int bodyLength = encodedMsg.length
             + LONG_LENGTH + INT_LENGTH;
       out.putInt(bodyLength);
 
       out.putInt(encodedMsg.length);
       out.put(encodedMsg);
       out.putLong(deliveryID);
-      out.putInt(deliveryCount);
    }
 
    @Override
@@ -65,9 +63,8 @@
       in.get(encodedMsg);
       Message msg = decodeMessage(encodedMsg);
       long deliveryID = in.getLong();
-      int deliveryCount = in.getInt();
 
-      return new DeliverMessage(msg, deliveryID, deliveryCount);
+      return new DeliverMessage(msg, deliveryID);
    }
 
    // Package protected ---------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/remoting/wireformat/DeliverMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/wireformat/DeliverMessage.java	2008-02-08 15:07:26 UTC (rev 3684)
+++ trunk/src/main/org/jboss/messaging/core/remoting/wireformat/DeliverMessage.java	2008-02-08 16:25:03 UTC (rev 3685)
@@ -23,20 +23,15 @@
 
    // Attributes ----------------------------------------------------
 
-   //FIXME - we do not need all these fields
-   
    private final Message message;
 
    private final long deliveryID;
 
-   private final int deliveryCount;
-
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public DeliverMessage(Message message, long deliveryID,
-                         int deliveryCount)
+   public DeliverMessage(Message message, long deliveryID)
    {
       super(SESS_DELIVER);
 
@@ -44,7 +39,6 @@
 
       this.message = message;
       this.deliveryID = deliveryID;
-      this.deliveryCount = deliveryCount;
    }
 
    // Public --------------------------------------------------------
@@ -59,18 +53,12 @@
       return deliveryID;
    }
 
-   public int getDeliveryCount()
-   {
-      return deliveryCount;
-   }
-
    @Override
    public String toString()
    {
       StringBuffer buf = new StringBuffer(getParentString());
       buf.append(", message=" + message);
-      buf.append(", deliveryID=" + deliveryID);
-      buf.append(", deliveryCount=" + deliveryCount);
+      buf.append(", deliveryID=" + deliveryID);   
       buf.append("]");
       return buf.toString();
    }

Modified: trunk/tests/src/org/jboss/messaging/core/remoting/wireformat/test/unit/PacketTypeTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/core/remoting/wireformat/test/unit/PacketTypeTest.java	2008-02-08 15:07:26 UTC (rev 3684)
+++ trunk/tests/src/org/jboss/messaging/core/remoting/wireformat/test/unit/PacketTypeTest.java	2008-02-08 16:25:03 UTC (rev 3685)
@@ -675,13 +675,12 @@
    public void testDeliverMessage() throws Exception
    {
       Message msg = new MessageImpl();
-      DeliverMessage message = new DeliverMessage(msg, randomLong(), 23);
+      DeliverMessage message = new DeliverMessage(msg, randomLong());
 
       AbstractPacketCodec codec = new DeliverMessageCodec();
       SimpleRemotingBuffer buffer = encode(message, codec);
       checkHeader(buffer, message);
-      checkBody(buffer, encodeMessage(msg), message.getDeliveryID(), message
-            .getDeliveryCount());
+      checkBody(buffer, encodeMessage(msg), message.getDeliveryID());
       buffer.rewind();
 
       AbstractPacket decodedPacket = codec.decode(buffer);
@@ -692,8 +691,6 @@
       assertEquals(message.getMessage().getMessageID(), decodedMessage
             .getMessage().getMessageID());
       assertEquals(message.getDeliveryID(), decodedMessage.getDeliveryID());
-      assertEquals(message.getDeliveryCount(), decodedMessage
-            .getDeliveryCount());
    }
 
    public void testSessionAcknowledgeMessage() throws Exception




More information about the jboss-cvs-commits mailing list