[jboss-cvs] JBoss Messaging SVN: r5281 - in trunk: src/main/org/jboss/messaging/core/client and 5 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Nov 5 14:02:15 EST 2008


Author: timfox
Date: 2008-11-05 14:02:14 -0500 (Wed, 05 Nov 2008)
New Revision: 5281

Added:
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/FailoverManagementTest.java
Modified:
   trunk/examples/messaging/src/org/jboss/messaging/example/ManagementClient.java
   trunk/src/main/org/jboss/messaging/core/client/ClientProducer.java
   trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
   trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
   trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendManagementMessage.java
   trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/FailoverExpiredMessageTest.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/FailoverScheduledMessageTest.java
Log:
Moved send management to session and added tests for failover of management messages


Modified: trunk/examples/messaging/src/org/jboss/messaging/example/ManagementClient.java
===================================================================
--- trunk/examples/messaging/src/org/jboss/messaging/example/ManagementClient.java	2008-11-05 17:24:25 UTC (rev 5280)
+++ trunk/examples/messaging/src/org/jboss/messaging/example/ManagementClient.java	2008-11-05 19:02:14 UTC (rev 5281)
@@ -55,13 +55,11 @@
       clientSession.addDestination(replytoQueue, false, true);
       clientSession.createQueue(replytoQueue, replytoQueue, null, false, true);
 
-      ClientProducer mngmntProducer = clientSession.createProducer(ManagementHelper.MANAGEMENT_DESTINATION);
-
       // create a management message to subscribe to notifications from the
       // server
       ClientMessage mngmntMessage = clientSession.createClientMessage(false);
       ManagementHelper.putNotificationSubscription(mngmntMessage, replytoQueue, true);
-      mngmntProducer.sendManagement(mngmntMessage);
+      clientSession.sendManagementMessage(mngmntMessage);
       System.out.println("send message to subscribe to notifications");
 
       ClientConsumer mngmntConsumer = clientSession.createConsumer(replytoQueue);
@@ -119,7 +117,7 @@
                                               ManagementServiceImpl.getMessagingServerObjectName(),
                                               "setMessageCounterSamplePeriod",
                                               (long)30000);
-      mngmntProducer.sendManagement(mngmntMessage);
+      clientSession.sendManagementMessage(mngmntMessage);
       System.out.println("sent management message to set an attribute");
 
       // create a message to retrieve one or many attributes
@@ -130,7 +128,7 @@
                                      "MessageCount",
                                      "Durable");
 
-      mngmntProducer.sendManagement(mngmntMessage);
+      clientSession.sendManagementMessage(mngmntMessage);
       System.out.println("sent management message to retrieve attributes");
 
       // create a message to invoke the operation sendMessageToDLQ(long) on the
@@ -141,14 +139,14 @@
                                               ManagementServiceImpl.getQueueObjectName(queue, queue),
                                               "sendMessageToDLQ",
                                               (long)6161);
-      mngmntProducer.sendManagement(mngmntMessage);
+      clientSession.sendManagementMessage(mngmntMessage);
       System.out.println("sent management message to invoke operation");
 
       // create a message to unsubscribe from the notifications sent by the
       // server
       mngmntMessage = clientSession.createClientMessage(false);
       ManagementHelper.putNotificationSubscription(mngmntMessage, replytoQueue, false);
-      mngmntProducer.sendManagement(mngmntMessage);
+      clientSession.sendManagementMessage(mngmntMessage);
       System.out.println("send message to unsubscribe to notifications");
 
       Thread.sleep(5000);

Modified: trunk/src/main/org/jboss/messaging/core/client/ClientProducer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientProducer.java	2008-11-05 17:24:25 UTC (rev 5280)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientProducer.java	2008-11-05 19:02:14 UTC (rev 5281)
@@ -41,8 +41,6 @@
 
    void send(final SimpleString address, final ClientMessage msg, long scheduleDeliveryTime) throws MessagingException;
 
-   void sendManagement(ClientMessage mngmntMessage) throws MessagingException;
-
    void registerAcknowledgementHandler(AcknowledgementHandler handler);
    
    void unregisterAcknowledgementHandler(AcknowledgementHandler handler);

Modified: trunk/src/main/org/jboss/messaging/core/client/ClientSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/ClientSession.java	2008-11-05 17:24:25 UTC (rev 5280)
+++ trunk/src/main/org/jboss/messaging/core/client/ClientSession.java	2008-11-05 19:02:14 UTC (rev 5281)
@@ -117,4 +117,6 @@
    boolean removeFailureListener(FailureListener listener);
 
    int getVersion();
+   
+   void sendManagementMessage(ClientMessage message);
 }

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java	2008-11-05 17:24:25 UTC (rev 5280)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientProducerImpl.java	2008-11-05 19:02:14 UTC (rev 5281)
@@ -20,7 +20,6 @@
 import org.jboss.messaging.core.remoting.Channel;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionProducerCloseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionScheduledSendMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendManagementMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendMessage;
 import org.jboss.messaging.util.SimpleString;
 import org.jboss.messaging.util.TokenBucketLimiter;
@@ -114,7 +113,7 @@
       doSend(address, msg, 0);
    }
 
-    public void send(final ClientMessage msg, long scheduleDeliveryTime) throws MessagingException
+   public void send(final ClientMessage msg, long scheduleDeliveryTime) throws MessagingException
    {
       checkClosed();
 
@@ -128,54 +127,6 @@
       doSend(address, msg, scheduleDeliveryTime);
    }
 
-   // use a special wireformat packet to sendScheduled management message (on the server-side they are
-   // handled by the server session differently from regular Client Message)
-   public void sendManagement(final ClientMessage msg) throws MessagingException
-   {
-      checkClosed();
-      
-      if (address != null)
-      {
-         msg.setDestination(address);
-      }
-      else
-      {
-         msg.setDestination(this.address);
-      }
-      
-      if (rateLimiter != null)
-      {
-         // Rate flow control
-                  
-         rateLimiter.limit();
-      }
-      
-      boolean sendBlocking = msg.isDurable() ? blockOnPersistentSend : blockOnNonPersistentSend;
-      
-      SessionSendManagementMessage message = new SessionSendManagementMessage(id, msg, false);
-      
-      if (sendBlocking)
-      {        
-         channel.sendBlocking(message);
-      }
-      else
-      {
-         channel.send(message);
-      }      
-      
-//      //We only flow control with non-anonymous producers
-//      if (address == null && creditFlowControl)
-//      {
-//         try
-//         {
-//            availableCredits.acquire(message.getClientMessage().getEncodeSize());
-//         }
-//         catch (InterruptedException e)
-//         {           
-//         }         
-//      }
-   }
-
    public void registerAcknowledgementHandler(final AcknowledgementHandler handler)
    {
       // TODO
@@ -273,7 +224,7 @@
          rateLimiter.limit();
       }
 
-      if(autoGroupId != null)
+      if (autoGroupId != null)
       {
          msg.putStringProperty(MessageImpl.GROUP_ID, autoGroupId);
       }
@@ -281,8 +232,9 @@
       boolean sendBlocking = msg.isDurable() ? blockOnPersistentSend : blockOnNonPersistentSend;
 
       SessionSendMessage message;
-      //check to see if this message need to be scheduled.
-      if(scheduledDeliveryTime <= 0)
+      
+      // check to see if this message needs to be scheduled.
+      if (scheduledDeliveryTime <= 0)
       {
          message = new SessionSendMessage(id, msg, sendBlocking);
       }
@@ -291,7 +243,6 @@
          message = new SessionScheduledSendMessage(id, msg, sendBlocking, scheduledDeliveryTime);
       }
 
-
       if (sendBlocking)
       {
          channel.sendBlocking(message);

Modified: trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-11-05 17:24:25 UTC (rev 5280)
+++ trunk/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java	2008-11-05 19:02:14 UTC (rev 5281)
@@ -63,6 +63,7 @@
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionQueueQueryResponseMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionRemoveDestinationMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.SessionSendManagementMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXACommitMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAEndMessage;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAForgetMessage;
@@ -84,6 +85,7 @@
 import org.jboss.messaging.util.SimpleIDGenerator;
 import org.jboss.messaging.util.SimpleString;
 import org.jboss.messaging.util.TokenBucketLimiterImpl;
+import org.jboss.messaging.util.TypedProperties;
 
 /*
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -737,6 +739,16 @@
       // Now we can add a failure listener since if a further failure occurs we cleanup since no backup any more
       remotingConnection.addFailureListener(this);
    }
+   
+   public void sendManagementMessage(final ClientMessage message)
+   {
+      Packet packet = new SessionSendManagementMessage(message);
+      
+      //Fill in the dest - it's not actually used
+      message.setDestination(new SimpleString("JBM"));
+      
+      channel.send(packet);
+   }
 
    // XAResource implementation
    // --------------------------------------------------------------------

Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendManagementMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendManagementMessage.java	2008-11-05 17:24:25 UTC (rev 5280)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionSendManagementMessage.java	2008-11-05 19:02:14 UTC (rev 5281)
@@ -41,27 +41,19 @@
    
    // Attributes ----------------------------------------------------
 
-   private long producerID;
-   
    private ClientMessage clientMessage;
    
    private ServerMessage serverMessage;
    
-   private boolean requiresResponse;
-
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public SessionSendManagementMessage(final long producerID, final ClientMessage message, final boolean requiresResponse)
+   public SessionSendManagementMessage(final ClientMessage message)
    {
       super(SESS_MANAGEMENT_SEND);
 
-      this.producerID = producerID;
-      
       this.clientMessage = message;
-      
-      this.requiresResponse = requiresResponse;
    }
       
    public SessionSendManagementMessage()
@@ -71,11 +63,6 @@
 
    // Public --------------------------------------------------------
 
-   public long getProducerID()
-   {
-      return producerID;
-   }
-   
    public ClientMessage getClientMessage()
    {
       return clientMessage;
@@ -86,15 +73,8 @@
       return serverMessage;
    }
    
-   public boolean isRequiresResponse()
-   {
-      return requiresResponse;
-   }
-   
    public void encodeBody(final MessagingBuffer buffer)
    {
-      buffer.putLong(producerID);      
-      
       if (clientMessage != null)
       {
          clientMessage.encode(buffer);
@@ -104,23 +84,15 @@
          //If we're replicating a buffer to a backup node then we encode the serverMessage not the clientMessage
          serverMessage.encode(buffer);
       }
-      
-      buffer.putBoolean(requiresResponse);
    }
    
    public void decodeBody(final MessagingBuffer buffer)
-   {
-      //TODO can be optimised
-      
-      producerID = buffer.getLong();
-                  
+   {          
       serverMessage = new ServerMessageImpl();
       
       serverMessage.decode(buffer);
       
       serverMessage.getBody().flip();
-      
-      requiresResponse = buffer.getBoolean();
    }
 
 

Modified: trunk/src/main/org/jboss/messaging/core/server/ServerSession.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerSession.java	2008-11-05 17:24:25 UTC (rev 5280)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerSession.java	2008-11-05 19:02:14 UTC (rev 5281)
@@ -23,7 +23,6 @@
 package org.jboss.messaging.core.server;
 
 import org.jboss.messaging.core.remoting.Channel;
-import org.jboss.messaging.core.remoting.DelayedResult;
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.remoting.RemotingConnection;
 import org.jboss.messaging.core.remoting.impl.wireformat.SessionAcknowledgeMessage;
@@ -151,8 +150,4 @@
    int transferConnection(RemotingConnection newConnection, int lastReceivedCommandID);
    
    Channel getChannel();
-   
-   //Should this really be here??
-   void sendResponse(final DelayedResult result, final Packet response);
-
 }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-11-05 17:24:25 UTC (rev 5280)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-11-05 19:02:14 UTC (rev 5281)
@@ -108,7 +108,6 @@
  * @author <a href="mailto:jmesnil at redhat.com">Jeff Mesnil</a>
  * @author <a href="mailto:andy.taylor at jboss.org>Andy Taylor</a>
  */
-
 public class ServerSessionImpl implements ServerSession, FailureListener, NotificationListener
 {
    // Constants -----------------------------------------------------------------------------
@@ -1987,9 +1986,7 @@
       try
       {
          channel.replicatePacket(packet);
-
-         // set started will unlock
-         
+ 
          //note we process start before response is back from the backup
          setStarted(true);         
       }
@@ -2005,7 +2002,7 @@
    //TODO try removing the lock consumers and see what happens!!
    public void handleStop(final Packet packet)
    {
-      boolean lock = this.channel.getReplicatingChannel() != null;
+      boolean lock = channel.getReplicatingChannel() != null;
       
       if (lock)
       {
@@ -2018,9 +2015,25 @@
 
          //note we process stop before response is back from the backup
          
+         final Packet response = new NullResponseMessage();
+         
          setStarted(false);
 
-         sendResponse(result, new NullResponseMessage());
+         if (result == null)
+         {
+            // Not clustered - just send now
+            channel.send(response);
+         }
+         else
+         {
+            result.setResultRunner(new Runnable()
+            {
+               public void run()
+               {
+                  channel.send(response);
+               }
+            });
+         }         
       }
       finally
       {
@@ -2077,8 +2090,7 @@
       Packet response = null;
 
       try
-      {
-         //note we process close before response is back from the backup
+      {         
          close();
 
          response = new NullResponseMessage();
@@ -2317,11 +2329,24 @@
          channel.send(response);
       }
    }
-
+   
    public void handleSendScheduledProducerMessage(final SessionScheduledSendMessage packet)
-   {
+   {               
       ServerMessage msg = packet.getServerMessage();
-
+      
+      final SendLock lock;
+            
+      if (channel.getReplicatingChannel() != null)
+      {
+         lock = postOffice.getAddressLock(msg.getDestination());
+               
+         lock.beforeSend();
+      }
+      else
+      {
+         lock = null;
+      }
+      
       if (msg.getMessageID() == 0L)
       {
          // must generate message id here, so we know they are in sync on live and backup
@@ -2331,12 +2356,35 @@
       }
 
       DelayedResult result = channel.replicatePacket(packet);
+      
+      //With a send we must make sure it is replicated to the backup before being processed on live,
+      //or we can end up with delivery being processed on the backup before the original send
+      
+      if (result == null)
+      {
+         doSendScheduled(packet);                        
+      }
+      else
+      {
+         result.setResultRunner(new Runnable()
+         {
+            public void run()
+            {
+               doSendScheduled(packet);
+               
+               lock.afterSend();
+            }
+         });
+      }
+   }
 
+   private void doSendScheduled(final SessionScheduledSendMessage packet)
+   {
       Packet response = null;
 
       try
       {
-         producers.get(packet.getProducerID()).sendScheduled(msg, packet.getScheduledDeliveryTime());
+         producers.get(packet.getProducerID()).sendScheduled(packet.getServerMessage(), packet.getScheduledDeliveryTime());
 
          if (packet.isRequiresResponse())
          {
@@ -2359,24 +2407,56 @@
          }
       }
 
-      sendResponse(result, response);
+      if (response != null)
+      {
+         channel.send(response);
+      }
    }
+   
+   public void handleManagementMessage(final SessionSendManagementMessage packet)
+   {        
+      ServerMessage msg = packet.getServerMessage();
+      
+      if (msg.getMessageID() == 0L)
+      {
+         // must generate message id here, so we know they are in sync on live and backup
+         long id = storageManager.generateUniqueID();
 
-   public void handleManagementMessage(final SessionSendManagementMessage packet)
-   {
+         msg.setMessageID(id);
+      }
+      
       DelayedResult result = channel.replicatePacket(packet);
+      
+      //With a send we must make sure it is replicated to the backup before being processed on live,
+      //or we can end up with delivery being processed on the backup before the original send
+      
+      if (result == null)
+      {
+         doHandleManagementMessage(packet);                        
+      }
+      else
+      {
+         result.setResultRunner(new Runnable()
+         {
+            public void run()
+            {
+               doHandleManagementMessage(packet);
+            }
+         });
+      }
+   }
 
-      Packet response = null;
-
+   public void doHandleManagementMessage(final SessionSendManagementMessage packet)
+   {
       try
       {
-         ServerMessage serverMessage = packet.getServerMessage();
+         ServerMessage message = packet.getServerMessage();
 
-         if (serverMessage.containsProperty(ManagementHelper.HDR_JMX_SUBSCRIBE_TO_NOTIFICATIONS))
+         if (message.containsProperty(ManagementHelper.HDR_JMX_SUBSCRIBE_TO_NOTIFICATIONS))
          {
-            boolean subscribe = (Boolean)serverMessage.getProperty(ManagementHelper.HDR_JMX_SUBSCRIBE_TO_NOTIFICATIONS);
+            boolean subscribe = (Boolean)message.getProperty(ManagementHelper.HDR_JMX_SUBSCRIBE_TO_NOTIFICATIONS);
 
-            final SimpleString replyTo = (SimpleString)serverMessage.getProperty(ManagementHelper.HDR_JMX_REPLYTO);
+            final SimpleString replyTo = (SimpleString)message.getProperty(ManagementHelper.HDR_JMX_REPLYTO);
 
             if (subscribe)
             {
@@ -2399,35 +2479,17 @@
          }
          else
          {
-            managementService.handleMessage(serverMessage);
+            managementService.handleMessage(message);
+            
+            message.setDestination((SimpleString)message.getProperty(ManagementHelper.HDR_JMX_REPLYTO));
 
-            serverMessage.setDestination((SimpleString)serverMessage.getProperty(ManagementHelper.HDR_JMX_REPLYTO));
-
-            send(serverMessage);
+            send(message);
          }
-
-         if (packet.isRequiresResponse())
-         {
-            response = new NullResponseMessage();
-         }
       }
       catch (Exception e)
       {
-         log.error("Failed to send management message", e);
-         if (packet.isRequiresResponse())
-         {
-            if (e instanceof MessagingException)
-            {
-               response = new MessagingExceptionMessage((MessagingException)e);
-            }
-            else
-            {
-               response = new MessagingExceptionMessage(new MessagingException(MessagingException.INTERNAL_ERROR));
-            }
-         }
+         log.error("Failed to send management message", e);        
       }
-
-      sendResponse(result, response);
    }
 
    public void handleReplicatedDelivery(final SessionReplicateDeliveryMessage packet)
@@ -2521,7 +2583,6 @@
       catch (Exception e)
       {
          log.warn("problem while sending a notification message " + notification, e);
-
       }
    }
 
@@ -2654,40 +2715,4 @@
       }
    }
 
-   private void sendResponse(final DelayedResult result, final Packet response, final boolean closeChannel)
-   {
-      if (response != null)
-      {
-         if (result == null)
-         {
-            // Not clustered - just send now
-            channel.send(response);
-
-            if (closeChannel)
-            {
-               channel.close();
-            }
-         }
-         else
-         {
-            result.setResultRunner(new Runnable()
-            {
-               public void run()
-               {
-                  channel.send(response);
-
-                  if (closeChannel)
-                  {
-                     channel.close();
-                  }
-               }
-            });
-         }
-      }
-   }
-
-   public void sendResponse(final DelayedResult result, final Packet response)
-   {
-      this.sendResponse(result, response, false);
-   }
 }

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/FailoverExpiredMessageTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/FailoverExpiredMessageTest.java	2008-11-05 17:24:25 UTC (rev 5280)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/FailoverExpiredMessageTest.java	2008-11-05 19:02:14 UTC (rev 5281)
@@ -59,7 +59,7 @@
  */
 public class FailoverExpiredMessageTest extends TestCase
 {
-   private static final Logger log = Logger.getLogger(SimpleAutomaticFailoverTest.class);
+   private static final Logger log = Logger.getLogger(FailoverExpiredMessageTest.class);
 
    // Constants -----------------------------------------------------
 

Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/FailoverManagementTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/FailoverManagementTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/FailoverManagementTest.java	2008-11-05 19:02:14 UTC (rev 5281)
@@ -0,0 +1,311 @@
+/*
+ * JBoss, Home of Professional Open Source Copyright 2005-2008, Red Hat
+ * Middleware LLC, and individual contributors by the @authors tag. See the
+ * copyright.txt in the distribution for a full listing of individual
+ * contributors.
+ * 
+ * This is free software; you can redistribute it and/or modify it under the
+ * terms of the GNU Lesser General Public License as published by the Free
+ * Software Foundation; either version 2.1 of the License, or (at your option)
+ * any later version.
+ * 
+ * This software is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS
+ * FOR A PARTICULAR PURPOSE. See the GNU Lesser General Public License for more
+ * details.
+ * 
+ * You should have received a copy of the GNU Lesser General Public License
+ * along with this software; if not, write to the Free Software Foundation,
+ * Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA, or see the FSF
+ * site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import javax.management.Notification;
+
+import junit.framework.TestCase;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryInternal;
+import org.jboss.messaging.core.client.impl.ClientSessionImpl;
+import org.jboss.messaging.core.client.management.impl.ManagementHelper;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.logging.Logger;
+import org.jboss.messaging.core.management.impl.ManagementServiceImpl;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
+import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * 
+ * A FailoverManagementTest
+ *
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
+ * 
+ * Created 5 Nov 2008 15:05:14
+ *
+ *
+ */
+public class FailoverManagementTest extends TestCase
+{
+   private static final Logger log = Logger.getLogger(FailoverManagementTest.class);
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
+
+   private MessagingService liveService;
+
+   private MessagingService backupService;
+
+   private final Map<String, Object> backupParams = new HashMap<String, Object>();
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   public void testManagementMessages() throws Exception
+   {            
+      ClientSessionFactoryInternal sf1 = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+                                                                      new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                                                 backupParams));
+      
+      sf1.setSendWindowSize(32 * 1024);
+  
+      ClientSession session1 = sf1.createSession(false, true, true, false);
+
+      session1.createQueue(ADDRESS, ADDRESS, null, false, false);
+      
+      SimpleString replyTo = new SimpleString("replyto");
+      
+      session1.createQueue(replyTo, new SimpleString("replyto"), null, false, false);
+      
+      ClientProducer producer = session1.createProducer(ADDRESS);
+      
+      final int numMessages = 10;
+      
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage msg  = session1.createClientMessage(false);
+         
+         msg.getBody().flip();
+         
+         producer.send(msg);
+      }
+      
+      for (int i = 0; i < numMessages / 2; i++)
+      {
+         ClientMessage managementMessage  = session1.createClientMessage(false);
+         
+         ManagementHelper.putAttributes(managementMessage,
+                                        replyTo,
+                                        ManagementServiceImpl.getQueueObjectName(ADDRESS, ADDRESS),
+                                        "MessageCount");
+         
+         managementMessage.getBody().flip();
+         
+         session1.sendManagementMessage(managementMessage);
+      }
+                            
+      ClientConsumer consumer1 = session1.createConsumer(replyTo);
+                 
+      final RemotingConnection conn1 = ((ClientSessionImpl)session1).getConnection();
+ 
+      conn1.fail(new MessagingException(MessagingException.NOT_CONNECTED));
+      
+      //Send the other half
+      for (int i = 0; i < numMessages / 2; i++)
+      {
+         ClientMessage managementMessage  = session1.createClientMessage(false);
+         
+         ManagementHelper.putAttributes(managementMessage,
+                                        replyTo,
+                                        ManagementServiceImpl.getQueueObjectName(ADDRESS, ADDRESS),
+                                        "MessageCount");
+         
+         managementMessage.getBody().flip();
+         
+         session1.sendManagementMessage(managementMessage);
+      }
+            
+      session1.start();
+                   
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = consumer1.receive(1000);
+         
+         assertNotNull(message);
+                        
+         message.acknowledge();
+         
+         assertTrue(ManagementHelper.isAttributesResult(message));
+         
+         assertEquals(numMessages, message.getProperty(new SimpleString("MessageCount")));
+      }
+      
+      session1.close();
+      
+      //Make sure no more messages
+      ClientSession session2 = sf1.createSession(false, true, true, false);
+      
+      session2.start();
+      
+      ClientConsumer consumer2 = session2.createConsumer(replyTo);
+      
+      ClientMessage message = consumer2.receive(1000);
+      
+      assertNull(message);
+      
+      session2.close();      
+   }
+   
+   public void testManagementMessages2() throws Exception
+   { // Checks that all management replies are received when the connection is failed after the first receive but before its ack.
+      ClientSessionFactoryInternal sf1 = new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+                                                                      new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                                                 backupParams));
+      // The second TransportConfiguration carries backupParams, the same params setUp() gives the backup acceptor.
+      sf1.setSendWindowSize(32 * 1024);
+  
+      ClientSession session1 = sf1.createSession(false, true, true, false);
+
+      session1.createQueue(ADDRESS, ADDRESS, null, false, false);
+      // Queue from which the management replies are consumed further down.
+      SimpleString replyTo = new SimpleString("replyto");
+      
+      session1.createQueue(replyTo, new SimpleString("replyto"), null, false, false);
+      
+      ClientProducer producer = session1.createProducer(ADDRESS);
+      
+      final int numMessages = 10;
+      // Send numMessages messages to ADDRESS; each reply is later asserted to report MessageCount == numMessages.
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage msg  = session1.createClientMessage(false);
+         
+         msg.getBody().flip();
+         
+         producer.send(msg);
+      }
+      // Send one "MessageCount" attribute request per expected reply, each naming replyTo.
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage managementMessage  = session1.createClientMessage(false);
+         
+         ManagementHelper.putAttributes(managementMessage,
+                                        replyTo,
+                                        ManagementServiceImpl.getQueueObjectName(ADDRESS, ADDRESS),
+                                        "MessageCount");
+         
+         managementMessage.getBody().flip();
+         
+         session1.sendManagementMessage(managementMessage);
+      }
+      // The consumer is created and the session started only after every request has been sent.
+      ClientConsumer consumer1 = session1.createConsumer(replyTo);
+                       
+                      
+      session1.start();
+                   
+      for (int i = 0; i < numMessages; i++)
+      {
+         ClientMessage message = consumer1.receive(1000);
+         
+         assertNotNull(message);
+         
+         if (i == 0)
+         {
+            //Fail after receipt but before ack
+            final RemotingConnection conn1 = ((ClientSessionImpl)session1).getConnection();
+            
+            conn1.fail(new MessagingException(MessagingException.NOT_CONNECTED));
+         }
+         // On the first iteration this ack is issued after the connection has been failed.
+         message.acknowledge();
+         
+         assertTrue(ManagementHelper.isAttributesResult(message));
+         // Every reply must carry the count of messages sent to ADDRESS above.
+         assertEquals(numMessages, message.getProperty(new SimpleString("MessageCount")));
+      }
+      
+      session1.close();
+      
+      //Make sure no more messages are left on the replyTo queue (receive must time out with null)
+      ClientSession session2 = sf1.createSession(false, true, true, false);
+      
+      session2.start();
+      
+      ClientConsumer consumer2 = session2.createConsumer(replyTo);
+      
+      ClientMessage message = consumer2.receive(1000);
+      
+      assertNull(message);
+      
+      session2.close();      
+   }
+   
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   @Override
+   protected void setUp() throws Exception
+   { // Starts the backup server first, then a live server configured with a connector to that backup.
+      Configuration backupConf = new ConfigurationImpl();
+      backupConf.setSecurityEnabled(false);
+      backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1); // presumably keeps the backup's in-VM acceptor distinct from the live one, which passes no params - TODO confirm
+      backupConf.getAcceptorConfigurations()
+                .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory",
+                                                backupParams));
+      backupConf.setBackup(true);
+      backupService = MessagingServiceImpl.newNullStorageMessagingServer(backupConf);
+      backupService.start();
+      // Live server: in-VM acceptor without params, plus a backup connector built from backupParams.
+      Configuration liveConf = new ConfigurationImpl();
+      liveConf.setSecurityEnabled(false);
+      liveConf.getAcceptorConfigurations()
+              .add(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory"));
+      liveConf.setBackupConnectorConfiguration(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                          backupParams));
+      liveService = MessagingServiceImpl.newNullStorageMessagingServer(liveConf);
+      liveService.start();
+   }
+
+   @Override
+   protected void tearDown() throws Exception
+   { // Asserts that neither server has connections left, stops both, then asserts the in-VM registry is empty.
+      assertEquals(0, backupService.getServer().getRemotingService().getConnections().size());
+      // NOTE(review): a failing assertion here skips the stop() calls below, leaving the servers running.
+      backupService.stop();
+
+      assertEquals(0, liveService.getServer().getRemotingService().getConnections().size());
+
+      liveService.stop();
+      // Checked only after both servers have been stopped.
+      assertEquals(0, InVMRegistry.instance.size());
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+}
+
+

Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/FailoverScheduledMessageTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/FailoverScheduledMessageTest.java	2008-11-05 17:24:25 UTC (rev 5280)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/FailoverScheduledMessageTest.java	2008-11-05 19:02:14 UTC (rev 5281)
@@ -58,7 +58,7 @@
  */
 public class FailoverScheduledMessageTest extends TestCase
 {
-   private static final Logger log = Logger.getLogger(SimpleAutomaticFailoverTest.class);
+   private static final Logger log = Logger.getLogger(FailoverScheduledMessageTest.class);
 
    // Constants -----------------------------------------------------
 




More information about the jboss-cvs-commits mailing list