[jboss-cvs] JBoss Messaging SVN: r5514 - in trunk: src/main/org/jboss/messaging/core/server/impl and 1 other directories.

jboss-cvs-commits at lists.jboss.org jboss-cvs-commits at lists.jboss.org
Wed Dec 10 20:25:42 EST 2008


Author: clebert.suconic at jboss.com
Date: 2008-12-10 20:25:42 -0500 (Wed, 10 Dec 2008)
New Revision: 5514

Added:
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverTestBase.java
   trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/JustReplicationTest.java
Modified:
   trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeMessageImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ScheduledDeliveryHandlerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
   trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
Log:
Failover on LargeMessages & adding a failing test

Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeMessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeMessageImpl.java	2008-12-11 01:07:14 UTC (rev 5513)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeMessageImpl.java	2008-12-11 01:25:42 UTC (rev 5514)
@@ -54,9 +54,11 @@
    private final JournalStorageManager storageManager;
 
    // We should only use the NIO implementation on the Journal
-   private volatile SequentialFile file;
+   private SequentialFile file;
 
-   private volatile boolean complete = false;
+   private boolean complete = false;
+   
+   private long bodySize = -1;
 
    // Static --------------------------------------------------------
 
@@ -84,15 +86,17 @@
       file.position(file.size());
 
       file.write(ByteBuffer.wrap(bytes), false);
+      
+      bodySize += bytes.length;
    }
 
    @Override
    public synchronized void encodeBody(final MessagingBuffer bufferOut, final long start, final int size)
    {
-      validateFile();
-
       try
       {
+         validateFile();
+
          // This could maybe be optimized (maybe reading directly into bufferOut)
          ByteBuffer bufferRead = ByteBuffer.allocate(size);
          if (!file.isOpen())
@@ -122,22 +126,16 @@
    @Override
    public synchronized int getBodySize()
    {
-      validateFile();
-
       try
       {
-         if (!file.isOpen())
-         {
-            file.open();
-         }
-
-         return (int)file.size();
+         validateFile();
       }
-
       catch (Exception e)
       {
-         throw new RuntimeException("Can't get the file size on " + file.getFileName());
+         throw new RuntimeException(e.getMessage(), e);
       }
+      // FIXME: The file could be bigger than MAX_INT
+      return (int)bodySize;
    }
 
    @Override
@@ -213,7 +211,7 @@
 
    public synchronized void releaseResources()
    {
-      if (file.isOpen())
+      if (file != null && file.isOpen())
       {
          try
          {
@@ -232,7 +230,7 @@
 
    // Private -------------------------------------------------------
 
-   private void validateFile()
+   private synchronized void validateFile() throws Exception
    {
       if (file == null)
       {
@@ -242,6 +240,10 @@
          }
 
          file = storageManager.createFileForLargeMessage(getMessageID(), complete);
+         
+         file.open();
+         
+         bodySize = file.size();
 
       }
    }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ScheduledDeliveryHandlerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ScheduledDeliveryHandlerImpl.java	2008-12-11 01:07:14 UTC (rev 5513)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ScheduledDeliveryHandlerImpl.java	2008-12-11 01:25:42 UTC (rev 5514)
@@ -143,7 +143,15 @@
    {
       synchronized (scheduledRunnables)
       {
-         return scheduledRunnables.remove(id).getReference();
+         ScheduledDeliveryRunnable runnable = scheduledRunnables.remove(id);
+         if (runnable == null)
+         {
+            return null;
+         }
+         else
+         {
+            return runnable.getReference();
+         }
       }
    }
 

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-12-11 01:07:14 UTC (rev 5513)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java	2008-12-11 01:25:42 UTC (rev 5514)
@@ -340,13 +340,16 @@
 
          if (ref == null)
          {
-            throw new IllegalStateException("Could not find reference with id " + messageID +
+            throw new IllegalStateException("Could not find reference on consumerID=" + id +
+                                            ", messageId " +
+                                            messageID +
                                             " backup " +
                                             messageQueue.isBackup() +
                                             " closed " +
                                             closed);
          }
 
+
          if (autoCommitAcks)
          {
             doAck(ref);
@@ -533,21 +536,35 @@
          // TODO: get rid of the instanceof by something like message.isLargeMessage()
          if (message instanceof ServerLargeMessage)
          {
-            // TODO: How to inform the backup node about the LargeMessage being sent?
-            largeMessageSender = new LargeMessageSender((ServerLargeMessage)message, ref);
+            DelayedResult result = channel.replicatePacket(new SessionReplicateDeliveryMessage(id,
+                                                                                               message.getMessageID()));
 
-            largeMessageSender.sendLargeMessage();
+            if (result == null)
+            {
+               sendLargeMessage(ref, message);
+            }
+            else
+            {
+               // Send when replicate delivery response comes back
+               result.setResultRunner(new Runnable()
+               {
+                  public void run()
+                  {
+                     sendLargeMessage(ref, message);
+                  }
+               });
+            }
+
          }
          else
          {
             sendStandardMessage(ref, message);
-
             if (preAcknowledge)
             {
                doAck(ref);
             }
+
          }
-
          return HandleStatus.HANDLED;
       }
       finally
@@ -556,6 +573,13 @@
       }
    }
 
+   private void sendLargeMessage(final MessageReference ref, final ServerMessage message)
+   {
+      largeMessageSender = new LargeMessageSender((ServerLargeMessage)message, ref);
+
+      largeMessageSender.sendLargeMessage();
+   }
+
    /**
     * @param ref
     * @param message
@@ -713,6 +737,8 @@
                }
                catch (Exception e)
                {
+                  // Is there anything we could do here besides logging?
+                  // The message was already sent, and this shouldn't happen
                   log.warn("Error while ACKing reference " + ref, e);
                }
             }

Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-12-11 01:07:14 UTC (rev 5513)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java	2008-12-11 01:25:42 UTC (rev 5514)
@@ -2156,8 +2156,6 @@
    public void handleSendLargeMessage(final SessionSendMessage packet)
    {
 
-      DelayedResult result = channel.replicatePacket(packet);
-
       if (packet.getMessageID() <= 0L)
       {
          // must generate message id here, so we know they are in sync on live and backup
@@ -2166,6 +2164,8 @@
          packet.setMessageID(id);
       }
 
+      
+      DelayedResult result = channel.replicatePacket(packet);
 
       // With a send we must make sure it is replicated to backup before being processed on live
       // or can end up with delivery being processed on backup before original send

Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverTestBase.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverTestBase.java	2008-12-11 01:25:42 UTC (rev 5514)
@@ -0,0 +1,158 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.failover;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
+import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+
+/**
+ * A FailoverTestBase
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * 
+ * Created Dec 8, 2008 6:59:53 PM
+ *
+ *
+ */
+public class FailoverTestBase extends ServiceTestBase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   protected final Map<String, Object> backupParams = new HashMap<String, Object>();
+
+   private MessagingService liveService;
+
+   private MessagingService backupService;
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+   protected ClientSessionFactory createFailoverFactory()
+   {
+      return new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+                                          new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+                                                                     backupParams));
+   }
+
+   protected void setUpFileBased() throws Exception
+   {
+
+      deleteDirectory(new File(getTestDir()));
+
+      Configuration backupConf = new ConfigurationImpl();
+
+      backupConf.setJournalDirectory(getJournalDir(getTestDir() + "/backup"));
+      backupConf.setLargeMessagesDirectory(getLargeMessagesDir(getTestDir() + "/backup"));
+      backupConf.setBindingsDirectory(getBindingsDir(getTestDir() + "/backup"));
+      backupConf.setPagingDirectory(getPageDir(getTestDir() + "/backup"));
+      backupConf.setJournalFileSize(100 * 1024);
+
+      backupConf.setPagingMaxGlobalSizeBytes(30 * 1024);
+      backupConf.setPagingDefaultSize(10 * 1024);
+
+      backupConf.setSecurityEnabled(false);
+      backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+
+      backupConf.getAcceptorConfigurations()
+                .add(new TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName(), backupParams));
+      backupConf.setBackup(true);
+
+      clearData(getTestDir() + "/backup");
+
+      backupService = MessagingServiceImpl.newMessagingService(backupConf);
+      backupService.start();
+
+      Configuration liveConf = new ConfigurationImpl();
+
+      liveConf.setJournalDirectory(getJournalDir(getTestDir() + "/live"));
+      liveConf.setLargeMessagesDirectory(getLargeMessagesDir(getTestDir() + "/live"));
+      liveConf.setBindingsDirectory(getBindingsDir(getTestDir() + "/live"));
+      liveConf.setPagingDirectory(getPageDir(getTestDir() + "/live"));
+
+      liveConf.setPagingMaxGlobalSizeBytes(30 * 1024);
+      liveConf.setPagingDefaultSize(10 * 1024);
+      liveConf.setJournalFileSize(100 * 1024);
+
+      liveConf.setSecurityEnabled(false);
+      liveConf.getAcceptorConfigurations()
+              .add(new TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName()));
+
+      Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+
+      TransportConfiguration backupTC = new TransportConfiguration(INVM_CONNECTOR_FACTORY,
+                                                                   backupParams,
+                                                                   "backup-connector");
+      connectors.put(backupTC.getName(), backupTC);
+      liveConf.setConnectorConfigurations(connectors);
+      liveConf.setBackupConnectorName(backupTC.getName());
+      liveService = MessagingServiceImpl.newMessagingService(liveConf);
+
+      clearData(getTestDir() + "/live");
+
+      liveService.start();
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   @Override
+   protected void tearDown() throws Exception
+   {
+      assertEquals(0, backupService.getServer().getRemotingService().getConnections().size());
+
+      backupService.stop();
+
+      assertEquals(0, liveService.getServer().getRemotingService().getConnections().size());
+
+      liveService.stop();
+
+      assertEquals(0, InVMRegistry.instance.size());
+      
+      super.tearDown();
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}

Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/JustReplicationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/JustReplicationTest.java	                        (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/JustReplicationTest.java	2008-12-11 01:25:42 UTC (rev 5514)
@@ -0,0 +1,158 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.failover;
+
+import java.nio.ByteBuffer;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A JustReplicationTest
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ * 
+ * Created Dec 8, 2008 7:09:38 PM
+ *
+ *
+ */
+public class JustReplicationTest extends FailoverTestBase
+{
+
+   // Constants -----------------------------------------------------
+
+   // Attributes ----------------------------------------------------
+
+   private static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
+
+   // Static --------------------------------------------------------
+
+   // Constructors --------------------------------------------------
+
+   // Public --------------------------------------------------------
+
+
+   public void testJustReplication() throws Exception
+   {
+      ClientSessionFactory factory = createFailoverFactory();
+      factory.setBlockOnAcknowledge(true);
+      factory.setBlockOnNonPersistentSend(true);
+      factory.setBlockOnPersistentSend(true);
+
+      ClientSession session = factory.createSession(null, null, false, true, true, false, 0);
+
+      final int numberOfMessages = 200;
+
+      final int numberOfBytes = 1200;
+      
+      try
+      {
+
+         session.createQueue(ADDRESS, ADDRESS, null, true, false, true);
+
+         ClientProducer producer = session.createProducer(ADDRESS);
+
+         // The remotingConnection could be used to force a failure
+         // final RemotingConnection conn = ((ClientSessionImpl)session).getConnection();
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            ClientMessage message = session.createClientMessage(true);
+
+            ByteBuffer buffer = ByteBuffer.allocate(numberOfBytes);
+
+            buffer.putInt(i);
+
+            buffer.rewind();
+
+            message.setBody(new ByteBufferWrapper(buffer));
+
+            producer.send(message);
+
+         }
+
+         ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+         session.start();
+
+         for (int i = 0; i < numberOfMessages; i++)
+         {
+            ClientMessage message = consumer.receive(5000);
+
+            assertNotNull(message);
+
+            message.acknowledge();
+
+            MessagingBuffer buffer = message.getBody();
+
+            buffer.rewind();
+
+            assertEquals(numberOfBytes, buffer.limit());
+
+            assertEquals(i, buffer.getInt());
+         }
+
+         assertNull(consumer.receive(500));
+      }
+      finally
+      {
+         try
+         {
+            session.close();
+         }
+         catch (Exception ignored)
+         {
+         }
+      }
+
+   }
+
+   // Package protected ---------------------------------------------
+
+   // Protected -----------------------------------------------------
+
+   @Override
+   protected void setUp() throws Exception
+   {
+      setUpFileBased();
+   }
+
+   protected void tearDown() throws Exception
+   {
+      super.tearDown();
+   }
+
+   // Private -------------------------------------------------------
+
+   // Inner classes -------------------------------------------------
+
+}




More information about the jboss-cvs-commits mailing list