[hornetq-commits] JBoss hornetq SVN: r7930 - in trunk: src/main/org/hornetq/core/exception and 6 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Fri Aug 28 06:12:42 EDT 2009


Author: timfox
Date: 2009-08-28 06:12:42 -0400 (Fri, 28 Aug 2009)
New Revision: 7930

Modified:
   trunk/src/main/org/hornetq/core/client/impl/ConnectionManagerImpl.java
   trunk/src/main/org/hornetq/core/exception/HornetQException.java
   trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
   trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
   trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
   trunk/src/main/org/hornetq/core/server/Queue.java
   trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
   trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingImplTest.java
Log:
fixed tests and made unblock cleaner

Modified: trunk/src/main/org/hornetq/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ConnectionManagerImpl.java	2009-08-27 21:58:51 UTC (rev 7929)
+++ trunk/src/main/org/hornetq/core/client/impl/ConnectionManagerImpl.java	2009-08-28 10:12:42 UTC (rev 7930)
@@ -13,8 +13,6 @@
 
 package org.hornetq.core.client.impl;
 
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.EARLY_RESPONSE;
-
 import java.lang.ref.WeakReference;
 import java.util.ArrayList;
 import java.util.Collections;
@@ -140,7 +138,7 @@
    private Future<?> pingerFuture;
 
    private PingRunnable pingRunnable;
-   
+
    private volatile boolean exitLoop;
 
    // debug
@@ -282,13 +280,13 @@
                   if (connection == null)
                   {
                      if (exitLoop)
-                     {                        
+                     {
                         return null;
                      }
                      // This can happen if the connection manager gets exitLoop - e.g. the server gets shut down
 
                      throw new HornetQException(HornetQException.NOT_CONNECTED,
-                                                  "Unable to connect to server using configuration " + connectorConfig);
+                                                "Unable to connect to server using configuration " + connectorConfig);
                   }
 
                   channel1 = connection.getChannel(1, -1, false);
@@ -319,56 +317,66 @@
                                                          preAcknowledge,
                                                          producerWindowSize);
 
-               Packet pResponse = channel1.sendBlocking(request);
-
-               if (pResponse.getType() == EARLY_RESPONSE)
+               Packet pResponse;
+               try
                {
-                  // This means the thread was blocked on create session and failover unblocked it
-                  // so failover could occur
+                  pResponse = channel1.sendBlocking(request);
+               }
+               catch (HornetQException e)
+               {
+                  if (e.getCode() == HornetQException.UNBLOCKED)
+                  {
+                     // This means the thread was blocked on create session and failover unblocked it
+                     // so failover could occur
 
-                  // So we just need to return our connections and flag for retry
+                     // So we just need to return our connections and flag for retry
 
-                  returnConnection(connection.getID());
+                     returnConnection(connection.getID());
 
-                  retry = true;
+                     retry = true;
+                     
+                     continue;
+                  }
+                  else
+                  {
+                     throw e;
+                  }
                }
-               else
-               {
-                  CreateSessionResponseMessage response = (CreateSessionResponseMessage)pResponse;
 
-                  Channel sessionChannel = connection.getChannel(sessionChannelID,
-                                                                 producerWindowSize,
-                                                                 producerWindowSize != -1);
+               CreateSessionResponseMessage response = (CreateSessionResponseMessage)pResponse;
 
-                  ClientSessionInternal session = new ClientSessionImpl(this,
-                                                                        name,
-                                                                        xa,
-                                                                        autoCommitSends,
-                                                                        autoCommitAcks,
-                                                                        preAcknowledge,
-                                                                        blockOnAcknowledge,
-                                                                        autoGroup,
-                                                                        ackBatchSize,
-                                                                        consumerWindowSize,
-                                                                        consumerMaxRate,
-                                                                        producerMaxRate,
-                                                                        blockOnNonPersistentSend,
-                                                                        blockOnPersistentSend,
-                                                                        cacheLargeMessageClient,
-                                                                        minLargeMessageSize,
-                                                                        connection,
-                                                                        response.getServerVersion(),
-                                                                        sessionChannel,
-                                                                        orderedExecutorFactory.getExecutor());
+               Channel sessionChannel = connection.getChannel(sessionChannelID,
+                                                              producerWindowSize,
+                                                              producerWindowSize != -1);
 
-                  sessions.put(session, connection);
+               ClientSessionInternal session = new ClientSessionImpl(this,
+                                                                     name,
+                                                                     xa,
+                                                                     autoCommitSends,
+                                                                     autoCommitAcks,
+                                                                     preAcknowledge,
+                                                                     blockOnAcknowledge,
+                                                                     autoGroup,
+                                                                     ackBatchSize,
+                                                                     consumerWindowSize,
+                                                                     consumerMaxRate,
+                                                                     producerMaxRate,
+                                                                     blockOnNonPersistentSend,
+                                                                     blockOnPersistentSend,
+                                                                     cacheLargeMessageClient,
+                                                                     minLargeMessageSize,
+                                                                     connection,
+                                                                     response.getServerVersion(),
+                                                                     sessionChannel,
+                                                                     orderedExecutorFactory.getExecutor());
 
-                  ChannelHandler handler = new ClientSessionPacketHandler(session, sessionChannel);
+               sessions.put(session, connection);
 
-                  sessionChannel.setHandler(handler);
+               ChannelHandler handler = new ClientSessionPacketHandler(session, sessionChannel);
 
-                  return new DelegatingSession(session);
-               }
+               sessionChannel.setHandler(handler);
+
+               return new DelegatingSession(session);
             }
             catch (Throwable t)
             {
@@ -391,7 +399,7 @@
                else
                {
                   HornetQException me = new HornetQException(HornetQException.INTERNAL_ERROR,
-                                                                 "Failed to create session");
+                                                             "Failed to create session");
 
                   me.initCause(t);
 
@@ -456,9 +464,7 @@
    {
       return listeners.remove(listener);
    }
-   
-   
-   
+
    public void causeExit()
    {
       exitLoop = true;
@@ -531,9 +537,8 @@
 
          boolean serverShutdown = me.getCode() == HornetQException.DISCONNECTED;
 
-         boolean attemptFailoverOrReconnect = (backupConnectorFactory != null || reconnectAttempts != 0)
-                                                && (failoverOnServerShutdown || !serverShutdown);
-         
+         boolean attemptFailoverOrReconnect = (backupConnectorFactory != null || reconnectAttempts != 0) && (failoverOnServerShutdown || !serverShutdown);
+
          if (attemptFailoverOrReconnect)
          {
             lockAllChannel1s();
@@ -779,7 +784,7 @@
          {
             return null;
          }
-         
+
          RemotingConnection connection = getConnection(initialRefCount);
 
          if (connection == null)
@@ -1099,7 +1104,7 @@
                public void run()
                {
                   conn.fail(new HornetQException(HornetQException.DISCONNECTED,
-                                                   "The connection was exitLoop by the server"));
+                                                 "The connection was exitLoop by the server"));
                }
             });
          }
@@ -1252,7 +1257,7 @@
                   if (!connection.checkDataReceived())
                   {
                      final HornetQException me = new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
-                                                                          "Did not receive data from server for " + connection.getTransportConnection());
+                                                                      "Did not receive data from server for " + connection.getTransportConnection());
 
                      threadPool.execute(new Runnable()
                      {

Modified: trunk/src/main/org/hornetq/core/exception/HornetQException.java
===================================================================
--- trunk/src/main/org/hornetq/core/exception/HornetQException.java	2009-08-27 21:58:51 UTC (rev 7929)
+++ trunk/src/main/org/hornetq/core/exception/HornetQException.java	2009-08-28 10:12:42 UTC (rev 7930)
@@ -34,9 +34,9 @@
 
    public static final int CONNECTION_TIMEDOUT = 003;
 
-   public static final int INTERRUPTED = 004;
+   public static final int DISCONNECTED = 004;
    
-   public static final int DISCONNECTED = 005;
+   public static final int UNBLOCKED = 005;
 
    public static final int QUEUE_DOES_NOT_EXIST = 100;
 

Modified: trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java	2009-08-27 21:58:51 UTC (rev 7929)
+++ trunk/src/main/org/hornetq/core/management/impl/QueueControlImpl.java	2009-08-28 10:12:42 UTC (rev 7930)
@@ -206,14 +206,18 @@
       }
    }
 
-   public void setExpiryAddress(final String expiryAddres) throws Exception
+   public void setExpiryAddress(final String expiryAddress) throws Exception
    {
       AddressSettings addressSettings = addressSettingsRepository.getMatch(address);
 
-      if (expiryAddres != null)
+      SimpleString sExpiryAddress = new SimpleString(expiryAddress);
+      
+      if (expiryAddress != null)
       {
-         addressSettings.setExpiryAddress(new SimpleString(expiryAddres));
+         addressSettings.setExpiryAddress(sExpiryAddress);
       }
+      
+      queue.setExpiryAddress(sExpiryAddress);
    }
 
    public Map<String, Object>[] listScheduledMessages() throws Exception

Modified: trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java	2009-08-27 21:58:51 UTC (rev 7929)
+++ trunk/src/main/org/hornetq/core/remoting/impl/ChannelImpl.java	2009-08-28 10:12:42 UTC (rev 7930)
@@ -13,7 +13,6 @@
 
 package org.hornetq.core.remoting.impl;
 
-import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.EARLY_RESPONSE;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.PACKETS_CONFIRMED;
 import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_RESPONSE;
 
@@ -146,8 +145,11 @@
 
       try
       {
-         response = new PacketImpl(EARLY_RESPONSE);
+         response = new HornetQExceptionMessage(new HornetQException(HornetQException.UNBLOCKED,
+                                                                     "Connection failure detected. Unblocking a blocking call that will never get a response"
 
+         ));
+
          sendCondition.signal();
       }
       finally
@@ -173,8 +175,7 @@
       {
          packet.setChannelID(id);
 
-         final HornetQBuffer buffer = connection.getTransportConnection()
-                                                  .createBuffer(packet.getRequiredBufferSize());
+         final HornetQBuffer buffer = connection.getTransportConnection().createBuffer(packet.getRequiredBufferSize());
 
          int size = packet.encode(buffer);
 
@@ -243,8 +244,7 @@
       {
          packet.setChannelID(id);
 
-         final HornetQBuffer buffer = connection.getTransportConnection()
-                                                  .createBuffer(packet.getRequiredBufferSize());
+         final HornetQBuffer buffer = connection.getTransportConnection().createBuffer(packet.getRequiredBufferSize());
 
          int size = packet.encode(buffer);
 
@@ -302,7 +302,7 @@
             if (response == null)
             {
                throw new HornetQException(HornetQException.CONNECTION_TIMEDOUT,
-                                            "Timed out waiting for response when sending packet " + packet.getType());
+                                          "Timed out waiting for response when sending packet " + packet.getType());
             }
 
             if (response.getType() == PacketImpl.EXCEPTION)
@@ -359,7 +359,7 @@
             }
 
             final HornetQBuffer buffer = connection.getTransportConnection()
-                                                     .createBuffer(packet.getRequiredBufferSize());
+                                                   .createBuffer(packet.getRequiredBufferSize());
 
             packet.encode(buffer);
 
@@ -398,16 +398,16 @@
       List<Runnable> toRun = new ArrayList<Runnable>();
 
       synchronized (replicationLock)
-      {         
+      {
          playedResponsesOnFailure = true;
-         
+
          responseActionCount = 0;
       }
 
       while (true)
       {
          // Execute all the response actions now
-         
+
          Runnable action = responseActions.poll();
 
          if (action != null)
@@ -419,11 +419,11 @@
             break;
          }
       }
-      
+
       for (Runnable action : toRun)
       {
          action.run();
-      }      
+      }
    }
 
    public void setHandler(final ChannelHandler handler)
@@ -541,7 +541,7 @@
          lastReceivedCommandID++;
 
          receivedBytes += packet.getPacketSize();
-         
+
          if (receivedBytes >= confWindowSize)
          {
             receivedBytes = 0;
@@ -585,11 +585,11 @@
       else
       {
          if (packet.isResponse())
-         {            
+         {
             confirm(packet);
 
             lock.lock();
-            
+
             response = packet;
 
             try

Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java	2009-08-27 21:58:51 UTC (rev 7929)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java	2009-08-28 10:12:42 UTC (rev 7930)
@@ -54,8 +54,6 @@
 
    public static final byte REPLICATION_RESPONSE = 23;
 
-   public static final byte EARLY_RESPONSE = 24;
-
    // Server
    public static final byte CREATESESSION = 30;
 

Modified: trunk/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/Queue.java	2009-08-27 21:58:51 UTC (rev 7929)
+++ trunk/src/main/org/hornetq/core/server/Queue.java	2009-08-28 10:12:42 UTC (rev 7930)
@@ -148,4 +148,6 @@
     * @return an immutable iterator which does not allow to remove references
     */
    Iterator<MessageReference> iterator();
+   
+   void setExpiryAddress(SimpleString expiryAddress);
 }

Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java	2009-08-27 21:58:51 UTC (rev 7929)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java	2009-08-28 10:12:42 UTC (rev 7930)
@@ -147,7 +147,7 @@
 
    private ConcurrentMap<SimpleString, Consumer> groups = new ConcurrentHashMap<SimpleString, Consumer>();
    
-   private final SimpleString expiryAddress;
+   private volatile SimpleString expiryAddress;
 
    public QueueImpl(final long persistenceID,
                     final SimpleString address,
@@ -762,6 +762,11 @@
          acknowledge(ref);
       }
    }
+   
+   public void setExpiryAddress(final SimpleString expiryAddress)
+   {
+      this.expiryAddress = expiryAddress;
+   }
 
    public void referenceHandled()
    {

Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingImplTest.java	2009-08-27 21:58:51 UTC (rev 7929)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingImplTest.java	2009-08-28 10:12:42 UTC (rev 7930)
@@ -1016,13 +1016,18 @@
 
    class FakeQueue implements Queue
    {
-
       private SimpleString name;
       
       FakeQueue(SimpleString name)
       {
          this.name = name;
       }
+      
+      public void setExpiryAddress(SimpleString expiryAddress)
+      {
+         // TODO Auto-generated method stub
+         
+      }
 
       /* (non-Javadoc)
        * @see org.hornetq.core.server.Queue#acknowledge(org.hornetq.core.server.MessageReference)



More information about the hornetq-commits mailing list