[hornetq-commits] JBoss hornetq SVN: r8623 - in trunk: src/main/org/hornetq/core/remoting/impl/wireformat and 6 other directories.

do-not-reply at jboss.org do-not-reply at jboss.org
Tue Dec 8 13:40:16 EST 2009


Author: timfox
Date: 2009-12-08 13:40:15 -0500 (Tue, 08 Dec 2009)
New Revision: 8623

Added:
   trunk/tests/src/org/hornetq/tests/integration/jms/cluster/JMSReconnectTest.java
Modified:
   trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
   trunk/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java
   trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
   trunk/src/main/org/hornetq/core/remoting/impl/wireformat/CreateQueueMessage.java
   trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionQueueQueryResponseMessage.java
   trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
   trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
   trunk/src/main/org/hornetq/jms/client/HornetQConnection.java
   trunk/src/main/org/hornetq/jms/client/HornetQMessageConsumer.java
   trunk/src/main/org/hornetq/jms/client/HornetQSession.java
   trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
   trunk/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java
Log:
https://jira.jboss.org/jira/browse/HORNETQ-121 and https://jira.jboss.org/jira/browse/HORNETQ-238

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java	2009-12-08 17:05:54 UTC (rev 8622)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerImpl.java	2009-12-08 18:40:15 UTC (rev 8623)
@@ -27,6 +27,7 @@
 import org.hornetq.core.remoting.Channel;
 import org.hornetq.core.remoting.impl.wireformat.SessionConsumerCloseMessage;
 import org.hornetq.core.remoting.impl.wireformat.SessionConsumerFlowCreditMessage;
+import org.hornetq.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
 import org.hornetq.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
 import org.hornetq.core.remoting.impl.wireformat.SessionReceiveLargeMessage;
 import org.hornetq.utils.Future;
@@ -112,6 +113,8 @@
    private boolean stopped = false;
 
    private final AtomicLong forceDeliveryCount = new AtomicLong(0);
+   
+   private final SessionQueueQueryResponseMessage queueInfo;
 
    // Constructors
    // ---------------------------------------------------------------------------------
@@ -125,7 +128,8 @@
                              final int ackBatchSize,
                              final TokenBucketLimiter rateLimiter,
                              final Executor executor,
-                             final Channel channel)
+                             final Channel channel,
+                             final SessionQueueQueryResponseMessage queueInfo)
    {
       this.id = id;
 
@@ -146,6 +150,8 @@
       this.clientWindowSize = clientWindowSize;
 
       this.ackBatchSize = ackBatchSize;
+      
+      this.queueInfo = queueInfo;
    }
 
    // ClientConsumer implementation
@@ -424,6 +430,11 @@
    // ClientConsumerInternal implementation
    // --------------------------------------------------------------
 
+   public SessionQueueQueryResponseMessage getQueueInfo()
+   {
+      return queueInfo;
+   }
+   
    public long getID()
    {
       return id;

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java	2009-12-08 17:05:54 UTC (rev 8622)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientConsumerInternal.java	2009-12-08 18:40:15 UTC (rev 8623)
@@ -16,6 +16,7 @@
 import org.hornetq.core.client.ClientConsumer;
 import org.hornetq.core.client.ClientMessage;
 import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
 import org.hornetq.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
 import org.hornetq.core.remoting.impl.wireformat.SessionReceiveLargeMessage;
 import org.hornetq.utils.SimpleString;
@@ -62,4 +63,6 @@
    void stop() throws HornetQException;
 
    void start();
+   
+   SessionQueueQueryResponseMessage getQueueInfo();
 }

Modified: trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2009-12-08 17:05:54 UTC (rev 8622)
+++ trunk/src/main/org/hornetq/core/client/impl/ClientSessionImpl.java	2009-12-08 18:40:15 UTC (rev 8623)
@@ -922,6 +922,23 @@
 
                for (Map.Entry<Long, ClientConsumerInternal> entry : consumers.entrySet())
                {
+                  SessionQueueQueryResponseMessage queueInfo = entry.getValue().getQueueInfo();
+                  
+                  // We try and recreate any non durable queues, since they probably won't be there unless
+                  // they are defined in hornetq-configuration.xml
+                  // This allows e.g. JMS non durable subs and temporary queues to continue to be used after failover
+                  if (!queueInfo.isDurable())
+                  {
+                     CreateQueueMessage createQueueRequest = new CreateQueueMessage(queueInfo.getAddress(),
+                                                                                    queueInfo.getName(),
+                                                                                    queueInfo.getFilterString(),
+                                                                                    false,
+                                                                                    queueInfo.isTemporary(),
+                                                                                    false);
+
+                     sendPacketWithoutLock(createQueueRequest);
+                  }
+
                   SessionCreateConsumerMessage createConsumerRequest = new SessionCreateConsumerMessage(entry.getKey(),
                                                                                                         entry.getValue()
                                                                                                              .getQueueName(),
@@ -931,14 +948,8 @@
                                                                                                              .isBrowseOnly(),
                                                                                                         false);
 
-                  createConsumerRequest.setChannelID(channel.getID());
-
-                  Connection conn = channel.getConnection().getTransportConnection();
-
-                  HornetQBuffer buffer = createConsumerRequest.encode(channel.getConnection());
-
-                  conn.write(buffer, false);
-
+                  sendPacketWithoutLock(createConsumerRequest);
+                  
                   int clientWindowSize = entry.getValue().getClientWindowSize();
 
                   if (clientWindowSize != 0)
@@ -946,11 +957,7 @@
                      SessionConsumerFlowCreditMessage packet = new SessionConsumerFlowCreditMessage(entry.getKey(),
                                                                                                     clientWindowSize);
 
-                     packet.setChannelID(channel.getID());
-
-                     buffer = packet.encode(channel.getConnection());
-
-                     conn.write(buffer, false);
+                     sendPacketWithoutLock(packet);
                   }
                }
 
@@ -1006,6 +1013,17 @@
       }
    }
 
+   private void sendPacketWithoutLock(final Packet packet)
+   {
+      packet.setChannelID(channel.getID());
+
+      Connection conn = channel.getConnection().getTransportConnection();
+
+      HornetQBuffer buffer = packet.encode(channel.getConnection());
+
+      conn.write(buffer, false);
+   }
+
    public void workDone()
    {
       workDone = true;
@@ -1478,7 +1496,7 @@
                                                                               browseOnly,
                                                                               true);
 
-      channel.sendBlocking(request);
+      SessionQueueQueryResponseMessage queueInfo = (SessionQueueQueryResponseMessage)channel.sendBlocking(request);
 
       // The actual windows size that gets used is determined by the user since
       // could be overridden on the queue settings
@@ -1497,7 +1515,8 @@
                                                                                                                 false)
                                                                                   : null,
                                                                executor,
-                                                               channel);
+                                                               channel,
+                                                               queueInfo);
 
       addConsumer(consumer);
 
@@ -1546,7 +1565,7 @@
          throw new HornetQException(HornetQException.INTERNAL_ERROR, "Queue can not be both durable and temporay");
       }
 
-      CreateQueueMessage request = new CreateQueueMessage(address, queueName, filterString, durable, temp);
+      CreateQueueMessage request = new CreateQueueMessage(address, queueName, filterString, durable, temp, true);
 
       channel.sendBlocking(request);
    }

Modified: trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java	2009-12-08 17:05:54 UTC (rev 8622)
+++ trunk/src/main/org/hornetq/core/client/impl/FailoverManagerImpl.java	2009-12-08 18:40:15 UTC (rev 8623)
@@ -546,7 +546,7 @@
    private void failoverOrReconnect(final Object connectionID, final HornetQException me)
    {
       Set<ClientSessionInternal> sessionsToClose = null;
-
+      
       synchronized (failoverLock)
       {
          if (connection == null || connection.getID() != connectionID)
@@ -555,6 +555,7 @@
             // over then a async connection exception or disconnect
             // came in for one of the already exitLoop connections, so we return true - we don't want to call the
             // listeners again
+            
             return;
          }
 
@@ -602,7 +603,7 @@
          {
             attemptReconnect = reconnectAttempts != 0;
          }
-
+         
          if (attemptFailover || attemptReconnect)
          {
             lockChannel1();

Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/CreateQueueMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/CreateQueueMessage.java	2009-12-08 17:05:54 UTC (rev 8622)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/CreateQueueMessage.java	2009-12-08 18:40:15 UTC (rev 8623)
@@ -39,6 +39,8 @@
    private boolean durable;
 
    private boolean temporary;
+   
+   private boolean requiresResponse;
 
    // Static --------------------------------------------------------
 
@@ -48,7 +50,8 @@
                              final SimpleString queueName,
                              final SimpleString filterString,
                              final boolean durable,
-                             final boolean temporary)
+                             final boolean temporary,
+                             final boolean requiresResponse)
    {
       super(PacketImpl.CREATE_QUEUE);
 
@@ -57,6 +60,7 @@
       this.filterString = filterString;
       this.durable = durable;
       this.temporary = temporary;
+      this.requiresResponse = requiresResponse;
    }
 
    public CreateQueueMessage()
@@ -103,6 +107,11 @@
    {
       return temporary;
    }
+   
+   public boolean isRequiresResponse()
+   {
+      return requiresResponse;
+   }
 
    @Override
    public void encodeRest(final HornetQBuffer buffer)
@@ -112,6 +121,7 @@
       buffer.writeNullableSimpleString(filterString);
       buffer.writeBoolean(durable);
       buffer.writeBoolean(temporary);
+      buffer.writeBoolean(requiresResponse);
    }
 
    @Override
@@ -122,6 +132,7 @@
       filterString = buffer.readNullableSimpleString();
       durable = buffer.readBoolean();
       temporary = buffer.readBoolean();
+      requiresResponse = buffer.readBoolean();
    }
 
    @Override

Modified: trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionQueueQueryResponseMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionQueueQueryResponseMessage.java	2009-12-08 17:05:54 UTC (rev 8622)
+++ trunk/src/main/org/hornetq/core/remoting/impl/wireformat/SessionQueueQueryResponseMessage.java	2009-12-08 18:40:15 UTC (rev 8623)
@@ -25,6 +25,8 @@
  */
 public class SessionQueueQueryResponseMessage extends PacketImpl
 {
+   private SimpleString name;
+   
    private boolean exists;
 
    private boolean durable;
@@ -36,31 +38,39 @@
    private SimpleString filterString;
 
    private SimpleString address;
+   
+   private boolean temporary;
 
-   public SessionQueueQueryResponseMessage(final boolean durable,
+   public SessionQueueQueryResponseMessage(final SimpleString name,
+                                           final SimpleString address,                                           
+                                           final boolean durable,
+                                           final boolean temporary,
+                                           final SimpleString filterString,
                                            final int consumerCount,
-                                           final int messageCount,
-                                           final SimpleString filterString,
-                                           final SimpleString address)
+                                           final int messageCount)
    {
-      this(durable, consumerCount, messageCount, filterString, address, true);
+      this(name, address, durable, temporary, filterString, consumerCount, messageCount, true);
    }
 
    public SessionQueueQueryResponseMessage()
    {
-      this(false, 0, 0, null, null, false);
+      this(null, null, false, false, null, 0, 0, false);
    }
 
-   private SessionQueueQueryResponseMessage(final boolean durable,
+   private SessionQueueQueryResponseMessage(final SimpleString name,
+                                            final SimpleString address,
+                                            final boolean durable,
+                                            final boolean temporary,
+                                            final SimpleString filterString,
                                             final int consumerCount,
                                             final int messageCount,
-                                            final SimpleString filterString,
-                                            final SimpleString address,
                                             final boolean exists)
    {
       super(PacketImpl.SESS_QUEUEQUERY_RESP);
 
       this.durable = durable;
+      
+      this.temporary = temporary;
 
       this.consumerCount = consumerCount;
 
@@ -69,6 +79,8 @@
       this.filterString = filterString;
 
       this.address = address;
+      
+      this.name = name;
 
       this.exists = exists;
    }
@@ -108,16 +120,28 @@
    {
       return address;
    }
+   
+   public SimpleString getName()
+   {
+      return name;
+   }
+   
+   public boolean isTemporary()
+   {
+      return temporary;
+   }
 
    @Override
    public void encodeRest(final HornetQBuffer buffer)
    {
       buffer.writeBoolean(exists);
       buffer.writeBoolean(durable);
+      buffer.writeBoolean(temporary);
       buffer.writeInt(consumerCount);
       buffer.writeInt(messageCount);
       buffer.writeNullableSimpleString(filterString);
       buffer.writeNullableSimpleString(address);
+      buffer.writeNullableSimpleString(name);
    }
 
    @Override
@@ -125,10 +149,12 @@
    {
       exists = buffer.readBoolean();
       durable = buffer.readBoolean();
+      temporary = buffer.readBoolean();
       consumerCount = buffer.readInt();
       messageCount = buffer.readInt();
       filterString = buffer.readNullableSimpleString();
       address = buffer.readNullableSimpleString();
+      name = buffer.readNullableSimpleString();
    }
 
    @Override

Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2009-12-08 17:05:54 UTC (rev 8622)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/ClusterConnectionImpl.java	2009-12-08 18:40:15 UTC (rev 8623)
@@ -367,8 +367,7 @@
    }
 
    private void updateConnectors(final Map<String, DiscoveryEntry> connectors) throws Exception
-   {
-      System.out.println("ClusterConnectionImpl.updateConnectors");
+   {     
       Iterator<Map.Entry<String, MessageFlowRecord>> iter = records.entrySet().iterator();
 
       while (iter.hasNext())

Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-12-08 17:05:54 UTC (rev 8622)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java	2009-12-08 18:40:15 UTC (rev 8623)
@@ -376,7 +376,7 @@
    public void handleCreateConsumer(final SessionCreateConsumerMessage packet)
    {
       SimpleString name = packet.getQueueName();
-
+      
       SimpleString filterString = packet.getFilterString();
 
       boolean browseOnly = packet.isBrowseOnly();
@@ -437,7 +437,17 @@
             managementService.sendNotification(notification);
          }
 
-         response = new NullResponseMessage();
+         //We send back information on the queue as a response - this allows the queue to
+         //be automatically recreated on failover
+         
+         if (packet.isRequiresResponse())
+         {
+            response = doExecuteQueueQuery(name);
+         }
+         else
+         {
+            response = null;
+         }
       }
       catch (Exception e)
       {
@@ -451,7 +461,7 @@
             response = new HornetQExceptionMessage(new HornetQException(HornetQException.INTERNAL_ERROR));
          }
       }
-
+      
       sendResponse(packet, response, false, false);
    }
 
@@ -460,7 +470,7 @@
       SimpleString address = packet.getAddress();
 
       final SimpleString name = packet.getQueueName();
-
+      
       SimpleString filterString = packet.getFilterString();
 
       boolean temporary = packet.isTemporary();
@@ -510,7 +520,14 @@
             });
          }
 
-         response = new NullResponseMessage();
+         if (packet.isRequiresResponse())
+         {
+            response = new NullResponseMessage();
+         }
+         else
+         {
+            response = null;
+         }
       }
       catch (Exception e)
       {
@@ -562,7 +579,7 @@
 
       sendResponse(packet, response, false, false);
    }
-
+   
    public void handleExecuteQueueQuery(final SessionQueueQueryMessage packet)
    {
       SimpleString name = packet.getQueueName();
@@ -571,35 +588,7 @@
 
       try
       {
-         if (name == null)
-         {
-            throw new IllegalArgumentException("Queue name is null");
-         }
-
-         Binding binding = postOffice.getBinding(name);
-
-         if (binding != null && binding.getType() == BindingType.LOCAL_QUEUE)
-         {
-            Queue queue = (Queue)binding.getBindable();
-
-            Filter filter = queue.getFilter();
-
-            SimpleString filterString = filter == null ? null : filter.getFilterString();
-            response = new SessionQueueQueryResponseMessage(queue.isDurable(),
-                                                            queue.getConsumerCount(),
-                                                            queue.getMessageCount(),
-                                                            filterString,
-                                                            binding.getAddress());
-         }
-         // make an exception for the management address (see HORNETQ-29)
-         else if (name.equals(managementAddress))
-         {
-            response = new SessionQueueQueryResponseMessage(true, -1, -1, null, managementAddress);
-         }
-         else
-         {
-            response = new SessionQueueQueryResponseMessage();
-         }
+         response = doExecuteQueueQuery(name);
       }
       catch (Exception e)
       {
@@ -1419,6 +1408,7 @@
       if (consumer == null)
       {
          ServerSessionImpl.log.error("There is no consumer with id " + packet.getConsumerID());
+         
          return;
       }
 
@@ -1710,6 +1700,46 @@
    // Private
    // ----------------------------------------------------------------------------
 
+   private SessionQueueQueryResponseMessage doExecuteQueueQuery(final SimpleString name) throws Exception
+   {
+      if (name == null)
+      {
+         throw new IllegalArgumentException("Queue name is null");
+      }
+      
+      SessionQueueQueryResponseMessage response;
+
+      Binding binding = postOffice.getBinding(name);
+
+      if (binding != null && binding.getType() == BindingType.LOCAL_QUEUE)
+      {
+         Queue queue = (Queue)binding.getBindable();
+
+         Filter filter = queue.getFilter();
+
+         SimpleString filterString = filter == null ? null : filter.getFilterString();
+         
+         response = new SessionQueueQueryResponseMessage(name,
+                                                         binding.getAddress(),
+                                                         queue.isDurable(),
+                                                         queue.isTemporary(),
+                                                         filterString,                                                         
+                                                         queue.getConsumerCount(),
+                                                         queue.getMessageCount());
+      }
+      // make an exception for the management address (see HORNETQ-29)
+      else if (name.equals(managementAddress))
+      {
+         response = new SessionQueueQueryResponseMessage(name, managementAddress, true, false, null, -1, -1);
+      }
+      else
+      {
+         response = new SessionQueueQueryResponseMessage();
+      }
+      
+      return response;
+   }
+   
    private void sendResponse(final Packet confirmPacket,
                              final Packet response,
                              final boolean flush,

Modified: trunk/src/main/org/hornetq/jms/client/HornetQConnection.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQConnection.java	2009-12-08 17:05:54 UTC (rev 8622)
+++ trunk/src/main/org/hornetq/jms/client/HornetQConnection.java	2009-12-08 18:40:15 UTC (rev 8623)
@@ -261,16 +261,26 @@
          {
             if (!tempQueues.isEmpty())
             {
-               if (initialSession == null)
-               {
-                  initialSession = sessionFactory.createSession(username, password, false, true, true, false, 0);
-               }
+//               if (initialSession == null)
+//               {
+//                  initialSession = sessionFactory.createSession(username, password, false, true, true, false, 0);
+//               }
 
                // Remove any temporary queues
 
                for (SimpleString queueName : tempQueues)
                {
-                  initialSession.deleteQueue(queueName);
+                  if (!initialSession.isClosed())
+                  {
+                     try
+                     {
+                        initialSession.deleteQueue(queueName);
+                     }
+                     catch (HornetQException ignore)
+                     {
+                        //Exception on deleting queue shouldn't prevent close from completing
+                     }
+                  }
                }
             }
          }

Modified: trunk/src/main/org/hornetq/jms/client/HornetQMessageConsumer.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQMessageConsumer.java	2009-12-08 17:05:54 UTC (rev 8622)
+++ trunk/src/main/org/hornetq/jms/client/HornetQMessageConsumer.java	2009-12-08 18:40:15 UTC (rev 8623)
@@ -145,7 +145,7 @@
 
          if (autoDeleteQueueName != null)
          {
-            // If non durable subscriber need to delete subscription too
+            // If non durable subscriber need to delete subscription too  
             session.deleteQueue(autoDeleteQueueName);
          }
 

Modified: trunk/src/main/org/hornetq/jms/client/HornetQSession.java
===================================================================
--- trunk/src/main/org/hornetq/jms/client/HornetQSession.java	2009-12-08 17:05:54 UTC (rev 8622)
+++ trunk/src/main/org/hornetq/jms/client/HornetQSession.java	2009-12-08 18:40:15 UTC (rev 8623)
@@ -939,19 +939,7 @@
          throw JMSExceptionHelper.convertFromHornetQException(e);
       }
    }
-
-   public void deleteQueue(final SimpleString queueName) throws JMSException
-   {
-      try
-      {
-         session.deleteQueue(queueName);
-      }
-      catch (HornetQException e)
-      {
-         throw JMSExceptionHelper.convertFromHornetQException(e);
-      }
-   }
-
+   
    public void start() throws JMSException
    {
       try
@@ -983,6 +971,21 @@
 
    // Package protected ---------------------------------------------
 
+   void deleteQueue(final SimpleString queueName) throws JMSException
+   {
+      if (!session.isClosed())
+      {
+         try
+         {
+            session.deleteQueue(queueName);
+         }
+         catch (HornetQException ignore)
+         {
+            //Exception on deleting queue shouldn't prevent close from completing
+         }
+      }
+   }
+   
    // Protected -----------------------------------------------------
 
    // Private -------------------------------------------------------

Modified: trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java	2009-12-08 17:05:54 UTC (rev 8622)
+++ trunk/tests/src/org/hornetq/tests/integration/cluster/failover/FailoverTest.java	2009-12-08 18:40:15 UTC (rev 8623)
@@ -1350,7 +1350,7 @@
    {
       ClientSessionFactoryInternal sf = getSessionFactory();
 
-      ClientSession session = sendAndConsume(sf);
+      ClientSession session = sendAndConsume(sf, true);
 
       final CountDownLatch latch = new CountDownLatch(1);
 
@@ -1379,7 +1379,7 @@
 
       sf = new ClientSessionFactoryImpl(getConnectorTransportConfiguration(false));
 
-      session = sendAndConsume(sf);
+      session = sendAndConsume(sf, false);
 
       session.close();
 
@@ -1757,8 +1757,28 @@
       Assert.assertEquals(0, sf.numConnections());
    }
 
-   public void testSimpleSendAfterFailover() throws Exception
+   public void testSimpleSendAfterFailoverDurableTemporary() throws Exception
    {
+      testSimpleSendAfterFailover(true, true);
+   }
+   
+   public void testSimpleSendAfterFailoverNonDurableTemporary() throws Exception
+   {
+      testSimpleSendAfterFailover(false, true);
+   }
+   
+   public void testSimpleSendAfterFailoverDurableNonTemporary() throws Exception
+   {
+      testSimpleSendAfterFailover(true, false);
+   }
+   
+   public void testSimpleSendAfterFailoverNonDurableNonTemporary() throws Exception
+   {
+      testSimpleSendAfterFailover(false, false);
+   }
+   
+   private void testSimpleSendAfterFailover(final boolean durable, final boolean temporary) throws Exception
+   {
       ClientSessionFactoryInternal sf = getSessionFactory();
 
       sf.setBlockOnNonDurableSend(true);
@@ -1767,8 +1787,15 @@
 
       ClientSession session = sf.createSession(true, true, 0);
 
-      session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
-
+      if (temporary)
+      {
+         session.createTemporaryQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null);
+      }
+      else
+      {
+         session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, durable);         
+      }
+                
       final CountDownLatch latch = new CountDownLatch(1);
 
       class MyListener extends BaseListener
@@ -2254,11 +2281,14 @@
 
    // Private -------------------------------------------------------
 
-   private ClientSession sendAndConsume(final ClientSessionFactory sf) throws Exception
+   private ClientSession sendAndConsume(final ClientSessionFactory sf, final boolean createQueue) throws Exception
    {
       ClientSession session = sf.createSession(false, true, true);
 
-      session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, false);
+      if (createQueue)
+      {
+         session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, false);
+      }
 
       ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
 

Added: trunk/tests/src/org/hornetq/tests/integration/jms/cluster/JMSReconnectTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/cluster/JMSReconnectTest.java	                        (rev 0)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/cluster/JMSReconnectTest.java	2009-12-08 18:40:15 UTC (rev 8623)
@@ -0,0 +1,371 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied.  See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.tests.integration.jms.cluster;
+
+import javax.jms.BytesMessage;
+import javax.jms.Connection;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import junit.framework.Assert;
+
+import org.hornetq.core.client.ClientSession;
+import org.hornetq.core.client.impl.ClientSessionInternal;
+import org.hornetq.core.config.Configuration;
+import org.hornetq.core.config.TransportConfiguration;
+import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.logging.Logger;
+import org.hornetq.core.remoting.RemotingConnection;
+import org.hornetq.core.remoting.impl.invm.InVMRegistry;
+import org.hornetq.core.server.HornetQ;
+import org.hornetq.core.server.HornetQServer;
+import org.hornetq.jms.HornetQQueue;
+import org.hornetq.jms.HornetQTopic;
+import org.hornetq.jms.client.HornetQConnectionFactory;
+import org.hornetq.jms.client.HornetQSession;
+import org.hornetq.tests.util.RandomUtil;
+import org.hornetq.tests.util.UnitTestCase;
+import org.hornetq.utils.SimpleString;
+
+/**
+ * 
+ * A JMSReconnectTest
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public class JMSReconnectTest extends UnitTestCase
+{
+   private static final Logger log = Logger.getLogger(JMSReconnectTest.class);
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private HornetQServer liveService;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   //In this test we re-attach to the same node without restarting the server
+   public void testReattachSameNode() throws Exception
+   {
+      testReconnectOrReattachSameNode(true);
+   }
+   
+   //In this test, we reconnect to the same node without restarting the server
+   public void testReconnectSameNode() throws Exception
+   {
+      testReconnectOrReattachSameNode(false);
+   }
+      
+   private void testReconnectOrReattachSameNode(boolean reattach) throws Exception
+   {
+      HornetQConnectionFactory jbcf = new HornetQConnectionFactory(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));                       
+
+      jbcf.setBlockOnDurableSend(true);
+      jbcf.setBlockOnNonDurableSend(true);
+      
+      jbcf.setReconnectAttempts(-1);
+      
+      if (reattach)
+      {
+         jbcf.setConfirmationWindowSize(1024 * 1024);
+      }
+      
+      // Note we set consumer window size to a value so we can verify that consumer credit re-sending
+      // works properly on failover
+      // The value is small enough that credits will have to be resent several times
+
+      final int numMessages = 10;
+
+      final int bodySize = 1000;
+
+      jbcf.setConsumerWindowSize(numMessages * bodySize / 10);
+
+      Connection conn = jbcf.createConnection();
+
+      MyExceptionListener listener = new MyExceptionListener();
+
+      conn.setExceptionListener(listener);
+
+      Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      ClientSession coreSession = ((HornetQSession)sess).getCoreSession();
+
+      RemotingConnection coreConn = ((ClientSessionInternal)coreSession).getConnection();
+
+      SimpleString jmsQueueName = new SimpleString(HornetQQueue.JMS_QUEUE_ADDRESS_PREFIX + "myqueue");
+
+      coreSession.createQueue(jmsQueueName, jmsQueueName, null, true);
+
+      Queue queue = sess.createQueue("myqueue");
+
+      MessageProducer producer = sess.createProducer(queue);
+
+      producer.setDeliveryMode(DeliveryMode.PERSISTENT);
+
+      MessageConsumer consumer = sess.createConsumer(queue);
+
+      byte[] body = RandomUtil.randomBytes(bodySize);
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         BytesMessage bm = sess.createBytesMessage();
+
+         bm.writeBytes(body);
+
+         producer.send(bm);
+      }
+
+      conn.start();
+
+      log.info("sent messages and started connection");
+
+      Thread.sleep(2000);
+
+      HornetQException me = new HornetQException(HornetQException.NOT_CONNECTED);
+
+      coreConn.fail(me);
+      
+      //It should reconnect to the same node
+
+      for (int i = 0; i < numMessages; i++)
+      {
+         log.info("got message " + i);
+
+         BytesMessage bm = (BytesMessage)consumer.receive(1000);
+
+         Assert.assertNotNull(bm);
+
+         Assert.assertEquals(body.length, bm.getBodyLength());
+      }
+
+      TextMessage tm = (TextMessage)consumer.receiveNoWait();
+
+      Assert.assertNull(tm);
+
+      conn.close();
+
+      Assert.assertNotNull(listener.e);
+
+      Assert.assertTrue(me == listener.e.getCause());
+   }
+   
+   public void testReconnectSameNodeServerRestartedWithNonDurableSub() throws Exception
+   {
+      testReconnectSameNodeServerRestartedWithNonDurableSubOrTempQueue(true);
+   }
+   
+   public void testReconnectSameNodeServerRestartedWithTempQueue() throws Exception
+   {
+      testReconnectSameNodeServerRestartedWithNonDurableSubOrTempQueue(false);
+   }
+   
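+   //Test that a non-durable JMS subscription (durableSub == true, despite the parameter name) or a temporary queue (durableSub == false) gets recreated on auto reconnect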
+   private void testReconnectSameNodeServerRestartedWithNonDurableSubOrTempQueue(final boolean durableSub) throws Exception
+   {
+      HornetQConnectionFactory jbcf = new HornetQConnectionFactory(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));                       
+
+      jbcf.setReconnectAttempts(-1);
+           
+      Connection conn = jbcf.createConnection();
+
+      MyExceptionListener listener = new MyExceptionListener();
+
+      conn.setExceptionListener(listener);
+
+      Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      ClientSession coreSession = ((HornetQSession)sess).getCoreSession();
+
+      Destination dest;
+      
+      if (durableSub)
+      {      
+         coreSession.createQueue(HornetQTopic.JMS_TOPIC_ADDRESS_PREFIX + "mytopic", "blahblah", null, false);
+   
+         dest = new HornetQTopic("mytopic");
+      }
+      else
+      {
+         dest = sess.createTemporaryQueue();
+      }
+
+      MessageProducer producer = sess.createProducer(dest);
+
+      //Create a non durable subscriber
+      MessageConsumer consumer = sess.createConsumer(dest);
+
+      this.liveService.stop();
+      
+      this.liveService.start();
+      
+      //Allow client some time to reconnect
+      Thread.sleep(3000);
+      
+      log.info("now sending some messages");
+      
+      final int numMessages = 100;
+      
+      byte[] body = RandomUtil.randomBytes(1000);
+      
+      for (int i = 0; i < numMessages; i++)
+      {
+         BytesMessage bm = sess.createBytesMessage();
+
+         bm.writeBytes(body);
+
+         producer.send(bm);
+      }
+
+      conn.start();
+      
+      for (int i = 0; i < numMessages; i++)
+      {
+         BytesMessage bm = (BytesMessage)consumer.receive(1000);
+
+         Assert.assertNotNull(bm);
+
+         Assert.assertEquals(body.length, bm.getBodyLength());
+      }
+
+      TextMessage tm = (TextMessage)consumer.receiveNoWait();
+
+      Assert.assertNull(tm);
+
+      conn.close();
+
+      Assert.assertNotNull(listener.e);
+   }
+   
+   //If the server is shut down after a non-durable sub is created, then close on the connection should proceed normally
+   public void testNoReconnectCloseAfterFailToReconnectWithTopicConsumer() throws Exception
+   {
+      HornetQConnectionFactory jbcf = new HornetQConnectionFactory(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));                       
+
+      jbcf.setReconnectAttempts(0);
+      
+      Connection conn = jbcf.createConnection();
+
+      Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+      ClientSession coreSession = ((HornetQSession)sess).getCoreSession();
+
+      coreSession.createQueue(HornetQTopic.JMS_TOPIC_ADDRESS_PREFIX + "mytopic", "blahblah", null, false);
+
+      Topic topic = new HornetQTopic("mytopic");
+      
+      //Create a non durable subscriber
+      MessageConsumer consumer = sess.createConsumer(topic);      
+
+      Thread.sleep(2000);
+ 
+      this.liveService.stop();
+      
+      this.liveService.start();
+      
+      sess.close();
+    
+      conn.close();
+   }
+   
+   //If the server is shut down after a temp queue has been created and the connection is then closed, the close should complete normally
+   public void testNoReconnectCloseAfterFailToReconnectWithTempQueue() throws Exception
+   {
+      HornetQConnectionFactory jbcf = new HornetQConnectionFactory(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMConnectorFactory"));                       
+
+      jbcf.setReconnectAttempts(0);
+      
+      Connection conn = jbcf.createConnection();
+
+      Session sess = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+      
+      sess.createTemporaryQueue();
+      
+      Thread.sleep(2000);
+
+      this.liveService.stop();
+      
+      this.liveService.start();
+      
+      sess.close();
+    
+      conn.close();
+   }
+
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      super.setUp();
+     
+      Configuration liveConf = new ConfigurationImpl();
+      liveConf.setSecurityEnabled(false);
+      liveConf.getAcceptorConfigurations()
+              .add(new TransportConfiguration("org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory"));
+      liveConf.setBindingsDirectory(getBindingsDir());
+      liveConf.setJournalMinFiles(2);
+      liveConf.setJournalDirectory(getJournalDir());
+      liveConf.setPagingDirectory(getPageDir());
+      liveConf.setLargeMessagesDirectory(getLargeMessagesDir());
+
+      liveService = HornetQ.newHornetQServer(liveConf, true);
+      liveService.start();
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      liveService.stop();
+
+      Assert.assertEquals(0, InVMRegistry.instance.size());
+
+      liveService = null;
+
+      super.tearDown();
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+   private static class MyExceptionListener implements ExceptionListener
+   {
+      volatile JMSException e;
+
+      public void onException(final JMSException e)
+      {
+         this.e = e;
+      }
+   }
+
+}

Modified: trunk/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java	2009-12-08 17:05:54 UTC (rev 8622)
+++ trunk/tests/src/org/hornetq/tests/unit/core/client/impl/LargeMessageBufferTest.java	2009-12-08 18:40:15 UTC (rev 8623)
@@ -35,6 +35,7 @@
 import org.hornetq.core.client.impl.ClientMessageInternal;
 import org.hornetq.core.client.impl.LargeMessageBufferImpl;
 import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
 import org.hornetq.core.remoting.impl.wireformat.SessionReceiveContinuationMessage;
 import org.hornetq.core.remoting.impl.wireformat.SessionReceiveLargeMessage;
 import org.hornetq.tests.util.RandomUtil;
@@ -724,6 +725,12 @@
 
       }
 
+      public SessionQueueQueryResponseMessage getQueueInfo()
+      {
+         // TODO Auto-generated method stub
+         return null;
+      }
+
    }
 
 }



More information about the hornetq-commits mailing list