[jboss-cvs] JBoss Messaging SVN: r5514 - in trunk: src/main/org/jboss/messaging/core/server/impl and 1 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Wed Dec 10 20:25:42 EST 2008
Author: clebert.suconic at jboss.com
Date: 2008-12-10 20:25:42 -0500 (Wed, 10 Dec 2008)
New Revision: 5514
Added:
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverTestBase.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/JustReplicationTest.java
Modified:
trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeMessageImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ScheduledDeliveryHandlerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
Log:
Failover on LargeMessages & adding a failing test
Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeMessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeMessageImpl.java 2008-12-11 01:07:14 UTC (rev 5513)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeMessageImpl.java 2008-12-11 01:25:42 UTC (rev 5514)
@@ -54,9 +54,11 @@
private final JournalStorageManager storageManager;
// We should only use the NIO implementation on the Journal
- private volatile SequentialFile file;
+ private SequentialFile file;
- private volatile boolean complete = false;
+ private boolean complete = false;
+
+ private long bodySize = -1;
// Static --------------------------------------------------------
@@ -84,15 +86,17 @@
file.position(file.size());
file.write(ByteBuffer.wrap(bytes), false);
+
+ bodySize += bytes.length;
}
@Override
public synchronized void encodeBody(final MessagingBuffer bufferOut, final long start, final int size)
{
- validateFile();
-
try
{
+ validateFile();
+
// This could maybe be optimized (maybe reading directly into bufferOut)
ByteBuffer bufferRead = ByteBuffer.allocate(size);
if (!file.isOpen())
@@ -122,22 +126,16 @@
@Override
public synchronized int getBodySize()
{
- validateFile();
-
try
{
- if (!file.isOpen())
- {
- file.open();
- }
-
- return (int)file.size();
+ validateFile();
}
-
catch (Exception e)
{
- throw new RuntimeException("Can't get the file size on " + file.getFileName());
+ throw new RuntimeException(e.getMessage(), e);
}
+ // FIXME: The file could be bigger than Integer.MAX_VALUE, in which case this cast truncates the size
+ return (int)bodySize;
}
@Override
@@ -213,7 +211,7 @@
public synchronized void releaseResources()
{
- if (file.isOpen())
+ if (file != null && file.isOpen())
{
try
{
@@ -232,7 +230,7 @@
// Private -------------------------------------------------------
- private void validateFile()
+ private synchronized void validateFile() throws Exception
{
if (file == null)
{
@@ -242,6 +240,10 @@
}
file = storageManager.createFileForLargeMessage(getMessageID(), complete);
+
+ file.open();
+
+ bodySize = file.size();
}
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ScheduledDeliveryHandlerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ScheduledDeliveryHandlerImpl.java 2008-12-11 01:07:14 UTC (rev 5513)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ScheduledDeliveryHandlerImpl.java 2008-12-11 01:25:42 UTC (rev 5514)
@@ -143,7 +143,15 @@
{
synchronized (scheduledRunnables)
{
- return scheduledRunnables.remove(id).getReference();
+ ScheduledDeliveryRunnable runnable = scheduledRunnables.remove(id);
+ if (runnable == null)
+ {
+ return null;
+ }
+ else
+ {
+ return runnable.getReference();
+ }
}
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-12-11 01:07:14 UTC (rev 5513)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2008-12-11 01:25:42 UTC (rev 5514)
@@ -340,13 +340,16 @@
if (ref == null)
{
- throw new IllegalStateException("Could not find reference with id " + messageID +
+ throw new IllegalStateException("Could not find reference on consumerID=" + id +
+ ", messageId " +
+ messageID +
" backup " +
messageQueue.isBackup() +
" closed " +
closed);
}
+
if (autoCommitAcks)
{
doAck(ref);
@@ -533,21 +536,35 @@
// TODO: get rid of the instanceof by something like message.isLargeMessage()
if (message instanceof ServerLargeMessage)
{
- // TODO: How to inform the backup node about the LargeMessage being sent?
- largeMessageSender = new LargeMessageSender((ServerLargeMessage)message, ref);
+ DelayedResult result = channel.replicatePacket(new SessionReplicateDeliveryMessage(id,
+ message.getMessageID()));
- largeMessageSender.sendLargeMessage();
+ if (result == null)
+ {
+ sendLargeMessage(ref, message);
+ }
+ else
+ {
+ // Send when replicate delivery response comes back
+ result.setResultRunner(new Runnable()
+ {
+ public void run()
+ {
+ sendLargeMessage(ref, message);
+ }
+ });
+ }
+
}
else
{
sendStandardMessage(ref, message);
-
if (preAcknowledge)
{
doAck(ref);
}
+
}
-
return HandleStatus.HANDLED;
}
finally
@@ -556,6 +573,13 @@
}
}
+ private void sendLargeMessage(final MessageReference ref, final ServerMessage message)
+ {
+ largeMessageSender = new LargeMessageSender((ServerLargeMessage)message, ref);
+
+ largeMessageSender.sendLargeMessage();
+ }
+
/**
* @param ref
* @param message
@@ -713,6 +737,8 @@
}
catch (Exception e)
{
+ // Is there anything we could do here besides logging?
+ // The message was already sent, and this shouldn't happen
log.warn("Error while ACKing reference " + ref, e);
}
}
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-12-11 01:07:14 UTC (rev 5513)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2008-12-11 01:25:42 UTC (rev 5514)
@@ -2156,8 +2156,6 @@
public void handleSendLargeMessage(final SessionSendMessage packet)
{
- DelayedResult result = channel.replicatePacket(packet);
-
if (packet.getMessageID() <= 0L)
{
// must generate message id here, so we know they are in sync on live and backup
@@ -2166,6 +2164,8 @@
packet.setMessageID(id);
}
+
+ DelayedResult result = channel.replicatePacket(packet);
// With a send we must make sure it is replicated to backup before being processed on live
// or can end up with delivery being processed on backup before original send
Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverTestBase.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/FailoverTestBase.java 2008-12-11 01:25:42 UTC (rev 5514)
@@ -0,0 +1,158 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.failover;
+
+import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionFactoryImpl;
+import org.jboss.messaging.core.config.Configuration;
+import org.jboss.messaging.core.config.TransportConfiguration;
+import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
+import org.jboss.messaging.core.remoting.impl.invm.InVMRegistry;
+import org.jboss.messaging.core.remoting.impl.invm.TransportConstants;
+import org.jboss.messaging.core.server.MessagingService;
+import org.jboss.messaging.core.server.impl.MessagingServiceImpl;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+
+/**
+ * A FailoverTestBase
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ * Created Dec 8, 2008 6:59:53 PM
+ *
+ *
+ */
+public class FailoverTestBase extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ protected final Map<String, Object> backupParams = new HashMap<String, Object>();
+
+ private MessagingService liveService;
+
+ private MessagingService backupService;
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ protected ClientSessionFactory createFailoverFactory()
+ {
+ return new ClientSessionFactoryImpl(new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory"),
+ new TransportConfiguration("org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory",
+ backupParams));
+ }
+
+ protected void setUpFileBased() throws Exception
+ {
+
+ deleteDirectory(new File(getTestDir()));
+
+ Configuration backupConf = new ConfigurationImpl();
+
+ backupConf.setJournalDirectory(getJournalDir(getTestDir() + "/backup"));
+ backupConf.setLargeMessagesDirectory(getLargeMessagesDir(getTestDir() + "/backup"));
+ backupConf.setBindingsDirectory(getBindingsDir(getTestDir() + "/backup"));
+ backupConf.setPagingDirectory(getPageDir(getTestDir() + "/backup"));
+ backupConf.setJournalFileSize(100 * 1024);
+
+ backupConf.setPagingMaxGlobalSizeBytes(30 * 1024);
+ backupConf.setPagingDefaultSize(10 * 1024);
+
+ backupConf.setSecurityEnabled(false);
+ backupParams.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+
+ backupConf.getAcceptorConfigurations()
+ .add(new TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName(), backupParams));
+ backupConf.setBackup(true);
+
+ clearData(getTestDir() + "/backup");
+
+ backupService = MessagingServiceImpl.newMessagingService(backupConf);
+ backupService.start();
+
+ Configuration liveConf = new ConfigurationImpl();
+
+ liveConf.setJournalDirectory(getJournalDir(getTestDir() + "/live"));
+ liveConf.setLargeMessagesDirectory(getLargeMessagesDir(getTestDir() + "/live"));
+ liveConf.setBindingsDirectory(getBindingsDir(getTestDir() + "/live"));
+ liveConf.setPagingDirectory(getPageDir(getTestDir() + "/live"));
+
+ liveConf.setPagingMaxGlobalSizeBytes(30 * 1024);
+ liveConf.setPagingDefaultSize(10 * 1024);
+ liveConf.setJournalFileSize(100 * 1024);
+
+ liveConf.setSecurityEnabled(false);
+ liveConf.getAcceptorConfigurations()
+ .add(new TransportConfiguration(InVMAcceptorFactory.class.getCanonicalName()));
+
+ Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+
+ TransportConfiguration backupTC = new TransportConfiguration(INVM_CONNECTOR_FACTORY,
+ backupParams,
+ "backup-connector");
+ connectors.put(backupTC.getName(), backupTC);
+ liveConf.setConnectorConfigurations(connectors);
+ liveConf.setBackupConnectorName(backupTC.getName());
+ liveService = MessagingServiceImpl.newMessagingService(liveConf);
+
+ clearData(getTestDir() + "/live");
+
+ liveService.start();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected void tearDown() throws Exception
+ {
+ assertEquals(0, backupService.getServer().getRemotingService().getConnections().size());
+
+ backupService.stop();
+
+ assertEquals(0, liveService.getServer().getRemotingService().getConnections().size());
+
+ liveService.stop();
+
+ assertEquals(0, InVMRegistry.instance.size());
+
+ super.tearDown();
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/JustReplicationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/JustReplicationTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/failover/JustReplicationTest.java 2008-12-11 01:25:42 UTC (rev 5514)
@@ -0,0 +1,158 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2008, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.cluster.failover;
+
+import java.nio.ByteBuffer;
+
+import org.jboss.messaging.core.client.ClientConsumer;
+import org.jboss.messaging.core.client.ClientMessage;
+import org.jboss.messaging.core.client.ClientProducer;
+import org.jboss.messaging.core.client.ClientSession;
+import org.jboss.messaging.core.client.ClientSessionFactory;
+import org.jboss.messaging.core.client.impl.ClientSessionImpl;
+import org.jboss.messaging.core.exception.MessagingException;
+import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.ByteBufferWrapper;
+import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.util.SimpleString;
+
+/**
+ * A JustReplicationTest
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ * Created Dec 8, 2008 7:09:38 PM
+ *
+ *
+ */
+public class JustReplicationTest extends FailoverTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private static final SimpleString ADDRESS = new SimpleString("FailoverTestAddress");
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+
+ public void testJustReplication() throws Exception
+ {
+ ClientSessionFactory factory = createFailoverFactory();
+ factory.setBlockOnAcknowledge(true);
+ factory.setBlockOnNonPersistentSend(true);
+ factory.setBlockOnPersistentSend(true);
+
+ ClientSession session = factory.createSession(null, null, false, true, true, false, 0);
+
+ final int numberOfMessages = 200;
+
+ final int numberOfBytes = 1200;
+
+ try
+ {
+
+ session.createQueue(ADDRESS, ADDRESS, null, true, false, true);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ // The remotingConnection could be used to force a failure
+ // final RemotingConnection conn = ((ClientSessionImpl)session).getConnection();
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage message = session.createClientMessage(true);
+
+ ByteBuffer buffer = ByteBuffer.allocate(numberOfBytes);
+
+ buffer.putInt(i);
+
+ buffer.rewind();
+
+ message.setBody(new ByteBufferWrapper(buffer));
+
+ producer.send(message);
+
+ }
+
+ ClientConsumer consumer = session.createConsumer(ADDRESS);
+
+ session.start();
+
+ for (int i = 0; i < numberOfMessages; i++)
+ {
+ ClientMessage message = consumer.receive(5000);
+
+ assertNotNull(message);
+
+ message.acknowledge();
+
+ MessagingBuffer buffer = message.getBody();
+
+ buffer.rewind();
+
+ assertEquals(numberOfBytes, buffer.limit());
+
+ assertEquals(i, buffer.getInt());
+ }
+
+ assertNull(consumer.receive(500));
+ }
+ finally
+ {
+ try
+ {
+ session.close();
+ }
+ catch (Exception ignored)
+ {
+ }
+ }
+
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ @Override
+ protected void setUp() throws Exception
+ {
+ setUpFileBased();
+ }
+
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
More information about the jboss-cvs-commits
mailing list