[hornetq-commits] JBoss hornetq SVN: r8064 - in trunk: src/main/org/hornetq/core/deployers/impl and 24 other directories.
do-not-reply at jboss.org
do-not-reply at jboss.org
Wed Oct 7 17:01:21 EDT 2009
Author: timfox
Date: 2009-10-07 17:01:20 -0400 (Wed, 07 Oct 2009)
New Revision: 8064
Added:
trunk/src/main/org/hornetq/core/server/RoutingContext.java
trunk/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java
Modified:
trunk/.classpath
trunk/src/main/org/hornetq/core/deployers/impl/FileDeploymentManager.java
trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
trunk/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java
trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
trunk/src/main/org/hornetq/core/persistence/StorageManager.java
trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
trunk/src/main/org/hornetq/core/postoffice/Bindings.java
trunk/src/main/org/hornetq/core/postoffice/PostOffice.java
trunk/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/hornetq/core/server/Bindable.java
trunk/src/main/org/hornetq/core/server/MessageReference.java
trunk/src/main/org/hornetq/core/server/Queue.java
trunk/src/main/org/hornetq/core/server/ServerMessage.java
trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
trunk/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java
trunk/src/main/org/hornetq/core/server/impl/DivertImpl.java
trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java
trunk/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java
trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
trunk/src/main/org/hornetq/core/transaction/Transaction.java
trunk/src/main/org/hornetq/core/transaction/TransactionOperation.java
trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
trunk/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java
trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
trunk/tests/src/org/hornetq/tests/integration/divert/DivertTest.java
trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionOnGCTest.java
trunk/tests/src/org/hornetq/tests/integration/persistence/JournalStorageManagerIntegrationTest.java
trunk/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java
trunk/tests/src/org/hornetq/tests/integration/server/LVQTest.java
trunk/tests/src/org/hornetq/tests/performance/persistence/StorageManagerTimingTest.java
trunk/tests/src/org/hornetq/tests/timing/jms/bridge/impl/JMSBridgeImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java
Log:
routing refactoring, plus fixed some tests
Modified: trunk/.classpath
===================================================================
--- trunk/.classpath 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/.classpath 2009-10-07 21:01:20 UTC (rev 8064)
@@ -7,7 +7,7 @@
<classpathentry kind="src" path="tests/config"/>
<classpathentry excluding="**/.svn/**/*" kind="src" path="tests/src">
<attributes>
- <attribute name="org.eclipse.jdt.launching.CLASSPATH_ATTR_LIBRARY_PATH_ENTRY" value="hornet/native/bin"/>
+ <attribute name="org.eclipse.jdt.launching.CLASSPATH_ATTR_LIBRARY_PATH_ENTRY" value="trunk/native/bin"/>
</attributes>
</classpathentry>
<classpathentry kind="src" path="tests/jms-tests/src"/>
Modified: trunk/src/main/org/hornetq/core/deployers/impl/FileDeploymentManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/deployers/impl/FileDeploymentManager.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/deployers/impl/FileDeploymentManager.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -229,7 +229,7 @@
try
{
Deployer deployer = entry.getValue().deployer;
- log.info("Undeploying " + deployer + " with url " + pair.a);
+ log.debug("Undeploying " + deployer + " with url " + pair.a);
deployer.undeploy(pair.a);
toRemove.add(pair);
}
Modified: trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/journal/impl/JournalImpl.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -1441,9 +1441,8 @@
try
{
+ log.debug("Starting compacting operation on journal");
- log.info("Starting compacting operation on journal");
-
// We need to guarantee that the journal is frozen for this short time
// We don't freeze the journal as we compact, only for the short time where we replace records
compactingLock.writeLock().lock();
@@ -1582,7 +1581,7 @@
renameFiles(dataFilesToProcess, newDatafiles);
deleteControlFile(controlFile);
- log.info("Finished compacting on journal");
+ log.debug("Finished compacting on journal");
}
finally
@@ -2167,7 +2166,7 @@
try
{
- log.info("Cleaning up file " + file);
+ log.debug("Cleaning up file " + file);
if (file.getPosCount() == 0)
{
@@ -2221,7 +2220,7 @@
finally
{
compactingLock.readLock().unlock();
- log.info("Clean up on file " + file + " done");
+ log.debug("Clean up on file " + file + " done");
}
}
@@ -2767,7 +2766,7 @@
{
if (state != STATE_LOADED)
{
- throw new IllegalStateException("The journal was stopped");
+ throw new IllegalStateException("The journal is not loaded " + state);
}
int size = bb.capacity();
@@ -2845,11 +2844,14 @@
currentFile.getFile().write(bb, sync);
}
- return currentFile;
+ return currentFile;
}
finally
{
- currentFile.getFile().enableAutoFlush();
+ if (currentFile != null)
+ {
+ currentFile.getFile().enableAutoFlush();
+ }
}
}
Modified: trunk/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/management/impl/ManagementServiceImpl.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -67,6 +67,7 @@
import org.hornetq.core.server.cluster.Bridge;
import org.hornetq.core.server.cluster.BroadcastGroup;
import org.hornetq.core.server.cluster.ClusterConnection;
+import org.hornetq.core.server.impl.RoutingContextImpl;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressSettings;
@@ -728,7 +729,7 @@
notificationMessage.putTypedProperties(notifProps);
- postOffice.route(notificationMessage, null);
+ postOffice.route(notificationMessage, new RoutingContextImpl(null));
}
}
}
Modified: trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/paging/impl/PagingStoreImpl.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -38,7 +38,9 @@
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.LargeServerMessage;
+import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.RoutingContextImpl;
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.TransactionPropertyIndexes;
@@ -593,6 +595,7 @@
if (onDepage(page.getPageId(), storeName, messages))
{
page.delete();
+
return true;
}
else
@@ -755,16 +758,14 @@
// back to where it was
Transaction depageTransaction = new TransactionImpl(storageManager);
-
+
depageTransaction.putProperty(TransactionPropertyIndexes.IS_DEPAGE, Boolean.valueOf(true));
HashSet<PageTransactionInfo> pageTransactionsToUpdate = new HashSet<PageTransactionInfo>();
for (PagedMessage pagedMessage : pagedMessages)
{
- ServerMessage message = null;
-
- message = pagedMessage.getMessage(storageManager);
+ ServerMessage message = pagedMessage.getMessage(storageManager);
if (message.isLargeMessage())
{
@@ -787,7 +788,6 @@
log.warn("Transaction " + pagedMessage.getTransactionID() +
" used during paging not found, ignoring message " +
message);
-
continue;
}
@@ -815,7 +815,7 @@
if (isTrace)
{
trace("Rollback was called after prepare, ignoring message " + message);
- }
+ }
continue;
}
@@ -827,7 +827,7 @@
}
}
- postOffice.route(message, depageTransaction);
+ postOffice.route(message, new RoutingContextImpl(depageTransaction));
}
if (!running)
Modified: trunk/src/main/org/hornetq/core/persistence/StorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/StorageManager.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/persistence/StorageManager.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -21,6 +21,7 @@
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.postoffice.Binding;
+import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.HornetQComponent;
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.MessageReference;
@@ -100,7 +101,8 @@
void deleteHeuristicCompletion(long id) throws Exception;
- void loadMessageJournal(PagingManager pagingManager,
+ void loadMessageJournal(PostOffice postOffice,
+ PagingManager pagingManager,
ResourceManager resourceManager,
Map<Long, Queue> queues,
Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception;
Modified: trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/persistence/impl/journal/JournalStorageManager.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -51,6 +51,7 @@
import org.hornetq.core.persistence.QueueBindingInfo;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.Binding;
+import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.remoting.impl.wireformat.XidCodecSupport;
import org.hornetq.core.remoting.spi.HornetQBuffer;
import org.hornetq.core.server.JournalType;
@@ -143,11 +144,11 @@
private final String journalDir;
private final String largeMessagesDirectory;
-
+
public JournalStorageManager(final Configuration config, final Executor executor)
{
this.executor = executor;
-
+
if (config.getJournalType() != JournalType.NIO && config.getJournalType() != JournalType.ASYNCIO)
{
throw new IllegalArgumentException("Only NIO and AsyncIO are supported journals");
@@ -250,7 +251,7 @@
this.persistentID = id;
}
-
+
public long generateUniqueID()
{
long id = idGenerator.generateID();
@@ -518,7 +519,8 @@
}
- public void loadMessageJournal(final PagingManager pagingManager,
+ public void loadMessageJournal(final PostOffice postOffice,
+ final PagingManager pagingManager,
final ResourceManager resourceManager,
final Map<Long, Queue> queues,
final Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
@@ -730,9 +732,9 @@
{
record.message.putLongProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME, scheduledDeliveryTime);
}
+
+ MessageReference ref = postOffice.reroute(record.message, queue, null);
- MessageReference ref = queue.reroute(record.message, null);
-
ref.setDeliveryCount(record.deliveryCount);
if (scheduledDeliveryTime != 0)
@@ -742,13 +744,13 @@
}
}
- loadPreparedTransactions(pagingManager, resourceManager, queues, preparedTransactions, duplicateIDMap);
+ loadPreparedTransactions(postOffice, pagingManager, resourceManager, queues, preparedTransactions, duplicateIDMap);
for (LargeServerMessage msg : largeMessages)
{
if (msg.getRefCount() == 0)
{
- log.info("Large message: " + msg.getMessageID() + " didn't have any associated reference, file will be deleted");
+ log.debug("Large message: " + msg.getMessageID() + " didn't have any associated reference, file will be deleted");
msg.decrementRefCount();
}
}
@@ -796,7 +798,8 @@
return largeMessage;
}
- private void loadPreparedTransactions(final PagingManager pagingManager,
+ private void loadPreparedTransactions(final PostOffice postOffice,
+ final PagingManager pagingManager,
final ResourceManager resourceManager,
final Map<Long, Queue> queues,
final List<PreparedTransactionInfo> preparedTransactions,
@@ -867,7 +870,7 @@
throw new IllegalStateException("Cannot find message with id " + messageID);
}
- queue.reroute(message, tx);
+ postOffice.reroute(message, queue, tx);
break;
}
Modified: trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java
===================================================================
--- trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/persistence/impl/nullpm/NullStorageManager.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -25,6 +25,7 @@
import org.hornetq.core.persistence.QueueBindingInfo;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.Binding;
+import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
@@ -232,7 +233,8 @@
{
}
- public void loadMessageJournal(PagingManager pagingManager,
+ public void loadMessageJournal(PostOffice postOffice,
+ PagingManager pagingManager,
ResourceManager resourceManager,
Map<Long, Queue> queues,
Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
Modified: trunk/src/main/org/hornetq/core/postoffice/Bindings.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/Bindings.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/postoffice/Bindings.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -16,8 +16,8 @@
import java.util.Collection;
import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
-import org.hornetq.core.transaction.Transaction;
/**
* A Bindings
@@ -32,13 +32,13 @@
{
Collection<Binding> getBindings();
- boolean route(ServerMessage message, Transaction tx) throws Exception;
-
void addBinding(Binding binding);
void removeBinding(Binding binding);
void setRouteWhenNoConsumers(boolean takePriorityIntoAccount);
- boolean redistribute(ServerMessage message, Queue originatingQueue, Transaction tx) throws Exception;
+ void redistribute(ServerMessage message, Queue originatingQueue, RoutingContext context) throws Exception;
+
+ void route(ServerMessage message, RoutingContext context) throws Exception;
}
Modified: trunk/src/main/org/hornetq/core/postoffice/PostOffice.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/PostOffice.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/postoffice/PostOffice.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -15,7 +15,9 @@
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.server.HornetQComponent;
+import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.utils.SimpleString;
@@ -50,11 +52,11 @@
Bindings getMatchingBindings(SimpleString address);
- void route(ServerMessage message) throws Exception;
+ void route(ServerMessage message, RoutingContext context) throws Exception;
- void route(ServerMessage message, Transaction tx) throws Exception;
+ MessageReference reroute(ServerMessage message, Queue queue, Transaction tx) throws Exception;
- boolean redistribute(ServerMessage message, final Queue originatingQueue, Transaction tx) throws Exception;
+ boolean redistribute(ServerMessage message, final Queue originatingQueue, RoutingContext context) throws Exception;
PagingManager getPagingManager();
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/BindingsImpl.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -30,8 +30,8 @@
import org.hornetq.core.postoffice.Bindings;
import org.hornetq.core.server.Bindable;
import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
-import org.hornetq.core.transaction.Transaction;
import org.hornetq.utils.SimpleString;
/**
@@ -56,7 +56,7 @@
private final List<Binding> exclusiveBindings = new CopyOnWriteArrayList<Binding>();
private volatile boolean routeWhenNoConsumers;
-
+
public void setRouteWhenNoConsumers(final boolean routeWhenNoConsumers)
{
this.routeWhenNoConsumers = routeWhenNoConsumers;
@@ -122,51 +122,14 @@
bindingsMap.remove(binding.getID());
}
-
- private boolean routeFromCluster(final ServerMessage message, final Transaction tx) throws Exception
+
+ public void redistribute(final ServerMessage message, final Queue originatingQueue, final RoutingContext context) throws Exception
{
- byte[] ids = (byte[])message.removeProperty(MessageImpl.HDR_ROUTE_TO_IDS);
-
- ByteBuffer buff = ByteBuffer.wrap(ids);
-
- Set<Bindable> chosen = new HashSet<Bindable>();
-
- while (buff.hasRemaining())
- {
- long bindingID = buff.getLong();
-
- Binding binding = bindingsMap.get(bindingID);
-
- if (binding == null)
- {
- return false;
- }
-
- binding.willRoute(message);
-
- chosen.add(binding.getBindable());
- }
-
- for (Bindable bindable : chosen)
- {
- bindable.preroute(message, tx);
- }
-
- for (Bindable bindable : chosen)
- {
- bindable.route(message, tx);
- }
-
- return true;
- }
-
- public boolean redistribute(final ServerMessage message, final Queue originatingQueue, final Transaction tx) throws Exception
- {
if (routeWhenNoConsumers)
{
- return false;
+ return;
}
-
+
SimpleString routingName = originatingQueue.getName();
List<Binding> bindings = routingNameBindingMap.get(routingName);
@@ -175,7 +138,7 @@
{
// The value can become null if it's concurrently removed while we're iterating - this is expected
// ConcurrentHashMap behaviour!
- return false;
+ return;
}
Integer ipos = routingNamePositions.get(routingName);
@@ -238,30 +201,22 @@
{
theBinding.willRoute(message);
- theBinding.getBindable().preroute(message, tx);
-
- theBinding.getBindable().route(message, tx);
-
- return true;
+ theBinding.getBindable().route(message, context);
}
- else
- {
- return false;
- }
}
- public boolean route(final ServerMessage message, final Transaction tx) throws Exception
+ public void route(final ServerMessage message, final RoutingContext context) throws Exception
{
boolean routed = false;
-
+
if (!exclusiveBindings.isEmpty())
{
for (Binding binding : exclusiveBindings)
{
if (binding.getFilter() == null || binding.getFilter().match(message))
{
- binding.getBindable().route(message, tx);
-
+ binding.getBindable().route(message, context);
+
routed = true;
}
}
@@ -271,7 +226,7 @@
{
if (message.getProperty(MessageImpl.HDR_FROM_CLUSTER) != null)
{
- routed = routeFromCluster(message, tx);
+ routeFromCluster(message, context);
}
else
{
@@ -393,24 +348,34 @@
routingNamePositions.put(routingName, pos);
}
-
- // TODO refactor to do this is one iteration
-
+
for (Bindable bindable : chosen)
{
- bindable.preroute(message, tx);
+ bindable.route(message, context);
}
+ }
+ }
+ }
+
+ private void routeFromCluster(final ServerMessage message, final RoutingContext context) throws Exception
+ {
+ byte[] ids = (byte[])message.removeProperty(MessageImpl.HDR_ROUTE_TO_IDS);
- for (Bindable bindable : chosen)
- {
- bindable.route(message, tx);
-
- routed = true;
- }
+ ByteBuffer buff = ByteBuffer.wrap(ids);
+
+ while (buff.hasRemaining())
+ {
+ long bindingID = buff.getLong();
+
+ Binding binding = bindingsMap.get(bindingID);
+
+ if (binding != null)
+ {
+ binding.willRoute(message);
+
+ binding.getBindable().route(message, context);
}
}
-
- return routed;
}
private final int incrementPos(int pos, int length)
Modified: trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/postoffice/impl/PostOfficeImpl.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -36,6 +36,7 @@
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.paging.PageTransactionInfo;
import org.hornetq.core.paging.PagingManager;
+import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.paging.impl.PageTransactionInfoImpl;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.AddressManager;
@@ -45,9 +46,12 @@
import org.hornetq.core.postoffice.DuplicateIDCache;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.postoffice.QueueInfo;
+import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.QueueFactory;
+import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.RoutingContextImpl;
import org.hornetq.core.server.impl.ServerMessageImpl;
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressSettings;
@@ -423,7 +427,7 @@
public synchronized void addBinding(final Binding binding) throws Exception
{
addressManager.addBinding(binding);
-
+
TypedProperties props = new TypedProperties();
props.putIntProperty(ManagementHelper.HDR_BINDING_TYPE, binding.getType().toInt());
@@ -515,8 +519,8 @@
return addressManager.getMatchingBindings(address);
}
- public void route(final ServerMessage message, Transaction tx) throws Exception
- {
+ public void route(final ServerMessage message, final RoutingContext context) throws Exception
+ {
SimpleString address = message.getDestination();
byte[] duplicateIDBytes = null;
@@ -540,15 +544,15 @@
if (cache.contains(duplicateIDBytes))
{
- if (tx == null)
+ if (context.getTransaction() == null)
{
- log.trace("Duplicate message detected - message will not be routed");
+ log.trace("Duplicate message detected - message will not be routed");
}
else
{
- log.trace("Duplicate message detected - transaction will be rejected");
+ log.trace("Duplicate message detected - transaction will be rejected");
- tx.markAsRollbackOnly(null);
+ context.getTransaction().markAsRollbackOnly(null);
}
return;
@@ -559,23 +563,26 @@
if (cache != null)
{
- if (tx == null)
+ if (context.getTransaction() == null)
{
// We need to store the duplicate id atomically with the message storage, so we need to create a tx for this
- tx = new TransactionImpl(storageManager);
+ Transaction tx = new TransactionImpl(storageManager);
+
+ context.setTransaction(tx);
startedTx = true;
}
- cache.addToCache(duplicateIDBytes, tx);
+ cache.addToCache(duplicateIDBytes, context.getTransaction());
}
- if (tx == null)
+ if (context.getTransaction() == null)
{
if (pagingManager.page(message, true))
{
message.setStored();
+
return;
}
}
@@ -583,80 +590,142 @@
{
SimpleString destination = message.getDestination();
- boolean depage = tx.getProperty(TransactionPropertyIndexes.IS_DEPAGE) != null;
+ boolean depage = context.getTransaction().getProperty(TransactionPropertyIndexes.IS_DEPAGE) != null;
if (!depage && pagingManager.isPaging(destination))
{
- getPageOperation(tx).addMessageToPage(message);
+ getPageOperation(context.getTransaction()).addMessageToPage(message);
return;
}
}
Bindings bindings = addressManager.getBindingsForRoutingAddress(address);
-
- boolean routed;
if (bindings != null)
{
- routed = bindings.route(message, tx);
+ context.incrementDepth();
+
+ bindings.route(message, context);
+
+ context.decrementDepth();
}
- else
- {
- routed = false;
- }
- if (!routed)
+ //The depth allows for recursion e.g. with diverts - we only want to process the route after any recursed routes
+ //have been processed
+
+ if (context.getDepth() == 0)
{
- AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString());
-
- boolean sendToDLA = addressSettings.isSendToDLAOnNoRoute();
-
- if (sendToDLA)
+ if (context.getQueues().isEmpty())
{
- //Send to the DLA for the address
-
- SimpleString dlaAddress = addressSettings.getDeadLetterAddress();
-
- if (dlaAddress == null)
+ // Send to DLA if appropriate
+
+ AddressSettings addressSettings = addressSettingsRepository.getMatch(address.toString());
+
+ boolean sendToDLA = addressSettings.isSendToDLAOnNoRoute();
+
+ if (sendToDLA)
{
- log.warn("Did not route to any bindings for address " + address + " and sendToDLAOnNoRoute is true " +
- "but there is no DLA configured for the address, the message will be ignored.");
+ // Send to the DLA for the address
+
+ SimpleString dlaAddress = addressSettings.getDeadLetterAddress();
+
+ if (dlaAddress == null)
+ {
+ log.warn("Did not route to any bindings for address " + address +
+ " and sendToDLAOnNoRoute is true " +
+ "but there is no DLA configured for the address, the message will be ignored.");
+ }
+ else
+ {
+ message.setOriginalHeaders(message, false);
+
+ message.setDestination(dlaAddress);
+
+ route(message, context);
+ }
}
- else
- {
- message.setOriginalHeaders(message, false);
-
- message.setDestination(dlaAddress);
-
- route(message, tx);
- }
}
+ else
+ {
+ processRoute(message, context);
+ }
+
+ if (startedTx)
+ {
+ context.getTransaction().commit();
+ }
}
+ }
+
+ public MessageReference reroute(final ServerMessage message, final Queue queue, final Transaction tx) throws Exception
+ {
+ MessageReference reference = message.createReference(queue);
- if (startedTx)
+ Long scheduledDeliveryTime = (Long)message.getProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME);
+
+ if (scheduledDeliveryTime != null)
{
- tx.commit();
+ reference.setScheduledDeliveryTime(scheduledDeliveryTime);
}
+
+ message.incrementDurableRefCount();
+
+ message.setStored();
+
+ int refCount = message.incrementRefCount();
+
+ PagingStore store = pagingManager.getPageStore(message.getDestination());
+
+ if (refCount == 1)
+ {
+ store.addSize(message.getMemoryEstimate());
+ }
+
+ store.addSize(reference.getMemoryEstimate());
+
+ if (tx == null)
+ {
+ queue.addLast(reference);
+ }
+ else
+ {
+ List<MessageReference> refs = new ArrayList<MessageReference>(1);
+
+ refs.add(reference);
+
+ tx.addOperation(new AddOperation(refs));
+ }
+
+ return reference;
}
-
+
public void route(final ServerMessage message) throws Exception
{
- route(message, null);
+ route(message, new RoutingContextImpl(null));
}
- public boolean redistribute(final ServerMessage message, final Queue originatingQueue, final Transaction tx) throws Exception
- {
+ public boolean redistribute(final ServerMessage message, final Queue originatingQueue, final RoutingContext context) throws Exception
+ {
Bindings bindings = addressManager.getBindingsForRoutingAddress(message.getDestination());
+ boolean res = false;
+
if (bindings != null)
{
- return bindings.redistribute(message, originatingQueue, tx);
+ bindings.redistribute(message, originatingQueue, context);
+
+ if (!context.getQueues().isEmpty())
+ {
+ processRoute(message, context);
+
+ res = true;
+ }
}
- else
- {
- return false;
- }
+
+ log.info("redistribute called res is " + res);
+
+ return res;
}
public PagingManager getPagingManager()
@@ -711,8 +780,9 @@
message.setBody(ChannelBuffers.EMPTY_BUFFER);
message.setDestination(queueName);
message.putBooleanProperty(HDR_RESET_QUEUE_DATA, true);
- queue.preroute(message, null);
- queue.route(message, null);
+ // queue.preroute(message, null);
+ // queue.route(message, null);
+ routeDirect(message, queue, false);
for (QueueInfo info : queueInfos.values())
{
@@ -727,7 +797,7 @@
message.putStringProperty(ManagementHelper.HDR_FILTERSTRING, info.getFilterString());
message.putIntProperty(ManagementHelper.HDR_DISTANCE, info.getDistance());
- routeDirect(queue, message);
+ routeDirect(message, queue, true);
int consumersWithFilters = info.getFilterStrings() != null ? info.getFilterStrings().size() : 0;
@@ -740,7 +810,7 @@
message.putStringProperty(ManagementHelper.HDR_ROUTING_NAME, info.getRoutingName());
message.putIntProperty(ManagementHelper.HDR_DISTANCE, info.getDistance());
- routeDirect(queue, message);
+ routeDirect(message, queue, true);
}
if (info.getFilterStrings() != null)
@@ -755,7 +825,7 @@
message.putStringProperty(ManagementHelper.HDR_FILTERSTRING, filterString);
message.putIntProperty(ManagementHelper.HDR_DISTANCE, info.getDistance());
- routeDirect(queue, message);
+ routeDirect(message, queue, true);
}
}
}
@@ -766,9 +836,107 @@
// Private -----------------------------------------------------------------
+ private void routeDirect(final ServerMessage message, final Queue queue, final boolean applyFilters) throws Exception
+ {
+ if (!applyFilters || queue.getFilter() == null || queue.getFilter().match(message))
+ {
+ RoutingContext context = new RoutingContextImpl(null);
+
+ queue.route(message, context);
+
+ processRoute(message, context);
+ }
+ }
+
+ private void processRoute(final ServerMessage message, final RoutingContext context) throws Exception
+ {
+ List<MessageReference> refs = new ArrayList<MessageReference>();
+
+ Transaction tx = context.getTransaction();
+
+ for (Queue queue : context.getQueues())
+ {
+ MessageReference reference = message.createReference(queue);
+
+ refs.add(reference);
+
+ Long scheduledDeliveryTime = (Long)message.getProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME);
+
+ if (scheduledDeliveryTime != null)
+ {
+ reference.setScheduledDeliveryTime(scheduledDeliveryTime);
+ }
+
+ if (message.isDurable() && queue.isDurable())
+ {
+ int durableRefCount = message.incrementDurableRefCount();
+
+ if (durableRefCount == 1)
+ {
+ if (tx != null)
+ {
+ storageManager.storeMessageTransactional(tx.getID(), message);
+ }
+ else
+ {
+ storageManager.storeMessage(message);
+ }
+
+ message.setStored();
+ }
+
+ if (tx != null)
+ {
+ storageManager.storeReferenceTransactional(tx.getID(), queue.getID(), message.getMessageID());
+
+ tx.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
+ }
+ else
+ {
+ storageManager.storeReference(queue.getID(), message.getMessageID());
+ }
+
+ if (scheduledDeliveryTime != null)
+ {
+ if (tx != null)
+ {
+ storageManager.updateScheduledDeliveryTimeTransactional(tx.getID(), reference);
+ }
+ else
+ {
+ storageManager.updateScheduledDeliveryTime(reference);
+ }
+ }
+ }
+
+ int refCount = message.incrementRefCount();
+
+ PagingStore store = pagingManager.getPageStore(message.getDestination());
+
+ if (refCount == 1)
+ {
+ store.addSize(message.getMemoryEstimate());
+ }
+
+ store.addSize(reference.getMemoryEstimate());
+ }
+
+ if (tx != null)
+ {
+ tx.addOperation(new AddOperation(refs));
+ }
+ else
+ {
+ for (MessageReference ref : refs)
+ {
+
+ ref.getQueue().addLast(ref);
+ }
+ }
+ }
+
private synchronized void startExpiryScanner()
{
-
if (reaperPeriod > 0)
{
reaperThread = new Thread(reaperRunnable, "HornetQ-expiry-reaper");
@@ -779,15 +947,6 @@
}
}
- private void routeDirect(final Queue queue, final ServerMessage message) throws Exception
- {
- if (queue.getFilter() == null || queue.getFilter().match(message))
- {
- queue.preroute(message, null);
- queue.route(message, null);
- }
- }
-
private ServerMessage createQueueInfoMessage(final NotificationType type, final SimpleString queueName)
{
ServerMessage message = new ServerMessageImpl(storageManager.generateUniqueID());
@@ -1008,7 +1167,7 @@
// This could happen when the PageStore left the pageState
// TODO is this correct - don't we lose transactionality here???
- route(message, null);
+ route(message, new RoutingContextImpl(null));
}
first = false;
}
@@ -1026,4 +1185,64 @@
}
}
}
+
+ /**
+ * Transaction operation that processRoute registers for references routed inside a transaction.
+ *
+ * After commit the references are added to the end of their queues. Before rollback the ref
+ * counts and paging-store sizes that processRoute incremented are reversed. The remaining
+ * callbacks do nothing.
+ */
+ private class AddOperation implements TransactionOperation
+ {
+ // References created by processRoute for this transaction; the list is kept as passed in, not copied
+ private final List<MessageReference> refs;
+
+ AddOperation(final List<MessageReference> refs)
+ {
+ this.refs = refs;
+ }
+
+ /** Adds every reference to the end of its queue now that the transaction has committed. */
+ public void afterCommit(Transaction tx) throws Exception
+ {
+ for (MessageReference ref : refs)
+ {
+ ref.getQueue().addLast(ref);
+ }
+ }
+
+ /** No-op. */
+ public void afterPrepare(Transaction tx) throws Exception
+ {
+ }
+
+ /** No-op. */
+ public void afterRollback(Transaction tx) throws Exception
+ {
+ }
+
+ /** No-op. */
+ public void beforeCommit(Transaction tx) throws Exception
+ {
+ }
+
+ /** No-op. */
+ public void beforePrepare(Transaction tx) throws Exception
+ {
+ }
+
+ /** Undoes the ref-count and paging-size increments made when the references were created. */
+ public void beforeRollback(Transaction tx) throws Exception
+ {
+ // Reverse the ref counts, and paging sizes
+
+ for (MessageReference ref : refs)
+ {
+ ServerMessage message = ref.getMessage();
+
+ // Same durability test that processRoute used when it incremented the durable ref count
+ if (message.isDurable() && ref.getQueue().isDurable())
+ {
+ message.decrementDurableRefCount();
+ }
+
+ int count = message.decrementRefCount();
+
+ PagingStore store = pagingManager.getPageStore(message.getDestination());
+
+ // The message's own estimate is subtracted only once no references remain
+ if (count == 0)
+ {
+ store.addSize(-message.getMemoryEstimate());
+ }
+
+ // Each reference's estimate is always subtracted
+ store.addSize(-ref.getMemoryEstimate());
+ }
+ }
+ }
}
Modified: trunk/src/main/org/hornetq/core/server/Bindable.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/Bindable.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/server/Bindable.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -13,7 +13,6 @@
package org.hornetq.core.server;
-import org.hornetq.core.transaction.Transaction;
/**
* A Bindable
@@ -26,8 +25,6 @@
*/
public interface Bindable
{
- void preroute(ServerMessage message, Transaction tx) throws Exception;
-
- void route(ServerMessage message, Transaction tx) throws Exception;
+ void route(ServerMessage message, RoutingContext context) throws Exception;
}
Modified: trunk/src/main/org/hornetq/core/server/MessageReference.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/MessageReference.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/server/MessageReference.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -51,4 +51,6 @@
void decrementDeliveryCount();
Queue getQueue();
+
+ void handled();
}
Modified: trunk/src/main/org/hornetq/core/server/Queue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/Queue.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/server/Queue.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -19,7 +19,6 @@
import java.util.concurrent.Executor;
import org.hornetq.core.filter.Filter;
-import org.hornetq.core.remoting.Channel;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.utils.SimpleString;
@@ -34,8 +33,6 @@
*/
public interface Queue extends Bindable
{
- MessageReference reroute(ServerMessage message, Transaction tx) throws Exception;
-
SimpleString getName();
long getID();
Added: trunk/src/main/org/hornetq/core/server/RoutingContext.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/RoutingContext.java (rev 0)
+++ trunk/src/main/org/hornetq/core/server/RoutingContext.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -0,0 +1,42 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.server;
+
+import java.util.List;
+
+import org.hornetq.core.transaction.Transaction;
+
+/**
+ * A RoutingContext
+ *
+ * Carries the state of a single route call: the transaction the route runs in (if any),
+ * the queues that have been selected as targets, and a depth counter.
+ *
+ * @author Tim Fox
+ *
+ *
+ */
+public interface RoutingContext
+{
+ /** Returns the transaction the route is part of, or null when the route is not transactional. */
+ Transaction getTransaction();
+
+ /** Sets the transaction associated with this context. */
+ void setTransaction(Transaction transaction);
+
+ /** Records a queue as a target of the route. */
+ void addQueue(Queue queue);
+
+ /** Returns the queues recorded so far through addQueue. */
+ List<Queue> getQueues();
+
+ // NOTE(review): no callers of the depth methods are visible in this part of the change;
+ // presumably the counter tracks nested route calls - confirm against PostOfficeImpl.
+ /** Increases the depth counter by one. */
+ void incrementDepth();
+
+ /** Decreases the depth counter by one. */
+ void decrementDepth();
+
+ /** Returns the current value of the depth counter. */
+ int getDepth();
+}
Modified: trunk/src/main/org/hornetq/core/server/ServerMessage.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/ServerMessage.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/server/ServerMessage.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -44,6 +44,7 @@
int getMemoryEstimate();
+ //TODO - do we really need this? Can't we use durable ref count?
void setStored() throws Exception;
boolean isStored();
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/BridgeImpl.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -392,7 +392,7 @@
synchronized (this)
{
- ref.getQueue().referenceHandled();
+ ref.handled();
ServerMessage message = ref.getMessage();
@@ -500,10 +500,6 @@
private void fail()
{
- log.info("bridge " + name + " has failed");
-
- //executor.execute(new FailRunnable());
-
//This will get called even after the bridge reconnects - in this case
//we want to cancel all unacked refs so they get resent
//duplicate detection will ensure no dups are routed on the other side
@@ -670,8 +666,6 @@
queue.deliverAsync(executor);
- log.info("Bridge " + name + " is now connected to destination ");
-
return true;
}
catch (Exception e)
Modified: trunk/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/server/cluster/impl/Redistributor.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -23,6 +23,7 @@
import org.hornetq.core.server.HandleStatus;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.impl.RoutingContextImpl;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.impl.TransactionImpl;
import org.hornetq.utils.Future;
@@ -122,7 +123,7 @@
final Transaction tx = new TransactionImpl(storageManager);
- boolean routed = postOffice.redistribute(reference.getMessage(), queue, tx);
+ boolean routed = postOffice.redistribute(reference.getMessage(), queue, new RoutingContextImpl(tx));
if (routed)
{
@@ -138,7 +139,7 @@
private void doRedistribute(final MessageReference reference, final Transaction tx) throws Exception
{
- queue.referenceHandled();
+ reference.handled();
queue.acknowledge(tx, reference);
Modified: trunk/src/main/org/hornetq/core/server/impl/DivertImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/DivertImpl.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/server/impl/DivertImpl.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -17,14 +17,11 @@
import org.hornetq.core.filter.Filter;
import org.hornetq.core.logging.Logger;
-import org.hornetq.core.paging.PagingManager;
-import org.hornetq.core.paging.PagingStore;
-import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.Divert;
+import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.cluster.Transformer;
-import org.hornetq.core.transaction.Transaction;
import org.hornetq.utils.SimpleString;
/**
@@ -54,19 +51,13 @@
private final Transformer transformer;
- private final PagingManager pagingManager;
-
- private final StorageManager storageManager;
-
public DivertImpl(final SimpleString forwardAddress,
final SimpleString uniqueName,
final SimpleString routingName,
final boolean exclusive,
final Filter filter,
final Transformer transformer,
- final PostOffice postOffice,
- final PagingManager pagingManager,
- final StorageManager storageManager)
+ final PostOffice postOffice)
{
this.forwardAddress = forwardAddress;
@@ -81,40 +72,14 @@
this.transformer = transformer;
this.postOffice = postOffice;
-
- this.pagingManager = pagingManager;
-
- this.storageManager = storageManager;
}
- public void preroute(final ServerMessage message, final Transaction tx) throws Exception
- {
- //We need to increment ref count here to ensure that the message doesn't get stored, deleted and stored again in a single route which
- //can occur if the message is routed to a queue, then acked before it's routed here
-
- //TODO - combine with similar code in QueueImpl.accept()
-
- int count = message.incrementRefCount();
-
- if (count == 1)
- {
- PagingStore store = pagingManager.getPageStore(message.getDestination());
-
- store.addSize(message.getMemoryEstimate());
- }
-
- if (message.isDurable())
- {
- message.incrementDurableRefCount();
- }
- }
-
- public void route(ServerMessage message, final Transaction tx) throws Exception
+ public void route(ServerMessage message, final RoutingContext context) throws Exception
{
SimpleString originalDestination = message.getDestination();
message.setDestination(forwardAddress);
-
+
message.putStringProperty(HDR_ORIGINAL_DESTINATION, originalDestination);
if (transformer != null)
@@ -122,30 +87,7 @@
message = transformer.transform(message);
}
- postOffice.route(message, tx);
-
- //Decrement the ref count here - and delete the message if necessary
-
- //TODO combine this with code in QueueImpl::postAcknowledge
-
- if (message.isDurable())
- {
- int count = message.decrementDurableRefCount();
-
- if (count == 0)
- {
- storageManager.deleteMessage(message.getMessageID());
- }
- }
-
- // TODO: We could optimize this by storing the paging-store for the address on the Queue. We would need to know
- // the Address for the Queue
- PagingStore store = pagingManager.getPageStore(message.getDestination());
-
- if (message.decrementRefCount() == 0)
- {
- store.addSize(-message.getMemoryEstimate());
- }
+ postOffice.route(message, context);
}
public SimpleString getRoutingName()
Modified: trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/server/impl/HornetQServerImpl.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -80,7 +80,6 @@
import org.hornetq.core.server.Divert;
import org.hornetq.core.server.HornetQServer;
import org.hornetq.core.server.MemoryManager;
-import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.QueueFactory;
import org.hornetq.core.server.ServerSession;
@@ -91,9 +90,7 @@
import org.hornetq.core.settings.impl.AddressSettings;
import org.hornetq.core.settings.impl.HierarchicalObjectRepository;
import org.hornetq.core.transaction.ResourceManager;
-import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.impl.ResourceManagerImpl;
-import org.hornetq.core.transaction.impl.TransactionImpl;
import org.hornetq.core.version.Version;
import org.hornetq.utils.ExecutorFactory;
import org.hornetq.utils.HornetQThreadFactory;
@@ -572,7 +569,7 @@
executorFactory.getExecutor(),
channel,
managementService,
- queueFactory,
+ // queueFactory,
this,
configuration.getManagementAddress());
@@ -763,35 +760,6 @@
return nodeID;
}
- public void handleReplicateRedistribution(final SimpleString queueName, final long messageID) throws Exception
- {
- Binding binding = postOffice.getBinding(queueName);
-
- if (binding == null)
- {
- throw new IllegalStateException("Cannot find queue " + queueName);
- }
-
- Queue queue = (Queue)binding.getBindable();
-
- MessageReference reference = queue.removeFirstReference(messageID);
-
- Transaction tx = new TransactionImpl(storageManager);
-
- boolean routed = postOffice.redistribute(reference.getMessage(), queue, tx);
-
- if (routed)
- {
- queue.acknowledge(tx, reference);
-
- tx.commit();
- }
- else
- {
- throw new IllegalStateException("Must be routed");
- }
- }
-
public Queue createQueue(final SimpleString address,
final SimpleString queueName,
final SimpleString filterString,
@@ -924,8 +892,6 @@
{
// Complete the startup procedure
- log.info("Activating server");
-
configuration.setBackup(false);
initialisePart2();
@@ -977,9 +943,8 @@
private void initialiseLogging()
{
- LogDelegateFactory logDelegateFactory =
- (LogDelegateFactory)instantiateInstance(configuration.getLogDelegateFactoryClassName());
-
+ LogDelegateFactory logDelegateFactory = (LogDelegateFactory)instantiateInstance(configuration.getLogDelegateFactoryClassName());
+
Logger.setDelegateFactory(logDelegateFactory);
}
@@ -1188,7 +1153,7 @@
Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap = new HashMap<SimpleString, List<Pair<byte[], Long>>>();
- storageManager.loadMessageJournal(pagingManager, resourceManager, queues, duplicateIDMap);
+ storageManager.loadMessageJournal(postOffice, pagingManager, resourceManager, queues, duplicateIDMap);
for (Map.Entry<SimpleString, List<Pair<byte[], Long>>> entry : duplicateIDMap.entrySet())
{
@@ -1267,8 +1232,13 @@
filter = new FilterImpl(filterString);
}
- final Queue queue = queueFactory.createQueue(storageManager.generateUniqueID(), address, queueName, filter, durable, temporary);
-
+ final Queue queue = queueFactory.createQueue(storageManager.generateUniqueID(),
+ address,
+ queueName,
+ filter,
+ durable,
+ temporary);
+
binding = new LocalQueueBinding(address, queue, nodeID);
if (durable)
@@ -1335,9 +1305,9 @@
config.isExclusive(),
filter,
transformer,
- postOffice,
- pagingManager,
- storageManager);
+ postOffice);
+ // pagingManager,
+ // storageManager);
Binding binding = new DivertBinding(storageManager.generateUniqueID(), sAddress, divert);
Modified: trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/server/impl/LastValueQueue.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -12,55 +12,48 @@
*/
package org.hornetq.core.server.impl;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ScheduledExecutorService;
import org.hornetq.core.filter.Filter;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.impl.MessageImpl;
-import org.hornetq.core.paging.PagingManager;
-import org.hornetq.core.paging.PagingStore;
import org.hornetq.core.persistence.StorageManager;
import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.MessageReference;
+import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.settings.HierarchicalRepository;
import org.hornetq.core.settings.impl.AddressSettings;
-import org.hornetq.core.transaction.Transaction;
import org.hornetq.utils.SimpleString;
/**
* A queue that will discard messages if a newer message with the same MessageImpl.HDR_LAST_VALUE_NAME property value.
* In other words it only retains the last value
+ *
* This is useful for example, for stock prices, where you're only interested in the latest value
* for a particular stock
+ *
* @author <a href="mailto:andy.taylor at jboss.org">Andy Taylor</a>
+ * @author <a href="mailto:tim.fox at jboss.com">Tim Fox</a> rewrite
*/
public class LastValueQueue extends QueueImpl
{
private static final Logger log = Logger.getLogger(LastValueQueue.class);
- private final Map<SimpleString, ServerMessage> map = new HashMap<SimpleString, ServerMessage>();
+ private final Map<SimpleString, HolderReference> map = new ConcurrentHashMap<SimpleString, HolderReference>();
- private final PagingManager pagingManager;
-
- private final StorageManager storageManager;
-
public LastValueQueue(final long persistenceID,
- final SimpleString address,
- final SimpleString name,
- final Filter filter,
- final boolean durable,
- final boolean temporary,
- final ScheduledExecutorService scheduledExecutor,
- final PostOffice postOffice,
- final StorageManager storageManager,
- final HierarchicalRepository<AddressSettings> addressSettingsRepository)
+ final SimpleString address,
+ final SimpleString name,
+ final Filter filter,
+ final boolean durable,
+ final boolean temporary,
+ final ScheduledExecutorService scheduledExecutor,
+ final PostOffice postOffice,
+ final StorageManager storageManager,
+ final HierarchicalRepository<AddressSettings> addressSettingsRepository)
{
super(persistenceID,
address,
@@ -72,213 +65,158 @@
postOffice,
storageManager,
addressSettingsRepository);
- this.pagingManager = postOffice.getPagingManager();
- this.storageManager = storageManager;
}
- public void route(final ServerMessage message, final Transaction tx) throws Exception
+ public synchronized void add(final MessageReference ref, final boolean first)
{
- SimpleString prop = (SimpleString)message.getProperty(MessageImpl.HDR_LAST_VALUE_NAME);
+ SimpleString prop = (SimpleString)ref.getMessage().getProperty(MessageImpl.HDR_LAST_VALUE_NAME);
+
if (prop != null)
{
- synchronized (map)
- {
- ServerMessage msg = map.put(prop, message);
- // if an older message existed then we discard it
- if (msg != null)
+ HolderReference hr = map.get(prop);
+
+ if (!first)
+ {
+ if (hr != null)
{
- MessageReference ref;
- if (tx != null)
+ // We need to overwrite the old ref with the new one and ack the old one
+
+ MessageReference oldRef = hr.getReference();
+
+ super.referenceHandled();
+
+ try
{
- discardMessage(msg.getMessageID(), tx);
+ super.acknowledge(oldRef);
}
- else
+ catch (Exception e)
{
- ref = removeReferenceWithID(msg.getMessageID());
- if (ref != null)
- {
- discardMessage(ref, tx);
- }
+ log.error("Failed to ack old reference", e);
}
+ hr.setReference(ref);
+
}
- }
- }
- super.route(message, tx);
- }
+ else
+ {
+ hr = new HolderReference(prop, ref);
- public MessageReference reroute(final ServerMessage message, final Transaction tx) throws Exception
- {
- SimpleString prop = (SimpleString)message.getProperty(MessageImpl.HDR_LAST_VALUE_NAME);
- if (prop != null)
- {
- synchronized (map)
+ map.put(prop, hr);
+
+ super.add(hr, first);
+ }
+ }
+ else
{
- ServerMessage msg = map.put(prop, message);
- if (msg != null)
+ // Add to front
+
+ if (hr != null)
{
- if (tx != null)
+ // We keep the current ref and ack the one we are returning
+
+ super.referenceHandled();
+
+ try
{
- rediscardMessage(msg.getMessageID(), tx);
+ super.acknowledge(ref);
}
- else
+ catch (Exception e)
{
- MessageReference ref = removeReferenceWithID(msg.getMessageID());
- rediscardMessage(ref);
+ log.error("Failed to ack old reference", e);
}
}
+ else
+ {
+ map.put(prop, (HolderReference)ref);
+
+ super.add(ref, first);
+ }
}
}
- return super.reroute(message, tx);
+ else
+ {
+ super.add(ref, first);
+ }
}
- public void acknowledge(final MessageReference ref) throws Exception
+ private class HolderReference implements MessageReference
{
- super.acknowledge(ref);
- SimpleString prop = (SimpleString)ref.getMessage().getProperty(MessageImpl.HDR_LAST_VALUE_NAME);
- if (prop != null)
+ private final SimpleString prop;
+
+ private volatile MessageReference ref;
+
+ HolderReference(final SimpleString prop, final MessageReference ref)
{
- synchronized (map)
- {
- ServerMessage serverMessage = map.get(prop);
- if (serverMessage != null && ref.getMessage().getMessageID() == serverMessage.getMessageID())
- {
- map.remove(prop);
- }
- }
+ this.prop = prop;
+
+ this.ref = ref;
}
- }
- public void cancel(final Transaction tx, final MessageReference ref) throws Exception
- {
- SimpleString prop = (SimpleString)ref.getMessage().getProperty(MessageImpl.HDR_LAST_VALUE_NAME);
- if (prop != null)
+ MessageReference getReference()
{
- synchronized (map)
- {
- ServerMessage msg = map.get(prop);
- if (msg.getMessageID() == ref.getMessage().getMessageID())
- {
- super.cancel(tx, ref);
- }
- else
- {
- discardMessage(ref, tx);
- }
- }
+ return ref;
}
- else
+
+ public void handled()
{
- super.cancel(tx, ref);
+ // We need to remove the entry from the map just before it gets delivered
+
+ map.remove(prop);
}
- }
- void postRollback(final LinkedList<MessageReference> refs) throws Exception
- {
- List<MessageReference> refsToDiscard = new ArrayList<MessageReference>();
- List<SimpleString> refsToClear = new ArrayList<SimpleString>();
- synchronized (map)
+ void setReference(final MessageReference ref)
{
- for (MessageReference ref : refs)
- {
- SimpleString prop = (SimpleString)ref.getMessage().getProperty(MessageImpl.HDR_LAST_VALUE_NAME);
- if (prop != null)
- {
- ServerMessage msg = map.get(prop);
- if (msg != null)
- {
- if (msg.getMessageID() != ref.getMessage().getMessageID())
- {
- refsToDiscard.add(ref);
- }
- else
- {
- refsToClear.add(prop);
- }
- }
- }
- }
- for (SimpleString simpleString : refsToClear)
- {
- map.remove(simpleString);
- }
+ this.ref = ref;
}
- for (MessageReference ref : refsToDiscard)
+
+ public MessageReference copy(Queue queue)
{
- refs.remove(ref);
- discardMessage(ref, null);
+ return ref.copy(queue);
}
- super.postRollback(refs);
- }
- final void discardMessage(MessageReference ref, Transaction tx) throws Exception
- {
- deliveringCount.decrementAndGet();
- PagingStore store = pagingManager.getPageStore(ref.getMessage().getDestination());
- store.addSize(-ref.getMemoryEstimate());
- QueueImpl queue = (QueueImpl)ref.getQueue();
- ServerMessage msg = ref.getMessage();
- boolean durableRef = msg.isDurable() && queue.isDurable();
+ public void decrementDeliveryCount()
+ {
+ ref.decrementDeliveryCount();
+ }
- if (durableRef)
+ public int getDeliveryCount()
{
- int count = msg.decrementDurableRefCount();
+ return ref.getDeliveryCount();
+ }
- if (count == 0)
- {
- if (tx == null)
- {
- storageManager.deleteMessage(msg.getMessageID());
- }
- else
- {
- storageManager.deleteMessageTransactional(tx.getID(), getID(), msg.getMessageID());
- }
- }
+ public int getMemoryEstimate()
+ {
+ return ref.getMemoryEstimate();
}
- }
- final void discardMessage(Long id, Transaction tx) throws Exception
- {
- RefsOperation oper = getRefsOperation(tx);
- Iterator<MessageReference> iterator = oper.refsToAdd.iterator();
+ public ServerMessage getMessage()
+ {
+ return ref.getMessage();
+ }
- while (iterator.hasNext())
+ public Queue getQueue()
{
- MessageReference ref = iterator.next();
+ return ref.getQueue();
+ }
- if (ref.getMessage().getMessageID() == id)
- {
- iterator.remove();
- discardMessage(ref, tx);
- break;
- }
+ public long getScheduledDeliveryTime()
+ {
+ return ref.getScheduledDeliveryTime();
}
- }
+ public void incrementDeliveryCount()
+ {
+ ref.incrementDeliveryCount();
+ }
- final void rediscardMessage(long id, Transaction tx) throws Exception
- {
- RefsOperation oper = getRefsOperation(tx);
- Iterator<MessageReference> iterator = oper.refsToAdd.iterator();
-
- while (iterator.hasNext())
+ public void setDeliveryCount(int deliveryCount)
{
- MessageReference ref = iterator.next();
+ ref.setDeliveryCount(deliveryCount);
+ }
- if (ref.getMessage().getMessageID() == id)
- {
- iterator.remove();
- rediscardMessage(ref);
- break;
- }
+ public void setScheduledDeliveryTime(long scheduledDeliveryTime)
+ {
+ ref.setScheduledDeliveryTime(scheduledDeliveryTime);
}
}
-
- final void rediscardMessage(MessageReference ref) throws Exception
- {
- deliveringCount.decrementAndGet();
- PagingStore store = pagingManager.getPageStore(ref.getMessage().getDestination());
- store.addSize(-ref.getMemoryEstimate());
- }
}
Modified: trunk/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/server/impl/MessageReferenceImpl.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -119,6 +119,11 @@
{
return queue;
}
+
+ /**
+ * Marks this reference as handled by delegating to referenceHandled() on the owning queue.
+ */
+ public void handled()
+ {
+ queue.referenceHandled();
+ }
// Public --------------------------------------------------------
Modified: trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/server/impl/QueueImpl.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -14,7 +14,6 @@
package org.hornetq.core.server.impl;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -47,6 +46,7 @@
import org.hornetq.core.server.HandleStatus;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ScheduledDeliveryHandler;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.cluster.impl.Redistributor;
@@ -210,135 +210,12 @@
{
return false;
}
-
- public void preroute(final ServerMessage message, final Transaction tx) throws Exception
+
+ public void route(final ServerMessage message, final RoutingContext context) throws Exception
{
- int count = message.incrementRefCount();
-
- if (count == 1)
- {
- PagingStore store = pagingManager.getPageStore(message.getDestination());
-
- store.addSize(message.getMemoryEstimate());
- }
-
- boolean durableRef = message.isDurable() && durable;
-
- if (durableRef)
- {
- message.incrementDurableRefCount();
- }
+ context.addQueue(this);
}
- public void route(final ServerMessage message, final Transaction tx) throws Exception
- {
- boolean durableRef = message.isDurable() && durable;
-
- // If durable, must be persisted before anything is routed
- MessageReference ref = message.createReference(this);
-
- PagingStore store = pagingManager.getPageStore(message.getDestination());
-
- store.addSize(ref.getMemoryEstimate());
-
- Long scheduledDeliveryTime = (Long)message.getProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME);
-
- if (scheduledDeliveryTime != null)
- {
- ref.setScheduledDeliveryTime(scheduledDeliveryTime);
- }
-
- if (tx == null)
- {
- if (durableRef)
- {
- if (!message.isStored())
- {
- storageManager.storeMessage(message);
-
- message.setStored();
- }
-
- storageManager.storeReference(ref.getQueue().getID(), message.getMessageID());
- }
-
- if (scheduledDeliveryTime != null && durableRef)
- {
- storageManager.updateScheduledDeliveryTime(ref);
- }
-
- addLast(ref);
- }
- else
- {
- if (durableRef)
- {
- if (!message.isStored())
- {
- storageManager.storeMessageTransactional(tx.getID(), message);
-
- message.setStored();
- }
-
- tx.putProperty(TransactionPropertyIndexes.CONTAINS_PERSISTENT, true);
-
- storageManager.storeReferenceTransactional(tx.getID(),
- ref.getQueue().getID(),
- message.getMessageID());
- }
-
- if (scheduledDeliveryTime != null && durableRef)
- {
- storageManager.updateScheduledDeliveryTimeTransactional(tx.getID(), ref);
- }
-
- getRefsOperation(tx).addRef(ref);
- }
- }
-
- public MessageReference reroute(final ServerMessage message, final Transaction tx) throws Exception
- {
- MessageReference ref = message.createReference(this);
-
- int count = message.incrementRefCount();
-
- PagingStore store = pagingManager.getPageStore(message.getDestination());
-
- if (count == 1)
- {
- store.addSize(message.getMemoryEstimate());
- }
-
- store.addSize(ref.getMemoryEstimate());
-
- boolean durableRef = message.isDurable() && durable;
-
- if (durableRef)
- {
- message.incrementDurableRefCount();
- }
-
- Long scheduledDeliveryTime = (Long)message.getProperty(MessageImpl.HDR_SCHEDULED_DELIVERY_TIME);
-
- if (scheduledDeliveryTime != null)
- {
- ref.setScheduledDeliveryTime(scheduledDeliveryTime);
- }
-
- if (tx == null)
- {
- addLast(ref);
- }
- else
- {
- getRefsOperation(tx).addRef(ref);
- }
-
- message.setStored();
-
- return ref;
- }
-
// Queue implementation ----------------------------------------------------------------------------------------
public void lockDelivery()
@@ -384,7 +261,7 @@
}
public void addLast(final MessageReference ref)
- {
+ {
add(ref, false);
}
@@ -1071,7 +948,7 @@
copyMessage.setDestination(toAddress);
- postOffice.route(copyMessage, tx);
+ postOffice.route(copyMessage, new RoutingContextImpl(tx));
acknowledge(tx, ref);
}
@@ -1158,7 +1035,7 @@
copyMessage.setDestination(address);
- postOffice.route(copyMessage, tx);
+ postOffice.route(copyMessage, new RoutingContextImpl(tx));
acknowledge(tx, ref);
@@ -1253,7 +1130,7 @@
iterator.remove();
}
- referenceHandled();
+ reference.handled();
try
{
@@ -1316,7 +1193,7 @@
}
}
- private synchronized void add(final MessageReference ref, final boolean first)
+ protected synchronized void add(final MessageReference ref, final boolean first)
{
if (!first)
{
@@ -1558,14 +1435,11 @@
{
synchronized (this)
{
+ direct = false;
+
for (MessageReference ref : refs)
{
- ServerMessage msg = ref.getMessage();
-
- if (!scheduledDeliveryHandler.checkAndSchedule(ref))
- {
- messageReferences.addFirst(ref, msg.getPriority());
- }
+ add(ref, true);
}
deliver();
@@ -1633,15 +1507,8 @@
final class RefsOperation implements TransactionOperation
{
- List<MessageReference> refsToAdd = new ArrayList<MessageReference>();
-
List<MessageReference> refsToAck = new ArrayList<MessageReference>();
- synchronized void addRef(final MessageReference ref)
- {
- refsToAdd.add(ref);
- }
-
synchronized void addAck(final MessageReference ref)
{
refsToAck.add(ref);
@@ -1689,28 +1556,8 @@
}
}
- /* (non-Javadoc)
- * @see org.hornetq.core.transaction.TransactionOperation#getDistinctQueues()
- */
- public synchronized Collection<Queue> getDistinctQueues()
- {
- HashSet<Queue> queues = new HashSet<Queue>();
-
- for (MessageReference ref : refsToAck)
- {
- queues.add(ref.getQueue());
- }
-
- return queues;
- }
-
public void afterCommit(final Transaction tx) throws Exception
{
- for (MessageReference ref : refsToAdd)
- {
- ref.getQueue().addLast(ref);
- }
-
for (MessageReference ref : refsToAck)
{
synchronized (ref.getQueue())
@@ -1726,25 +1573,6 @@
public void beforeRollback(final Transaction tx) throws Exception
{
- Set<ServerMessage> msgs = new HashSet<ServerMessage>();
-
- for (MessageReference ref : refsToAdd)
- {
- ServerMessage msg = ref.getMessage();
-
- // Optimise this
- PagingStore store = pagingManager.getPageStore(msg.getDestination());
-
- store.addSize(-ref.getMemoryEstimate());
-
- if (!msgs.contains(msg))
- {
- store.addSize(-msg.getMemoryEstimate());
- msg.decrementRefCount();
- }
-
- msgs.add(msg);
- }
}
}
Added: trunk/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java (rev 0)
+++ trunk/src/main/org/hornetq/core/server/impl/RoutingContextImpl.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2009 Red Hat, Inc.
+ * Red Hat licenses this file to you under the Apache License, version
+ * 2.0 (the "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+ * implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package org.hornetq.core.server.impl;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.RoutingContext;
+import org.hornetq.core.transaction.Transaction;
+
+/**
+ * A RoutingContextImpl
+ * <p>
+ * Plain mutable holder for one routing operation: a transaction, the queues added via addQueue, and a depth counter.
+ * Nothing here is synchronized. NOTE(review): depth presumably counts nested routing calls - confirm against callers.
+ * @author tim
+ */
+public class RoutingContextImpl implements RoutingContext
+{
+ private List<Queue> queues = new ArrayList<Queue>(); // queues in the order addQueue was called; duplicates are not filtered
+
+ private Transaction transaction; // may be null; replaceable through setTransaction
+
+ private int depth; // starts at 0 and is changed only by incrementDepth/decrementDepth
+
+ public RoutingContextImpl(final Transaction transaction)
+ {
+ this.transaction = transaction; // null is accepted: ServerSessionImpl passes null when it has no tx or sends auto-commit
+ }
+
+ public void addQueue(final Queue queue)
+ {
+ queues.add(queue);
+ }
+
+ public Transaction getTransaction()
+ {
+ return transaction;
+ }
+
+ public void setTransaction(final Transaction tx)
+ {
+ this.transaction = tx;
+ }
+
+ public List<Queue> getQueues()
+ {
+ return queues; // the live internal list, not a copy: caller mutations are visible here
+ }
+
+ public void decrementDepth()
+ {
+ depth--; // no lower bound is enforced, so the counter can go negative
+ }
+
+ public int getDepth()
+ {
+ return depth;
+ }
+
+ public void incrementDepth()
+ {
+ depth++;
+ }
+
+}
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerConsumerImpl.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -499,7 +499,7 @@
deliveringRefs.add(ref);
}
- ref.getQueue().referenceHandled();
+ ref.handled();
ref.incrementDeliveryCount();
Modified: trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/server/impl/ServerSessionImpl.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -85,7 +85,7 @@
import org.hornetq.core.server.LargeServerMessage;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
-import org.hornetq.core.server.QueueFactory;
+import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerConsumer;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.server.ServerSession;
@@ -159,10 +159,6 @@
private final SimpleString managementAddress;
- private final QueueFactory queueFactory;
-
- private final SimpleString nodeID;
-
// The current currentLargeMessage being processed
// In case of replication, currentLargeMessage should only be accessed within the replication callbacks
private volatile LargeServerMessage currentLargeMessage;
@@ -187,8 +183,7 @@
final SecurityStore securityStore,
final Executor executor,
final Channel channel,
- final ManagementService managementService,
- final QueueFactory queueFactory,
+ final ManagementService managementService,
final HornetQServer server,
final SimpleString managementAddress) throws Exception
{
@@ -235,10 +230,6 @@
this.managementAddress = managementAddress;
- this.queueFactory = queueFactory;
-
- this.nodeID = server.getNodeID();
-
remotingConnection.addFailureListener(this);
remotingConnection.addCloseListener(this);
@@ -1671,11 +1662,6 @@
// Public
// ----------------------------------------------------------------------------
- public Transaction getTransaction()
- {
- return tx;
- }
-
// Private
// ----------------------------------------------------------------------------
@@ -1816,14 +1802,18 @@
}
throw e;
}
-
+
+ RoutingContext context;
+
if (tx == null || autoCommitSends)
{
- postOffice.route(msg);
+ context = new RoutingContextImpl(null);
}
else
{
- postOffice.route(msg, tx);
+ context = new RoutingContextImpl(tx);
}
+
+ postOffice.route(msg, context);
}
}
Modified: trunk/src/main/org/hornetq/core/transaction/Transaction.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/Transaction.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/transaction/Transaction.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -13,12 +13,9 @@
package org.hornetq.core.transaction;
-import java.util.Set;
-
import javax.transaction.xa.Xid;
import org.hornetq.core.exception.HornetQException;
-import org.hornetq.core.server.Queue;
/**
* A HornetQ internal transaction
@@ -62,8 +59,6 @@
Object getProperty(int index);
- Set<Queue> getDistinctQueues();
-
static enum State
{
ACTIVE, PREPARED, COMMITTED, ROLLEDBACK, SUSPENDED, ROLLBACK_ONLY
Modified: trunk/src/main/org/hornetq/core/transaction/TransactionOperation.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/TransactionOperation.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/transaction/TransactionOperation.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -13,10 +13,7 @@
package org.hornetq.core.transaction;
-import java.util.Collection;
-import org.hornetq.core.server.Queue;
-
/**
*
* A TransactionOperation
@@ -26,10 +23,6 @@
*/
public interface TransactionOperation
{
-
- /** rollback will need a distinct list of Queues in order to lock those queues before calling rollback */
- Collection<Queue> getDistinctQueues();
-
void beforePrepare(Transaction tx) throws Exception;
void beforeCommit(Transaction tx) throws Exception;
Modified: trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java
===================================================================
--- trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/core/transaction/impl/TransactionImpl.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -97,29 +97,6 @@
// Transaction implementation
// -----------------------------------------------------------
- public Set<Queue> getDistinctQueues()
- {
- HashSet<Queue> queues = new HashSet<Queue>();
-
- if (operations != null)
- {
- for (TransactionOperation op : operations)
- {
- Collection<Queue> q = op.getDistinctQueues();
- if (q == null)
- {
- log.warn("Operation " + op + " returned null getDistinctQueues");
- }
- else
- {
- queues.addAll(q);
- }
- }
- }
-
- return queues;
- }
-
public long getID()
{
return id;
@@ -184,7 +161,7 @@
}
public void commit(boolean onePhase) throws Exception
- {
+ {
synchronized (timeoutLock)
{
if (state == State.ROLLBACK_ONLY)
Modified: trunk/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java
===================================================================
--- trunk/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/src/main/org/hornetq/jms/server/management/impl/JMSManagementServiceImpl.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -59,8 +59,7 @@
{
ObjectName objectName = managementService.getObjectNameBuilder().getJMSServerObjectName();
JMSServerControlImpl control = new JMSServerControlImpl(server);
- managementService.registerInJMX(objectName,
- control);
+ managementService.registerInJMX(objectName, control);
managementService.registerInRegistry(ResourceNames.JMS_SERVER, control);
return control;
}
@@ -72,8 +71,7 @@
managementService.unregisterFromRegistry(ResourceNames.JMS_SERVER);
}
- public synchronized void registerQueue(final HornetQQueue queue,
- final String jndiBinding) throws Exception
+ public synchronized void registerQueue(final HornetQQueue queue, final String jndiBinding) throws Exception
{
QueueControl coreQueueControl = (QueueControl)managementService.getResource(ResourceNames.CORE_QUEUE + queue.getAddress());
MessageCounterManager messageCounterManager = managementService.getMessageCounterManager();
@@ -85,10 +83,7 @@
messageCounterManager.getMaxDayCount());
messageCounterManager.registerMessageCounter(queue.getName(), counter);
ObjectName objectName = managementService.getObjectNameBuilder().getJMSQueueObjectName(queue.getQueueName());
- JMSQueueControlImpl control = new JMSQueueControlImpl(queue,
- coreQueueControl,
- jndiBinding,
- counter);
+ JMSQueueControlImpl control = new JMSQueueControlImpl(queue, coreQueueControl, jndiBinding, counter);
managementService.registerInJMX(objectName, control);
managementService.registerInRegistry(ResourceNames.JMS_QUEUE + queue.getQueueName(), control);
}
@@ -100,8 +95,7 @@
managementService.unregisterFromRegistry(ResourceNames.JMS_QUEUE + name);
}
- public synchronized void registerTopic(final HornetQTopic topic,
- final String jndiBinding) throws Exception
+ public synchronized void registerTopic(final HornetQTopic topic, final String jndiBinding) throws Exception
{
ObjectName objectName = managementService.getObjectNameBuilder().getJMSTopicObjectName(topic.getTopicName());
AddressControl addressControl = (AddressControl)managementService.getResource(ResourceNames.CORE_ADDRESS + topic.getAddress());
@@ -118,8 +112,8 @@
}
public synchronized void registerConnectionFactory(final String name,
- final HornetQConnectionFactory connectionFactory,
- final List<String> bindings) throws Exception
+ final HornetQConnectionFactory connectionFactory,
+ final List<String> bindings) throws Exception
{
ObjectName objectName = managementService.getObjectNameBuilder().getConnectionFactoryObjectName(name);
JMSConnectionFactoryControlImpl control = new JMSConnectionFactoryControlImpl(connectionFactory, name, bindings);
Modified: trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/tests/src/org/hornetq/tests/integration/client/PagingTest.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -149,6 +149,8 @@
ClientMessage message2 = consumer.receive(RECEIVE_TIMEOUT);
assertNotNull(message2);
+
+ log.info("got message " + message2.getProperty(new SimpleString("id")));
assertEquals(i, ((Integer)message2.getProperty(new SimpleString("id"))).intValue());
Modified: trunk/tests/src/org/hornetq/tests/integration/divert/DivertTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/divert/DivertTest.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/tests/src/org/hornetq/tests/integration/divert/DivertTest.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -92,7 +92,7 @@
ClientConsumer consumer2 = session.createConsumer(queueName2);
- final int numMessages = 10;
+ final int numMessages = 1;
final SimpleString propKey = new SimpleString("testkey");
Modified: trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionOnGCTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionOnGCTest.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/tests/src/org/hornetq/tests/integration/jms/connection/CloseConnectionOnGCTest.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -12,6 +12,8 @@
*/
package org.hornetq.tests.integration.jms.connection;
+import java.lang.ref.WeakReference;
+
import javax.jms.Connection;
import javax.jms.Session;
@@ -54,16 +56,14 @@
public void testCloseOneConnectionOnGC() throws Exception
{
Connection conn = cf.createConnection();
+
+ WeakReference<Connection> wr = new WeakReference<Connection>(conn);
assertEquals(1, server.getRemotingService().getConnections().size());
conn = null;
- System.gc();
- System.gc();
- System.gc();
-
- Thread.sleep(2000);
+ checkWeakReferences(wr);
assertEquals(0, server.getRemotingService().getConnections().size());
}
@@ -74,17 +74,17 @@
Connection conn2 = cf.createConnection();
Connection conn3 = cf.createConnection();
+ WeakReference<Connection> wr1 = new WeakReference<Connection>(conn1);
+ WeakReference<Connection> wr2 = new WeakReference<Connection>(conn2);
+ WeakReference<Connection> wr3 = new WeakReference<Connection>(conn3);
+
assertEquals(1, server.getRemotingService().getConnections().size());
conn1 = null;
conn2 = null;
conn3 = null;
- System.gc();
- System.gc();
- System.gc();
-
- Thread.sleep(2000);
+ checkWeakReferences(wr1, wr2, wr3);
assertEquals(0, server.getRemotingService().getConnections().size());
}
@@ -93,8 +93,12 @@
{
Connection conn1 = cf.createConnection();
Connection conn2 = cf.createConnection();
- Connection conn3 = cf.createConnection();
+ Connection conn3 = cf.createConnection();
+ WeakReference<Connection> wr1 = new WeakReference<Connection>(conn1);
+ WeakReference<Connection> wr2 = new WeakReference<Connection>(conn2);
+ WeakReference<Connection> wr3 = new WeakReference<Connection>(conn3);
+
Session sess1 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session sess2 = conn1.createSession(false, Session.AUTO_ACKNOWLEDGE);
Session sess3 = conn2.createSession(false, Session.AUTO_ACKNOWLEDGE);
@@ -109,13 +113,40 @@
conn2 = null;
conn3 = null;
- System.gc();
- System.gc();
- System.gc();
-
- Thread.sleep(2000);
+ checkWeakReferences(wr1, wr2, wr3);
assertEquals(0, server.getRemotingService().getConnections().size());
}
+ public static void checkWeakReferences(WeakReference<?>... references)
+ {
+ // Polls the given weak references, calling forceGC() before every poll after the first, then asserts all are cleared.
+ int i = 0;
+ boolean hasValue = false;
+
+ do
+ {
+ hasValue = false; // reset each pass; set again below if any referent is still present
+
+ if (i > 0) // the first pass only checks; later passes force a collection first
+ {
+ forceGC(); // helper defined outside this view
+ }
+
+ for (WeakReference<?> ref : references)
+ {
+ if (ref.get() != null)
+ {
+ hasValue = true;
+ }
+ }
+ }
+ while (i++ <= 30 && hasValue); // stops once all are cleared, or after at most 32 passes (31 forceGC calls)
+ // Final check: fails the calling test if any referent is still reachable.
+ for (WeakReference<?> ref : references)
+ {
+ assertNull(ref.get());
+ }
+ }
+
}
Modified: trunk/tests/src/org/hornetq/tests/integration/persistence/JournalStorageManagerIntegrationTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/persistence/JournalStorageManagerIntegrationTest.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/tests/src/org/hornetq/tests/integration/persistence/JournalStorageManagerIntegrationTest.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -47,7 +47,7 @@
public void testLargeMessageCopy() throws Exception
{
clearData();
-
+
Configuration configuration = createDefaultConfig();
configuration.start();
@@ -63,13 +63,15 @@
byte[] data = new byte[1024];
for (int i = 0; i < 110; i++)
+ {
msg.addBytes(data);
+ }
ServerMessage msg2 = msg.copy(2);
-
+
assertEquals(110 * 1024, msg.getBodySize());
assertEquals(110 * 1024, msg2.getBodySize());
-
+
}
// Package protected ---------------------------------------------
Modified: trunk/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/tests/src/org/hornetq/tests/integration/persistence/RestartSMTest.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -24,8 +24,10 @@
import org.hornetq.core.logging.Logger;
import org.hornetq.core.persistence.QueueBindingInfo;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.JournalType;
import org.hornetq.core.server.Queue;
+import org.hornetq.tests.unit.core.server.impl.fakes.FakePostOffice;
import org.hornetq.tests.util.ServiceTestBase;
/**
@@ -42,7 +44,7 @@
// Constants -----------------------------------------------------
private static final Logger log = Logger.getLogger(RestartSMTest.class);
-
+
// Attributes ----------------------------------------------------
// Static --------------------------------------------------------
@@ -62,6 +64,8 @@
configuration.setJournalType(JournalType.ASYNCIO);
+ PostOffice postOffice = new FakePostOffice();
+
final JournalStorageManager journal = new JournalStorageManager(configuration, Executors.newCachedThreadPool());
try
{
@@ -74,7 +78,7 @@
Map<Long, Queue> queues = new HashMap<Long, Queue>();
- journal.loadMessageJournal(null, null, queues, null);
+ journal.loadMessageJournal(postOffice, null, null, queues, null);
journal.stop();
@@ -84,7 +88,7 @@
queues = new HashMap<Long, Queue>();
- journal.loadMessageJournal(null, null, queues, null);
+ journal.loadMessageJournal(postOffice, null, null, queues, null);
queueBindingInfos = new ArrayList<QueueBindingInfo>();
Modified: trunk/tests/src/org/hornetq/tests/integration/server/LVQTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/integration/server/LVQTest.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/tests/src/org/hornetq/tests/integration/server/LVQTest.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -21,6 +21,7 @@
import org.hornetq.core.config.TransportConfiguration;
import org.hornetq.core.config.impl.ConfigurationImpl;
import org.hornetq.core.exception.HornetQException;
+import org.hornetq.core.logging.Logger;
import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.server.HornetQ;
import org.hornetq.core.server.HornetQServer;
@@ -33,6 +34,8 @@
*/
public class LVQTest extends UnitTestCase
{
+ private static final Logger log = Logger.getLogger(LVQTest.class);
+
private HornetQServer server;
private ClientSession clientSession;
@@ -127,13 +130,14 @@
clientSession.start();
ClientMessage m = consumer.receive(1000);
assertNotNull(m);
+ assertEquals(m.getBody().readString(), "m1");
producer.send(m2);
consumer.close();
consumer = clientSession.createConsumer(qName1);
m = consumer.receive(1000);
- assertNotNull(m);
- m.acknowledge();
- assertEquals(m.getBody().readString(), "m2");
+ assertNotNull(m);
+ assertEquals("m2", m.getBody().readString());
+ m.acknowledge();
m = consumer.receive(1000);
assertNull(m);
}
Modified: trunk/tests/src/org/hornetq/tests/performance/persistence/StorageManagerTimingTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/performance/persistence/StorageManagerTimingTest.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/tests/src/org/hornetq/tests/performance/persistence/StorageManagerTimingTest.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -23,9 +23,11 @@
import org.hornetq.core.config.impl.FileConfiguration;
import org.hornetq.core.logging.Logger;
import org.hornetq.core.persistence.impl.journal.JournalStorageManager;
+import org.hornetq.core.postoffice.PostOffice;
import org.hornetq.core.server.JournalType;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.impl.ServerMessageImpl;
+import org.hornetq.tests.unit.core.server.impl.fakes.FakePostOffice;
import org.hornetq.tests.util.UnitTestCase;
import org.hornetq.utils.SimpleString;
@@ -117,12 +119,15 @@
configuration.setJournalType(journalType);
- final JournalStorageManager journal = new JournalStorageManager(configuration, Executors.newCachedThreadPool());
+ PostOffice postOffice = new FakePostOffice();
+
+ final JournalStorageManager journal = new JournalStorageManager(configuration,
+ Executors.newCachedThreadPool());
journal.start();
HashMap<Long, Queue> queues = new HashMap<Long, Queue>();
- journal.loadMessageJournal(null, null, queues, null);
+ journal.loadMessageJournal(postOffice, null, null, queues, null);
final byte[] bytes = new byte[900];
@@ -163,11 +168,10 @@
final SimpleString address = new SimpleString("Destination " + i);
ServerMessageImpl implMsg = new ServerMessageImpl(/* type */(byte)1, /* durable */
- true, /* expiration */
- 0,
- /* timestamp */0, /* priority */
- (byte)0,
- ChannelBuffers.wrappedBuffer(new byte[1024]));
+ true, /* expiration */
+ 0,
+ /* timestamp */0, /* priority */
+ (byte)0, ChannelBuffers.wrappedBuffer(new byte[1024]));
implMsg.putStringProperty(new SimpleString("Key"), new SimpleString("This String is worthless!"));
Modified: trunk/tests/src/org/hornetq/tests/timing/jms/bridge/impl/JMSBridgeImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/timing/jms/bridge/impl/JMSBridgeImplTest.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/tests/src/org/hornetq/tests/timing/jms/bridge/impl/JMSBridgeImplTest.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -414,7 +414,7 @@
bridge.setSourceDestinationFactory(sourceDF);
bridge.setTargetConnectionFactoryFactory(targetCFF);
bridge.setTargetDestinationFactory(targetDF);
- bridge.setFailureRetryInterval(10);
+ bridge.setFailureRetryInterval(100);
bridge.setMaxRetries(1);
bridge.setMaxBatchSize(1);
bridge.setMaxBatchTime(-1);
Modified: trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/tests/src/org/hornetq/tests/unit/core/paging/impl/PagingStoreImplTest.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -741,7 +741,7 @@
{
return 0;
}
-
+
public SimpleString[] getStoreNames()
{
return null;
@@ -940,7 +940,8 @@
/* (non-Javadoc)
* @see org.hornetq.core.persistence.StorageManager#loadMessageJournal(org.hornetq.core.paging.PagingManager, java.util.Map, org.hornetq.core.transaction.ResourceManager, java.util.Map)
*/
- public void loadMessageJournal(PagingManager pagingManager,
+ public void loadMessageJournal(PostOffice postOffice,
+ PagingManager pagingManager,
ResourceManager resourceManager,
Map<Long, Queue> queues,
Map<SimpleString, List<Pair<byte[], Long>>> duplicateIDMap) throws Exception
@@ -1038,7 +1039,7 @@
{
return -1;
}
-
+
public void deleteHeuristicCompletion(long txID) throws Exception
{
}
Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/BindingsImplTest.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -30,6 +30,7 @@
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
+import org.hornetq.core.server.impl.RoutingContextImpl;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.core.transaction.TransactionOperation;
import org.hornetq.tests.util.UnitTestCase;
@@ -107,11 +108,11 @@
{
if (route)
{
- bind.route(new FakeMessage(), new FakeTransaction());
+ bind.route(new FakeMessage(), new RoutingContextImpl(new FakeTransaction()));
}
else
{
- bind.redistribute(new FakeMessage(), queue, new FakeTransaction());
+ bind.redistribute(new FakeMessage(), queue, new RoutingContextImpl(new FakeTransaction()));
}
}
}
Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/DuplicateDetectionUnitTest.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -33,6 +33,7 @@
import org.hornetq.core.server.Queue;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.transaction.impl.ResourceManagerImpl;
+import org.hornetq.tests.unit.core.server.impl.fakes.FakePostOffice;
import org.hornetq.tests.util.RandomUtil;
import org.hornetq.tests.util.ServiceTestBase;
import org.hornetq.utils.Pair;
@@ -47,7 +48,6 @@
*/
public class DuplicateDetectionUnitTest extends ServiceTestBase
{
-
// Constants -----------------------------------------------------
// Attributes ----------------------------------------------------
@@ -77,6 +77,8 @@
Configuration configuration = createDefaultConfig();
+ PostOffice postOffice = new FakePostOffice();
+
configuration.start();
configuration.setJournalType(JournalType.ASYNCIO);
@@ -90,7 +92,8 @@
HashMap<SimpleString, List<Pair<byte[], Long>>> mapDups = new HashMap<SimpleString, List<Pair<byte[], Long>>>();
- journal.loadMessageJournal(new FakePagingManager(),
+ journal.loadMessageJournal(postOffice,
+ new FakePagingManager(),
new ResourceManagerImpl(0, 0, scheduledThreadPool),
new HashMap<Long, Queue>(),
mapDups);
@@ -110,7 +113,8 @@
journal.start();
journal.loadBindingJournal(new ArrayList<QueueBindingInfo>());
- journal.loadMessageJournal(new FakePagingManager(),
+ journal.loadMessageJournal(postOffice,
+ new FakePagingManager(),
new ResourceManagerImpl(0, 0, scheduledThreadPool),
new HashMap<Long, Queue>(),
mapDups);
@@ -137,7 +141,8 @@
journal.start();
journal.loadBindingJournal(new ArrayList<QueueBindingInfo>());
- journal.loadMessageJournal(new FakePagingManager(),
+ journal.loadMessageJournal(postOffice,
+ new FakePagingManager(),
new ResourceManagerImpl(0, 0, scheduledThreadPool),
new HashMap<Long, Queue>(),
mapDups);
@@ -196,7 +201,7 @@
{
return 0;
}
-
+
public SimpleString[] getStoreNames()
{
return null;
Modified: trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/tests/src/org/hornetq/tests/unit/core/postoffice/impl/FakeQueue.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -23,32 +23,27 @@
import org.hornetq.core.server.Distributor;
import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.utils.SimpleString;
public class FakeQueue implements Queue
{
-
private SimpleString name;
public FakeQueue(SimpleString name)
{
this.name = name;
}
-
- public void setExpiryAddress(SimpleString expiryAddress)
- {
- // TODO Auto-generated method stub
-
- }
/* (non-Javadoc)
* @see org.hornetq.core.server.Queue#acknowledge(org.hornetq.core.server.MessageReference)
*/
public void acknowledge(MessageReference ref) throws Exception
{
-
+ // TODO Auto-generated method stub
+
}
/* (non-Javadoc)
@@ -56,32 +51,17 @@
*/
public void acknowledge(Transaction tx, MessageReference ref) throws Exception
{
-
+ // TODO Auto-generated method stub
+
}
/* (non-Javadoc)
- * @see org.hornetq.core.server.Queue#activate()
- */
- public boolean activate()
- {
-
- return false;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.Queue#activateNow(java.util.concurrent.Executor)
- */
- public void activateNow(Executor executor)
- {
-
- }
-
- /* (non-Javadoc)
* @see org.hornetq.core.server.Queue#addConsumer(org.hornetq.core.server.Consumer)
*/
public void addConsumer(Consumer consumer) throws Exception
{
-
+ // TODO Auto-generated method stub
+
}
/* (non-Javadoc)
@@ -89,7 +69,8 @@
*/
public void addFirst(MessageReference ref)
{
-
+ // TODO Auto-generated method stub
+
}
/* (non-Javadoc)
@@ -97,7 +78,8 @@
*/
public void addLast(MessageReference ref)
{
-
+ // TODO Auto-generated method stub
+
}
/* (non-Javadoc)
@@ -109,21 +91,22 @@
}
-
/* (non-Javadoc)
- * @see org.hornetq.core.server.Queue#cancel(org.hornetq.core.transaction.Transaction, org.hornetq.core.server.MessageReference)
+ * @see org.hornetq.core.server.Queue#cancel(org.hornetq.core.server.MessageReference)
*/
- public void cancel(Transaction tx, MessageReference ref) throws Exception
+ public void cancel(MessageReference reference) throws Exception
{
-
+ // TODO Auto-generated method stub
+
}
/* (non-Javadoc)
- * @see org.hornetq.core.server.Queue#cancel(org.hornetq.core.server.MessageReference)
+ * @see org.hornetq.core.server.Queue#cancel(org.hornetq.core.transaction.Transaction, org.hornetq.core.server.MessageReference)
*/
- public void cancel(MessageReference reference) throws Exception
+ public void cancel(Transaction tx, MessageReference ref) throws Exception
{
-
+ // TODO Auto-generated method stub
+
}
/* (non-Javadoc)
@@ -131,7 +114,8 @@
*/
public void cancelRedistributor() throws Exception
{
-
+ // TODO Auto-generated method stub
+
}
/* (non-Javadoc)
@@ -139,7 +123,7 @@
*/
public boolean changeReferencePriority(long messageID, byte newPriority) throws Exception
{
-
+ // TODO Auto-generated method stub
return false;
}
@@ -148,25 +132,16 @@
*/
public boolean checkDLQ(MessageReference ref) throws Exception
{
-
+ // TODO Auto-generated method stub
return false;
}
/* (non-Javadoc)
- * @see org.hornetq.core.server.Queue#consumerFailedOver()
- */
- public boolean consumerFailedOver()
- {
-
- return false;
- }
-
- /* (non-Javadoc)
* @see org.hornetq.core.server.Queue#deleteAllReferences()
*/
public int deleteAllReferences() throws Exception
{
-
+ // TODO Auto-generated method stub
return 0;
}
@@ -175,7 +150,7 @@
*/
public int deleteMatchingReferences(Filter filter) throws Exception
{
-
+ // TODO Auto-generated method stub
return 0;
}
@@ -184,7 +159,7 @@
*/
public boolean deleteReference(long messageID) throws Exception
{
-
+ // TODO Auto-generated method stub
return false;
}
@@ -193,7 +168,8 @@
*/
public void deliverAsync(Executor executor)
{
-
+ // TODO Auto-generated method stub
+
}
/* (non-Javadoc)
@@ -201,7 +177,8 @@
*/
public void deliverNow()
{
-
+ // TODO Auto-generated method stub
+
}
/* (non-Javadoc)
@@ -209,7 +186,8 @@
*/
public void expire(MessageReference ref) throws Exception
{
-
+ // TODO Auto-generated method stub
+
}
/* (non-Javadoc)
@@ -217,25 +195,26 @@
*/
public boolean expireReference(long messageID) throws Exception
{
-
+ // TODO Auto-generated method stub
return false;
}
/* (non-Javadoc)
- * @see org.hornetq.core.server.Queue#expireReferences(org.hornetq.core.filter.Filter)
+ * @see org.hornetq.core.server.Queue#expireReferences()
*/
- public int expireReferences(Filter filter) throws Exception
+ public void expireReferences() throws Exception
{
-
- return 0;
+ // TODO Auto-generated method stub
+
}
/* (non-Javadoc)
- * @see org.hornetq.core.server.Queue#expireReferences()
+ * @see org.hornetq.core.server.Queue#expireReferences(org.hornetq.core.filter.Filter)
*/
- public void expireReferences() throws Exception
+ public int expireReferences(Filter filter) throws Exception
{
-
+ // TODO Auto-generated method stub
+ return 0;
}
/* (non-Javadoc)
@@ -243,7 +222,7 @@
*/
public int getConsumerCount()
{
-
+ // TODO Auto-generated method stub
return 0;
}
@@ -252,7 +231,7 @@
*/
public Set<Consumer> getConsumers()
{
-
+ // TODO Auto-generated method stub
return null;
}
@@ -261,7 +240,7 @@
*/
public int getDeliveringCount()
{
-
+ // TODO Auto-generated method stub
return 0;
}
@@ -270,7 +249,7 @@
*/
public Distributor getDistributionPolicy()
{
-
+ // TODO Auto-generated method stub
return null;
}
@@ -279,7 +258,7 @@
*/
public Filter getFilter()
{
-
+ // TODO Auto-generated method stub
return null;
}
@@ -288,7 +267,7 @@
*/
public int getMessageCount()
{
-
+ // TODO Auto-generated method stub
return 0;
}
@@ -297,7 +276,7 @@
*/
public int getMessagesAdded()
{
-
+ // TODO Auto-generated method stub
return 0;
}
@@ -314,7 +293,7 @@
*/
public long getID()
{
-
+ // TODO Auto-generated method stub
return 0;
}
@@ -323,7 +302,7 @@
*/
public MessageReference getReference(long id)
{
-
+ // TODO Auto-generated method stub
return null;
}
@@ -332,7 +311,7 @@
*/
public int getScheduledCount()
{
-
+ // TODO Auto-generated method stub
return 0;
}
@@ -341,25 +320,25 @@
*/
public List<MessageReference> getScheduledMessages()
{
-
+ // TODO Auto-generated method stub
return null;
}
/* (non-Javadoc)
- * @see org.hornetq.core.server.Queue#isBackup()
+ * @see org.hornetq.core.server.Queue#isDurable()
*/
- public boolean isBackup()
+ public boolean isDurable()
{
-
+ // TODO Auto-generated method stub
return false;
}
/* (non-Javadoc)
- * @see org.hornetq.core.server.Queue#isDurable()
+ * @see org.hornetq.core.server.Queue#isPaused()
*/
- public boolean isDurable()
+ public boolean isPaused()
{
-
+ // TODO Auto-generated method stub
return false;
}
@@ -368,22 +347,35 @@
*/
public boolean isTemporary()
{
-
+ // TODO Auto-generated method stub
return false;
}
/* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#iterator()
+ */
+ public Iterator<MessageReference> iterator()
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ /* (non-Javadoc)
* @see org.hornetq.core.server.Queue#list(org.hornetq.core.filter.Filter)
*/
public List<MessageReference> list(Filter filter)
{
-
+ // TODO Auto-generated method stub
return null;
}
-
- public Iterator<MessageReference> iterator()
+
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#lockDelivery()
+ */
+ public void lockDelivery()
{
- return null;
+ // TODO Auto-generated method stub
+
}
/* (non-Javadoc)
@@ -391,7 +383,7 @@
*/
public boolean moveReference(long messageID, SimpleString toAddress) throws Exception
{
-
+ // TODO Auto-generated method stub
return false;
}
@@ -400,16 +392,26 @@
*/
public int moveReferences(Filter filter, SimpleString toAddress) throws Exception
{
-
+ // TODO Auto-generated method stub
return 0;
}
/* (non-Javadoc)
+ * @see org.hornetq.core.server.Queue#pause()
+ */
+ public void pause()
+ {
+ // TODO Auto-generated method stub
+
+ }
+
+ /* (non-Javadoc)
* @see org.hornetq.core.server.Queue#reacknowledge(org.hornetq.core.transaction.Transaction, org.hornetq.core.server.MessageReference)
*/
public void reacknowledge(Transaction tx, MessageReference ref) throws Exception
{
-
+ // TODO Auto-generated method stub
+
}
/* (non-Javadoc)
@@ -417,7 +419,8 @@
*/
public void referenceHandled()
{
-
+ // TODO Auto-generated method stub
+
}
/* (non-Javadoc)
@@ -425,7 +428,7 @@
*/
public boolean removeConsumer(Consumer consumer) throws Exception
{
-
+ // TODO Auto-generated method stub
return false;
}
@@ -434,7 +437,7 @@
*/
public MessageReference removeFirstReference(long id) throws Exception
{
-
+ // TODO Auto-generated method stub
return null;
}
@@ -443,17 +446,17 @@
*/
public MessageReference removeReferenceWithID(long id) throws Exception
{
-
+ // TODO Auto-generated method stub
return null;
}
/* (non-Javadoc)
- * @see org.hornetq.core.server.Queue#reroute(org.hornetq.core.server.ServerMessage, org.hornetq.core.transaction.Transaction)
+ * @see org.hornetq.core.server.Queue#resume()
*/
- public MessageReference reroute(ServerMessage message, Transaction tx) throws Exception
+ public void resume()
{
-
- return null;
+ // TODO Auto-generated method stub
+
}
/* (non-Javadoc)
@@ -461,82 +464,48 @@
*/
public boolean sendMessageToDeadLetterAddress(long messageID) throws Exception
{
-
+ // TODO Auto-generated method stub
return false;
}
/* (non-Javadoc)
- * @see org.hornetq.core.server.Queue#setBackup()
- */
- public void setBackup()
- {
-
- }
-
- /* (non-Javadoc)
* @see org.hornetq.core.server.Queue#setDistributionPolicy(org.hornetq.core.server.Distributor)
*/
public void setDistributionPolicy(Distributor policy)
{
-
+ // TODO Auto-generated method stub
+
}
-
/* (non-Javadoc)
- * @see org.hornetq.core.server.Bindable#preroute(org.hornetq.core.server.ServerMessage, org.hornetq.core.transaction.Transaction)
+ * @see org.hornetq.core.server.Queue#setExpiryAddress(org.hornetq.utils.SimpleString)
*/
- public void preroute(ServerMessage message, Transaction tx) throws Exception
+ public void setExpiryAddress(SimpleString expiryAddress)
{
-
+ // TODO Auto-generated method stub
+
}
+ // TODO Auto-generated method stub
+
/* (non-Javadoc)
- * @see org.hornetq.core.server.Bindable#route(org.hornetq.core.server.ServerMessage, org.hornetq.core.transaction.Transaction)
+ * @see org.hornetq.core.server.Queue#unlockDelivery()
*/
- public void route(ServerMessage message, Transaction tx) throws Exception
- {
-
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.Queue#lock()
- */
- public void lockDelivery()
- {
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.Queue#unlock()
- */
public void unlockDelivery()
{
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.Queue#isPaused()
- */
- public boolean isPaused()
- {
// TODO Auto-generated method stub
- return false;
- }
-
- /* (non-Javadoc)
- * @see org.hornetq.core.server.Queue#pause()
- */
- public void pause()
- {
- // TODO Auto-generated method stub
}
/* (non-Javadoc)
- * @see org.hornetq.core.server.Queue#resume()
+ * @see org.hornetq.core.server.Bindable#route(org.hornetq.core.server.ServerMessage, org.hornetq.core.server.RoutingContext)
*/
- public void resume()
+ public void route(ServerMessage message, RoutingContext context) throws Exception
{
// TODO Auto-generated method stub
}
+
+
}
\ No newline at end of file
Modified: trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java
===================================================================
--- trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java 2009-10-07 11:22:11 UTC (rev 8063)
+++ trunk/tests/src/org/hornetq/tests/unit/core/server/impl/fakes/FakePostOffice.java 2009-10-07 21:01:20 UTC (rev 8064)
@@ -14,14 +14,14 @@
package org.hornetq.tests.unit.core.server.impl.fakes;
-import java.util.List;
-
import org.hornetq.core.paging.PagingManager;
import org.hornetq.core.postoffice.Binding;
import org.hornetq.core.postoffice.Bindings;
import org.hornetq.core.postoffice.DuplicateIDCache;
import org.hornetq.core.postoffice.PostOffice;
+import org.hornetq.core.server.MessageReference;
import org.hornetq.core.server.Queue;
+import org.hornetq.core.server.RoutingContext;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.core.transaction.Transaction;
import org.hornetq.utils.SimpleString;
@@ -29,125 +29,141 @@
public class FakePostOffice implements PostOffice
{
- public Object getNotificationLock()
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQComponent#isStarted()
+ */
+ public boolean isStarted()
{
- return null;
+ // TODO Auto-generated method stub
+ return false;
}
- public Bindings getMatchingBindings(SimpleString address)
+ /* (non-Javadoc)
+ * @see org.hornetq.core.server.HornetQComponent#start()
+ */
+ public void start() throws Exception
{
- return null;
+ // TODO Auto-generated method stub
+
}
/* (non-Javadoc)
- * @see org.hornetq.core.postoffice.PostOffice#activate()
+ * @see org.hornetq.core.server.HornetQComponent#stop()
*/
- public List<Queue> activate()
+ public void stop() throws Exception
{
- return null;
+ // TODO Auto-generated method stub
+
}
/* (non-Javadoc)
* @see org.hornetq.core.postoffice.PostOffice#addBinding(org.hornetq.core.postoffice.Binding)
*/
- public void addBinding(final Binding binding) throws Exception
+ public void addBinding(Binding binding) throws Exception
{
+ // TODO Auto-generated method stub
+
}
/* (non-Javadoc)
* @see org.hornetq.core.postoffice.PostOffice#getBinding(org.hornetq.utils.SimpleString)
*/
- public Binding getBinding(final SimpleString uniqueName)
+ public Binding getBinding(SimpleString uniqueName)
{
+ // TODO Auto-generated method stub
return null;
}
/* (non-Javadoc)
* @see org.hornetq.core.postoffice.PostOffice#getBindingsForAddress(org.hornetq.utils.SimpleString)
*/
- public Bindings getBindingsForAddress(final SimpleString address) throws Exception
+ public Bindings getBindingsForAddress(SimpleString address) throws Exception
{
+ // TODO Auto-generated method stub
return null;
}
/* (non-Javadoc)
* @see org.hornetq.core.postoffice.PostOffice#getDuplicateIDCache(org.hornetq.utils.SimpleString)
*/
- public DuplicateIDCache getDuplicateIDCache(final SimpleString address)
+ public DuplicateIDCache getDuplicateIDCache(SimpleString address)
{
+ // TODO Auto-generated method stub
return null;
}
/* (non-Javadoc)
- * @see org.hornetq.core.postoffice.PostOffice#getPagingManager()
+ * @see org.hornetq.core.postoffice.PostOffice#getMatchingBindings(org.hornetq.utils.SimpleString)
*/
- public PagingManager getPagingManager()
+ public Bindings getMatchingBindings(SimpleString address)
{
+ // TODO Auto-generated method stub
return null;
}
/* (non-Javadoc)
- * @see org.hornetq.core.postoffice.PostOffice#redistribute(org.hornetq.core.server.ServerMessage, org.hornetq.utils.SimpleString, org.hornetq.core.transaction.Transaction)
+ * @see org.hornetq.core.postoffice.PostOffice#getNotificationLock()
*/
- public boolean redistribute(final ServerMessage message, final Queue queue, final Transaction tx) throws Exception
+ public Object getNotificationLock()
{
- return false;
+ // TODO Auto-generated method stub
+ return null;
}
/* (non-Javadoc)
- * @see org.hornetq.core.postoffice.PostOffice#removeBinding(org.hornetq.utils.SimpleString)
+ * @see org.hornetq.core.postoffice.PostOffice#getPagingManager()
*/
- public Binding removeBinding(final SimpleString uniqueName) throws Exception
+ public PagingManager getPagingManager()
{
+ // TODO Auto-generated method stub
return null;
}
/* (non-Javadoc)
- * @see org.hornetq.core.postoffice.PostOffice#route(org.hornetq.core.server.ServerMessage)
+ * @see org.hornetq.core.postoffice.PostOffice#redistribute(org.hornetq.core.server.ServerMessage, org.hornetq.core.server.Queue, org.hornetq.core.server.RoutingContext)
*/
- public void route(final ServerMessage message) throws Exception
+ public boolean redistribute(ServerMessage message, Queue originatingQueue, RoutingContext context) throws Exception
{
-
+ // TODO Auto-generated method stub
+ return false;
}
/* (non-Javadoc)
- * @see org.hornetq.core.postoffice.PostOffice#route(org.hornetq.core.server.ServerMessage, org.hornetq.core.transaction.Transaction)
+ * @see org.hornetq.core.postoffice.PostOffice#removeBinding(org.hornetq.utils.SimpleString)
*/
- public void route(final ServerMessage message, final Transaction tx) throws Exception
+ public Binding removeBinding(SimpleString uniqueName) throws Exception
{
-
+ // TODO Auto-generated method stub
+ return null;
}
/* (non-Javadoc)
- * @see org.hornetq.core.postoffice.PostOffice#sendQueueInfoToQueue(org.hornetq.utils.SimpleString, org.hornetq.utils.SimpleString)
+ * @see org.hornetq.core.postoffice.PostOffice#reroute(org.hornetq.core.server.ServerMessage, org.hornetq.core.server.Queue, org.hornetq.core.transaction.Transaction)
*/
- public void sendQueueInfoToQueue(final SimpleString queueName, final SimpleString address) throws Exception
+ public MessageReference reroute(ServerMessage message, Queue queue, Transaction tx) throws Exception
{
-
+ // TODO Auto-generated method stub
+ return null;
}
/* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQComponent#isStarted()
+ * @see org.hornetq.core.postoffice.PostOffice#route(org.hornetq.core.server.ServerMessage, org.hornetq.core.server.RoutingContext)
*/
- public boolean isStarted()
+ public void route(ServerMessage message, RoutingContext context) throws Exception
{
- return false;
+ // TODO Auto-generated method stub
+
}
/* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQComponent#start()
+ * @see org.hornetq.core.postoffice.PostOffice#sendQueueInfoToQueue(org.hornetq.utils.SimpleString, org.hornetq.utils.SimpleString)
*/
- public void start() throws Exception
+ public void sendQueueInfoToQueue(SimpleString queueName, SimpleString address) throws Exception
{
-
+ // TODO Auto-generated method stub
+
}
- /* (non-Javadoc)
- * @see org.hornetq.core.server.HornetQComponent#stop()
- */
- public void stop() throws Exception
- {
- }
}
\ No newline at end of file
More information about the hornetq-commits
mailing list