[hornetq-commits] JBoss hornetq SVN: r10323 - branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl.

do-not-reply at jboss.org do-not-reply at jboss.org
Sat Mar 12 21:22:04 EST 2011


Author: clebert.suconic at jboss.com
Date: 2011-03-12 21:22:03 -0500 (Sat, 12 Mar 2011)
New Revision: 10323

Modified:
   branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
Log:
Resending metadata on failover/reconnection

Modified: branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2011-03-11 21:33:07 UTC (rev 10322)
+++ branches/Branch_2_2_EAP/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2011-03-13 02:22:03 UTC (rev 10323)
@@ -13,6 +13,7 @@
 package org.hornetq.core.client.impl;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.LinkedHashMap;
 import java.util.List;
@@ -109,6 +110,8 @@
 
    // Attributes ----------------------------------------------------------------------------
 
+   private Map<String, String> metadata = new HashMap<String, String>();
+
    private final ClientSessionFactoryInternal sessionFactory;
 
    private final String name;
@@ -155,7 +158,7 @@
    private final boolean blockOnDurableSend;
 
    private final int minLargeMessageSize;
-   
+
    private final boolean compressLargeMessages;
 
    private volatile int initialMessagePacketSize;
@@ -184,7 +187,7 @@
    private final String groupID;
 
    private volatile boolean inClose;
-   
+
    private volatile SimpleString defaultAddress;
 
    // Constructors ----------------------------------------------------------------------------
@@ -262,7 +265,7 @@
       this.cacheLargeMessageClient = cacheLargeMessageClient;
 
       this.minLargeMessageSize = minLargeMessageSize;
-      
+
       this.compressLargeMessages = compressLargeMessages;
 
       this.initialMessagePacketSize = initialMessagePacketSize;
@@ -274,7 +277,7 @@
 
    // ClientSession implementation
    // -----------------------------------------------------------------
-   
+
    public void createQueue(final SimpleString address, final SimpleString queueName) throws HornetQException
    {
       internalCreateQueue(address, queueName, null, false, false);
@@ -649,7 +652,7 @@
    {
       stop(true);
    }
-   
+
    public void stop(final boolean waitForOnMessage) throws HornetQException
    {
       checkClosed();
@@ -689,7 +692,7 @@
    {
       return minLargeMessageSize;
    }
-   
+
    public boolean isCompressLargeMessages()
    {
       return compressLargeMessages;
@@ -875,197 +878,222 @@
 
    // Needs to be synchronized to prevent issues with occurring concurrently with close()
 
-   public synchronized void handleFailover(final CoreRemotingConnection backupConnection)
+   public void handleFailover(final CoreRemotingConnection backupConnection)
    {
-      if (closed)
+      synchronized (this)
       {
-         return;
-      }
+         if (closed)
+         {
+            return;
+         }
 
-      boolean resetCreditManager = false;
+         boolean resetCreditManager = false;
 
-      // We lock the channel to prevent any packets to be added to the resend
-      // cache during the failover process
-      channel.lock();
-      try
-      {
-         channel.transferConnection(backupConnection);
+         // We lock the channel to prevent any packets to be added to the resend
+         // cache during the failover process
+         channel.lock();
+         try
+         {
+            channel.transferConnection(backupConnection);
 
-         backupConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
+            backupConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
 
-         remotingConnection = backupConnection;
+            remotingConnection = backupConnection;
 
-         int lcid = channel.getLastConfirmedCommandID();
+            int lcid = channel.getLastConfirmedCommandID();
 
-         Packet request = new ReattachSessionMessage(name, lcid);
+            Packet request = new ReattachSessionMessage(name, lcid);
 
-         Channel channel1 = backupConnection.getChannel(1, -1);
+            Channel channel1 = backupConnection.getChannel(1, -1);
 
-         ReattachSessionResponseMessage response = (ReattachSessionResponseMessage)channel1.sendBlocking(request);
+            ReattachSessionResponseMessage response = (ReattachSessionResponseMessage)channel1.sendBlocking(request);
 
-         if (response.isReattached())
-         {
-            // The session was found on the server - we reattached transparently ok
+            if (response.isReattached())
+            {
+               // The session was found on the server - we reattached transparently ok
 
-            channel.replayCommands(response.getLastConfirmedCommandID());
-         }
-         else
-         {
+               channel.replayCommands(response.getLastConfirmedCommandID());
+            }
+            else
+            {
 
-            // The session wasn't found on the server - probably we're failing over onto a backup server where the
-            // session won't exist or the target server has been restarted - in this case the session will need to be
-            // recreated,
-            // and we'll need to recreate any consumers
+               // The session wasn't found on the server - probably we're failing over onto a backup server where the
+               // session won't exist or the target server has been restarted - in this case the session will need to be
+               // recreated,
+               // and we'll need to recreate any consumers
 
-            // It could also be that the server hasn't been restarted, but the session is currently executing close, and
-            // that
-            // has already been executed on the server, that's why we can't find the session- in this case we *don't*
-            // want
-            // to recreate the session, we just want to unblock the blocking call
-            if (!inClose)
-            {
-               Packet createRequest = new CreateSessionMessage(name,
-                                                               channel.getID(),
-                                                               version,
-                                                               username,
-                                                               password,
-                                                               minLargeMessageSize,
-                                                               xa,
-                                                               autoCommitSends,
-                                                               autoCommitAcks,
-                                                               preAcknowledge,
-                                                               confirmationWindowSize,
-                                                               defaultAddress == null ? null
-                                                                                     : defaultAddress.toString());
-               boolean retry = false;
-               do
+               // It could also be that the server hasn't been restarted, but the session is currently executing close,
+               // and
+               // that
+               // has already been executed on the server, that's why we can't find the session- in this case we *don't*
+               // want
+               // to recreate the session, we just want to unblock the blocking call
+               if (!inClose)
                {
-                  try
+                  Packet createRequest = new CreateSessionMessage(name,
+                                                                  channel.getID(),
+                                                                  version,
+                                                                  username,
+                                                                  password,
+                                                                  minLargeMessageSize,
+                                                                  xa,
+                                                                  autoCommitSends,
+                                                                  autoCommitAcks,
+                                                                  preAcknowledge,
+                                                                  confirmationWindowSize,
+                                                                  defaultAddress == null ? null
+                                                                                        : defaultAddress.toString());
+                  boolean retry = false;
+                  do
                   {
-                     channel1.sendBlocking(createRequest);
-                     retry = false;
-                  }
-                  catch (HornetQException e)
-                  {
-                     // the session was created while its server was starting, retry it:
-                     if (e.getCode() == HornetQException.SESSION_CREATION_REJECTED)
+                     try
                      {
-                        ClientSessionImpl.log.warn("Server is starting, retry to create the session " + name);
-                        retry = true;
-                        // sleep a little bit to avoid spinning too much
-                        Thread.sleep(10);
+                        channel1.sendBlocking(createRequest);
+                        retry = false;
                      }
-                     else
+                     catch (HornetQException e)
                      {
-                        throw e;
+                        // the session was created while its server was starting, retry it:
+                        if (e.getCode() == HornetQException.SESSION_CREATION_REJECTED)
+                        {
+                           ClientSessionImpl.log.warn("Server is starting, retry to create the session " + name);
+                           retry = true;
+                           // sleep a little bit to avoid spinning too much
+                           Thread.sleep(10);
+                        }
+                        else
+                        {
+                           throw e;
+                        }
                      }
                   }
-               }
-               while (retry);
+                  while (retry);
 
-               channel.clearCommands();
+                  channel.clearCommands();
 
-               for (Map.Entry<Long, ClientConsumerInternal> entry : consumers.entrySet())
-               {
-                  SessionQueueQueryResponseMessage queueInfo = entry.getValue().getQueueInfo();
-
-                  // We try and recreate any non durable queues, since they probably won't be there unless
-                  // they are defined in hornetq-configuration.xml
-                  // This allows e.g. JMS non durable subs and temporary queues to continue to be used after failover
-                  if (!queueInfo.isDurable())
+                  for (Map.Entry<Long, ClientConsumerInternal> entry : consumers.entrySet())
                   {
-                     CreateQueueMessage createQueueRequest = new CreateQueueMessage(queueInfo.getAddress(),
-                                                                                    queueInfo.getName(),
-                                                                                    queueInfo.getFilterString(),
-                                                                                    false,
-                                                                                    queueInfo.isTemporary(),
-                                                                                    false);
+                     SessionQueueQueryResponseMessage queueInfo = entry.getValue().getQueueInfo();
 
-                     sendPacketWithoutLock(createQueueRequest);
-                  }
+                     // We try and recreate any non durable queues, since they probably won't be there unless
+                     // they are defined in hornetq-configuration.xml
+                     // This allows e.g. JMS non durable subs and temporary queues to continue to be used after failover
+                     if (!queueInfo.isDurable())
+                     {
+                        CreateQueueMessage createQueueRequest = new CreateQueueMessage(queueInfo.getAddress(),
+                                                                                       queueInfo.getName(),
+                                                                                       queueInfo.getFilterString(),
+                                                                                       false,
+                                                                                       queueInfo.isTemporary(),
+                                                                                       false);
 
-                  SessionCreateConsumerMessage createConsumerRequest = new SessionCreateConsumerMessage(entry.getKey(),
-                                                                                                        entry.getValue()
-                                                                                                             .getQueueName(),
-                                                                                                        entry.getValue()
-                                                                                                             .getFilterString(),
-                                                                                                        entry.getValue()
-                                                                                                             .isBrowseOnly(),
-                                                                                                        false);
+                        sendPacketWithoutLock(createQueueRequest);
+                     }
 
-                  sendPacketWithoutLock(createConsumerRequest);
+                     SessionCreateConsumerMessage createConsumerRequest = new SessionCreateConsumerMessage(entry.getKey(),
+                                                                                                           entry.getValue()
+                                                                                                                .getQueueName(),
+                                                                                                           entry.getValue()
+                                                                                                                .getFilterString(),
+                                                                                                           entry.getValue()
+                                                                                                                .isBrowseOnly(),
+                                                                                                           false);
 
-                  int clientWindowSize = entry.getValue().getClientWindowSize();
+                     sendPacketWithoutLock(createConsumerRequest);
 
-                  if (clientWindowSize != 0)
-                  {
-                     SessionConsumerFlowCreditMessage packet = new SessionConsumerFlowCreditMessage(entry.getKey(),
-                                                                                                    clientWindowSize);
+                     int clientWindowSize = entry.getValue().getClientWindowSize();
 
-                     sendPacketWithoutLock(packet);
+                     if (clientWindowSize != 0)
+                     {
+                        SessionConsumerFlowCreditMessage packet = new SessionConsumerFlowCreditMessage(entry.getKey(),
+                                                                                                       clientWindowSize);
+
+                        sendPacketWithoutLock(packet);
+                     }
+                     else
+                     {
+                        // https://jira.jboss.org/browse/HORNETQ-522
+                        SessionConsumerFlowCreditMessage packet = new SessionConsumerFlowCreditMessage(entry.getKey(),
+                                                                                                       1);
+                        sendPacketWithoutLock(packet);
+                     }
                   }
-                  else
+
+                  if ((!autoCommitAcks || !autoCommitSends) && workDone)
                   {
-                     //https://jira.jboss.org/browse/HORNETQ-522
-                     SessionConsumerFlowCreditMessage packet = new SessionConsumerFlowCreditMessage(entry.getKey(),
-                                                                                                    1);
-                     sendPacketWithoutLock(packet);
+                     // Session is transacted - set for rollback only
+                     // FIXME - there is a race condition here - a commit could sneak in before this is set
+                     rollbackOnly = true;
                   }
-               }
 
-               if ((!autoCommitAcks || !autoCommitSends) && workDone)
-               {
-                  // Session is transacted - set for rollback only
-                  // FIXME - there is a race condition here - a commit could sneak in before this is set
-                  rollbackOnly = true;
-               }
-
-               // Now start the session if it was already started
-               if (started)
-               {
-                  for (ClientConsumerInternal consumer : consumers.values())
+                  // Now start the session if it was already started
+                  if (started)
                   {
-                     consumer.clearAtFailover();
-                     consumer.start();
-                  }
+                     for (ClientConsumerInternal consumer : consumers.values())
+                     {
+                        consumer.clearAtFailover();
+                        consumer.start();
+                     }
 
-                  Packet packet = new PacketImpl(PacketImpl.SESS_START);
+                     Packet packet = new PacketImpl(PacketImpl.SESS_START);
 
-                  packet.setChannelID(channel.getID());
+                     packet.setChannelID(channel.getID());
 
-                  Connection conn = channel.getConnection().getTransportConnection();
+                     Connection conn = channel.getConnection().getTransportConnection();
 
-                  HornetQBuffer buffer = packet.encode(channel.getConnection());
+                     HornetQBuffer buffer = packet.encode(channel.getConnection());
 
-                  conn.write(buffer, false, false);
+                     conn.write(buffer, false, false);
+                  }
+
+                  resetCreditManager = true;
                }
 
-               resetCreditManager = true;
+               channel.returnBlocking();
             }
 
-            channel.returnBlocking();
+            channel.setTransferring(false);
          }
+         catch (Throwable t)
+         {
+            ClientSessionImpl.log.error("Failed to handle failover", t);
+         }
+         finally
+         {
+            channel.unlock();
+         }
 
-         channel.setTransferring(false);
+         if (resetCreditManager)
+         {
+            producerCreditManager.reset();
+
+            // Also need to send more credits for consumers, otherwise the system could hand with the server
+            // not having any credits to send
+         }
       }
-      catch (Throwable t)
+
+      // Resetting the metadata after failover
+      try
       {
-         ClientSessionImpl.log.error("Failed to handle failover", t);
+         for (Map.Entry<String, String> entries : metadata.entrySet())
+         {
+            addMetaData(entries.getKey(), entries.getValue());
+         }
       }
-      finally
+      catch (HornetQException e)
       {
-         channel.unlock();
-      }
 
-      if (resetCreditManager)
-      {
-         producerCreditManager.reset();
+         log.warn("Error on resending metadata: " + metadata, e);
 
-         // Also need to send more credits for consumers, otherwise the system could hand with the server
-         // not having any credits to send
       }
    }
-   
+
+   public void addMetaData(String key, String data) throws HornetQException
+   {
+      metadata.put(key, data);
+      channel.sendBlocking(new SessionAddMetaDataMessage(key, data));
+   }
+
    public ClientSessionFactoryInternal getSessionFactory()
    {
       return sessionFactory;
@@ -1735,8 +1763,6 @@
       }
    }
 
-
-
    private static class BindingQueryImpl implements BindingQuery
    {
 
@@ -1823,9 +1849,4 @@
       }
 
    }
-
-   public void addMetaData(String key, String data) throws HornetQException
-   {
-      channel.sendBlocking(new SessionAddMetaDataMessage(key, data));
-   }
 }



More information about the hornetq-commits mailing list