[jboss-cvs] JBoss Messaging SVN: r5717 - in trunk: src/main/org/jboss/messaging/core/server and 5 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Sun Jan 25 00:39:20 EST 2009
Author: clebert.suconic at jboss.com
Date: 2009-01-25 00:39:20 -0500 (Sun, 25 Jan 2009)
New Revision: 5717
Added:
trunk/tests/src/org/jboss/messaging/tests/integration/persistence/
trunk/tests/src/org/jboss/messaging/tests/integration/persistence/JournalStorageManagerIntegrationTest.java
Modified:
trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeServerMessage.java
trunk/src/main/org/jboss/messaging/core/server/ServerMessage.java
trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java
trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/ServerMessageImplTest.java
trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
Log:
Adding copy method on LargeMessage
Modified: trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeServerMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeServerMessage.java 2009-01-24 12:31:03 UTC (rev 5716)
+++ trunk/src/main/org/jboss/messaging/core/persistence/impl/journal/JournalLargeServerMessage.java 2009-01-25 05:39:20 UTC (rev 5717)
@@ -31,6 +31,7 @@
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.core.server.LargeServerMessage;
+import org.jboss.messaging.core.server.ServerMessage;
import org.jboss.messaging.core.server.impl.ServerMessageImpl;
/**
@@ -70,6 +71,15 @@
this.storageManager = storageManager;
}
+ public JournalLargeServerMessage(final JournalLargeServerMessage copy, final SequentialFile fileCopy)
+ {
+ super(copy);
+ this.storageManager = copy.storageManager;
+ this.file = fileCopy;
+ this.complete = true;
+ this.bodySize = copy.bodySize;
+ }
+
// Public --------------------------------------------------------
/* (non-Javadoc)
@@ -237,11 +247,49 @@
}
}
}
+
+ // TODO: Optimize this per https://jira.jboss.org/jira/browse/JBMESSAGING-1468
+ public synchronized ServerMessage copy(final long newID) throws Exception
+ {
+ SequentialFile newfile = storageManager.createFileForLargeMessage(newID, complete);
+
+ file.open();
+ newfile.open();
+
+ file.position(0);
+ newfile.position(0);
+
+ ByteBuffer buffer = ByteBuffer.allocate(100 * 1024);
+
+ for (long i = 0;i<file.size();)
+ {
+ buffer.rewind();
+ file.read(buffer);
+ newfile.write(buffer, false);
+ i+=buffer.limit();
+ }
+
+
+ file.close();
+ newfile.close();
+
+ JournalLargeServerMessage newMessage = new JournalLargeServerMessage(this, newfile);
+ newMessage.setMessageID(newID);
+
+ return newMessage;
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
+ protected void finalize() throws Throwable
+ {
+ releaseResources();
+ super.finalize();
+ }
+
// Private -------------------------------------------------------
private synchronized void validateFile() throws Exception
Modified: trunk/src/main/org/jboss/messaging/core/server/ServerMessage.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/ServerMessage.java 2009-01-24 12:31:03 UTC (rev 5716)
+++ trunk/src/main/org/jboss/messaging/core/server/ServerMessage.java 2009-01-25 05:39:20 UTC (rev 5717)
@@ -47,7 +47,7 @@
int decrementRefCount();
- ServerMessage copy();
+ ServerMessage copy(long newID) throws Exception;
int getMemoryEstimate();
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-01-24 12:31:03 UTC (rev 5716)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-01-25 05:39:20 UTC (rev 5717)
@@ -947,15 +947,13 @@
and original message id
*/
- ServerMessage copy = message.copy();
-
// (JBMESSAGING-1468)
// FIXME - this won't work with replication!!!!!!!!!!!
- // FIXME - this won't work with LargeMessages also!!!!
long newMessageId = storageManager.generateUniqueID();
- copy.setMessageID(newMessageId);
+ ServerMessage copy = message.copy(newMessageId);
+
SimpleString originalQueue = copy.getDestination();
copy.putStringProperty(HDR_ORIGINAL_DESTINATION, originalQueue);
copy.putLongProperty(HDR_ORIG_MESSAGE_ID, message.getMessageID());
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java 2009-01-24 12:31:03 UTC (rev 5716)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/ServerMessageImpl.java 2009-01-25 05:39:20 UTC (rev 5717)
@@ -152,10 +152,12 @@
return memoryEstimate;
}
- public ServerMessage copy()
+ public ServerMessage copy(final long newID) throws Exception
{
ServerMessage m = new ServerMessageImpl(this);
+ m.setMessageID(newID);
+
return m;
}
Added: trunk/tests/src/org/jboss/messaging/tests/integration/persistence/JournalStorageManagerIntegrationTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/persistence/JournalStorageManagerIntegrationTest.java (rev 0)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/persistence/JournalStorageManagerIntegrationTest.java 2009-01-25 05:39:20 UTC (rev 5717)
@@ -0,0 +1,92 @@
+/*
+ * JBoss, Home of Professional Open Source
+ * Copyright 2005-2009, Red Hat Middleware LLC, and individual contributors
+ * by the @authors tag. See the copyright.txt in the distribution for a
+ * full listing of individual contributors.
+ *
+ * This is free software; you can redistribute it and/or modify it
+ * under the terms of the GNU Lesser General Public License as
+ * published by the Free Software Foundation; either version 2.1 of
+ * the License, or (at your option) any later version.
+ *
+ * This software is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this software; if not, write to the Free
+ * Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
+ * 02110-1301 USA, or see the FSF site: http://www.fsf.org.
+ */
+
+package org.jboss.messaging.tests.integration.persistence;
+
+import java.io.File;
+
+import org.jboss.messaging.core.config.impl.FileConfiguration;
+import org.jboss.messaging.core.persistence.impl.journal.JournalStorageManager;
+import org.jboss.messaging.core.server.JournalType;
+import org.jboss.messaging.core.server.LargeServerMessage;
+import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.tests.util.ServiceTestBase;
+import org.jboss.messaging.tests.util.UnitTestCase;
+
+/**
+ * A JournalStorageManagerIntegrationTest
+ *
+ * @author <a href="mailto:clebert.suconic at jboss.org">Clebert Suconic</a>
+ *
+ * Created Jan 24, 2009 11:14:13 PM
+ *
+ *
+ */
+public class JournalStorageManagerIntegrationTest extends ServiceTestBase
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+
+ public void testLargeMessageCopy() throws Exception
+ {
+ clearData();
+ FileConfiguration configuration = createFileConfig();
+
+ configuration.start();
+
+ configuration.setJournalType(JournalType.NIO);
+
+ final JournalStorageManager journal = new JournalStorageManager(configuration);
+ journal.start();
+
+ LargeServerMessage msg = journal.createLargeMessage();
+ msg.setMessageID(1);
+
+ byte[] data = new byte[1024];
+
+ for (int i = 0; i < 110; i++)
+ msg.addBytes(data);
+
+ ServerMessage msg2 = msg.copy(2);
+
+ assertEquals(110 * 1024, msg.getBodySize());
+ assertEquals(110 * 1024, msg2.getBodySize());
+
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/ServerMessageImplTest.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/ServerMessageImplTest.java 2009-01-24 12:31:03 UTC (rev 5716)
+++ trunk/tests/src/org/jboss/messaging/tests/unit/core/server/impl/ServerMessageImplTest.java 2009-01-25 05:39:20 UTC (rev 5717)
@@ -65,7 +65,7 @@
assertEquals(id, msg.getMessageID());
}
- public void testCopyConstructor()
+ public void testCopyConstructor() throws Exception
{
for (int j = 0; j < 10; j++)
{
@@ -89,7 +89,7 @@
assertMessagesEquivalent(message, message2);
- ServerMessage message3 = message2.copy();
+ ServerMessage message3 = message2.copy(message2.getMessageID());
assertMessagesEquivalent(message2, message3);
}
Modified: trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java 2009-01-24 12:31:03 UTC (rev 5716)
+++ trunk/tests/src/org/jboss/messaging/tests/util/ServiceTestBase.java 2009-01-25 05:39:20 UTC (rev 5717)
@@ -33,6 +33,7 @@
import org.jboss.messaging.core.config.Configuration;
import org.jboss.messaging.core.config.TransportConfiguration;
import org.jboss.messaging.core.config.impl.ConfigurationImpl;
+import org.jboss.messaging.core.config.impl.FileConfiguration;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
import org.jboss.messaging.core.remoting.impl.invm.InVMConnectorFactory;
@@ -58,7 +59,6 @@
// Attributes ----------------------------------------------------
-
protected static final String INVM_ACCEPTOR_FACTORY = InVMAcceptorFactory.class.getCanonicalName();
protected static final String INVM_CONNECTOR_FACTORY = InVMConnectorFactory.class.getCanonicalName();
@@ -78,13 +78,11 @@
// Protected -----------------------------------------------------
-
protected void clearData()
{
clearData(getTestDir());
}
-
protected void clearData(String testDir)
{
deleteAndCreateDir(getJournalDir(testDir));
@@ -102,6 +100,15 @@
file.mkdirs();
}
+ protected FileConfiguration createFileConfig()
+ {
+ FileConfiguration config = new FileConfiguration();
+ config.setJournalDirectory(getJournalDir());
+ config.setBindingsDirectory(getBindingsDir());
+ config.setLargeMessagesDirectory(getLargeMessagesDir());
+ return config;
+ }
+
protected MessagingService createService(final boolean realFiles,
final Configuration configuration,
final Map<String, QueueSettings> settings)
@@ -135,12 +142,16 @@
{
return createService(realFiles, configuration, new HashMap<String, QueueSettings>());
}
-
- protected MessagingService createClusteredServiceWithParams(final int index, final boolean realFiles, final Map<String, Object> params)
+
+ protected MessagingService createClusteredServiceWithParams(final int index,
+ final boolean realFiles,
+ final Map<String, Object> params)
{
- return createService(realFiles, createClusteredDefaultConfig(index, params, INVM_ACCEPTOR_FACTORY), new HashMap<String, QueueSettings>());
+ return createService(realFiles,
+ createClusteredDefaultConfig(index, params, INVM_ACCEPTOR_FACTORY),
+ new HashMap<String, QueueSettings>());
}
-
+
protected Configuration createDefaultConfig()
{
return createDefaultConfig(false);
@@ -155,18 +166,20 @@
else
{
return createDefaultConfig(new HashMap<String, Object>(), INVM_ACCEPTOR_FACTORY);
- }
+ }
}
-
- protected Configuration createClusteredDefaultConfig(final int index, final Map<String, Object> params, final String... acceptors)
+
+ protected Configuration createClusteredDefaultConfig(final int index,
+ final Map<String, Object> params,
+ final String... acceptors)
{
Configuration config = createDefaultConfig(index, params, acceptors);
-
+
config.setClustered(true);
-
+
return config;
}
-
+
protected Configuration createDefaultConfig(int index, final Map<String, Object> params, final String... acceptors)
{
Configuration configuration = new ConfigurationImpl();
@@ -223,8 +236,7 @@
protected ClientSessionFactory createFactory(final String connectorClass)
{
- return new ClientSessionFactoryImpl(new TransportConfiguration(connectorClass),
- null);
+ return new ClientSessionFactoryImpl(new TransportConfiguration(connectorClass), null);
}
More information about the jboss-cvs-commits
mailing list