[jboss-cvs] JBoss Messaging SVN: r7659 - in branches/Branch_MultiThreaded_Replication: src/main/org/jboss/messaging/core/remoting/impl and 6 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Mon Aug 3 14:28:38 EDT 2009
Author: timfox
Date: 2009-08-03 14:28:37 -0400 (Mon, 03 Aug 2009)
New Revision: 7659
Modified:
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/PacketDecoder.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/MessagingServer.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareMutex.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java
branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java
Log:
MT (multi-threaded) replication
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java 2009-08-03 16:02:26 UTC (rev 7658)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java 2009-08-03 18:28:37 UTC (rev 7659)
@@ -1022,6 +1022,10 @@
}
conn = new RemotingConnectionImpl(tc, callTimeout, null);
+
+ Channel channel1 = new ChannelImpl(1, conn);
+
+ conn.putChannel(channel1);
conn.addFailureListener(new DelegatingFailureListener(conn.getID()));
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java 2009-08-03 16:02:26 UTC (rev 7658)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java 2009-08-03 18:28:37 UTC (rev 7659)
@@ -115,6 +115,11 @@
this(id, -1, false, executor);
}
+ public ChannelImpl(final long id, final RemotingConnection connection, final Executor executor)
+ {
+ this(id, -1, false, executor, connection);
+ }
+
public ChannelImpl(final long id, final RemotingConnection connection, final int windowSize, final boolean block)
{
this(id, windowSize, block, null, connection);
@@ -622,7 +627,7 @@
*/
private void doHandlePacket(final Packet packet)
- {
+ {
// A
currentThread = Thread.currentThread();
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/PacketDecoder.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/PacketDecoder.java 2009-08-03 16:02:26 UTC (rev 7658)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/PacketDecoder.java 2009-08-03 18:28:37 UTC (rev 7659)
@@ -22,6 +22,7 @@
package org.jboss.messaging.core.remoting.impl;
+import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.BACKUP_CONNECTION;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATESESSION;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATESESSION_RESP;
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.CREATE_QUEUE;
@@ -437,7 +438,12 @@
{
packet = new PacketImpl(REPLICATE_QUEUE_DELIVERY);
break;
- }
+ }
+ case BACKUP_CONNECTION:
+ {
+ packet = new PacketImpl(BACKUP_CONNECTION);
+ break;
+ }
default:
{
throw new IllegalArgumentException("Invalid type: " + packetType);
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2009-08-03 16:02:26 UTC (rev 7658)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2009-08-03 18:28:37 UTC (rev 7659)
@@ -28,6 +28,7 @@
import org.jboss.messaging.core.remoting.Interceptor;
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
import org.jboss.messaging.core.remoting.spi.Connection;
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.core.server.ChannelManager;
@@ -63,7 +64,7 @@
private volatile boolean destroyed;
- private volatile boolean active;
+ private volatile boolean active = true;
private final boolean client;
@@ -90,7 +91,7 @@
final long blockingCallTimeout,
final List<Interceptor> interceptors)
{
- this(transportConnection, null, blockingCallTimeout, interceptors, true, true, null);
+ this(transportConnection, null, blockingCallTimeout, interceptors, true, null);
}
/*
@@ -98,19 +99,17 @@
*/
public RemotingConnectionImpl(final Connection transportConnection,
final RemotingConnection replicatingConnection,
- final List<Interceptor> interceptors,
- final boolean active,
+ final List<Interceptor> interceptors,
final ChannelManager channelManager)
{
- this(transportConnection, replicatingConnection, -1, interceptors, active, false, channelManager);
+ this(transportConnection, replicatingConnection, -1, interceptors, false, channelManager);
}
private RemotingConnectionImpl(final Connection transportConnection,
final RemotingConnection replicatingConnection,
final long blockingCallTimeout,
- final List<Interceptor> interceptors,
- final boolean active,
+ final List<Interceptor> interceptors,
final boolean client,
final ChannelManager channelManager)
@@ -133,9 +132,9 @@
putChannel(pingChannel);
- Channel channel1 = new ChannelImpl(1, this);
-
- putChannel(channel1);
+// Channel channel1 = new ChannelImpl(1, this);
+//
+// putChannel(channel1);
}
// RemotingConnection implementation
@@ -323,6 +322,13 @@
public void bufferReceived(final Object connectionID, final MessagingBuffer buffer)
{
final Packet packet = decoder.decode(buffer);
+
+ if (packet.getType() == PacketImpl.BACKUP_CONNECTION)
+ {
+ active = false;
+
+ return;
+ }
// if (packet.getType() == PacketImpl.REPLICATE_LOCK_SEQUENCES)
// {
@@ -345,10 +351,11 @@
if (channel != null)
{
- if (channel.getConnection() != null)
- {
- throw new IllegalStateException("Channel already has connection associated to it");
- }
+// if (channel.getConnection() != null)
+// {
+// throw new IllegalStateException("Channel already has connection associated to it " + packet.getChannelID() +
+// " packet " + packet);
+// }
channel.setConnection(this);
@@ -363,6 +370,7 @@
{
channel.handlePacket(packet);
}
+
// else
// {
// log.info("channel is null");
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2009-08-03 16:02:26 UTC (rev 7658)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/PacketImpl.java 2009-08-03 18:28:37 UTC (rev 7659)
@@ -162,7 +162,9 @@
public static final byte REPLICATE_QUEUE_DELIVERY = 99;
+ public static final byte BACKUP_CONNECTION = 100;
+
// Static --------------------------------------------------------
public PacketImpl(final byte type)
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java 2009-08-03 16:02:26 UTC (rev 7658)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java 2009-08-03 18:28:37 UTC (rev 7659)
@@ -40,6 +40,7 @@
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.RemotingConnection;
import org.jboss.messaging.core.remoting.impl.AbstractBufferHandler;
+import org.jboss.messaging.core.remoting.impl.ChannelImpl;
import org.jboss.messaging.core.remoting.impl.Pinger;
import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
@@ -293,36 +294,27 @@
throw new IllegalStateException("Unable to create connection, server hasn't finished starting up");
}
- Future<RemotingConnection> result = threadPool.submit(new Callable<RemotingConnection>()
- {
- public RemotingConnection call()
- {
- return server.getNonPooledReplicatingConnection();
- }
- });
-
RemotingConnection replicatingConnection;
-
+
try
{
- replicatingConnection = result.get();
+ replicatingConnection = server.getNonPooledReplicatingConnection();
}
- catch (ExecutionException e)
+ catch (Exception e)
{
- log.error("Failed to get replicating conection", e);
+ log.error("Failed to get replicating connection", e);
+
return;
}
- catch (InterruptedException e)
- {
- log.error("Interrupted", e);
- return;
- }
RemotingConnection rc = new RemotingConnectionImpl(connection,
replicatingConnection,
- interceptors,
- !config.isBackup(),
+ interceptors,
channelManager);
+
+ Channel channel1 = new ChannelImpl(1, rc);
+
+ rc.putChannel(channel1);
final Replicator replicator;
@@ -352,8 +344,6 @@
replicator = null;
}
- Channel channel1 = rc.getChannel(1);
-
ChannelHandler handler = new MessagingServerPacketHandler(server, channel1, rc, replicator);
channel1.setHandler(handler);
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/MessagingServer.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/MessagingServer.java 2009-08-03 16:02:26 UTC (rev 7658)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/MessagingServer.java 2009-08-03 18:28:37 UTC (rev 7659)
@@ -113,7 +113,7 @@
SimpleString getNodeID();
- RemotingConnection getNonPooledReplicatingConnection();
+ RemotingConnection getNonPooledReplicatingConnection() throws Exception;
void returnNonPooledReplicatingConnection(RemotingConnection connection);
@@ -137,5 +137,5 @@
void handleReplicateRedistribution(final SimpleString queueName, final long messageID) throws Exception;
- boolean registerBackupConnection(RemotingConnection connection);
+ //boolean registerBackupConnection(RemotingConnection connection);
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-08-03 16:02:26 UTC (rev 7658)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-08-03 18:28:37 UTC (rev 7659)
@@ -29,8 +29,6 @@
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
import javax.management.MBeanServer;
@@ -120,7 +118,6 @@
import org.jboss.messaging.utils.UUID;
import org.jboss.messaging.utils.UUIDGenerator;
import org.jboss.messaging.utils.VersionLoader;
-import org.jboss.messaging.utils.WeakHashSet;
/**
* The messaging server implementation
@@ -498,7 +495,7 @@
{
return this.threadPool;
}
-
+
public ExecutorFactory getExecutorFactory()
{
return this.executorFactory;
@@ -551,13 +548,13 @@
version.getFullVersion());
}
- if (!direct)
- {
- if (!registerBackupConnection(connection))
- {
- return null;
- }
- }
+ // if (!direct)
+ // {
+ // if (!registerBackupConnection(connection))
+ // {
+ // return null;
+ // }
+ // }
// Is this comment relevant any more ?
@@ -596,11 +593,8 @@
}
while (channelID < 2);
- Channel channel = new ChannelImpl(channelID,
- sendWindowSize,
- false,
- configuration.isBackup() ? this.executorFactory.getExecutor() : null);
-
+ Channel channel = new ChannelImpl(channelID, sendWindowSize, false, this.executorFactory.getExecutor());
+
remotingService.getChannelManager().putChannel(channel);
RemotingConnection replicatingConnection = connection.getReplicatingConnection();
@@ -612,10 +606,10 @@
if (replicatingConnection != null)
{
replicatingChannel = new ChannelImpl(channelID, replicatingConnection);
-
+
replicatingConnection.putChannel(replicatingChannel);
- replicator = new ReplicatorImpl("session " + channelID, replicatingChannel);
+ replicator = new ReplicatorImpl("session-" + channelID, replicatingChannel);
replicatingChannel.setHandler(new ChannelHandler()
{
@@ -855,7 +849,7 @@
}
postOffice.removeBinding(queueName);
-
+
remotingService.getChannelManager().removeChannel(queue.getID());
queue.close();
@@ -920,15 +914,34 @@
private boolean activatedBackup;
- public RemotingConnection getPooledReplicatingConnection()
+ public RemotingConnection getPooledReplicatingConnection() throws Exception
{
- RemotingConnection conn = null;
-
if (pooledReplicatingConnectionManager != null)
{
- conn = pooledReplicatingConnectionManager.getConnection(1);
+ java.util.concurrent.Future<RemotingConnection> result = threadPool.submit(new Callable<RemotingConnection>()
+ {
+ public RemotingConnection call()
+ {
+ return doGetPooledReplicatingConnection();
+ }
+ });
+
+ return result.get();
}
+ else
+ {
+ return null;
+ }
+ }
+ private RemotingConnection doGetPooledReplicatingConnection()
+ {
+ RemotingConnection conn = pooledReplicatingConnectionManager.getConnection(1);
+
+ Channel channel1 = conn.getChannel(1);
+
+ channel1.send(new PacketImpl(PacketImpl.BACKUP_CONNECTION));
+
return conn;
}
@@ -937,38 +950,57 @@
pooledReplicatingConnectionManager.returnConnection(conn);
}
- public RemotingConnection getNonPooledReplicatingConnection()
+ public RemotingConnection getNonPooledReplicatingConnection() throws Exception
{
- RemotingConnection conn = null;
-
if (nonPooledReplicatingConnectionManager != null)
{
- conn = nonPooledReplicatingConnectionManager.getConnection(1);
-
- synchronized (this)
+ java.util.concurrent.Future<RemotingConnection> result = threadPool.submit(new Callable<RemotingConnection>()
{
- if (!activatedBackup)
+ public RemotingConnection call()
{
- // First time we get channel we send a message down it informing the backup of our node id -
- // backup and live must have the same node id
+ return doGetNonPooledReplicatingConnection();
+ }
+ });
- Packet packet = new ReplicateStartupInfoMessage(uuid, storageManager.getCurrentUniqueID());
+ return result.get();
+ }
+ else
+ {
+ return null;
+ }
+ }
- Channel channel1 = conn.getChannel(1);
+ private RemotingConnection doGetNonPooledReplicatingConnection()
+ {
+ RemotingConnection conn = null;
- ChannelHandler prevHandler = channel1.getHandler();
+ conn = nonPooledReplicatingConnectionManager.getConnection(1);
- sendOnReplicatingAndWaitForResponse(packet, channel1);
+ Channel channel1 = conn.getChannel(1);
- channel1.setHandler(prevHandler);
+ channel1.send(new PacketImpl(PacketImpl.BACKUP_CONNECTION));
- activatedBackup = true;
- }
+ synchronized (this)
+ {
+ if (!activatedBackup)
+ {
+ // First time we get channel we send a message down it informing the backup of our node id -
+ // backup and live must have the same node id
+
+ Packet packet = new ReplicateStartupInfoMessage(uuid, storageManager.getCurrentUniqueID());
+
+ ChannelHandler prevHandler = channel1.getHandler();
+
+ sendOnReplicatingAndWaitForResponse(packet, channel1);
+
+ channel1.setHandler(prevHandler);
+
+ activatedBackup = true;
}
-
- // TODO execute outstanding results when failure occurs
}
+ // TODO execute outstanding results when failure occurs
+
return conn;
}
@@ -977,8 +1009,6 @@
nonPooledReplicatingConnectionManager.returnConnection(conn);
}
- private Set<RemotingConnection> backupConnections = new WeakHashSet<RemotingConnection>();
-
private static class NoCacheConnectionLifeCycleListener implements ConnectionLifeCycleListener
{
private RemotingConnection conn;
@@ -1085,25 +1115,37 @@
// connection 1 delivery gets replicated
// can't find message in queue since active was delivered immediately
- private boolean frozen;
+ // private boolean frozen;
// We have to do a bit of locking jiggery-pokery to ensure we don't get connections being registered and blocking
// while freezing is in progress
- private final Lock flock = new ReentrantLock();
+ // private final Lock flock = new ReentrantLock();
private static final long FREEZE_TIMEOUT = 5000;
private void freezeBackupConnections()
{
- flock.lock();
+ // flock.lock();
log.info("** freezing backup connections");
+ Set<RemotingConnection> connections = this.remotingService.getConnections();
+
+ Set<RemotingConnection> backupConnections = new HashSet<RemotingConnection>();
+
+ for (RemotingConnection connection : connections)
+ {
+ if (!connection.isActive())
+ {
+ backupConnections.add(connection);
+ }
+ }
+
synchronized (backupConnections)
{
- frozen = true;
+ // frozen = true;
- flock.unlock();
+ // flock.unlock();
for (RemotingConnection rc : backupConnections)
{
@@ -1112,6 +1154,8 @@
rc.setFrozenAllChannels(true);
}
+ // Wait for all remoting connection threads to become null
+
for (RemotingConnection rc : backupConnections)
{
while (true)
@@ -1248,8 +1292,6 @@
// log.info("all exited");
- // FIXME this is not sufficient - since there may still be queued executions waiting from before the freeze
- // need to wait for all executions on the channel to complete too
for (RemotingConnection rc : backupConnections)
{
rc.setFrozenAllChannels(false);
@@ -1276,31 +1318,31 @@
// }
// }
- public boolean registerBackupConnection(final RemotingConnection connection)
- {
- flock.lock();
+ // public boolean registerBackupConnection(final RemotingConnection connection)
+ // {
+ // flock.lock();
+ //
+ // try
+ // {
+ // if (!frozen)
+ // {
+ // synchronized (backupConnections)
+ // {
+ // backupConnections.add(connection);
+ // }
+ // return true;
+ // }
+ // else
+ // {
+ // return false;
+ // }
+ // }
+ // finally
+ // {
+ // flock.unlock();
+ // }
+ // }
- try
- {
- if (!frozen)
- {
- synchronized (backupConnections)
- {
- backupConnections.add(connection);
- }
- return true;
- }
- else
- {
- return false;
- }
- }
- finally
- {
- flock.unlock();
- }
- }
-
private void initialisePart1() throws Exception
{
// Create the pools - we have two pools - one for non scheduled - and another for scheduled
@@ -1525,29 +1567,17 @@
private Replicator getReplicatorForQueue(final long queueID) throws Exception
{
- RemotingConnection replicatingConnection;
+ RemotingConnection replicatingConnection = getPooledReplicatingConnection();
- // Needs to be excuted on different thread since netty doesn't like new connections created on
- // handler threads
- java.util.concurrent.Future<RemotingConnection> result = threadPool.submit(new Callable<RemotingConnection>()
- {
- public RemotingConnection call()
- {
- return getPooledReplicatingConnection();
- }
- });
-
- replicatingConnection = result.get();
-
final Replicator replicator;
if (replicatingConnection != null)
{
Channel replChannel = new ChannelImpl(queueID, replicatingConnection);
-
+
replicatingConnection.putChannel(replChannel);
- replicator = new ReplicatorImpl("queue " + queueID, replChannel);
+ replicator = new ReplicatorImpl("queue-" + queueID, replChannel);
replChannel.setHandler(new ChannelHandler()
{
@@ -1601,7 +1631,7 @@
queues.put(queueBindingInfo.getPersistenceID(), queue);
postOffice.addBinding(binding);
-
+
createHandlerForQueue(queue);
}
@@ -1713,18 +1743,20 @@
}
postOffice.addBinding(binding);
-
+
createHandlerForQueue(queue);
return queue;
}
-
+
private void createHandlerForQueue(final Queue queue)
{
if (configuration.isBackup())
{
Channel channel = new ChannelImpl(queue.getID(), executorFactory.getExecutor());
-
+
+ channel.setHandler(new QueuePacketHandler(queue, channel));
+
remotingService.getChannelManager().putChannel(channel);
}
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2009-08-03 16:02:26 UTC (rev 7658)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2009-08-03 18:28:37 UTC (rev 7659)
@@ -92,7 +92,7 @@
sequences = msg.getSequences();
- // dumpSequences(sequences);
+ //dumpSequences(sequences);
return;
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-08-03 16:02:26 UTC (rev 7658)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/QueueImpl.java 2009-08-03 18:28:37 UTC (rev 7659)
@@ -27,12 +27,12 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executor;
+import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.locks.Lock;
import org.jboss.messaging.core.filter.Filter;
import org.jboss.messaging.core.list.PriorityLinkedList;
@@ -406,10 +406,11 @@
return;
}
- // if (waitingToDeliver.compareAndSet(false, true))
- // {
- executor.execute(deliverRunner);
- // }
+ if (waitingToDeliver.compareAndSet(false, true))
+ {
+ executor.execute(deliverRunner);
+
+ }
}
public void addConsumer(final Consumer consumer) throws Exception
@@ -1698,7 +1699,7 @@
public void run()
{
// Must be set to false *before* executing to avoid race
- //waitingToDeliver.set(false);
+ waitingToDeliver.set(false);
deliverAll();
}
@@ -1711,6 +1712,8 @@
{
// direct = false;
+ //log.info("delivering all " + this.backup);
+
HandleStatus handled;
if (replicator != null)
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2009-08-03 16:02:26 UTC (rev 7658)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionPacketHandler.java 2009-08-03 18:28:37 UTC (rev 7659)
@@ -142,6 +142,8 @@
if (config.isBackup())
{
// log.info(System.identityHashCode(this) + " inv on backup");
+
+ //log.info("Received packet " + packet + " on backup");
JBMThread thread = JBMThread.currentThread();
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareMutex.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareMutex.java 2009-08-03 16:02:26 UTC (rev 7658)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicationAwareMutex.java 2009-08-03 18:28:37 UTC (rev 7659)
@@ -68,7 +68,7 @@
private final String name;
//debug
- // private List<Exception> history = new ArrayList<Exception>();
+ private List<HistoryEntry> history = new ArrayList<HistoryEntry>();
public ReplicationAwareMutex(final String name, final int initialCount, final boolean debug)
{
@@ -90,20 +90,20 @@
public static void dumpHistory(final String name)
{
-// synchronized (allMutexes)
-// {
-// for (ReplicationAwareMutex mutex: allMutexes)
-// {
-// if (mutex.name.equals(name))
-// {
-// log.info("Dumping history of mutex with name " + name + " items " + mutex.history.size());
-// for (Exception e: mutex.history)
-// {
-// log.info("acquirer", e);
-// }
-// }
-// }
-// }
+ synchronized (allMutexes)
+ {
+ for (ReplicationAwareMutex mutex: allMutexes)
+ {
+ if (mutex.name.equals(name))
+ {
+ log.info("Dumping history of mutex with name " + name + " items " + mutex.history.size());
+ for (HistoryEntry h: mutex.history)
+ {
+ log.info("acquirer, sequence " + h.sequence, h.exception);
+ }
+ }
+ }
+ }
}
public static void setOwnerLatchAll()
@@ -169,7 +169,7 @@
public void clearLatches()
{
- otherLatch = freezeLatch = null;
+ //otherLatch = freezeLatch = null;
}
public void lock(final int methodID)
@@ -280,7 +280,7 @@
if (!sequencedLock.lock(sequence, unit.toNanos(time)))
{
// dumpLocksWithName(name);
- log.error("Timedout out waiting for lock " + name + " method id " + methodID);
+ log.error("Timedout out waiting for lock " + name + " method id " + methodID, new Exception());
dumpHistory(name);
}
@@ -296,9 +296,9 @@
addOwner(thread);
-// Exception hist = new Exception();
-// hist.setStackTrace(thread.getStackTrace());
-// this.history.add(hist);
+ Exception hist = new Exception();
+ hist.setStackTrace(thread.getStackTrace());
+ this.history.add(new HistoryEntry(hist, sequence));
return true;
}
@@ -321,18 +321,33 @@
long sequence = counter.getAndIncrement();
jthread.addSequence(new Triple<Long, Long, Integer>(id, sequence, methodID));
+
+ Exception hist = new Exception();
+ hist.setStackTrace(thread.getStackTrace());
+ this.history.add(new HistoryEntry(hist, sequence));
}
addOwner(thread);
-
-// Exception hist = new Exception();
-// hist.setStackTrace(thread.getStackTrace());
-// this.history.add(hist);
+
}
return ok;
}
}
+
+ private static class HistoryEntry
+ {
+ Exception exception;
+
+ long sequence;
+
+ HistoryEntry(Exception exception, long sequence)
+ {
+ this.exception = exception;
+
+ this.sequence = sequence;
+ }
+ }
private void doUnlock()
{
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java 2009-08-03 16:02:26 UTC (rev 7658)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/replication/impl/ReplicatorImpl.java 2009-08-03 18:28:37 UTC (rev 7659)
@@ -33,6 +33,7 @@
import org.jboss.messaging.core.remoting.Channel;
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.impl.QueuedWriteManager;
+import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
import org.jboss.messaging.core.remoting.impl.wireformat.replication.ReplicateLockSequenceMessage;
import org.jboss.messaging.core.server.replication.ReplicableAction;
import org.jboss.messaging.core.server.replication.Replicator;
@@ -142,9 +143,12 @@
List<Triple<Long, Long, Integer>> sequences = thread.getSequences();
long id = seq.getAndIncrement();
- // log.info("replicating " + name + " seq " + id);
- // dumpSequences(sequences);
+// log.info("replicating " + name + " seq " + id);
+// dumpSequences(sequences);
+ // log.info("replicating packet " + action.getPacket());
+
+
// We then send the sequences to the backup
WaitingChannelsHolder holder = new WaitingChannelsHolder();
Modified: branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java 2009-08-03 16:02:26 UTC (rev 7658)
+++ branches/Branch_MultiThreaded_Replication/tests/src/org/jboss/messaging/tests/integration/cluster/failover/MultiThreadRandomFailoverTestBase.java 2009-08-03 18:28:37 UTC (rev 7659)
@@ -406,6 +406,7 @@
protected void doTestB(final ClientSessionFactory sf, final int threadNum) throws Exception
{
+ log.info("Starting test B");
long start = System.currentTimeMillis();
ClientSession s = sf.createSession(false, false, false);
@@ -437,12 +438,12 @@
ClientProducer producer = sessSend.createProducer(ADDRESS);
sendMessages(sessSend, producer, numMessages, threadNum);
-
+
for (ClientSession session : sessions)
{
session.start();
}
-
+
Set<MyHandler> handlers = new HashSet<MyHandler>();
for (ClientConsumer consumer : consumers)
@@ -489,8 +490,8 @@
long end = System.currentTimeMillis();
+ log.info("test B complete");
log.info("duration " + (end - start));
-
}
protected void doTestC(final ClientSessionFactory sf, final int threadNum) throws Exception
@@ -1286,7 +1287,7 @@
protected int getNumIterations()
{
- return 500;
+ return 100;
}
@Override
@@ -1348,9 +1349,10 @@
protected void stop() throws Exception
{
- log.info("** Stopping server");
+ log.info("** Stopping backup server");
backupServer.stop();
+ log.info("** Stopping live server");
liveServer.stop();
System.gc();
More information about the jboss-cvs-commits mailing list