Author: timfox
Date: 2009-12-05 11:21:45 -0500 (Sat, 05 Dec 2009)
New Revision: 8583
Modified:
trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
Log:
fix MultiThreadRandomFailoverTest etc
Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-12-05 13:10:42 UTC (rev 8582)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java 2009-12-05 16:21:45 UTC (rev 8583)
@@ -178,6 +178,8 @@
private volatile boolean workDone;
private final String groupID;
+
+ private volatile boolean inClose;
// Constructors ----------------------------------------------------------------------------
@@ -487,6 +489,10 @@
rollbackOnFailover();
}
+ else
+ {
+ throw e;
+ }
}
workDone = false;
@@ -750,7 +756,7 @@
public void close() throws HornetQException
{
if (closed)
- {
+ {
return;
}
@@ -760,11 +766,16 @@
closeChildren();
+ inClose = true;
+
channel.sendBlocking(new SessionCloseMessage());
}
- catch (Throwable ignore)
+ catch (Throwable e)
{
// Session close should always return without exception
+
+ //Note - we only log at trace
+ log.trace("Failed to close session", e);
}
doCleanup();
@@ -829,114 +840,120 @@
else
{
// The session wasn't found on the server - probably we're failing over onto a backup server where the
- // session
- // won't exist or the target server has been restarted - in this case the session will need to be recreated,
+ // session won't exist or the target server has been restarted - in this case the session will need to be recreated,
// and we'll need to recreate any consumers
-
- Packet createRequest = new CreateSessionMessage(name,
- channel.getID(),
- version,
- username,
- password,
- minLargeMessageSize,
- xa,
- autoCommitSends,
- autoCommitAcks,
- preAcknowledge,
- confirmationWindowSize);
- boolean retry = false;
- do
+
+ // It could also be that the server hasn't been restarted, but the session is currently executing close, and that
+ // has already been executed on the server, that's why we can't find the session- in this case we *don't* want
+ // to recreate the session, we just want to unblock the blocking call
+ if (!inClose)
{
- try
+ Packet createRequest = new CreateSessionMessage(name,
+ channel.getID(),
+ version,
+ username,
+ password,
+ minLargeMessageSize,
+ xa,
+ autoCommitSends,
+ autoCommitAcks,
+ preAcknowledge,
+ confirmationWindowSize);
+ boolean retry = false;
+ do
{
- channel1.sendBlocking(createRequest);
- retry = false;
- }
- catch (HornetQException e)
- {
- // the session was created while its server was starting, retry it:
- if (e.getCode() == HornetQException.SESSION_CREATION_REJECTED)
+ try
{
- log.warn("Server is starting, retry to create the session
" + name);
- retry = true;
- // sleep a little bit to avoid spinning too much
- Thread.sleep(10);
+ channel1.sendBlocking(createRequest);
+ retry = false;
}
- else
+ catch (HornetQException e)
{
- throw e;
+ // the session was created while its server was starting, retry it:
+ if (e.getCode() == HornetQException.SESSION_CREATION_REJECTED)
+ {
+ log.warn("Server is starting, retry to create the session
" + name);
+ retry = true;
+ // sleep a little bit to avoid spinning too much
+ Thread.sleep(10);
+ }
+ else
+ {
+ throw e;
+ }
}
}
- }
- while (retry);
-
- channel.clearCommands();
-
- for (Map.Entry<Long, ClientConsumerInternal> entry :
consumers.entrySet())
- {
- SessionCreateConsumerMessage createConsumerRequest = new
SessionCreateConsumerMessage(entry.getKey(),
-
entry.getValue()
-
.getQueueName(),
-
entry.getValue()
-
.getFilterString(),
-
entry.getValue()
-
.isBrowseOnly(),
-
false);
-
- createConsumerRequest.setChannelID(channel.getID());
-
- Connection conn = channel.getConnection().getTransportConnection();
-
- HornetQBuffer buffer =
createConsumerRequest.encode(channel.getConnection());
-
- conn.write(buffer, false);
-
- int clientWindowSize = entry.getValue().getClientWindowSize();
-
- if (clientWindowSize != 0)
+ while (retry);
+
+ channel.clearCommands();
+
+ for (Map.Entry<Long, ClientConsumerInternal> entry :
consumers.entrySet())
{
- SessionConsumerFlowCreditMessage packet = new
SessionConsumerFlowCreditMessage(entry.getKey(),
-
clientWindowSize);
-
- packet.setChannelID(channel.getID());
-
- buffer = packet.encode(channel.getConnection());
-
+ SessionCreateConsumerMessage createConsumerRequest = new
SessionCreateConsumerMessage(entry.getKey(),
+
entry.getValue()
+
.getQueueName(),
+
entry.getValue()
+
.getFilterString(),
+
entry.getValue()
+
.isBrowseOnly(),
+
false);
+
+ createConsumerRequest.setChannelID(channel.getID());
+
+ Connection conn = channel.getConnection().getTransportConnection();
+
+ HornetQBuffer buffer =
createConsumerRequest.encode(channel.getConnection());
+
conn.write(buffer, false);
+
+ int clientWindowSize = entry.getValue().getClientWindowSize();
+
+ if (clientWindowSize != 0)
+ {
+ SessionConsumerFlowCreditMessage packet = new
SessionConsumerFlowCreditMessage(entry.getKey(),
+
clientWindowSize);
+
+ packet.setChannelID(channel.getID());
+
+ buffer = packet.encode(channel.getConnection());
+
+ conn.write(buffer, false);
+ }
}
- }
-
- if ((!autoCommitAcks || !autoCommitSends) && workDone)
- {
- // Session is transacted - set for rollback only
- // FIXME - there is a race condition here - a commit could sneak in before this is set
- rollbackOnly = true;
- }
-
- // Now start the session if it was already started
- if (started)
- {
- for (ClientConsumerInternal consumer : consumers.values())
+
+ if ((!autoCommitAcks || !autoCommitSends) && workDone)
{
- consumer.clearAtFailover();
- consumer.start();
+ // Session is transacted - set for rollback only
+ // FIXME - there is a race condition here - a commit could sneak in before this is set
+ rollbackOnly = true;
}
-
- Packet packet = new PacketImpl(PacketImpl.SESS_START);
-
- packet.setChannelID(channel.getID());
-
- Connection conn = channel.getConnection().getTransportConnection();
-
- HornetQBuffer buffer = packet.encode(channel.getConnection());
-
- conn.write(buffer, false);
+
+ // Now start the session if it was already started
+ if (started)
+ {
+ for (ClientConsumerInternal consumer : consumers.values())
+ {
+ consumer.clearAtFailover();
+ consumer.start();
+ }
+
+ Packet packet = new PacketImpl(PacketImpl.SESS_START);
+
+ packet.setChannelID(channel.getID());
+
+ Connection conn = channel.getConnection().getTransportConnection();
+
+ HornetQBuffer buffer = packet.encode(channel.getConnection());
+
+ conn.write(buffer, false);
+ }
+
+ resetCreditManager = true;
}
-
- resetCreditManager = true;
-
+
channel.returnBlocking();
}
+
}
catch (Throwable t)
{
@@ -1007,8 +1024,6 @@
if (rollbackOnly)
{
- rollback(xid);
-
throw new XAException(XAException.XA_RBOTHER);
}