[jboss-cvs] JBoss Messaging SVN: r7596 - in branches/Branch_MultiThreaded_Replication: src/main/org/jboss/messaging/core/management/impl and 10 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Jul 20 18:34:58 EDT 2009
Author: timfox
Date: 2009-07-20 18:34:58 -0400 (Mon, 20 Jul 2009)
New Revision: 7596
Modified:
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/Channel.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/Pinger.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateLockSequenceMessage.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueReplicationPacketHandler.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/JBMThread.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareMutex.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareSharedCounter.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/SequencedLock.java
branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/concurrent/server/impl/QueueTest.java
branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/client/ConsumerWindowSizeTest.java
branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java
Log:
MT replication
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2009-07-20 13:53:36 UTC (rev 7595)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2009-07-20 22:34:58 UTC (rev 7596)
@@ -277,7 +277,9 @@
{
checkClosed();
+ // log.info("sending delete queue");
channel.sendBlocking(new SessionDeleteQueueMessage(queueName));
+ // log.info("sent delete queue");
}
public void deleteQueue(final String queueName) throws MessagingException
@@ -676,11 +678,11 @@
closedSent = true;
- // log.info(System.identityHashCode(this) + " session sending close message");
-
+ // log.info(System.identityHashCode(this) + " session sending close message");
+
channel.sendBlocking(new SessionCloseMessage());
-
- //log.info(System.identityHashCode(this) + " session sent close message");
+
+ // log.info(System.identityHashCode(this) + " session sent close message");
}
catch (Throwable ignore)
{
@@ -725,23 +727,58 @@
try
{
- // log.info(System.identityHashCode(this) + " session handling failover");
-
- //Prevent any more packets being handled on the old connection
- channel.getConnection().freeze();
-
- while (channel.getConnection().getExecutingThread() != null)
+ // log.info(System.identityHashCode(this) + " session handling failover");
+
+ channel.getConnection().freeze();
+
+ while (true)
+ {
+ // Set<Thread> executingThreads = channel.getConnection().getExecutingThreads();
+
+ Thread thread = channel.getConnection().getExecutingThread();
+
+ if (thread == null)
+ {
+ break;
+ }
+
+ try
+ {
+ Thread.sleep(1);
+ }
+ catch (InterruptedException ignore)
+ {
+ }
+ }
+
+ // Prevent any more packets being handled on the old connection
+ channel.setFrozen(true);
+
+ while (true)
{
+ // Set<Thread> executingThreads = channel.getConnection().getExecutingThreads();
+
+ Thread thread = channel.getExecutingThread();
+
+ if (thread == null)
+ {
+ break;
+ }
+
try
{
Thread.sleep(1);
}
catch (InterruptedException ignore)
- {
+ {
}
}
+
+ channel.waitForAllExecutions();
+
+ channel.transferConnection(backupConnection);
- channel.transferConnection(backupConnection);
+ // log.info("unfreezing");
backupConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
@@ -749,28 +786,31 @@
int lid = channel.getLastConfirmedCommandID();
- // log.info(System.identityHashCode(this) + " last received command id on client side is " + lid);
+ channel.setFrozen(false);
+ // log.info(System.identityHashCode(this) + " last received command id on client side is " + lid);
+
Packet request = new ReattachSessionMessage(name, lid);
- Channel channel1 = backupConnection.getChannel(1, -1, false);
+ Channel channel1 = backupConnection.getChannel(1, -1, false, false);
ReattachSessionResponseMessage response = (ReattachSessionResponseMessage)channel1.sendBlocking(request);
-
- // log.info(System.identityHashCode(this) + " got response from reattach session");
-
+
+ // log.info(System.identityHashCode(this) + " got response from reattach session");
+
if (!response.isRemoved())
{
- //log.info(System.identityHashCode(this) + " found session, server last received command id is " + response.getLastConfirmedCommandID());
-
+ // log.info(System.identityHashCode(this) + " found session, server last received command id is " +
+ // response.getLastConfirmedCommandID());
+
channel.replayCommands(response.getLastConfirmedCommandID());
ok = true;
}
else
{
- // log.info(System.identityHashCode(this) + " didn't find session, closed sent " + closedSent);
-
+ // log.info(System.identityHashCode(this) + " didn't find session, closed sent " + closedSent);
+
if (closedSent)
{
// a session re-attach may fail, if the session close was sent before failover started, hit the server,
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java 2009-07-20 13:53:36 UTC (rev 7595)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionPacketHandler.java 2009-07-20 22:34:58 UTC (rev 7596)
@@ -112,14 +112,5 @@
}
channel.confirm(packet);
-
- if (packet.getType() == SESS_RECEIVE_MSG)
- {
- SessionReceiveMessage message = (SessionReceiveMessage) packet;
-
- int cnt = (Integer)message.getClientMessage().getProperty(new SimpleString("count"));
-
- // log.info("confirmed on client " + cnt);
- }
}
}
\ No newline at end of file
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java 2009-07-20 13:53:36 UTC (rev 7595)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java 2009-07-20 22:34:58 UTC (rev 7596)
@@ -299,7 +299,7 @@
"Unable to connect to server using configuration " + connectorConfig);
}
- channel1 = connection.getChannel(1, -1, false);
+ channel1 = connection.getChannel(1, -1, false, false);
// Lock it - this must be done while the failoverLock is held
channel1.getLock().lock();
@@ -346,7 +346,8 @@
Channel sessionChannel = connection.getChannel(sessionChannelID,
producerWindowSize,
- producerWindowSize != -1);
+ producerWindowSize != -1,
+ false);
ClientSessionInternal session = new ClientSessionImpl(this,
name,
@@ -1039,7 +1040,7 @@
Ping ping = new Ping(clientFailureCheckPeriod, connectionTTL);
- Channel channel0 = conn.getChannel(0, -1, false);
+ Channel channel0 = conn.getChannel(0, -1, false, false);
channel0.send(ping);
@@ -1133,7 +1134,7 @@
{
for (ConnectionEntry entry : connections.values())
{
- Channel channel1 = entry.connection.getChannel(1, -1, false);
+ Channel channel1 = entry.connection.getChannel(1, -1, false, false);
channel1.getLock().lock();
}
@@ -1143,7 +1144,7 @@
{
for (ConnectionEntry entry : connections.values())
{
- Channel channel1 = entry.connection.getChannel(1, -1, false);
+ Channel channel1 = entry.connection.getChannel(1, -1, false, false);
channel1.getLock().unlock();
}
@@ -1153,7 +1154,7 @@
{
for (ConnectionEntry entry : connections.values())
{
- Channel channel1 = entry.connection.getChannel(1, -1, false);
+ Channel channel1 = entry.connection.getChannel(1, -1, false, false);
channel1.returnBlocking();
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java 2009-07-20 13:53:36 UTC (rev 7595)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/management/impl/ManagementServiceImpl.java 2009-07-20 22:34:58 UTC (rev 7596)
@@ -116,8 +116,6 @@
private MessagingServerControlImpl messagingServerControl;
- // private MessagingServer messagingServer;
-
private final MessageCounterManager messageCounterManager;
private final SimpleString managementNotificationAddress;
@@ -136,8 +134,6 @@
private boolean notificationsEnabled;
- // private final Set<NotificationListener> listeners = new org.jboss.messaging.utils.ConcurrentHashSet<NotificationListener>();
-
private ReplicationOperationInvoker replicationInvoker;
private ClusterQueueStateManager clusterQueueStateManager;
@@ -210,7 +206,6 @@
this.securityRepository = securityRepository;
this.storageManager = storageManager;
this.clusterQueueStateManager = clusterQueueStateManager;
- // this.messagingServer = messagingServer;
JBMSecurityManager sm = messagingServer.getSecurityManager();
if (sm != null)
@@ -687,9 +682,6 @@
{
continue;
}
- // System.out.format("param=%s, expecting=%s\n", params[i].getClass(), paramTypes[i]);
- // System.out.println(!paramTypes[i].isAssignableFrom(params[i].getClass()));
- // System.out.println(paramTypes[i] == Long.TYPE && params[i].getClass() == Integer.class);
if (paramTypes[i].isAssignableFrom(params[i].getClass()) || (paramTypes[i] == Long.TYPE && params[i].getClass() == Integer.class) ||
(paramTypes[i] == Double.TYPE && params[i].getClass() == Integer.class) ||
(paramTypes[i] == Long.TYPE && params[i].getClass() == Long.class) ||
@@ -718,22 +710,7 @@
{
throw new IllegalArgumentException("no operation " + operation + "/" + params.length);
}
- // System.out.println(method.getName());
- // for (Class<?> parameters : method.getParameterTypes())
- // {
- // System.out.println(parameters);
- // }
- // System.out.println("===");
- // for (Object object : params)
- // {
- // if (object == null)
- // {
- // System.out.println("null");
- // } else
- // {
- // System.out.println(object.getClass());
- // }
- // }
+
Object result = method.invoke(resource, params);
return result;
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/PostOffice.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/PostOffice.java 2009-07-20 13:53:36 UTC (rev 7595)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/postoffice/PostOffice.java 2009-07-20 22:34:58 UTC (rev 7596)
@@ -73,9 +73,5 @@
PagingManager getPagingManager();
- DuplicateIDCache getDuplicateIDCache(SimpleString address);
-
- //void sendQueueInfoToQueue(SimpleString queueName, SimpleString address) throws Exception;
-
- // Object getNotificationLock();
+ DuplicateIDCache getDuplicateIDCache(SimpleString address);
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/Channel.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/Channel.java 2009-07-20 13:53:36 UTC (rev 7595)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/Channel.java 2009-07-20 22:34:58 UTC (rev 7596)
@@ -68,4 +68,13 @@
//void waitForAllReplicationResponse();
//void replicationResponseReceived(Replicator replicator, int count);
+
+ Thread getExecutingThread();
+
+ //void freeze();
+
+ void setFrozen(boolean frozen);
+
+ void waitForAllExecutions();
+
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java 2009-07-20 13:53:36 UTC (rev 7595)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java 2009-07-20 22:34:58 UTC (rev 7596)
@@ -12,13 +12,14 @@
package org.jboss.messaging.core.remoting;
+import java.util.List;
+import java.util.Set;
+
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.remoting.spi.BufferHandler;
import org.jboss.messaging.core.remoting.spi.Connection;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-import java.util.List;
-
/**
* A RemotingConnection
*
@@ -31,7 +32,7 @@
String getRemoteAddress();
- Channel getChannel(long channelID, int windowSize, boolean block);
+ Channel getChannel(long channelID, int windowSize, boolean block, boolean async);
void putChannel(long channelID, Channel channel);
@@ -80,4 +81,12 @@
RemotingConnection getReplicatingConnection();
Thread getExecutingThread();
+
+ List<Interceptor> getInterceptors();
+
+ void setFrozenAllChannels(boolean frozen);
+
+ void waitForAllExecutions();
+
+ Set<Thread> getExecutingThreads();
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java 2009-07-20 13:53:36 UTC (rev 7595)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java 2009-07-20 22:34:58 UTC (rev 7596)
@@ -25,7 +25,9 @@
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.EARLY_RESPONSE;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.PACKETS_CONFIRMED;
+import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
@@ -37,15 +39,14 @@
import org.jboss.messaging.core.remoting.Channel;
import org.jboss.messaging.core.remoting.ChannelHandler;
import org.jboss.messaging.core.remoting.CommandConfirmationHandler;
+import org.jboss.messaging.core.remoting.Interceptor;
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.RemotingConnection;
import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
import org.jboss.messaging.core.remoting.impl.wireformat.PacketsConfirmedMessage;
-import org.jboss.messaging.core.remoting.impl.wireformat.SessionReceiveMessage;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
-import org.jboss.messaging.core.server.ServerMessage;
-import org.jboss.messaging.utils.SimpleString;
+import org.jboss.messaging.utils.Future;
/**
* A ChannelImpl
@@ -92,11 +93,17 @@
private final Semaphore sendSemaphore;
+ private final Executor executor;
+
private int receivedBytes;
private CommandConfirmationHandler commandConfirmationHandler;
- public ChannelImpl(final RemotingConnection connection, final long id, final int windowSize, final boolean block)
+ public ChannelImpl(final RemotingConnection connection,
+ final long id,
+ final int windowSize,
+ final boolean block,
+ final Executor executor)
{
this.connection = connection;
@@ -125,6 +132,7 @@
sendSemaphore = null;
}
+ this.executor = executor;
}
public long getID()
@@ -288,10 +296,6 @@
if (resendCache != null && packet.isRequiresConfirmations())
{
-// if (packet.getType() == PacketImpl.SESS_CLOSE)
-// {
-// log.info(System.identityHashCode(this) + " added session close to resend cache");
-// }
resendCache.add(packet);
}
@@ -402,15 +406,15 @@
public void replayCommands(final int otherLastConfirmedCommandID)
{
- // log.info(connection.isClient() + " replaying, other last command id " + otherLastConfirmedCommandID);
-
+ // log.info(connection.isClient() + " replaying, other last command id " + otherLastConfirmedCommandID);
+
if (otherLastConfirmedCommandID != -1)
{
clearUpTo(otherLastConfirmedCommandID);
}
- // log.info("Resend cache size is " + resendCache.size());
-
+ // log.info("Resend cache size is " + resendCache.size());
+
for (final Packet packet : resendCache)
{
doWrite(packet);
@@ -445,30 +449,28 @@
public void flushConfirmations()
{
int lcid = this.lastConfirmedCommandID;
-
+
if (receivedBytes != 0 && connection.isActive() && lcid != -1)
{
receivedBytes = 0;
- // log.info("Sending packets confirmed from flush");
-
- sendConfirmation(lcid);
+ // log.info("Sending packets confirmed from flush");
+
+ sendConfirmation(lcid);
}
}
-
+
private void sendConfirmation(final int lastConfirmedID)
{
final Packet confirmed = new PacketsConfirmedMessage(lastConfirmedID);
confirmed.setChannelID(id);
- //We need to queue packet confirmations too
+ // We need to queue packet confirmations too
if (!queuedWriteManager.tryQueue(confirmed))
{
- // log.info(connection.isClient() + " writing packets confirmed " + lastConfirmedID + " " + System.identityHashCode(this));
-
- doWrite(confirmed);
- }
+ doWrite(confirmed);
+ }
}
public void confirm(final Packet packet)
@@ -476,9 +478,9 @@
if (resendCache != null && packet.isRequiresConfirmations())
{
lastConfirmedCommandID++;
-
- // log.info("sending confirm from confirm");
+ // log.info("sending confirm from confirm");
+
receivedBytes += packet.getPacketSize();
if (receivedBytes >= confWindowSize)
@@ -487,7 +489,7 @@
if (connection.isActive())
{
- sendConfirmation(lastConfirmedCommandID);
+ sendConfirmation(lastConfirmedCommandID);
}
}
}
@@ -495,46 +497,183 @@
public void handlePacket(final Packet packet)
{
- if (packet.getType() == PACKETS_CONFIRMED)
+ if (executor == null)
{
- if (resendCache != null)
+ this.doHandlePacket(packet);
+ }
+ else
+ {
+ executor.execute(new Runnable()
{
- final PacketsConfirmedMessage msg = (PacketsConfirmedMessage)packet;
+ public void run()
+ {
+ try
+ {
+ doHandlePacket(packet);
+ }
+ catch (Throwable t)
+ {
+ log.error("Failed to handle packet", t);
+ }
+ }
+ });
+ }
+ }
- clearUpTo(msg.getCommandID());
- }
+ private volatile boolean frozen;
- if (!connection.isClient())
+ private volatile Thread currentThread;
+
+ public void setFrozen(final boolean f)
+ {
+ if (f)
+ {
+ this.frozen = f;
+ }
+ else
+ {
+ if (this.executor == null)
{
- handler.handlePacket(packet);
+ this.frozen = false;
}
-
- return;
+ else
+ {
+ executor.execute(new Runnable()
+ {
+ public void run()
+ {
+ frozen = false;
+ }
+ });
+ }
}
- else
+ }
+
+ public Thread getExecutingThread()
+ {
+ return currentThread;
+ }
+
+ public void waitForAllExecutions()
+ {
+ if (executor != null)
{
- if (packet.isResponse())
+ Future f = new Future();
+
+ executor.execute(f);
+
+ boolean ok = f.await(5000);
+
+ if (!ok)
{
- response = packet;
+ throw new IllegalStateException("Timedout out waiting for channel executions to complete");
+ }
+ }
+ }
+
+ /*
+ * Thread sat on A) (below)
+ * rc set frozen
+ * all channels set frozen
+ * channel waited for
+ * channel unfrozen - thread executes!!!
+ *
+ *
+ *
+ * set rc frozen and wait for all rc threads to exit
+ * freeze all channels
+ * wait for all channels to exit
+ * add an executor to unfreeze channels
+ *
+ * there's another race - if a thread is on A
+ * then set rc frozen
+ * freeze all channels
+ *
+ */
- confirm(packet);
+ private void doHandlePacket(final Packet packet)
+ {
+ //A
+ currentThread = Thread.currentThread();
- lock.lock();
+ try
+ {
+ if (!frozen)
+ {
+ List<Interceptor> interceptors = connection.getInterceptors();
- try
+ if (interceptors != null)
{
- sendCondition.signal();
+ for (final Interceptor interceptor : interceptors)
+ {
+ try
+ {
+ boolean callNext = interceptor.intercept(packet, connection);
+
+ if (!callNext)
+ {
+ // abort
+
+ return;
+ }
+ }
+ catch (final Throwable e)
+ {
+ log.warn("Failure in calling interceptor: " + interceptor, e);
+ }
+ }
}
- finally
+
+ if (packet.getType() == PACKETS_CONFIRMED)
{
- lock.unlock();
+ if (resendCache != null)
+ {
+ final PacketsConfirmedMessage msg = (PacketsConfirmedMessage)packet;
+
+ clearUpTo(msg.getCommandID());
+ }
+
+ if (!connection.isClient())
+ {
+ handler.handlePacket(packet);
+ }
+
+ return;
}
+ else
+ {
+ if (packet.isResponse())
+ {
+ response = packet;
+
+ confirm(packet);
+
+ lock.lock();
+
+ try
+ {
+ sendCondition.signal();
+ }
+ finally
+ {
+ lock.unlock();
+ }
+ }
+ else if (handler != null)
+ {
+ handler.handlePacket(packet);
+ }
+ }
}
- else if (handler != null)
- {
- handler.handlePacket(packet);
- }
+// else
+// {
+// log.info("It's frozen");
+// }
}
+ finally
+ {
+ currentThread = null;
+ }
}
private void doWrite(final Packet packet)
@@ -548,16 +687,18 @@
private void clearUpTo(final int lastConfirmedCommandID)
{
- //log.info(System.identityHashCode(this) + " clear up to " + lastConfirmedCommandID + " on client " + connection.isClient() + " fscid " + this.firstStoredCommandID);
+ //log.info(System.identityHashCode(this) + " clearupto " + lastConfirmedCommandID + " first stored " + firstStoredCommandID);
if (lastConfirmedCommandID < firstStoredCommandID)
{
- //This can legitimately happen, if the flushConfirmations() is called from the other side which causes a packet confirmation to be sent, after that
- //another packet confirmation can come or on failover when the lastConfirmedCommandID is retrieved from the other side there may be overlap
- //because of the previously flush. In this case we can safely ignore it.
+ // This can legitimately happen, if the flushConfirmations() is called from the other side which causes a
+ // packet confirmation to be sent, after that
+ // another packet confirmation can come or on failover when the lastConfirmedCommandID is retrieved from the
+ // other side there may be overlap
+ // because of the previous flush. In this case we can safely ignore it.
return;
}
-
+
final int numberToClear = 1 + lastConfirmedCommandID - firstStoredCommandID;
int sizeToFree = 0;
@@ -569,13 +710,11 @@
if (packet == null)
{
throw new IllegalStateException(System.identityHashCode(this) + " Can't find packet to clear: " +
- " last received command id " +
+ " last confirmed command id " +
lastConfirmedCommandID +
" first stored command id " +
firstStoredCommandID);
}
-
- // log.info("cleared packet " + packet);
if (packet.getType() != PACKETS_CONFIRMED)
{
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/Pinger.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/Pinger.java 2009-07-20 13:53:36 UTC (rev 7595)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/Pinger.java 2009-07-20 22:34:58 UTC (rev 7596)
@@ -71,7 +71,7 @@
this.connectionFailedAction = connectionFailedAction;
- this.channel0 = conn.getChannel(0, -1, false);
+ this.channel0 = conn.getChannel(0, -1, false, false);
this.lastPingReceived = lastPingReceived;
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2009-07-20 13:53:36 UTC (rev 7595)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2009-07-20 22:34:58 UTC (rev 7596)
@@ -13,10 +13,13 @@
package org.jboss.messaging.core.remoting.impl;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
import org.jboss.messaging.core.exception.MessagingException;
import org.jboss.messaging.core.logging.Logger;
@@ -28,6 +31,8 @@
import org.jboss.messaging.core.remoting.RemotingConnection;
import org.jboss.messaging.core.remoting.spi.Connection;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
+import org.jboss.messaging.utils.ExecutorFactory;
+import org.jboss.messaging.utils.OrderedExecutorFactory;
import org.jboss.messaging.utils.SimpleIDGenerator;
/**
@@ -73,12 +78,14 @@
private boolean idGeneratorSynced = false;
- private volatile boolean frozen;
+ // private volatile boolean frozen;
private final Object failLock = new Object();
private final PacketDecoder decoder = new PacketDecoder();
+ private final ExecutorFactory orderedFactory;
+
// Constructors
// ---------------------------------------------------------------------------------
@@ -89,7 +96,7 @@
final long blockingCallTimeout,
final List<Interceptor> interceptors)
{
- this(transportConnection, null, blockingCallTimeout, interceptors, true, true);
+ this(transportConnection, null, blockingCallTimeout, interceptors, true, true, null);
}
/*
@@ -98,10 +105,11 @@
public RemotingConnectionImpl(final Connection transportConnection,
final RemotingConnection replicatingConnection,
final List<Interceptor> interceptors,
- final boolean active)
+ final boolean active,
+ final Executor threadPool)
{
- this(transportConnection, replicatingConnection, -1, interceptors, active, false);
+ this(transportConnection, replicatingConnection, -1, interceptors, active, false, threadPool);
}
private RemotingConnectionImpl(final Connection transportConnection,
@@ -109,7 +117,8 @@
final long blockingCallTimeout,
final List<Interceptor> interceptors,
final boolean active,
- final boolean client)
+ final boolean client,
+ final Executor threadPool)
{
this.transportConnection = transportConnection;
@@ -122,12 +131,26 @@
this.active = active;
- this.client = client;
+ this.client = client;
+
+ if (threadPool != null)
+ {
+ this.orderedFactory = new OrderedExecutorFactory(threadPool);
+ }
+ else
+ {
+ this.orderedFactory = null;
+ }
}
// RemotingConnection implementation
// ------------------------------------------------------------
+ public List<Interceptor> getInterceptors()
+ {
+ return interceptors;
+ }
+
public Connection getTransportConnection()
{
return this.transportConnection;
@@ -155,13 +178,16 @@
return transportConnection.getRemoteAddress();
}
- public synchronized Channel getChannel(final long channelID, final int windowSize, final boolean block)
+ public synchronized Channel getChannel(final long channelID,
+ final int windowSize,
+ final boolean block,
+ final boolean async)
{
Channel channel = channels.get(channelID);
if (channel == null)
{
- channel = new ChannelImpl(this, channelID, windowSize, block);
+ channel = new ChannelImpl(this, channelID, windowSize, block, async ? this.orderedFactory.getExecutor() : null);
channels.put(channelID, channel);
}
@@ -319,51 +345,48 @@
// Buffer Handler implementation
// ----------------------------------------------------
- private volatile Thread currentThread;
-
+ // private volatile Thread currentThread;
+
public void activate()
{
active = true;
}
+ private volatile Thread currentThread;
+
+ private volatile boolean frozen;
+
+
public void bufferReceived(final Object connectionID, final MessagingBuffer buffer)
{
final Packet packet = decoder.decode(buffer);
+ // if (packet.getType() == PacketImpl.REPLICATE_LOCK_SEQUENCES)
+ // {
+ // ReplicateLockSequenceMessage msg = (ReplicateLockSequenceMessage)packet;
+ //
+ // log.info("received sequences " + msg.getID());
+ // }
+
currentThread = Thread.currentThread();
try
{
if (!frozen)
{
- if (interceptors != null)
- {
- for (final Interceptor interceptor : interceptors)
- {
- try
- {
- boolean callNext = interceptor.intercept(packet, this);
+ Channel channel = channels.get(packet.getChannelID());
- if (!callNext)
- {
- // abort
-
- return;
- }
- }
- catch (final Throwable e)
- {
- log.warn("Failure in calling interceptor: " + interceptor, e);
- }
- }
- }
-
- final Channel channel = channels.get(packet.getChannelID());
-
+ //A
+
+
if (channel != null)
{
channel.handlePacket(packet);
}
+ // else
+ // {
+ // log.info("channel is null");
+ // }
}
}
finally
@@ -372,16 +395,61 @@
}
}
+ /*
+ * Thread sitting on A)
+ *
+ * connection is frozen
+ * channels are frozen
+ * wait for all threads
+ * unfreeze channels
+ * ** channel processes invocation (by thread sat on A)
+ *
+ */
+
public void freeze()
{
- frozen = true;
+ frozen = true;
}
-
+
public Thread getExecutingThread()
{
return currentThread;
}
-
+
+ public void setFrozenAllChannels(final boolean frozen)
+ {
+ for (Channel channel : this.channels.values())
+ {
+ channel.setFrozen(frozen);
+ }
+ }
+
+ public void waitForAllExecutions()
+ {
+ for (Channel channel : this.channels.values())
+ {
+ // channel.setFrozen(frozen);
+ channel.waitForAllExecutions();
+ }
+ }
+
+ public Set<Thread> getExecutingThreads()
+ {
+ Set<Thread> threads = new HashSet<Thread>();
+
+ for (Channel channel : this.channels.values())
+ {
+ Thread thread = channel.getExecutingThread();
+
+ if (thread != null)
+ {
+ threads.add(thread);
+ }
+ }
+
+ return threads;
+ }
+
// Package protected
// ----------------------------------------------------------------------------
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateLockSequenceMessage.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateLockSequenceMessage.java 2009-07-20 13:53:36 UTC (rev 7595)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/replication/ReplicateLockSequenceMessage.java 2009-07-20 22:34:58 UTC (rev 7596)
@@ -36,17 +36,26 @@
// Attributes ----------------------------------------------------
private List<Triple<Long, Long, Integer>> sequences;
+
+ private long id;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public ReplicateLockSequenceMessage(final List<Triple<Long, Long, Integer>> sequences)
+ public ReplicateLockSequenceMessage(final long id, final List<Triple<Long, Long, Integer>> sequences)
{
super(REPLICATE_LOCK_SEQUENCES);
this.sequences = sequences;
+
+ this.id = id;
}
+
+ public long getID()
+ {
+ return id;
+ }
// Public --------------------------------------------------------
@@ -60,7 +69,7 @@
return BASIC_PACKET_SIZE + DataConstants.SIZE_INT +
sequences.size() *
(2 * DataConstants.SIZE_LONG +
- DataConstants.SIZE_INT);
+ DataConstants.SIZE_INT) + DataConstants.SIZE_LONG;
}
@Override
@@ -73,6 +82,7 @@
buffer.writeLong(sequence.b);
buffer.writeInt(sequence.c);
}
+ buffer.writeLong(id);
}
@Override
@@ -87,6 +97,7 @@
buffer.readInt());
sequences.add(pair);
}
+ id = buffer.readLong();
}
public List<Triple<Long, Long, Integer>> getSequences()
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java 2009-07-20 13:53:36 UTC (rev 7595)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java 2009-07-20 22:34:58 UTC (rev 7596)
@@ -21,8 +21,11 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -92,7 +95,7 @@
private volatile RemotingConnection serverSideReplicatingConnection;
- private final Executor threadPool;
+ private final ExecutorService threadPool;
private final ScheduledExecutorService scheduledThreadPool;
@@ -107,7 +110,7 @@
public RemotingServiceImpl(final Configuration config,
final MessagingServer server,
final ManagementService managementService,
- final Executor threadPool,
+ final ExecutorService threadPool,
final ScheduledExecutorService scheduledThreadPool,
final int managementConnectorID)
{
@@ -228,7 +231,7 @@
for (RemotingConnection connection : connections.values())
{
- connection.getChannel(0, -1, false).sendAndFlush(new PacketImpl(DISCONNECT));
+ connection.getChannel(0, -1, false, false).sendAndFlush(new PacketImpl(DISCONNECT));
}
for (Acceptor acceptor : acceptors)
@@ -282,22 +285,46 @@
throw new IllegalStateException("Unable to create connection, server hasn't finished starting up");
}
- RemotingConnection replicatingConnection = server.getNonPooledReplicatingConnection();
+ Future<RemotingConnection> result = threadPool.submit(new Callable<RemotingConnection>()
+ {
+ public RemotingConnection call()
+ {
+ return server.getNonPooledReplicatingConnection();
+ }
+ });
+ RemotingConnection replicatingConnection;
+
+ try
+ {
+ replicatingConnection = result.get();
+ }
+ catch (ExecutionException e)
+ {
+ log.error("Failed to get replicating conection", e);
+ return;
+ }
+ catch (InterruptedException e)
+ {
+ log.error("Interrupted", e);
+ return;
+ }
+
RemotingConnection rc = new RemotingConnectionImpl(connection,
replicatingConnection,
interceptors,
- !config.isBackup());
+ !config.isBackup(),
+ threadPool);
- Channel channel1 = rc.getChannel(1, -1, false);
+ Channel channel1 = rc.getChannel(1, -1, false, false);
final Replicator replicator;
if (replicatingConnection != null)
{
- Channel replicatingChannel = replicatingConnection.getChannel(1, -1, false);
+ Channel replicatingChannel = replicatingConnection.getChannel(1, -1, false, false);
- replicator = new ReplicatorImpl(replicatingChannel);
+ replicator = new ReplicatorImpl("mess server", replicatingChannel);
replicatingChannel.setHandler(new ChannelHandler()
{
@@ -461,7 +488,7 @@
{
this.conn = conn;
- conn.getChannel(0, -1, false).setHandler(this);
+ conn.getChannel(0, -1, false, false).setHandler(this);
}
public synchronized void handlePacket(final Packet packet)
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-07-20 13:53:36 UTC (rev 7595)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-07-20 22:34:58 UTC (rev 7596)
@@ -20,7 +20,9 @@
import java.util.Map;
import java.util.Set;
import java.util.Map.Entry;
+import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -585,7 +587,7 @@
currentSession.getChannel().close();
}
- Channel channel = connection.getChannel(channelID, sendWindowSize, false);
+ Channel channel = connection.getChannel(channelID, sendWindowSize, false, configuration.isBackup());
RemotingConnection replicatingConnection = connection.getReplicatingConnection();
@@ -595,9 +597,9 @@
if (replicatingConnection != null)
{
- replicatingChannel = replicatingConnection.getChannel(channelID, -1, false);
+ replicatingChannel = replicatingConnection.getChannel(channelID, -1, false, false);
- replicator = new ReplicatorImpl(replicatingChannel);
+ replicator = new ReplicatorImpl("session " + channelID, replicatingChannel);
replicatingChannel.setHandler(new ChannelHandler()
{
@@ -842,7 +844,7 @@
if (replicator != null)
{
- Channel channel1 = replicator.getReplicatingChannel().getConnection().getChannel(1, -1, false);
+ Channel channel1 = replicator.getReplicatingChannel().getConnection().getChannel(1, -1, false, false);
channel1.send(new UnregisterQueueReplicationChannelMessage(queue.getID()));
}
@@ -943,7 +945,7 @@
Packet packet = new ReplicateStartupInfoMessage(uuid, storageManager.getCurrentUniqueID());
- Channel channel1 = conn.getChannel(1, -1, false);
+ Channel channel1 = conn.getChannel(1, -1, false, false);
ChannelHandler prevHandler = channel1.getHandler();
@@ -1028,7 +1030,8 @@
{
if (configuration.isBackup())
{
- log.info("A connection has been made to the backup server so it will be activated! This will result in the live server being considered failed.");
+ log.info("A connection has been made to the backup server so it will be activated! This will result in the live server being considered failed."
+ );
synchronized (this)
{
@@ -1094,41 +1097,90 @@
flock.unlock();
- freezeConnections();
+ for (RemotingConnection rc : backupConnections)
+ {
+ rc.freeze();
+
+ rc.setFrozenAllChannels(true);
+ }
+
+ for (RemotingConnection rc : backupConnections)
+ {
+ while (true)
+ {
+ Thread thr = rc.getExecutingThread();
+
+ if (thr == null)
+ {
+ break;
+ }
+
+ try
+ {
+ Thread.sleep(1);
+ }
+ catch (InterruptedException ignore)
+ {
+ }
+ }
+ }
- //We set a latch on each ReplicationAwareMutex
+ // We set a latch on each ReplicationAwareMutex
ReplicationAwareMutex.setLatchAll();
long start = System.currentTimeMillis();
- //We wait for all executing threads to end up on this latch. If they instead end up on a SequencedLock we interrupt them
- //until they fall through to the other latch
-
+ // We wait for all executing threads to end up on this latch. If they instead end up on a SequencedLock we
+ // interrupt them
+ // until they fall through to the other latch
+
for (RemotingConnection rc : backupConnections)
{
while (true)
{
- JBMThread executingThread = (JBMThread)rc.getExecutingThread();
+ // log.info("in loop");
+ Set<Thread> executingThreads = rc.getExecutingThreads();
- if (executingThread == null)
+ boolean exit = true;
+
+ // log.info("executing threads " + executingThreads.isEmpty());
+
+ for (Thread executingThread : executingThreads)
{
- break;
+ if (executingThread instanceof JBMThread)
+ {
+ JBMThread jthread = (JBMThread)executingThread;
+
+ if (jthread.isWaitingOnMutex())
+ {
+ //log.info("Thread " + jthread + " is waiting on mutex");
+
+ jthread.setNoReplayOrRecord(0);
+ }
+ else if (jthread.isWaitingOnSequencedLock())
+ {
+ jthread.setFrozen();
+
+ executingThread.interrupt();
+
+ exit = false;
+ }
+ else
+ {
+ exit = false;
+ }
+ }
+ else
+ {
+ exit = false;
+ }
}
-
- if (executingThread.isWaitingOnMutex())
+
+ if (exit)
{
- executingThread.setNoReplayOrRecord(0);
-
break;
}
- if (executingThread.isWaitingOnSequencedLock())
- {
- executingThread.setFrozen();
-
- executingThread.interrupt();
- }
-
try
{
Thread.sleep(1);
@@ -1141,23 +1193,28 @@
{
throw new IllegalStateException("Timed out waiting for threads to exit or reach latch");
}
+
}
}
+
+ // log.info("all on latch");
- //Now we release the latch and wait for all threads to exit
-
+ // Now we release the latch and wait for all threads to exit
+
ReplicationAwareMutex.setOwnerLatchAll();
start = System.currentTimeMillis();
+
// Wait for everything to exit
for (RemotingConnection rc : backupConnections)
{
while (true)
{
- JBMThread executingThread = (JBMThread)rc.getExecutingThread();
+ // log.info("in loop2");
+ Set<Thread> executingThreads = rc.getExecutingThreads();
- if (executingThread == null)
+ if (executingThreads.isEmpty())
{
break;
}
@@ -1173,25 +1230,46 @@
if (System.currentTimeMillis() - start >= FREEZE_TIMEOUT)
{
Exception e = new Exception();
+ Thread executingThread = executingThreads.iterator().next();
e.setStackTrace(executingThread.getStackTrace());
log.error("Waiting for this thread " + executingThread, e);
- log.error("Replay " + executingThread.isReplay());
+ // log.error("Replay " + executingThread.isReplay());
throw new IllegalStateException("Timed out waiting for threads to exit");
}
}
}
+
+ // log.info("all exited");
+
+
+
+ //FIXME this is not sufficient - since there may still be queued executions waiting from before the freeze
+ //need to wait for all executions on the channel to complete too
+ for (RemotingConnection rc : backupConnections)
+ {
+ rc.setFrozenAllChannels(false);
+ }
+
+ //Now we need to wait for all executions to finish on the channel - they may be queued
+
+ for (RemotingConnection rc : backupConnections)
+ {
+ rc.waitForAllExecutions();
+ }
ReplicationAwareMutex.clearLatchAll();
+
+ log.info("freeze complete");
}
}
- private void freezeConnections()
- {
- for (RemotingConnection rc : backupConnections)
- {
- rc.freeze();
- }
- }
+ // private void freezeConnections()
+ // {
+ // for (RemotingConnection rc : backupConnections)
+ // {
+ // rc.freeze();
+ // }
+ // }
public boolean registerBackupConnection(final RemotingConnection connection)
{
@@ -1440,15 +1518,27 @@
}
}
- private Replicator getReplicatorForQueue(final long queueID)
+ private Replicator getReplicatorForQueue(final long queueID) throws Exception
{
- RemotingConnection replicatingConnection = this.getPooledReplicatingConnection();
+ RemotingConnection replicatingConnection;
+ // Needs to be executed on a different thread since netty doesn't like new connections being created on
+ // handler threads
+ java.util.concurrent.Future<RemotingConnection> result = threadPool.submit(new Callable<RemotingConnection>()
+ {
+ public RemotingConnection call()
+ {
+ return getPooledReplicatingConnection();
+ }
+ });
+
+ replicatingConnection = result.get();
+
final Replicator replicator;
if (replicatingConnection != null)
{
- Channel channel1 = replicatingConnection.getChannel(1, -1, false);
+ Channel channel1 = replicatingConnection.getChannel(1, -1, false, false);
JBMThread thread = JBMThread.currentThread();
@@ -1458,9 +1548,9 @@
thread.resumeRecording();
- Channel replChannel = replicatingConnection.getChannel(queueID, -1, false);
+ Channel replChannel = replicatingConnection.getChannel(queueID, -1, false, false);
- replicator = new ReplicatorImpl(replChannel);
+ replicator = new ReplicatorImpl("queue " + queueID, replChannel);
replChannel.setHandler(new ChannelHandler()
{
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2009-07-20 13:53:36 UTC (rev 7595)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2009-07-20 22:34:58 UTC (rev 7596)
@@ -87,6 +87,8 @@
throw new IllegalStateException("First packet must be startup info for backup " + type);
}
+ // log.info("handling packet " + packet + " on backup " + this.server.getConfiguration().isBackup());
+
switch (type)
{
case REPLICATE_LOCK_SEQUENCES:
@@ -94,6 +96,8 @@
ReplicateLockSequenceMessage msg = (ReplicateLockSequenceMessage)packet;
sequences = msg.getSequences();
+
+ // dumpSequences(sequences);
return;
}
@@ -116,7 +120,7 @@
{
RegisterQueueReplicationChannelMessage msg = (RegisterQueueReplicationChannelMessage)packet;
- Channel channel = connection.getChannel(msg.getBindingID(), -1, false);
+ Channel channel = connection.getChannel(msg.getBindingID(), -1, false, true);
if (server.registerBackupConnection(channel.getConnection()))
{
@@ -129,7 +133,7 @@
{
UnregisterQueueReplicationChannelMessage msg = (UnregisterQueueReplicationChannelMessage)packet;
- Channel channel = connection.getChannel(msg.getBindingID(), -1, false);
+ Channel channel = connection.getChannel(msg.getBindingID(), -1, false, true);
channel.setHandler(null);
@@ -195,7 +199,7 @@
log.error("Invalid packet " + packet);
}
}
- sequences = null;
+ //sequences = null;
// send the response message
@@ -268,5 +272,14 @@
channel1.send(response);
}
+
+ private void dumpSequences(List<Triple<Long, Long, Integer>> sequences)
+ {
+ log.info(Thread.currentThread() + " Got on messaging server ph Sequences size is " + sequences.size());
+ for (Triple<Long, Long, Integer> sequence: sequences)
+ {
+ log.info(sequence.a + ": " + sequence.b);
+ }
+ }
}
\ No newline at end of file
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-07-20 13:53:36 UTC (rev 7595)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-07-20 22:34:58 UTC (rev 7596)
@@ -658,7 +658,7 @@
try
{
- int count = messageReferences.size() + getScheduledCount() + getDeliveringCount();
+ int count = messageReferences.size() + scheduledDeliveryHandler.getScheduledCount() + deliveringCount.get();
// log.info(System.identityHashCode(this) + " message count is " +
// count +
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueReplicationPacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueReplicationPacketHandler.java 2009-07-20 13:53:36 UTC (rev 7595)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueReplicationPacketHandler.java 2009-07-20 22:34:58 UTC (rev 7596)
@@ -79,6 +79,9 @@
ReplicateLockSequenceMessage msg = (ReplicateLockSequenceMessage)packet;
sequences = msg.getSequences();
+
+ // log.info("got sequences on queue " + msg.getID());
+ // dumpSequences(sequences);
break;
}
@@ -118,4 +121,14 @@
}
}
}
+
+ private void dumpSequences(List<Triple<Long, Long, Integer>> sequences)
+ {
+ log.info(Thread.currentThread() + " Got on queue replication ph Sequences size is " + sequences.size());
+
+ for (Triple<Long, Long, Integer> sequence: sequences)
+ {
+ log.info(sequence.a + ": " + sequence.b);
+ }
+ }
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-07-20 13:53:36 UTC (rev 7595)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerConsumerImpl.java 2009-07-20 22:34:58 UTC (rev 7596)
@@ -119,11 +119,14 @@
private final Binding binding;
private boolean flowControl = true;
+
+ private boolean backup;
// Constructors ---------------------------------------------------------------------------------
public ServerConsumerImpl(final long id,
final ServerSession session,
+ final Executor executor,
final QueueBinding binding,
final Filter filter,
final boolean started,
@@ -132,9 +135,12 @@
final PagingManager pagingManager,
final Channel channel,
final boolean preAcknowledge,
- final boolean updateDeliveries,
- final ManagementService managementService) throws Exception
+ final boolean updateDeliveries,
+ final ManagementService managementService,
+ boolean backup) throws Exception
{
+ this.backup = backup;
+
this.id = id;
this.filter = filter;
@@ -145,7 +151,7 @@
this.messageQueue = binding.getQueue();
- this.executor = null;
+ this.executor = executor;
this.started = browseOnly || started;
@@ -164,11 +170,15 @@
this.minLargeMessageSize = session.getMinLargeMessageSize();
this.updateDeliveries = updateDeliveries;
+
+ name = "consumer " + session.getName() + "-" + id;
- lock = new ReplicationAwareMutex("consumer " + session.getName() + "-" + id, 0, true);
-
+ lock = new ReplicationAwareMutex(name, 0, true);
+
binding.getQueue().addConsumer(this);
}
+
+ String name;
// ServerConsumer implementation
// ----------------------------------------------------------------------
@@ -292,10 +302,12 @@
promptDelivery();
}
}
-
+
public void receiveCredits(final int credits) throws Exception
{
boolean promptDelivery = false;
+
+ // log.info(name + "backup " + backup + " received credits");
lock.lock(1);
@@ -312,7 +324,7 @@
{
promptDelivery = true;
}
-
+
availableCredits += credits;
}
}
@@ -421,19 +433,10 @@
}
// Public ---------------------------------------------------------------------------------------
-
- /** To be used on tests only */
- public int getAvailableCredits()
+
+ public boolean isFlowControl()
{
- lock.lock(2);
- try
- {
- return availableCredits;
- }
- finally
- {
- lock.unlock();
- }
+ return flowControl;
}
// Private --------------------------------------------------------------------------------------
@@ -464,7 +467,15 @@
// If we play the commands in a different order than how they were generated on the live node, we will
// eventually still be running this largeMessage before the next message comes, which would reject messages
// from the cluster
- largeMessageDeliverer.deliver();
+ lock.lock(1234);
+ try
+ {
+ largeMessageDeliverer.deliver();
+ }
+ finally
+ {
+ lock.unlock();
+ }
}
else
{
@@ -474,12 +485,13 @@
private HandleStatus doHandle(final MessageReference ref) throws Exception
{
+ // log.info(name + " backup " + backup + " handle");
lock.lock(3);
try
{
if ((flowControl && availableCredits <= 0) || !started)
- {
+ {
return HandleStatus.BUSY;
}
@@ -569,7 +581,7 @@
{
availableCredits -= packet.getRequiredBufferSize();
}
-
+
channel.send(packet);
}
@@ -626,98 +638,89 @@
public boolean deliver()
{
- lock.lock(5);
+ if (pendingLargeMessage == null)
+ {
+ return true;
+ }
- try
+ if (flowControl && availableCredits <= 0)
{
- if (pendingLargeMessage == null)
- {
- return true;
- }
+ return false;
+ }
+ SessionReceiveMessage initialMessage;
- if (flowControl && availableCredits <= 0)
- {
- return false;
- }
- SessionReceiveMessage initialMessage;
+ if (sentFirstMessage)
+ {
+ initialMessage = null;
+ }
+ else
+ {
+ sentFirstMessage = true;
- if (sentFirstMessage)
- {
- initialMessage = null;
- }
- else
- {
- sentFirstMessage = true;
+ MessagingBuffer headerBuffer = ChannelBuffers.buffer(pendingLargeMessage.getPropertiesEncodeSize());
- MessagingBuffer headerBuffer = ChannelBuffers.buffer(pendingLargeMessage.getPropertiesEncodeSize());
+ pendingLargeMessage.encodeProperties(headerBuffer);
- pendingLargeMessage.encodeProperties(headerBuffer);
+ initialMessage = new SessionReceiveMessage(id,
+ headerBuffer.array(),
+ pendingLargeMessage.getLargeBodySize(),
+ ref.getDeliveryCount());
+ }
- initialMessage = new SessionReceiveMessage(id,
- headerBuffer.array(),
- pendingLargeMessage.getLargeBodySize(),
- ref.getDeliveryCount());
- }
+ int precalculateAvailableCredits;
- int precalculateAvailableCredits;
+ if (flowControl)
+ {
+ // Flow control needs to be done in advance.
+ precalculateAvailableCredits = preCalculateFlowControl(initialMessage);
+ }
+ else
+ {
+ precalculateAvailableCredits = 0;
+ }
+ if (initialMessage != null)
+ {
+ channel.send(initialMessage);
+
if (flowControl)
{
- // Flow control needs to be done in advance.
- precalculateAvailableCredits = preCalculateFlowControl(initialMessage);
+ precalculateAvailableCredits -= initialMessage.getRequiredBufferSize();
}
- else
+ }
+
+ while (positionPendingLargeMessage < sizePendingLargeMessage)
+ {
+ if (flowControl && precalculateAvailableCredits <= 0)
{
- precalculateAvailableCredits = 0;
+ return false;
}
- if (initialMessage != null)
- {
- channel.send(initialMessage);
+ SessionReceiveContinuationMessage chunk = createChunkSend();
- if (flowControl)
- {
- precalculateAvailableCredits -= initialMessage.getRequiredBufferSize();
- }
- }
+ int chunkLen = chunk.getBody().length;
- while (positionPendingLargeMessage < sizePendingLargeMessage)
+ if (flowControl)
{
- if (flowControl && precalculateAvailableCredits <= 0)
+ if ((precalculateAvailableCredits -= chunk.getRequiredBufferSize()) < 0)
{
- return false;
+ log.warn("Flowcontrol logic is not working properly, too many credits were taken");
}
-
- SessionReceiveContinuationMessage chunk = createChunkSend();
-
- int chunkLen = chunk.getBody().length;
-
- if (flowControl)
- {
- if ((precalculateAvailableCredits -= chunk.getRequiredBufferSize()) < 0)
- {
- log.warn("Flowcontrol logic is not working properly, too many credits were taken");
- }
- }
-
- channel.send(chunk);
-
- positionPendingLargeMessage += chunkLen;
}
- if (precalculateAvailableCredits != 0)
- {
- log.warn("Flowcontrol logic is not working properly... creidts = " + precalculateAvailableCredits);
- }
+ channel.send(chunk);
- close();
+ positionPendingLargeMessage += chunkLen;
+ }
- return true;
- }
- finally
+ if (precalculateAvailableCredits != 0)
{
- lock.unlock();
+ log.warn("Flowcontrol logic is not working properly... creidts = " + precalculateAvailableCredits);
}
+
+ close();
+
+ return true;
}
/**
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-07-20 13:53:36 UTC (rev 7595)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-07-20 22:34:58 UTC (rev 7596)
@@ -454,6 +454,7 @@
ServerConsumer consumer = new ServerConsumerImpl(idGenerator.generateID(),
this,
+ executorFactory.getExecutor(),
(QueueBinding)binding,
filter,
started,
@@ -463,7 +464,8 @@
channel,
preAcknowledge,
updateDeliveries,
- managementService);
+ managementService,
+ server.getConfiguration().isBackup());
consumers.put(consumer.getID(), consumer);
@@ -525,6 +527,8 @@
boolean durable = packet.isDurable();
Packet response = null;
+
+ // log.info("** handling create queue on backup " + this.server.getConfiguration().isBackup());
try
{
@@ -1575,6 +1579,8 @@
remotingConnection.addCloseListener(this);
int serverLastConfirmedCommandID = channel.getLastConfirmedCommandID();
+
+ channel.setFrozen(false);
//log.info("telling channel to replay commands up to " + lastConfirmedCommandID);
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2009-07-20 13:53:36 UTC (rev 7595)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2009-07-20 22:34:58 UTC (rev 7596)
@@ -80,7 +80,6 @@
import org.jboss.messaging.core.server.replication.ReplicableAction;
import org.jboss.messaging.core.server.replication.Replicator;
import org.jboss.messaging.core.server.replication.impl.JBMThread;
-import org.jboss.messaging.utils.Pair;
import org.jboss.messaging.utils.Triple;
/**
@@ -102,10 +101,10 @@
private Configuration config;
- //TODO the sequences and repl response can be encapsulated in a super class
-
+ // TODO the sequences and repl response can be encapsulated in a super class
+
private volatile List<Triple<Long, Long, Integer>> sequences;
-
+
private final Channel channel;
public ServerSessionPacketHandler(final ServerSession session,
@@ -139,57 +138,45 @@
public void handlePacket(final Packet packet)
{
this.packet = packet;
-
+
if (config.isBackup())
{
- //log.info(System.identityHashCode(this) + " inv on backup");
-
+ // log.info(System.identityHashCode(this) + " inv on backup");
+
JBMThread thread = JBMThread.currentThread();
+
+ if (packet.getType() != PacketImpl.REPLICATE_LOCK_SEQUENCES)
+ {
+ thread.setReplay(sequences);
+ }
- thread.setReplay(sequences);
-
handlePacket();
// send the response message
-
+
if (packet.getType() != PacketImpl.REPLICATE_LOCK_SEQUENCES)
- {
- //checkConfirm(packet);
-
- // log.info("sending confirm on sess handle packet");
+ {
channel.confirm(packet);
-
-// if (packet.getType() == PacketImpl.SESS_SEND)
-// {
-// SessionSendMessage sm = (SessionSendMessage)packet;
-//
-// ServerMessage msg = sm.getServerMessage();
-//
-// int cnt = (Integer)msg.getProperty(new SimpleString("count"));
-//
-// log.info("confirmed send " + cnt);
-// }
-
+
channel.send(new ReplicationResponseMessage());
+
+ thread.setNoReplayOrRecord(4);
}
+
- thread.setNoReplayOrRecord(4);
-
- checkCloseSessionChannels(packet);
+
+ checkCloseSessionChannels(packet);
}
else
{
- // log.info(System.identityHashCode(this) + " inv on live, repl is " + replicator);
-
if (replicator != null)
{
- replicator.execute(this,
- new Runnable()
+ replicator.execute(this, new Runnable()
{
public void run()
{
checkConfirm(packet);
-
+
checkCloseSessionChannels(packet);
}
});
@@ -197,41 +184,40 @@
else
{
handlePacket();
-
+
checkConfirm(packet);
-
+
checkCloseSessionChannels(packet);
}
}
}
-
+
private void checkConfirm(final Packet packet)
{
- //TODO this is a bit hacky
+ // TODO this is a bit hacky
if (packet.getType() != PacketImpl.SESS_CLOSE)
- {
- // log.info("sending confirm from sess close");
+ {
channel.confirm(packet);
}
}
-
+
private void checkCloseSessionChannels(final Packet packet)
{
if (packet.getType() == PacketImpl.SESS_CLOSE)
{
- //Close channels once we have the response back from the backup
+ // Close channels once we have the response back from the backup
session.closeChannels();
}
}
-// private static synchronized void dumpSequences(List<Long> sequences)
-// {
-// log.info("dumping sequences");
-// for (long sequence : sequences)
-// {
-// log.info(sequence);
-// }
-// }
+ // private static synchronized void dumpSequences(List<Long> sequences)
+ // {
+ // log.info("dumping sequences");
+ // for (long sequence : sequences)
+ // {
+ // log.info(sequence);
+ // }
+ // }
private void handlePacket()
{
@@ -244,11 +230,12 @@
case REPLICATE_LOCK_SEQUENCES:
{
ReplicateLockSequenceMessage msg = (ReplicateLockSequenceMessage)packet;
-
+
sequences = msg.getSequences();
-
- //dumpSequences(sequences);
-
+
+ // log.info("session got sequences");
+ // dumpSequences(sequences);
+
break;
}
case SESS_CREATECONSUMER:
@@ -429,4 +416,13 @@
log.error("Caught unexpected exception", t);
}
}
+
+ private void dumpSequences(List<Triple<Long, Long, Integer>> sequences)
+ {
+ log.info(Thread.currentThread() + " Got on serversession ph Sequences size is " + sequences.size());
+ for (Triple<Long, Long, Integer> sequence : sequences)
+ {
+ log.info(sequence.a + ": " + sequence.b + ": " + sequence.c);
+ }
+ }
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/JBMThread.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/JBMThread.java 2009-07-20 13:53:36 UTC (rev 7595)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/JBMThread.java 2009-07-20 22:34:58 UTC (rev 7596)
@@ -121,6 +121,10 @@
public void setReplay(final List<Triple<Long, Long, Integer>> objectSequences)
{
+ if (objectSequences == null)
+ {
+ throw new NullPointerException("sequences cannot be null");
+ }
// log.info(this + " set replay");
this.objectSequences = objectSequences;
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareMutex.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareMutex.java 2009-07-20 13:53:36 UTC (rev 7595)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareMutex.java 2009-07-20 22:34:58 UTC (rev 7596)
@@ -22,6 +22,8 @@
package org.jboss.messaging.core.server.replication.impl;
+import java.util.ArrayList;
+import java.util.List;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -62,14 +64,21 @@
private static final Set<ReplicationAwareMutex> allMutexes = new WeakHashSet<ReplicationAwareMutex>();
private volatile CountDownLatch otherLatch;
+
+ private final String name;
+
+ //debug
+ // private List<Exception> history = new ArrayList<Exception>();
public ReplicationAwareMutex(final String name, final int initialCount, final boolean debug)
{
+ this.name = name;
+
this.id = idSequence.getAndIncrement();
lock = new ReentrantLock();
- sequencedLock = new SequencedLock(name, initialCount);
+ sequencedLock = new SequencedLock(id, name, initialCount);
counter = new AtomicInteger(initialCount);
@@ -79,6 +88,24 @@
}
}
+ public static void dumpHistory(final String name)
+ {
+// synchronized (allMutexes)
+// {
+// for (ReplicationAwareMutex mutex: allMutexes)
+// {
+// if (mutex.name.equals(name))
+// {
+// log.info("Dumping history of mutex with name " + name + " items " + mutex.history.size());
+// for (Exception e: mutex.history)
+// {
+// log.info("acquirer", e);
+// }
+// }
+// }
+// }
+ }
+
public static void setOwnerLatchAll()
{
synchronized (allMutexes)
@@ -110,18 +137,18 @@
synchronized (allMutexes)
{
for (ReplicationAwareMutex mutex : allMutexes)
- {
+ {
mutex.setLatch();
}
}
}
-
+
public static void clearLatchAll()
{
synchronized (allMutexes)
{
for (ReplicationAwareMutex mutex : allMutexes)
- {
+ {
mutex.clearLatches();
}
}
@@ -139,7 +166,7 @@
otherLatch.countDown();
}
}
-
+
public void clearLatches()
{
otherLatch = freezeLatch = null;
@@ -163,8 +190,19 @@
private boolean doLock(long time, TimeUnit unit, int methodID) throws InterruptedException
{
- JBMThread thread = JBMThread.currentThread();
+ // TODO optimise this
+ Thread thread = Thread.currentThread();
+ JBMThread jthread;
+ if (thread instanceof JBMThread)
+ {
+ jthread = (JBMThread)thread;
+ }
+ else
+ {
+ jthread = null;
+ }
+
// debug only
if (owners.contains(thread))
{
@@ -176,10 +214,10 @@
throw new IllegalStateException("Lock is NOT re-entrant!");
}
- if (otherLatch != null)
+ if (jthread != null && otherLatch != null)
{
- thread.setWaitingOnMutex(true);
-
+ jthread.setWaitingOnMutex(true);
+
while (true)
{
try
@@ -187,60 +225,69 @@
otherLatch.await();
}
catch (InterruptedException e)
- {
- //This might get interrupted by mistake when we interrupt the thread thinking it's on the sequenced lock
- //in which case we just wait again
+ {
+ // This might get interrupted by mistake when we interrupt the thread thinking it's on the sequenced lock
+ // in which case we just wait again
continue;
}
-
+
break;
}
- thread.setWaitingOnMutex(false);
+ jthread.setWaitingOnMutex(false);
}
- if (thread.isReplay())
+ if (jthread != null && jthread.isReplay())
{
- //log.info("Thread " + thread + " is replay");
- Triple<Long, Long, Integer> pair = thread.getNextSequence();
+ // log.info("Thread " + thread + " is replay");
+ Triple<Long, Long, Integer> pair = jthread.getNextSequence();
- // // Sanity check
- // String otherName = SequencedLock.getLock(pair.a).getName();
- //
- // //If sequencedLock
- //
- // if ((!otherName.equals(name) || methodID != pair.c))
- // {
- // String msg = "Invalid object id, expecting " + name + ": " + methodID + " got " + otherName + ": " + pair.c
- // +
- // " lock id is " + pair.a;
- //
- // log.error(msg);
- //
- // thread.dumpSequences();
- //
- // SequencedLock.dumpLockMap();
- //
- // throw new IllegalStateException(msg);
- // }
+// // Sanity check
+// String otherName = SequencedLock.getLock(pair.a).getName();
+//
+// // If sequencedLock
+//
+// if ((!otherName.equals(name) || methodID != pair.c))
+// {
+// String msg = "Invalid object id, expecting " + name +
+// ": " +
+// methodID +
+// " got " +
+// otherName +
+// ": " +
+// pair.c +
+// " lock id is " +
+// pair.a;
+//
+// log.error(msg);
+//
+// dumpSequences(jthread.getSequences());
+//
+// SequencedLock.dumpLockMap();
+//
+// throw new IllegalStateException(msg);
+// }
long sequence = pair.b;
try
{
- if (!thread.isReplay())
+ if (!jthread.isReplay())
{
throw new IllegalStateException("How can it be non replay?");
}
-
+
if (!sequencedLock.lock(sequence, unit.toNanos(time)))
{
// dumpLocksWithName(name);
+ log.error("Timedout out waiting for lock " + name + " method id " + methodID);
+
+ dumpHistory(name);
}
}
catch (InterruptedException e)
{
- if (thread.isFrozen())
+ if (jthread.isFrozen())
{
// We retry and this time it will use the standard mutex - this happens on freezing out
return doLock(time, unit, methodID);
@@ -248,6 +295,10 @@
}
addOwner(thread);
+
+// Exception hist = new Exception();
+// hist.setStackTrace(thread.getStackTrace());
+// this.history.add(hist);
return true;
}
@@ -265,14 +316,18 @@
if (ok)
{
- if (thread.isRecording())
+ if (jthread != null && jthread.isRecording())
{
long sequence = counter.getAndIncrement();
- thread.addSequence(new Triple<Long, Long, Integer>(id, sequence, methodID));
+ jthread.addSequence(new Triple<Long, Long, Integer>(id, sequence, methodID));
}
addOwner(thread);
+
+// Exception hist = new Exception();
+// hist.setStackTrace(thread.getStackTrace());
+// this.history.add(hist);
}
return ok;
@@ -281,8 +336,18 @@
private void doUnlock()
{
- JBMThread thread = JBMThread.currentThread();
+ Thread thread = Thread.currentThread();
+ JBMThread jthread;
+ if (thread instanceof JBMThread)
+ {
+ jthread = (JBMThread)thread;
+ }
+ else
+ {
+ jthread = null;
+ }
+
if (thread == unfreezeOwner)
{
// Don't actually unlock this, since we never had the lock - we had the lock on the original SequencedLock
@@ -293,7 +358,7 @@
}
else
{
- if (thread.isReplay())
+ if (jthread != null && jthread.isReplay())
{
sequencedLock.unlock();
}
@@ -304,21 +369,31 @@
}
removeOwner(thread);
+
}
// debug only
- private void addOwner(final JBMThread thread)
+ private void addOwner(final Thread thread)
{
owners.add(thread);
}
// debug only
- private void removeOwner(final JBMThread thread)
+ private void removeOwner(final Thread thread)
{
owners.remove(thread);
}
// For debug
private Set<Thread> owners = new ConcurrentHashSet<Thread>();
+
+ private void dumpSequences(List<Triple<Long, Long, Integer>> sequences)
+ {
+ log.info("Sequences size is " + sequences.size());
+ for (Triple<Long, Long, Integer> sequence: sequences)
+ {
+ log.info(sequence.a + ": " + sequence.b);
+ }
+ }
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareSharedCounter.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareSharedCounter.java 2009-07-20 13:53:36 UTC (rev 7595)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareSharedCounter.java 2009-07-20 22:34:58 UTC (rev 7596)
@@ -53,16 +53,27 @@
public long getAndIncrement()
{
- JBMThread thread = JBMThread.currentThread();
+ // TODO optimise this
+ Thread thread = Thread.currentThread();
- if (thread.isReplay())
+ JBMThread jthread;
+ if (thread instanceof JBMThread)
{
- if (thread.isRecording())
+ jthread = (JBMThread)thread;
+ }
+ else
+ {
+ jthread = null;
+ }
+
+ if (jthread != null && jthread.isReplay())
+ {
+ if (jthread.isRecording())
{
throw new IllegalStateException("Thread should not be recording");
}
- Triple<Long, Long, Integer> pair = thread.getNextSequence();
+ Triple<Long, Long, Integer> pair = jthread.getNextSequence();
//Sanity check
if (pair.a != -1)
@@ -87,9 +98,9 @@
{
long sequence = al.getAndIncrement();
- if (thread.isRecording())
+ if (jthread != null && jthread.isRecording())
{
- thread.addSequence(new Triple<Long, Long, Integer>(-1L, sequence, -1));
+ jthread.addSequence(new Triple<Long, Long, Integer>(-1L, sequence, -1));
}
return sequence;
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java 2009-07-20 13:53:36 UTC (rev 7595)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java 2009-07-20 22:34:58 UTC (rev 7596)
@@ -27,6 +27,7 @@
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.atomic.AtomicLong;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.remoting.Channel;
@@ -35,7 +36,6 @@
import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateLockSequenceMessage;
import org.jboss.messaging.core.server.replication.ReplicableAction;
import org.jboss.messaging.core.server.replication.Replicator;
-import org.jboss.messaging.utils.Pair;
import org.jboss.messaging.utils.Triple;
/**
@@ -70,9 +70,13 @@
{
int count;
}
+
+ private String name;
- public ReplicatorImpl(final Channel replicatingChannel)
+ public ReplicatorImpl(String name, final Channel replicatingChannel)
{
+ this.name = name;
+
this.replicatingChannel = replicatingChannel;
}
@@ -104,15 +108,6 @@
{
WaitingChannelsHolder waitingChannelsHolder = waitingChannelsQueue.remove();
- // log.info("got replication response in replicator for sent packet " + waitingChannelsHolder.sentPacket);
-
- // log.info("there are " + waitingChannelsHolder.channelQueuedWriteCounts.size() + " waiting channels");
-
-// if (waitingChannelsHolder.sentPacket.getType() == PacketImpl.SESS_CLOSE)
-// {
-// log.info("***Got session close response from backup");
-// }
-
for (Map.Entry<QueuedWriteManager, ChannelCount> entry: waitingChannelsHolder.channelQueuedWriteCounts.entrySet())
{
entry.getKey().replicationResponseReceived(this, entry.getValue().count);
@@ -146,13 +141,10 @@
List<Triple<Long, Long, Integer>> sequences = thread.getSequences();
- // log.info("Replicating:");
-
- // thread.dumpSequences();
-
-
+ long id = seq.getAndIncrement();
+ // log.info("replicating " + name + " seq " + id);
// dumpSequences(sequences);
-
+
// We then send the sequences to the backup
WaitingChannelsHolder holder = new WaitingChannelsHolder();
@@ -161,22 +153,26 @@
holder.sentPacket = action.getPacket();
waitingChannelsQueue.add(holder);
+
+
- Packet packet = new ReplicateLockSequenceMessage(sequences);
+ Packet packet = new ReplicateLockSequenceMessage(id, sequences);
replicatingChannel.send(packet);
// Next we replicate the actual action
-
+
replicatingChannel.send(action.getPacket());
}
- private void dumpSequences(List<Long> sequences)
+ private static final AtomicLong seq = new AtomicLong(0);
+
+ private void dumpSequences(List<Triple<Long, Long, Integer>> sequences)
{
log.info("Sequences size is " + sequences.size());
- for (long sequence: sequences)
+ for (Triple<Long, Long, Integer> sequence: sequences)
{
- log.info(sequence);
+ log.info(sequence.a + ": " + sequence.b);
}
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/SequencedLock.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/SequencedLock.java 2009-07-20 13:53:36 UTC (rev 7595)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/SequencedLock.java 2009-07-20 22:34:58 UTC (rev 7596)
@@ -53,12 +53,14 @@
private final AtomicBoolean locked = new AtomicBoolean(false);
+ private final long id;
+
// private volatile boolean strictOrder = true;
private static Map<Long, SequencedLock> locks = new HashMap<Long, SequencedLock>();
// for debug only
- // private final long id;
+ // private final long id;
private final String name;
@@ -67,15 +69,16 @@
return owner;
}
- // private static synchronized void registerLock(final SequencedLock lock)
- // {
- // locks.put(lock.id, lock);
- // }
+ private static synchronized void registerLock(final SequencedLock lock)
+ {
+ //locks.put(lock.id, lock);
+ }
+
//
- // public static synchronized SequencedLock getLock(final long id)
- // {
- // return locks.get(id);
- // }
+ public static synchronized SequencedLock getLock(final long id)
+ {
+ return locks.get(id);
+ }
public String getName()
{
@@ -130,30 +133,30 @@
JBMThread thr = (JBMThread)owner;
-// SequencedLock waitingFor = thr.getLastLock();
-//
-// if (waitingFor != null)
-// {
-// while (true)
-// {
-// log.info("waiting for...");
-// waitingFor.dump();
-//
-// thr = (JBMThread)waitingFor.owner;
-//
-// if (thr != null)
-// {
-// waitingFor = thr.getLastLock();
-//
-// if (waitingFor != null)
-// {
-// continue;
-// }
-// }
-//
-// break;
-// }
-// }
+ // SequencedLock waitingFor = thr.getLastLock();
+ //
+ // if (waitingFor != null)
+ // {
+ // while (true)
+ // {
+ // log.info("waiting for...");
+ // waitingFor.dump();
+ //
+ // thr = (JBMThread)waitingFor.owner;
+ //
+ // if (thr != null)
+ // {
+ // waitingFor = thr.getLastLock();
+ //
+ // if (waitingFor != null)
+ // {
+ // continue;
+ // }
+ // }
+ //
+ // break;
+ // }
+ // }
}
// log.info("Waiting threads: " + queue.size());
@@ -165,9 +168,9 @@
// }
}
- public SequencedLock(final String name, final long sequence)
+ public SequencedLock(final long id, final String name, final long sequence)
{
- // this.id = id;
+ this.id = id;
this.name = name;
@@ -175,10 +178,9 @@
this.currentSequence = new AtomicLong(sequence);
- // registerLock(this);
+ registerLock(this);
}
-
// TODO parking with a timeout seems to be a lot slower than parking without timeout
public boolean lock(final long sequence, final long timeout) throws InterruptedException
{
@@ -197,33 +199,33 @@
while (true)
{
QueueEntry peeked = peekEntry();
-
+
if (peeked == null || // There are higher priority threads
peeked.thread != currentThread ||
!locked.compareAndSet(false, true)) // Lock is already locked
{
currentThread.setWaitingOnSequencedLock(true);
-
+
LockSupport.parkNanos(toWait);
-
+
if (Thread.interrupted() && currentThread.isFrozen())
{
throw new InterruptedException();
}
-
+
long now = System.nanoTime();
-
+
toWait -= now - start;
-
+
if (toWait <= 0)
{
log.warn("Timed out waiting for sequenced lock, current " + currentSequence.get() +
" expected " +
sequence);
-
+
return false;
}
-
+
start = now;
}
else
@@ -240,7 +242,7 @@
queue.remove();
owner = currentThread;
-
+
return true;
}
@@ -266,7 +268,6 @@
}
-
private QueueEntry peekEntry()
{
QueueEntry entry = queue.peek();
Modified: branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/concurrent/server/impl/QueueTest.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/concurrent/server/impl/QueueTest.java 2009-07-20 13:53:36 UTC (rev 7595)
+++ branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/concurrent/server/impl/QueueTest.java 2009-07-20 22:34:58 UTC (rev 7596)
@@ -18,19 +18,22 @@
* License along with this software; if not, write to the Free
* Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA
* 02110-1301 USA, or see the FSF site: http://www.fsf.org.
- */
+ */
package org.jboss.messaging.tests.concurrent.server.impl;
import java.util.ArrayList;
import java.util.List;
+import java.util.concurrent.Executor;
+import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.logging.Logger;
import org.jboss.messaging.core.server.HandleStatus;
import org.jboss.messaging.core.server.MessageReference;
import org.jboss.messaging.core.server.Queue;
import org.jboss.messaging.core.server.QueueFactory;
import org.jboss.messaging.core.server.ServerMessage;
+import org.jboss.messaging.core.server.replication.Replicator;
import org.jboss.messaging.tests.unit.core.server.impl.fakes.FakeConsumer;
import org.jboss.messaging.tests.unit.core.server.impl.fakes.FakeQueueFactory;
import org.jboss.messaging.tests.util.UnitTestCase;
@@ -48,163 +51,167 @@
public class QueueTest extends UnitTestCase
{
private static final Logger log = Logger.getLogger(QueueTest.class);
-
+
private QueueFactory queueFactory = new FakeQueueFactory();
-
+
/*
* Concurrently toggle the consumer between busy and not busy, calling deliver each time it is not busy, while messages are being added and consumed
*/
public void testConcurrentAddsDeliver() throws Exception
{
- Queue queue = queueFactory.createQueue(1, new SimpleString("address1"), new SimpleString("queue1"), null, false, false);
-
+ Queue queue = queueFactory.createQueue(1,
+ new SimpleString("address1"),
+ new SimpleString("queue1"),
+ null,
+ false,
+ false,
+ null,
+ null);
+
FakeConsumer consumer = new FakeConsumer();
-
+
queue.addConsumer(consumer);
-
+
final long testTime = 5000;
-
+
Sender sender = new Sender(queue, testTime);
-
+
Toggler toggler = new Toggler(queue, consumer, testTime);
-
+
sender.start();
-
+
toggler.start();
-
+
sender.join();
-
+
toggler.join();
-
+
consumer.setStatusImmediate(HandleStatus.HANDLED);
-
+
queue.deliverNow();
if (sender.getException() != null)
{
throw sender.getException();
}
-
+
if (toggler.getException() != null)
{
throw toggler.getException();
}
-
+
assertRefListsIdenticalRefs(sender.getReferences(), consumer.getReferences());
-
+
log.info("num refs: " + sender.getReferences().size());
-
+
log.info("num toggles: " + toggler.getNumToggles());
-
+
}
-
+
// Inner classes ---------------------------------------------------------------
-
+
class Sender extends Thread
{
private volatile Exception e;
-
+
private Queue queue;
-
+
private long testTime;
-
+
private volatile int i;
-
+
public Exception getException()
{
return e;
}
-
+
private List<MessageReference> refs = new ArrayList<MessageReference>();
-
+
public List<MessageReference> getReferences()
{
return refs;
}
-
+
Sender(Queue queue, long testTime)
{
this.testTime = testTime;
-
+
this.queue = queue;
}
-
+
public void run()
{
long start = System.currentTimeMillis();
-
+
while (System.currentTimeMillis() - start < testTime)
{
ServerMessage message = generateMessage(i);
-
+
MessageReference ref = message.createReference(queue);
-
+
queue.addLast(ref);
-
+
refs.add(ref);
-
+
i++;
}
}
}
-
+
class Toggler extends Thread
{
private volatile Exception e;
-
+
private Queue queue;
-
+
private FakeConsumer consumer;
-
+
private long testTime;
-
+
private boolean toggle;
-
+
private volatile int numToggles;
-
+
public int getNumToggles()
{
return numToggles;
}
-
+
public Exception getException()
{
return e;
}
-
+
Toggler(Queue queue, FakeConsumer consumer, long testTime)
{
this.testTime = testTime;
-
+
this.queue = queue;
-
+
this.consumer = consumer;
}
-
+
public void run()
{
long start = System.currentTimeMillis();
-
+
while (System.currentTimeMillis() - start < testTime)
{
if (toggle)
{
- consumer.setStatusImmediate(HandleStatus.BUSY);
+ consumer.setStatusImmediate(HandleStatus.BUSY);
}
else
{
consumer.setStatusImmediate(HandleStatus.HANDLED);
-
+
queue.deliverNow();
}
toggle = !toggle;
-
+
numToggles++;
}
}
}
-
+
}
-
-
-
Modified: branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/client/ConsumerWindowSizeTest.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/client/ConsumerWindowSizeTest.java 2009-07-20 13:53:36 UTC (rev 7595)
+++ branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/client/ConsumerWindowSizeTest.java 2009-07-20 22:34:58 UTC (rev 7596)
@@ -925,12 +925,12 @@
{
ServerConsumerImpl consumerImpl = (ServerConsumerImpl)consumer;
long timeout = System.currentTimeMillis() + 5000;
- while (timeout > System.currentTimeMillis() && consumerImpl.getAvailableCredits() != null)
+ while (timeout > System.currentTimeMillis() && consumerImpl.isFlowControl())
{
Thread.sleep(10);
}
- assertNull(consumerImpl.getAvailableCredits());
+ assertFalse(consumerImpl.isFlowControl());
}
}
}
Modified: branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java 2009-07-20 13:53:36 UTC (rev 7595)
+++ branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java 2009-07-20 22:34:58 UTC (rev 7596)
@@ -1286,7 +1286,7 @@
protected int getNumIterations()
{
- return 1000;
+ return 500;
}
@Override
More information about the jboss-cvs-commits
mailing list