[hornetq-commits] JBoss hornetq SVN: r8583 - trunk/src/main/org/hornetq/core/client/impl.

do-not-reply at jboss.org do-not-reply at jboss.org
Sat Dec 5 11:21:46 EST 2009


Author: timfox
Date: 2009-12-05 11:21:45 -0500 (Sat, 05 Dec 2009)
New Revision: 8583

Modified:
   trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
Log:
fix MultiThreadRandomFailoverTest etc

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2009-12-05 13:10:42 UTC (rev 8582)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2009-12-05 16:21:45 UTC (rev 8583)
@@ -178,6 +178,8 @@
    private volatile boolean workDone;
 
    private final String groupID;
+   
+   private volatile boolean inClose;
 
    // Constructors ----------------------------------------------------------------------------
 
@@ -487,6 +489,10 @@
             
             rollbackOnFailover();      
          }
+         else
+         {
+            throw e;
+         }
       }
 
       workDone = false;
@@ -750,7 +756,7 @@
    public void close() throws HornetQException
    {
       if (closed)
-      {        
+      {     
          return;
       }
 
@@ -760,11 +766,16 @@
 
          closeChildren();
 
+         inClose = true;
+         
          channel.sendBlocking(new SessionCloseMessage());
       }
-      catch (Throwable ignore)
+      catch (Throwable e)
       {
          // Session close should always return without exception
+         
+         //Note - we only log at trace
+         log.trace("Failed to close session", e);
       }
 
       doCleanup();
@@ -829,114 +840,120 @@
          else
          {
             // The session wasn't found on the server - probably we're failing over onto a backup server where the
-            // session
-            // won't exist or the target server has been restarted - in this case the session will need to be recreated,
+            // session won't exist or the target server has been restarted - in this case the session will need to be recreated,
             // and we'll need to recreate any consumers
-
-            Packet createRequest = new CreateSessionMessage(name,
-                                                            channel.getID(),
-                                                            version,
-                                                            username,
-                                                            password,
-                                                            minLargeMessageSize,
-                                                            xa,
-                                                            autoCommitSends,
-                                                            autoCommitAcks,
-                                                            preAcknowledge,
-                                                            confirmationWindowSize);
-            boolean retry = false;
-            do
+            
+            // It could also be that the server hasn't been restarted, but the session is currently executing close, and that
+            // has already been executed on the server, that's why we can't find the session- in this case we *don't* want
+            // to recreate the session, we just want to unblock the blocking call
+            if (!inClose)
             {
-               try
+               Packet createRequest = new CreateSessionMessage(name,
+                                                               channel.getID(),
+                                                               version,
+                                                               username,
+                                                               password,
+                                                               minLargeMessageSize,
+                                                               xa,
+                                                               autoCommitSends,
+                                                               autoCommitAcks,
+                                                               preAcknowledge,
+                                                               confirmationWindowSize);
+               boolean retry = false;
+               do
                {
-                  channel1.sendBlocking(createRequest);
-                  retry = false;
-               }
-               catch (HornetQException e)
-               {
-                  // the session was created while its server was starting, retry it:
-                  if (e.getCode() == HornetQException.SESSION_CREATION_REJECTED)
+                  try
                   {
-                     log.warn("Server is starting, retry to create the session " + name);
-                     retry = true;
-                     // sleep a little bit to avoid spinning too much
-                     Thread.sleep(10);
+                     channel1.sendBlocking(createRequest);
+                     retry = false;
                   }
-                  else
+                  catch (HornetQException e)
                   {
-                     throw e;
+                     // the session was created while its server was starting, retry it:
+                     if (e.getCode() == HornetQException.SESSION_CREATION_REJECTED)
+                     {
+                        log.warn("Server is starting, retry to create the session " + name);
+                        retry = true;
+                        // sleep a little bit to avoid spinning too much
+                        Thread.sleep(10);
+                     }
+                     else
+                     {
+                        throw e;
+                     }
                   }
                }
-            }
-            while (retry);
-
-            channel.clearCommands();
-
-            for (Map.Entry<Long, ClientConsumerInternal> entry : consumers.entrySet())
-            {
-               SessionCreateConsumerMessage createConsumerRequest = new SessionCreateConsumerMessage(entry.getKey(),
-                                                                                                     entry.getValue()
-                                                                                                          .getQueueName(),
-                                                                                                     entry.getValue()
-                                                                                                          .getFilterString(),
-                                                                                                     entry.getValue()
-                                                                                                          .isBrowseOnly(),
-                                                                                                     false);
-
-               createConsumerRequest.setChannelID(channel.getID());
-
-               Connection conn = channel.getConnection().getTransportConnection();
-
-               HornetQBuffer buffer = createConsumerRequest.encode(channel.getConnection());
-
-               conn.write(buffer, false);
-
-               int clientWindowSize = entry.getValue().getClientWindowSize();
-
-               if (clientWindowSize != 0)
+               while (retry);
+   
+               channel.clearCommands();
+   
+               for (Map.Entry<Long, ClientConsumerInternal> entry : consumers.entrySet())
                {
-                  SessionConsumerFlowCreditMessage packet = new SessionConsumerFlowCreditMessage(entry.getKey(),
-                                                                                                 clientWindowSize);
-
-                  packet.setChannelID(channel.getID());
-
-                  buffer = packet.encode(channel.getConnection());
-
+                  SessionCreateConsumerMessage createConsumerRequest = new SessionCreateConsumerMessage(entry.getKey(),
+                                                                                                        entry.getValue()
+                                                                                                             .getQueueName(),
+                                                                                                        entry.getValue()
+                                                                                                             .getFilterString(),
+                                                                                                        entry.getValue()
+                                                                                                             .isBrowseOnly(),
+                                                                                                        false);
+   
+                  createConsumerRequest.setChannelID(channel.getID());
+   
+                  Connection conn = channel.getConnection().getTransportConnection();
+   
+                  HornetQBuffer buffer = createConsumerRequest.encode(channel.getConnection());
+   
                   conn.write(buffer, false);
+   
+                  int clientWindowSize = entry.getValue().getClientWindowSize();
+   
+                  if (clientWindowSize != 0)
+                  {
+                     SessionConsumerFlowCreditMessage packet = new SessionConsumerFlowCreditMessage(entry.getKey(),
+                                                                                                    clientWindowSize);
+   
+                     packet.setChannelID(channel.getID());
+   
+                     buffer = packet.encode(channel.getConnection());
+   
+                     conn.write(buffer, false);
+                  }
                }
-            }
-
-            if ((!autoCommitAcks || !autoCommitSends) && workDone)
-            {
-               // Session is transacted - set for rollback only
-               // FIXME - there is a race condition here - a commit could sneak in before this is set
-               rollbackOnly = true;
-            }
-
-            // Now start the session if it was already started
-            if (started)
-            {
-               for (ClientConsumerInternal consumer : consumers.values())
+   
+               if ((!autoCommitAcks || !autoCommitSends) && workDone)
                {
-                  consumer.clearAtFailover();
-                  consumer.start();
+                  // Session is transacted - set for rollback only
+                  // FIXME - there is a race condition here - a commit could sneak in before this is set
+                  rollbackOnly = true;
                }
-
-               Packet packet = new PacketImpl(PacketImpl.SESS_START);
-
-               packet.setChannelID(channel.getID());
-
-               Connection conn = channel.getConnection().getTransportConnection();
-
-               HornetQBuffer buffer = packet.encode(channel.getConnection());
-
-               conn.write(buffer, false);
+   
+               // Now start the session if it was already started
+               if (started)
+               {
+                  for (ClientConsumerInternal consumer : consumers.values())
+                  {
+                     consumer.clearAtFailover();
+                     consumer.start();
+                  }
+   
+                  Packet packet = new PacketImpl(PacketImpl.SESS_START);
+   
+                  packet.setChannelID(channel.getID());
+   
+                  Connection conn = channel.getConnection().getTransportConnection();
+   
+                  HornetQBuffer buffer = packet.encode(channel.getConnection());
+   
+                  conn.write(buffer, false);
+               }
+   
+               resetCreditManager = true;
             }
-
-            resetCreditManager = true;
-
+            
             channel.returnBlocking();
          }
+         
       }
       catch (Throwable t)
       {
@@ -1007,8 +1024,6 @@
 
       if (rollbackOnly)
       {
-         rollback(xid);
-         
          throw new XAException(XAException.XA_RBOTHER);
       }
 



More information about the hornetq-commits mailing list