[jboss-cvs] JBoss Messaging SVN: r3397 - in branches/Branch_Stable: src/main/org/jboss/jms/server/endpoint and 3 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Dec 3 08:56:39 EST 2007
Author: timfox
Date: 2007-12-03 08:56:39 -0500 (Mon, 03 Dec 2007)
New Revision: 3397
Modified:
branches/Branch_Stable/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
branches/Branch_Stable/src/main/org/jboss/jms/wireformat/SessionSendRequest.java
branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
Log:
Wait for last message before closing - knock-on effect of the remoting problem
Modified: branches/Branch_Stable/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2007-12-03 10:21:41 UTC (rev 3396)
+++ branches/Branch_Stable/src/main/org/jboss/jms/client/delegate/ClientSessionDelegate.java 2007-12-03 13:56:39 UTC (rev 3397)
@@ -458,7 +458,15 @@
{
boolean oneway = !(m.isReliable() || strictTck);
- RequestSupport req = new SessionSendRequest(id, version, m, checkForDuplicates, oneway);
+ long sequence = -1;
+ if (oneway)
+ {
+ SessionState sstate = (SessionState)state;
+ sequence = sstate.getNPSendSequence();
+ sstate.incNpSendSequence();
+ }
+
+ RequestSupport req = new SessionSendRequest(id, version, m, checkForDuplicates, oneway, sequence);
if (oneway)
{
Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-12-03 10:21:41 UTC (rev 3396)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/ServerSessionEndpoint.java 2007-12-03 13:56:39 UTC (rev 3397)
@@ -126,6 +126,8 @@
private static final long DELIVERY_WAIT_TIMEOUT = 5 * 1000;
+ private static final long CLOSE_WAIT_TIMEOUT = 10 * 1000;
+
// Static ---------------------------------------------------------------------------------------
// Attributes -----------------------------------------------------------------------------------
@@ -173,6 +175,11 @@
private boolean waitingToClose = false;
+ private Object waitLock = new Object();
+
+ private long lastSequence = -1;
+
+
// Constructors ---------------------------------------------------------------------------------
ServerSessionEndpoint(String sessionID, ServerConnectionEndpoint connectionEndpoint,
@@ -332,14 +339,58 @@
{
if (trace) log.trace(this + " closing");
+ //Need to wait for messages to arrive or they may be lost if connection/session is closed too
+ //quickly after sending
+
+ if (sequence != -1)
+ {
+ synchronized (waitLock)
+ {
+ long wait = CLOSE_WAIT_TIMEOUT;
+
+ while (lastSequence != sequence - 1 && wait > 0)
+ {
+ long start = System.currentTimeMillis();
+ try
+ {
+ waitLock.wait(wait);
+ }
+ catch (InterruptedException e)
+ {
+ }
+ wait -= (System.currentTimeMillis() - start);
+ }
+
+ if (wait <= 0)
+ {
+ log.warn("Timed out waiting for last message");
+ }
+ }
+ }
+
return -1;
}
public void send(JBossMessage message, final boolean checkForDuplicates) throws JMSException
{
+ throw new IllegalStateException("Should not be handled here");
+ }
+
+ public void send(JBossMessage message, final boolean checkForDuplicates, long sequence) throws JMSException
+ {
try
{
- connectionEndpoint.sendMessage(message, null, checkForDuplicates);
+ connectionEndpoint.sendMessage(message, null, checkForDuplicates);
+
+ if (sequence != -1)
+ {
+ synchronized (waitLock)
+ {
+ this.lastSequence = sequence;
+
+ waitLock.notifyAll();
+ }
+ }
}
catch (Throwable t)
{
@@ -566,8 +617,6 @@
}
deliveryIdSequence = maxDeliveryId + 1;
-
- // log.info("*** set deliveryIdSequence to " + deliveryIdSequence);
}
catch (Throwable t)
{
@@ -1253,8 +1302,6 @@
long deliveryId = deliveryIdSequence++;
- // log.info(System.identityHashCode(this) + " Delivery id is now " + deliveryId);
-
if (trace) { log.trace("Delivery id is now " + deliveryId); }
//TODO can't we combine flags isRetainDeliveries and isReplicating - surely they're mutually exclusive?
Modified: branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java 2007-12-03 10:21:41 UTC (rev 3396)
+++ branches/Branch_Stable/src/main/org/jboss/jms/server/endpoint/advised/SessionAdvised.java 2007-12-03 13:56:39 UTC (rev 3397)
@@ -78,9 +78,14 @@
public void send(JBossMessage msg, boolean checkForDuplicates) throws JMSException
{
- ((ServerSessionEndpoint)endpoint).send(msg, checkForDuplicates);
+ throw new IllegalStateException("Invocation should not be handled here");
}
+ public void send(JBossMessage msg, boolean checkForDuplicates, long sequence) throws JMSException
+ {
+ ((ServerSessionEndpoint)endpoint).send(msg, checkForDuplicates, sequence);
+ }
+
public ConsumerDelegate createConsumerDelegate(JBossDestination destination, String selector,
boolean noLocal, String subscriptionName,
boolean connectionConsumer, boolean autoFlowControl) throws JMSException
Modified: branches/Branch_Stable/src/main/org/jboss/jms/wireformat/SessionSendRequest.java
===================================================================
--- branches/Branch_Stable/src/main/org/jboss/jms/wireformat/SessionSendRequest.java 2007-12-03 10:21:41 UTC (rev 3396)
+++ branches/Branch_Stable/src/main/org/jboss/jms/wireformat/SessionSendRequest.java 2007-12-03 13:56:39 UTC (rev 3397)
@@ -48,6 +48,7 @@
private JBossMessage msg;
private boolean checkForDuplicates;
private boolean oneway;
+ private long sequence;
public SessionSendRequest()
{
@@ -57,12 +58,14 @@
byte version,
JBossMessage msg,
boolean checkForDuplicates,
- boolean oneway)
+ boolean oneway,
+ long sequence)
{
super(objectId, PacketSupport.REQ_SESSION_SEND, version);
this.msg = msg;
this.checkForDuplicates = checkForDuplicates;
this.oneway = oneway;
+ this.sequence = sequence;
}
public void read(DataInputStream is) throws Exception
@@ -78,6 +81,8 @@
checkForDuplicates = is.readBoolean();
oneway = is.readBoolean();
+
+ sequence = is.readLong();
}
public ResponseSupport serverInvoke() throws Exception
@@ -87,7 +92,7 @@
if (advised != null)
{
- advised.send(msg, checkForDuplicates);
+ advised.send(msg, checkForDuplicates, sequence);
}
else
{
@@ -112,6 +117,8 @@
os.writeBoolean(oneway);
+ os.writeLong(sequence);
+
os.flush();
}
Modified: branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java
===================================================================
--- branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java 2007-12-03 10:21:41 UTC (rev 3396)
+++ branches/Branch_Stable/tests/src/org/jboss/test/messaging/jms/WireFormatTest.java 2007-12-03 13:56:39 UTC (rev 3397)
@@ -667,7 +667,7 @@
JBossMessage msg = new JBossMessage(123);
RequestSupport req =
- new SessionSendRequest("23", (byte)77, msg, false, true);
+ new SessionSendRequest("23", (byte)77, msg, false, true, -1);
testPacket(req, PacketSupport.REQ_SESSION_SEND);
}
More information about the jboss-cvs-commits
mailing list