[jboss-cvs] JBoss Messaging SVN: r6427 - branches/Branch_Temp_Clebert_LargeMessage/src/main/org/jboss/messaging/core/client/impl.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Tue Apr 14 15:32:48 EDT 2009
Author: clebert.suconic at jboss.com
Date: 2009-04-14 15:32:48 -0400 (Tue, 14 Apr 2009)
New Revision: 6427
Modified:
branches/Branch_Temp_Clebert_LargeMessage/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
branches/Branch_Temp_Clebert_LargeMessage/src/main/org/jboss/messaging/core/client/impl/LargeMessageBuffer.java
Log:
Changes before merge
Modified: branches/Branch_Temp_Clebert_LargeMessage/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java
===================================================================
--- branches/Branch_Temp_Clebert_LargeMessage/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2009-04-14 18:12:50 UTC (rev 6426)
+++ branches/Branch_Temp_Clebert_LargeMessage/src/main/org/jboss/messaging/core/client/impl/ClientConsumerImpl.java 2009-04-14 19:32:48 UTC (rev 6427)
@@ -678,6 +678,13 @@
// Now we wait for any current handler runners to run.
waitForOnMessageToComplete();
+
+ if (currentLargeMessageBuffer != null)
+ {
+ currentLargeMessageBuffer.close();
+ currentLargeMessageBuffer = null;
+ }
+
closed = true;
Modified: branches/Branch_Temp_Clebert_LargeMessage/src/main/org/jboss/messaging/core/client/impl/LargeMessageBuffer.java
===================================================================
--- branches/Branch_Temp_Clebert_LargeMessage/src/main/org/jboss/messaging/core/client/impl/LargeMessageBuffer.java 2009-04-14 18:12:50 UTC (rev 6426)
+++ branches/Branch_Temp_Clebert_LargeMessage/src/main/org/jboss/messaging/core/client/impl/LargeMessageBuffer.java 2009-04-14 19:32:48 UTC (rev 6427)
@@ -152,6 +152,13 @@
}
}
+ public synchronized void close() // Ends the stream locally: queues one more packet, flags the end, and wakes waiters.
+ {
+ this.packets.offer(new SessionReceiveContinuationMessage()); // default-constructed packet; presumably acts as an end-of-stream marker (consumerID 0) for the reader - TODO confirm
+ this.streamEnded = true; // written while holding this object's monitor (the method is synchronized)
+ notifyAll(); // wake every thread currently waiting on this object's monitor
+ }
+
public synchronized void setOutputStream(final OutputStream output) throws MessagingException
{
while (true)
@@ -997,6 +1004,13 @@
{
throw new IndexOutOfBoundsException();
}
+
+ if (currentPacket.getConsumerID() == 0)
+ {
+ streamEnded = true;
+ return;
+ }
+
consumerInternal.flowControl(currentPacket.getPacketSize(), true);
packetPosition += sizeToAdd;
@@ -1023,7 +1037,7 @@
{
throw new IllegalAccessError("LargeMessage have read-only and one-way buffers");
}
- while (index >= packetLastPosition)
+ while (index >= packetLastPosition && ! streamEnded)
{
popPacket();
}
More information about the jboss-cvs-commits
mailing list