[jboss-cvs] JBoss Messaging SVN: r6427 - branches/Branch_Temp_Clebert_LargeMessage/src/main/org/jboss/messaging/core/client/impl.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Tue Apr 14 15:32:48 EDT 2009


Author: clebert.suconic at jboss.com
Date: 2009-04-14 15:32:48 -0400 (Tue, 14 Apr 2009)
New Revision: 6427

Modified:
   branches/Branch_Temp_Clebert_LargeMessage/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
   branches/Branch_Temp_Clebert_LargeMessage/src/main/org/jboss/messaging/core/client/impl/LargeMessageBuffer.java
Log:
Changes before merge

Modified: branches/Branch_Temp_Clebert_LargeMessage/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- branches/Branch_Temp_Clebert_LargeMessage/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2009-04-14 18:12:50 UTC (rev 6426)
+++ branches/Branch_Temp_Clebert_LargeMessage/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java	2009-04-14 19:32:48 UTC (rev 6427)
@@ -678,6 +678,13 @@
 
          // Now we wait for any current handler runners to run.
          waitForOnMessageToComplete();
+         
+         if (currentLargeMessageBuffer != null)
+         {
+            currentLargeMessageBuffer.close();
+            currentLargeMessageBuffer = null;
+         }
+         
 
          closed = true;
 

Modified: branches/Branch_Temp_Clebert_LargeMessage/src/main/org/jboss/messaging/core/client/impl/LargeMessageBuffer.java
===================================================================
--- branches/Branch_Temp_Clebert_LargeMessage/src/main/org/jboss/messaging/core/client/impl/LargeMessageBuffer.java	2009-04-14 18:12:50 UTC (rev 6426)
+++ branches/Branch_Temp_Clebert_LargeMessage/src/main/org/jboss/messaging/core/client/impl/LargeMessageBuffer.java	2009-04-14 19:32:48 UTC (rev 6427)
@@ -152,6 +152,13 @@
       }
    }
    
+   public synchronized void close()
+   {
+      this.packets.offer(new SessionReceiveContinuationMessage());
+      this.streamEnded = true;
+      notifyAll();
+   }
+   
    public synchronized void setOutputStream(final OutputStream output) throws MessagingException
    {
       while (true)
@@ -997,6 +1004,13 @@
          {
             throw new IndexOutOfBoundsException();
          }
+         
+         if (currentPacket.getConsumerID() == 0)
+         {
+            streamEnded = true;
+            return;
+         }
+         
          consumerInternal.flowControl(currentPacket.getPacketSize(), true);
 
          packetPosition += sizeToAdd;
@@ -1023,7 +1037,7 @@
       {
          throw new IllegalAccessError("LargeMessage have read-only and one-way buffers");
       }
-      while (index >= packetLastPosition)
+      while (index >= packetLastPosition && ! streamEnded)
       {
          popPacket();
       }




More information about the jboss-cvs-commits mailing list