[jboss-cvs] JBoss Messaging SVN: r7603 - in branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core: remoting and 5 other directories.
jboss-cvs-commits at lists.jboss.org
jboss-cvs-commits at lists.jboss.org
Thu Jul 23 06:37:34 EDT 2009
Author: timfox
Date: 2009-07-23 06:37:33 -0400 (Thu, 23 Jul 2009)
New Revision: 7603
Modified:
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/Pinger.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionMessage.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionResponseMessage.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/MessagingServer.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
Log:
MT replication
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2009-07-22 13:46:38 UTC (rev 7602)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ClientSessionImpl.java 2009-07-23 10:37:33 UTC (rev 7603)
@@ -780,8 +780,6 @@
// log.info("unfreezing");
- backupConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
-
remotingConnection = backupConnection;
int lid = channel.getLastConfirmedCommandID();
@@ -792,7 +790,7 @@
Packet request = new ReattachSessionMessage(name, lid);
- Channel channel1 = backupConnection.getChannel(1, -1, false, false);
+ Channel channel1 = backupConnection.getChannel(1);
ReattachSessionResponseMessage response = (ReattachSessionResponseMessage)channel1.sendBlocking(request);
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java 2009-07-22 13:46:38 UTC (rev 7602)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/client/impl/ConnectionManagerImpl.java 2009-07-23 10:37:33 UTC (rev 7603)
@@ -24,6 +24,7 @@
import static org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl.EARLY_RESPONSE;
+import java.nio.channels.Channels;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
@@ -51,6 +52,7 @@
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.RemotingConnection;
import org.jboss.messaging.core.remoting.impl.AbstractBufferHandler;
+import org.jboss.messaging.core.remoting.impl.ChannelImpl;
import org.jboss.messaging.core.remoting.impl.Pinger;
import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionMessage;
@@ -299,7 +301,7 @@
"Unable to connect to server using configuration " + connectorConfig);
}
- channel1 = connection.getChannel(1, -1, false, false);
+ channel1 = connection.getChannel(1);
// Lock it - this must be done while the failoverLock is held
channel1.getLock().lock();
@@ -313,10 +315,7 @@
inCreateSession = true;
}
- long sessionChannelID = connection.generateChannelID();
-
- Packet request = new CreateSessionMessage(name,
- sessionChannelID,
+ Packet request = new CreateSessionMessage(name,
clientVersion.getIncrementingVersion(),
username,
password,
@@ -344,11 +343,13 @@
{
CreateSessionResponseMessage response = (CreateSessionResponseMessage)pResponse;
- Channel sessionChannel = connection.getChannel(sessionChannelID,
- producerWindowSize,
- producerWindowSize != -1,
- false);
-
+ Channel sessionChannel = new ChannelImpl(response.getSessionChannelID(), producerWindowSize,
+ producerWindowSize != -1, null);
+
+ sessionChannel.transferConnection(connection);
+
+ connection.putChannel(sessionChannel);
+
ClientSessionInternal session = new ClientSessionImpl(this,
name,
xa,
@@ -936,7 +937,6 @@
private RemotingConnection internalGetConnection(final int initialRefCount)
{
RemotingConnection conn;
-
if (connections.size() < maxConnections)
{
@@ -1039,11 +1039,11 @@
pingers.put(conn.getID(), pinger);
Ping ping = new Ping(clientFailureCheckPeriod, connectionTTL);
+
+ Channel pingChannel = conn.getChannel(0);
- Channel channel0 = conn.getChannel(0, -1, false, false);
+ pingChannel.send(ping);
- channel0.send(ping);
-
if (clientFailureCheckPeriod != -1)
{
Future<?> pingerFuture = scheduledThreadPool.scheduleAtFixedRate(pinger,
@@ -1134,7 +1134,7 @@
{
for (ConnectionEntry entry : connections.values())
{
- Channel channel1 = entry.connection.getChannel(1, -1, false, false);
+ Channel channel1 = entry.connection.getChannel(1);
channel1.getLock().lock();
}
@@ -1144,7 +1144,7 @@
{
for (ConnectionEntry entry : connections.values())
{
- Channel channel1 = entry.connection.getChannel(1, -1, false, false);
+ Channel channel1 = entry.connection.getChannel(1);
channel1.getLock().unlock();
}
@@ -1154,7 +1154,7 @@
{
for (ConnectionEntry entry : connections.values())
{
- Channel channel1 = entry.connection.getChannel(1, -1, false, false);
+ Channel channel1 = entry.connection.getChannel(1);
channel1.returnBlocking();
}
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java 2009-07-22 13:46:38 UTC (rev 7602)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/RemotingConnection.java 2009-07-23 10:37:33 UTC (rev 7603)
@@ -32,14 +32,12 @@
String getRemoteAddress();
- Channel getChannel(long channelID, int windowSize, boolean block, boolean async);
+ Channel getChannel(long channelID);
- void putChannel(long channelID, Channel channel);
+ void putChannel(Channel channel);
boolean removeChannel(long channelID);
- long generateChannelID();
-
void addFailureListener(FailureListener listener);
boolean removeFailureListener(FailureListener listener);
@@ -58,10 +56,6 @@
void destroy();
- void syncIDGeneratorSequence(long id);
-
- long getIDGeneratorSequence();
-
void activate();
void freeze();
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java 2009-07-22 13:46:38 UTC (rev 7602)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/ChannelImpl.java 2009-07-23 10:37:33 UTC (rev 7603)
@@ -99,14 +99,8 @@
private CommandConfirmationHandler commandConfirmationHandler;
- public ChannelImpl(final RemotingConnection connection,
- final long id,
- final int windowSize,
- final boolean block,
- final Executor executor)
+ public ChannelImpl(final long id, final int windowSize, final boolean block, final Executor executor)
{
- this.connection = connection;
-
this.id = id;
this.windowSize = windowSize;
@@ -383,7 +377,10 @@
public void transferConnection(final RemotingConnection newConnection)
{
- connection.removeChannel(id);
+ if (connection != null)
+ {
+ connection.removeChannel(id);
+ }
// if (replicatingChannel != null)
// {
@@ -397,11 +394,9 @@
// And switch it
- final RemotingConnectionImpl rnewConnection = (RemotingConnectionImpl)newConnection;
+ newConnection.putChannel(this);
- rnewConnection.putChannel(id, this);
-
- connection = rnewConnection;
+ connection = newConnection;
}
public void replayCommands(final int otherLastConfirmedCommandID)
@@ -523,7 +518,7 @@
private volatile boolean frozen;
private volatile Thread currentThread;
-
+
public void setFrozen(final boolean f)
{
if (f)
@@ -548,29 +543,29 @@
}
}
}
-
+
public Thread getExecutingThread()
{
return currentThread;
}
-
+
public void waitForAllExecutions()
{
if (executor != null)
{
Future f = new Future();
-
+
executor.execute(f);
-
+
boolean ok = f.await(5000);
-
+
if (!ok)
{
throw new IllegalStateException("Timedout out waiting for channel executions to complete");
}
}
}
-
+
/*
* Thread sat on A) (below)
* rc set frozen
@@ -593,7 +588,7 @@
private void doHandlePacket(final Packet packet)
{
- //A
+ // A
currentThread = Thread.currentThread();
try
@@ -665,10 +660,10 @@
}
}
}
-// else
-// {
-// log.info("It's frozen");
-// }
+ // else
+ // {
+ // log.info("It's frozen");
+ // }
}
finally
{
@@ -687,8 +682,9 @@
private void clearUpTo(final int lastConfirmedCommandID)
{
- //log.info(System.identityHashCode(this) + " clearupto " + lastConfirmedCommandID + " first stored " + firstStoredCommandID);
-
+ // log.info(System.identityHashCode(this) + " clearupto " + lastConfirmedCommandID + " first stored " +
+ // firstStoredCommandID);
+
if (lastConfirmedCommandID < firstStoredCommandID)
{
// This can legitimately happen, if the flushConfirmations() is called from the other side which causes a
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/Pinger.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/Pinger.java 2009-07-22 13:46:38 UTC (rev 7602)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/Pinger.java 2009-07-23 10:37:33 UTC (rev 7603)
@@ -71,7 +71,7 @@
this.connectionFailedAction = connectionFailedAction;
- this.channel0 = conn.getChannel(0, -1, false, false);
+ this.channel0 = conn.getChannel(0);
this.lastPingReceived = lastPingReceived;
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2009-07-22 13:46:38 UTC (rev 7602)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/RemotingConnectionImpl.java 2009-07-23 10:37:33 UTC (rev 7603)
@@ -33,7 +33,6 @@
import org.jboss.messaging.core.remoting.spi.MessagingBuffer;
import org.jboss.messaging.utils.ExecutorFactory;
import org.jboss.messaging.utils.OrderedExecutorFactory;
-import org.jboss.messaging.utils.SimpleIDGenerator;
/**
* @author <a href="tim.fox at jboss.com">Tim Fox</a>
@@ -74,18 +73,13 @@
// 0 is for pinging
// 1 is for session creation and attachment
// 2 is for replication
- private volatile SimpleIDGenerator idGenerator = new SimpleIDGenerator(10);
- private boolean idGeneratorSynced = false;
-
// private volatile boolean frozen;
private final Object failLock = new Object();
private final PacketDecoder decoder = new PacketDecoder();
- private final ExecutorFactory orderedFactory;
-
// Constructors
// ---------------------------------------------------------------------------------
@@ -96,7 +90,7 @@
final long blockingCallTimeout,
final List<Interceptor> interceptors)
{
- this(transportConnection, null, blockingCallTimeout, interceptors, true, true, null);
+ this(transportConnection, null, blockingCallTimeout, interceptors, true, true);
}
/*
@@ -105,11 +99,10 @@
public RemotingConnectionImpl(final Connection transportConnection,
final RemotingConnection replicatingConnection,
final List<Interceptor> interceptors,
- final boolean active,
- final Executor threadPool)
+ final boolean active)
{
- this(transportConnection, replicatingConnection, -1, interceptors, active, false, threadPool);
+ this(transportConnection, replicatingConnection, -1, interceptors, active, false);
}
private RemotingConnectionImpl(final Connection transportConnection,
@@ -117,8 +110,7 @@
final long blockingCallTimeout,
final List<Interceptor> interceptors,
final boolean active,
- final boolean client,
- final Executor threadPool)
+ final boolean client)
{
this.transportConnection = transportConnection;
@@ -132,15 +124,16 @@
this.active = active;
this.client = client;
+
+ Channel pingChannel = new ChannelImpl(0, -1, false, null);
+ pingChannel.transferConnection(this);
- if (threadPool != null)
- {
- this.orderedFactory = new OrderedExecutorFactory(threadPool);
- }
- else
- {
- this.orderedFactory = null;
- }
+ putChannel(pingChannel);
+
+ Channel channel1 = new ChannelImpl(1, -1, false, null);
+ channel1.transferConnection(this);
+
+ putChannel(channel1);
}
// RemotingConnection implementation
@@ -178,21 +171,9 @@
return transportConnection.getRemoteAddress();
}
- public synchronized Channel getChannel(final long channelID,
- final int windowSize,
- final boolean block,
- final boolean async)
+ public synchronized Channel getChannel(final long channelID)
{
- Channel channel = channels.get(channelID);
-
- if (channel == null)
- {
- channel = new ChannelImpl(this, channelID, windowSize, block, async ? this.orderedFactory.getExecutor() : null);
-
- channels.put(channelID, channel);
- }
-
- return channel;
+ return channels.get(channelID);
}
public synchronized boolean removeChannel(final long channelID)
@@ -200,9 +181,9 @@
return channels.remove(channelID) != null;
}
- public synchronized void putChannel(final long channelID, final Channel channel)
+ public synchronized void putChannel(final Channel channel)
{
- channels.put(channelID, channel);
+ channels.put(channel.getID(), channel);
}
public void addFailureListener(final FailureListener listener)
@@ -295,28 +276,8 @@
internalClose();
callClosingListeners();
- }
+ }
- public long generateChannelID()
- {
- return idGenerator.generateID();
- }
-
- public synchronized void syncIDGeneratorSequence(final long id)
- {
- if (!idGeneratorSynced)
- {
- idGenerator = new SimpleIDGenerator(id);
-
- idGeneratorSynced = true;
- }
- }
-
- public long getIDGeneratorSequence()
- {
- return idGenerator.getCurrentID();
- }
-
public boolean isActive()
{
return active;
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionMessage.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionMessage.java 2009-07-22 13:46:38 UTC (rev 7602)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionMessage.java 2009-07-23 10:37:33 UTC (rev 7603)
@@ -39,8 +39,6 @@
private String name;
- private long sessionChannelID;
-
private int version;
private String username;
@@ -63,8 +61,7 @@
// Constructors --------------------------------------------------
- public CreateSessionMessage(final String name,
- final long sessionChannelID,
+ public CreateSessionMessage(final String name,
final int version,
final String username,
final String password,
@@ -79,8 +76,6 @@
this.name = name;
- this.sessionChannelID = sessionChannelID;
-
this.version = version;
this.username = username;
@@ -112,11 +107,6 @@
return name;
}
- public long getSessionChannelID()
- {
- return sessionChannelID;
- }
-
public int getVersion()
{
return version;
@@ -186,7 +176,6 @@
public void encodeBody(final MessagingBuffer buffer)
{
buffer.writeString(name);
- buffer.writeLong(sessionChannelID);
buffer.writeInt(version);
buffer.writeNullableString(username);
buffer.writeNullableString(password);
@@ -202,7 +191,6 @@
public void decodeBody(final MessagingBuffer buffer)
{
name = buffer.readString();
- sessionChannelID = buffer.readLong();
version = buffer.readInt();
username = buffer.readNullableString();
password = buffer.readNullableString();
@@ -224,8 +212,7 @@
CreateSessionMessage r = (CreateSessionMessage)other;
- boolean matches = super.equals(other) && name.equals(r.name) &&
- sessionChannelID == r.sessionChannelID &&
+ boolean matches = super.equals(other) && name.equals(r.name) &&
version == r.version &&
xa == r.xa &&
autoCommitSends == r.autoCommitSends &&
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionResponseMessage.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionResponseMessage.java 2009-07-22 13:46:38 UTC (rev 7602)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/impl/wireformat/CreateSessionResponseMessage.java 2009-07-23 10:37:33 UTC (rev 7603)
@@ -38,16 +38,20 @@
// Attributes ----------------------------------------------------
private int serverVersion;
+
+ private long sessionChannelID;
// Static --------------------------------------------------------
// Constructors --------------------------------------------------
- public CreateSessionResponseMessage(final int serverVersion)
+ public CreateSessionResponseMessage(final int serverVersion, final long sessionChannelID)
{
super(CREATESESSION_RESP);
this.serverVersion = serverVersion;
+
+ this.sessionChannelID = sessionChannelID;
}
public CreateSessionResponseMessage()
@@ -67,22 +71,29 @@
{
return serverVersion;
}
+
+ public long getSessionChannelID()
+ {
+ return sessionChannelID;
+ }
@Override
public void encodeBody(final MessagingBuffer buffer)
{
buffer.writeInt(serverVersion);
+ buffer.writeLong(sessionChannelID);
}
@Override
public void decodeBody(final MessagingBuffer buffer)
{
serverVersion = buffer.readInt();
+ sessionChannelID = buffer.readLong();
}
public int getRequiredBufferSize()
{
- return BASIC_PACKET_SIZE + DataConstants.SIZE_INT;
+ return BASIC_PACKET_SIZE + DataConstants.SIZE_INT + DataConstants.SIZE_LONG;
}
@Override
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java 2009-07-22 13:46:38 UTC (rev 7602)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/remoting/server/impl/RemotingServiceImpl.java 2009-07-23 10:37:33 UTC (rev 7603)
@@ -24,7 +24,6 @@
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
@@ -41,6 +40,7 @@
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.RemotingConnection;
import org.jboss.messaging.core.remoting.impl.AbstractBufferHandler;
+import org.jboss.messaging.core.remoting.impl.ChannelImpl;
import org.jboss.messaging.core.remoting.impl.Pinger;
import org.jboss.messaging.core.remoting.impl.RemotingConnectionImpl;
import org.jboss.messaging.core.remoting.impl.invm.InVMAcceptorFactory;
@@ -231,7 +231,7 @@
for (RemotingConnection connection : connections.values())
{
- connection.getChannel(0, -1, false, false).sendAndFlush(new PacketImpl(DISCONNECT));
+ connection.getChannel(0).sendAndFlush(new PacketImpl(DISCONNECT));
}
for (Acceptor acceptor : acceptors)
@@ -313,16 +313,13 @@
RemotingConnection rc = new RemotingConnectionImpl(connection,
replicatingConnection,
interceptors,
- !config.isBackup(),
- threadPool);
-
- Channel channel1 = rc.getChannel(1, -1, false, false);
-
+ !config.isBackup());
+
final Replicator replicator;
if (replicatingConnection != null)
{
- Channel replicatingChannel = replicatingConnection.getChannel(1, -1, false, false);
+ Channel replicatingChannel = replicatingConnection.getChannel(1);
replicator = new ReplicatorImpl("mess server", replicatingChannel);
@@ -345,6 +342,8 @@
{
replicator = null;
}
+
+ Channel channel1 = rc.getChannel(1);
ChannelHandler handler = new MessagingServerPacketHandler(server, channel1, rc, replicator);
@@ -488,7 +487,7 @@
{
this.conn = conn;
- conn.getChannel(0, -1, false, false).setHandler(this);
+ conn.getChannel(0).setHandler(this);
}
public synchronized void handlePacket(final Packet packet)
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/MessagingServer.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/MessagingServer.java 2009-07-22 13:46:38 UTC (rev 7602)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/MessagingServer.java 2009-07-23 10:37:33 UTC (rev 7603)
@@ -34,6 +34,7 @@
import org.jboss.messaging.core.settings.impl.AddressSettings;
import org.jboss.messaging.core.transaction.ResourceManager;
import org.jboss.messaging.core.version.Version;
+import org.jboss.messaging.utils.ExecutorFactory;
import org.jboss.messaging.utils.SimpleString;
import org.jboss.messaging.utils.UUID;
@@ -69,8 +70,7 @@
ReattachSessionResponseMessage reattachSession(RemotingConnection connection, String name, int lastConfirmedCommandID) throws Exception;
- CreateSessionResponseMessage createSession(String name,
- long channelID,
+ CreateSessionResponseMessage createSession(String name,
String username,
String password,
int minLargeMessageSize,
@@ -96,6 +96,8 @@
HierarchicalRepository<AddressSettings> getAddressSettingsRepository();
ExecutorService getThreadPool();
+
+ ExecutorFactory getExecutorFactory();
int getConnectionCount();
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-07-22 13:46:38 UTC (rev 7602)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerImpl.java 2009-07-23 10:37:33 UTC (rev 7603)
@@ -76,6 +76,7 @@
import org.jboss.messaging.core.remoting.ChannelHandler;
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.ChannelImpl;
import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionResponseMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
import org.jboss.messaging.core.remoting.impl.wireformat.ReattachSessionResponseMessage;
@@ -501,6 +502,11 @@
{
return this.threadPool;
}
+
+ public ExecutorFactory getExecutorFactory()
+ {
+ return this.executorFactory;
+ }
public ReattachSessionResponseMessage reattachSession(final RemotingConnection connection,
final String name,
@@ -527,7 +533,6 @@
}
public CreateSessionResponseMessage createSession(final String name,
- final long channelID,
final String username,
final String password,
final int minLargeMessageSize,
@@ -587,8 +592,23 @@
currentSession.getChannel().close();
}
- Channel channel = connection.getChannel(channelID, sendWindowSize, false, configuration.isBackup());
+ long channelID;
+ do
+ {
+ channelID = storageManager.generateUniqueID();
+ }
+ while (channelID < 2);
+
+ Channel channel = new ChannelImpl(channelID,
+ sendWindowSize,
+ false,
+ configuration.isBackup() ? this.executorFactory.getExecutor() : null);
+
+ channel.transferConnection(connection);
+
+ connection.putChannel(channel);
+
RemotingConnection replicatingConnection = connection.getReplicatingConnection();
final Replicator replicator;
@@ -597,7 +617,11 @@
if (replicatingConnection != null)
{
- replicatingChannel = replicatingConnection.getChannel(channelID, -1, false, false);
+ replicatingChannel = new ChannelImpl(channelID, -1, false, null);
+
+ replicatingChannel.transferConnection(replicatingConnection);
+
+ replicatingConnection.putChannel(replicatingChannel);
replicator = new ReplicatorImpl("session " + channelID, replicatingChannel);
@@ -653,7 +677,7 @@
channel.setHandler(handler);
- return new CreateSessionResponseMessage(version.getIncrementingVersion());
+ return new CreateSessionResponseMessage(version.getIncrementingVersion(), channelID);
}
public void removeSession(final String name) throws Exception
@@ -844,7 +868,7 @@
if (replicator != null)
{
- Channel channel1 = replicator.getReplicatingChannel().getConnection().getChannel(1, -1, false, false);
+ Channel channel1 = replicator.getReplicatingChannel().getConnection().getChannel(1);
channel1.send(new UnregisterQueueReplicationChannelMessage(queue.getID()));
}
@@ -945,7 +969,7 @@
Packet packet = new ReplicateStartupInfoMessage(uuid, storageManager.getCurrentUniqueID());
- Channel channel1 = conn.getChannel(1, -1, false, false);
+ Channel channel1 = conn.getChannel(1);
ChannelHandler prevHandler = channel1.getHandler();
@@ -1030,8 +1054,7 @@
{
if (configuration.isBackup())
{
- log.info("A connection has been made to the backup server so it will be activated! This will result in the live server being considered failed."
- );
+ log.info("A connection has been made to the backup server so it will be activated! This will result in the live server being considered failed.");
synchronized (this)
{
@@ -1100,21 +1123,21 @@
for (RemotingConnection rc : backupConnections)
{
rc.freeze();
-
+
rc.setFrozenAllChannels(true);
}
-
+
for (RemotingConnection rc : backupConnections)
{
while (true)
{
Thread thr = rc.getExecutingThread();
-
+
if (thr == null)
{
break;
}
-
+
try
{
Thread.sleep(1);
@@ -1138,13 +1161,13 @@
{
while (true)
{
- // log.info("in loop");
+ // log.info("in loop");
Set<Thread> executingThreads = rc.getExecutingThreads();
boolean exit = true;
-
- // log.info("executing threads " + executingThreads.isEmpty());
-
+
+ // log.info("executing threads " + executingThreads.isEmpty());
+
for (Thread executingThread : executingThreads)
{
if (executingThread instanceof JBMThread)
@@ -1153,16 +1176,16 @@
if (jthread.isWaitingOnMutex())
{
- //log.info("Thread " + jthread + " is waiting on mutex");
-
- jthread.setNoReplayOrRecord(0);
+ // log.info("Thread " + jthread + " is waiting on mutex");
+
+ jthread.setNoReplayOrRecord(0);
}
else if (jthread.isWaitingOnSequencedLock())
{
jthread.setFrozen();
executingThread.interrupt();
-
+
exit = false;
}
else
@@ -1175,7 +1198,7 @@
exit = false;
}
}
-
+
if (exit)
{
break;
@@ -1196,22 +1219,21 @@
}
}
-
- // log.info("all on latch");
+ // log.info("all on latch");
+
// Now we release the latch and wait for all threads to exit
ReplicationAwareMutex.setOwnerLatchAll();
start = System.currentTimeMillis();
-
// Wait for everything to exit
for (RemotingConnection rc : backupConnections)
{
while (true)
{
- // log.info("in loop2");
+ // log.info("in loop2");
Set<Thread> executingThreads = rc.getExecutingThreads();
if (executingThreads.isEmpty())
@@ -1238,27 +1260,25 @@
}
}
}
-
- // log.info("all exited");
-
-
-
- //FIXME this is not sufficient - since there may still be queued executions waiting from before the freeze
- //need to wait for all executions on the channel to complete too
+
+ // log.info("all exited");
+
+ // FIXME this is not sufficient - since there may still be queued executions waiting from before the freeze
+ // need to wait for all executions on the channel to complete too
for (RemotingConnection rc : backupConnections)
{
rc.setFrozenAllChannels(false);
}
-
- //Now we need to wait for all executions to finish on the channel - they may be queued
-
+
+ // Now we need to wait for all executions to finish on the channel - they may be queued
+
for (RemotingConnection rc : backupConnections)
{
rc.waitForAllExecutions();
}
ReplicationAwareMutex.clearLatchAll();
-
+
log.info("freeze complete");
}
}
@@ -1538,7 +1558,7 @@
if (replicatingConnection != null)
{
- Channel channel1 = replicatingConnection.getChannel(1, -1, false, false);
+ Channel channel1 = replicatingConnection.getChannel(1);
JBMThread thread = JBMThread.currentThread();
@@ -1547,9 +1567,13 @@
channel1.send(new RegisterQueueReplicationChannelMessage(queueID));
thread.resumeRecording();
+
+ Channel replChannel = new ChannelImpl(queueID, -1, false, null);
+
+ replChannel.transferConnection(replicatingConnection);
+
+ replicatingConnection.putChannel(replChannel);
- Channel replChannel = replicatingConnection.getChannel(queueID, -1, false, false);
-
replicator = new ReplicatorImpl("queue " + queueID, replChannel);
replChannel.setHandler(new ChannelHandler()
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2009-07-22 13:46:38 UTC (rev 7602)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/MessagingServerPacketHandler.java 2009-07-23 10:37:33 UTC (rev 7603)
@@ -28,6 +28,7 @@
import org.jboss.messaging.core.remoting.ChannelHandler;
import org.jboss.messaging.core.remoting.Packet;
import org.jboss.messaging.core.remoting.RemotingConnection;
+import org.jboss.messaging.core.remoting.impl.ChannelImpl;
import org.jboss.messaging.core.remoting.impl.wireformat.CreateSessionMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.MessagingExceptionMessage;
import org.jboss.messaging.core.remoting.impl.wireformat.PacketImpl;
@@ -120,7 +121,11 @@
{
RegisterQueueReplicationChannelMessage msg = (RegisterQueueReplicationChannelMessage)packet;
- Channel channel = connection.getChannel(msg.getBindingID(), -1, false, true);
+ Channel channel = new ChannelImpl(msg.getBindingID(), -1, false, server.getExecutorFactory().getExecutor());
+
+ channel.transferConnection(connection);
+
+ connection.putChannel(channel);
if (server.registerBackupConnection(channel.getConnection()))
{
@@ -133,7 +138,7 @@
{
UnregisterQueueReplicationChannelMessage msg = (UnregisterQueueReplicationChannelMessage)packet;
- Channel channel = connection.getChannel(msg.getBindingID(), -1, false, true);
+ Channel channel = connection.getChannel(msg.getBindingID());
channel.setHandler(null);
@@ -214,8 +219,7 @@
Packet response;
try
{
- response = server.createSession(request.getName(),
- request.getSessionChannelID(),
+ response = server.createSession(request.getName(),
request.getUsername(),
request.getPassword(),
request.getMinLargeMessageSize(),
Modified: branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java
===================================================================
--- branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-07-22 13:46:38 UTC (rev 7602)
+++ branches/Branch_MultiThreaded_Replication/src/main/org/jboss/messaging/core/server/impl/ServerSessionImpl.java 2009-07-23 10:37:33 UTC (rev 7603)
@@ -1571,8 +1571,6 @@
channel.transferConnection(newConnection);
- newConnection.syncIDGeneratorSequence(remotingConnection.getIDGeneratorSequence());
-
remotingConnection = newConnection;
remotingConnection.addFailureListener(this);
More information about the jboss-cvs-commits mailing list