[jboss-cvs] JBoss Messaging SVN: r6313 - in trunk: src/main/org/jboss/messaging/core/postoffice/impl and 6 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Apr 6 08:00:43 EDT 2009
Author: timfox
Date: 2009-04-06 08:00:43 -0400 (Mon, 06 Apr 2009)
New Revision: 6313
Modified:
trunk/examples/jms/large-message/src/org/jboss/jms/example/LargeMessageExample.java
trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
trunk/src/main/org/jboss/messaging/core/server/Queue.java
trunk/src/main/org/jboss/messaging/core/server/cluster/impl/Redistributor.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java
Log:
message redistribution with backup
Modified: trunk/examples/jms/large-message/src/org/jboss/jms/example/LargeMessageExample.java
===================================================================
--- trunk/examples/jms/large-message/src/org/jboss/jms/example/LargeMessageExample.java 2009-04-04 00:30:06 UTC (rev 6312)
+++ trunk/examples/jms/large-message/src/org/jboss/jms/example/LargeMessageExample.java 2009-04-06 12:00:43 UTC (rev 6313)
@@ -68,8 +68,11 @@
//Step 7. Create a BytesMessage with 1MB arbitrary bytes
BytesMessage message = session.createBytesMessage();
- message.writeBytes(new byte[1024 * 1024]);
+ byte[] bytes = new byte[100 * 1024 * 1024];
+ message.writeBytes(bytes);
+ System.out.println("Sending message of " + bytes.length + " bytes");
+
//Step 8. Send the Message
producer.send(message);
@@ -84,7 +87,7 @@
connection.start();
//Step 11. Receive the message
- BytesMessage messageReceived = (BytesMessage) messageConsumer.receive(5000);
+ BytesMessage messageReceived = (BytesMessage) messageConsumer.receive(60000);
System.out.println("Received message: " + messageReceived.getBodyLength() + " bytes");
Modified: trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-04-04 00:30:06 UTC (rev 6312)
+++ trunk/src/main/org/jboss/messaging/core/postoffice/impl/PostOfficeImpl.java 2009-04-06 12:00:43 UTC (rev 6313)
@@ -377,7 +377,8 @@
if (redistributionDelay != -1)
{
- queue.addRedistributor(redistributionDelay, redistributorExecutorFactory.getExecutor());
+ queue.addRedistributor(redistributionDelay, redistributorExecutorFactory.getExecutor(),
+ server.getReplicatingChannel());
}
}
}
@@ -447,7 +448,8 @@
if (redistributionDelay != -1)
{
- queue.addRedistributor(redistributionDelay, redistributorExecutorFactory.getExecutor());
+ queue.addRedistributor(redistributionDelay, redistributorExecutorFactory.getExecutor(),
+ server.getReplicatingChannel());
}
}
}
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2009-04-04 00:30:06 UTC (rev 6312)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2009-04-06 12:00:43 UTC (rev 6313)
@@ -29,6 +29,7 @@
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_ADD_REMOTE_CONSUMER;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_ADD_REMOTE_QUEUE_BINDING;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_CREATESESSION;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_REDISTRIBUTION;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_REMOVE_REMOTE_CONSUMER;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_REMOVE_REMOTE_QUEUE_BINDING;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.REPLICATE_STARTUP_INFO;
@@ -137,6 +138,7 @@
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXASetTimeoutResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.SessionXAStartMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateAcknowledgeMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRedistributionMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRemoteBindingAddedMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRemoteBindingRemovedMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRemoteConsumerAddedMessage;
@@ -881,6 +883,11 @@
packet = new ReplicateAcknowledgeMessage();
break;
}
+ case REPLICATE_REDISTRIBUTION:
+ {
+ packet = new ReplicateRedistributionMessage();
+ break;
+ }
default:
{
throw new IllegalArgumentException("Invalid type: " + packetType);
Modified: trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2009-04-04 00:30:06 UTC (rev 6312)
+++ trunk/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2009-04-06 12:00:43 UTC (rev 6313)
@@ -160,6 +160,8 @@
public static final byte REPLICATE_STARTUP_INFO = 96;
+ public static final byte REPLICATE_REDISTRIBUTION = 97;
+
// Static --------------------------------------------------------
public PacketImpl(final byte type)
Modified: trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java 2009-04-04 00:30:06 UTC (rev 6312)
+++ trunk/src/main/org/jboss/messaging/core/server/MessagingServer.java 2009-04-06 12:00:43 UTC (rev 6313)
@@ -24,6 +24,7 @@
import org.jboss.messaging.core.remoting.RemotingConnection;
import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRedistributionMessage;
import org.jboss.messaging.core.remoting.server.RemotingService;
import org.jboss.messaging.core.security.JBMSecurityManager;
import org.jboss.messaging.core.security.Role;
@@ -132,4 +133,6 @@
boolean isInitialised();
Queue createQueue(SimpleString address, SimpleString queueName, SimpleString filter, boolean durable, boolean temporary) throws Exception;
+
+ void handleReplicateRedistribution(final SimpleString queueName, final long messageID) throws Exception;
}
Modified: trunk/src/main/org/jboss/messaging/core/server/Queue.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/Queue.java 2009-04-04 00:30:06 UTC (rev 6312)
+++ trunk/src/main/org/jboss/messaging/core/server/Queue.java 2009-04-06 12:00:43 UTC (rev 6313)
@@ -22,14 +22,15 @@
package org.jboss.messaging.core.server;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.Executor;
+
import org.jboss.messaging.core.filter.Filter;
+import org.jboss.messaging.core.remoting.Channel;
import org.jboss.messaging.core.transaction.Transaction;
import org.jboss.messaging.utils.SimpleString;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.Executor;
-
/**
*
* A Queue
@@ -138,7 +139,7 @@
boolean consumerFailedOver();
- void addRedistributor(long delay, Executor executor);
+ void addRedistributor(long delay, Executor executor, final Channel replicatingChannel);
void cancelRedistributor() throws Exception;
Modified: trunk/src/main/org/jboss/messaging/core/server/cluster/impl/Redistributor.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/cluster/impl/Redistributor.java 2009-04-04 00:30:06 UTC (rev 6312)
+++ trunk/src/main/org/jboss/messaging/core/server/cluster/impl/Redistributor.java 2009-04-06 12:00:43 UTC (rev 6313)
@@ -28,6 +28,9 @@
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.remoting.Channel;
+import org.jboss.messaging.core.remoting.Packet;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRedistributionMessage;
import org.jboss.messaging.core.server.Consumer;
import org.jboss.messaging.core.server.HandleStatus;
import org.jboss.messaging.core.server.MessageReference;
@@ -63,11 +66,14 @@
private int count;
+ private final Channel replicatingChannel;
+
public Redistributor(final Queue queue,
final StorageManager storageManager,
final PostOffice postOffice,
final Executor executor,
- final int batchSize)
+ final int batchSize,
+ final Channel replicatingChannel)
{
this.queue = queue;
@@ -78,6 +84,8 @@
this.executor = executor;
this.batchSize = batchSize;
+
+ this.replicatingChannel = replicatingChannel;
}
public Filter getFilter()
@@ -121,38 +129,43 @@
active = false;
}
-
+
public synchronized HandleStatus handle(final MessageReference reference) throws Exception
{
if (!active)
{
return HandleStatus.BUSY;
}
-
- Transaction tx = new TransactionImpl(storageManager);
+
+ final Transaction tx = new TransactionImpl(storageManager);
boolean routed = postOffice.redistribute(reference.getMessage(), queue.getName(), tx);
if (routed)
- {
- queue.referenceHandled();
-
- queue.acknowledge(tx, reference);
-
- tx.commit();
-
- count++;
-
- if (count == batchSize)
+ {
+ if (replicatingChannel == null)
{
- // We continue the next batch on a different thread, so as not to keep the delivery thread busy for a very
- // long time, in the case there are many messages in the queue
- active = false;
-
- executor.execute(new Prompter());
-
- count = 0;
+ doRedistribute(reference, tx);
}
+ else
+ {
+ Packet packet = new ReplicateRedistributionMessage(queue.getName(), reference.getMessage().getMessageID());
+
+ replicatingChannel.replicatePacket(packet, 1, new Runnable()
+ {
+ public void run()
+ {
+ try
+ {
+ doRedistribute(reference, tx);
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to handle redistribution", e);
+ }
+ }
+ });
+ }
return HandleStatus.HANDLED;
}
@@ -161,7 +174,29 @@
return HandleStatus.BUSY;
}
}
+
+ private void doRedistribute(final MessageReference reference, final Transaction tx) throws Exception
+ {
+ queue.referenceHandled();
+ queue.acknowledge(tx, reference);
+
+ tx.commit();
+
+ count++;
+
+ if (count == batchSize)
+ {
+ // We continue the next batch on a different thread, so as not to keep the delivery thread busy for a very
+ // long time in the case there are many messages in the queue
+ active = false;
+
+ executor.execute(new Prompter());
+
+ count = 0;
+ }
+ }
+
private class Prompter implements Runnable
{
public void run()
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-04-04 00:30:06 UTC (rev 6312)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-04-06 12:00:43 UTC (rev 6313)
@@ -65,6 +65,7 @@
import org.jboss.messaging.core.security.SecurityStore;
import org.jboss.messaging.core.security.impl.SecurityStoreImpl;
import org.jboss.messaging.core.server.Divert;
+import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.MessagingServer;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.QueueFactory;
@@ -76,7 +77,9 @@
import org.jboss.messaging.core.settings.impl.AddressSettings;
import org.jboss.messaging.core.settings.impl.HierarchicalObjectRepository;
import org.jboss.messaging.core.transaction.ResourceManager;
+import org.jboss.messaging.core.transaction.Transaction;
import org.jboss.messaging.core.transaction.impl.ResourceManagerImpl;
+import org.jboss.messaging.core.transaction.impl.TransactionImpl;
import org.jboss.messaging.core.version.Version;
import org.jboss.messaging.utils.Future;
import org.jboss.messaging.utils.Pair;
@@ -993,7 +996,37 @@
return queue;
}
+
+ public void handleReplicateRedistribution(final SimpleString queueName, final long messageID)
+ throws Exception
+ {
+ Binding binding = postOffice.getBinding(queueName);
+ if (binding == null)
+ {
+ throw new IllegalStateException("Cannot find queue " + queueName);
+ }
+
+ Queue queue = (Queue)binding.getBindable();
+
+ MessageReference reference = queue.removeFirstReference(messageID);
+
+ Transaction tx = new TransactionImpl(storageManager);
+
+ boolean routed = postOffice.redistribute(reference.getMessage(), queue.getName(), tx);
+
+ if (routed)
+ {
+ queue.acknowledge(tx, reference);
+
+ tx.commit();
+ }
+ else
+ {
+ throw new IllegalStateException("Must be routed");
+ }
+ }
+
// Public
// ---------------------------------------------------------------------------------------
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2009-04-04 00:30:06 UTC (rev 6312)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2009-04-06 12:00:43 UTC (rev 6313)
@@ -35,6 +35,7 @@
import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.ReplicateCreateSessionMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateAcknowledgeMessage;
+import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRedistributionMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRemoteBindingAddedMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRemoteBindingRemovedMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateRemoteConsumerAddedMessage;
@@ -45,6 +46,8 @@
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.cluster.ClusterConnection;
import org.jboss.messaging.core.server.cluster.RemoteQueueBinding;
+import org.jboss.messaging.core.transaction.Transaction;
+import org.jboss.messaging.core.transaction.impl.TransactionImpl;
/**
* A packet handler for all packets that need to be handled at the server level
@@ -176,6 +179,14 @@
break;
}
+ case PacketImpl.REPLICATE_REDISTRIBUTION:
+ {
+ ReplicateRedistributionMessage message = (ReplicateRedistributionMessage)packet;
+
+ handleReplicateRedistribution(message);
+
+ break;
+ }
default:
{
log.error("Invalid packet " + packet);
@@ -443,4 +454,23 @@
log.error("Failed to handle remove remote consumer", e);
}
}
+
+ private void handleReplicateRedistribution(final ReplicateRedistributionMessage request)
+ {
+ Binding binding = server.getPostOffice().getBinding(request.getQueueName());
+
+ if (binding == null)
+ {
+ throw new IllegalStateException("Cannot find binding " + request.getQueueName());
+ }
+
+ try
+ {
+ server.handleReplicateRedistribution(request.getQueueName(), request.getMessageID());
+ }
+ catch (Exception e)
+ {
+ log.error("Failed to handle remove remote consumer", e);
+ }
+ }
}
\ No newline at end of file
Modified: trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-04-04 00:30:06 UTC (rev 6312)
+++ trunk/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-04-06 12:00:43 UTC (rev 6313)
@@ -12,19 +12,36 @@
package org.jboss.messaging.core.server.impl;
+import static org.jboss.messaging.core.message.impl.MessageImpl.HDR_ACTUAL_EXPIRY_TIME;
+import static org.jboss.messaging.core.message.impl.MessageImpl.HDR_ORIGINAL_DESTINATION;
+import static org.jboss.messaging.core.message.impl.MessageImpl.HDR_ORIG_MESSAGE_ID;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+
import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.list.PriorityLinkedList;
import org.jboss.messaging.core.list.impl.PriorityLinkedListImpl;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.message.impl.MessageImpl;
-import static org.jboss.messaging.core.message.impl.MessageImpl.HDR_ACTUAL_EXPIRY_TIME;
-import static org.jboss.messaging.core.message.impl.MessageImpl.HDR_ORIGINAL_DESTINATION;
-import static org.jboss.messaging.core.message.impl.MessageImpl.HDR_ORIG_MESSAGE_ID;
import org.jboss.messaging.core.paging.PagingManager;
import org.jboss.messaging.core.paging.PagingStore;
import org.jboss.messaging.core.persistence.StorageManager;
import org.jboss.messaging.core.postoffice.Bindings;
import org.jboss.messaging.core.postoffice.PostOffice;
+import org.jboss.messaging.core.remoting.Channel;
import org.jboss.messaging.core.server.Consumer;
import org.jboss.messaging.core.server.Distributor;
import org.jboss.messaging.core.server.HandleStatus;
@@ -43,21 +60,6 @@
import org.jboss.messaging.utils.ConcurrentSet;
import org.jboss.messaging.utils.SimpleString;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.concurrent.Executor;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.ScheduledFuture;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
/**
* Implementation of a Queue TODO use Java 5 concurrent queue
*
@@ -115,7 +117,7 @@
private final StorageManager storageManager;
private final HierarchicalRepository<AddressSettings> addressSettingsRepository;
-
+
private final ScheduledExecutorService scheduledExecutor;
private volatile boolean backup;
@@ -123,17 +125,17 @@
private int consumersToFailover = -1;
private SimpleString address;
-
+
private Redistributor redistributor;
private final Set<ScheduledFuture<?>> futures = new ConcurrentHashSet<ScheduledFuture<?>>();
-
+
private ScheduledFuture<?> future;
-
- //We cache the consumers here since we don't want to include the redistributor
-
+
+ // We cache the consumers here since we don't want to include the redistributor
+
private final Set<Consumer> consumers = new HashSet<Consumer>();
-
+
public QueueImpl(final long persistenceID,
final SimpleString address,
final SimpleString name,
@@ -162,7 +164,7 @@
this.storageManager = storageManager;
this.addressSettingsRepository = addressSettingsRepository;
-
+
this.scheduledExecutor = scheduledExecutor;
if (postOffice == null)
@@ -251,7 +253,7 @@
{
storageManager.updateScheduledDeliveryTime(ref);
}
-
+
addLast(ref);
}
else
@@ -357,7 +359,7 @@
}
public void addLast(final MessageReference ref)
- {
+ {
add(ref, false);
}
@@ -387,7 +389,7 @@
cancelRedistributor();
distributionPolicy.addConsumer(consumer);
-
+
consumers.add(consumer);
}
@@ -399,58 +401,58 @@
{
promptDelivery = false;
}
-
+
consumers.remove(consumer);
return removed;
}
- public synchronized void addRedistributor(final long delay, final Executor executor)
+ public synchronized void addRedistributor(final long delay, final Executor executor, final Channel replicatingChannel)
{
if (future != null)
{
future.cancel(false);
-
+
futures.remove(future);
}
-
+
if (redistributor != null)
{
- //Just prompt delivery
+ // Just prompt delivery
deliverAsync(executor);
}
-
+
if (delay > 0)
{
- DelayedAddRedistributor dar = new DelayedAddRedistributor(executor);
-
+ DelayedAddRedistributor dar = new DelayedAddRedistributor(executor, replicatingChannel);
+
future = scheduledExecutor.schedule(dar, delay, TimeUnit.MILLISECONDS);
-
+
futures.add(future);
}
else
{
- internalAddRedistributor(executor);
+ internalAddRedistributor(executor, replicatingChannel);
}
}
-
+
public synchronized void cancelRedistributor() throws Exception
- {
+ {
if (redistributor != null)
{
redistributor.stop();
redistributor = null;
}
-
+
if (future != null)
{
future.cancel(false);
-
+
future = null;
}
}
-
+
public synchronized int getConsumerCount()
{
return consumers.size();
@@ -513,25 +515,25 @@
return removed;
}
-
+
public synchronized MessageReference removeFirstReference(final long id) throws Exception
- {
+ {
MessageReference ref = messageReferences.peekFirst();
-
+
if (ref != null && ref.getMessage().getMessageID() == id)
{
messageReferences.removeFirst();
-
+
return ref;
}
else
{
ref = scheduledDeliveryHandler.removeReferenceWithID(id);
}
-
+
return ref;
}
-
+
public synchronized MessageReference getReference(final long id)
{
Iterator<MessageReference> iterator = messageReferences.iterator();
@@ -553,15 +555,15 @@
{
int count = messageReferences.size() + getScheduledCount() + getDeliveringCount();
-// log.info(System.identityHashCode(this) + " message count is " +
-// count +
-// " ( mr:" +
-// messageReferences.size() +
-// " sc:" +
-// getScheduledCount() +
-// " dc:" +
-// getDeliveringCount() +
-// ")");
+ // log.info(System.identityHashCode(this) + " message count is " +
+ // count +
+ // " ( mr:" +
+ // messageReferences.size() +
+ // " sc:" +
+ // getScheduledCount() +
+ // " dc:" +
+ // getDeliveringCount() +
+ // ")");
return count;
}
@@ -628,16 +630,16 @@
synchronized (tx)
{
RefsOperation oper = (RefsOperation)tx.getProperty(TransactionPropertyIndexes.REFS_OPERATION);
-
+
if (oper == null)
{
oper = new RefsOperation();
-
+
tx.putProperty(TransactionPropertyIndexes.REFS_OPERATION, oper);
-
+
tx.addOperation(oper);
}
-
+
return oper;
}
}
@@ -926,7 +928,7 @@
public synchronized void setBackup()
{
backup = true;
-
+
direct = false;
}
@@ -937,7 +939,7 @@
if (consumersToFailover == 0)
{
backup = false;
-
+
return true;
}
else
@@ -1002,7 +1004,7 @@
{
return name.hashCode();
}
-
+
@Override
public String toString()
{
@@ -1012,11 +1014,16 @@
// Private
// ------------------------------------------------------------------------------
- private void internalAddRedistributor(final Executor executor)
- {
+ private void internalAddRedistributor(final Executor executor, final Channel replicatingChannel)
+ {
if (consumers.size() == 0 && messageReferences.size() > 0)
{
- redistributor = new Redistributor(this, storageManager, postOffice, executor, REDISTRIBUTOR_BATCH_SIZE);
+ redistributor = new Redistributor(this,
+ storageManager,
+ postOffice,
+ executor,
+ REDISTRIBUTOR_BATCH_SIZE,
+ replicatingChannel);
distributionPolicy.addConsumer(redistributor);
@@ -1026,7 +1033,6 @@
}
}
-
public boolean checkDLQ(final MessageReference reference) throws Exception
{
ServerMessage message = reference.getMessage();
@@ -1155,7 +1161,9 @@
else
{
- log.warn("Message has reached maximum delivery attempts, sending it to Dead Letter Address " + deadLetterAddress + " from " + name);
+ log.warn("Message has reached maximum delivery attempts, sending it to Dead Letter Address " + deadLetterAddress +
+ " from " +
+ name);
move(deadLetterAddress, ref, false);
}
}
@@ -1198,7 +1206,7 @@
{
return;
}
-
+
direct = false;
MessageReference reference;
@@ -1232,9 +1240,10 @@
// If the queue is empty, we need to check if there are pending messages, and throw a warning
if (pagingStore.isPaging() && !pagingStore.isDropWhenMaxSize())
{
- // This is just a *request* to depage. Depage will only happens if there is space on the Address and GlobalSize
+ // This is just a *request* to depage. Depage will only happens if there is space on the Address
+ // and GlobalSize
pagingStore.startDepaging();
-
+
log.warn("The Queue " + name +
" is empty, however there are pending messages on Paging for the address " +
pagingStore.getStoreName() +
@@ -1268,12 +1277,12 @@
if (status == HandleStatus.HANDLED)
{
if (iterator == null)
- {
- messageReferences.removeFirst();
+ {
+ messageReferences.removeFirst();
}
else
{
- iterator.remove();
+ iterator.remove();
}
}
else if (status == HandleStatus.BUSY)
@@ -1304,7 +1313,7 @@
}
boolean add = false;
-
+
if (direct && !backup)
{
// Deliver directly
@@ -1340,9 +1349,9 @@
{
expiringMessageReferences.addIfAbsent(ref);
}
-
+
if (first)
- {
+ {
messageReferences.addFirst(ref, ref.getMessage().getPriority());
}
else
@@ -1362,7 +1371,7 @@
}
}
}
-
+
private HandleStatus deliver(final MessageReference reference)
{
HandleStatus status = distributionPolicy.distribute(reference);
@@ -1442,7 +1451,7 @@
ServerMessage msg = ref.getMessage();
if (!scheduledDeliveryHandler.checkAndSchedule(ref, backup))
- {
+ {
messageReferences.addFirst(ref, msg.getPriority());
}
}
@@ -1498,7 +1507,6 @@
for (MessageReference ref : refsToAck)
{
- //if (((QueueImpl)ref.getQueue()).checkDLQ(ref))
if (ref.getQueue().checkDLQ(ref))
{
LinkedList<MessageReference> toCancel = queueMap.get(ref.getQueue());
@@ -1570,22 +1578,26 @@
}
}
}
-
+
private class DelayedAddRedistributor implements Runnable
{
private final Executor executor;
+
+ private final Channel replicatingChannel;
- DelayedAddRedistributor(final Executor executor)
+ DelayedAddRedistributor(final Executor executor, final Channel replicatingChannel)
{
this.executor = executor;
+
+ this.replicatingChannel = replicatingChannel;
}
public void run()
{
synchronized (QueueImpl.this)
{
- internalAddRedistributor(executor);
-
+ internalAddRedistributor(executor, replicatingChannel);
+
futures.remove(this);
}
}
Modified: trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java
===================================================================
--- trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java 2009-04-04 00:30:06 UTC (rev 6312)
+++ trunk/tests/src/org/jboss/messaging/tests/integration/cluster/distribution/ClusterTestBase.java 2009-04-06 12:00:43 UTC (rev 6313)
@@ -412,13 +412,18 @@
{
sendInRange(node, address, 0, numMessages, durable, filterVal);
}
-
+
+ protected void verifyReceiveAllInRange(boolean ack, int msgStart, int msgEnd, int... consumerIDs) throws Exception
+ {
+ verifyReceiveAllInRangeNotBefore(ack, -1, msgStart, msgEnd, consumerIDs);
+ }
+
protected void verifyReceiveAllInRange(int msgStart, int msgEnd, int... consumerIDs) throws Exception
{
- verifyReceiveAllInRangeNotBefore(-1, msgStart, msgEnd, consumerIDs);
+ verifyReceiveAllInRangeNotBefore(false, -1, msgStart, msgEnd, consumerIDs);
}
- protected void verifyReceiveAllInRangeNotBefore(long firstReceiveTime, int msgStart, int msgEnd, int... consumerIDs) throws Exception
+ protected void verifyReceiveAllInRangeNotBefore(boolean ack, long firstReceiveTime, int msgStart, int msgEnd, int... consumerIDs) throws Exception
{
boolean outOfOrder = false;
for (int i = 0; i < consumerIDs.length; i++)
@@ -436,6 +441,11 @@
assertNotNull("consumer " + consumerIDs[i] + " did not receive message " + j, message);
+ if (ack)
+ {
+ message.acknowledge();
+ }
+
if (firstReceiveTime != -1)
{
assertTrue("Message received too soon", System.currentTimeMillis() >= firstReceiveTime);
@@ -451,15 +461,20 @@
assertFalse("Messages were consumed out of order, look at System.out for more information", outOfOrder);
}
+
+ protected void verifyReceiveAll(boolean ack, int numMessages, int... consumerIDs) throws Exception
+ {
+ verifyReceiveAllInRange(ack, 0, numMessages, consumerIDs);
+ }
protected void verifyReceiveAll(int numMessages, int... consumerIDs) throws Exception
{
- verifyReceiveAllInRange(0, numMessages, consumerIDs);
+ verifyReceiveAllInRange(false, 0, numMessages, consumerIDs);
}
protected void verifyReceiveAllNotBefore(long firstReceiveTime, int numMessages, int... consumerIDs) throws Exception
{
- verifyReceiveAllInRangeNotBefore(firstReceiveTime, 0, numMessages, consumerIDs);
+ verifyReceiveAllInRangeNotBefore(false, firstReceiveTime, 0, numMessages, consumerIDs);
}
protected void checkReceive(int... consumerIDs) throws Exception
More information about the jboss-cvs-commits
mailing list