JBoss hornetq SVN: r8074 - in branches/Replication_Clebert: src/main/org/hornetq/core/persistence and 3 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-10-08 22:45:05 -0400 (Thu, 08 Oct 2009)
New Revision: 8074
Modified:
branches/Replication_Clebert/src/main/org/hornetq/core/paging/impl/PageImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/persistence/StorageManager.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
fixes
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/paging/impl/PageImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/paging/impl/PageImpl.java 2009-10-09 00:45:01 UTC (rev 8073)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/paging/impl/PageImpl.java 2009-10-09 02:45:05 UTC (rev 8074)
@@ -191,7 +191,10 @@
public void close() throws Exception
{
- storageManager.pageClosed(storeName, pageId);
+ if (storageManager != null)
+ {
+ storageManager.pageClosed(storeName, pageId);
+ }
file.close();
}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/persistence/StorageManager.java 2009-10-09 00:45:01 UTC (rev 8073)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/persistence/StorageManager.java 2009-10-09 02:45:05 UTC (rev 8074)
@@ -18,7 +18,6 @@
import javax.transaction.xa.Xid;
-import org.hornetq.core.buffers.ChannelBuffer;
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingManager;
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java 2009-10-09 00:45:01 UTC (rev 8073)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java 2009-10-09 02:45:05 UTC (rev 8074)
@@ -87,6 +87,8 @@
import org.hornetq.core.remoting.impl.wireformat.ReplicationCommitMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteTXMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationPageEventMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationPageWriteMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationPrepareMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationResponseMessage;
import org.hornetq.core.remoting.impl.wireformat.RollbackMessage;
@@ -410,12 +412,12 @@
}
case REPLICATION_PAGE_WRITE:
{
- packet = new ReplicationResponseMessage();
+ packet = new ReplicationPageWriteMessage();
break;
}
case REPLICATION_PAGE_EVENT:
{
- packet = new ReplicationResponseMessage();
+ packet = new ReplicationPageEventMessage();
break;
}
default:
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-10-09 00:45:01 UTC (rev 8073)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-10-09 02:45:05 UTC (rev 8074)
@@ -299,6 +299,12 @@
ConcurrentMap<Integer, Page> pages = getPageMap(packet.getStoreName());
Page page = pages.remove(packet.getPageNumber());
+
+ if (page == null)
+ {
+ page = getPage(packet.getStoreName(), packet.getPageNumber());
+ }
+
if (page != null)
{
@@ -331,7 +337,12 @@
if (resultIndex == null)
{
- resultIndex = pageIndex.putIfAbsent(storeName, new ConcurrentHashMap<Integer, Page>());
+ resultIndex = new ConcurrentHashMap<Integer, Page>();
+ ConcurrentMap<Integer, Page> mapResult = pageIndex.putIfAbsent(storeName, resultIndex);
+ if (mapResult != null)
+ {
+ resultIndex = mapResult;
+ }
}
return resultIndex;
@@ -365,6 +376,7 @@
if (page == null)
{
page = pageManager.getPageStore(storeName).createPage(pageId);
+ page.open();
map.put(pageId, page);
}
Modified: branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-10-09 00:45:01 UTC (rev 8073)
+++ branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-10-09 02:45:05 UTC (rev 8074)
@@ -27,6 +27,7 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
+import org.hornetq.core.buffers.ChannelBuffers;
import org.hornetq.core.client.impl.ClientSessionFactoryImpl;
import org.hornetq.core.client.impl.ConnectionManager;
import org.hornetq.core.client.impl.ConnectionManagerImpl;
@@ -40,6 +41,13 @@
import org.hornetq.core.journal.RecordInfo;
import org.hornetq.core.journal.TransactionFailureCallback;
import org.hornetq.core.management.ManagementService;
+import org.hornetq.core.paging.PagedMessage;
+import org.hornetq.core.paging.PagingManager;
+import org.hornetq.core.paging.PagingStore;
+import org.hornetq.core.paging.impl.PagedMessageImpl;
+import org.hornetq.core.paging.impl.PagingManagerImpl;
+import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
+import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.ChannelHandler;
import org.hornetq.core.remoting.Interceptor;
@@ -50,10 +58,15 @@
import org.hornetq.core.replication.impl.ReplicatedJournal;
import org.hornetq.core.replication.impl.ReplicationManagerImpl;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.impl.HornetQServerImpl;
+import org.hornetq.core.server.impl.ServerMessageImpl;
+import org.hornetq.core.settings.HierarchicalRepository;
+import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.HornetQThreadFactory;
+import org.hornetq.utils.SimpleString;
/**
* A ReplicationTest
@@ -168,17 +181,8 @@
replicatedJournal.appendPrepareRecord(3, new FakeData(), false);
replicatedJournal.appendRollbackRecord(3, false);
- final CountDownLatch latch = new CountDownLatch(1);
- manager.afterReplicated(new Runnable()
- {
+ blockOnReplication(manager);
- public void run()
- {
- latch.countDown();
- }
-
- });
- assertTrue(latch.await(1, TimeUnit.SECONDS));
assertEquals(1, manager.getActiveTokens().size());
manager.completeToken();
@@ -194,6 +198,47 @@
}
assertEquals(0, manager.getActiveTokens().size());
+
+ ServerMessage msg = new ServerMessageImpl();
+
+ SimpleString dummy = new SimpleString("dummy");
+ msg.setDestination(dummy);
+ msg.setBody(ChannelBuffers.wrappedBuffer(new byte[10]));
+
+ replicatedJournal.appendAddRecordTransactional(23, 24, (byte)1, new FakeData());
+
+ PagedMessage pgmsg = new PagedMessageImpl(msg, -1);
+ manager.pageWrite(pgmsg, 1);
+ manager.pageWrite(pgmsg, 2);
+ manager.pageWrite(pgmsg, 3);
+ manager.pageWrite(pgmsg, 4);
+
+ blockOnReplication(manager);
+
+ PagingManager pagingManager = createPageManager(server.getStorageManager(),
+ server.getConfiguration(),
+ server.getExecutorFactory(),
+ server.getAddressSettingsRepository());
+
+ PagingStore store = pagingManager.getPageStore(dummy);
+ store.start();
+ assertEquals(5, store.getNumberOfPages());
+ store.stop();
+
+ manager.pageDeleted(dummy, 1);
+ manager.pageDeleted(dummy, 2);
+ manager.pageDeleted(dummy, 3);
+ manager.pageDeleted(dummy, 4);
+ manager.pageDeleted(dummy, 5);
+ manager.pageDeleted(dummy, 6);
+
+
+ blockOnReplication(manager);
+
+ store.start();
+
+ assertEquals(0, store.getNumberOfPages());
+
manager.stop();
}
finally
@@ -201,7 +246,27 @@
server.stop();
}
}
-
+
+ /**
+ * @param manager
+ * @return
+ */
+ private void blockOnReplication(ReplicationManagerImpl manager) throws Exception
+ {
+ final CountDownLatch latch = new CountDownLatch(1);
+ manager.afterReplicated(new Runnable()
+ {
+
+ public void run()
+ {
+ latch.countDown();
+ }
+
+ });
+
+ assertTrue(latch.await(30, TimeUnit.SECONDS));
+ }
+
public void testNoActions() throws Exception
{
@@ -359,6 +424,22 @@
}
+ protected PagingManager createPageManager(StorageManager storageManager,
+ Configuration configuration,
+ ExecutorFactory executorFactory,
+ HierarchicalRepository<AddressSettings> addressSettingsRepository) throws Exception
+ {
+
+ PagingManager paging = new PagingManagerImpl(new PagingStoreFactoryNIO(configuration.getPagingDirectory(),
+ executorFactory),
+ storageManager,
+ addressSettingsRepository,
+ false);
+
+ paging.start();
+ return paging;
+ }
+
// Private -------------------------------------------------------
// Inner classes -------------------------------------------------
16 years, 2 months
JBoss hornetq SVN: r8073 - in trunk: src/main/org/hornetq/jms/server/management/impl and 1 other directories.
by do-not-reply@jboss.org
Author: plugtree
Date: 2009-10-08 20:45:01 -0400 (Thu, 08 Oct 2009)
New Revision: 8073
Modified:
trunk/src/main/org/hornetq/jms/server/management/JMSQueueControl.java
trunk/src/main/org/hornetq/jms/server/management/impl/JMSQueueControlImpl.java
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java
Log:
Added queue pause and resume to JMSQueueControl. For more info see https://jira.jboss.org/jira/browse/HORNETQ-82
Modified: trunk/src/main/org/hornetq/jms/server/management/JMSQueueControl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/management/JMSQueueControl.java 2009-10-09 00:43:49 UTC (rev 8072)
+++ trunk/src/main/org/hornetq/jms/server/management/JMSQueueControl.java 2009-10-09 00:45:01 UTC (rev 8073)
@@ -105,4 +105,14 @@
@Operation(desc = "List the message counters history as HTML", impact = INFO)
String listMessageCounterHistoryAsHTML() throws Exception;
+ @Operation(desc = "Pauses the queue.", impact = INFO)
+ void pause() throws Exception;
+
+ @Operation(desc = "Returns true if the queue is paused.", impact = INFO)
+ boolean isPaused() throws Exception;
+
+ @Operation(desc = "Resumes the queue.", impact = INFO)
+ void resume() throws Exception;
+
+
}
Modified: trunk/src/main/org/hornetq/jms/server/management/impl/JMSQueueControlImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/management/impl/JMSQueueControlImpl.java 2009-10-09 00:43:49 UTC (rev 8072)
+++ trunk/src/main/org/hornetq/jms/server/management/impl/JMSQueueControlImpl.java 2009-10-09 00:45:01 UTC (rev 8073)
@@ -305,6 +305,21 @@
return MessageCounterHelper.listMessageCounterHistoryAsHTML(new MessageCounter[] { counter });
}
+ public boolean isPaused() throws Exception
+ {
+ return coreQueueControl.isPaused();
+ }
+
+ public void pause() throws Exception
+ {
+ coreQueueControl.pause();
+ }
+
+ public void resume() throws Exception
+ {
+ coreQueueControl.resume();
+ }
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java 2009-10-09 00:43:49 UTC (rev 8072)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlTest.java 2009-10-09 00:45:01 UTC (rev 8073)
@@ -13,6 +13,7 @@
package org.hornetq.tests.integration.jms.server.management;
+import static org.hornetq.tests.integration.management.ManagementControlHelper.createHornetQServerControl;
import static org.hornetq.tests.integration.management.ManagementControlHelper.createJMSQueueControl;
import static org.hornetq.tests.util.RandomUtil.randomLong;
import static org.hornetq.tests.util.RandomUtil.randomSimpleString;
@@ -33,6 +34,8 @@
import org.hornetq.core.config.Configuration;
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
+import org.hornetq.core.management.HornetQServerControl;
+import org.hornetq.core.management.QueueControl;
import org.hornetq.core.remoting.impl.invm.InVMAcceptorFactory;
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.server.HornetQ;
@@ -705,7 +708,29 @@
JMSUtil.consumeMessages(1, queue);
}
+
+ public void testPauseAndResume()
+ {
+
+ try
+ {
+ JMSQueueControl queueControl = createManagementControl();
+
+ assertFalse(queueControl.isPaused());
+ queueControl.pause();
+ assertTrue(queueControl.isPaused());
+ queueControl.resume();
+ assertFalse(queueControl.isPaused());
+ }
+ catch (Exception e)
+ {
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+
+
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java 2009-10-09 00:43:49 UTC (rev 8072)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/server/management/JMSQueueControlUsingJMSTest.java 2009-10-09 00:45:01 UTC (rev 8073)
@@ -236,6 +236,21 @@
{
return (String)proxy.retrieveAttributeValue("JNDIBinding");
}
+
+ public boolean isPaused() throws Exception
+ {
+ return (Boolean)proxy.invokeOperation("isPaused");
+ }
+
+ public void pause() throws Exception
+ {
+ proxy.invokeOperation("pause");
+ }
+
+ public void resume() throws Exception
+ {
+ proxy.invokeOperation("resume");
+ }
};
}
16 years, 2 months
JBoss hornetq SVN: r8072 - trunk/docs/user-manual/en.
by do-not-reply@jboss.org
Author: plugtree
Date: 2009-10-08 20:43:49 -0400 (Thu, 08 Oct 2009)
New Revision: 8072
Modified:
trunk/docs/user-manual/en/management.xml
Log:
Added documentation for Queue's paused and resume capabilities.
Modified: trunk/docs/user-manual/en/management.xml
===================================================================
--- trunk/docs/user-manual/en/management.xml 2009-10-08 21:41:35 UTC (rev 8071)
+++ trunk/docs/user-manual/en/management.xml 2009-10-09 00:43:49 UTC (rev 8072)
@@ -211,6 +211,13 @@
if it was created with one, <literal>isDurable()</literal> to know wether the
queue is durable or not, etc.)</para>
</listitem>
+ <listitem>
+ <para>Pausing and resuming Queues</para>
+ <para>The <literal>QueueControl</literal> can pause and resume the underlying queue.
+ When a queue is paused, it will receive messages but will not deliver them. When it's resume, it'll begin
+ delivering the queued messages, if any.
+ </para>
+ </listitem>
</itemizedlist>
</section>
<section>
@@ -433,6 +440,13 @@
is temporary or not, <literal>isDurable()</literal> to know wether the queue is
durable or not, etc.)</para>
</listitem>
+ <listitem>
+ <para>Pausing and resuming queues</para>
+ <para>The <literal>JMSQueueControl</literal> can pause and resume the underlying queue.
+ When the queue is paused it will continue to receive messages but will not deliver them.
+ When resumed again it will deliver the enqueued messages, if any.
+ </para>
+ </listitem>
</itemizedlist>
</section>
<section>
16 years, 2 months
JBoss hornetq SVN: r8071 - in branches/Replication_Clebert: src/main/org/hornetq/core/replication/impl and 1 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-10-08 17:41:35 -0400 (Thu, 08 Oct 2009)
New Revision: 8071
Added:
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java
Removed:
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicatedJournalImpl.java
Modified:
branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
Log:
Rename
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-10-08 21:40:37 UTC (rev 8070)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-10-08 21:41:35 UTC (rev 8071)
@@ -57,7 +57,7 @@
import org.hornetq.core.remoting.impl.wireformat.XidCodecSupport;
import org.hornetq.core.remoting.spi.HornetQBuffer;
import org.hornetq.core.replication.ReplicationManager;
-import org.hornetq.core.replication.impl.ReplicatedJournalImpl;
+import org.hornetq.core.replication.impl.ReplicatedJournal;
import org.hornetq.core.server.JournalType;
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.MessageReference;
@@ -192,7 +192,7 @@
if (replicator != null)
{
- this.bindingsJournal = new ReplicatedJournalImpl((byte)0, localBindings, replicator);
+ this.bindingsJournal = new ReplicatedJournal((byte)0, localBindings, replicator);
}
else
{
@@ -261,7 +261,7 @@
if (replicator != null)
{
- this.messageJournal = new ReplicatedJournalImpl((byte)1, localMessage, replicator);
+ this.messageJournal = new ReplicatedJournal((byte)1, localMessage, replicator);
}
else
{
Copied: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java (from rev 8068, branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicatedJournalImpl.java)
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicatedJournal.java 2009-10-08 21:41:35 UTC (rev 8071)
@@ -0,0 +1,399 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.replication.impl;
+
+import java.util.List;
+
+import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.journal.Journal;
+import org.hornetq.core.journal.LoaderCallback;
+import org.hornetq.core.journal.PreparedTransactionInfo;
+import org.hornetq.core.journal.RecordInfo;
+import org.hornetq.core.journal.TransactionFailureCallback;
+import org.hornetq.core.journal.impl.JournalImpl.ByteArrayEncoding;
+import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.replication.ReplicationManager;
+
+/**
+ * Used by the {@link JournalStorageManager} to replicate journal calls.
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ * @see JournalStorageManager
+ *
+ */
+public class ReplicatedJournal implements Journal
+{
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ private static final boolean trace = false;
+
+ private final ReplicationManager replicationManager;
+
+ private final Journal replicatedJournal;
+
+ private final byte journalID;
+
+ public ReplicatedJournal(final byte journaID,
+ final Journal replicatedJournal,
+ final ReplicationManager replicationManager)
+ {
+ super();
+ journalID = journaID;
+ this.replicatedJournal = replicatedJournal;
+ this.replicationManager = replicationManager;
+ }
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ // Public --------------------------------------------------------
+ /**
+ * @param id
+ * @param recordType
+ * @param record
+ * @param sync
+ * @throws Exception
+ * @see org.hornetq.core.journal.Journal#appendAddRecord(long, byte, byte[], boolean)
+ */
+ public void appendAddRecord(final long id, final byte recordType, final byte[] record, final boolean sync) throws Exception
+ {
+ this.appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync);
+ }
+
+ /**
+ * @param id
+ * @param recordType
+ * @param record
+ * @param sync
+ * @throws Exception
+ * @see org.hornetq.core.journal.Journal#appendAddRecord(long, byte, org.hornetq.core.journal.EncodingSupport, boolean)
+ */
+ public void appendAddRecord(final long id, final byte recordType, final EncodingSupport record, final boolean sync) throws Exception
+ {
+ if (trace)
+ {
+ System.out.println("Append record id = " + id + " recordType = " + recordType);
+ }
+ replicationManager.appendAddRecord(journalID, id, recordType, record);
+ replicatedJournal.appendAddRecord(id, recordType, record, sync);
+ }
+
+ /**
+ * @param txID
+ * @param id
+ * @param recordType
+ * @param record
+ * @throws Exception
+ * @see org.hornetq.core.journal.Journal#appendAddRecordTransactional(long, long, byte, byte[])
+ */
+ public void appendAddRecordTransactional(final long txID, final long id, final byte recordType, final byte[] record) throws Exception
+ {
+ this.appendAddRecordTransactional(txID, id, recordType, new ByteArrayEncoding(record));
+ }
+
+ /**
+ * @param txID
+ * @param id
+ * @param recordType
+ * @param record
+ * @throws Exception
+ * @see org.hornetq.core.journal.Journal#appendAddRecordTransactional(long, long, byte, org.hornetq.core.journal.EncodingSupport)
+ */
+ public void appendAddRecordTransactional(final long txID,
+ final long id,
+ final byte recordType,
+ final EncodingSupport record) throws Exception
+ {
+ if (trace)
+ {
+ System.out.println("Append record TXid = " + id + " recordType = " + recordType);
+ }
+ replicationManager.appendAddRecordTransactional(journalID, txID, id, recordType, record);
+ replicatedJournal.appendAddRecordTransactional(txID, id, recordType, record);
+ }
+
+ /**
+ * @param txID
+ * @param sync
+ * @throws Exception
+ * @see org.hornetq.core.journal.Journal#appendCommitRecord(long, boolean)
+ */
+ public void appendCommitRecord(final long txID, final boolean sync) throws Exception
+ {
+ if (trace)
+ {
+ System.out.println("AppendCommit " + txID);
+ }
+ replicationManager.appendCommitRecord(journalID, txID);
+ replicatedJournal.appendCommitRecord(txID, sync);
+ }
+
+ /**
+ * @param id
+ * @param sync
+ * @throws Exception
+ * @see org.hornetq.core.journal.Journal#appendDeleteRecord(long, boolean)
+ */
+ public void appendDeleteRecord(final long id, final boolean sync) throws Exception
+ {
+ if (trace)
+ {
+ System.out.println("AppendDelete " + id);
+ }
+ replicationManager.appendDeleteRecord(journalID, id);
+ replicatedJournal.appendDeleteRecord(id, sync);
+ }
+
+ /**
+ * @param txID
+ * @param id
+ * @param record
+ * @throws Exception
+ * @see org.hornetq.core.journal.Journal#appendDeleteRecordTransactional(long, long, byte[])
+ */
+ public void appendDeleteRecordTransactional(final long txID, final long id, final byte[] record) throws Exception
+ {
+ this.appendDeleteRecordTransactional(txID, id, new ByteArrayEncoding(record));
+ }
+
+ /**
+ * @param txID
+ * @param id
+ * @param record
+ * @throws Exception
+ * @see org.hornetq.core.journal.Journal#appendDeleteRecordTransactional(long, long, org.hornetq.core.journal.EncodingSupport)
+ */
+ public void appendDeleteRecordTransactional(final long txID, final long id, final EncodingSupport record) throws Exception
+ {
+ if (trace)
+ {
+ System.out.println("AppendDelete txID=" + txID + " id=" + id);
+ }
+ replicationManager.appendDeleteRecordTransactional(journalID, txID, id, record);
+ replicatedJournal.appendDeleteRecordTransactional(txID, id, record);
+ }
+
+ /**
+ * @param txID
+ * @param id
+ * @throws Exception
+ * @see org.hornetq.core.journal.Journal#appendDeleteRecordTransactional(long, long)
+ */
+ public void appendDeleteRecordTransactional(final long txID, final long id) throws Exception
+ {
+ if (trace)
+ {
+ System.out.println("AppendDelete (noencoding) txID=" + txID + " id=" + id);
+ }
+ replicationManager.appendDeleteRecordTransactional(journalID, txID, id);
+ replicatedJournal.appendDeleteRecordTransactional(txID, id);
+ }
+
+ /**
+ * @param txID
+ * @param transactionData
+ * @param sync
+ * @throws Exception
+ * @see org.hornetq.core.journal.Journal#appendPrepareRecord(long, byte[], boolean)
+ */
+ public void appendPrepareRecord(final long txID, final byte[] transactionData, final boolean sync) throws Exception
+ {
+ this.appendPrepareRecord(txID, new ByteArrayEncoding(transactionData), sync);
+ }
+
+ /**
+ * @param txID
+ * @param transactionData
+ * @param sync
+ * @throws Exception
+ * @see org.hornetq.core.journal.Journal#appendPrepareRecord(long, org.hornetq.core.journal.EncodingSupport, boolean)
+ */
+ public void appendPrepareRecord(final long txID, final EncodingSupport transactionData, final boolean sync) throws Exception
+ {
+ if (trace)
+ {
+ System.out.println("AppendPrepare txID=" + txID);
+ }
+ replicationManager.appendPrepareRecord(journalID, txID, transactionData);
+ replicatedJournal.appendPrepareRecord(txID, transactionData, sync);
+ }
+
+ /**
+ * @param txID
+ * @param sync
+ * @throws Exception
+ * @see org.hornetq.core.journal.Journal#appendRollbackRecord(long, boolean)
+ */
+ public void appendRollbackRecord(final long txID, final boolean sync) throws Exception
+ {
+ if (trace)
+ {
+ System.out.println("AppendRollback " + txID);
+ }
+ replicationManager.appendRollbackRecord(journalID, txID);
+ replicatedJournal.appendRollbackRecord(txID, sync);
+ }
+
+ /**
+ * @param id
+ * @param recordType
+ * @param record
+ * @param sync
+ * @throws Exception
+ * @see org.hornetq.core.journal.Journal#appendUpdateRecord(long, byte, byte[], boolean)
+ */
+ public void appendUpdateRecord(final long id, final byte recordType, final byte[] record, final boolean sync) throws Exception
+ {
+ this.appendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync);
+ }
+
+ /**
+ * @param id
+ * @param recordType
+ * @param record
+ * @param sync
+ * @throws Exception
+ * @see org.hornetq.core.journal.Journal#appendUpdateRecord(long, byte, org.hornetq.core.journal.EncodingSupport, boolean)
+ */
+ public void appendUpdateRecord(final long id, final byte recordType, final EncodingSupport record, final boolean sync) throws Exception
+ {
+ if (trace)
+ {
+ System.out.println("AppendUpdateRecord id = " + id + " , recordType = " + recordType);
+ }
+ replicationManager.appendUpdateRecord(journalID, id, recordType, record);
+ replicatedJournal.appendUpdateRecord(id, recordType, record, sync);
+ }
+
+ /**
+ * @param txID
+ * @param id
+ * @param recordType
+ * @param record
+ * @throws Exception
+ * @see org.hornetq.core.journal.Journal#appendUpdateRecordTransactional(long, long, byte, byte[])
+ */
+ public void appendUpdateRecordTransactional(final long txID,
+ final long id,
+ final byte recordType,
+ final byte[] record) throws Exception
+ {
+ this.appendUpdateRecordTransactional(txID, id, recordType, new ByteArrayEncoding(record));
+ }
+
+ /**
+ * @param txID
+ * @param id
+ * @param recordType
+ * @param record
+ * @throws Exception
+ * @see org.hornetq.core.journal.Journal#appendUpdateRecordTransactional(long, long, byte, org.hornetq.core.journal.EncodingSupport)
+ */
+ public void appendUpdateRecordTransactional(final long txID,
+ final long id,
+ final byte recordType,
+ final EncodingSupport record) throws Exception
+ {
+ if (trace)
+ {
+ System.out.println("AppendUpdateRecord txid=" + txID + " id = " + id + " , recordType = " + recordType);
+ }
+ replicationManager.appendUpdateRecordTransactional(journalID, txID, id, recordType, record);
+ replicatedJournal.appendUpdateRecordTransactional(txID, id, recordType, record);
+ }
+
+ /**
+ * @param committedRecords
+ * @param preparedTransactions
+ * @param transactionFailure
+ * @return
+ * @throws Exception
+ * @see org.hornetq.core.journal.Journal#load(java.util.List, java.util.List, org.hornetq.core.journal.TransactionFailureCallback)
+ */
+ public long load(final List<RecordInfo> committedRecords,
+ final List<PreparedTransactionInfo> preparedTransactions,
+ final TransactionFailureCallback transactionFailure) throws Exception
+ {
+ return replicatedJournal.load(committedRecords, preparedTransactions, transactionFailure);
+ }
+
+ /**
+ * @param reloadManager
+ * @return
+ * @throws Exception
+ * @see org.hornetq.core.journal.Journal#load(org.hornetq.core.journal.LoaderCallback)
+ */
+ public long load(final LoaderCallback reloadManager) throws Exception
+ {
+ return replicatedJournal.load(reloadManager);
+ }
+
+ /**
+ * @param pages
+ * @throws Exception
+ * @see org.hornetq.core.journal.Journal#perfBlast(int)
+ */
+ public void perfBlast(final int pages) throws Exception
+ {
+ replicatedJournal.perfBlast(pages);
+ }
+
+ /**
+ * @throws Exception
+ * @see org.hornetq.core.server.HornetQComponent#start()
+ */
+ public void start() throws Exception
+ {
+ replicatedJournal.start();
+ }
+
+ /**
+ * @throws Exception
+ * @see org.hornetq.core.server.HornetQComponent#stop()
+ */
+ public void stop() throws Exception
+ {
+ replicatedJournal.stop();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.journal.Journal#getAlignment()
+ */
+ public int getAlignment() throws Exception
+ {
+ return replicatedJournal.getAlignment();
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQComponent#isStarted()
+ */
+ public boolean isStarted()
+ {
+ return replicatedJournal.isStarted();
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Deleted: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicatedJournalImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicatedJournalImpl.java 2009-10-08 21:40:37 UTC (rev 8070)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicatedJournalImpl.java 2009-10-08 21:41:35 UTC (rev 8071)
@@ -1,399 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.replication.impl;
-
-import java.util.List;
-
-import org.hornetq.core.journal.EncodingSupport;
-import org.hornetq.core.journal.Journal;
-import org.hornetq.core.journal.LoaderCallback;
-import org.hornetq.core.journal.PreparedTransactionInfo;
-import org.hornetq.core.journal.RecordInfo;
-import org.hornetq.core.journal.TransactionFailureCallback;
-import org.hornetq.core.journal.impl.JournalImpl.ByteArrayEncoding;
-import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
-import org.hornetq.core.replication.ReplicationManager;
-
-/**
- * Used by the {@link JournalStorageManager} to replicate journal calls.
- *
- * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- * @see JournalStorageManager
- *
- */
-public class ReplicatedJournalImpl implements Journal
-{
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- private static final boolean trace = false;
-
- private final ReplicationManager replicationManager;
-
- private final Journal replicatedJournal;
-
- private final byte journalID;
-
- public ReplicatedJournalImpl(final byte journaID,
- final Journal replicatedJournal,
- final ReplicationManager replicationManager)
- {
- super();
- journalID = journaID;
- this.replicatedJournal = replicatedJournal;
- this.replicationManager = replicationManager;
- }
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
- /**
- * @param id
- * @param recordType
- * @param record
- * @param sync
- * @throws Exception
- * @see org.hornetq.core.journal.Journal#appendAddRecord(long, byte, byte[], boolean)
- */
- public void appendAddRecord(final long id, final byte recordType, final byte[] record, final boolean sync) throws Exception
- {
- this.appendAddRecord(id, recordType, new ByteArrayEncoding(record), sync);
- }
-
- /**
- * @param id
- * @param recordType
- * @param record
- * @param sync
- * @throws Exception
- * @see org.hornetq.core.journal.Journal#appendAddRecord(long, byte, org.hornetq.core.journal.EncodingSupport, boolean)
- */
- public void appendAddRecord(final long id, final byte recordType, final EncodingSupport record, final boolean sync) throws Exception
- {
- if (trace)
- {
- System.out.println("Append record id = " + id + " recordType = " + recordType);
- }
- replicationManager.appendAddRecord(journalID, id, recordType, record);
- replicatedJournal.appendAddRecord(id, recordType, record, sync);
- }
-
- /**
- * @param txID
- * @param id
- * @param recordType
- * @param record
- * @throws Exception
- * @see org.hornetq.core.journal.Journal#appendAddRecordTransactional(long, long, byte, byte[])
- */
- public void appendAddRecordTransactional(final long txID, final long id, final byte recordType, final byte[] record) throws Exception
- {
- this.appendAddRecordTransactional(txID, id, recordType, new ByteArrayEncoding(record));
- }
-
- /**
- * @param txID
- * @param id
- * @param recordType
- * @param record
- * @throws Exception
- * @see org.hornetq.core.journal.Journal#appendAddRecordTransactional(long, long, byte, org.hornetq.core.journal.EncodingSupport)
- */
- public void appendAddRecordTransactional(final long txID,
- final long id,
- final byte recordType,
- final EncodingSupport record) throws Exception
- {
- if (trace)
- {
- System.out.println("Append record TXid = " + id + " recordType = " + recordType);
- }
- replicationManager.appendAddRecordTransactional(journalID, txID, id, recordType, record);
- replicatedJournal.appendAddRecordTransactional(txID, id, recordType, record);
- }
-
- /**
- * @param txID
- * @param sync
- * @throws Exception
- * @see org.hornetq.core.journal.Journal#appendCommitRecord(long, boolean)
- */
- public void appendCommitRecord(final long txID, final boolean sync) throws Exception
- {
- if (trace)
- {
- System.out.println("AppendCommit " + txID);
- }
- replicationManager.appendCommitRecord(journalID, txID);
- replicatedJournal.appendCommitRecord(txID, sync);
- }
-
- /**
- * @param id
- * @param sync
- * @throws Exception
- * @see org.hornetq.core.journal.Journal#appendDeleteRecord(long, boolean)
- */
- public void appendDeleteRecord(final long id, final boolean sync) throws Exception
- {
- if (trace)
- {
- System.out.println("AppendDelete " + id);
- }
- replicationManager.appendDeleteRecord(journalID, id);
- replicatedJournal.appendDeleteRecord(id, sync);
- }
-
- /**
- * @param txID
- * @param id
- * @param record
- * @throws Exception
- * @see org.hornetq.core.journal.Journal#appendDeleteRecordTransactional(long, long, byte[])
- */
- public void appendDeleteRecordTransactional(final long txID, final long id, final byte[] record) throws Exception
- {
- this.appendDeleteRecordTransactional(txID, id, new ByteArrayEncoding(record));
- }
-
- /**
- * @param txID
- * @param id
- * @param record
- * @throws Exception
- * @see org.hornetq.core.journal.Journal#appendDeleteRecordTransactional(long, long, org.hornetq.core.journal.EncodingSupport)
- */
- public void appendDeleteRecordTransactional(final long txID, final long id, final EncodingSupport record) throws Exception
- {
- if (trace)
- {
- System.out.println("AppendDelete txID=" + txID + " id=" + id);
- }
- replicationManager.appendDeleteRecordTransactional(journalID, txID, id, record);
- replicatedJournal.appendDeleteRecordTransactional(txID, id, record);
- }
-
- /**
- * @param txID
- * @param id
- * @throws Exception
- * @see org.hornetq.core.journal.Journal#appendDeleteRecordTransactional(long, long)
- */
- public void appendDeleteRecordTransactional(final long txID, final long id) throws Exception
- {
- if (trace)
- {
- System.out.println("AppendDelete (noencoding) txID=" + txID + " id=" + id);
- }
- replicationManager.appendDeleteRecordTransactional(journalID, txID, id);
- replicatedJournal.appendDeleteRecordTransactional(txID, id);
- }
-
- /**
- * @param txID
- * @param transactionData
- * @param sync
- * @throws Exception
- * @see org.hornetq.core.journal.Journal#appendPrepareRecord(long, byte[], boolean)
- */
- public void appendPrepareRecord(final long txID, final byte[] transactionData, final boolean sync) throws Exception
- {
- this.appendPrepareRecord(txID, new ByteArrayEncoding(transactionData), sync);
- }
-
- /**
- * @param txID
- * @param transactionData
- * @param sync
- * @throws Exception
- * @see org.hornetq.core.journal.Journal#appendPrepareRecord(long, org.hornetq.core.journal.EncodingSupport, boolean)
- */
- public void appendPrepareRecord(final long txID, final EncodingSupport transactionData, final boolean sync) throws Exception
- {
- if (trace)
- {
- System.out.println("AppendPrepare txID=" + txID);
- }
- replicationManager.appendPrepareRecord(journalID, txID, transactionData);
- replicatedJournal.appendPrepareRecord(txID, transactionData, sync);
- }
-
- /**
- * @param txID
- * @param sync
- * @throws Exception
- * @see org.hornetq.core.journal.Journal#appendRollbackRecord(long, boolean)
- */
- public void appendRollbackRecord(final long txID, final boolean sync) throws Exception
- {
- if (trace)
- {
- System.out.println("AppendRollback " + txID);
- }
- replicationManager.appendRollbackRecord(journalID, txID);
- replicatedJournal.appendRollbackRecord(txID, sync);
- }
-
- /**
- * @param id
- * @param recordType
- * @param record
- * @param sync
- * @throws Exception
- * @see org.hornetq.core.journal.Journal#appendUpdateRecord(long, byte, byte[], boolean)
- */
- public void appendUpdateRecord(final long id, final byte recordType, final byte[] record, final boolean sync) throws Exception
- {
- this.appendUpdateRecord(id, recordType, new ByteArrayEncoding(record), sync);
- }
-
- /**
- * @param id
- * @param recordType
- * @param record
- * @param sync
- * @throws Exception
- * @see org.hornetq.core.journal.Journal#appendUpdateRecord(long, byte, org.hornetq.core.journal.EncodingSupport, boolean)
- */
- public void appendUpdateRecord(final long id, final byte recordType, final EncodingSupport record, final boolean sync) throws Exception
- {
- if (trace)
- {
- System.out.println("AppendUpdateRecord id = " + id + " , recordType = " + recordType);
- }
- replicationManager.appendUpdateRecord(journalID, id, recordType, record);
- replicatedJournal.appendUpdateRecord(id, recordType, record, sync);
- }
-
- /**
- * @param txID
- * @param id
- * @param recordType
- * @param record
- * @throws Exception
- * @see org.hornetq.core.journal.Journal#appendUpdateRecordTransactional(long, long, byte, byte[])
- */
- public void appendUpdateRecordTransactional(final long txID,
- final long id,
- final byte recordType,
- final byte[] record) throws Exception
- {
- this.appendUpdateRecordTransactional(txID, id, recordType, new ByteArrayEncoding(record));
- }
-
- /**
- * @param txID
- * @param id
- * @param recordType
- * @param record
- * @throws Exception
- * @see org.hornetq.core.journal.Journal#appendUpdateRecordTransactional(long, long, byte, org.hornetq.core.journal.EncodingSupport)
- */
- public void appendUpdateRecordTransactional(final long txID,
- final long id,
- final byte recordType,
- final EncodingSupport record) throws Exception
- {
- if (trace)
- {
- System.out.println("AppendUpdateRecord txid=" + txID + " id = " + id + " , recordType = " + recordType);
- }
- replicationManager.appendUpdateRecordTransactional(journalID, txID, id, recordType, record);
- replicatedJournal.appendUpdateRecordTransactional(txID, id, recordType, record);
- }
-
- /**
- * @param committedRecords
- * @param preparedTransactions
- * @param transactionFailure
- * @return
- * @throws Exception
- * @see org.hornetq.core.journal.Journal#load(java.util.List, java.util.List, org.hornetq.core.journal.TransactionFailureCallback)
- */
- public long load(final List<RecordInfo> committedRecords,
- final List<PreparedTransactionInfo> preparedTransactions,
- final TransactionFailureCallback transactionFailure) throws Exception
- {
- return replicatedJournal.load(committedRecords, preparedTransactions, transactionFailure);
- }
-
- /**
- * @param reloadManager
- * @return
- * @throws Exception
- * @see org.hornetq.core.journal.Journal#load(org.hornetq.core.journal.LoaderCallback)
- */
- public long load(final LoaderCallback reloadManager) throws Exception
- {
- return replicatedJournal.load(reloadManager);
- }
-
- /**
- * @param pages
- * @throws Exception
- * @see org.hornetq.core.journal.Journal#perfBlast(int)
- */
- public void perfBlast(final int pages) throws Exception
- {
- replicatedJournal.perfBlast(pages);
- }
-
- /**
- * @throws Exception
- * @see org.hornetq.core.server.HornetQComponent#start()
- */
- public void start() throws Exception
- {
- replicatedJournal.start();
- }
-
- /**
- * @throws Exception
- * @see org.hornetq.core.server.HornetQComponent#stop()
- */
- public void stop() throws Exception
- {
- replicatedJournal.stop();
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.journal.Journal#getAlignment()
- */
- public int getAlignment() throws Exception
- {
- return replicatedJournal.getAlignment();
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQComponent#isStarted()
- */
- public boolean isStarted()
- {
- return replicatedJournal.isStarted();
- }
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Modified: branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java
===================================================================
--- branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-10-08 21:40:37 UTC (rev 8070)
+++ branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/replication/ReplicationTest.java 2009-10-08 21:41:35 UTC (rev 8071)
@@ -47,7 +47,7 @@
import org.hornetq.core.remoting.impl.invm.InVMConnectorFactory;
import org.hornetq.core.remoting.server.impl.RemotingServiceImpl;
import org.hornetq.core.remoting.spi.HornetQBuffer;
-import org.hornetq.core.replication.impl.ReplicatedJournalImpl;
+import org.hornetq.core.replication.impl.ReplicatedJournal;
import org.hornetq.core.replication.impl.ReplicationManagerImpl;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.impl.HornetQServerImpl;
@@ -153,7 +153,7 @@
ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager, executor);
manager.start();
- Journal replicatedJournal = new ReplicatedJournalImpl((byte)1, new FakeJournal(), manager);
+ Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), manager);
replicatedJournal.appendPrepareRecord(1, new FakeData(), false);
@@ -218,7 +218,7 @@
ReplicationManagerImpl manager = new ReplicationManagerImpl(connectionManager, executor);
manager.start();
- Journal replicatedJournal = new ReplicatedJournalImpl((byte)1, new FakeJournal(), manager);
+ Journal replicatedJournal = new ReplicatedJournal((byte)1, new FakeJournal(), manager);
replicatedJournal.appendPrepareRecord(1, new FakeData(), false);
16 years, 2 months
JBoss hornetq SVN: r8070 - in branches/Replication_Clebert: src/main/org/hornetq/core/paging/impl and 8 other directories.
by do-not-reply@jboss.org
Author: clebert.suconic(a)jboss.com
Date: 2009-10-08 17:40:37 -0400 (Thu, 08 Oct 2009)
New Revision: 8070
Added:
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationPageEventMessage.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationPageWriteMessage.java
Removed:
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationPacket.java
Modified:
branches/Replication_Clebert/src/main/org/hornetq/core/paging/PagingStore.java
branches/Replication_Clebert/src/main/org/hornetq/core/paging/impl/PageImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/persistence/StorageManager.java
branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java
Log:
Paging changes
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/paging/PagingStore.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/paging/PagingStore.java 2009-10-08 17:09:12 UTC (rev 8069)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/paging/PagingStore.java 2009-10-08 21:40:37 UTC (rev 8070)
@@ -54,6 +54,8 @@
public boolean readPage() throws Exception;
Page getCurrentPage();
+
+ Page createPage(final int page) throws Exception;
/**
*
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/paging/impl/PageImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/paging/impl/PageImpl.java 2009-10-08 17:09:12 UTC (rev 8069)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/paging/impl/PageImpl.java 2009-10-08 21:40:37 UTC (rev 8070)
@@ -28,6 +28,8 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.Page;
import org.hornetq.core.paging.PagedMessage;
+import org.hornetq.core.persistence.StorageManager;
+import org.hornetq.utils.SimpleString;
/**
*
@@ -59,18 +61,30 @@
private final SequentialFileFactory fileFactory;
private final AtomicInteger size = new AtomicInteger(0);
+
+ private final StorageManager storageManager;
+
+ private final SimpleString storeName;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public PageImpl(final SequentialFileFactory factory, final SequentialFile file, final int pageId) throws Exception
+ public PageImpl(final SimpleString storeName, final StorageManager storageManager, final SequentialFileFactory factory, final SequentialFile file, final int pageId) throws Exception
{
this.pageId = pageId;
this.file = file;
- fileFactory = factory;
+ this.fileFactory = factory;
+ this.storageManager = storageManager;
+ this.storeName = storeName;
}
+
+ public PageImpl(final SequentialFileFactory factory, final SequentialFile file, final int pageId) throws Exception
+ {
+ this(null, null, factory, file, pageId);
+ }
+
// Public --------------------------------------------------------
// PagingFile implementation
@@ -154,9 +168,11 @@
numberOfMessages.incrementAndGet();
size.addAndGet(buffer.limit());
+ storageManager.pageWrite(message, pageId);
+
if (message.getMessage(null).isLargeMessage())
{
- // If we don't sync on large messages we could have the risk of files unnatended files on disk
+ // If we don't sync on large messages we could have the risk of unattended files on disk
sync();
}
}
@@ -175,6 +191,7 @@
public void close() throws Exception
{
+ storageManager.pageClosed(storeName, pageId);
file.close();
}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2009-10-08 17:09:12 UTC (rev 8069)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2009-10-08 21:40:37 UTC (rev 8070)
@@ -607,6 +607,35 @@
return currentPage;
}
+
+ public Page createPage(final int page) throws Exception
+ {
+ String fileName = createFileName(page);
+
+ if (fileFactory == null)
+ {
+ fileFactory = storeFactory.newFileFactory(getStoreName());
+ }
+
+ SequentialFile file = fileFactory.createSequentialFile(fileName, 1000);
+
+ file.open();
+
+ long size = file.size();
+
+ if (fileFactory.isSupportsCallbacks() && size < pageSize)
+ {
+ file.fill((int)size, (int)(pageSize - size), (byte)0);
+ }
+
+ file.position(0);
+
+ file.close();
+
+ return new PageImpl(this.storeName, storageManager, fileFactory, file, page);
+ }
+
+
// TestSupportPageStore ------------------------------------------
public void forceAnotherPage() throws Exception
@@ -700,36 +729,6 @@
// Protected -----------------------------------------------------
- // In order to test failures, we need to be able to extend this class
- // and replace the Page for another Page that will fail before the file is removed
- // That's why createPage is not a private method
- protected Page createPage(final int page) throws Exception
- {
- String fileName = createFileName(page);
-
- if (fileFactory == null)
- {
- fileFactory = storeFactory.newFileFactory(getStoreName());
- }
-
- SequentialFile file = fileFactory.createSequentialFile(fileName, 1000);
-
- file.open();
-
- long size = file.size();
-
- if (fileFactory.isSupportsCallbacks() && size < pageSize)
- {
- file.fill((int)size, (int)(pageSize - size), (byte)0);
- }
-
- file.position(0);
-
- file.close();
-
- return new PageImpl(fileFactory, file, page);
- }
-
// Private -------------------------------------------------------
/**
@@ -931,8 +930,9 @@
{
currentPage.close();
}
-
+
currentPage = createPage(currentPageId);
+
currentPageSize.set(0);
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/persistence/StorageManager.java 2009-10-08 17:09:12 UTC (rev 8069)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/persistence/StorageManager.java 2009-10-08 21:40:37 UTC (rev 8070)
@@ -18,7 +18,9 @@
import javax.transaction.xa.Xid;
+import org.hornetq.core.buffers.ChannelBuffer;
import org.hornetq.core.paging.PageTransactionInfo;
+import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.server.HornetQComponent;
@@ -44,6 +46,12 @@
{
// Message related operations
+ void pageClosed(SimpleString storeName, int pageNumber);
+
+ void pageDeleted(SimpleString storeName, int pageNumber);
+
+ void pageWrite(PagedMessage message, int pageNumber);
+
boolean isReplicated();
void afterReplicated(Runnable run);
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-10-08 17:09:12 UTC (rev 8069)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-10-08 21:40:37 UTC (rev 8070)
@@ -29,6 +29,7 @@
import javax.transaction.xa.Xid;
+import org.hornetq.core.buffers.ChannelBuffer;
import org.hornetq.core.buffers.ChannelBuffers;
import org.hornetq.core.config.Configuration;
import org.hornetq.core.exception.HornetQException;
@@ -47,6 +48,7 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.paging.PageTransactionInfo;
+import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
import org.hornetq.core.persistence.QueueBindingInfo;
@@ -290,6 +292,47 @@
return replicator != null;
}
+
+ // TODO: shouldn't those page methods be on the PageManager?
+
+ /*
+ *
+ * (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#pageClosed(org.hornetq.utils.SimpleString, int)
+ */
+ public void pageClosed(SimpleString storeName, int pageNumber)
+ {
+ if (isReplicated())
+ {
+ replicator.pageClosed(storeName, pageNumber);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#pageDeleted(org.hornetq.utils.SimpleString, int)
+ */
+ public void pageDeleted(SimpleString storeName, int pageNumber)
+ {
+ if (isReplicated())
+ {
+ replicator.pageDeleted(storeName, pageNumber);
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#pageWrite(org.hornetq.utils.SimpleString, int, org.hornetq.core.buffers.ChannelBuffer)
+ */
+ public void pageWrite(PagedMessage message, int pageNumber)
+ {
+ if (isReplicated())
+ {
+ replicator.pageWrite(message, pageNumber);
+ }
+ }
+
+
+ // TODO: shouldn't those page methods be on the PageManager? ^^^^
+
public void afterReplicated(Runnable run)
{
if (replicator == null)
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-10-08 17:09:12 UTC (rev 8069)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-10-08 21:40:37 UTC (rev 8070)
@@ -21,6 +21,7 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.paging.PageTransactionInfo;
+import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.persistence.QueueBindingInfo;
import org.hornetq.core.persistence.StorageManager;
@@ -268,4 +269,25 @@
{
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#pageClosed(org.hornetq.utils.SimpleString, int)
+ */
+ public void pageClosed(SimpleString storeName, int pageNumber)
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#pageDeleted(org.hornetq.utils.SimpleString, int)
+ */
+ public void pageDeleted(SimpleString storeName, int pageNumber)
+ {
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.persistence.StorageManager#pageWrite(org.hornetq.core.paging.PagedMessage, int)
+ */
+ public void pageWrite(PagedMessage message, int pageNumber)
+ {
+ }
+
}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java 2009-10-08 17:09:12 UTC (rev 8069)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/PacketDecoder.java 2009-10-08 21:40:37 UTC (rev 8070)
@@ -12,6 +12,9 @@
*/
package org.hornetq.core.remoting.impl;
+
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_PAGE_EVENT;
+import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_PAGE_WRITE;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_PREPARE;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_DELETE_TX;
import static org.hornetq.core.remoting.impl.wireformat.PacketImpl.REPLICATION_COMMIT_ROLLBACK;
@@ -405,6 +408,16 @@
packet = new ReplicationResponseMessage();
break;
}
+ case REPLICATION_PAGE_WRITE:
+ {
+ packet = new ReplicationResponseMessage();
+ break;
+ }
+ case REPLICATION_PAGE_EVENT:
+ {
+ packet = new ReplicationResponseMessage();
+ break;
+ }
default:
{
throw new IllegalArgumentException("Invalid type: " + packetType);
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java 2009-10-08 17:09:12 UTC (rev 8069)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/PacketImpl.java 2009-10-08 21:40:37 UTC (rev 8070)
@@ -155,7 +155,11 @@
public static final byte REPLICATION_COMMIT_ROLLBACK = 86;
+ public static final byte REPLICATION_PAGE_WRITE = 87;
+
+ public static final byte REPLICATION_PAGE_EVENT = 88;
+
// Static --------------------------------------------------------
Deleted: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationPacket.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationPacket.java 2009-10-08 17:09:12 UTC (rev 8069)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationPacket.java 2009-10-08 21:40:37 UTC (rev 8070)
@@ -1,53 +0,0 @@
-/*
- * Copyright 2009 Red Hat, Inc.
- * Red Hat licenses this file to you under the Apache License, version
- * 2.0 (the "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * http://www.apache.org/licenses/LICENSE-2.0
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
- * implied. See the License for the specific language governing
- * permissions and limitations under the License.
- */
-
-package org.hornetq.core.remoting.impl.wireformat;
-
-/**
- * A ReplicationPacket
- *
- * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
- *
- *
- */
-public class ReplicationPacket extends PacketImpl
-{
-
- /**
- * @param type
- */
- public ReplicationPacket(byte type)
- {
- super(type);
- // TODO Auto-generated constructor stub
- }
-
- // Constants -----------------------------------------------------
-
- // Attributes ----------------------------------------------------
-
- // Static --------------------------------------------------------
-
- // Constructors --------------------------------------------------
-
- // Public --------------------------------------------------------
-
- // Package protected ---------------------------------------------
-
- // Protected -----------------------------------------------------
-
- // Private -------------------------------------------------------
-
- // Inner classes -------------------------------------------------
-
-}
Added: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationPageEventMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationPageEventMessage.java (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationPageEventMessage.java 2009-10-08 21:40:37 UTC (rev 8070)
@@ -0,0 +1,117 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.impl.wireformat;
+
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.utils.DataConstants;
+import org.hornetq.utils.SimpleString;
+
+/**
+ * A ReplicationPageWrite
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ReplicationPageEventMessage extends PacketImpl
+{
+
+ private int pageNumber;
+
+ private SimpleString storeName;
+
+ /**
+ * True = delete page, False = close page
+ */
+ private boolean isDelete;
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public ReplicationPageEventMessage()
+ {
+ super(REPLICATION_PAGE_EVENT);
+ }
+
+ public ReplicationPageEventMessage(final SimpleString storeName, final int pageNumber, final boolean isDelete)
+ {
+ this();
+ this.pageNumber = pageNumber;
+ this.isDelete = isDelete;
+ this.storeName = storeName;
+ }
+
+ // Public --------------------------------------------------------
+
+ @Override
+ public int getRequiredBufferSize()
+ {
+ return BASIC_PACKET_SIZE + DataConstants.SIZE_INT + storeName.sizeof() + DataConstants.SIZE_BOOLEAN;
+
+ }
+
+ @Override
+ public void encodeBody(final HornetQBuffer buffer)
+ {
+ buffer.writeSimpleString(storeName);
+ buffer.writeInt(pageNumber);
+ buffer.writeBoolean(isDelete);
+ }
+
+ @Override
+ public void decodeBody(final HornetQBuffer buffer)
+ {
+ storeName = buffer.readSimpleString();
+ pageNumber = buffer.readInt();
+ isDelete = buffer.readBoolean();
+ }
+
+ /**
+ * @return the pageNumber
+ */
+ public int getPageNumber()
+ {
+ return pageNumber;
+ }
+
+ /**
+ * @return the storeName
+ */
+ public SimpleString getStoreName()
+ {
+ return storeName;
+ }
+
+ /**
+ * @return the isDelete
+ */
+ public boolean isDelete()
+ {
+ return isDelete;
+ }
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Added: branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationPageWriteMessage.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationPageWriteMessage.java (rev 0)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/remoting/impl/wireformat/ReplicationPageWriteMessage.java 2009-10-08 21:40:37 UTC (rev 8070)
@@ -0,0 +1,107 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.remoting.impl.wireformat;
+
+import org.hornetq.core.paging.PagedMessage;
+import org.hornetq.core.paging.impl.PagedMessageImpl;
+import org.hornetq.core.remoting.spi.HornetQBuffer;
+import org.hornetq.utils.DataConstants;
+
+/**
+ * A ReplicationPageWrite
+ *
+ * @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
+ *
+ *
+ */
+public class ReplicationPageWriteMessage extends PacketImpl
+{
+
+ int pageNumber;
+
+ PagedMessage pagedMessage;
+
+ // Constants -----------------------------------------------------
+
+ // Attributes ----------------------------------------------------
+
+ // Static --------------------------------------------------------
+
+ // Constructors --------------------------------------------------
+
+ public ReplicationPageWriteMessage()
+ {
+ super(REPLICATION_PAGE_WRITE);
+ }
+
+ public ReplicationPageWriteMessage(final PagedMessage pagedMessage, final int pageNumber)
+ {
+ this();
+ this.pageNumber = pageNumber;
+ this.pagedMessage = pagedMessage;
+ }
+
+ // Public --------------------------------------------------------
+
+ public int getRequiredBufferSize()
+ {
+ return BASIC_PACKET_SIZE +
+ DataConstants.SIZE_INT +
+ pagedMessage.getEncodeSize();
+
+ }
+
+ @Override
+ public void encodeBody(final HornetQBuffer buffer)
+ {
+ buffer.writeInt(pageNumber);
+ pagedMessage.encode(buffer);
+ }
+
+ @Override
+ public void decodeBody(final HornetQBuffer buffer)
+ {
+ this.pageNumber = buffer.readInt();
+ pagedMessage = new PagedMessageImpl();
+ pagedMessage.decode(buffer);
+ }
+
+ /**
+ * @return the pageNumber
+ */
+ public int getPageNumber()
+ {
+ return pageNumber;
+ }
+
+ /**
+ * @return the pagedMessage
+ */
+ public PagedMessage getPagedMessage()
+ {
+ return pagedMessage;
+ }
+
+
+
+
+ // Package protected ---------------------------------------------
+
+ // Protected -----------------------------------------------------
+
+ // Private -------------------------------------------------------
+
+ // Inner classes -------------------------------------------------
+
+}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-10-08 17:09:12 UTC (rev 8069)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/ReplicationManager.java 2009-10-08 21:40:37 UTC (rev 8070)
@@ -16,7 +16,9 @@
import java.util.Set;
import org.hornetq.core.journal.EncodingSupport;
+import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.server.HornetQComponent;
+import org.hornetq.utils.SimpleString;
/**
* @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
@@ -53,4 +55,23 @@
/** A list of tokens that are still waiting for replications to be completed */
Set<ReplicationToken> getActiveTokens();
+ /**
+ * @param storeName
+ * @param pageNumber
+ */
+ void pageClosed(SimpleString storeName, int pageNumber);
+
+ /**
+ * @param storeName
+ * @param pageNumber
+ */
+ void pageDeleted(SimpleString storeName, int pageNumber);
+
+
+ /**
+ * @param storeName
+ * @param pageNumber
+ */
+ void pageWrite(PagedMessage message, int pageNumber);
+
}
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-10-08 17:09:12 UTC (rev 8069)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationEndpointImpl.java 2009-10-08 21:40:37 UTC (rev 8070)
@@ -13,9 +13,17 @@
package org.hornetq.core.replication.impl;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+
import org.hornetq.core.config.Configuration;
import org.hornetq.core.journal.Journal;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.paging.Page;
+import org.hornetq.core.paging.PagedMessage;
+import org.hornetq.core.paging.PagingManager;
+import org.hornetq.core.paging.impl.PagingManagerImpl;
+import org.hornetq.core.paging.impl.PagingStoreFactoryNIO;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.Packet;
@@ -25,10 +33,14 @@
import org.hornetq.core.remoting.impl.wireformat.ReplicationCommitMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteTXMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationPageEventMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationPageWriteMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationPrepareMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationResponseMessage;
import org.hornetq.core.replication.ReplicationEndpoint;
import org.hornetq.core.server.HornetQServer;
+import org.hornetq.core.server.ServerMessage;
+import org.hornetq.utils.SimpleString;
/**
*
@@ -57,6 +69,10 @@
private JournalStorageManager storage;
+ private PagingManager pageManager;
+
+ private final ConcurrentMap<SimpleString, ConcurrentMap<Integer, Page>> pageIndex = new ConcurrentHashMap<SimpleString, ConcurrentMap<Integer, Page>>();
+
// Constructors --------------------------------------------------
public ReplicationEndpointImpl(final HornetQServer server)
{
@@ -74,28 +90,37 @@
{
if (packet.getType() == PacketImpl.REPLICATION_APPEND)
{
- handleAppendAddRecord(packet);
+ handleAppendAddRecord((ReplicationAddMessage)packet);
}
else if (packet.getType() == PacketImpl.REPLICATION_APPEND_TX)
{
- handleAppendAddTXRecord(packet);
+ handleAppendAddTXRecord((ReplicationAddTXMessage)packet);
}
else if (packet.getType() == PacketImpl.REPLICATION_DELETE)
{
- handleAppendDelete(packet);
+ handleAppendDelete((ReplicationDeleteMessage)packet);
}
else if (packet.getType() == PacketImpl.REPLICATION_DELETE_TX)
{
- handleAppendDeleteTX(packet);
+ handleAppendDeleteTX((ReplicationDeleteTXMessage)packet);
}
else if (packet.getType() == PacketImpl.REPLICATION_PREPARE)
{
- handlePrepare(packet);
+ handlePrepare((ReplicationPrepareMessage)packet);
}
else if (packet.getType() == PacketImpl.REPLICATION_COMMIT_ROLLBACK)
{
- handleCommitRollback(packet);
+ handleCommitRollback((ReplicationCommitMessage)packet);
}
+ else if (packet.getType() == PacketImpl.REPLICATION_PAGE_WRITE)
+ {
+ handlePageWrite((ReplicationPageWriteMessage)packet);
+ }
+ else if (packet.getType() == PacketImpl.REPLICATION_PAGE_EVENT)
+ {
+ handlePageEvent((ReplicationPageEventMessage)packet);
+ }
+
}
catch (Exception e)
{
@@ -120,8 +145,7 @@
{
Configuration config = server.getConfiguration();
- // TODO: this needs an executor
- storage = new JournalStorageManager(config, null);
+ storage = new JournalStorageManager(config, server.getExecutorFactory().getExecutor());
storage.start();
bindingsJournal = storage.getBindingsJournal();
@@ -129,6 +153,15 @@
// We only need to load internal structures on the backup...
storage.loadInternalOnly();
+
+ pageManager = new PagingManagerImpl(new PagingStoreFactoryNIO(config.getPagingDirectory(),
+ server.getExecutorFactory()),
+ storage,
+ server.getAddressSettingsRepository(),
+ false);
+
+ pageManager.start();
+
}
/* (non-Javadoc)
@@ -165,82 +198,70 @@
/**
* @param packet
*/
- private void handleCommitRollback(final Packet packet) throws Exception
+ private void handleCommitRollback(final ReplicationCommitMessage packet) throws Exception
{
- ReplicationCommitMessage commitMessage = (ReplicationCommitMessage)packet;
+ Journal journalToUse = getJournal(packet.getJournalID());
- Journal journalToUse = getJournal(commitMessage.getJournalID());
-
- if (commitMessage.isRollback())
+ if (packet.isRollback())
{
- journalToUse.appendRollbackRecord(commitMessage.getTxId(), false);
+ journalToUse.appendRollbackRecord(packet.getTxId(), false);
}
else
{
- journalToUse.appendCommitRecord(commitMessage.getTxId(), false);
+ journalToUse.appendCommitRecord(packet.getTxId(), false);
}
}
/**
* @param packet
*/
- private void handlePrepare(final Packet packet) throws Exception
+ private void handlePrepare(final ReplicationPrepareMessage packet) throws Exception
{
- ReplicationPrepareMessage prepareMessage = (ReplicationPrepareMessage)packet;
+ Journal journalToUse = getJournal(packet.getJournalID());
- Journal journalToUse = getJournal(prepareMessage.getJournalID());
-
- journalToUse.appendPrepareRecord(prepareMessage.getTxId(), prepareMessage.getRecordData(), false);
+ journalToUse.appendPrepareRecord(packet.getTxId(), packet.getRecordData(), false);
}
/**
* @param packet
*/
- private void handleAppendDeleteTX(final Packet packet) throws Exception
+ private void handleAppendDeleteTX(final ReplicationDeleteTXMessage packet) throws Exception
{
- ReplicationDeleteTXMessage deleteMessage = (ReplicationDeleteTXMessage)packet;
+ Journal journalToUse = getJournal(packet.getJournalID());
- Journal journalToUse = getJournal(deleteMessage.getJournalID());
-
- journalToUse.appendDeleteRecordTransactional(deleteMessage.getTxId(),
- deleteMessage.getId(),
- deleteMessage.getRecordData());
+ journalToUse.appendDeleteRecordTransactional(packet.getTxId(), packet.getId(), packet.getRecordData());
}
/**
* @param packet
*/
- private void handleAppendDelete(final Packet packet) throws Exception
+ private void handleAppendDelete(final ReplicationDeleteMessage packet) throws Exception
{
- ReplicationDeleteMessage deleteMessage = (ReplicationDeleteMessage)packet;
+ Journal journalToUse = getJournal(packet.getJournalID());
- Journal journalToUse = getJournal(deleteMessage.getJournalID());
-
- journalToUse.appendDeleteRecord(deleteMessage.getId(), false);
+ journalToUse.appendDeleteRecord(packet.getId(), false);
}
/**
* @param packet
*/
- private void handleAppendAddTXRecord(final Packet packet) throws Exception
+ private void handleAppendAddTXRecord(final ReplicationAddTXMessage packet) throws Exception
{
- ReplicationAddTXMessage addMessage = (ReplicationAddTXMessage)packet;
+ Journal journalToUse = getJournal(packet.getJournalID());
- Journal journalToUse = getJournal(addMessage.getJournalID());
-
- if (addMessage.isUpdate())
+ if (packet.isUpdate())
{
- journalToUse.appendUpdateRecordTransactional(addMessage.getTxId(),
- addMessage.getId(),
- addMessage.getRecordType(),
- addMessage.getRecordData());
+ journalToUse.appendUpdateRecordTransactional(packet.getTxId(),
+ packet.getId(),
+ packet.getRecordType(),
+ packet.getRecordData());
}
else
{
- journalToUse.appendAddRecordTransactional(addMessage.getTxId(),
- addMessage.getId(),
- addMessage.getRecordType(),
- addMessage.getRecordData());
+ journalToUse.appendAddRecordTransactional(packet.getTxId(),
+ packet.getId(),
+ packet.getRecordType(),
+ packet.getRecordData());
}
}
@@ -248,34 +269,109 @@
* @param packet
* @throws Exception
*/
- private void handleAppendAddRecord(final Packet packet) throws Exception
+ private void handleAppendAddRecord(final ReplicationAddMessage packet) throws Exception
{
- ReplicationAddMessage addMessage = (ReplicationAddMessage)packet;
+ Journal journalToUse = getJournal(packet.getJournalID());
- Journal journalToUse = getJournal(addMessage.getJournalID());
-
- if (addMessage.isUpdate())
+ if (packet.isUpdate())
{
if (trace)
{
- System.out.println("Endpoint appendUpdate id = " + addMessage.getId());
+ System.out.println("Endpoint appendUpdate id = " + packet.getId());
}
- journalToUse.appendUpdateRecord(addMessage.getId(),
- addMessage.getRecordType(),
- addMessage.getRecordData(),
- false);
+ journalToUse.appendUpdateRecord(packet.getId(), packet.getRecordType(), packet.getRecordData(), false);
}
else
{
if (trace)
{
- System.out.println("Endpoint append id = " + addMessage.getId());
+ System.out.println("Endpoint append id = " + packet.getId());
}
- journalToUse.appendAddRecord(addMessage.getId(), addMessage.getRecordType(), addMessage.getRecordData(), false);
+ journalToUse.appendAddRecord(packet.getId(), packet.getRecordType(), packet.getRecordData(), false);
}
}
/**
+ * @param packet
+ */
+ private void handlePageEvent(final ReplicationPageEventMessage packet) throws Exception
+ {
+ ConcurrentMap<Integer, Page> pages = getPageMap(packet.getStoreName());
+
+ Page page = pages.remove(packet.getPageNumber());
+
+ if (page != null)
+ {
+ if (packet.isDelete())
+ {
+ page.delete();
+ }
+ else
+ {
+ page.close();
+ }
+ }
+
+ }
+
+ /**
+ * @param packet
+ */
+ private void handlePageWrite(final ReplicationPageWriteMessage packet) throws Exception
+ {
+ PagedMessage pgdMessage = packet.getPagedMessage();
+ ServerMessage msg = pgdMessage.getMessage(storage);
+ Page page = getPage(msg.getDestination(), packet.getPageNumber());
+ page.write(pgdMessage);
+ }
+
+ private ConcurrentMap<Integer, Page> getPageMap(final SimpleString storeName)
+ {
+ ConcurrentMap<Integer, Page> resultIndex = pageIndex.get(storeName);
+
+ if (resultIndex == null)
+ {
+ resultIndex = pageIndex.putIfAbsent(storeName, new ConcurrentHashMap<Integer, Page>());
+ }
+
+ return resultIndex;
+ }
+
+ private Page getPage(final SimpleString storeName, final int pageId) throws Exception
+ {
+ ConcurrentMap<Integer, Page> map = getPageMap(storeName);
+
+ Page page = map.get(pageId);
+
+ if (page == null)
+ {
+ page = newPage(pageId, storeName, map);
+ }
+
+ return page;
+ }
+
+ /**
+ * @param pageId
+ * @param map
+ * @return
+ */
+ private synchronized Page newPage(final int pageId,
+ final SimpleString storeName,
+ final ConcurrentMap<Integer, Page> map) throws Exception
+ {
+ Page page = map.get(pageId);
+
+ if (page == null)
+ {
+ page = pageManager.getPageStore(storeName).createPage(pageId);
+ map.put(pageId, page);
+ }
+
+ return page;
+ }
+
+ /**
* @param journalID
* @return
*/
Modified: branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java
===================================================================
--- branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-10-08 17:09:12 UTC (rev 8069)
+++ branches/Replication_Clebert/src/main/org/hornetq/core/replication/impl/ReplicationManagerImpl.java 2009-10-08 21:40:37 UTC (rev 8070)
@@ -21,6 +21,7 @@
import org.hornetq.core.client.impl.ConnectionManager;
import org.hornetq.core.journal.EncodingSupport;
import org.hornetq.core.logging.Logger;
+import org.hornetq.core.paging.PagedMessage;
import org.hornetq.core.remoting.Channel;
import org.hornetq.core.remoting.ChannelHandler;
import org.hornetq.core.remoting.Packet;
@@ -32,11 +33,14 @@
import org.hornetq.core.remoting.impl.wireformat.ReplicationCommitMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationDeleteTXMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationPageEventMessage;
+import org.hornetq.core.remoting.impl.wireformat.ReplicationPageWriteMessage;
import org.hornetq.core.remoting.impl.wireformat.ReplicationPrepareMessage;
import org.hornetq.core.remoting.spi.HornetQBuffer;
import org.hornetq.core.replication.ReplicationManager;
import org.hornetq.core.replication.ReplicationToken;
import org.hornetq.utils.ConcurrentHashSet;
+import org.hornetq.utils.SimpleString;
/**
* A RepplicationManagerImpl
@@ -217,6 +221,39 @@
}
/* (non-Javadoc)
+ * @see org.hornetq.core.replication.ReplicationManager#pageClosed(org.hornetq.utils.SimpleString, int)
+ */
+ public void pageClosed(final SimpleString storeName, final int pageNumber)
+ {
+ if (enabled)
+ {
+ sendReplicatePacket(new ReplicationPageEventMessage(storeName, pageNumber, false));
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.replication.ReplicationManager#pageDeleted(org.hornetq.utils.SimpleString, int)
+ */
+ public void pageDeleted(final SimpleString storeName, final int pageNumber)
+ {
+ if (enabled)
+ {
+ sendReplicatePacket(new ReplicationPageEventMessage(storeName, pageNumber, true));
+ }
+ }
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.replication.ReplicationManager#pageWrite(org.hornetq.utils.SimpleString, int)
+ */
+ public void pageWrite(final PagedMessage message, final int pageNumber)
+ {
+ if (enabled)
+ {
+ sendReplicatePacket(new ReplicationPageWriteMessage(message, pageNumber));
+ }
+ }
+
+ /* (non-Javadoc)
* @see org.hornetq.core.server.HornetQComponent#isStarted()
*/
public synchronized boolean isStarted()
Modified: branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java
===================================================================
--- branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java 2009-10-08 17:09:12 UTC (rev 8069)
+++ branches/Replication_Clebert/tests/src/org/hornetq/tests/integration/paging/PageCrashTest.java 2009-10-08 21:40:37 UTC (rev 8070)
@@ -310,7 +310,7 @@
}
@Override
- protected Page createPage(final int page) throws Exception
+ public Page createPage(final int page) throws Exception
{
Page originalPage = super.createPage(page);
16 years, 2 months
JBoss hornetq SVN: r8069 - in trunk: src/main/org/hornetq/core/postoffice/impl and 3 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-10-08 13:09:12 -0400 (Thu, 08 Oct 2009)
New Revision: 8069
Modified:
trunk/src/main/org/hornetq/core/postoffice/Binding.java
trunk/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
trunk/src/main/org/hornetq/core/postoffice/impl/DivertBinding.java
trunk/src/main/org/hornetq/core/postoffice/impl/LocalQueueBinding.java
trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java
trunk/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
Log:
more routing refactoring
Modified: trunk/src/main/org/hornetq/core/postoffice/Binding.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/Binding.java 2009-10-08 09:46:28 UTC (rev 8068)
+++ trunk/src/main/org/hornetq/core/postoffice/Binding.java 2009-10-08 17:09:12 UTC (rev 8069)
@@ -15,6 +15,7 @@
import org.hornetq.core.filter.Filter;
import org.hornetq.core.server.Bindable;
+import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.utils.SimpleString;
@@ -43,12 +44,11 @@
boolean isHighAcceptPriority(ServerMessage message);
- //TODO find a better way
- void willRoute(ServerMessage message);
-
boolean isExclusive();
long getID();
int getDistance();
+
+ void route(ServerMessage message, RoutingContext context) throws Exception;
}
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java 2009-10-08 09:46:28 UTC (rev 8068)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java 2009-10-08 17:09:12 UTC (rev 8069)
@@ -15,10 +15,8 @@
import java.nio.ByteBuffer;
import java.util.Collection;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -28,7 +26,6 @@
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.Bindings;
-import org.hornetq.core.server.Bindable;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
@@ -199,9 +196,7 @@
if (theBinding != null)
{
- theBinding.willRoute(message);
-
- theBinding.getBindable().route(message, context);
+ theBinding.route(message, context);
}
}
@@ -230,8 +225,6 @@
}
else
{
- Set<Bindable> chosen = new HashSet<Bindable>();
-
for (Map.Entry<SimpleString, List<Binding>> entry : routingNameBindingMap.entrySet())
{
SimpleString routingName = entry.getKey();
@@ -341,18 +334,11 @@
if (theBinding != null)
{
- theBinding.willRoute(message);
-
- chosen.add(theBinding.getBindable());
+ theBinding.route(message, context);
}
routingNamePositions.put(routingName, pos);
}
-
- for (Bindable bindable : chosen)
- {
- bindable.route(message, context);
- }
}
}
}
@@ -371,9 +357,7 @@
if (binding != null)
{
- binding.willRoute(message);
-
- binding.getBindable().route(message, context);
+ binding.route(message, context);
}
}
}
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/DivertBinding.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/DivertBinding.java 2009-10-08 09:46:28 UTC (rev 8068)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/DivertBinding.java 2009-10-08 17:09:12 UTC (rev 8069)
@@ -19,6 +19,7 @@
import org.hornetq.core.postoffice.BindingType;
import org.hornetq.core.server.Bindable;
import org.hornetq.core.server.Divert;
+import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.utils.SimpleString;
@@ -109,8 +110,9 @@
return true;
}
- public void willRoute(final ServerMessage message)
- {
+ public void route(final ServerMessage message, final RoutingContext context) throws Exception
+ {
+ divert.route(message, context);
}
public int getDistance()
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/LocalQueueBinding.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/LocalQueueBinding.java 2009-10-08 09:46:28 UTC (rev 8068)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/LocalQueueBinding.java 2009-10-08 17:09:12 UTC (rev 8069)
@@ -23,6 +23,7 @@
import org.hornetq.core.server.Bindable;
import org.hornetq.core.server.Consumer;
import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.cluster.impl.Redistributor;
import org.hornetq.utils.SimpleString;
@@ -144,10 +145,11 @@
return false;
}
- public void willRoute(final ServerMessage message)
- {
+ public void route(final ServerMessage message, final RoutingContext context) throws Exception
+ {
+ queue.route(message, context);
}
-
+
public boolean isQueueBinding()
{
return true;
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-10-08 09:46:28 UTC (rev 8068)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-10-08 17:09:12 UTC (rev 8069)
@@ -528,16 +528,16 @@
{
SimpleString address = message.getDestination();
- byte[] duplicateIDBytes = null;
-
Object duplicateID = message.getProperty(MessageImpl.HDR_DUPLICATE_DETECTION_ID);
DuplicateIDCache cache = null;
+
+ byte[] duplicateIDBytes = null;
if (duplicateID != null)
{
cache = getDuplicateIDCache(message.getDestination());
-
+
if (duplicateID instanceof SimpleString)
{
duplicateIDBytes = ((SimpleString)duplicateID).getData();
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java 2009-10-08 09:46:28 UTC (rev 8068)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/RemoteQueueBindingImpl.java 2009-10-08 17:09:12 UTC (rev 8069)
@@ -26,6 +26,7 @@
import org.hornetq.core.postoffice.BindingType;
import org.hornetq.core.server.Bindable;
import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.cluster.RemoteQueueBinding;
import org.hornetq.utils.SimpleString;
@@ -183,12 +184,8 @@
return false;
}
- public void willRoute(final ServerMessage message)
- {
- //We add a header with the name of the queue, holding a list of the transient ids of the queues to route to
-
- //TODO - this can be optimised
-
+ public void route(final ServerMessage message, final RoutingContext context)
+ {
byte[] ids = (byte[])message.getProperty(idsHeaderName);
if (ids == null)
@@ -208,9 +205,16 @@
buff.putLong(remoteQueueID);
- message.putBytesProperty(idsHeaderName, ids);
+ message.putBytesProperty(idsHeaderName, ids);
+
+ if (!context.getQueues().contains(this.storeAndForwardQueue))
+ {
+ //There can be many remote bindings for the same node, we only want to add the message once to
+ //the s & f queue for that node
+ context.getQueues().add(storeAndForwardQueue);
+ }
}
-
+
public synchronized void addConsumer(final SimpleString filterString) throws Exception
{
if (filterString != null)
Modified: trunk/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java 2009-10-08 09:46:28 UTC (rev 8068)
+++ trunk/src/main/org/hornetq/core/transaction/TransactionPropertyIndexes.java 2009-10-08 17:09:12 UTC (rev 8069)
@@ -11,7 +11,6 @@
* permissions and limitations under the License.
*/
-
package org.hornetq.core.transaction;
/**
Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2009-10-08 09:46:28 UTC (rev 8068)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2009-10-08 17:09:12 UTC (rev 8069)
@@ -29,6 +29,7 @@
import org.hornetq.core.server.Bindable;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.impl.RoutingContextImpl;
import org.hornetq.core.transaction.Transaction;
@@ -1009,11 +1010,12 @@
}
/* (non-Javadoc)
- * @see org.hornetq.core.postoffice.Binding#willRoute(org.hornetq.core.server.ServerMessage)
+ * @see org.hornetq.core.postoffice.Binding#route(org.hornetq.core.server.ServerMessage, org.hornetq.core.server.RoutingContext)
*/
- public void willRoute(final ServerMessage message)
+ public void route(ServerMessage message, RoutingContext context) throws Exception
{
-
+ // TODO Auto-generated method stub
+
}
}
16 years, 2 months
JBoss hornetq SVN: r8067 - trunk/tests/src/org/hornetq/tests/integration/jms/divert.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-10-08 05:20:02 -0400 (Thu, 08 Oct 2009)
New Revision: 8067
Modified:
trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java
Log:
re-enabled test for https://jira.jboss.org/jira/browse/HORNETQ-165
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java 2009-10-08 09:15:11 UTC (rev 8066)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/divert/DivertAndACKClientTest.java 2009-10-08 09:20:02 UTC (rev 8067)
@@ -50,6 +50,8 @@
/**
* A DivertAndACKClientTest
+ *
+ * https://jira.jboss.org/jira/browse/HORNETQ-165
*
* @author <mailto:clebert.suconic@jboss.org">Clebert Suconic</a>
*
@@ -73,7 +75,6 @@
HornetQQueue queueSource = (HornetQQueue)createQueue("Source");
HornetQQueue queueTarget = (HornetQQueue)createQueue("Dest");
-
Connection connection = cf.createConnection();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -97,7 +98,6 @@
HornetQQueue queueSource = (HornetQQueue)createQueue("Source");
HornetQQueue queueTarget = (HornetQQueue)createQueue("Dest");
-
Connection connection = cf.createConnection();
Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
@@ -119,12 +119,11 @@
// Package protected ---------------------------------------------
// Protected -----------------------------------------------------
-
+
protected boolean usePersistence()
{
return true;
}
-
protected Configuration createDefaultConfig(final boolean netty)
{
@@ -142,7 +141,7 @@
divertList.add(divert);
config.setDivertConfigurations(divertList);
-
+
return config;
}
@@ -151,42 +150,42 @@
// Inner classes -------------------------------------------------
protected void createCF(List<Pair<TransportConfiguration, TransportConfiguration>> connectorConfigs,
List<String> jndiBindings) throws Exception
- {
- int retryInterval = 1000;
- double retryIntervalMultiplier = 1.0;
- int reconnectAttempts = -1;
- boolean failoverOnServerShutdown = true;
- int callTimeout = 30000;
+ {
+ int retryInterval = 1000;
+ double retryIntervalMultiplier = 1.0;
+ int reconnectAttempts = -1;
+ boolean failoverOnServerShutdown = true;
+ int callTimeout = 30000;
- jmsServer.createConnectionFactory("ManualReconnectionToSingleServerTest",
- connectorConfigs,
- null,
- DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
- DEFAULT_CONNECTION_TTL,
- callTimeout,
- DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
- DEFAULT_MIN_LARGE_MESSAGE_SIZE,
- DEFAULT_CONSUMER_WINDOW_SIZE,
- DEFAULT_CONSUMER_MAX_RATE,
- DEFAULT_PRODUCER_WINDOW_SIZE,
- DEFAULT_PRODUCER_MAX_RATE,
- false, // TODO: set this to true, and the test will fail
- DEFAULT_BLOCK_ON_PERSISTENT_SEND,
- DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
- DEFAULT_AUTO_GROUP,
- DEFAULT_PRE_ACKNOWLEDGE,
- DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
- DEFAULT_ACK_BATCH_SIZE,
- DEFAULT_ACK_BATCH_SIZE,
- DEFAULT_USE_GLOBAL_POOLS,
- DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
- DEFAULT_THREAD_POOL_MAX_SIZE,
- retryInterval,
- retryIntervalMultiplier,
- DEFAULT_MAX_RETRY_INTERVAL,
- reconnectAttempts,
- failoverOnServerShutdown,
- jndiBindings);
- }
+ jmsServer.createConnectionFactory("ManualReconnectionToSingleServerTest",
+ connectorConfigs,
+ null,
+ DEFAULT_CLIENT_FAILURE_CHECK_PERIOD,
+ DEFAULT_CONNECTION_TTL,
+ callTimeout,
+ DEFAULT_CACHE_LARGE_MESSAGE_CLIENT,
+ DEFAULT_MIN_LARGE_MESSAGE_SIZE,
+ DEFAULT_CONSUMER_WINDOW_SIZE,
+ DEFAULT_CONSUMER_MAX_RATE,
+ DEFAULT_PRODUCER_WINDOW_SIZE,
+ DEFAULT_PRODUCER_MAX_RATE,
+ true,
+ DEFAULT_BLOCK_ON_PERSISTENT_SEND,
+ DEFAULT_BLOCK_ON_NON_PERSISTENT_SEND,
+ DEFAULT_AUTO_GROUP,
+ DEFAULT_PRE_ACKNOWLEDGE,
+ DEFAULT_CONNECTION_LOAD_BALANCING_POLICY_CLASS_NAME,
+ DEFAULT_ACK_BATCH_SIZE,
+ DEFAULT_ACK_BATCH_SIZE,
+ DEFAULT_USE_GLOBAL_POOLS,
+ DEFAULT_SCHEDULED_THREAD_POOL_MAX_SIZE,
+ DEFAULT_THREAD_POOL_MAX_SIZE,
+ retryInterval,
+ retryIntervalMultiplier,
+ DEFAULT_MAX_RETRY_INTERVAL,
+ reconnectAttempts,
+ failoverOnServerShutdown,
+ jndiBindings);
+ }
}
16 years, 2 months
JBoss hornetq SVN: r8066 - in trunk: src/main/org/hornetq/core/postoffice and 4 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-10-08 05:15:11 -0400 (Thu, 08 Oct 2009)
New Revision: 8066
Modified:
trunk/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java
trunk/src/main/org/hornetq/core/postoffice/PostOffice.java
trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionOnGCTest.java
trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java
Log:
a little more refactoring on routing
Modified: trunk/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java 2009-10-08 09:00:29 UTC (rev 8065)
+++ trunk/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java 2009-10-08 09:15:11 UTC (rev 8066)
@@ -729,7 +729,7 @@
notificationMessage.putTypedProperties(notifProps);
- postOffice.route(notificationMessage, new RoutingContextImpl(null));
+ postOffice.route(notificationMessage);
}
}
}
Modified: trunk/src/main/org/hornetq/core/postoffice/PostOffice.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/PostOffice.java 2009-10-08 09:00:29 UTC (rev 8065)
+++ trunk/src/main/org/hornetq/core/postoffice/PostOffice.java 2009-10-08 09:15:11 UTC (rev 8066)
@@ -52,6 +52,8 @@
Bindings getMatchingBindings(SimpleString address);
+ void route(ServerMessage message) throws Exception;
+
void route(ServerMessage message, RoutingContext context) throws Exception;
MessageReference reroute(ServerMessage message, Queue queue, Transaction tx) throws Exception;
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-10-08 09:00:29 UTC (rev 8065)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-10-08 09:15:11 UTC (rev 8066)
@@ -518,6 +518,11 @@
{
return addressManager.getMatchingBindings(address);
}
+
+ public void route(final ServerMessage message) throws Exception
+ {
+ route(message, new RoutingContextImpl(null));
+ }
public void route(final ServerMessage message, final RoutingContext context) throws Exception
{
@@ -700,11 +705,6 @@
return reference;
}
- public void route(final ServerMessage message) throws Exception
- {
- route(message, new RoutingContextImpl(null));
- }
-
public boolean redistribute(final ServerMessage message, final Queue originatingQueue, final RoutingContext context) throws Exception
{
Bindings bindings = addressManager.getBindingsForRoutingAddress(message.getDestination());
@@ -1167,7 +1167,7 @@
// This could happen when the PageStore left the pageState
// TODO is this correct - don't we lose transactionality here???
- route(message, new RoutingContextImpl(null));
+ route(message);
}
first = false;
}
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-10-08 09:00:29 UTC (rev 8065)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-10-08 09:15:11 UTC (rev 8066)
@@ -568,7 +568,7 @@
getRefsOperation(tx).addAck(ref);
}
- final RefsOperation getRefsOperation(final Transaction tx)
+ private final RefsOperation getRefsOperation(final Transaction tx)
{
synchronized (tx)
{
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-10-08 09:00:29 UTC (rev 8065)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-10-08 09:15:11 UTC (rev 8066)
@@ -1803,17 +1803,13 @@
throw e;
}
- RoutingContext context;
-
if (tx == null || autoCommitSends)
{
- context = new RoutingContextImpl(null);
+ postOffice.route(msg);
}
else
{
- context = new RoutingContextImpl(tx);
- }
-
- postOffice.route(msg, context);
+ postOffice.route(msg, new RoutingContextImpl(tx));
+ }
}
}
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionOnGCTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionOnGCTest.java 2009-10-08 09:00:29 UTC (rev 8065)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionOnGCTest.java 2009-10-08 09:15:11 UTC (rev 8066)
@@ -117,36 +117,4 @@
assertEquals(0, server.getRemotingService().getConnections().size());
}
-
- public static void checkWeakReferences(WeakReference<?>... references)
- {
-
- int i = 0;
- boolean hasValue = false;
-
- do
- {
- hasValue = false;
-
- if (i > 0)
- {
- forceGC();
- }
-
- for (WeakReference<?> ref : references)
- {
- if (ref.get() != null)
- {
- hasValue = true;
- }
- }
- }
- while (i++ <= 30 && hasValue);
-
- for (WeakReference<?> ref : references)
- {
- assertNull(ref.get());
- }
- }
-
}
Modified: trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java 2009-10-08 09:00:29 UTC (rev 8065)
+++ trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java 2009-10-08 09:15:11 UTC (rev 8066)
@@ -164,6 +164,15 @@
}
+ /* (non-Javadoc)
+ * @see org.hornetq.core.postoffice.PostOffice#route(org.hornetq.core.server.ServerMessage)
+ */
+ public void route(ServerMessage message) throws Exception
+ {
+ // TODO Auto-generated method stub
+
+ }
+
}
\ No newline at end of file
16 years, 2 months
JBoss hornetq SVN: r8065 - trunk/tests/src/org/hornetq/tests/integration/management.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-10-08 05:00:29 -0400 (Thu, 08 Oct 2009)
New Revision: 8065
Modified:
trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControl2Test.java
Log:
fixed timing issue in test
Modified: trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControl2Test.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControl2Test.java 2009-10-07 21:01:20 UTC (rev 8064)
+++ trunk/tests/src/org/hornetq/tests/integration/management/ClusterConnectionControl2Test.java 2009-10-08 09:00:29 UTC (rev 8065)
@@ -84,11 +84,24 @@
assertEquals(0, nodes.size());
server_1.start();
- Thread.sleep(3000);
-
- nodes = clusterConnectionControl_0.getNodes();
- System.out.println(nodes);
+ long start = System.currentTimeMillis();
+
+ while (true)
+ {
+ nodes = clusterConnectionControl_0.getNodes();
+
+ if (nodes.size() != 1 && System.currentTimeMillis() - start < 30000)
+ {
+ Thread.sleep(100);
+ }
+ else
+ {
+ break;
+ }
+ }
+
assertEquals(1, nodes.size());
+
String remoteAddress = nodes.values().iterator().next();
assertTrue(remoteAddress.endsWith(":" + port_1));
}
16 years, 2 months
JBoss hornetq SVN: r8064 - in trunk: src/main/org/hornetq/core/deployers/impl and 24 other directories.
by do-not-reply@jboss.org
Author: timfox
Date: 2009-10-07 17:01:20 -0400 (Wed, 07 Oct 2009)
New Revision: 8064
Added:
trunk/src/main/org/hornetq/core/server/RoutingContext.java
trunk/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java
Modified:
trunk/.classpath
trunk/src/main/org/hornetq/core/deployers/impl/FileDeploymentManager.java
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
trunk/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java
trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
trunk/src/main/org/hornetq/core/persistence/StorageManager.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
trunk/src/main/org/hornetq/core/postoffice/Bindings.java
trunk/src/main/org/hornetq/core/postoffice/PostOffice.java
trunk/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/hornetq/core/server/Bindable.java
trunk/src/main/org/hornetq/core/server/MessageReference.java
trunk/src/main/org/hornetq/core/server/Queue.java
trunk/src/main/org/hornetq/core/server/ServerMessage.java
trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
trunk/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java
trunk/src/main/org/hornetq/core/server/impl/DivertImpl.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java
trunk/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/hornetq/core/transaction/Transaction.java
trunk/src/main/org/hornetq/core/transaction/TransactionOperation.java
trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
trunk/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java
trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
trunk/tests/src/org/hornetq/tests/integration/divert/DivertTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionOnGCTest.java
trunk/tests/src/org/hornetq/tests/integration/persistence/JournalStorageManagerIntegrationTest.java
trunk/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java
trunk/tests/src/org/hornetq/tests/integration/server/LVQTest.java
trunk/tests/src/org/hornetq/tests/performance/persistence/StorageManagerTimingTest.java
trunk/tests/src/org/hornetq/tests/timing/jms/bridge/impl/JMSBridgeImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java
Log:
routing refactoring, plus fixed some tests
Modified: trunk/.classpath
===================================================================
--- trunk/.classpath 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/.classpath 2009-10-07 21:01:20 UTC (rev 8064)
@@ -7,7 +7,7 @@
<classpathentry kind="src" path="tests/config"/>
<classpathentry excluding="**/.svn/**/*" kind="src" path="tests/src">
<attributes>
- <attribute name="org.eclipse.jdt.launching.CLASSPATH_ATTR_LIBRARY_PATH_ENTRY" value="hornet/native/bin"/>
+ <attribute name="org.eclipse.jdt.launching.CLASSPATH_ATTR_LIBRARY_PATH_ENTRY" value="trunk/native/bin"/>
</attributes>
</classpathentry>
<classpathentry kind="src" path="tests/jms-tests/src"/>
Modified: trunk/src/main/org/hornetq/core/deployers/impl/FileDeploymentManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/deployers/impl/FileDeploymentManager.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/deployers/impl/FileDeploymentManager.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -229,7 +229,7 @@
try
{
Deployer deployer = entry.getValue().deployer;
- log.info("Undeploying " + deployer + " with url " + pair.a);
+ log.debug("Undeploying " + deployer + " with url " + pair.a);
deployer.undeploy(pair.a);
toRemove.add(pair);
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -1441,9 +1441,8 @@
try
{
+ log.debug("Starting compacting operation on journal");
- log.info("Starting compacting operation on journal");
-
// We need to guarantee that the journal is frozen for this short time
// We don't freeze the journal as we compact, only for the short time where we replace records
compactingLock.writeLock().lock();
@@ -1582,7 +1581,7 @@
renameFiles(dataFilesToProcess, newDatafiles);
deleteControlFile(controlFile);
- log.info("Finished compacting on journal");
+ log.debug("Finished compacting on journal");
}
finally
@@ -2167,7 +2166,7 @@
try
{
- log.info("Cleaning up file " + file);
+ log.debug("Cleaning up file " + file);
if (file.getPosCount() == 0)
{
@@ -2221,7 +2220,7 @@
finally
{
compactingLock.readLock().unlock();
- log.info("Clean up on file " + file + " done");
+ log.debug("Clean up on file " + file + " done");
}
}
@@ -2767,7 +2766,7 @@
{
if (state != STATE_LOADED)
{
- throw new IllegalStateException("The journal was stopped");
+ throw new IllegalStateException("The journal is not loaded " + state);
}
int size = bb.capacity();
@@ -2845,11 +2844,14 @@
currentFile.getFile().write(bb, sync);
}
- return currentFile;
+ return currentFile;
}
finally
{
- currentFile.getFile().enableAutoFlush();
+ if (currentFile != null)
+ {
+ currentFile.getFile().enableAutoFlush();
+ }
}
}
Modified: trunk/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -67,6 +67,7 @@
import org.hornetq.core.server.cluster.Bridge;
import org.hornetq.core.server.cluster.BroadcastGroup;
import org.hornetq.core.server.cluster.ClusterConnection;
+import org.hornetq.core.server.impl.RoutingContextImpl;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressSettings;
@@ -728,7 +729,7 @@
notificationMessage.putTypedProperties(notifProps);
- postOffice.route(notificationMessage, null);
+ postOffice.route(notificationMessage, new RoutingContextImpl(null));
}
}
}
Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -38,7 +38,9 @@
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.LargeServerMessage;
+import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.RoutingContextImpl;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.TransactionPropertyIndexes;
@@ -593,6 +595,7 @@
if (onDepage(page.getPageId(), storeName, messages))
{
page.delete();
+
return true;
}
else
@@ -755,16 +758,14 @@
// back to where it was
Transaction depageTransaction = new TransactionImpl(storageManager);
-
+
depageTransaction.putProperty(TransactionPropertyIndexes.IS_DEPAGE, Boolean.valueOf(true));
HashSet<PageTransactionInfo> pageTransactionsToUpdate = new HashSet<PageTransactionInfo>();
for (PagedMessage pagedMessage : pagedMessages)
{
- ServerMessage message = null;
-
- message = pagedMessage.getMessage(storageManager);
+ ServerMessage message = pagedMessage.getMessage(storageManager);
if (message.isLargeMessage())
{
@@ -787,7 +788,6 @@
log.warn("Transaction " + pagedMessage.getTransactionID() +
" used during paging not found, ignoring message " +
message);
-
continue;
}
@@ -815,7 +815,7 @@
if (isTrace)
{
trace("Rollback was called after prepare, ignoring message " + message);
- }
+ }
continue;
}
@@ -827,7 +827,7 @@
}
}
- postOffice.route(message, depageTransaction);
+ postOffice.route(message, new RoutingContextImpl(depageTransaction));
}
if (!running)
Modified: trunk/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/StorageManager.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/persistence/StorageManager.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -21,6 +21,7 @@
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.postoffice.Binding;
+import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.HornetQComponent;
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.MessageReference;
@@ -100,7 +101,8 @@
void deleteHeuristicCompletion(long id) throws Exception;
- void loadMessageJournal(PagingManager pagingManager,
+ void loadMessageJournal(PostOffice postOffice,
+ PagingManager pagingManager,
ResourceManager resourceManager,
Map<Long, Queue> queues,
Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception;
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -51,6 +51,7 @@
import org.hornetq.core.persistence.QueueBindingInfo;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.Binding;
+import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.remoting.impl.wireformat.XidCodecSupport;
import org.hornetq.core.remoting.spi.HornetQBuffer;
import org.hornetq.core.server.JournalType;
@@ -143,11 +144,11 @@
private final String journalDir;
private final String largeMessagesDirectory;
-
+
public JournalStorageManager(final Configuration config, final Executor executor)
{
this.executor = executor;
-
+
if (config.getJournalType() != JournalType.NIO && config.getJournalType() != JournalType.ASYNCIO)
{
throw new IllegalArgumentException("Only NIO and AsyncIO are supported journals");
@@ -250,7 +251,7 @@
this.persistentID = id;
}
-
+
public long generateUniqueID()
{
long id = idGenerator.generateID();
@@ -518,7 +519,8 @@
}
- public void loadMessageJournal(final PagingManager pagingManager,
+ public void loadMessageJournal(final PostOffice postOffice,
+ final PagingManager pagingManager,
final ResourceManager resourceManager,
final Map<Long, Queue> queues,
final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
@@ -730,9 +732,9 @@
{
record.message.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, scheduledDeliveryTime);
}
+
+ MessageReference ref = postOffice.reroute(record.message, queue, null);
- MessageReference ref = queue.reroute(record.message, null);
-
ref.setDeliveryCount(record.deliveryCount);
if (scheduledDeliveryTime != 0)
@@ -742,13 +744,13 @@
}
}
- loadPreparedTransactions(pagingManager, resourceManager, queues, preparedTransactions, duplicateIDMap);
+ loadPreparedTransactions(postOffice, pagingManager, resourceManager, queues, preparedTransactions, duplicateIDMap);
for (LargeServerMessage msg : largeMessages)
{
if (msg.getRefCount() == 0)
{
- log.info("Large message: " + msg.getMessageID() + " didn't have any associated reference, file will be deleted");
+ log.debug("Large message: " + msg.getMessageID() + " didn't have any associated reference, file will be deleted");
msg.decrementRefCount();
}
}
@@ -796,7 +798,8 @@
return largeMessage;
}
- private void loadPreparedTransactions(final PagingManager pagingManager,
+ private void loadPreparedTransactions(final PostOffice postOffice,
+ final PagingManager pagingManager,
final ResourceManager resourceManager,
final Map<Long, Queue> queues,
final List<PreparedTransactionInfo> preparedTransactions,
@@ -867,7 +870,7 @@
throw new IllegalStateException("Cannot find message with id " + messageID);
}
- queue.reroute(message, tx);
+ postOffice.reroute(message, queue, tx);
break;
}
Modified: trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -25,6 +25,7 @@
import org.hornetq.core.persistence.QueueBindingInfo;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.Binding;
+import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
@@ -232,7 +233,8 @@
{
}
- public void loadMessageJournal(PagingManager pagingManager,
+ public void loadMessageJournal(PostOffice postOffice,
+ PagingManager pagingManager,
ResourceManager resourceManager,
Map<Long, Queue> queues,
Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
Modified: trunk/src/main/org/hornetq/core/postoffice/Bindings.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/Bindings.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/postoffice/Bindings.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -16,8 +16,8 @@
import java.util.Collection;
import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
-import org.hornetq.core.transaction.Transaction;
/**
* A Bindings
@@ -32,13 +32,13 @@
{
Collection<Binding> getBindings();
- boolean route(ServerMessage message, Transaction tx) throws Exception;
-
void addBinding(Binding binding);
void removeBinding(Binding binding);
void setRouteWhenNoConsumers(boolean takePriorityIntoAccount);
- boolean redistribute(ServerMessage message, Queue originatingQueue, Transaction tx) throws Exception;
+ void redistribute(ServerMessage message, Queue originatingQueue, RoutingContext context) throws Exception;
+
+ void route(ServerMessage message, RoutingContext context) throws Exception;
}
Modified: trunk/src/main/org/hornetq/core/postoffice/PostOffice.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/PostOffice.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/postoffice/PostOffice.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -15,7 +15,9 @@
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.server.HornetQComponent;
+import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.utils.SimpleString;
@@ -50,11 +52,11 @@
Bindings getMatchingBindings(SimpleString address);
- void route(ServerMessage message) throws Exception;
+ void route(ServerMessage message, RoutingContext context) throws Exception;
- void route(ServerMessage message, Transaction tx) throws Exception;
+ MessageReference reroute(ServerMessage message, Queue queue, Transaction tx) throws Exception;
- boolean redistribute(ServerMessage message, final Queue originatingQueue, Transaction tx) throws Exception;
+ boolean redistribute(ServerMessage message, final Queue originatingQueue, RoutingContext context) throws Exception;
PagingManager getPagingManager();
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -30,8 +30,8 @@
import org.hornetq.core.postoffice.Bindings;
import org.hornetq.core.server.Bindable;
import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
-import org.hornetq.core.transaction.Transaction;
import org.hornetq.utils.SimpleString;
/**
@@ -56,7 +56,7 @@
private final List<Binding> exclusiveBindings = new CopyOnWriteArrayList<Binding>();
private volatile boolean routeWhenNoConsumers;
-
+
public void setRouteWhenNoConsumers(final boolean routeWhenNoConsumers)
{
this.routeWhenNoConsumers = routeWhenNoConsumers;
@@ -122,51 +122,14 @@
bindingsMap.remove(binding.getID());
}
-
- private boolean routeFromCluster(final ServerMessage message, final Transaction tx) throws Exception
+
+ public void redistribute(final ServerMessage message, final Queue originatingQueue, final RoutingContext context) throws Exception
{
- byte[] ids = (byte[])message.removeProperty(MessageImpl.HDR_ROUTE_TO_IDS);
-
- ByteBuffer buff = ByteBuffer.wrap(ids);
-
- Set<Bindable> chosen = new HashSet<Bindable>();
-
- while (buff.hasRemaining())
- {
- long bindingID = buff.getLong();
-
- Binding binding = bindingsMap.get(bindingID);
-
- if (binding == null)
- {
- return false;
- }
-
- binding.willRoute(message);
-
- chosen.add(binding.getBindable());
- }
-
- for (Bindable bindable : chosen)
- {
- bindable.preroute(message, tx);
- }
-
- for (Bindable bindable : chosen)
- {
- bindable.route(message, tx);
- }
-
- return true;
- }
-
- public boolean redistribute(final ServerMessage message, final Queue originatingQueue, final Transaction tx) throws Exception
- {
if (routeWhenNoConsumers)
{
- return false;
+ return;
}
-
+
SimpleString routingName = originatingQueue.getName();
List<Binding> bindings = routingNameBindingMap.get(routingName);
@@ -175,7 +138,7 @@
{
// The value can become null if it's concurrently removed while we're iterating - this is expected
// ConcurrentHashMap behaviour!
- return false;
+ return;
}
Integer ipos = routingNamePositions.get(routingName);
@@ -238,30 +201,22 @@
{
theBinding.willRoute(message);
- theBinding.getBindable().preroute(message, tx);
-
- theBinding.getBindable().route(message, tx);
-
- return true;
+ theBinding.getBindable().route(message, context);
}
- else
- {
- return false;
- }
}
- public boolean route(final ServerMessage message, final Transaction tx) throws Exception
+ public void route(final ServerMessage message, final RoutingContext context) throws Exception
{
boolean routed = false;
-
+
if (!exclusiveBindings.isEmpty())
{
for (Binding binding : exclusiveBindings)
{
if (binding.getFilter() == null || binding.getFilter().match(message))
{
- binding.getBindable().route(message, tx);
-
+ binding.getBindable().route(message, context);
+
routed = true;
}
}
@@ -271,7 +226,7 @@
{
if (message.getProperty(MessageImpl.HDR_FROM_CLUSTER) != null)
{
- routed = routeFromCluster(message, tx);
+ routeFromCluster(message, context);
}
else
{
@@ -393,24 +348,34 @@
routingNamePositions.put(routingName, pos);
}
-
- // TODO refactor to do this is one iteration
-
+
for (Bindable bindable : chosen)
{
- bindable.preroute(message, tx);
+ bindable.route(message, context);
}
+ }
+ }
+ }
+
+ private void routeFromCluster(final ServerMessage message, final RoutingContext context) throws Exception
+ {
+ byte[] ids = (byte[])message.removeProperty(MessageImpl.HDR_ROUTE_TO_IDS);
- for (Bindable bindable : chosen)
- {
- bindable.route(message, tx);
-
- routed = true;
- }
+ ByteBuffer buff = ByteBuffer.wrap(ids);
+
+ while (buff.hasRemaining())
+ {
+ long bindingID = buff.getLong();
+
+ Binding binding = bindingsMap.get(bindingID);
+
+ if (binding != null)
+ {
+ binding.willRoute(message);
+
+ binding.getBindable().route(message, context);
}
}
-
- return routed;
}
private final int incrementPos(int pos, int length)
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -36,6 +36,7 @@
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagingManager;
+import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.AddressManager;
@@ -45,9 +46,12 @@
import org.hornetq.core.postoffice.DuplicateIDCache;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.postoffice.QueueInfo;
+import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.QueueFactory;
+import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.RoutingContextImpl;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressSettings;
@@ -423,7 +427,7 @@
public synchronized void addBinding(final Binding binding) throws Exception
{
addressManager.addBinding(binding);
-
+
TypedProperties props = new TypedProperties();
props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, binding.getType().toInt());
@@ -515,8 +519,8 @@
return addressManager.getMatchingBindings(address);
}
- public void route(final ServerMessage message, Transaction tx) throws Exception
- {
+ public void route(final ServerMessage message, final RoutingContext context) throws Exception
+ {
SimpleString address = message.getDestination();
byte[] duplicateIDBytes = null;
@@ -540,15 +544,15 @@
if (cache.contains(duplicateIDBytes))
{
- if (tx == null)
+ if (context.getTransaction() == null)
{
- log.trace("Duplicate message detected - message will not be routed");
+ log.trace("Duplicate message detected - message will not be routed");
}
else
{
- log.trace("Duplicate message detected - transaction will be rejected");
+ log.trace("Duplicate message detected - transaction will be rejected");
- tx.markAsRollbackOnly(null);
+ context.getTransaction().markAsRollbackOnly(null);
}
return;
@@ -559,23 +563,26 @@
if (cache != null)
{
- if (tx == null)
+ if (context.getTransaction() == null)
{
// We need to store the duplicate id atomically with the message storage, so we need to create a tx for this
- tx = new TransactionImpl(storageManager);
+ Transaction tx = new TransactionImpl(storageManager);
+
+ context.setTransaction(tx);
startedTx = true;
}
- cache.addToCache(duplicateIDBytes, tx);
+ cache.addToCache(duplicateIDBytes, context.getTransaction());
}
- if (tx == null)
+ if (context.getTransaction() == null)
{
if (pagingManager.page(message, true))
{
message.setStored();
+
return;
}
}
@@ -583,80 +590,142 @@
{
SimpleString destination = message.getDestination();
- boolean depage = tx.getProperty(TransactionPropertyIndexes.IS_DEPAGE) != null;
+ boolean depage = context.getTransaction().getProperty(TransactionPropertyIndexes.IS_DEPAGE) != null;
if (!depage && pagingManager.isPaging(destination))
{
- getPageOperation(tx).addMessageToPage(message);
+ getPageOperation(context.getTransaction()).addMessageToPage(message);
return;
}
}
Bindings bindings = addressManager.getBindingsForRoutingAddress(address);
-
- boolean routed;
if (bindings != null)
{
- routed = bindings.route(message, tx);
+ context.incrementDepth();
+
+ bindings.route(message, context);
+
+ context.decrementDepth();
}
- else
- {
- routed = false;
- }
- if (!routed)
+ //The depth allows for recursion e.g. with diverts - we only want to process the route after any recursed routes
+ //have been processed
+
+ if (context.getDepth() == 0)
{
- AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString());
-
- boolean sendToDLA = addressSettings.isSendToDLAOnNoRoute();
-
- if (sendToDLA)
+ if (context.getQueues().isEmpty())
{
- //Send to the DLA for the address
-
- SimpleString dlaAddress = addressSettings.getDeadLetterAddress();
-
- if (dlaAddress == null)
+ // Send to DLA if appropriate
+
+ AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString());
+
+ boolean sendToDLA = addressSettings.isSendToDLAOnNoRoute();
+
+ if (sendToDLA)
{
- log.warn("Did not route to any bindings for address " + address + " and sendToDLAOnNoRoute is true " +
- "but there is no DLA configured for the address, the message will be ignored.");
+ // Send to the DLA for the address
+
+ SimpleString dlaAddress = addressSettings.getDeadLetterAddress();
+
+ if (dlaAddress == null)
+ {
+ log.warn("Did not route to any bindings for address " + address +
+ " and sendToDLAOnNoRoute is true " +
+ "but there is no DLA configured for the address, the message will be ignored.");
+ }
+ else
+ {
+ message.setOriginalHeaders(message, false);
+
+ message.setDestination(dlaAddress);
+
+ route(message, context);
+ }
}
- else
- {
- message.setOriginalHeaders(message, false);
-
- message.setDestination(dlaAddress);
-
- route(message, tx);
- }
}
+ else
+ {
+ processRoute(message, context);
+ }
+
+ if (startedTx)
+ {
+ context.getTransaction().commit();
+ }
}
+ }
+
+ public MessageReference reroute(final ServerMessage message, final Queue queue, final Transaction tx) throws Exception
+ {
+ MessageReference reference = message.createReference(queue);
- if (startedTx)
+ Long scheduledDeliveryTime = (Long)message.getProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME);
+
+ if (scheduledDeliveryTime != null)
{
- tx.commit();
+ reference.setScheduledDeliveryTime(scheduledDeliveryTime);
}
+
+ message.incrementDurableRefCount();
+
+ message.setStored();
+
+ int refCount = message.incrementRefCount();
+
+ PagingStore store = pagingManager.getPageStore(message.getDestination());
+
+ if (refCount == 1)
+ {
+ store.addSize(message.getMemoryEstimate());
+ }
+
+ store.addSize(reference.getMemoryEstimate());
+
+ if (tx == null)
+ {
+ queue.addLast(reference);
+ }
+ else
+ {
+ List<MessageReference> refs = new ArrayList<MessageReference>(1);
+
+ refs.add(reference);
+
+ tx.addOperation(new AddOperation(refs));
+ }
+
+ return reference;
}
-
+
public void route(final ServerMessage message) throws Exception
{
- route(message, null);
+ route(message, new RoutingContextImpl(null));
}
- public boolean redistribute(final ServerMessage message, final Queue originatingQueue, final Transaction tx) throws Exception
- {
+ public boolean redistribute(final ServerMessage message, final Queue originatingQueue, final RoutingContext context) throws Exception
+ {
Bindings bindings = addressManager.getBindingsForRoutingAddress(message.getDestination());
+ boolean res = false;
+
if (bindings != null)
{
- return bindings.redistribute(message, originatingQueue, tx);
+ bindings.redistribute(message, originatingQueue, context);
+
+ if (!context.getQueues().isEmpty())
+ {
+ processRoute(message, context);
+
+ res = true;
+ }
}
- else
- {
- return false;
- }
+
+ log.info("redistribute called res is " + res);
+
+ return res;
}
public PagingManager getPagingManager()
@@ -711,8 +780,9 @@
message.setBody(ChannelBuffers.EMPTY_BUFFER);
message.setDestination(queueName);
message.putBooleanProperty(HDR_RESET_QUEUE_DATA, true);
- queue.preroute(message, null);
- queue.route(message, null);
+ // queue.preroute(message, null);
+ // queue.route(message, null);
+ routeDirect(message, queue, false);
for (QueueInfo info : queueInfos.values())
{
@@ -727,7 +797,7 @@
message.putStringProperty(ManagementHelper.HDR_FILTERSTRING, info.getFilterString());
message.putIntProperty(ManagementHelper.HDR_DISTANCE, info.getDistance());
- routeDirect(queue, message);
+ routeDirect(message, queue, true);
int consumersWithFilters = info.getFilterStrings() != null ? info.getFilterStrings().size() : 0;
@@ -740,7 +810,7 @@
message.putStringProperty(ManagementHelper.HDR_ROUTING_NAME, info.getRoutingName());
message.putIntProperty(ManagementHelper.HDR_DISTANCE, info.getDistance());
- routeDirect(queue, message);
+ routeDirect(message, queue, true);
}
if (info.getFilterStrings() != null)
@@ -755,7 +825,7 @@
message.putStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString);
message.putIntProperty(ManagementHelper.HDR_DISTANCE, info.getDistance());
- routeDirect(queue, message);
+ routeDirect(message, queue, true);
}
}
}
@@ -766,9 +836,107 @@
// Private -----------------------------------------------------------------
+ private void routeDirect(final ServerMessage message, final Queue queue, final boolean applyFilters) throws Exception
+ {
+ if (!applyFilters || queue.getFilter() == null || queue.getFilter().match(message))
+ {
+ RoutingContext context = new RoutingContextImpl(null);
+
+ queue.route(message, context);
+
+ processRoute(message, context);
+ }
+ }
+
+ private void processRoute(final ServerMessage message, final RoutingContext context) throws Exception
+ {
+ List<MessageReference> refs = new ArrayList<MessageReference>();
+
+ Transaction tx = context.getTransaction();
+
+ for (Queue queue : context.getQueues())
+ {
+ MessageReference reference = message.createReference(queue);
+
+ refs.add(reference);
+
+ Long scheduledDeliveryTime = (Long)message.getProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME);
+
+ if (scheduledDeliveryTime != null)
+ {
+ reference.setScheduledDeliveryTime(scheduledDeliveryTime);
+ }
+
+ if (message.isDurable() && queue.isDurable())
+ {
+ int durableRefCount = message.incrementDurableRefCount();
+
+ if (durableRefCount == 1)
+ {
+ if (tx != null)
+ {
+ storageManager.storeMessageTransactional(tx.getID(), message);
+ }
+ else
+ {
+ storageManager.storeMessage(message);
+ }
+
+ message.setStored();
+ }
+
+ if (tx != null)
+ {
+ storageManager.storeReferenceTransactional(tx.getID(), queue.getID(), message.getMessageID());
+
+ tx.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
+ }
+ else
+ {
+ storageManager.storeReference(queue.getID(), message.getMessageID());
+ }
+
+ if (scheduledDeliveryTime != null)
+ {
+ if (tx != null)
+ {
+ storageManager.updateScheduledDeliveryTimeTransactional(tx.getID(), reference);
+ }
+ else
+ {
+ storageManager.updateScheduledDeliveryTime(reference);
+ }
+ }
+ }
+
+ int refCount = message.incrementRefCount();
+
+ PagingStore store = pagingManager.getPageStore(message.getDestination());
+
+ if (refCount == 1)
+ {
+ store.addSize(message.getMemoryEstimate());
+ }
+
+ store.addSize(reference.getMemoryEstimate());
+ }
+
+ if (tx != null)
+ {
+ tx.addOperation(new AddOperation(refs));
+ }
+ else
+ {
+ for (MessageReference ref : refs)
+ {
+
+ ref.getQueue().addLast(ref);
+ }
+ }
+ }
+
private synchronized void startExpiryScanner()
{
-
if (reaperPeriod > 0)
{
reaperThread = new Thread(reaperRunnable, "HornetQ-expiry-reaper");
@@ -779,15 +947,6 @@
}
}
- private void routeDirect(final Queue queue, final ServerMessage message) throws Exception
- {
- if (queue.getFilter() == null || queue.getFilter().match(message))
- {
- queue.preroute(message, null);
- queue.route(message, null);
- }
- }
-
private ServerMessage createQueueInfoMessage(final NotificationType type, final SimpleString queueName)
{
ServerMessage message = new ServerMessageImpl(storageManager.generateUniqueID());
@@ -1008,7 +1167,7 @@
// This could happen when the PageStore left the pageState
// TODO is this correct - don't we lose transactionality here???
- route(message, null);
+ route(message, new RoutingContextImpl(null));
}
first = false;
}
@@ -1026,4 +1185,64 @@
}
}
}
+
+ private class AddOperation implements TransactionOperation
+ {
+ private final List<MessageReference> refs;
+
+ AddOperation(final List<MessageReference> refs)
+ {
+ this.refs = refs;
+ }
+
+ public void afterCommit(Transaction tx) throws Exception
+ {
+ for (MessageReference ref : refs)
+ {
+ ref.getQueue().addLast(ref);
+ }
+ }
+
+ public void afterPrepare(Transaction tx) throws Exception
+ {
+ }
+
+ public void afterRollback(Transaction tx) throws Exception
+ {
+ }
+
+ public void beforeCommit(Transaction tx) throws Exception
+ {
+ }
+
+ public void beforePrepare(Transaction tx) throws Exception
+ {
+ }
+
+ public void beforeRollback(Transaction tx) throws Exception
+ {
+ // Reverse the ref counts, and paging sizes
+
+ for (MessageReference ref : refs)
+ {
+ ServerMessage message = ref.getMessage();
+
+ if (message.isDurable() && ref.getQueue().isDurable())
+ {
+ message.decrementDurableRefCount();
+ }
+
+ int count = message.decrementRefCount();
+
+ PagingStore store = pagingManager.getPageStore(message.getDestination());
+
+ if (count == 0)
+ {
+ store.addSize(-message.getMemoryEstimate());
+ }
+
+ store.addSize(-ref.getMemoryEstimate());
+ }
+ }
+ }
}
Modified: trunk/src/main/org/hornetq/core/server/Bindable.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/Bindable.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/server/Bindable.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -13,7 +13,6 @@
package org.hornetq.core.server;
-import org.hornetq.core.transaction.Transaction;
/**
* A Bindable
@@ -26,8 +25,6 @@
*/
public interface Bindable
{
- void preroute(ServerMessage message, Transaction tx) throws Exception;
-
- void route(ServerMessage message, Transaction tx) throws Exception;
+ void route(ServerMessage message, RoutingContext context) throws Exception;
}
Modified: trunk/src/main/org/hornetq/core/server/MessageReference.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/MessageReference.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/server/MessageReference.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -51,4 +51,6 @@
void decrementDeliveryCount();
Queue getQueue();
+
+ void handled();
}
Modified: trunk/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/Queue.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/server/Queue.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -19,7 +19,6 @@
import java.util.concurrent.Executor;
import org.hornetq.core.filter.Filter;
-import org.hornetq.core.remoting.Channel;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.utils.SimpleString;
@@ -34,8 +33,6 @@
*/
public interface Queue extends Bindable
{
- MessageReference reroute(ServerMessage message, Transaction tx) throws Exception;
-
SimpleString getName();
long getID();
Added: trunk/src/main/org/hornetq/core/server/RoutingContext.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/RoutingContext.java (rev 0)
+++ trunk/src/main/org/hornetq/core/server/RoutingContext.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.server;
+
+import java.util.List;
+
+import org.hornetq.core.transaction.Transaction;
+
+/**
+ * A RoutingContext
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public interface RoutingContext
+{
+ Transaction getTransaction();
+
+ void setTransaction(Transaction transaction);
+
+ void addQueue(Queue queue);
+
+ List<Queue> getQueues();
+
+ void incrementDepth();
+
+ void decrementDepth();
+
+ int getDepth();
+}
Modified: trunk/src/main/org/hornetq/core/server/ServerMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerMessage.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/server/ServerMessage.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -44,6 +44,7 @@
int getMemoryEstimate();
+ //TODO - do we really need this? Can't we use durable ref count?
void setStored() throws Exception;
boolean isStored();
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -392,7 +392,7 @@
synchronized (this)
{
- ref.getQueue().referenceHandled();
+ ref.handled();
ServerMessage message = ref.getMessage();
@@ -500,10 +500,6 @@
private void fail()
{
- log.info("bridge " + name + " has failed");
-
- //executor.execute(new FailRunnable());
-
//This will get called even after the bridge reconnects - in this case
//we want to cancel all unacked refs so they get resent
//duplicate detection will ensure no dups are routed on the other side
@@ -670,8 +666,6 @@
queue.deliverAsync(executor);
- log.info("Bridge " + name + " is now connected to destination ");
-
return true;
}
catch (Exception e)
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -23,6 +23,7 @@
import org.hornetq.core.server.HandleStatus;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.impl.RoutingContextImpl;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.impl.TransactionImpl;
import org.hornetq.utils.Future;
@@ -122,7 +123,7 @@
final Transaction tx = new TransactionImpl(storageManager);
- boolean routed = postOffice.redistribute(reference.getMessage(), queue, tx);
+ boolean routed = postOffice.redistribute(reference.getMessage(), queue, new RoutingContextImpl(tx));
if (routed)
{
@@ -138,7 +139,7 @@
private void doRedistribute(final MessageReference reference, final Transaction tx) throws Exception
{
- queue.referenceHandled();
+ reference.handled();
queue.acknowledge(tx, reference);
Modified: trunk/src/main/org/hornetq/core/server/impl/DivertImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/DivertImpl.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/server/impl/DivertImpl.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -17,14 +17,11 @@
import org.hornetq.core.filter.Filter;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.paging.PagingManager;
-import org.hornetq.core.paging.PagingStore;
-import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.Divert;
+import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.cluster.Transformer;
-import org.hornetq.core.transaction.Transaction;
import org.hornetq.utils.SimpleString;
/**
@@ -54,19 +51,13 @@
private final Transformer transformer;
- private final PagingManager pagingManager;
-
- private final StorageManager storageManager;
-
public DivertImpl(final SimpleString forwardAddress,
final SimpleString uniqueName,
final SimpleString routingName,
final boolean exclusive,
final Filter filter,
final Transformer transformer,
- final PostOffice postOffice,
- final PagingManager pagingManager,
- final StorageManager storageManager)
+ final PostOffice postOffice)
{
this.forwardAddress = forwardAddress;
@@ -81,40 +72,14 @@
this.transformer = transformer;
this.postOffice = postOffice;
-
- this.pagingManager = pagingManager;
-
- this.storageManager = storageManager;
}
- public void preroute(final ServerMessage message, final Transaction tx) throws Exception
- {
- //We need to increment ref count here to ensure that the message doesn't get stored, deleted and stored again in a single route which
- //can occur if the message is routed to a queue, then acked before it's routed here
-
- //TODO - combine with similar code in QueueImpl.accept()
-
- int count = message.incrementRefCount();
-
- if (count == 1)
- {
- PagingStore store = pagingManager.getPageStore(message.getDestination());
-
- store.addSize(message.getMemoryEstimate());
- }
-
- if (message.isDurable())
- {
- message.incrementDurableRefCount();
- }
- }
-
- public void route(ServerMessage message, final Transaction tx) throws Exception
+ public void route(ServerMessage message, final RoutingContext context) throws Exception
{
SimpleString originalDestination = message.getDestination();
message.setDestination(forwardAddress);
-
+
message.putStringProperty(HDR_ORIGINAL_DESTINATION, originalDestination);
if (transformer != null)
@@ -122,30 +87,7 @@
message = transformer.transform(message);
}
- postOffice.route(message, tx);
-
- //Decrement the ref count here - and delete the message if necessary
-
- //TODO combine this with code in QueueImpl::postAcknowledge
-
- if (message.isDurable())
- {
- int count = message.decrementDurableRefCount();
-
- if (count == 0)
- {
- storageManager.deleteMessage(message.getMessageID());
- }
- }
-
- // TODO: We could optimize this by storing the paging-store for the address on the Queue. We would need to know
- // the Address for the Queue
- PagingStore store = pagingManager.getPageStore(message.getDestination());
-
- if (message.decrementRefCount() == 0)
- {
- store.addSize(-message.getMemoryEstimate());
- }
+ postOffice.route(message, context);
}
public SimpleString getRoutingName()
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -80,7 +80,6 @@
import org.hornetq.core.server.Divert;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.MemoryManager;
-import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.QueueFactory;
import org.hornetq.core.server.ServerSession;
@@ -91,9 +90,7 @@
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.settings.impl.HierarchicalObjectRepository;
import org.hornetq.core.transaction.ResourceManager;
-import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.impl.ResourceManagerImpl;
-import org.hornetq.core.transaction.impl.TransactionImpl;
import org.hornetq.core.version.Version;
import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.HornetQThreadFactory;
@@ -572,7 +569,7 @@
executorFactory.getExecutor(),
channel,
managementService,
- queueFactory,
+ // queueFactory,
this,
configuration.getManagementAddress());
@@ -763,35 +760,6 @@
return nodeID;
}
- public void handleReplicateRedistribution(final SimpleString queueName, final long messageID) throws Exception
- {
- Binding binding = postOffice.getBinding(queueName);
-
- if (binding == null)
- {
- throw new IllegalStateException("Cannot find queue " + queueName);
- }
-
- Queue queue = (Queue)binding.getBindable();
-
- MessageReference reference = queue.removeFirstReference(messageID);
-
- Transaction tx = new TransactionImpl(storageManager);
-
- boolean routed = postOffice.redistribute(reference.getMessage(), queue, tx);
-
- if (routed)
- {
- queue.acknowledge(tx, reference);
-
- tx.commit();
- }
- else
- {
- throw new IllegalStateException("Must be routed");
- }
- }
-
public Queue createQueue(final SimpleString address,
final SimpleString queueName,
final SimpleString filterString,
@@ -924,8 +892,6 @@
{
// Complete the startup procedure
- log.info("Activating server");
-
configuration.setBackup(false);
initialisePart2();
@@ -977,9 +943,8 @@
private void initialiseLogging()
{
- LogDelegateFactory logDelegateFactory =
- (LogDelegateFactory)instantiateInstance(configuration.getLogDelegateFactoryClassName());
-
+ LogDelegateFactory logDelegateFactory = (LogDelegateFactory)instantiateInstance(configuration.getLogDelegateFactoryClassName());
+
Logger.setDelegateFactory(logDelegateFactory);
}
@@ -1188,7 +1153,7 @@
Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap = new HashMap<SimpleString, List<Pair<byte[], Long>>>();
- storageManager.loadMessageJournal(pagingManager, resourceManager, queues, duplicateIDMap);
+ storageManager.loadMessageJournal(postOffice, pagingManager, resourceManager, queues, duplicateIDMap);
for (Map.Entry<SimpleString, List<Pair<byte[], Long>>> entry : duplicateIDMap.entrySet())
{
@@ -1267,8 +1232,13 @@
filter = new FilterImpl(filterString);
}
- final Queue queue = queueFactory.createQueue(storageManager.generateUniqueID(), address, queueName, filter, durable, temporary);
-
+ final Queue queue = queueFactory.createQueue(storageManager.generateUniqueID(),
+ address,
+ queueName,
+ filter,
+ durable,
+ temporary);
+
binding = new LocalQueueBinding(address, queue, nodeID);
if (durable)
@@ -1335,9 +1305,9 @@
config.isExclusive(),
filter,
transformer,
- postOffice,
- pagingManager,
- storageManager);
+ postOffice);
+ // pagingManager,
+ // storageManager);
Binding binding = new DivertBinding(storageManager.generateUniqueID(), sAddress, divert);
Modified: trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -12,55 +12,48 @@
*/
package org.hornetq.core.server.impl;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.impl.MessageImpl;
-import org.hornetq.core.paging.PagingManager;
-import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.MessageReference;
+import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressSettings;
-import org.hornetq.core.transaction.Transaction;
import org.hornetq.utils.SimpleString;
/**
* A queue that will discard messages if a newer message with the same MessageImpl.HDR_LAST_VALUE_NAME property value.
* In other words it only retains the last value
+ *
* This is useful for example, for stock prices, where you're only interested in the latest value
* for a particular stock
+ *
* @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ * @author <a href="mailto:tim.fox@jboss.com">Tim Fox</a> rewrite
*/
public class LastValueQueue extends QueueImpl
{
private static final Logger log = Logger.getLogger(LastValueQueue.class);
- private final Map<SimpleString, ServerMessage> map = new HashMap<SimpleString, ServerMessage>();
+ private final Map<SimpleString, HolderReference> map = new ConcurrentHashMap<SimpleString, HolderReference>();
- private final PagingManager pagingManager;
-
- private final StorageManager storageManager;
-
public LastValueQueue(final long persistenceID,
- final SimpleString address,
- final SimpleString name,
- final Filter filter,
- final boolean durable,
- final boolean temporary,
- final ScheduledExecutorService scheduledExecutor,
- final PostOffice postOffice,
- final StorageManager storageManager,
- final HierarchicalRepository<AddressSettings> addressSettingsRepository)
+ final SimpleString address,
+ final SimpleString name,
+ final Filter filter,
+ final boolean durable,
+ final boolean temporary,
+ final ScheduledExecutorService scheduledExecutor,
+ final PostOffice postOffice,
+ final StorageManager storageManager,
+ final HierarchicalRepository<AddressSettings> addressSettingsRepository)
{
super(persistenceID,
address,
@@ -72,213 +65,158 @@
postOffice,
storageManager,
addressSettingsRepository);
- this.pagingManager = postOffice.getPagingManager();
- this.storageManager = storageManager;
}
- public void route(final ServerMessage message, final Transaction tx) throws Exception
+ public synchronized void add(final MessageReference ref, final boolean first)
{
- SimpleString prop = (SimpleString)message.getProperty(MessageImpl.HDR_LAST_VALUE_NAME);
+ SimpleString prop = (SimpleString)ref.getMessage().getProperty(MessageImpl.HDR_LAST_VALUE_NAME);
+
if (prop != null)
{
- synchronized (map)
- {
- ServerMessage msg = map.put(prop, message);
- // if an older message existed then we discard it
- if (msg != null)
+ HolderReference hr = map.get(prop);
+
+ if (!first)
+ {
+ if (hr != null)
{
- MessageReference ref;
- if (tx != null)
+ // We need to overwrite the old ref with the new one and ack the old one
+
+ MessageReference oldRef = hr.getReference();
+
+ super.referenceHandled();
+
+ try
{
- discardMessage(msg.getMessageID(), tx);
+ super.acknowledge(oldRef);
}
- else
+ catch (Exception e)
{
- ref = removeReferenceWithID(msg.getMessageID());
- if (ref != null)
- {
- discardMessage(ref, tx);
- }
+ log.error("Failed to ack old reference", e);
}
+ hr.setReference(ref);
+
}
- }
- }
- super.route(message, tx);
- }
+ else
+ {
+ hr = new HolderReference(prop, ref);
- public MessageReference reroute(final ServerMessage message, final Transaction tx) throws Exception
- {
- SimpleString prop = (SimpleString)message.getProperty(MessageImpl.HDR_LAST_VALUE_NAME);
- if (prop != null)
- {
- synchronized (map)
+ map.put(prop, hr);
+
+ super.add(hr, first);
+ }
+ }
+ else
{
- ServerMessage msg = map.put(prop, message);
- if (msg != null)
+ // Add to front
+
+ if (hr != null)
{
- if (tx != null)
+ // We keep the current ref and ack the one we are returning
+
+ super.referenceHandled();
+
+ try
{
- rediscardMessage(msg.getMessageID(), tx);
+ super.acknowledge(ref);
}
- else
+ catch (Exception e)
{
- MessageReference ref = removeReferenceWithID(msg.getMessageID());
- rediscardMessage(ref);
+ log.error("Failed to ack old reference", e);
}
}
+ else
+ {
+ map.put(prop, (HolderReference)ref);
+
+ super.add(ref, first);
+ }
}
}
- return super.reroute(message, tx);
+ else
+ {
+ super.add(ref, first);
+ }
}
- public void acknowledge(final MessageReference ref) throws Exception
+ private class HolderReference implements MessageReference
{
- super.acknowledge(ref);
- SimpleString prop = (SimpleString)ref.getMessage().getProperty(MessageImpl.HDR_LAST_VALUE_NAME);
- if (prop != null)
+ private final SimpleString prop;
+
+ private volatile MessageReference ref;
+
+ HolderReference(final SimpleString prop, final MessageReference ref)
{
- synchronized (map)
- {
- ServerMessage serverMessage = map.get(prop);
- if (serverMessage != null && ref.getMessage().getMessageID() == serverMessage.getMessageID())
- {
- map.remove(prop);
- }
- }
+ this.prop = prop;
+
+ this.ref = ref;
}
- }
- public void cancel(final Transaction tx, final MessageReference ref) throws Exception
- {
- SimpleString prop = (SimpleString)ref.getMessage().getProperty(MessageImpl.HDR_LAST_VALUE_NAME);
- if (prop != null)
+ MessageReference getReference()
{
- synchronized (map)
- {
- ServerMessage msg = map.get(prop);
- if (msg.getMessageID() == ref.getMessage().getMessageID())
- {
- super.cancel(tx, ref);
- }
- else
- {
- discardMessage(ref, tx);
- }
- }
+ return ref;
}
- else
+
+ public void handled()
{
- super.cancel(tx, ref);
+ // We need to remove the entry from the map just before it gets delivered
+
+ map.remove(prop);
}
- }
- void postRollback(final LinkedList<MessageReference> refs) throws Exception
- {
- List<MessageReference> refsToDiscard = new ArrayList<MessageReference>();
- List<SimpleString> refsToClear = new ArrayList<SimpleString>();
- synchronized (map)
+ void setReference(final MessageReference ref)
{
- for (MessageReference ref : refs)
- {
- SimpleString prop = (SimpleString)ref.getMessage().getProperty(MessageImpl.HDR_LAST_VALUE_NAME);
- if (prop != null)
- {
- ServerMessage msg = map.get(prop);
- if (msg != null)
- {
- if (msg.getMessageID() != ref.getMessage().getMessageID())
- {
- refsToDiscard.add(ref);
- }
- else
- {
- refsToClear.add(prop);
- }
- }
- }
- }
- for (SimpleString simpleString : refsToClear)
- {
- map.remove(simpleString);
- }
+ this.ref = ref;
}
- for (MessageReference ref : refsToDiscard)
+
+ public MessageReference copy(Queue queue)
{
- refs.remove(ref);
- discardMessage(ref, null);
+ return ref.copy(queue);
}
- super.postRollback(refs);
- }
- final void discardMessage(MessageReference ref, Transaction tx) throws Exception
- {
- deliveringCount.decrementAndGet();
- PagingStore store = pagingManager.getPageStore(ref.getMessage().getDestination());
- store.addSize(-ref.getMemoryEstimate());
- QueueImpl queue = (QueueImpl)ref.getQueue();
- ServerMessage msg = ref.getMessage();
- boolean durableRef = msg.isDurable() && queue.isDurable();
+ public void decrementDeliveryCount()
+ {
+ ref.decrementDeliveryCount();
+ }
- if (durableRef)
+ public int getDeliveryCount()
{
- int count = msg.decrementDurableRefCount();
+ return ref.getDeliveryCount();
+ }
- if (count == 0)
- {
- if (tx == null)
- {
- storageManager.deleteMessage(msg.getMessageID());
- }
- else
- {
- storageManager.deleteMessageTransactional(tx.getID(), getID(), msg.getMessageID());
- }
- }
+ public int getMemoryEstimate()
+ {
+ return ref.getMemoryEstimate();
}
- }
- final void discardMessage(Long id, Transaction tx) throws Exception
- {
- RefsOperation oper = getRefsOperation(tx);
- Iterator<MessageReference> iterator = oper.refsToAdd.iterator();
+ public ServerMessage getMessage()
+ {
+ return ref.getMessage();
+ }
- while (iterator.hasNext())
+ public Queue getQueue()
{
- MessageReference ref = iterator.next();
+ return ref.getQueue();
+ }
- if (ref.getMessage().getMessageID() == id)
- {
- iterator.remove();
- discardMessage(ref, tx);
- break;
- }
+ public long getScheduledDeliveryTime()
+ {
+ return ref.getScheduledDeliveryTime();
}
- }
+ public void incrementDeliveryCount()
+ {
+ ref.incrementDeliveryCount();
+ }
- final void rediscardMessage(long id, Transaction tx) throws Exception
- {
- RefsOperation oper = getRefsOperation(tx);
- Iterator<MessageReference> iterator = oper.refsToAdd.iterator();
-
- while (iterator.hasNext())
+ public void setDeliveryCount(int deliveryCount)
{
- MessageReference ref = iterator.next();
+ ref.setDeliveryCount(deliveryCount);
+ }
- if (ref.getMessage().getMessageID() == id)
- {
- iterator.remove();
- rediscardMessage(ref);
- break;
- }
+ public void setScheduledDeliveryTime(long scheduledDeliveryTime)
+ {
+ ref.setScheduledDeliveryTime(scheduledDeliveryTime);
}
}
-
- final void rediscardMessage(MessageReference ref) throws Exception
- {
- deliveringCount.decrementAndGet();
- PagingStore store = pagingManager.getPageStore(ref.getMessage().getDestination());
- store.addSize(-ref.getMemoryEstimate());
- }
}
Modified: trunk/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -119,6 +119,11 @@
{
return queue;
}
+
+ public void handled()
+ {
+ queue.referenceHandled();
+ }
// Public --------------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -14,7 +14,6 @@
package org.hornetq.core.server.impl;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -47,6 +46,7 @@
import org.hornetq.core.server.HandleStatus;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ScheduledDeliveryHandler;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.cluster.impl.Redistributor;
@@ -210,135 +210,12 @@
{
return false;
}
-
- public void preroute(final ServerMessage message, final Transaction tx) throws Exception
+
+ public void route(final ServerMessage message, final RoutingContext context) throws Exception
{
- int count = message.incrementRefCount();
-
- if (count == 1)
- {
- PagingStore store = pagingManager.getPageStore(message.getDestination());
-
- store.addSize(message.getMemoryEstimate());
- }
-
- boolean durableRef = message.isDurable() && durable;
-
- if (durableRef)
- {
- message.incrementDurableRefCount();
- }
+ context.addQueue(this);
}
- public void route(final ServerMessage message, final Transaction tx) throws Exception
- {
- boolean durableRef = message.isDurable() && durable;
-
- // If durable, must be persisted before anything is routed
- MessageReference ref = message.createReference(this);
-
- PagingStore store = pagingManager.getPageStore(message.getDestination());
-
- store.addSize(ref.getMemoryEstimate());
-
- Long scheduledDeliveryTime = (Long)message.getProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME);
-
- if (scheduledDeliveryTime != null)
- {
- ref.setScheduledDeliveryTime(scheduledDeliveryTime);
- }
-
- if (tx == null)
- {
- if (durableRef)
- {
- if (!message.isStored())
- {
- storageManager.storeMessage(message);
-
- message.setStored();
- }
-
- storageManager.storeReference(ref.getQueue().getID(), message.getMessageID());
- }
-
- if (scheduledDeliveryTime != null && durableRef)
- {
- storageManager.updateScheduledDeliveryTime(ref);
- }
-
- addLast(ref);
- }
- else
- {
- if (durableRef)
- {
- if (!message.isStored())
- {
- storageManager.storeMessageTransactional(tx.getID(), message);
-
- message.setStored();
- }
-
- tx.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
-
- storageManager.storeReferenceTransactional(tx.getID(),
- ref.getQueue().getID(),
- message.getMessageID());
- }
-
- if (scheduledDeliveryTime != null && durableRef)
- {
- storageManager.updateScheduledDeliveryTimeTransactional(tx.getID(), ref);
- }
-
- getRefsOperation(tx).addRef(ref);
- }
- }
-
- public MessageReference reroute(final ServerMessage message, final Transaction tx) throws Exception
- {
- MessageReference ref = message.createReference(this);
-
- int count = message.incrementRefCount();
-
- PagingStore store = pagingManager.getPageStore(message.getDestination());
-
- if (count == 1)
- {
- store.addSize(message.getMemoryEstimate());
- }
-
- store.addSize(ref.getMemoryEstimate());
-
- boolean durableRef = message.isDurable() && durable;
-
- if (durableRef)
- {
- message.incrementDurableRefCount();
- }
-
- Long scheduledDeliveryTime = (Long)message.getProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME);
-
- if (scheduledDeliveryTime != null)
- {
- ref.setScheduledDeliveryTime(scheduledDeliveryTime);
- }
-
- if (tx == null)
- {
- addLast(ref);
- }
- else
- {
- getRefsOperation(tx).addRef(ref);
- }
-
- message.setStored();
-
- return ref;
- }
-
// Queue implementation ----------------------------------------------------------------------------------------
public void lockDelivery()
@@ -384,7 +261,7 @@
}
public void addLast(final MessageReference ref)
- {
+ {
add(ref, false);
}
@@ -1071,7 +948,7 @@
copyMessage.setDestination(toAddress);
- postOffice.route(copyMessage, tx);
+ postOffice.route(copyMessage, new RoutingContextImpl(tx));
acknowledge(tx, ref);
}
@@ -1158,7 +1035,7 @@
copyMessage.setDestination(address);
- postOffice.route(copyMessage, tx);
+ postOffice.route(copyMessage, new RoutingContextImpl(tx));
acknowledge(tx, ref);
@@ -1253,7 +1130,7 @@
iterator.remove();
}
- referenceHandled();
+ reference.handled();
try
{
@@ -1316,7 +1193,7 @@
}
}
- private synchronized void add(final MessageReference ref, final boolean first)
+ protected synchronized void add(final MessageReference ref, final boolean first)
{
if (!first)
{
@@ -1558,14 +1435,11 @@
{
synchronized (this)
{
+ direct = false;
+
for (MessageReference ref : refs)
{
- ServerMessage msg = ref.getMessage();
-
- if (!scheduledDeliveryHandler.checkAndSchedule(ref))
- {
- messageReferences.addFirst(ref, msg.getPriority());
- }
+ add(ref, true);
}
deliver();
@@ -1633,15 +1507,8 @@
final class RefsOperation implements TransactionOperation
{
- List<MessageReference> refsToAdd = new ArrayList<MessageReference>();
-
List<MessageReference> refsToAck = new ArrayList<MessageReference>();
- synchronized void addRef(final MessageReference ref)
- {
- refsToAdd.add(ref);
- }
-
synchronized void addAck(final MessageReference ref)
{
refsToAck.add(ref);
@@ -1689,28 +1556,8 @@
}
}
- /* (non-Javadoc)
- * @see org.hornetq.core.transaction.TransactionOperation#getDistinctQueues()
- */
- public synchronized Collection<Queue> getDistinctQueues()
- {
- HashSet<Queue> queues = new HashSet<Queue>();
-
- for (MessageReference ref : refsToAck)
- {
- queues.add(ref.getQueue());
- }
-
- return queues;
- }
-
public void afterCommit(final Transaction tx) throws Exception
{
- for (MessageReference ref : refsToAdd)
- {
- ref.getQueue().addLast(ref);
- }
-
for (MessageReference ref : refsToAck)
{
synchronized (ref.getQueue())
@@ -1726,25 +1573,6 @@
public void beforeRollback(final Transaction tx) throws Exception
{
- Set<ServerMessage> msgs = new HashSet<ServerMessage>();
-
- for (MessageReference ref : refsToAdd)
- {
- ServerMessage msg = ref.getMessage();
-
- // Optimise this
- PagingStore store = pagingManager.getPageStore(msg.getDestination());
-
- store.addSize(-ref.getMemoryEstimate());
-
- if (!msgs.contains(msg))
- {
- store.addSize(-msg.getMemoryEstimate());
- msg.decrementRefCount();
- }
-
- msgs.add(msg);
- }
}
}
Added: trunk/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java (rev 0)
+++ trunk/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.server.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.RoutingContext;
+import org.hornetq.core.transaction.Transaction;
+
+/**
+ * A RoutingContextImpl
+ *
+ * @author tim
+ *
+ *
+ */
+public class RoutingContextImpl implements RoutingContext
+{
+ private List<Queue> queues = new ArrayList<Queue>();
+
+ private Transaction transaction;
+
+ private int depth;
+
+ public RoutingContextImpl(final Transaction transaction)
+ {
+ this.transaction = transaction;
+ }
+
+ public void addQueue(final Queue queue)
+ {
+ queues.add(queue);
+ }
+
+ public Transaction getTransaction()
+ {
+ return transaction;
+ }
+
+ public void setTransaction(final Transaction tx)
+ {
+ this.transaction = tx;
+ }
+
+ public List<Queue> getQueues()
+ {
+ return queues;
+ }
+
+ public void decrementDepth()
+ {
+ depth--;
+ }
+
+ public int getDepth()
+ {
+ return depth;
+ }
+
+ public void incrementDepth()
+ {
+ depth++;
+ }
+
+}
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -499,7 +499,7 @@
deliveringRefs.add(ref);
}
- ref.getQueue().referenceHandled();
+ ref.handled();
ref.incrementDeliveryCount();
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -85,7 +85,7 @@
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
-import org.hornetq.core.server.QueueFactory;
+import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerConsumer;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.ServerSession;
@@ -159,10 +159,6 @@
private final SimpleString managementAddress;
- private final QueueFactory queueFactory;
-
- private final SimpleString nodeID;
-
// The current currentLargeMessage being processed
// In case of replication, currentLargeMessage should only be accessed within the replication callbacks
private volatile LargeServerMessage currentLargeMessage;
@@ -187,8 +183,7 @@
final SecurityStore securityStore,
final Executor executor,
final Channel channel,
- final ManagementService managementService,
- final QueueFactory queueFactory,
+ final ManagementService managementService,
final HornetQServer server,
final SimpleString managementAddress) throws Exception
{
@@ -235,10 +230,6 @@
this.managementAddress = managementAddress;
- this.queueFactory = queueFactory;
-
- this.nodeID = server.getNodeID();
-
remotingConnection.addFailureListener(this);
remotingConnection.addCloseListener(this);
@@ -1671,11 +1662,6 @@
// Public
// ----------------------------------------------------------------------------
- public Transaction getTransaction()
- {
- return tx;
- }
-
// Private
// ----------------------------------------------------------------------------
@@ -1816,14 +1802,18 @@
}
throw e;
}
-
+
+ RoutingContext context;
+
if (tx == null || autoCommitSends)
{
- postOffice.route(msg);
+ context = new RoutingContextImpl(null);
}
else
{
- postOffice.route(msg, tx);
+ context = new RoutingContextImpl(tx);
}
+
+ postOffice.route(msg, context);
}
}
Modified: trunk/src/main/org/hornetq/core/transaction/Transaction.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/Transaction.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/transaction/Transaction.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -13,12 +13,9 @@
package org.hornetq.core.transaction;
-import java.util.Set;
-
import javax.transaction.xa.Xid;
import org.hornetq.core.exception.HornetQException;
-import org.hornetq.core.server.Queue;
/**
* A HornetQ internal transaction
@@ -62,8 +59,6 @@
Object getProperty(int index);
- Set<Queue> getDistinctQueues();
-
static enum State
{
ACTIVE, PREPARED, COMMITTED, ROLLEDBACK, SUSPENDED, ROLLBACK_ONLY
Modified: trunk/src/main/org/hornetq/core/transaction/TransactionOperation.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/TransactionOperation.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/transaction/TransactionOperation.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -13,10 +13,7 @@
package org.hornetq.core.transaction;
-import java.util.Collection;
-import org.hornetq.core.server.Queue;
-
/**
*
* A TransactionOperation
@@ -26,10 +23,6 @@
*/
public interface TransactionOperation
{
-
- /** rollback will need a distinct list of Queues in order to lock those queues before calling rollback */
- Collection<Queue> getDistinctQueues();
-
void beforePrepare(Transaction tx) throws Exception;
void beforeCommit(Transaction tx) throws Exception;
Modified: trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -97,29 +97,6 @@
// Transaction implementation
// -----------------------------------------------------------
- public Set<Queue> getDistinctQueues()
- {
- HashSet<Queue> queues = new HashSet<Queue>();
-
- if (operations != null)
- {
- for (TransactionOperation op : operations)
- {
- Collection<Queue> q = op.getDistinctQueues();
- if (q == null)
- {
- log.warn("Operation " + op + " returned null getDistinctQueues");
- }
- else
- {
- queues.addAll(q);
- }
- }
- }
-
- return queues;
- }
-
public long getID()
{
return id;
@@ -184,7 +161,7 @@
}
public void commit(boolean onePhase) throws Exception
- {
+ {
synchronized (timeoutLock)
{
if (state == State.ROLLBACK_ONLY)
Modified: trunk/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -59,8 +59,7 @@
{
ObjectName objectName = managementService.getObjectNameBuilder().getJMSServerObjectName();
JMSServerControlImpl control = new JMSServerControlImpl(server);
- managementService.registerInJMX(objectName,
- control);
+ managementService.registerInJMX(objectName, control);
managementService.registerInRegistry(ResourceNames.JMS_SERVER, control);
return control;
}
@@ -72,8 +71,7 @@
managementService.unregisterFromRegistry(ResourceNames.JMS_SERVER);
}
- public synchronized void registerQueue(final HornetQQueue queue,
- final String jndiBinding) throws Exception
+ public synchronized void registerQueue(final HornetQQueue queue, final String jndiBinding) throws Exception
{
QueueControl coreQueueControl = (QueueControl)managementService.getResource(ResourceNames.CORE_QUEUE + queue.getAddress());
MessageCounterManager messageCounterManager = managementService.getMessageCounterManager();
@@ -85,10 +83,7 @@
messageCounterManager.getMaxDayCount());
messageCounterManager.registerMessageCounter(queue.getName(), counter);
ObjectName objectName = managementService.getObjectNameBuilder().getJMSQueueObjectName(queue.getQueueName());
- JMSQueueControlImpl control = new JMSQueueControlImpl(queue,
- coreQueueControl,
- jndiBinding,
- counter);
+ JMSQueueControlImpl control = new JMSQueueControlImpl(queue, coreQueueControl, jndiBinding, counter);
managementService.registerInJMX(objectName, control);
managementService.registerInRegistry(ResourceNames.JMS_QUEUE + queue.getQueueName(), control);
}
@@ -100,8 +95,7 @@
managementService.unregisterFromRegistry(ResourceNames.JMS_QUEUE + name);
}
- public synchronized void registerTopic(final HornetQTopic topic,
- final String jndiBinding) throws Exception
+ public synchronized void registerTopic(final HornetQTopic topic, final String jndiBinding) throws Exception
{
ObjectName objectName = managementService.getObjectNameBuilder().getJMSTopicObjectName(topic.getTopicName());
AddressControl addressControl = (AddressControl)managementService.getResource(ResourceNames.CORE_ADDRESS + topic.getAddress());
@@ -118,8 +112,8 @@
}
public synchronized void registerConnectionFactory(final String name,
- final HornetQConnectionFactory connectionFactory,
- final List<String> bindings) throws Exception
+ final HornetQConnectionFactory connectionFactory,
+ final List<String> bindings) throws Exception
{
ObjectName objectName = managementService.getObjectNameBuilder().getConnectionFactoryObjectName(name);
JMSConnectionFactoryControlImpl control = new JMSConnectionFactoryControlImpl(connectionFactory, name, bindings);
Modified: trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -149,6 +149,8 @@
ClientMessage message2 = consumer.receive(RECEIVE_TIMEOUT);
assertNotNull(message2);
+
+ log.info("got message " + message2.getProperty(new SimpleString("id")));
assertEquals(i, ((Integer)message2.getProperty(new SimpleString("id"))).intValue());
Modified: trunk/tests/src/org/hornetq/tests/integration/divert/DivertTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/divert/DivertTest.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/tests/src/org/hornetq/tests/integration/divert/DivertTest.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -92,7 +92,7 @@
ClientConsumer consumer2 = session.createConsumer(queueName2);
- final int numMessages = 10;
+ final int numMessages = 1;
final SimpleString propKey = new SimpleString("testkey");
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionOnGCTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionOnGCTest.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionOnGCTest.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -12,6 +12,8 @@
*/
package org.hornetq.tests.integration.jms.connection;
+import java.lang.ref.WeakReference;
+
import javax.jms.Connection;
import javax.jms.Session;
@@ -54,16 +56,14 @@
public void testCloseOneConnectionOnGC() throws Exception
{
Connection conn = cf.createConnection();
+
+ WeakReference<Connection> wr = new WeakReference<Connection>(conn);
assertEquals(1, server.getRemotingService().getConnections().size());
conn = null;
- System.gc();
- System.gc();
- System.gc();
-
- Thread.sleep(2000);
+ checkWeakReferences(wr);
assertEquals(0, server.getRemotingService().getConnections().size());
}
@@ -74,17 +74,17 @@
Connection conn2 = cf.createConnection();
Connection conn3 = cf.createConnection();
+ WeakReference<Connection> wr1 = new WeakReference<Connection>(conn1);
+ WeakReference<Connection> wr2 = new WeakReference<Connection>(conn2);
+ WeakReference<Connection> wr3 = new WeakReference<Connection>(conn3);
+
assertEquals(1, server.getRemotingService().getConnections().size());
conn1 = null;
conn2 = null;
conn3 = null;
- System.gc();
- System.gc();
- System.gc();
-
- Thread.sleep(2000);
+ checkWeakReferences(wr1, wr2, wr3);
assertEquals(0, server.getRemotingService().getConnections().size());
}
@@ -93,8 +93,12 @@
{
Connection conn1 = cf.createConnection();
Connection conn2 = cf.createConnection();
- Connection conn3 = cf.createConnection();
+ Connection conn3 = cf.createConnection();
+ WeakReference<Connection> wr1 = new WeakReference<Connection>(conn1);
+ WeakReference<Connection> wr2 = new WeakReference<Connection>(conn2);
+ WeakReference<Connection> wr3 = new WeakReference<Connection>(conn3);
+
Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session sess2 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session sess3 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -109,13 +113,40 @@
conn2 = null;
conn3 = null;
- System.gc();
- System.gc();
- System.gc();
-
- Thread.sleep(2000);
+ checkWeakReferences(wr1, wr2, wr3);
assertEquals(0, server.getRemotingService().getConnections().size());
}
+ public static void checkWeakReferences(WeakReference<?>... references)
+ {
+
+ int i = 0;
+ boolean hasValue = false;
+
+ do
+ {
+ hasValue = false;
+
+ if (i > 0)
+ {
+ forceGC();
+ }
+
+ for (WeakReference<?> ref : references)
+ {
+ if (ref.get() != null)
+ {
+ hasValue = true;
+ }
+ }
+ }
+ while (i++ <= 30 && hasValue);
+
+ for (WeakReference<?> ref : references)
+ {
+ assertNull(ref.get());
+ }
+ }
+
}
Modified: trunk/tests/src/org/hornetq/tests/integration/persistence/JournalStorageManagerIntegrationTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/persistence/JournalStorageManagerIntegrationTest.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/tests/src/org/hornetq/tests/integration/persistence/JournalStorageManagerIntegrationTest.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -47,7 +47,7 @@
public void testLargeMessageCopy() throws Exception
{
clearData();
-
+
Configuration configuration = createDefaultConfig();
configuration.start();
@@ -63,13 +63,15 @@
byte[] data = new byte[1024];
for (int i = 0; i < 110; i++)
+ {
msg.addBytes(data);
+ }
ServerMessage msg2 = msg.copy(2);
-
+
assertEquals(110 * 1024, msg.getBodySize());
assertEquals(110 * 1024, msg2.getBodySize());
-
+
}
// Package protected ---------------------------------------------
Modified: trunk/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -24,8 +24,10 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.persistence.QueueBindingInfo;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.JournalType;
import org.hornetq.core.server.Queue;
+import org.hornetq.tests.unit.core.server.impl.fakes.FakePostOffice;
import org.hornetq.tests.util.ServiceTestBase;
/**
@@ -42,7 +44,7 @@
// Constants -----------------------------------------------------
private static final Logger log = Logger.getLogger(RestartSMTest.class);
-
+
// Attributes ----------------------------------------------------
// Static --------------------------------------------------------
@@ -62,6 +64,8 @@
configuration.setJournalType(JournalType.ASYNCIO);
+ PostOffice postOffice = new FakePostOffice();
+
final JournalStorageManager journal = new JournalStorageManager(configuration, Executors.newCachedThreadPool());
try
{
@@ -74,7 +78,7 @@
Map<Long, Queue> queues = new HashMap<Long, Queue>();
- journal.loadMessageJournal(null, null, queues, null);
+ journal.loadMessageJournal(postOffice, null, null, queues, null);
journal.stop();
@@ -84,7 +88,7 @@
queues = new HashMap<Long, Queue>();
- journal.loadMessageJournal(null, null, queues, null);
+ journal.loadMessageJournal(postOffice, null, null, queues, null);
queueBindingInfos = new ArrayList<QueueBindingInfo>();
Modified: trunk/tests/src/org/hornetq/tests/integration/server/LVQTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/server/LVQTest.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/tests/src/org/hornetq/tests/integration/server/LVQTest.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -21,6 +21,7 @@
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.server.HornetQ;
import org.hornetq.core.server.HornetQServer;
@@ -33,6 +34,8 @@
*/
public class LVQTest extends UnitTestCase
{
+ private static final Logger log = Logger.getLogger(LVQTest.class);
+
private HornetQServer server;
private ClientSession clientSession;
@@ -127,13 +130,14 @@
clientSession.start();
ClientMessage m = consumer.receive(1000);
assertNotNull(m);
+ assertEquals(m.getBody().readString(), "m1");
producer.send(m2);
consumer.close();
consumer = clientSession.createConsumer(qName1);
m = consumer.receive(1000);
- assertNotNull(m);
- m.acknowledge();
- assertEquals(m.getBody().readString(), "m2");
+ assertNotNull(m);
+ assertEquals("m2", m.getBody().readString());
+ m.acknowledge();
m = consumer.receive(1000);
assertNull(m);
}
Modified: trunk/tests/src/org/hornetq/tests/performance/persistence/StorageManagerTimingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/performance/persistence/StorageManagerTimingTest.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/tests/src/org/hornetq/tests/performance/persistence/StorageManagerTimingTest.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -23,9 +23,11 @@
import org.hornetq.core.config.impl.FileConfiguration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.JournalType;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.impl.ServerMessageImpl;
+import org.hornetq.tests.unit.core.server.impl.fakes.FakePostOffice;
import org.hornetq.tests.util.UnitTestCase;
import org.hornetq.utils.SimpleString;
@@ -117,12 +119,15 @@
configuration.setJournalType(journalType);
- final JournalStorageManager journal = new JournalStorageManager(configuration, Executors.newCachedThreadPool());
+ PostOffice postOffice = new FakePostOffice();
+
+ final JournalStorageManager journal = new JournalStorageManager(configuration,
+ Executors.newCachedThreadPool());
journal.start();
HashMap<Long, Queue> queues = new HashMap<Long, Queue>();
- journal.loadMessageJournal(null, null, queues, null);
+ journal.loadMessageJournal(postOffice, null, null, queues, null);
final byte[] bytes = new byte[900];
@@ -163,11 +168,10 @@
final SimpleString address = new SimpleString("Destination " + i);
ServerMessageImpl implMsg = new ServerMessageImpl(/* type */(byte)1, /* durable */
- true, /* expiration */
- 0,
- /* timestamp */0, /* priority */
- (byte)0,
- ChannelBuffers.wrappedBuffer(new byte[1024]));
+ true, /* expiration */
+ 0,
+ /* timestamp */0, /* priority */
+ (byte)0, ChannelBuffers.wrappedBuffer(new byte[1024]));
implMsg.putStringProperty(new SimpleString("Key"), new SimpleString("This String is worthless!"));
Modified: trunk/tests/src/org/hornetq/tests/timing/jms/bridge/impl/JMSBridgeImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/timing/jms/bridge/impl/JMSBridgeImplTest.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/tests/src/org/hornetq/tests/timing/jms/bridge/impl/JMSBridgeImplTest.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -414,7 +414,7 @@
bridge.setSourceDestinationFactory(sourceDF);
bridge.setTargetConnectionFactoryFactory(targetCFF);
bridge.setTargetDestinationFactory(targetDF);
- bridge.setFailureRetryInterval(10);
+ bridge.setFailureRetryInterval(100);
bridge.setMaxRetries(1);
bridge.setMaxBatchSize(1);
bridge.setMaxBatchTime(-1);
Modified: trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -741,7 +741,7 @@
{
return 0;
}
-
+
public SimpleString[] getStoreNames()
{
return null;
@@ -940,7 +940,8 @@
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#loadMessageJournal(org.hornetq.core.paging.PagingManager, java.util.Map, org.hornetq.core.transaction.ResourceManager, java.util.Map)
*/
- public void loadMessageJournal(PagingManager pagingManager,
+ public void loadMessageJournal(PostOffice postOffice,
+ PagingManager pagingManager,
ResourceManager resourceManager,
Map<Long, Queue> queues,
Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
@@ -1038,7 +1039,7 @@
{
return -1;
}
-
+
public void deleteHeuristicCompletion(long txID) throws Exception
{
}
Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -30,6 +30,7 @@
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.RoutingContextImpl;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.TransactionOperation;
import org.hornetq.tests.util.UnitTestCase;
@@ -107,11 +108,11 @@
{
if (route)
{
- bind.route(new FakeMessage(), new FakeTransaction());
+ bind.route(new FakeMessage(), new RoutingContextImpl(new FakeTransaction()));
}
else
{
- bind.redistribute(new FakeMessage(), queue, new FakeTransaction());
+ bind.redistribute(new FakeMessage(), queue, new RoutingContextImpl(new FakeTransaction()));
}
}
}
Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -33,6 +33,7 @@
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.transaction.impl.ResourceManagerImpl;
+import org.hornetq.tests.unit.core.server.impl.fakes.FakePostOffice;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.utils.Pair;
@@ -47,7 +48,6 @@
*/
public class DuplicateDetectionUnitTest extends ServiceTestBase
{
-
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
@@ -77,6 +77,8 @@
Configuration configuration = createDefaultConfig();
+ PostOffice postOffice = new FakePostOffice();
+
configuration.start();
configuration.setJournalType(JournalType.ASYNCIO);
@@ -90,7 +92,8 @@
HashMap<SimpleString, List<Pair<byte[], Long>>> mapDups = new HashMap<SimpleString, List<Pair<byte[], Long>>>();
- journal.loadMessageJournal(new FakePagingManager(),
+ journal.loadMessageJournal(postOffice,
+ new FakePagingManager(),
new ResourceManagerImpl(0, 0, scheduledThreadPool),
new HashMap<Long, Queue>(),
mapDups);
@@ -110,7 +113,8 @@
journal.start();
journal.loadBindingJournal(new ArrayList<QueueBindingInfo>());
- journal.loadMessageJournal(new FakePagingManager(),
+ journal.loadMessageJournal(postOffice,
+ new FakePagingManager(),
new ResourceManagerImpl(0, 0, scheduledThreadPool),
new HashMap<Long, Queue>(),
mapDups);
@@ -137,7 +141,8 @@
journal.start();
journal.loadBindingJournal(new ArrayList<QueueBindingInfo>());
- journal.loadMessageJournal(new FakePagingManager(),
+ journal.loadMessageJournal(postOffice,
+ new FakePagingManager(),
new ResourceManagerImpl(0, 0, scheduledThreadPool),
new HashMap<Long, Queue>(),
mapDups);
@@ -196,7 +201,7 @@
{
return 0;
}
-
+
public SimpleString[] getStoreNames()
{
return null;
Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -23,32 +23,27 @@
import org.hornetq.core.server.Distributor;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.utils.SimpleString;
public class FakeQueue implements Queue
{
-
private SimpleString name;
public FakeQueue(SimpleString name)
{
this.name = name;
}
-
- public void setExpiryAddress(SimpleString expiryAddress)
- {
- // TODO Auto-generated method stub
-
- }
/* (non-Javadoc)
* @see org.hornetq.core.server.Queue#acknowledge(org.hornetq.core.server.MessageReference)
*/
public void acknowledge(MessageReference ref) throws Exception
{
-
+ // TODO Auto-generated method stub
+
}
/* (non-Javadoc)
@@ -56,32 +51,17 @@
*/
public void acknowledge(Transaction tx, MessageReference ref) throws Exception
{
-
+ // TODO Auto-generated method stub
+
}
/* (non-Javadoc)
- * @see org.hornetq.core.server.Queue#activate()
- */
- public boolean activate()
- {
-
- return false;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.Queue#activateNow(java.util.concurrent.Executor)
- */
- public void activateNow(Executor executor)
- {
-
- }
-
- /* (non-Javadoc)
* @see org.hornetq.core.server.Queue#addConsumer(org.hornetq.core.server.Consumer)
*/
public void addConsumer(Consumer consumer) throws Exception
{
-
+ // TODO Auto-generated method stub
+
}
/* (non-Javadoc)
@@ -89,7 +69,8 @@
*/
public void addFirst(MessageReference ref)
{
-
+ // TODO Auto-generated method stub
+
}
/* (non-Javadoc)
@@ -97,7 +78,8 @@
*/
public void addLast(MessageReference ref)
{
-
+ // TODO Auto-generated method stub
+
}
/* (non-Javadoc)
@@ -109,21 +91,22 @@
}
-
/* (non-Javadoc)
- * @see org.hornetq.core.server.Queue#cancel(org.hornetq.core.transaction.Transaction, org.hornetq.core.server.MessageReference)
+ * @see org.hornetq.core.server.Queue#cancel(org.hornetq.core.server.MessageReference)
*/
- public void cancel(Transaction tx, MessageReference ref) throws Exception
+ public void cancel(MessageReference reference) throws Exception
{
-
+ // TODO Auto-generated method stub
+
}
/* (non-Javadoc)
- * @see org.hornetq.core.server.Queue#cancel(org.hornetq.core.server.MessageReference)
+ * @see org.hornetq.core.server.Queue#cancel(org.hornetq.core.transaction.Transaction, org.hornetq.core.server.MessageReference)
*/
- public void cancel(MessageReference reference) throws Exception
+ public void cancel(Transaction tx, MessageReference ref) throws Exception
{
-
+ // TODO Auto-generated method stub
+
}
/* (non-Javadoc)
@@ -131,7 +114,8 @@
*/
public void cancelRedistributor() throws Exception
{
-
+ // TODO Auto-generated method stub
+
}
/* (non-Javadoc)
@@ -139,7 +123,7 @@
*/
public boolean changeReferencePriority(long messageID, byte newPriority) throws Exception
{
-
+ // TODO Auto-generated method stub
return false;
}
@@ -148,25 +132,16 @@
*/
public boolean checkDLQ(MessageReference ref) throws Exception
{
-
+ // TODO Auto-generated method stub
return false;
}
/* (non-Javadoc)
- * @see org.hornetq.core.server.Queue#consumerFailedOver()
- */
- public boolean consumerFailedOver()
- {
-
- return false;
- }
-
- /* (non-Javadoc)
* @see org.hornetq.core.server.Queue#deleteAllReferences()
*/
public int deleteAllReferences() throws Exception
{
-
+ // TODO Auto-generated method stub
return 0;
}
@@ -175,7 +150,7 @@
*/
public int deleteMatchingReferences(Filter filter) throws Exception
{
-
+ // TODO Auto-generated method stub
return 0;
}
@@ -184,7 +159,7 @@
*/
public boolean deleteReference(long messageID) throws Exception
{
-
+ // TODO Auto-generated method stub
return false;
}
@@ -193,7 +168,8 @@
*/
public void deliverAsync(Executor executor)
{
-
+ // TODO Auto-generated method stub
+
}
/* (non-Javadoc)
@@ -201,7 +177,8 @@
*/
public void deliverNow()
{
-
+ // TODO Auto-generated method stub
+
}
/* (non-Javadoc)
@@ -209,7 +186,8 @@
*/
public void expire(MessageReference ref) throws Exception
{
-
+ // TODO Auto-generated method stub
+
}
/* (non-Javadoc)
@@ -217,25 +195,26 @@
*/
public boolean expireReference(long messageID) throws Exception
{
-
+ // TODO Auto-generated method stub
return false;
}
/* (non-Javadoc)
- * @see org.hornetq.core.server.Queue#expireReferences(org.hornetq.core.filter.Filter)
+ * @see org.hornetq.core.server.Queue#expireReferences()
*/
- public int expireReferences(Filter filter) throws Exception
+ public void expireReferences() throws Exception
{
-
- return 0;
+ // TODO Auto-generated method stub
+
}
/* (non-Javadoc)
- * @see org.hornetq.core.server.Queue#expireReferences()
+ * @see org.hornetq.core.server.Queue#expireReferences(org.hornetq.core.filter.Filter)
*/
- public void expireReferences() throws Exception
+ public int expireReferences(Filter filter) throws Exception
{
-
+ // TODO Auto-generated method stub
+ return 0;
}
/* (non-Javadoc)
@@ -243,7 +222,7 @@
*/
public int getConsumerCount()
{
-
+ // TODO Auto-generated method stub
return 0;
}
@@ -252,7 +231,7 @@
*/
public Set<Consumer> getConsumers()
{
-
+ // TODO Auto-generated method stub
return null;
}
@@ -261,7 +240,7 @@
*/
public int getDeliveringCount()
{
-
+ // TODO Auto-generated method stub
return 0;
}
@@ -270,7 +249,7 @@
*/
public Distributor getDistributionPolicy()
{
-
+ // TODO Auto-generated method stub
return null;
}
@@ -279,7 +258,7 @@
*/
public Filter getFilter()
{
-
+ // TODO Auto-generated method stub
return null;
}
@@ -288,7 +267,7 @@
*/
public int getMessageCount()
{
-
+ // TODO Auto-generated method stub
return 0;
}
@@ -297,7 +276,7 @@
*/
public int getMessagesAdded()
{
-
+ // TODO Auto-generated method stub
return 0;
}
@@ -314,7 +293,7 @@
*/
public long getID()
{
-
+ // TODO Auto-generated method stub
return 0;
}
@@ -323,7 +302,7 @@
*/
public MessageReference getReference(long id)
{
-
+ // TODO Auto-generated method stub
return null;
}
@@ -332,7 +311,7 @@
*/
public int getScheduledCount()
{
-
+ // TODO Auto-generated method stub
return 0;
}
@@ -341,25 +320,25 @@
*/
public List<MessageReference> getScheduledMessages()
{
-
+ // TODO Auto-generated method stub
return null;
}
/* (non-Javadoc)
- * @see org.hornetq.core.server.Queue#isBackup()
+ * @see org.hornetq.core.server.Queue#isDurable()
*/
- public boolean isBackup()
+ public boolean isDurable()
{
-
+ // TODO Auto-generated method stub
return false;
}
/* (non-Javadoc)
- * @see org.hornetq.core.server.Queue#isDurable()
+ * @see org.hornetq.core.server.Queue#isPaused()
*/
- public boolean isDurable()
+ public boolean isPaused()
{
-
+ // TODO Auto-generated method stub
return false;
}
@@ -368,22 +347,35 @@
*/
public boolean isTemporary()
{
-
+ // TODO Auto-generated method stub
return false;
}
/* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#iterator()
+ */
+ public Iterator<MessageReference> iterator()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ /* (non-Javadoc)
* @see org.hornetq.core.server.Queue#list(org.hornetq.core.filter.Filter)
*/
public List<MessageReference> list(Filter filter)
{
-
+ // TODO Auto-generated method stub
return null;
}
-
- public Iterator<MessageReference> iterator()
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#lockDelivery()
+ */
+ public void lockDelivery()
{
- return null;
+ // TODO Auto-generated method stub
+
}
/* (non-Javadoc)
@@ -391,7 +383,7 @@
*/
public boolean moveReference(long messageID, SimpleString toAddress) throws Exception
{
-
+ // TODO Auto-generated method stub
return false;
}
@@ -400,16 +392,26 @@
*/
public int moveReferences(Filter filter, SimpleString toAddress) throws Exception
{
-
+ // TODO Auto-generated method stub
return 0;
}
/* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#pause()
+ */
+ public void pause()
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
* @see org.hornetq.core.server.Queue#reacknowledge(org.hornetq.core.transaction.Transaction, org.hornetq.core.server.MessageReference)
*/
public void reacknowledge(Transaction tx, MessageReference ref) throws Exception
{
-
+ // TODO Auto-generated method stub
+
}
/* (non-Javadoc)
@@ -417,7 +419,8 @@
*/
public void referenceHandled()
{
-
+ // TODO Auto-generated method stub
+
}
/* (non-Javadoc)
@@ -425,7 +428,7 @@
*/
public boolean removeConsumer(Consumer consumer) throws Exception
{
-
+ // TODO Auto-generated method stub
return false;
}
@@ -434,7 +437,7 @@
*/
public MessageReference removeFirstReference(long id) throws Exception
{
-
+ // TODO Auto-generated method stub
return null;
}
@@ -443,17 +446,17 @@
*/
public MessageReference removeReferenceWithID(long id) throws Exception
{
-
+ // TODO Auto-generated method stub
return null;
}
/* (non-Javadoc)
- * @see org.hornetq.core.server.Queue#reroute(org.hornetq.core.server.ServerMessage, org.hornetq.core.transaction.Transaction)
+ * @see org.hornetq.core.server.Queue#resume()
*/
- public MessageReference reroute(ServerMessage message, Transaction tx) throws Exception
+ public void resume()
{
-
- return null;
+ // TODO Auto-generated method stub
+
}
/* (non-Javadoc)
@@ -461,82 +464,48 @@
*/
public boolean sendMessageToDeadLetterAddress(long messageID) throws Exception
{
-
+ // TODO Auto-generated method stub
return false;
}
/* (non-Javadoc)
- * @see org.hornetq.core.server.Queue#setBackup()
- */
- public void setBackup()
- {
-
- }
-
- /* (non-Javadoc)
* @see org.hornetq.core.server.Queue#setDistributionPolicy(org.hornetq.core.server.Distributor)
*/
public void setDistributionPolicy(Distributor policy)
{
-
+ // TODO Auto-generated method stub
+
}
-
/* (non-Javadoc)
- * @see org.hornetq.core.server.Bindable#preroute(org.hornetq.core.server.ServerMessage, org.hornetq.core.transaction.Transaction)
+ * @see org.hornetq.core.server.Queue#setExpiryAddress(org.hornetq.utils.SimpleString)
*/
- public void preroute(ServerMessage message, Transaction tx) throws Exception
+ public void setExpiryAddress(SimpleString expiryAddress)
{
-
+ // TODO Auto-generated method stub
+
}
+ // TODO Auto-generated method stub
+
/* (non-Javadoc)
- * @see org.hornetq.core.server.Bindable#route(org.hornetq.core.server.ServerMessage, org.hornetq.core.transaction.Transaction)
+ * @see org.hornetq.core.server.Queue#unlockDelivery()
*/
- public void route(ServerMessage message, Transaction tx) throws Exception
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.Queue#lock()
- */
- public void lockDelivery()
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.Queue#unlock()
- */
public void unlockDelivery()
{
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.Queue#isPaused()
- */
- public boolean isPaused()
- {
// TODO Auto-generated method stub
- return false;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.Queue#pause()
- */
- public void pause()
- {
- // TODO Auto-generated method stub
}
/* (non-Javadoc)
- * @see org.hornetq.core.server.Queue#resume()
+ * @see org.hornetq.core.server.Bindable#route(org.hornetq.core.server.ServerMessage, org.hornetq.core.server.RoutingContext)
*/
- public void resume()
+ public void route(ServerMessage message, RoutingContext context) throws Exception
{
// TODO Auto-generated method stub
}
+
+
}
\ No newline at end of file
Modified: trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -14,14 +14,14 @@
package org.hornetq.tests.unit.core.server.impl.fakes;
-import java.util.List;
-
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.Bindings;
import org.hornetq.core.postoffice.DuplicateIDCache;
import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.utils.SimpleString;
@@ -29,125 +29,141 @@
public class FakePostOffice implements PostOffice
{
- public Object getNotificationLock()
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQComponent#isStarted()
+ */
+ public boolean isStarted()
{
- return null;
+ // TODO Auto-generated method stub
+ return false;
}
- public Bindings getMatchingBindings(SimpleString address)
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQComponent#start()
+ */
+ public void start() throws Exception
{
- return null;
+ // TODO Auto-generated method stub
+
}
/* (non-Javadoc)
- * @see org.hornetq.core.postoffice.PostOffice#activate()
+ * @see org.hornetq.core.server.HornetQComponent#stop()
*/
- public List<Queue> activate()
+ public void stop() throws Exception
{
- return null;
+ // TODO Auto-generated method stub
+
}
/* (non-Javadoc)
* @see org.hornetq.core.postoffice.PostOffice#addBinding(org.hornetq.core.postoffice.Binding)
*/
- public void addBinding(final Binding binding) throws Exception
+ public void addBinding(Binding binding) throws Exception
{
+ // TODO Auto-generated method stub
+
}
/* (non-Javadoc)
* @see org.hornetq.core.postoffice.PostOffice#getBinding(org.hornetq.utils.SimpleString)
*/
- public Binding getBinding(final SimpleString uniqueName)
+ public Binding getBinding(SimpleString uniqueName)
{
+ // TODO Auto-generated method stub
return null;
}
/* (non-Javadoc)
* @see org.hornetq.core.postoffice.PostOffice#getBindingsForAddress(org.hornetq.utils.SimpleString)
*/
- public Bindings getBindingsForAddress(final SimpleString address) throws Exception
+ public Bindings getBindingsForAddress(SimpleString address) throws Exception
{
+ // TODO Auto-generated method stub
return null;
}
/* (non-Javadoc)
* @see org.hornetq.core.postoffice.PostOffice#getDuplicateIDCache(org.hornetq.utils.SimpleString)
*/
- public DuplicateIDCache getDuplicateIDCache(final SimpleString address)
+ public DuplicateIDCache getDuplicateIDCache(SimpleString address)
{
+ // TODO Auto-generated method stub
return null;
}
/* (non-Javadoc)
- * @see org.hornetq.core.postoffice.PostOffice#getPagingManager()
+ * @see org.hornetq.core.postoffice.PostOffice#getMatchingBindings(org.hornetq.utils.SimpleString)
*/
- public PagingManager getPagingManager()
+ public Bindings getMatchingBindings(SimpleString address)
{
+ // TODO Auto-generated method stub
return null;
}
/* (non-Javadoc)
- * @see org.hornetq.core.postoffice.PostOffice#redistribute(org.hornetq.core.server.ServerMessage, org.hornetq.utils.SimpleString, org.hornetq.core.transaction.Transaction)
+ * @see org.hornetq.core.postoffice.PostOffice#getNotificationLock()
*/
- public boolean redistribute(final ServerMessage message, final Queue queue, final Transaction tx) throws Exception
+ public Object getNotificationLock()
{
- return false;
+ // TODO Auto-generated method stub
+ return null;
}
/* (non-Javadoc)
- * @see org.hornetq.core.postoffice.PostOffice#removeBinding(org.hornetq.utils.SimpleString)
+ * @see org.hornetq.core.postoffice.PostOffice#getPagingManager()
*/
- public Binding removeBinding(final SimpleString uniqueName) throws Exception
+ public PagingManager getPagingManager()
{
+ // TODO Auto-generated method stub
return null;
}
/* (non-Javadoc)
- * @see org.hornetq.core.postoffice.PostOffice#route(org.hornetq.core.server.ServerMessage)
+ * @see org.hornetq.core.postoffice.PostOffice#redistribute(org.hornetq.core.server.ServerMessage, org.hornetq.core.server.Queue, org.hornetq.core.server.RoutingContext)
*/
- public void route(final ServerMessage message) throws Exception
+ public boolean redistribute(ServerMessage message, Queue originatingQueue, RoutingContext context) throws Exception
{
-
+ // TODO Auto-generated method stub
+ return false;
}
/* (non-Javadoc)
- * @see org.hornetq.core.postoffice.PostOffice#route(org.hornetq.core.server.ServerMessage, org.hornetq.core.transaction.Transaction)
+ * @see org.hornetq.core.postoffice.PostOffice#removeBinding(org.hornetq.utils.SimpleString)
*/
- public void route(final ServerMessage message, final Transaction tx) throws Exception
+ public Binding removeBinding(SimpleString uniqueName) throws Exception
{
-
+ // TODO Auto-generated method stub
+ return null;
}
/* (non-Javadoc)
- * @see org.hornetq.core.postoffice.PostOffice#sendQueueInfoToQueue(org.hornetq.utils.SimpleString, org.hornetq.utils.SimpleString)
+ * @see org.hornetq.core.postoffice.PostOffice#reroute(org.hornetq.core.server.ServerMessage, org.hornetq.core.server.Queue, org.hornetq.core.transaction.Transaction)
*/
- public void sendQueueInfoToQueue(final SimpleString queueName, final SimpleString address) throws Exception
+ public MessageReference reroute(ServerMessage message, Queue queue, Transaction tx) throws Exception
{
-
+ // TODO Auto-generated method stub
+ return null;
}
/* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQComponent#isStarted()
+ * @see org.hornetq.core.postoffice.PostOffice#route(org.hornetq.core.server.ServerMessage, org.hornetq.core.server.RoutingContext)
*/
- public boolean isStarted()
+ public void route(ServerMessage message, RoutingContext context) throws Exception
{
- return false;
+ // TODO Auto-generated method stub
+
}
/* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQComponent#start()
+ * @see org.hornetq.core.postoffice.PostOffice#sendQueueInfoToQueue(org.hornetq.utils.SimpleString, org.hornetq.utils.SimpleString)
*/
- public void start() throws Exception
+ public void sendQueueInfoToQueue(SimpleString queueName, SimpleString address) throws Exception
{
-
+ // TODO Auto-generated method stub
+
}
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQComponent#stop()
- */
- public void stop() throws Exception
- {
- }
}
\ No newline at end of file
16 years, 2 months