[jboss-cvs] JBoss Messaging SVN: r5608 - in branches/Branch_Failover_Page: src/main/org/jboss/messaging/core/paging/impl and 5 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Fri Jan 9 01:24:32 EST 2009


Author: clebert.suconic at jboss.com
Date: 2009-01-09 01:24:32 -0500 (Fri, 09 Jan 2009)
New Revision: 5608

Modified:
   branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/paging/PagingManager.java
   branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/paging/PagingStore.java
   branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
   branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
   branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
   branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReplicateDeliveryMessage.java
   branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/Queue.java
   branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/ServerConsumer.java
   branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
   branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
   branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java
Log:
Implementing backupMode on PagingManager

Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/paging/PagingManager.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/paging/PagingManager.java	2009-01-09 03:57:13 UTC (rev 5607)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/paging/PagingManager.java	2009-01-09 06:24:32 UTC (rev 5608)
@@ -58,6 +58,14 @@
  */
 public interface PagingManager extends MessagingComponent
 {
+   void setBackup();
+   
+   
+   void activate();
+   
+   
+   boolean isBackup();
+   
    /** The system is paging because of global-page-mode */
    boolean isGlobalPageMode();
 

Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/paging/PagingStore.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/paging/PagingStore.java	2009-01-09 03:57:13 UTC (rev 5607)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/paging/PagingStore.java	2009-01-09 06:24:32 UTC (rev 5608)
@@ -62,6 +62,8 @@
    void sync() throws Exception;
 
    boolean page(PagedMessage message, boolean sync, boolean duplicateDetection) throws Exception;
+   
+   public void readPage() throws Exception;
 
    /**
     * 

Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java	2009-01-09 03:57:13 UTC (rev 5607)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/paging/impl/PagingManagerImpl.java	2009-01-09 06:24:32 UTC (rev 5608)
@@ -59,6 +59,8 @@
    private volatile boolean started = false;
 
    private final long maxGlobalSize;
+   
+   private volatile boolean backup;
 
    private final AtomicLong globalSize = new AtomicLong(0);
 
@@ -107,6 +109,25 @@
    // PagingManager implementation
    // -----------------------------------------------------------------------------------------------------
 
+   public void setBackup()
+   {
+      this.backup = true;
+   }
+   
+   
+   public void activate()
+   {
+      this.backup = false;
+      
+      startGlobalDepage();
+   }
+   
+   
+   public boolean isBackup()
+   {
+      return this.backup;
+   }
+   
    public boolean isGlobalPageMode()
    {
       return globalMode.get();
@@ -285,10 +306,13 @@
 
    public synchronized void startGlobalDepage()
    {
-      setGlobalPageMode(true);
-      for (PagingStore store : stores.values())
+      if (!isBackup())
       {
-         store.startDepaging(pagingStoreFactory.getGlobalDepagerExecutor());
+         setGlobalPageMode(true);
+         for (PagingStore store : stores.values())
+         {
+            store.startDepaging(pagingStoreFactory.getGlobalDepagerExecutor());
+         }
       }
    }
 

Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2009-01-09 03:57:13 UTC (rev 5607)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/paging/impl/PagingStoreImpl.java	2009-01-09 06:24:32 UTC (rev 5608)
@@ -368,7 +368,7 @@
             ServerMessage msg = message.getMessage(storageManager);
 
             // FIXME: This code is duplicated on QueueImpl::removeOrCacheReference
-            
+
             byte[] bytes = new byte[8];
 
             ByteBuffer buff = ByteBuffer.wrap(bytes);
@@ -456,6 +456,11 @@
 
    public boolean startDepaging(final Executor executor)
    {
+      if (!pagingManager.isBackup())
+      {
+         return false;
+      }
+
       currentPageLock.readLock().lock();
       try
       {
@@ -614,7 +619,7 @@
       }
 
       System.out.println("Paging " + this.getStoreName());
-      
+
       // if the first check failed, we do it again under a global currentPageLock
       // (writeLock) this time
       writeLock.lock();
@@ -638,6 +643,30 @@
       }
    }
 
+   /**
+    * Depage one page-file, read it and send it to the pagingManager / postoffice
+    * @return
+    * @throws Exception
+    */
+   public void readPage() throws Exception
+   {
+      Page page = depage();
+
+      if (page == null)
+      {
+         return;
+      }
+
+      page.open();
+
+      List<PagedMessage> messages = page.read();
+
+      onDepage(page.getPageId(), storeName, messages);
+
+      page.delete();
+
+   }
+
    // TestSupportPageStore ------------------------------------------
 
    public void forceAnotherPage() throws Exception
@@ -648,6 +677,7 @@
    /** 
     *  It returns a Page out of the Page System without reading it. 
     *  The method calling this method will remove the page and will start reading it outside of any locks.
+    *  This method could also replace the current file by a new file, and that process is done through acquiring a writeLock on currentPageLock
     *   
     *  Observation: This method is used internally as part of the regular depage process, but externally is used only on tests, 
     *               and that's why this method is part of the Testable Interface 
@@ -726,7 +756,7 @@
 
    // Protected -----------------------------------------------------
 
-   // In order to test failures, we need to be able to extend this class 
+   // In order to test failures, we need to be able to extend this class
    // and replace the Page for another Page that will fail before the file is removed
    // That's why createPage is not a private method
    protected Page createPage(final int page) throws Exception
@@ -756,7 +786,6 @@
       return new PageImpl(fileFactory, file, page);
    }
 
-
    // Private -------------------------------------------------------
 
    /**
@@ -949,30 +978,6 @@
       return Integer.parseInt(fileName.substring(0, fileName.indexOf('.')));
    }
 
-   /**
-    * Depage one page-file, read it and send it to the pagingManager / postoffice
-    * @return
-    * @throws Exception
-    */
-   private void readPage() throws Exception
-   {
-      Page page = depage();
-
-      if (page == null)
-      {
-         return;
-      }
-
-      page.open();
-
-      List<PagedMessage> messages = page.read();
-
-      onDepage(page.getPageId(), storeName, messages);
-
-      page.delete();
-
-   }
-
    // Inner classes -------------------------------------------------
 
    private class DepageRunnable implements Runnable

Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-01-09 03:57:13 UTC (rev 5607)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java	2009-01-09 06:24:32 UTC (rev 5608)
@@ -466,6 +466,8 @@
             }
          }
       }
+      
+      pagingManager.activate();
 
       return queues;
    }
@@ -627,9 +629,17 @@
          }
       }
 
-      // This is necessary as if the server was previously stopped while a depage was being executed,
-      // it needs to resume the depage process on those destinations
-      pagingManager.startGlobalDepage();
+      
+      if (backup)
+      {
+         pagingManager.setBackup();
+      }
+      else
+      {
+         // This is necessary as if the server was previously stopped while a depage was being executed,
+         // it needs to resume the depage process on those destinations
+         pagingManager.startGlobalDepage();
+      }
    }
 
    private class MessageExpiryRunner extends Thread

Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReplicateDeliveryMessage.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReplicateDeliveryMessage.java	2009-01-09 03:57:13 UTC (rev 5607)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/remoting/impl/wireformat/SessionReplicateDeliveryMessage.java	2009-01-09 06:24:32 UTC (rev 5608)
@@ -13,6 +13,8 @@
 package org.jboss.messaging.core.remoting.impl.wireformat;
 
 import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.util.DataConstants;
+import org.jboss.messaging.util.SimpleString;
 
 /**
  * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a>
@@ -28,17 +30,21 @@
 
    private long messageID;
 
+   private SimpleString address;
+
    // Static --------------------------------------------------------
 
    // Constructors --------------------------------------------------
 
-   public SessionReplicateDeliveryMessage(final long consumerID, final long messageID)
+   public SessionReplicateDeliveryMessage(final long consumerID, final long messageID, final SimpleString address)
    {
       super(SESS_REPLICATE_DELIVERY);
 
       this.consumerID = consumerID;
 
       this.messageID = messageID;
+
+      this.address = address;
    }
 
    public SessionReplicateDeliveryMessage()
@@ -58,11 +64,18 @@
       return messageID;
    }
 
+   public SimpleString getAddress()
+   {
+      return address;
+   }
+
    public void encodeBody(final MessagingBuffer buffer)
    {
       buffer.putLong(consumerID);
 
       buffer.putLong(messageID);
+
+      buffer.putSimpleString(address);
    }
 
    public void decodeBody(final MessagingBuffer buffer)
@@ -70,13 +83,20 @@
       consumerID = buffer.getLong();
 
       messageID = buffer.getLong();
+
+      address = buffer.getSimpleString();
    }
-   
+
    public boolean isRequiresConfirmations()
-   {      
+   {
       return false;
    }
 
+   public int getRequiredBufferSize()
+   {
+      return BASIC_PACKET_SIZE + DataConstants.SIZE_LONG + DataConstants.SIZE_LONG + SimpleString.sizeofString(address);
+   }
+
    public boolean equals(Object other)
    {
       if (other instanceof SessionReplicateDeliveryMessage == false)
@@ -88,7 +108,7 @@
 
       return super.equals(other) && this.consumerID == r.consumerID && this.messageID == r.messageID;
    }
-   
+
    // Package protected ---------------------------------------------
 
    // Protected -----------------------------------------------------

Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/Queue.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/Queue.java	2009-01-09 03:57:13 UTC (rev 5607)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/Queue.java	2009-01-09 06:24:32 UTC (rev 5608)
@@ -106,12 +106,12 @@
 
    
    /**
-    * Remove the reference or add it on the DuplicateIDCache case not found
-    * @param id
+    * Remove the reference on backup nodes.
+    * This may lead to depage operations
     * @return
     * @throws Exception
     */
-   MessageReference removeOrCacheReference(long id) throws Exception;
+   MessageReference removeReferenceOnBackup(SimpleString address, long id) throws Exception;
 
    /** Remove message from queue, add it to the scheduled delivery list without affect reference counting */
    void rescheduleDelivery(long id, long scheduledDeliveryTime);

Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/ServerConsumer.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/ServerConsumer.java	2009-01-09 03:57:13 UTC (rev 5607)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/ServerConsumer.java	2009-01-09 06:24:32 UTC (rev 5608)
@@ -26,6 +26,7 @@
 
 import org.jboss.messaging.core.remoting.Packet;
 import org.jboss.messaging.core.transaction.Transaction;
+import org.jboss.messaging.util.SimpleString;
 
 /**
  * 
@@ -56,7 +57,7 @@
 	
 	void failedOver();
 	
-	void deliverReplicated(final long messageID) throws Exception;
+	void deliverReplicated(SimpleString address, long messageID) throws Exception;
 	
 	void lock();
 	

Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-01-09 03:57:13 UTC (rev 5607)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java	2009-01-09 06:24:32 UTC (rev 5608)
@@ -12,7 +12,6 @@
 
 package org.jboss.messaging.core.server.impl;
 
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.LinkedList;
@@ -67,6 +66,8 @@
    private static final boolean trace = log.isTraceEnabled();
 
    public static final int NUM_PRIORITIES = 10;
+   
+   private static final int MAX_NUMBER_OF_DEPAGES_ON_BACKUP = 10;
 
    private volatile long persistenceID = -1;
 
@@ -90,11 +91,6 @@
 
    private final DuplicateIDCache duplicateIDCache;
 
-   /** 
-    * We need to Lock the duplicateIDCache
-    * */
-   private final Lock duplicateIDLock = new ReentrantLock();
-
    private boolean direct;
 
    private boolean promptDelivery;
@@ -170,8 +166,6 @@
 
       boolean durableRef = message.isDurable() && durable;
 
-      boolean startLock = false;
-
       try
       {
 
@@ -199,15 +193,6 @@
                startedTx = true;
 
             }
-
-            if (isBackup())
-            {
-               // We need to lock access to route when dealing with backupNodes
-               // Paging will add IDs for records not found during replicateACK,
-               // and that needs to be atomic between route and addID
-               duplicateIDLock.lock();
-               startLock = true;
-            }
          }
 
          // There is no way to cache the Store, since a Queue may belong to multiple addresses,
@@ -319,11 +304,6 @@
             // some exception happened during routing, the tx needs to be rolled back
             tx.rollback();
          }
-
-         if (startLock)
-         {
-            duplicateIDLock.unlock();
-         }
       }
 
       return true;
@@ -440,59 +420,38 @@
       }
    }
 
-   public MessageReference removeOrCacheReference(long id) throws Exception
+   public MessageReference removeReferenceOnBackup(SimpleString address, long id) throws Exception
    {
+      if (!isBackup())
+      {
+         throw new IllegalStateException("removeReferenceOnBackup method could only be called on backup nodes");
+      }
+      
       // most of the times, the remove will work ok, so we first try it without any locks
       MessageReference ref = removeReferenceWithID(id, false);
       
+      PagingStore store = pagingManager.getPageStore(address);
+      
       if (ref == null)
       {
-         // This is a temporary hack, for the temporary branch only
-         for (int i = 0; i < 10; i++)
+         for (int i = 0; i < MAX_NUMBER_OF_DEPAGES_ON_BACKUP; i++)
          {
-            System.out.println("Retry " + i);
-            Thread.sleep(100);
-            
-            ref = removeReferenceWithID(id, false);
-            
-            if (ref != null)
+            // Can't have the same store being depaged in more than one thread
+            synchronized (store)
             {
-               System.out.println("Finally found it:");
-               break;
+               // as soon as it gets the lock, it needs to check again, since another thread may have depaged the reference in the meantime
+               ref = removeReferenceWithID(id, false);
+               if (ref == null)
+               {
+                  // force a depage
+                  store.readPage();
+               }
+               else
+               {
+                  break;
+               }
             }
          }
-//         duplicateIDLock.lock();
-//         try
-//         {
-//            // we need to do it again under a lock.
-//            // Depage could still be routing the Message, so we need to lock the duplicateIDLock
-//            // to avoid a race with route
-//
-//            ref = removeReferenceWithID(id, false);
-//
-//            if (ref == null)
-//            {
-//               
-//               System.out.println("Didn't find the reference " + id + " on backup. Storing it!");
-//
-//               
-//               // FIXME: This code is duplicated on QueueImpl::removeOrCacheReference
-//
-//               byte[] bytes = new byte[8];
-//
-//               ByteBuffer buff = ByteBuffer.wrap(bytes);
-//
-//               buff.putLong(id);
-//
-//               SimpleString duplID = new SimpleString(bytes);
-//
-//               duplicateIDCache.addToCache(duplID);
-//            }
-//         }
-//         finally
-//         {
-//            duplicateIDLock.unlock();
-//         }
       }
 
       return ref;

Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-01-09 03:57:13 UTC (rev 5607)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2009-01-09 06:24:32 UTC (rev 5608)
@@ -56,6 +56,7 @@
 import org.jboss.messaging.core.settings.impl.QueueSettings;
 import org.jboss.messaging.core.transaction.Transaction;
 import org.jboss.messaging.core.transaction.impl.TransactionImpl;
+import org.jboss.messaging.util.SimpleString;
 
 /**
  * Concrete implementation of a ClientConsumer.
@@ -248,16 +249,16 @@
       Iterator<MessageReference> iter = refs.iterator();
 
       closed = true;
-      
+
       Transaction tx = new TransactionImpl(storageManager);
 
       while (iter.hasNext())
       {
          MessageReference ref = iter.next();
-         
-         ref.cancel(tx, storageManager, postOffice, queueSettingsRepository);         
+
+         ref.cancel(tx, storageManager, postOffice, queueSettingsRepository);
       }
-      
+
       tx.rollback();
    }
 
@@ -402,26 +403,27 @@
       return ref;
    }
 
-   public void deliverReplicated(final long messageID) throws Exception
+   public void deliverReplicated(final SimpleString address, final long messageID) throws Exception
    {
       // It may not be the first in the queue - since there may be multiple producers
       // sending to the queue
-      MessageReference ref = messageQueue.removeOrCacheReference(messageID);
+      MessageReference ref = messageQueue.removeReferenceOnBackup(address, messageID);
 
-      if (ref != null)
+      if (ref == null)
       {
-         // We call doHandle rather than handle, since we don't want to check available credits
-         // This is because delivery and receive credits can be processed in different order on live
-         // and backup, and otherwise we could have a situation where the delivery is replicated
-         // but the credits haven't arrived yet, so the delivery gets rejected on backup
-         HandleStatus handled = doHandle(ref);
+         throw new IllegalStateException("Cannot find ref when replicating delivery " + messageID);
+      }
 
-         if (handled != HandleStatus.HANDLED)
-         {
-            throw new IllegalStateException("Reference was not handled " + ref + " " + handled);
-         }
+      // We call doHandle rather than handle, since we don't want to check available credits
+      // This is because delivery and receive credits can be processed in different order on live
+      // and backup, and otherwise we could have a situation where the delivery is replicated
+      // but the credits haven't arrived yet, so the delivery gets rejected on backup
+      HandleStatus handled = doHandle(ref);
+
+      if (handled != HandleStatus.HANDLED)
+      {
+         throw new IllegalStateException("Reference was not handled " + ref + " " + handled);
       }
-
    }
 
    public void failedOver()
@@ -529,23 +531,24 @@
             {
                deliveringRefs.add(ref);
             }
-            
+
             ref.getQueue().referenceHandled();
          }
-         
+
          if (preAcknowledge)
          {
-            //With pre-ack, we ack *before* sending to the client
+            // With pre-ack, we ack *before* sending to the client
             doAck(ref);
          }
-                  
+
          // TODO: get rid of the instanceof by something like message.isLargeMessage()
          if (message instanceof LargeServerMessage)
-         {            
-            //FIXME - please put the replication logic in the sendLargeMessage method
-            
+         {
+            // FIXME - please put the replication logic in the sendLargeMessage method
+
             DelayedResult result = channel.replicatePacket(new SessionReplicateDeliveryMessage(id,
-                                                                                               message.getMessageID()));
+                                                                                               message.getMessageID(),
+                                                                                               message.getDestination()));
 
             if (result == null)
             {
@@ -568,7 +571,7 @@
          {
             sendStandardMessage(ref, message);
          }
-         
+
          return HandleStatus.HANDLED;
       }
       finally
@@ -597,7 +600,9 @@
 
       final SessionReceiveMessage packet = new SessionReceiveMessage(id, message, ref.getDeliveryCount() + 1);
 
-      DelayedResult result = channel.replicatePacket(new SessionReplicateDeliveryMessage(id, message.getMessageID()));
+      DelayedResult result = channel.replicatePacket(new SessionReplicateDeliveryMessage(id,
+                                                                                         message.getMessageID(),
+                                                                                         message.getDestination()));
 
       if (result == null)
       {

Modified: branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-01-09 03:57:13 UTC (rev 5607)
+++ branches/Branch_Failover_Page/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2009-01-09 06:24:32 UTC (rev 5608)
@@ -1140,7 +1140,7 @@
 
       try
       {
-         consumer.deliverReplicated(packet.getMessageID());
+         consumer.deliverReplicated(packet.getAddress(), packet.getMessageID());
       }
       catch (Exception e)
       {

Modified: branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java
===================================================================
--- branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java	2009-01-09 03:57:13 UTC (rev 5607)
+++ branches/Branch_Failover_Page/tests/src/org/jboss/messaging/tests/integration/cluster/failover/PagingFailoverTest.java	2009-01-09 06:24:32 UTC (rev 5608)
@@ -81,7 +81,7 @@
 
          ClientProducer producer = session.createProducer(ADDRESS);
 
-         final int numMessages = 5000;
+         final int numMessages = 50000;
          
          PagingManager pmLive = liveService.getServer().getPostOffice().getPagingManager();
          PagingStore storeLive = pmLive.getPageStore(ADDRESS);
@@ -119,7 +119,6 @@
             
             if (i == numMessages / 2)
             {
-//               assertEquals("GlobalSize", pmLive.getGlobalSize(), pmBackup.getGlobalSize());
                System.out.println("Failing");
                conn.fail(new MessagingException(MessagingException.NOT_CONNECTED));
             }




More information about the jboss-cvs-commits mailing list